-- | Various ways to connect things up.
module Box.Connectors
  ( qList,
    qListWith,
    popList,
    pushList,
    pushListN,
    sink,
    sinkWith,
    source,
    sourceWith,
    forkEmit,
    bufferCommitter,
    bufferEmitter,
    concurrentE,
    concurrentC,
    takeQ,
    evalEmitter,
    evalEmitterWith,
  )
where

import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Box.Queue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.State.Lazy
import Data.Foldable
import Data.Functor
import Data.Sequence qualified as Seq
import Prelude

-- $setup
-- >>> :set -XOverloadedStrings
-- >>> import Box
-- >>> import Prelude
-- >>> import Data.Bool
-- >>> import Control.Monad

-- | Queue a list 'Unbounded'.
--
-- This is 'qListWith' specialised to the 'Unbounded' queue.
--
-- >>> pushList <$|> qList [1,2,3]
-- [1,2,3]
qList :: [a] -> CoEmitter IO a
qList xs = qListWith Unbounded xs

-- | Queue a list with an explicit 'Queue'.
--
-- Every element is committed in list order; the committing action's result
-- is the conjunction ('and') of the individual commit results.
--
-- >>> pushList <$|> qListWith Single [1,2,3]
-- [1,2,3]
qListWith :: Queue a -> [a] -> CoEmitter IO a
qListWith q xs = emitQ q (\c -> fmap and (traverse (commit c) xs))

-- | Directly supply a list to a committer action, via pop.
--
-- The list is converted to a 'Seq.Seq', which is the initial state passed to
-- 'glueES' together with the committer and the 'pop' emitter.
--
-- >>> popList [1..3] showStdout
-- 1
-- 2
-- 3
popList :: (Monad m) => [a] -> Committer m a -> m ()
popList xs c = glueES (Seq.fromList xs) c pop

-- | Push an Emitter into a list, via push.
--
-- The emitter is lifted into @StateT (Seq a) m@ and glued to 'push',
-- starting from an empty 'Seq.Seq'; the final state is returned as a list.
--
-- >>> pushList <$|> qList [1..3]
-- [1,2,3]
pushList :: (Monad m) => Emitter m a -> m [a]
pushList e = toList <$> flip execStateT Seq.empty (glue push (foist lift e))

-- | Push an Emitter into a list, finitely.
--
-- Same pipeline as 'pushList', but the emitter is glued to 'push' with
-- 'glueN', which is given the supplied count.
--
-- >>> pushListN 2 <$|> qList [1..3]
-- [1,2]
pushListN :: (Monad m) => Int -> Emitter m a -> m [a]
pushListN n e = toList <$> flip execStateT Seq.empty (glueN n push (foist lift e))

-- singleton sink: emit once and, if a value was produced, run the action on
-- it; a 'Nothing' emit does nothing.
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 f e = do
  a <- emit e
  forM_ a f

-- | Create a finite Committer Unbounded Queue.
--
-- This is 'sinkWith' specialised to the 'Unbounded' queue.
--
-- > glue <$> sink 2 print <*|> qList [1..3]
-- > 1
-- > 2
sink :: Int -> (a -> IO ()) -> CoCommitter IO a
sink n f = sinkWith Unbounded n f

-- | Create a finite Committer Queue.
--
-- The emitter side of the queue is read @n@ times; each read that yields a
-- value has the supplied action applied to it. Reads that yield 'Nothing'
-- still count towards @n@.
sinkWith :: Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith q n f = commitQ q $ replicateM_ n . sink1 f

-- singleton source: run the action once and commit its result, discarding
-- the 'Bool' returned by 'commit'.
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 a c = do
  a' <- a
  void $ commit c a'

-- | Create a finite (Co)Emitter Unbounded Queue.
--
-- This is 'sourceWith' specialised to the 'Unbounded' queue.
--
-- >>> glue toStdout <$|> source 2 (pure "hi")
-- hi
-- hi
source :: Int -> IO a -> CoEmitter IO a
source n f = sourceWith Unbounded n f

-- | Create a finite (Co)Emitter with an explicit 'Queue'.
--
-- The supplied action is run @n@ times and each result is committed to the
-- queue; the 'Bool' result of each commit is ignored.
--
-- >>> glue toStdout <$|> sourceWith Single 2 (pure "hi")
-- hi
-- hi
sourceWith :: Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith q n f = emitQ q $ replicateM_ n . source1 f

-- | Glues an emitter to a committer, then resupplies the emitter.
--
-- Each emit of the resulting emitter emits from the original, commits the
-- value (if any) to the committer, ignoring the commit's 'Bool' result, and
-- then returns the original emit unchanged.
--
-- >>> (c1,l1) <- refCommitter :: IO (Committer IO Int, IO [Int])
-- >>> close $ toListM <$> (forkEmit <$> (qList [1..3]) <*> pure c1)
-- [1,2,3]
--
-- >>> l1
-- [1,2,3]
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c =
  Emitter $ do
    a <- emit e
    -- Only commit when a value was actually emitted.
    maybe (pure ()) (void <$> commit c) a
    pure a

-- | Buffer a committer.
--
-- Commits made by the continuation go into an 'Unbounded' queue whose
-- emitter side is glued to the original committer. The continuation's
-- result is the one returned ('queueL').
bufferCommitter :: Committer IO a -> CoCommitter IO a
bufferCommitter c = Codensity $ \caction -> queueL Unbounded caction (glue c)

-- | Buffer an emitter.
--
-- The original emitter is glued to the committer side of an 'Unbounded'
-- queue, and the continuation reads from the queue's emitter side. The
-- continuation's result is the one returned ('queueR').
bufferEmitter :: Emitter IO a -> CoEmitter IO a
bufferEmitter e = Codensity $ \eaction -> queueR Unbounded (`glue` e) eaction

-- | Concurrently run two emitters.
--
-- This differs from (<>), which is left-biased.
--
-- Note that functions such as toListM, which complete on the first Nothing emitted, will not work as expected.
--
-- Each source emitter is glued into its own queue (both built from the same
-- 'Queue' specification), and the continuation is handed to both queues,
-- which are run with 'concurrently'. Only the continuation result from the
-- first queue (the one fed by the first emitter) is returned.
--
-- >>> close $ (fmap toListM) (join $ concurrentE Single <$> qList [1..3] <*> qList [5..9])
-- [1,2,3]
--
-- In the code below, the ordering is non-deterministic.
--
-- > (c,l) <- refCommitter :: IO (Committer IO Int, IO [Int])
-- > close $ glue c <$> (join $ concurrentE Single <$> qList [1..30] <*> qList [40..60])
concurrentE ::
  Queue a ->
  Emitter IO a ->
  Emitter IO a ->
  CoEmitter IO a
concurrentE q e e' =
  Codensity $ \eaction ->
    -- 'queue' returns (committer-side result, emitter-side result);
    -- 'snd . fst' picks the emitter-side result of the first queue.
    snd . fst
      <$> concurrently
        (queue q (`glue` e) eaction)
        (queue q (`glue` e') eaction)

-- | Concurrently run two committers.
--
-- Built from 'eitherC', with the resulting 'Either' collapsed back into a
-- single committer by 'mergeC'.
--
-- >>> import Data.Functor.Contravariant
-- >>> import Data.Text (pack)
-- >>> cFast = witherC (\b -> pure (Just b)) . contramap ("fast: " <>) $ toStdout
-- >>> cSlow = witherC (\b -> sleep 0.1 >> pure (Just b)) . contramap ("slow: " <>) $ toStdout
-- >>> close $ (popList ((pack . show) <$> [1..3]) <$> (concurrentC Unbounded cFast cSlow)) <> pure (sleep 1)
-- fast: 1
-- fast: 2
-- fast: 3
-- slow: 1
-- slow: 2
-- slow: 3
concurrentC :: Queue a -> Committer IO a -> Committer IO a -> CoCommitter IO a
concurrentC q c c' = mergeC <$> eitherC q c c'

-- | Run a continuation against two queued committers concurrently.
--
-- Two queues are built from the same 'Queue' specification. The emitter side
-- of the first is glued to the left committer, and that of the second to the
-- right committer. The continuation is run once per queue, receiving the
-- queue's committer tagged 'Left' or 'Right' respectively. Only the result
-- of the 'Left' run is returned.
eitherC ::
  Queue a ->
  Committer IO a ->
  Committer IO a ->
  Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC q cl cr =
  Codensity $
    \kk ->
      fst
        <$> concurrently
          (queueL q (kk . Left) (glue cl))
          (queueL q (kk . Right) (glue cr))

-- | Collapse an 'Either' of two committers into a single committer that
-- commits to whichever side is present.
mergeC :: Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC ec =
  Committer $ \a ->
    case ec of
      Left lc -> commit lc a
      Right rc -> commit rc a

-- | Take and queue n emits.
--
-- The emitter is wrapped with 'takeE' and glued into the queue with
-- 'glueES', using @0@ as the initial 'Int' state.
--
-- >>> import Control.Monad.State.Lazy
-- >>> toListM <$|> (takeQ Single 4 =<< qList [0..])
-- [0,1,2,3]
takeQ :: Queue a -> Int -> Emitter IO a -> CoEmitter IO a
takeQ q n e = emitQ q $ \c -> glueES 0 c (takeE n e)

-- | Queue a stateful emitter, supplying initial state.
--
-- This is 'evalEmitterWith' specialised to the 'Unbounded' queue.
--
-- >>> import Control.Monad.State.Lazy
-- >>> toListM <$|> (evalEmitter 0 <$> takeE 4 =<< qList [0..])
-- [0,1,2,3]
evalEmitter :: s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitter s e = evalEmitterWith Unbounded s e

-- | Queue a stateful emitter with an explicit 'Queue', supplying initial
-- state.
--
-- The stateful emitter is glued to the queue's committer with 'glueES',
-- starting from the supplied state.
evalEmitterWith :: Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith q s e = emitQ q $ \c -> glueES s c e