{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BlockArguments #-}

-- | Common transport-agnostic functions for using Churro.
-- 
module Control.Churro.Prelude where

import Control.Churro.Types

import Prelude hiding (id, (.))

import           Control.Arrow            (arr)
import           Control.Category         (id, (.), (>>>))
import           Control.Concurrent       (threadDelay)
import           Control.Concurrent.Async (waitAny, Async, wait)
import           Control.Exception        (Exception, SomeException, try)
import           Control.Monad            (replicateM_, when)
import           Data.Foldable            (for_)
import           Data.IORef               (newIORef, readIORef, writeIORef)
import           Data.Maybe               (isJust)
import           Data.Time                (NominalDiffTime)
import           Data.Void                (Void)
import           GHC.Natural              (Natural)


-- $setup
-- 
-- The examples in this module require the following imports:
-- 
-- >>> import Control.Churro.Transport
-- >>> import Data.Time.Clock
-- 

-- * Runners

-- | Automatically wait for a churro to complete.
-- 
runWait :: Transport t => Churro t Void Void -> IO ()
runWait :: Churro t Void Void -> IO ()
runWait Churro t Void Void
x = Async () -> IO ()
forall a. Async a -> IO a
wait (Async () -> IO ()) -> IO (Async ()) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Churro t Void Void -> IO (Async ())
forall (t :: * -> *).
Transport t =>
Churro t Void Void -> IO (Async ())
run Churro t Void Void
x

-- | Read the output of a Churro into a list.
-- 
-- Warning: This will block until the Churro terminates,
--          Accumulating items in memory.
--          Only use when you expect a finite amount of output.
--          Otherwise consider composing with a Sink and using `runWait`.
-- 
-- >>> runWaitListChan $ sourceList [0..4] >>> arr succ
-- [1,2,3,4,5]
-- 
runWaitList :: Transport t => Churro t Void o -> IO [o]
runWaitList :: Churro t Void o -> IO [o]
runWaitList Churro t Void o
x = do
    IORef [o]
t <- [o] -> IO (IORef [o])
forall a. a -> IO (IORef a)
newIORef []

    let
        c :: Churro t o o
c = (Out t (Maybe o) -> In t (Maybe o) -> IO ()) -> Churro t o o
forall (t :: * -> *) i o.
Transport t =>
(Out t (Maybe i) -> In t (Maybe o) -> IO ()) -> Churro t i o
buildChurro \Out t (Maybe o)
i In t (Maybe o)
_o -> do
            [o]
l <- Out t (Maybe o) -> IO [o]
forall (t :: * -> *) a. Transport t => Out t (Maybe a) -> IO [a]
yankList Out t (Maybe o)
i
            IORef [o] -> [o] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [o]
t [o]
l

    Churro t Void Void -> IO ()
forall (t :: * -> *). Transport t => Churro t Void Void -> IO ()
runWait (Churro t Void Void -> IO ()) -> Churro t Void Void -> IO ()
forall a b. (a -> b) -> a -> b
$ Churro t Void o
x Churro t Void o -> Churro t o Void -> Churro t Void Void
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> Churro t o Void
forall o. Churro t o o
c

    IORef [o] -> IO [o]
forall a. IORef a -> IO a
readIORef IORef [o]
t

-- | Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.
-- 
run :: Transport t => Churro t Void Void -> IO (Async ())
run :: Churro t Void Void -> IO (Async ())
run = Churro t Void Void -> IO (Async ())
forall (t :: * -> *) i o.
Transport t =>
Churro t i o -> IO (Async ())
run'

-- | Run any churro, there is no check that this was spawned with a source, or terminated with a sink.
--   This is unsafe, since the pipeline may not generate or consume in a predictable way.
--   Use `run` instead unless you are confident you know what you're doing.
-- 
run' :: Transport t => Churro t i o -> IO (Async ())
run' :: Churro t i o -> IO (Async ())
run' Churro t i o
c = do
    -- Compose an empty sourceList to ensure termination
    (In t (Maybe Void)
_i,Out t (Maybe o)
_o,Async ()
a) <- Churro t Void o
-> IO (In t (Maybe Void), Out t (Maybe o), Async ())
forall (t :: * -> *) i o.
Churro t i o -> IO (In t (Maybe i), Out t (Maybe o), Async ())
runChurro ([i] -> Churro t Void i
forall (t :: * -> *) (f :: * -> *) o.
(Transport t, Foldable f) =>
f o -> Churro t Void o
sourceList [] Churro t Void i -> Churro t i o -> Churro t Void o
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> Churro t i o
c)
    Async () -> IO (Async ())
forall (m :: * -> *) a. Monad m => a -> m a
return Async ()
a

-- * Library

-- ** Sources

-- | A single items source.
--
-- >>> runWaitChan $ sourceSingleton 13 >>> sinkPrint
-- 13
--
-- Equivalent to `pure` from `Applicative`. Redefined here in case you're looking for a source!
-- 
-- >>> runWaitChan $ pure 23 >>> sinkPrint
-- 23
sourceSingleton :: Transport t => o -> Churro t Void o
sourceSingleton :: o -> Churro t Void o
sourceSingleton o
x = [o] -> Churro t Void o
forall (t :: * -> *) (f :: * -> *) o.
(Transport t, Foldable f) =>
f o -> Churro t Void o
sourceList [o
x]

-- | Create a source from a list of items, sending each down the churro independently.
--
-- >>> runWaitChan $ sourceList [4,2] >>> sinkPrint
-- 4
-- 2
sourceList :: (Transport t, Foldable f) => f o -> Churro t Void o
sourceList :: f o -> Churro t Void o
sourceList = ((o -> IO ()) -> IO ()) -> Churro t Void o
forall (t :: * -> *) o.
Transport t =>
((o -> IO ()) -> IO ()) -> Churro t Void o
sourceIO (((o -> IO ()) -> IO ()) -> Churro t Void o)
-> (f o -> (o -> IO ()) -> IO ()) -> f o -> Churro t Void o
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. f o -> (o -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_

-- | Create a source from an IO action that is passed a function to yield new items.
--
-- >>> runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint
-- 4
-- 2
sourceIO :: Transport t => ((o -> IO ()) -> IO ()) -> Churro t Void o
sourceIO :: ((o -> IO ()) -> IO ()) -> Churro t Void o
sourceIO (o -> IO ()) -> IO ()
cb =
    (Out t (Maybe Void) -> In t (Maybe o) -> IO ()) -> Churro t Void o
forall (t :: * -> *) i o.
Transport t =>
(Out t (Maybe i) -> In t (Maybe o) -> IO ()) -> Churro t i o
buildChurro \Out t (Maybe Void)
_i In t (Maybe o)
o -> do
        (o -> IO ()) -> IO ()
cb (In t (Maybe o) -> Maybe o -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe o)
o (Maybe o -> IO ()) -> (o -> Maybe o) -> o -> IO ()
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. o -> Maybe o
forall a. a -> Maybe a
Just)
        In t (Maybe o) -> Maybe o -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe o)
o Maybe o
forall a. Maybe a
Nothing

-- | Combine a list of sources into a single source.
-- 
-- Sends individual items downstream without attempting to combine them.
-- 
-- >>> runWaitChan $ sources [pure 1, pure 1] >>> sinkPrint
-- 1
-- 1
sources :: Transport t => [Source t o] -> Source t o
sources :: [Source t o] -> Source t o
sources [Source t o]
ss = ((o -> IO ()) -> IO ()) -> Source t o
forall (t :: * -> *) o.
Transport t =>
((o -> IO ()) -> IO ()) -> Churro t Void o
sourceIO \o -> IO ()
cb -> do
    [Async ()]
asyncs <- (Source t o -> IO (Async ())) -> [Source t o] -> IO [Async ()]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (\Source t o
s -> Churro t Void Void -> IO (Async ())
forall (t :: * -> *).
Transport t =>
Churro t Void Void -> IO (Async ())
run (Churro t Void Void -> IO (Async ()))
-> Churro t Void Void -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Source t o
s Source t o -> Churro t o Void -> Churro t Void Void
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> (o -> IO ()) -> Churro t o Void
forall (t :: * -> *) o.
Transport t =>
(o -> IO ()) -> Churro t o Void
sinkIO o -> IO ()
cb) [Source t o]
ss
    (Async ()
a, ()
_) <- [Async ()] -> IO (Async (), ())
forall a. [Async a] -> IO (Async a, a)
waitAny [Async ()]
asyncs
    Async () -> IO ()
forall a. Async a -> IO a
wait Async ()
a

-- ** Sinks

-- | Consume all items with no additional effects.
-- 
-- TODO: Decide if we should use some kind of `nf` evaluation here to force items.
-- 
-- >>> runWaitChan $ pure 1 >>> process print >>> sink
-- 1
-- 
sink :: Transport t => Churro t b Void
sink :: Churro t b Void
sink = (b -> IO ()) -> Churro t b Void
forall (t :: * -> *) o.
Transport t =>
(o -> IO ()) -> Churro t o Void
sinkIO (IO () -> b -> IO ()
forall a b. a -> b -> a
const (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()))

-- | Consume a churro with an IO process.
-- 
-- >>> runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))
-- "hello"
-- 2
sinkIO :: Transport t => (o -> IO ()) -> Churro t o Void
sinkIO :: (o -> IO ()) -> Churro t o Void
sinkIO o -> IO ()
cb = (Out t (Maybe o) -> In t (Maybe Void) -> IO ()) -> Churro t o Void
forall (t :: * -> *) i o.
Transport t =>
(Out t (Maybe i) -> In t (Maybe o) -> IO ()) -> Churro t i o
buildChurro \Out t (Maybe o)
i In t (Maybe Void)
_o -> Out t (Maybe o) -> (o -> IO ()) -> IO ()
forall (t :: * -> *) i a.
Transport t =>
Out t (Maybe i) -> (i -> IO a) -> IO ()
yankAll Out t (Maybe o)
i o -> IO ()
cb

-- | Consume and print each item. Used in many examples, but not much use outside debugging!
-- 
-- >>> runWaitChan $ pure "hi" >>> sinkPrint
-- "hi"
sinkPrint :: (Transport t, Show a) => Churro t a Void
sinkPrint :: Churro t a Void
sinkPrint = (a -> IO ()) -> Churro t a Void
forall (t :: * -> *) o.
Transport t =>
(o -> IO ()) -> Churro t o Void
sinkIO a -> IO ()
forall a. Show a => a -> IO ()
print
    

-- ** Churros

-- | Process each item with an IO action.
--   Acts as a one-to-one process.
-- 
-- >>> runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint
-- "hi"
-- "ih"
process :: Transport t => (a -> IO b) -> Churro t a b
process :: (a -> IO b) -> Churro t a b
process a -> IO b
f = (a -> IO [b]) -> Churro t a b
forall (t :: * -> *) a b.
Transport t =>
(a -> IO [b]) -> Churro t a b
processN ((b -> [b]) -> IO b -> IO [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap b -> [b]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (IO b -> IO [b]) -> (a -> IO b) -> a -> IO [b]
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. a -> IO b
f)

-- | Print each item then pass it on.
processPrint :: (Transport t, Show b) => Churro t b b
processPrint :: Churro t b b
processPrint = (b -> IO b) -> Churro t b b
forall (t :: * -> *) a b.
Transport t =>
(a -> IO b) -> Churro t a b
process \b
x -> do b -> IO ()
forall a. Show a => a -> IO ()
print b
x IO () -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> b -> IO b
forall (m :: * -> *) a. Monad m => a -> m a
return b
x

-- | Print each item with an additional debugging label.
processDebug :: (Transport t, Show b) => String -> Churro t b b
processDebug :: String -> Churro t b b
processDebug String
d = (b -> IO b) -> Churro t b b
forall (t :: * -> *) a b.
Transport t =>
(a -> IO b) -> Churro t a b
process \b
x -> String -> IO ()
putStrLn (String
"Debugging [" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
d String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"]: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> b -> String
forall a. Show a => a -> String
show b
x) IO () -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> b -> IO b
forall (m :: * -> *) a. Monad m => a -> m a
return b
x

-- | Process each item with an IO action and potentially yield many items as a result.
--   Acts as a one-to-many process.
-- 
-- >>> runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint
-- "1"
-- 1
-- 2
processN :: Transport t => (a -> IO [b]) -> Churro t a b
processN :: (a -> IO [b]) -> Churro t a b
processN a -> IO [b]
f =
    (Out t (Maybe a) -> In t (Maybe b) -> IO ()) -> Churro t a b
forall (t :: * -> *) i o.
Transport t =>
(Out t (Maybe i) -> In t (Maybe o) -> IO ()) -> Churro t i o
buildChurro \Out t (Maybe a)
i In t (Maybe b)
o -> do
        Out t (Maybe a) -> (a -> IO ()) -> IO ()
forall (t :: * -> *) i a.
Transport t =>
Out t (Maybe i) -> (i -> IO a) -> IO ()
yankAll Out t (Maybe a)
i \a
x -> do (b -> IO ()) -> [b] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (In t (Maybe b) -> Maybe b -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe b)
o (Maybe b -> IO ()) -> (b -> Maybe b) -> b -> IO ()
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. b -> Maybe b
forall a. a -> Maybe a
Just) ([b] -> IO ()) -> IO [b] -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< a -> IO [b]
f a
x
        In t (Maybe b) -> Maybe b -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe b)
o Maybe b
forall a. Maybe a
Nothing

-- | Extract xs from (Just x)s. Similar to `catMaybes`.
-- 
-- >>> runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint
-- 1
-- 3
justs :: Transport t => Churro t (Maybe a) a
justs :: Churro t (Maybe a) a
justs = (Maybe a -> [a]) -> Churro t (Maybe a) a
forall (t :: * -> *) a b. Transport t => (a -> [b]) -> Churro t a b
mapN ([a] -> (a -> [a]) -> Maybe a -> [a]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] a -> [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure)

-- | Extract ls from (Left l)s.
-- 
-- >>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint
-- 1
-- 3
lefts :: Transport t => Churro t (Either a b) a
lefts :: Churro t (Either a b) a
lefts = (Either a b -> [a]) -> Churro t (Either a b) a
forall (t :: * -> *) a b. Transport t => (a -> [b]) -> Churro t a b
mapN ((a -> [a]) -> (b -> [a]) -> Either a b -> [a]
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either a -> [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([a] -> b -> [a]
forall a b. a -> b -> a
const []))

-- | Extract rs from (Right r)s.
-- 
-- >>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint
-- 2
rights :: Transport t => Churro t (Either a b) b
rights :: Churro t (Either a b) b
rights = (Either a b -> [b]) -> Churro t (Either a b) b
forall (t :: * -> *) a b. Transport t => (a -> [b]) -> Churro t a b
mapN ((a -> [b]) -> (b -> [b]) -> Either a b -> [b]
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ([b] -> a -> [b]
forall a b. a -> b -> a
const []) b -> [b]
forall (f :: * -> *) a. Applicative f => a -> f a
pure)

-- | Take and yield the first n items.
-- 
-- WARNING: This is intended to terminate upstream once the items have been consumed
--          downstream, but there is a bug preventing this from working at present!
-- 
-- >>> runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint
-- 1
-- 2
-- 
-- This implementation explicitly stops propagating when the Churro completes,
-- although this could be handled by downstream consumer composition terminating
-- the producer and just using replicateM.
takeC :: (Transport t, Integral n) => n -> Churro t a a
takeC :: n -> Churro t a a
takeC n
n = (Out t (Maybe a) -> In t (Maybe a) -> IO ()) -> Churro t a a
forall (t :: * -> *) i o.
Transport t =>
(Out t (Maybe i) -> In t (Maybe o) -> IO ()) -> Churro t i o
buildChurro \Out t (Maybe a)
i In t (Maybe a)
o -> n -> Out t (Maybe a) -> In t (Maybe a) -> IO ()
forall t (t :: * -> *) (t :: * -> *) a.
(Ord t, Num t, Transport t, Transport t, Enum t) =>
t -> Out t (Maybe a) -> In t (Maybe a) -> IO ()
go n
n Out t (Maybe a)
i In t (Maybe a)
o
    where
    go :: t -> Out t (Maybe a) -> In t (Maybe a) -> IO ()
go t
t Out t (Maybe a)
i In t (Maybe a)
o
        | t
t t -> t -> Bool
forall a. Ord a => a -> a -> Bool
<= t
0 = In t (Maybe a) -> Maybe a -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe a)
o Maybe a
forall a. Maybe a
Nothing
        | Bool
otherwise = do
            Maybe a
x <- Out t (Maybe a) -> IO (Maybe a)
forall (t :: * -> *) a. Transport t => Out t a -> IO a
yank Out t (Maybe a)
i
            In t (Maybe a) -> Maybe a -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe a)
o Maybe a
x
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe a -> Bool
forall a. Maybe a -> Bool
isJust Maybe a
x) do t -> Out t (Maybe a) -> In t (Maybe a) -> IO ()
go (t -> t
forall a. Enum a => a -> a
pred t
t) Out t (Maybe a)
i In t (Maybe a)
o

-- | Drop the first n items.
-- 
-- >>> runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint
-- 3
-- 4
dropC :: (Transport t, Integral n) => n -> Churro t a a
dropC :: n -> Churro t a a
dropC n
n = (Out t (Maybe a) -> In t (Maybe a) -> IO ()) -> Churro t a a
forall (t :: * -> *) i o.
Transport t =>
(Out t (Maybe i) -> In t (Maybe o) -> IO ()) -> Churro t i o
buildChurro \Out t (Maybe a)
i In t (Maybe a)
o -> do
    Int -> IO (Maybe a) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ (n -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral n
n) (Out t (Maybe a) -> IO (Maybe a)
forall (t :: * -> *) a. Transport t => Out t a -> IO a
yank Out t (Maybe a)
i) -- TODO: Check the async behaviour of this...
    (a -> a) -> Out t (Maybe a) -> In t (Maybe a) -> IO ()
forall (t :: * -> *) a b.
Transport t =>
(a -> b) -> Out t (Maybe a) -> In t (Maybe b) -> IO ()
c2c a -> a
forall k (cat :: k -> k -> *) (a :: k). Category cat => cat a a
id Out t (Maybe a)
i In t (Maybe a)
o

-- | Filter items according to a predicate.
--
-- >>> runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint
-- 4
-- 5
filterC :: Transport t => (a -> Bool) -> Churro t a a
filterC :: (a -> Bool) -> Churro t a a
filterC a -> Bool
p = (a -> [a]) -> Churro t a a
forall (t :: * -> *) a b. Transport t => (a -> [b]) -> Churro t a b
mapN ((a -> Bool) -> [a] -> [a]
forall a. (a -> Bool) -> [a] -> [a]
filter a -> Bool
p ([a] -> [a]) -> (a -> [a]) -> a -> [a]
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. a -> [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure)

-- | Run a pure function over items, producing multiple outputs.
-- 
-- >>> runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint
-- 9
-- 90
mapN :: Transport t => (a -> [b]) -> Churro t a b
mapN :: (a -> [b]) -> Churro t a b
mapN a -> [b]
f = (a -> IO [b]) -> Churro t a b
forall (t :: * -> *) a b.
Transport t =>
(a -> IO [b]) -> Churro t a b
processN ([b] -> IO [b]
forall (m :: * -> *) a. Monad m => a -> m a
return ([b] -> IO [b]) -> (a -> [b]) -> a -> IO [b]
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. a -> [b]
f)

-- | Delay items from being sent downstream.
-- 
--   Note: NominalDiffTime's Num instance interprets literals as seconds.
-- 
-- >>> let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
-- 
-- >>> runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
-- False
-- 
-- >>> runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck
-- True
delay :: Transport t => NominalDiffTime -> Churro t a a
delay :: NominalDiffTime -> Churro t a a
delay = Int -> Churro t a a
forall (t :: * -> *) a. Transport t => Int -> Churro t a a
delayMicro (Int -> Churro t a a)
-> (NominalDiffTime -> Int) -> NominalDiffTime -> Churro t a a
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall b. (RealFrac Double, Integral b) => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling @Double (Double -> Int)
-> (NominalDiffTime -> Double) -> NominalDiffTime -> Int
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. Rational -> Double
forall a. Fractional a => Rational -> a
fromRational (Rational -> Double)
-> (NominalDiffTime -> Rational) -> NominalDiffTime -> Double
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. (Rational -> Rational -> Rational
forall a. Num a => a -> a -> a
*Rational
1000000) (Rational -> Rational)
-> (NominalDiffTime -> Rational) -> NominalDiffTime -> Rational
forall k (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. NominalDiffTime -> Rational
forall a. Real a => a -> Rational
toRational

-- | Delay items in microseconds. Works the same way as `delay`.
delayMicro :: Transport t => Int -> Churro t a a
delayMicro :: Int -> Churro t a a
delayMicro Int
d = (a -> IO a) -> Churro t a a
forall (t :: * -> *) a b.
Transport t =>
(a -> IO b) -> Churro t a b
process \a
x -> do
    Int -> IO ()
threadDelay Int
d
    a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x

-- | Passes consecutive pairs of items downstream.
-- 
-- >>> runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint
-- (1,2)
-- (2,3)
withPrevious :: Transport t => Churro t a (a,a)
withPrevious :: Churro t a (a, a)
withPrevious = (Out t (Maybe a) -> In t (Maybe (a, a)) -> IO ())
-> Churro t a (a, a)
forall (t :: * -> *) i o.
Transport t =>
(Out t (Maybe i) -> In t (Maybe o) -> IO ()) -> Churro t i o
buildChurro \Out t (Maybe a)
i In t (Maybe (a, a))
o -> do
    Maybe a -> Out t (Maybe a) -> In t (Maybe (a, a)) -> IO ()
forall (t :: * -> *) (t :: * -> *) a.
(Transport t, Transport t) =>
Maybe a -> Out t (Maybe a) -> In t (Maybe (a, a)) -> IO ()
prog Maybe a
forall a. Maybe a
Nothing Out t (Maybe a)
i In t (Maybe (a, a))
o 
    In t (Maybe (a, a)) -> Maybe (a, a) -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe (a, a))
o Maybe (a, a)
forall a. Maybe a
Nothing
    where
    prog :: Maybe a -> Out t (Maybe a) -> In t (Maybe (a, a)) -> IO ()
prog Maybe a
x Out t (Maybe a)
i In t (Maybe (a, a))
o = do
        Maybe a
y <- Out t (Maybe a) -> IO (Maybe a)
forall (t :: * -> *) a. Transport t => Out t a -> IO a
yank Out t (Maybe a)
i
        case (Maybe a
x,Maybe a
y) of
            (Just a
x', Just a
y') -> In t (Maybe (a, a)) -> Maybe (a, a) -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe (a, a))
o ((a, a) -> Maybe (a, a)
forall a. a -> Maybe a
Just (a
x',a
y')) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe a -> Out t (Maybe a) -> In t (Maybe (a, a)) -> IO ()
prog Maybe a
y Out t (Maybe a)
i In t (Maybe (a, a))
o
            (Maybe a
Nothing, Just a
y') -> Maybe a -> Out t (Maybe a) -> In t (Maybe (a, a)) -> IO ()
prog (a -> Maybe a
forall a. a -> Maybe a
Just a
y') Out t (Maybe a)
i In t (Maybe (a, a))
o
            (Maybe a, Maybe a)
_                  -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Requeue an item if it fails. Swallows exceptions and gives up after retries.
-- 
--  Note: Process will always try once so if retries = 1 then a failing process will execute twice.
-- 
--  The item is requeues on the input side of the churro, so if other items have
--  been passed in they will appear first!
-- 
--  Catches all `SomeException`s. If you wish to narrow the execption type, consider
--  using the processRetry' variant composed with `rights`.
-- 
--  Note: There is an edgecase with Chan transport where a queued retry may not execute
--        if a source completes and finalises before the item is requeued.
--        A different transport type may allow a modified retry function that requeues differently.
-- 
-- >>> :{
-- let
--   prog = processRetry 1 flakeyThing
--   flakeyThing x = do
--     if x > 1
--       then print "GT"  >> return x
--       else print "LTE" >> error ("oops! " <> show x)
-- in
--   runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint
-- :}
-- "LTE"
-- "LTE"
-- "GT"
-- 2
-- 
processRetry :: Transport t => Natural -> (i -> IO o) -> Churro t i o
processRetry :: Natural -> (i -> IO o) -> Churro t i o
processRetry Natural
retries i -> IO o
f = Natural -> (i -> IO o) -> Churro t i (Either SomeException o)
forall e (t :: * -> *) i o.
(Exception e, Transport t) =>
Natural -> (i -> IO o) -> Churro t i (Either e o)
processRetry' @SomeException Natural
retries i -> IO o
f Churro t i (Either SomeException o)
-> Churro t (Either SomeException o) o -> Churro t i o
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> Churro t (Either SomeException o) o
forall (t :: * -> *) a b. Transport t => Churro t (Either a b) b
rights

-- | Raw version of `processRetry`. -- Polymorphic over exception type and forwards errors.
--   
processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro t i (Either e o)
processRetry' :: Natural -> (i -> IO o) -> Churro t i (Either e o)
processRetry' Natural
retries i -> IO o
f = (i -> (Natural, i)) -> Churro t i (Natural, i)
forall (a :: * -> * -> *) b c. Arrow a => (b -> c) -> a b c
arr (Natural
0,) Churro t i (Natural, i)
-> Churro t (Natural, i) (Either e o) -> Churro t i (Either e o)
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> Natural -> (i -> IO o) -> Churro t (Natural, i) (Either e o)
forall (t :: * -> *) e n a b.
(Transport t, Exception e, Ord n, Enum n) =>
n -> (a -> IO b) -> Churro t (n, a) (Either e b)
processRetry'' Natural
retries i -> IO o
f

-- | Rawest version of `processRetry`.
--   Expects the incoming items to contain number of retries.
-- 
--   Also polymorphic over exception type. And forwards errors.
--   
processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro t (n, a) (Either e b)
processRetry'' :: n -> (a -> IO b) -> Churro t (n, a) (Either e b)
processRetry'' n
retries a -> IO b
f =
    (In t (Maybe (n, a))
 -> Out t (Maybe (n, a)) -> In t (Maybe (Either e b)) -> IO ())
-> Churro t (n, a) (Either e b)
forall (t :: * -> *) i o.
Transport t =>
(In t (Maybe i) -> Out t (Maybe i) -> In t (Maybe o) -> IO ())
-> Churro t i o
buildChurro' \In t (Maybe (n, a))
i' Out t (Maybe (n, a))
o In t (Maybe (Either e b))
i -> do
        Out t (Maybe (n, a)) -> ((n, a) -> IO ()) -> IO ()
forall (t :: * -> *) i a.
Transport t =>
Out t (Maybe i) -> (i -> IO a) -> IO ()
yankAll Out t (Maybe (n, a))
o \(n
n, a
y) -> do
            Either e b
r <- IO b -> IO (Either e b)
forall e a. Exception e => IO a -> IO (Either e a)
try do a -> IO b
f a
y
            In t (Maybe (Either e b)) -> Maybe (Either e b) -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe (Either e b))
i (Either e b -> Maybe (Either e b)
forall a. a -> Maybe a
Just Either e b
r)
            case Either e b
r of
                Right b
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                Left  e
_ -> Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (n
n n -> n -> Bool
forall a. Ord a => a -> a -> Bool
< n
retries) do In t (Maybe (n, a)) -> Maybe (n, a) -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe (n, a))
i' ((n, a) -> Maybe (n, a)
forall a. a -> Maybe a
Just (n -> n
forall a. Enum a => a -> a
succ n
n, a
y))
        In t (Maybe (Either e b)) -> Maybe (Either e b) -> IO ()
forall (t :: * -> *) a. Transport t => In t a -> a -> IO ()
yeet In t (Maybe (Either e b))
i Maybe (Either e b)
forall a. Maybe a
Nothing