{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NoMonomorphismRestriction #-}

-- | Datatypes and definitions used by Churro library.
-- 
-- Expand instances for additional documentation!

module Control.Churro.Types where

import Prelude hiding (id, (.))
import Control.Arrow
import Control.Category
import Control.Concurrent.Async (cancel, wait, Async, async)
import Data.Void
import Data.Kind (Type)
import Control.Exception (finally)

-- $setup
-- 
-- We import the library for testing, although this would be a circular import in the module itself.
-- 
-- >>> import Control.Churro

-- ** Data, Classes and Instances

-- | The core datatype for the library.
-- 
-- Parameters `t`, `i` and `o` represent the transport, input, and output types respectively.
-- 
-- The items on transports are wrapped in `Maybe` to allow signalling of completion of a source.
-- 
-- When building a program by composing Churros, the output Transport of one
-- Churro is fed into the input Transports of other Churros.
-- 
-- Type families are used to allow the in/out channels to have different types
-- and prevent accidentally reading/writing from the wrong transport.
-- 
-- Convenience types of `Source`, `Sink`, and `DoubleDipped` are also defined,
-- although use is not required.
-- 
newtype Churro a t i o = Churro
    { runChurro :: IO (In t (Maybe i), Out t (Maybe o), Async a)
      -- ^ Set the Churro up, yielding its input transport, its output
      --   transport and the `Async` whose eventual result has type `a`.
    }
-- | A Churro whose input type is `Void`.
type    Source a t   o   = Churro a t Void o
-- | A Churro whose output type is `Void`.
type    Sink   a t i     = Churro a t i Void
-- | A Churro whose input and output types are both `Void`.
type    DoubleDipped a t = Churro a t Void Void

-- | The transport method is abstracted via the Transport class
-- 
-- This allows use of pure or impure channels, such as:
-- 
-- * Chan (Included in `Control.Churro.Transport.Chan`)
-- * TChan
-- * Seq
-- * Unagi
-- * Various buffered options
-- 
-- Transports used in conjunction with Churros wrap items in Maybe so that once
-- a source has been depleted it can signal completion with a Nothing item.
-- 
-- The flex method returns two transports, so that channels such as unagi that
-- create an in/out pair can have a Transport instance.
-- 
-- Channels like Chan, where a single channel acts as both in and out, simply
-- reuse the same channel for both halves of the returned pair.
-- 
class Transport (t :: Type -> Type) where
    -- | The writable end of a transport; this is what `yeet` accepts.
    data In  t :: Type -> Type
    -- | The readable end of a transport; this is what `yank` accepts.
    data Out t :: Type -> Type
    flex :: IO (In t a, Out t a)  -- ^ Create a new pair of Transports (writable end, readable end).
    yank :: Out t a -> IO a       -- ^ Yank an item off the readable end of the Transport.
    yeet :: In t a -> a -> IO ()  -- ^ Yeet an item onto the writable end of the Transport.

-- | Covariant functor instance for Churro - Maps over the output.
-- 
-- >>> let s = sourceList [1,2]
-- >>> runWaitChan $ s >>> sinkPrint
-- 1
-- 2
-- 
-- >>> runWaitChan $ fmap succ s >>> sinkPrint
-- 2
-- 3
instance Transport t => Functor (Churro a t i) where
    fmap f c = Churro do
        (i,o,a) <- runChurro c
        -- Fresh transport pair that carries the mapped items downstream.
        (i',o') <- flex
        a' <- async do
            -- Cancel the wrapped Churro's async however this action ends.
            finally' (cancel a) do
                -- Copy every item from the original output to the new
                -- transport, applying f on the way (Nothing is forwarded too).
                c2c f o i'
                -- The mapped Churro returns the wrapped Churro's result.
                wait a
        -- Input is unchanged; only the output side is replaced.
        return (i,o',a')

-- | The Category instance allows for the creation of Churro pipelines.
-- 
-- All other examples of the form `a >>> b` use this instance.
-- 
-- The `id` method creates a passthrough arrow.
-- There isn't usually a reason to use `id` directly as it has no effect:
-- 
-- >>> runWaitChan $ pure 1 >>> id >>> id >>> id >>> sinkPrint
-- 1
instance (Transport t, Monoid a) => Category (Churro a t) where
    id = Churro do
        -- Both ends of one fresh transport pair are handed back, so whatever
        -- is yeeted into the input is what gets yanked from the output.
        (i,o) <- flex
        -- There is no work to do; the async simply yields mempty.
        a     <- async mempty
        return (i,o,a)

    -- Category composition is right-to-left, (>>>>) is left-to-right.
    g . f = f >>>> g

-- | Category style composition that allows for return type to change downstream.
-- 
(>>>>) :: (Transport t, fo ~ gi) => Churro a1 t fi fo -> Churro a2 t gi go -> Churro a2 t fi go
f >>>> g = Churro do
    (fi, fo, fa) <- runChurro f
    (gi, go, ga) <- runChurro g
    -- Pump everything f produces (including the final Nothing) into g.
    a <- async do c2c id fo gi
    b <- async do
        -- Whatever happens, make sure the pump and both stages are cancelled.
        finally' (cancel a >> cancel fa >> cancel ga) do
            -- The composite's result is the downstream stage's result.
            r <- wait ga
            -- Once downstream is finished, upstream work is no longer needed.
            cancel fa
            cancel a
            return r
    -- Expose f's input and g's output as the ends of the composite.
    return (fi, go, b)

-- | The Applicative instance allows for pairwise composition of Churro pipelines.
--   Once again this is covariant and the composition occurs on the output transports of the Churros.
-- 
--  The `pure` method allows for the creation of a Churro yielding a single item.
-- 
-- TODO: Write test to check Monoid return type.
-- 
instance (Transport t, Monoid a) => Applicative (Churro a t Void) where
    pure = pure'

    f <*> g = buildChurro \_i o -> do
        (_fi, fo, fa) <- runChurro f
        (_gi, go, ga) <- runChurro g

        let
            -- Pair items off the two upstreams in lock-step, applying each
            -- function from f to the matching item from g. Stops as soon as
            -- either side yields Nothing.
            prog :: IO ()
            prog = do
                fx <- yank fo
                gx <- yank go
                case (fx, gx) of
                    (Just f', Just g') -> yeet o (Just (f' g')) >> prog
                    _                  -> return ()

        -- Cancel both upstream asyncs however this action ends, so they are
        -- not left running if it throws or is itself cancelled. On the normal
        -- path both have already been waited on, so the cancels are no-ops.
        finally' (cancel fa >> cancel ga) do
            prog
            -- Signal completion downstream.
            yeet o Nothing
            -- f's result is discarded; the combined Churro returns g's result.
            _ <- wait fa
            wait ga

-- | More general variant of `pure` with Monoid constraint.
pure' :: (Transport t, Monoid a) => o -> Churro a t i o
pure' x = buildChurro \_i o -> do
    -- Emit the single item, then signal completion.
    yeet o (Just x)
    yeet o Nothing
    -- The input transport is never read; the async result is just mempty.
    return mempty

-- | The Arrow instance allows for building non-cyclic directed graphs of churros.
-- 
--  The `arr` method allows for the creation of a Churro that maps items with a pure function.
--  This is equivalent to `fmap f id`. The more general version is exposed via `arr'`.
-- 
-- >>> :set -XArrows
-- >>> :{
-- let sect  = process $ \x@(_x,_y,z) -> print x >> return z
--     graph =
--       proc i -> do
--         j <- arr succ  -< i
--         k <- arr show  -< j
--         l <- arr succ  -< j
--         m <- arr (> 5) -< j
--         n <- sect      -< (k,l,m)
--         o <- arr not   -< n
--         p <- delay 0.1 -< o
--         sinkPrint      -< p
-- in
-- runWaitChan $ sourceList [1,5,30] >>> graph
-- :}
-- ("2",3,False)
-- ("6",7,True)
-- ("31",32,True)
-- True
-- False
-- False
-- 
-- The other Arrow methods are also usable:
-- 
-- >>> runWaitChan $ pure 1 >>> (arr show &&& arr succ) >>> sinkPrint
-- ("1",2)
-- 
-- TODO: Write tests to check if the monoid return type is implemented correctly.
-- 
instance (Transport t, Monoid a) => Arrow (Churro a t) where
    arr = arr'

    first c = Churro do
        (i,o,a)   <- runChurro c
        -- ai'/ao': where callers yeet (b, d) pairs and where we yank them.
        (ai',ao') <- flex
        -- bi'/bo': where we yeet (c, d) pairs and where callers yank them.
        (bi',bo') <- flex

        let go = do
                -- Feed the first component of the incoming pair to c
                -- (a Nothing is forwarded as Nothing).
                is <- yank ao'
                yeet i (fmap fst is)

                -- Recombine c's output with the untouched second component.
                -- If either side is Nothing this yeets Nothing.
                os <- yank o
                yeet bi' $ (,) <$> os <*> fmap snd is

                -- Keep going only while both sides are still producing.
                case (is, os) of
                    (Just _, Just _) -> go
                    _ -> return ()

        a' <- async do
            go
            -- NOTE(review): go has already yeeted a Nothing on its final
            -- iteration, so this is a second terminator on bi'.
            yeet bi' Nothing
            -- The result is the wrapped Churro's result.
            wait a

        return (ai',bo',a')

-- | More general version of `arr`.
-- 
-- Useful when building pipelines that need to work with return types.
arr' :: (Functor (cat a), Category cat) => (a -> b) -> cat a b
-- Map f over the output of the identity arrow of the category.
arr' f = fmap f id

-- ** Helpers

-- | A helper to facilitate constructing a Churro that makes new input and output transports available for manipulation.
-- 
-- The manipulations performed are carried out in the async action associated with the Churro
-- 
buildChurro :: Transport t => (Out t (Maybe i) -> In t (Maybe o) -> IO a) -> Churro a t i o
-- Delegate to buildChurro', dropping the extra handle to the input's
-- writable end that it passes to the callback.
buildChurro cb = buildChurro' \_o' i o -> cb i o

-- | A version of `buildChurro` that also passes the original input to the callback so that you can reschedule items.
-- 
-- Used by "retry" style functions.
-- 
buildChurro' :: Transport t => (In t (Maybe i) -> Out t (Maybe i) -> In t (Maybe o) -> IO a) -> Churro a t i o
buildChurro' cb = Churro do
    -- Input side: callers yeet into ai, the callback yanks from ao.
    (ai,ao) <- flex
    -- Output side: the callback yeets into bi, callers yank from bo.
    (bi,bo) <- flex
    -- The callback also receives ai so it can put items back on the input.
    a       <- async do cb ai ao bi
    return (ai,bo,a)

-- | Helper. Finalises cancellation of async.
-- 
-- Use instead of runChurro unless you want to directly manage cancellation.
-- 
withChurro :: Churro a t i o -> (In t (Maybe i) -> Out t (Maybe o) -> Async a -> IO b) -> IO b
withChurro c f = do
    (i,o,a) <- runChurro c
    -- Cancel the Churro's async once the callback has finished, whether it
    -- returned normally or threw.
    finally' (cancel a) do f i o a

-- | Yeet all items from a list into a raw transport.
-- 
-- WARNING: If you are using this to build a churro by hand make sure you yeet Nothing once you're finished.
-- 
yeetList :: (Foldable f, Transport t) => In t a -> f a -> IO ()
-- Yeet each element in traversal order; no terminator is added.
yeetList t = mapM_ (yeet t)

-- | Yank all items from a Raw transport into a list.
-- 
--   Won't terminate until the transport has been consumed.
-- 
yankList :: Transport t => Out t (Maybe a) -> IO [a]
-- Each item becomes a singleton list in IO; yankAll concatenates them via
-- the list Monoid.
yankList = flip yankAll (pure . pure)

-- | Yank each item from a transport into a callback.
-- 
yankAll :: (Transport t, Monoid a) => Out t (Maybe i) -> (i -> IO a) -> IO a
yankAll c f = do
    x <- yank c
    case x of
        -- End of stream: mempty at type IO a yields the empty result.
        Nothing -> mempty
        -- (<>) at type IO a runs the callback, then the rest of the stream,
        -- and combines the two results.
        Just y  -> f y <> yankAll c f

-- | Yank each raw item from a transport into a callback.
-- 
-- The items are wrapped in Maybes and when Nothing is yanked, Nothing is fed to the callback and `yankAll'` completes.
-- 
yankAll' :: (Transport t, Monoid b) => Out t (Maybe a) -> (Maybe a -> IO b) -> IO b
yankAll' c f = do
    -- Feed every real item to the callback, wrapped in Just.
    x <- yankAll c (f . Just)
    -- Then let the callback see the terminating Nothing.
    y <- f Nothing
    return (x <> y)

-- | Yank then Yeet each item from one Transport into another.
-- 
-- Raw items are used so `Nothing` should be Yeeted once the transport is depleted.
-- 
c2c :: Transport t => (a -> b) -> Out t (Maybe a) -> In t (Maybe b) -> IO ()
-- yankAll' hands over every raw item, including the final Nothing, so the
-- terminator is forwarded to the destination as well.
c2c f o i = yankAll' o (yeet i . fmap f)

-- | Flipped `finally`.
finally' :: IO b -> IO a -> IO a
-- Cleanup comes first so the main action can be a trailing do block.
finally' cleanup action = action `finally` cleanup