{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
module Box.Connectors
( fromListE,
fromList_,
toList_,
fromToList_,
emitQ,
commitQ,
sink,
source,
forkEmit,
feedback,
queueCommitter,
queueEmitter,
concurrentE,
concurrentC,
glueN,
)
where
import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import Control.Concurrent.Classy.Async as C
import Control.Lens
import Control.Monad.Conc.Class (MonadConc)
import Control.Monad.Morph
import Control.Monad.State.Lazy
import Data.Foldable
import qualified Data.Sequence as Seq
import Prelude
-- | Turn a list into an 'Emitter' continuation.
--
-- Each element is wrapped as an emitter that yields @Just@ that element;
-- 'eListC' then commits them, in list order, to the committer side of
-- 'queueE', whose emitter side is handed to the continuation.
fromListE :: (MonadConc m) => [a] -> Cont m (Emitter m a)
fromListE xs = Cont $ queueE (eListC (Emitter . pure . Just <$> xs))
-- | Emit once from each emitter in turn, committing every value to @c@.
--
-- Stops when the list is exhausted or at the first emitter that yields
-- 'Nothing'. The 'Bool' returned by 'commit' is discarded, so a refused
-- commit does not stop the traversal.
eListC :: (Monad m) => [Emitter m a] -> Committer m a -> m ()
eListC [] _ = pure ()
eListC (e : es) c = do
  x <- emit e
  case x of
    Nothing -> pure ()
    Just x' -> commit c x' *> eListC es c
-- | Supply a list directly to a committer.
--
-- The list is loaded into a 'Seq.Seq' held as 'StateT' state, the committer
-- is lifted into that state monad with @hoist lift@, and 'glue' connects it
-- to 'stateE'. The final state is thrown away ('evalStateT').
--
-- NOTE(review): presumably 'stateE' emits elements taken from the state
-- sequence; confirm against its definition.
fromList_ :: Monad m => [a] -> Committer m a -> m ()
fromList_ xs c = flip evalStateT (Seq.fromList xs) $ glue (hoist lift c) stateE
-- | Collect the output of an emitter into a list.
--
-- The emitter is lifted into a 'StateT' over a 'Seq.Seq' (initially empty)
-- and glued to 'stateC'; the final state is converted to a list.
--
-- NOTE(review): presumably 'stateC' appends each committed value to the
-- state sequence; confirm against its definition.
toList_ :: (Monad m) => Emitter m a -> m [a]
toList_ e = toList <$> flip execStateT Seq.empty (glue stateC (hoist lift e))
-- | Glue a committer to an emitter that has been passed through
-- @'takeE' n@.
--
-- 'takeE' moves the emitter into a @StateT Int@ layer, so the committer is
-- lifted into the same layer and the whole computation is run with an
-- initial state of @0@, discarding the final state.
--
-- NOTE(review): presumably 'takeE' stops after @n@ emits, using the state as
-- a counter; confirm against its definition.
glueN :: Monad m => Int -> Committer m a -> Emitter m a -> m ()
glueN n c e = flip evalStateT 0 $ glue (hoist lift c) (takeE n e)
-- | Run a box action against state-backed endpoints and return what was
-- committed.
--
-- The state is a pair: the first component (initially empty) backs the
-- committer ('stateC' zoomed through '_1'), and the second component
-- (initially the input list) backs the emitter ('stateE' zoomed through
-- '_2'). After running @f@ the first component is returned as a list; the
-- result of @f@ and whatever remains in the second component are discarded.
fromToList_ :: (Monad m) => [a] -> (Box (StateT (Seq.Seq b, Seq.Seq a) m) b a -> StateT (Seq.Seq b, Seq.Seq a) m r) -> m [b]
fromToList_ xs f = do
  (res, _) <-
    flip execStateT (Seq.empty, Seq.fromList xs) $
      f (Box (hoist (zoom _1) stateC) (hoist (zoom _2) stateE))
  pure $ toList res
-- | Hook a committer action up to 'queueE', producing an emitter
-- continuation.
--
-- The continuation's result is the result of the emitter-side action; the
-- result of the committer action @cio@ is not returned (see the type of
-- 'queueE').
emitQ :: (MonadConc m) => (Committer m a -> m r) -> Cont m (Emitter m a)
emitQ cio = Cont $ \eio -> queueE cio eio
-- | Hook an emitter action up to 'queueC', producing a committer
-- continuation.
--
-- The continuation's result is the result of the committer-side action; the
-- result of the emitter action @eio@ is not returned (see the type of
-- 'queueC').
commitQ :: (MonadConc m) => (Emitter m a -> m r) -> Cont m (Committer m a)
commitQ eio = Cont $ \cio -> queueC cio eio
-- | Emit a single value and, if there is one, run @f@ on it.
--
-- Does nothing when the emitter yields 'Nothing'.
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 f e = emit e >>= maybe (pure ()) f
-- | A finite sink: a committer continuation whose consuming side runs
-- 'sink1' @n@ times, so @f@ is applied to at most @n@ values.
sink :: (MonadConc m) => Int -> (a -> m ()) -> Cont m (Committer m a)
sink n f = commitQ $ replicateM_ n . sink1 f
-- | Run the action @a@ once and commit its result to @c@.
--
-- The 'Bool' returned by 'commit' is discarded.
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 a c = do
  a' <- a
  void $ commit c a'
-- | A finite source: an emitter continuation whose producing side runs
-- 'source1' @n@ times, i.e. runs @f@ and commits its result @n@ times.
source :: (MonadConc m) => Int -> m a -> Cont m (Emitter m a)
source n f = emitQ $ replicateM_ n . source1 f
-- | Wrap an emitter so that every emitted value is also committed to @c@.
--
-- On each emit the underlying emitter is polled once; a 'Just' value is
-- committed to @c@ (the 'Bool' result is discarded) and then the original
-- result, 'Just' or 'Nothing', is passed through unchanged.
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c =
  Emitter $ do
    a <- emit e
    maybe (pure ()) (void . commit c) a
    pure a
-- | Put a queue in front of a committer.
--
-- The continuation receives the committer side of 'queueC'; the emitter
-- side of the queue is glued to the original committer @c@.
queueCommitter :: (MonadConc m) => Committer m a -> Cont m (Committer m a)
queueCommitter c = Cont $ \caction -> queueC caction (glue c)
-- | Put a queue behind an emitter.
--
-- The committer side of 'queueE' is glued to the original emitter @e@; the
-- continuation receives the emitter side of the queue.
queueEmitter :: (MonadConc m) => Emitter m a -> Cont m (Emitter m a)
queueEmitter e = Cont $ \eaction -> queueE (`glue` e) eaction
-- | Run two emitters concurrently.
--
-- Each emitter is glued to its own 'queueE', and the two are run with
-- 'C.concurrently'. Note that the continuation is therefore invoked twice,
-- once per queue, and only the result from the @e@ branch is returned.
concurrentE ::
  (MonadConc m) =>
  Emitter m a ->
  Emitter m a ->
  Cont m (Emitter m a)
concurrentE e e' =
  Cont $ \eaction ->
    fst
      <$> C.concurrently
        (queueE (`glue` e) eaction)
        (queueE (`glue` e') eaction)
-- | Run two committers concurrently.
--
-- Built from 'eitherC', with the 'Either' wrapper collapsed by 'mergeC'.
concurrentC :: (MonadConc m) => Committer m a -> Committer m a -> Cont m (Committer m a)
concurrentC c c' = mergeC <$> eitherC c c'
-- | Queue two committers and run both queues concurrently.
--
-- Each committer is glued to the emitter side of its own 'queueC'. The
-- continuation is invoked twice under 'C.concurrently': once with the
-- queued committer for @cl@ tagged 'Left', and once with the one for @cr@
-- tagged 'Right'. Only the result of the 'Left' branch is returned.
eitherC ::
  (MonadConc m) =>
  Committer m a ->
  Committer m a ->
  Cont m (Either (Committer m a) (Committer m a))
eitherC cl cr =
  Cont $
    \kk ->
      fst
        <$> C.concurrently
          (queueC (kk . Left) (glue cl))
          (queueC (kk . Right) (glue cr))
-- | Collapse an 'Either' of two committers into a single committer that
-- commits to whichever side is present.
mergeC :: Either (Committer m a) (Committer m a) -> Committer m a
mergeC ec = Committer $ \a -> either (`commit` a) (`commit` a) ec
-- | A box modifier that feeds emitted values back into the box's committer.
--
-- Inside the continuation the box's emitter is mapped through @f@ ('mapE')
-- and glued to the box's own committer. Once that 'glue' action returns,
-- the same box is passed on to the continuation.
feedback ::
  (MonadConc m) =>
  (a -> m (Maybe b)) ->
  Cont m (Box m b a) ->
  Cont m (Box m b a)
feedback f box =
  Cont $ \bio ->
    with box $ \(Box c e) -> do
      glue c (mapE f e)
      bio (Box c e)