{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BlockArguments #-}

-- | Common transport-agnostic functions for using Churro.
-- 
-- Variants with a trailing underscore - e.g. `runWait_` - specialise the Async action to
-- be () for when you don't care about accumulating results and only want to process items
-- as they pass through the pipeline.
-- 
-- Variants with a trailing prime - e.g. `processRetry'` - also change the generality of the
-- types involved in some way.
-- 
module Control.Churro.Prelude where

import Control.Churro.Types

import Prelude hiding (id, (.))

import           Control.Arrow            (arr)
import           Control.Category         (id, (.), (>>>))
import           Control.Concurrent       (threadDelay)
import           Control.Concurrent.Async (cancel, Async, wait)
import           Control.Exception        (Exception, SomeException, try)
import           Control.Monad            (replicateM_, when)
import           Data.Foldable            (toList, for_)
import           Data.Maybe               (isJust)
import           Data.Time                (NominalDiffTime)
import           Data.Void                (Void)
import           GHC.Natural              (Natural)


-- $setup
-- 
-- The examples in this module require the following imports:
-- 
-- >>> :set -XBlockArguments
-- >>> import Control.Churro.Transport
-- >>> import Data.Time.Clock
-- 

-- * Runners

-- | Automatically wait for a churro to complete.
--
--   Starts the churro with `run` and then blocks on the resulting `Async`
--   using `wait`, returning the async action's result.
--
runWait :: Transport t => Churro a t Void Void -> IO a
runWait x = run x >>= wait

-- | Version of `runWait` specialised to `()`.
--
runWait_ :: Transport t => Churro () t Void Void -> IO ()
runWait_ = runWait

-- | Read the output of a Churro into a list.
--
-- Warning: This will block until the Churro terminates,
--          accumulating items in memory.
--          Only use when you expect a finite amount of output.
--          Otherwise consider composing with a Sink and using `runWait`.
--
-- >>> runWaitListChan $ sourceList [0..4] >>> arr succ
-- [1,2,3,4,5]
--
runWaitList :: (Transport t, Monoid a) => Churro a t Void b -> IO [b]
runWaitList c = runWait (singletons >>>> sink)
    where
    -- Wrap every item in a one-element list so that `sink` can combine
    -- the items through the list monoid.
    singletons = c >>> arr' (:[])

-- | Version of `runWaitList` specialised to `()`.
--
runWaitList_ :: Transport t => Churro () t Void b -> IO [b]
runWaitList_ = runWaitList

-- | Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.
--
--   This is `run'` restricted to churros whose input and output are both `Void`.
--
run :: Transport t => Churro a t Void Void -> IO (Async a)
run = run'

-- | Version of `run` with async return type specialised to `()`.
--
run_ :: Transport t => Churro () t Void Void -> IO (Async ())
run_ = run'

-- | Run any churro, there is no check that this was spawned with a source, or terminated with a sink.
--   This is unsafe, since the pipeline may not generate or consume in a predictable way.
--   Use `run` instead unless you are confident you know what you're doing.
--
--   Only the `Async` produced by `runChurro` is returned; the input and
--   output ends it also yields are discarded.
--
run' :: Transport t => Churro a t i o -> IO (Async a)
run' c = do
    (_input, _output, action) <- runChurro c
    return action

-- * Library

-- ** Sources

-- | A single-item source.
--
-- >>> runWaitChan $ sourceSingleton 13 >>> sinkPrint
-- 13
--
-- Equivalent to `pure` from `Applicative`. Redefined here in case you're looking for a source!
--
-- >>> runWaitChan $ pure 23 >>> sinkPrint
-- 23
sourceSingleton :: Transport t => o -> Churro () t Void o
sourceSingleton x = sourceList [x]

-- | Create a source from a list of items, sending each down the churro independently.
--
--   Works for any `Foldable` container, not just lists.
--
-- >>> runWaitChan $ sourceList [4,2] >>> sinkPrint
-- 4
-- 2
sourceList :: (Transport t, Foldable f) => f o -> Churro () t Void o
sourceList items = sourceIO \yield -> for_ items yield

-- | Create a source from an IO action that is passed a function to yield new items.
--
--   Every yielded item is sent downstream wrapped in `Just`. Once the action
--   returns, a single `Nothing` is sent to mark the end of the stream, and the
--   action's result becomes the result of the churro's async action.
--
-- >>> runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint
-- 4
-- 2
sourceIO :: Transport t => ((o -> IO ()) -> IO a) -> Churro a t Void o
sourceIO cb =
    buildChurro \_i o -> do
        r <- cb (yeet o . Just)
        yeet o Nothing -- end-of-stream marker
        return r

-- | Variant of `sourceIO` with Async action specialised to `()`.
--
sourceIO_ :: Transport t => ((o -> IO ()) -> IO ()) -> Churro () t Void o
sourceIO_ = sourceIO

-- | Combine a list of sources into a single source.
--
-- Sends individual items downstream without attempting to combine them.
--
-- Each source is started with `run`, with a `sinkIO` attached that forwards
-- its items to the combined source's yield callback. The combined source then
-- waits on every child in turn.
--
-- Warning: Passing an empty list is unspecified.
--
-- TODO: Use NonEmptyList instead of []
--
-- >>> runWaitChan $ sources_ [pure 1, pure 1] >>> sinkPrint
-- 1
-- 1
sources :: (Transport t, Foldable f, Traversable f) => f (Churro a t Void i) -> Churro () t Void i
sources ss = sourceIO \cb -> do
    children <- toList <$> mapM (\s -> run (s >>>> sinkIO cb)) ss
    -- `finally'` takes the clean-up (cancel every child) first and the main
    -- action (wait for every child) second.
    -- NOTE(review): presumably the clean-up also runs when waiting fails -
    -- confirm against `finally'` in "Control.Churro.Types".
    finally' (mapM_ cancel children) do
        mapM_ wait children

-- | Variant of `sources` with Async action of sources in argument specialised to `()`.
--
sources_ :: Transport t => [Source () t o] -> Source () t o
sources_ = sources

-- ** Sinks

-- | Consume all items and combine them into a result via their monoid.
--
-- >>> :set -XFlexibleContexts
-- >>> r <- runWaitChan $ pure' [1 :: Int] >>> sink
-- >>> print r
-- [1]
sink :: (Transport t, Monoid a) => Churro a t a Void
sink = sinkIO return

-- | Consume all items with no additional effects.
--
-- TODO: Decide if we should use some kind of `nf` evaluation here to force items.
--
-- >>> runWaitChan $ pure 1 >>> process print >>> sink_
-- 1
--
sink_ :: Transport t => Churro () t i Void
sink_ = sinkIO \_ -> return ()

-- | Consume a churro with an IO process.
--
--   The callback is applied to every incoming item via `yankAll`; the output
--   end of the churro is never written to.
--
-- >>> runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))
-- "hello"
-- 2
sinkIO :: (Transport t, Monoid a) => (o -> IO a) -> Churro a t o Void
sinkIO cb = buildChurro \i _o -> yankAll i cb

-- | Variant of `sinkIO` with Async action specialised to `()`.
--
sinkIO_ :: Transport t => (o -> IO ()) -> Churro () t o Void
sinkIO_ = sinkIO

-- | Create a "sink" with more flexibility about when items are demanded using a higher-order "HO" callback.
--
-- This also allows a non-unit async action that can be recovered when run.
--
-- The callback receives a function that, given a per-item handler, consumes
-- the incoming items with `yankAll`.
--
-- WARNING: You should use the provided callback if you want to actually create a sink.
--
-- TODO: Use hidden callback return type in order to ensure that the callback is called.
--
-- >>> import System.Timeout (timeout)
-- >>> :{
-- do
--   r <- timeout 100000 $ runWaitChan $ sourceSingleton 1 >>>> sinkHO \ya -> do
--     ya (print . show)
--     return 25
--   print r
-- :}
-- "1"
-- Just 25
sinkHO :: Transport t => (((i -> IO ()) -> IO ()) -> IO a) -> Churro a t i o
sinkHO cb = buildChurro \i _o -> cb (yankAll i)

-- | Consume and print each item. Used in many examples, but not much use outside debugging!
--
-- >>> runWaitChan $ pure "hi" >>> sinkPrint
-- "hi"
sinkPrint :: (Transport t, Show a) => Churro () t a Void
sinkPrint = sinkIO print
    

-- ** Churros

-- | Process each item with an IO action.
--   Acts as a one-to-one process.
--
--   Implemented with `processN` by wrapping each result in a singleton list.
--
-- >>> runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint
-- "hi"
-- "ih"
process :: Transport t => (a -> IO b) -> Churro () t a b
process f = processN \x -> pure <$> f x

-- | Print each item then pass it on unchanged.
processPrint :: (Transport t, Show b) => Churro () t b b
processPrint = process \x -> x <$ print x

-- | Print each item with an additional debugging label, then pass it on unchanged.
--
--   The line written to stdout has the form @Debugging [label]: item@.
processDebug :: (Transport t, Show b) => String -> Churro () t b b
processDebug d = process \x -> do
    putStrLn ("Debugging [" <> d <> "]: " <> show x)
    return x

-- | Process each item with an IO action and potentially yield many items as a result.
--   Acts as a one-to-many process.
--
--   Every element of the returned list is sent downstream in order. After the
--   input is exhausted a single `Nothing` is sent to mark the end of the stream.
--
-- >>> runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint
-- "1"
-- 1
-- 2
processN :: Transport t => (i -> IO [o]) -> Churro () t i o
processN f =
    buildChurro \i o -> do
        yankAll i \x -> f x >>= mapM_ (yeet o . Just)
        yeet o Nothing -- end-of-stream marker

-- | Extract xs from (Just x)s. Similar to `catMaybes`.
--
-- >>> runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint
-- 1
-- 3
justs :: Transport t => Churro () t (Maybe a) a
justs = mapN (maybe [] pure)

-- | Extract ls from (Left l)s. `Right` values are dropped.
--
-- >>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint
-- 1
-- 3
lefts :: Transport t => Churro () t (Either a b) a
lefts = mapN (either pure (const []))

-- | Extract rs from (Right r)s. `Left` values are dropped.
--
-- >>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint
-- 2
rights :: Transport t => Churro () t (Either a b) b
rights = mapN (either (const []) pure)

-- | Take and yield the first n items.
--
-- WARNING: This is intended to terminate upstream once the items have been consumed
--          downstream, but there is a bug preventing this from working at present!
--
-- >>> runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint
-- 1
-- 2
--
-- This implementation explicitly stops propagating when the Churro completes,
-- although this could be handled by downstream consumer composition terminating
-- the producer and just using replicateM.
takeC :: (Transport t, Integral n) => n -> Churro () t a a
takeC n = buildChurro \i o -> go n i o
    where
    go remaining i o
        -- Quota reached: tell downstream that the stream is over.
        | remaining <= 0 = yeet o Nothing
        | otherwise = do
            x <- yank i
            -- Forward whatever arrived, including an upstream `Nothing`,
            -- and only keep going while real items are still coming.
            yeet o x
            when (isJust x) do go (pred remaining) i o

-- | Drop the first n items.
--
--   If the upstream finishes before n items have arrived, the churro
--   terminates immediately and signals end-of-stream downstream, rather than
--   continuing to read from an input that has already ended.
--
-- >>> runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint
-- 3
-- 4
dropC :: (Transport t, Integral n) => n -> Churro () t a a
dropC n = buildChurro \i o -> go (toInteger n) i o
    where
    -- Counting in `Integer` avoids narrowing a large `n` to `Int`.
    go remaining i o
        -- Finished dropping: pass everything else through untouched.
        | remaining <= 0 = c2c id i o
        | otherwise = do
            x <- yank i
            case x of
                -- Upstream ended early; forward the end-of-stream marker
                -- instead of yanking again from a finished input.
                Nothing -> yeet o Nothing
                Just _  -> go (remaining - 1) i o

-- | Filter items according to a predicate. Only items satisfying it are passed downstream.
--
-- >>> runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint
-- 4
-- 5
filterC :: Transport t => (a -> Bool) -> Churro () t a a
filterC p = mapN \x -> [x | p x]

-- | Run a pure function over items, producing multiple outputs.
--
--   Pure counterpart of `processN`.
--
-- >>> runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint
-- 9
-- 90
mapN :: Transport t => (a -> [b]) -> Churro () t a b
mapN f = processN (return . f)

-- | Delay items from being sent downstream.
--
--   Note: NominalDiffTime's Num instance interprets literals as seconds.
--
--   The duration is converted to whole microseconds, rounding up, and handed
--   to `delayMicro`. The conversion is done on the exact `Rational` value so
--   no precision is lost through floating point.
--
-- >>> let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
--
-- >>> runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
-- False
--
-- >>> runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck
-- True
delay :: Transport t => NominalDiffTime -> Churro () t a a
delay = delayMicro . ceiling . (* 1000000) . toRational

-- | Delay items in microseconds. Works the same way as `delay`.
--
--   Calls `threadDelay` once per item before passing the item on unchanged.
delayMicro :: Transport t => Int -> Churro () t a a
delayMicro d = process \x -> x <$ threadDelay d

-- | Passes consecutive pairs of items downstream.
--
--   The first item is only remembered; from the second item onwards each
--   item is paired with its predecessor. A stream with fewer than two items
--   therefore produces no pairs.
--
-- >>> runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint
-- (1,2)
-- (2,3)
withPrevious :: Transport t => Churro () t a (a,a)
withPrevious = buildChurro \i o -> do
    prog Nothing i o
    yeet o Nothing -- end-of-stream marker
    where
    -- `previous` is the last item seen, or Nothing before the first item.
    prog previous i o = do
        current <- yank i
        case current of
            Nothing -> return ()
            Just y  -> do
                for_ previous \x -> yeet o (Just (x, y))
                prog current i o

-- | Requeue an item if it fails. Swallows exceptions and gives up after retries.
--
--  Note: Process will always try once so if retries = 1 then a failing process will execute twice.
--
--  The item is requeued on the input side of the churro, so if other items have
--  been passed in they will appear first!
--
--  Catches all `SomeException`s. If you wish to narrow the exception type, consider
--  using the processRetry' variant composed with `rights`.
--
--  Note: There is an edgecase with Chan transport where a queued retry may not execute
--        if a source completes and finalises before the item is requeued.
--        A different transport type may allow a modified retry function that requeues differently.
--
-- >>> :{
-- let
--   prog = processRetry 1 flakeyThing
--   flakeyThing x = do
--     if x > 1
--       then print "GT"  >> return x
--       else print "LTE" >> error ("oops! " <> show x)
-- in
--   runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint
-- :}
-- "LTE"
-- "LTE"
-- "GT"
-- 2
--
processRetry :: Transport t => Natural -> (i -> IO o) -> Churro () t i o
processRetry retries f =
    -- Fix the exception type to `SomeException`, then keep only the successes.
    processRetry' @SomeException retries f >>> rights

-- | Raw version of `processRetry`. Polymorphic over exception type and forwards errors.
--
--   Tags every incoming item with an attempt count of 0 before handing it to
--   `processRetry''`.
--
processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro () t i (Either e o)
processRetry' retries f = arr (0,) >>> processRetry'' retries f

-- | Rawest version of `processRetry`.
--   Expects the incoming items to contain number of retries.
--
--   Also polymorphic over exception type. And forwards errors.
--
--   Every attempt's outcome (`Left` or `Right`) is sent downstream. A failed
--   item whose attempt count is still below the retry limit is written back
--   to the churro's own input with the count incremented.
--
processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro () t (n, a) (Either e b)
processRetry'' retries f =
    -- `requeue` is the writable end of this churro's own input,
    -- `incoming` its readable end, and `outgoing` the downstream end.
    buildChurro' \requeue incoming outgoing -> do
        yankAll incoming \(attempt, item) -> do
            result <- try (f item)
            yeet outgoing (Just result)
            case result of
                Right _ -> return ()
                Left  _ -> when (attempt < retries) do
                    yeet requeue (Just (succ attempt, item))
        yeet outgoing Nothing -- end-of-stream marker