{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
module Box.Connectors
( qList,
popList,
pushList,
pushListN,
sink,
source,
forkEmit,
bufferCommitter,
bufferEmitter,
concurrentE,
concurrentC,
)
where
import Box.Box
import Box.Committer
import Box.Codensity
import Box.Emitter
import Box.Queue
import Control.Concurrent.Classy.Async as C
import Control.Monad.Conc.Class (MonadConc)
import Control.Monad.State.Lazy
import Data.Foldable
import qualified Data.Sequence as Seq
import Prelude
import Box.Functor
-- | Turn a list into a 'CoEmitter' by committing its elements to an
-- 'Unbounded' queue (via 'emitQ').
--
-- The elements of @xs@ are committed in list order with 'traverse'. The
-- committing action yields 'True' only when every individual 'commit'
-- returned 'True'; the 'Bool' results are only combined afterwards, so a
-- 'False' from one commit does not by itself skip the remaining elements.
qList :: (MonadConc m) => [a] -> CoEmitter m a
qList xs = emitQ Unbounded (\c -> and <$> traverse (commit c) xs)
-- | Supply a list directly to a 'Committer', using a 'StateT' layer as the
-- source of elements.
--
-- The list is converted to a 'Seq.Seq' that becomes the initial state. The
-- committer is hoisted into @StateT (Seq a) m@ with @'foist' 'lift'@ and
-- connected by 'glue' to 'pop', an emitter over that state (presumably
-- taking elements off the sequence; it is defined outside this module).
-- The final state is discarded by 'evalStateT'.
popList :: (Monad m) => [a] -> Committer m a -> m ()
popList xs c = evalStateT (glue (foist lift c) pop) (Seq.fromList xs)
-- | Run an 'Emitter' into a list.
--
-- The emitter is hoisted into @StateT (Seq a) m@ with @'foist' 'lift'@ and
-- connected by 'glue' to 'push', a committer over that state (presumably
-- appending to the sequence; it is defined outside this module). The
-- computation starts from an empty 'Seq.Seq' and the final state, obtained
-- with 'execStateT', is returned as a list.
pushList :: (Monad m) => Emitter m a -> m [a]
pushList e = toList <$> execStateT (glue push (foist lift e)) Seq.empty
-- | Like 'pushList', but connects the emitter with @'glueN' n@ instead of
-- 'glue'.
--
-- The 'Int' is passed straight through to 'glueN' (presumably bounding the
-- number of elements transferred; 'glueN' is defined outside this module).
-- The accumulated 'Seq.Seq' state is returned as a list.
pushListN :: (Monad m) => Int -> Emitter m a -> m [a]
pushListN n e = toList <$> execStateT (glueN n push (foist lift e)) Seq.empty
-- | Perform a single 'emit' and hand the result to an action.
--
-- When the emitter yields @Just a@ the action is run on @a@; when it
-- yields 'Nothing' nothing further happens.
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 f e = emit e >>= maybe (pure ()) f
-- | Build a 'CoCommitter' (via 'commitQ' on an 'Unbounded' queue) whose
-- emitter side runs an action on emitted values.
--
-- The emitter side repeats 'sink1' exactly @n@ times with 'replicateM_'.
-- An emit that yields 'Nothing' still counts as one of the @n@ iterations;
-- there is no early exit.
sink :: (MonadConc m) => Int -> (a -> m ()) -> CoCommitter m a
sink n f = commitQ Unbounded $ replicateM_ n . sink1 f
-- | Run an action once and 'commit' its result.
--
-- The 'Bool' returned by 'commit' is discarded with 'void'.
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 a c = a >>= void . commit c
-- | Build a 'CoEmitter' (via 'emitQ' on an 'Unbounded' queue) whose
-- committer side is fed by repeatedly running an action.
--
-- The committer side repeats 'source1' exactly @n@ times with
-- 'replicateM_', i.e. the action is run and its result committed @n@ times.
source :: (MonadConc m) => Int -> m a -> CoEmitter m a
source n f = emitQ Unbounded $ replicateM_ n . source1 f
-- | Wrap an 'Emitter' so that every emitted value is also committed to a
-- 'Committer'.
--
-- Each 'emit' on the result performs one 'emit' on @e@. A @Just@ value is
-- committed to @c@ (the commit's 'Bool' result is ignored) and then
-- returned unchanged; a 'Nothing' is returned without committing anything.
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c =
  Emitter $ do
    a <- emit e
    maybe (pure ()) (void . commit c) a
    pure a
-- | Place an 'Unbounded' queue in front of a 'Committer'.
--
-- The continuation receives the queue's committer, while @'glue' c@ is run
-- against the queue's emitter. 'queueL' returns the result of the
-- committer-side (left) action, which here is the continuation's result.
bufferCommitter :: (MonadConc m) => Committer m a -> CoCommitter m a
bufferCommitter c = Codensity $ \caction -> queueL Unbounded caction (glue c)
-- | Place an 'Unbounded' queue behind an 'Emitter'.
--
-- The queue's committer is glued to @e@, while the continuation receives
-- the queue's emitter. 'queueR' returns the result of the emitter-side
-- (right) action, which here is the continuation's result.
bufferEmitter :: (MonadConc m) => Emitter m a -> CoEmitter m a
bufferEmitter e = Codensity $ \eaction -> queueR Unbounded (`glue` e) eaction
-- | Run two emitters concurrently, each through its own call to 'queue'.
--
-- Both 'queue' calls receive the same @q@ argument; one glues the queue's
-- committer to @e@, the other to @e'@. The two are run together with
-- 'C.concurrently', and the result is the continuation's result from the
-- first ('queue' on @e@) side, selected by @snd . fst@.
--
-- NOTE(review): the continuation is passed to both 'queue' calls, so it is
-- run once per side and the second side's result is dropped -- confirm
-- this is intended.
concurrentE ::
  (MonadConc f) =>
  Queue a ->
  Emitter f a ->
  Emitter f a ->
  CoEmitter f a
concurrentE q e e' =
  Codensity $ \eaction ->
    snd . fst
      <$> C.concurrently
        (queue q (`glue` e) eaction)
        (queue q (`glue` e') eaction)
-- | Run two committers concurrently behind queues and present them as a
-- single 'CoCommitter'.
--
-- This is 'eitherC' with the resulting 'Either' collapsed by 'mergeC'.
concurrentC :: (MonadConc m) => Queue a -> Committer m a -> Committer m a -> CoCommitter m a
concurrentC q c c' = mergeC <$> eitherC q c c'
-- | Run two committers concurrently, each behind its own call to 'queueL',
-- tagging the queue committer handed to the continuation with 'Left' or
-- 'Right'.
--
-- For each side, the continuation receives that side's queue committer
-- (wrapped in 'Left' for @cl@, 'Right' for @cr@) while 'glue' connects the
-- original committer to the queue's emitter. The two sides are run with
-- 'C.concurrently' and the result of the 'Left' side is returned ('fst').
--
-- NOTE(review): the continuation is invoked once per side and the 'Right'
-- side's result is dropped -- confirm this is intended.
eitherC ::
  (MonadConc m) =>
  Queue a ->
  Committer m a ->
  Committer m a ->
  Codensity m (Either (Committer m a) (Committer m a))
eitherC q cl cr =
  Codensity $ \kk ->
    fst
      <$> C.concurrently
        (queueL q (kk . Left) (glue cl))
        (queueL q (kk . Right) (glue cr))
-- | Collapse an 'Either' of two committers into one 'Committer'.
--
-- Committing to the result commits to whichever committer the 'Either'
-- holds. The 'Either' is inspected only when a value is committed, not
-- when the merged committer is constructed.
mergeC :: Either (Committer m a) (Committer m a) -> Committer m a
mergeC ec = Committer $ \a -> commit (either id id ec) a