{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE BlockArguments #-}

-- | Datatypes and definitions used by the Churro library.
-- 
-- Expand instances for additional documentation!

module Control.Churro.Types where

import Prelude hiding (id, (.))
import Control.Arrow
import Control.Category
import Control.Concurrent.Async (cancel, wait, Async, async)
import Data.Void
import Control.Exception (finally)

-- $setup
-- 
-- We import the library for testing, although this would be a circular import in the module itself.
-- 
-- >>> import Control.Churro

-- ** Data, Classes and Instances

-- | The core datatype for the library.
--
-- Parameters @t@, @i@ and @o@ represent the transport, input, and output types respectively.
--
-- The items on transports are wrapped in 'Maybe' to allow signalling of completion of a source.
--
-- When building a program by composing Churros, the output Transport of one Churro is fed into the input Transports of other Churros.
--
-- Convenience types of 'Source', 'Sink', and 'DoubleDipped' are also defined, although use is not required.
--
data Churro t i o = Churro
    { runChurro :: IO (t (Maybe i), t (Maybe o), Async ())
      -- ^ Action that yields the input transport, the output transport and
      --   the 'Async' handle associated with the churro.
    }
-- | A 'Churro' whose input type is 'Void'.
type Source t   o   = Churro t Void o
-- | A 'Churro' whose output type is 'Void'.
type Sink   t i     = Churro t i Void
-- | A 'Churro' whose input and output types are both 'Void'.
type DoubleDipped t = Churro t Void Void

-- | The transport method is abstracted via the 'Transport' class.
--
-- This allows use of pure or impure channels, such as:
--
-- * Chan (Included in `Control.Churro.Transport.Chan`)
-- * TChan
-- * Seq
-- * Various buffered options
--
-- Transports used in conjunction with Churros wrap items in 'Maybe' so that once a source has been depleted
-- it can signal completion with a 'Nothing' item.
--
class Transport t where
    flex :: IO (t a)           -- ^ Create a new Transport.
    yank :: t a -> IO a        -- ^ Yank an item off the Transport.
    yeet :: t a -> a -> IO ()  -- ^ Yeet an item onto the Transport.

-- | Covariant functor instance for Churro - Maps over the output.
--
-- The mapped churro gets a fresh output transport; a new async copies every
-- item from the original output onto it through the function (see 'c2c'),
-- then waits for the original async. The original async is cancelled when
-- that copying action exits, whether normally or not.
--
-- >>> let s = sourceList [1,2]
-- >>> runWaitChan $ s >>> sinkPrint
-- 1
-- 2
--
-- >>> runWaitChan $ fmap succ s >>> sinkPrint
-- 2
-- 3
instance Transport t => Functor (Churro t i) where
    fmap f c = Churro do
        (i, o, a) <- runChurro c
        o' <- flex
        a' <- async do
            -- Make sure the wrapped churro's async never outlives this one.
            finally' (cancel a) do
                c2c f o o'
                wait a
        return (i, o', a')

-- | The Category instance allows for the creation of Churro pipelines.
--
-- All other examples of the form `a >>> b` use this instance.
--
-- The `id` method creates a passthrough arrow.
-- There isn't usually a reason to use `id` directly as it has no effect:
--
-- >>> runWaitChan $ pure 1 >>> id >>> id >>> id >>> sinkPrint
-- 1
instance Transport t => Category (Churro t) where
    -- The same transport is used as both input and output, so items pass
    -- straight through; the associated async does nothing.
    id = Churro do
        a <- async (return ())
        c <- flex
        return (c, c, a)

    -- Runs both churros, then starts an async that copies the output of @f@
    -- onto the input of @g@. The async returned for the composite waits for
    -- @g@ to finish and cancels the copier and both component asyncs on exit.
    g . f = Churro do
        (fi, fo, fa) <- runChurro f
        (gi, go, ga) <- runChurro g
        a <- async do c2c id fo gi
        b <- async do
            finally' (cancel a >> cancel fa >> cancel ga) do
                wait ga
                cancel fa
                cancel a
        return (fi, go, b)

-- | The Applicative instance allows for pairwise composition of Churro pipelines.
--   Once again this is covariant and the composition occurs on the output transports of the Churros.
--
--  The `pure` method allows for the creation of a Churro yielding a single item.
--
--  The `<*>` method pairs items positionally: it repeatedly yanks one function
--  from the left churro and one argument from the right churro and yeets the
--  application, stopping as soon as either side yields 'Nothing'.
--
instance Transport t => Applicative (Churro t Void) where
    -- Emit the single item followed by the completion marker.
    pure x = buildChurro \_i o -> yeet o (Just x) >> yeet o Nothing

    f <*> g = buildChurro \_i o -> do
        (_fi, fo, fa) <- runChurro f
        (_gi, go, ga) <- runChurro g

        let
            prog :: IO ()
            prog = do
                fx <- yank fo
                gx <- yank go
                case (fx, gx) of
                    (Just f', Just g') -> yeet o (Just (f' g')) >> prog
                    _                  -> return ()

        -- Cancel both component asyncs when this action exits, so that they
        -- are not leaked if zipping fails or this churro is itself cancelled.
        -- Cancelling is done last, after the waits below have returned on the
        -- normal path.
        finally' (cancel fa >> cancel ga) do
            prog
            yeet o Nothing
            -- NOTE(review): both sides are still awaited after the shorter
            -- one completes; presumably this blocks if the longer side never
            -- finishes - confirm whether that is intended.
            wait fa
            wait ga

-- | The Arrow instance allows for building non-cyclic directed graphs of churros.
--
--  The `arr` method allows for the creation of a Churro that maps items with a pure function.
--  This is equivalent to `fmap f id`.
--
-- >>> :set -XArrows
-- >>> :{
-- let sect  = process $ \x@(_x,_y,z) -> print x >> return z
--     graph =
--       proc i -> do
--         j <- arr succ  -< i
--         k <- arr show  -< j
--         l <- arr succ  -< j
--         m <- arr (> 5) -< j
--         n <- sect      -< (k,l,m)
--         o <- arr not   -< n
--         p <- delay 0.1 -< o
--         sinkPrint      -< p
-- in
-- runWaitChan $ sourceList [1,5,30] >>> graph
-- :}
-- ("2",3,False)
-- ("6",7,True)
-- ("31",32,True)
-- True
-- False
-- False
--
-- The other Arrow methods are also usable:
--
-- >>> runWaitChan $ pure 1 >>> (arr show &&& arr succ) >>> sinkPrint
-- ("1",2)
instance Transport t => Arrow (Churro t) where
    arr f = fmap f id

    -- Feeds the first component of each incoming pair to the wrapped churro,
    -- then yanks one item from its output and re-pairs it with the second
    -- component of that same input.
    --
    -- NOTE(review): one output is yanked per input, so this presumably relies
    -- on the wrapped churro emitting exactly one output per input - confirm.
    first c = Churro do
        (i, o, a) <- runChurro c
        i' <- flex
        o' <- flex

        let go = do
                is <- yank i'
                yeet i (fmap fst is)

                os <- yank o
                -- Yields Nothing when either the input or the output is Nothing.
                yeet o' $ (,) <$> os <*> fmap snd is

                case (is, os) of
                    (Just _, Just _) -> go
                    _                -> return ()

        a' <- async do
            -- As with fmap, cancel the wrapped churro's async on exit so it
            -- is not leaked if this action fails or is cancelled.
            finally' (cancel a) do
                go
                yeet o' Nothing
                wait a

        return (i', o', a')

-- ** Helpers

-- | A helper to facilitate constructing a Churro that makes new input and output transports available for manipulation.
--
-- The manipulations performed are carried out in the async action associated with the Churro:
-- the callback receives the freshly created input and output transports and is
-- started with 'async'.
--
buildChurro :: Transport t => (t (Maybe i) -> t (Maybe o) -> IO ()) -> Churro t i o
buildChurro cb = Churro do
    i <- flex
    o <- flex
    a <- async do cb i o
    return (i, o, a)

-- | Yeet all items from a list (or any other 'Foldable') into a transport, in order.
--
-- The items are yeeted as given; no completion marker is appended.
--
yeetList :: (Foldable t1, Transport t2) => t2 a -> t1 a -> IO ()
yeetList t = mapM_ (yeet t)

-- | Yank all items from a Raw transport into a list.
--
--   Items are collected until a 'Nothing' is yanked, so this won't terminate
--   until the transport has been consumed.
--
yankList :: Transport t => t (Maybe a) -> IO [a]
yankList t = do
    x <- yank t
    case x of
        Nothing -> return []
        Just y  -> (y :) <$> yankList t

-- | Yank each item from a transport into a callback.
--
-- The callback is run for every 'Just' item, and its result is discarded.
-- Yanking stops at the first 'Nothing', which is not passed to the callback.
--
yankAll :: Transport t => t (Maybe i) -> (i -> IO a) -> IO ()
yankAll c f = do
    x <- yank c
    case x of
        Nothing -> return ()
        Just y  -> f y >> yankAll c f

-- | Yank each raw item from a transport into a callback.
--
-- The items are wrapped in Maybes and when all items are yanked, Nothing is fed to the callback.
-- The result of that final call is the result of the whole action.
--
yankAll' :: Transport t => t (Maybe a) -> (Maybe a -> IO b) -> IO b
yankAll' c f = do
    yankAll c (f . Just)
    f Nothing

-- | Yank then Yeet each item from one Transport into another, applying a function to each item on the way.
--
-- Raw items are used, so the terminating `Nothing` is also Yeeted onto the
-- destination once the source transport is depleted.
--
c2c :: Transport t => (a1 -> a2) -> t (Maybe a1) -> t (Maybe a2) -> IO ()
c2c f i o = yankAll' i (yeet o . fmap f)

-- | Flipped `finally`.
--
-- The first argument is the cleanup action and the second is the main action,
-- which lets the main action be written as a trailing @do@ block.
finally' :: IO b -> IO a -> IO a
finally' = flip finally