{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}

-- | various ways to connect things up
module Box.Connectors
  ( fromListE,
    fromList_,
    toList_,
    fromToList_,
    emitQ,
    commitQ,
    sink,
    source,
    forkEmit,
    feedback,
    queueCommitter,
    queueEmitter,
    concurrentE,
    concurrentC,
    glueN,
  )
where

import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import Control.Concurrent.Classy.Async as C
import Control.Lens
import Control.Monad.Conc.Class (MonadConc)
import Control.Monad.Morph
import Control.Monad.State.Lazy
import Data.Foldable
import qualified Data.Sequence as Seq
import Prelude

-- $setup
-- >>> :set -XOverloadedStrings
-- >>> :set -XGADTs
-- >>> :set -XFlexibleContexts
-- >>> import Data.Functor.Contravariant
-- >>> import Box
-- >>> import Control.Applicative
-- >>> import Control.Monad.Conc.Class as C
-- >>> import Control.Lens
-- >>> import qualified Data.Sequence as Seq
-- >>> import Data.Text (pack, Text)
-- >>> import Data.Functor.Contravariant

-- | Turn a list into an 'Emitter' continuation via a 'Queue'.
--
-- Each element @x@ is wrapped as @Emitter (pure (Just x))@ and the resulting
-- emitters are handed to 'eListC', which commits one value from each, in list
-- order, to the committer side of the queue set up by 'queueE'.
fromListE :: (MonadConc m) => [a] -> Cont m (Emitter m a)
fromListE xs = Cont $ queueE (eListC (Emitter . pure . Just <$> xs))

-- | Emit once from each emitter in turn and commit the emitted value.
--
-- Stops when the list is exhausted or as soon as an emitter yields 'Nothing'.
-- The 'Bool' returned by 'commit' is discarded, so a failed commit does not
-- stop the traversal.
eListC :: (Monad m) => [Emitter m a] -> Committer m a -> m ()
eListC [] _ = pure ()
eListC (e : es) c = do
  x <- emit e
  case x of
    Nothing -> pure ()
    Just x' -> commit c x' *> eListC es c

-- | fromList_ directly supplies to a committer action
--
-- The list is loaded into a 'Seq.Seq' held as 'StateT' state, the committer is
-- lifted into that 'StateT' layer with @hoist lift@, and 'glue' connects it to
-- 'stateE' (presumably the emitter that reads from the sequence state).
--
-- FIXME: fromList_ combined with cRef is failing dejavu concurrency testing...
fromList_ :: Monad m => [a] -> Committer m a -> m ()
fromList_ xs c = flip evalStateT (Seq.fromList xs) $ glue (hoist lift c) stateE

-- | toList_ directly receives from an emitter
--
-- The emitter is lifted into a 'StateT' layer whose state starts as an empty
-- 'Seq.Seq'; 'glue' connects it to 'stateC' (presumably the committer that
-- stores into the sequence state), and the final state is returned as a list.
--
-- TODO: check isomorphism
--
-- > toList_ == toListE
toList_ :: (Monad m) => Emitter m a -> m [a]
toList_ e = toList <$> flip execStateT Seq.empty (glue stateC (hoist lift e))

-- | Glues a committer and emitter, taking n emits
--
-- The committer is lifted into a 'StateT' 'Int' layer (state starts at 0) to
-- match the emitter produced by @takeE n@.
--
-- >>> glueN 4 <$> pure (contramap (pack . show) toStdout) <*.> fromListE [1..]
-- 1
-- 2
-- 3
-- 4
glueN :: Monad m => Int -> Committer m a -> Emitter m a -> m ()
glueN n c e = flip evalStateT 0 $ glue (hoist lift c) (takeE n e)

-- | take a list, emit it through a box, and output the committed result.
--
-- The computation runs in 'StateT' over a pair of sequences: the first
-- component starts empty and is the target of the box's committer
-- (@zoom _1@ over 'stateC'); the second starts as the input list and is the
-- source for the box's emitter (@zoom _2@ over 'stateE'). The value returned
-- by the supplied box action is discarded; only the final contents of the
-- first component are returned.
--
-- The pure nature of this computation is highly useful for testing,
-- especially where parts of the box under investigation has non-deterministic attributes.
fromToList_ :: (Monad m) => [a] -> (Box (StateT (Seq.Seq b, Seq.Seq a) m) b a -> StateT (Seq.Seq b, Seq.Seq a) m r) -> m [b]
fromToList_ xs f = do
  (res, _) <-
    flip execStateT (Seq.empty, Seq.fromList xs) $
      f (Box (hoist (zoom _1) stateC) (hoist (zoom _2) stateE))
  pure $ toList res

-- | hook a committer action to a queue, creating an emitter continuation
--
-- The committer action is passed to 'queueE' together with whatever emitter
-- action the continuation is eventually run with.
emitQ :: (MonadConc m) => (Committer m a -> m r) -> Cont m (Emitter m a)
emitQ cio = Cont $ \eio -> queueE cio eio

-- | hook an emitter action to a queue, creating a committer continuation
--
-- The emitter action is passed to 'queueC' together with whatever committer
-- action the continuation is eventually run with.
commitQ :: (MonadConc m) => (Emitter m a -> m r) -> Cont m (Committer m a)
commitQ eio = Cont $ \cio -> queueC cio eio

-- | singleton sink
--
-- Emits once; if a value is produced the supplied action is run on it,
-- otherwise nothing happens.
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 f e = do
  a <- emit e
  case a of
    Nothing -> pure ()
    Just a' -> f a'

-- | finite sink
--
-- Builds a committer continuation (via 'commitQ') whose emitter side runs
-- 'sink1' with the supplied action @n@ times.
sink :: (MonadConc m) => Int -> (a -> m ()) -> Cont m (Committer m a)
sink n f = commitQ $ replicateM_ n . sink1 f

-- | singleton source
--
-- Runs the supplied action once and commits its result; the 'Bool' returned
-- by 'commit' is discarded.
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 a c = do
  a' <- a
  void $ commit c a'

-- | finite source
--
-- Builds an emitter continuation (via 'emitQ') whose committer side runs
-- 'source1' with the supplied action @n@ times.
source :: (MonadConc m) => Int -> m a -> Cont m (Emitter m a)
source n f = emitQ $ replicateM_ n . source1 f

-- | glues an emitter to a committer, then resupplies the emitter
--
-- Every emit of the resulting emitter first emits from the original one; a
-- 'Just' value is also committed to the committer (the commit result is
-- ignored), and the emitted value is then passed on unchanged.
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c =
  Emitter $ do
    a <- emit e
    maybe (pure ()) (void <$> commit c) a
    pure a

-- | fuse a committer to a buffer
--
-- The continuation receives the committer side of a queue ('queueC'); the
-- emitter side of that queue is glued to the original committer.
queueCommitter :: (MonadConc m) => Committer m a -> Cont m (Committer m a)
queueCommitter c = Cont $ \caction -> queueC caction (glue c)

-- | fuse an emitter to a buffer
--
-- The continuation receives the emitter side of a queue ('queueE'); the
-- committer side of that queue is glued to the original emitter.
queueEmitter :: (MonadConc m) => Emitter m a -> Cont m (Emitter m a)
queueEmitter e = Cont $ \eaction -> queueE (`glue` e) eaction

-- | concurrently run two emitters
--
-- This differs from mappend in that the monoidal (and alternative) instance
-- of an Emitter is left-biased (The left emitter exhausts before the right
-- one is begun). This is non-deterministically concurrent.
--
-- Each emitter is glued to its own queue, and the continuation is run once
-- per queue under 'C.concurrently'; only the result of the run attached to
-- the first emitter is returned.
concurrentE ::
  (MonadConc m) =>
  Emitter m a ->
  Emitter m a ->
  Cont m (Emitter m a)
concurrentE e e' =
  Cont $ \eaction ->
    fst
      <$> C.concurrently
        (queueE (`glue` e) eaction)
        (queueE (`glue` e') eaction)

-- | run two committers concurrently
--
-- Built from 'eitherC', with 'mergeC' collapsing the 'Left'/'Right' tag so the
-- continuation sees a plain 'Committer'.
concurrentC :: (MonadConc m) => Committer m a -> Committer m a -> Cont m (Committer m a)
concurrentC c c' = mergeC <$> eitherC c c'

-- | Queue two committers and hand each queue's committer to the continuation,
-- tagged 'Left' for the first and 'Right' for the second.
--
-- The continuation is run twice under 'C.concurrently' (once per tag); only
-- the result of the 'Left' run is returned.
eitherC ::
  (MonadConc m) =>
  Committer m a ->
  Committer m a ->
  Cont m (Either (Committer m a) (Committer m a))
eitherC cl cr =
  Cont $
    \kk ->
      fst
        <$> C.concurrently
          (queueC (kk . Left) (glue cl))
          (queueC (kk . Right) (glue cr))

-- | Collapse an 'Either' of two committers into a single committer that
-- commits to whichever side is present.
mergeC :: Either (Committer m a) (Committer m a) -> Committer m a
mergeC ec =
  Committer $ \a ->
    case ec of
      Left lc -> commit lc a
      Right rc -> commit rc a

-- | a box modifier that feeds commits back to the emitter
--
-- Inside the box continuation, the box's emitter is mapped through the
-- supplied function with 'mapE' and glued to the box's own committer. That
-- 'glue' action is sequenced before the continuation, which is then run on
-- the same box.
feedback ::
  (MonadConc m) =>
  (a -> m (Maybe b)) ->
  Cont m (Box m b a) ->
  Cont m (Box m b a)
feedback f box =
  Cont $ \bio ->
    with box $ \(Box c e) -> do
      glue c (mapE f e)
      bio (Box c e)