{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
module Box.Connectors
( qList,
qListWith,
popList,
pushList,
pushListN,
sink,
sinkWith,
source,
sourceWith,
forkEmit,
bufferCommitter,
bufferEmitter,
concurrentE,
concurrentC,
takeQ,
evalEmitter,
evalEmitterWith,
)
where
import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Box.Queue
import Control.Concurrent.Async
import Control.Monad.State.Lazy
import Data.Foldable
import qualified Data.Sequence as Seq
import Prelude
qList :: [a] -> CoEmitter IO a
qList :: forall a. [a] -> CoEmitter IO a
qList [a]
xs = forall a. Queue a -> [a] -> CoEmitter IO a
qListWith forall a. Queue a
Unbounded [a]
xs
qListWith :: Queue a -> [a] -> CoEmitter IO a
qListWith :: forall a. Queue a -> [a] -> CoEmitter IO a
qListWith Queue a
q [a]
xs = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q (\Committer IO a
c -> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (t :: * -> *). Foldable t => t Bool -> Bool
and (forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer IO a
c) [a]
xs))
popList :: Monad m => [a] -> Committer m a -> m ()
popList :: forall (m :: * -> *) a. Monad m => [a] -> Committer m a -> m ()
popList [a]
xs Committer m a
c = forall (m :: * -> *) s a.
Monad m =>
s -> Committer m a -> Emitter (StateT s m) a -> m ()
glueES (forall a. [a] -> Seq a
Seq.fromList [a]
xs) Committer m a
c forall (m :: * -> *) a. Monad m => Emitter (StateT (Seq a) m) a
pop
pushList :: (Monad m) => Emitter m a -> m [a]
pushList :: forall (m :: * -> *) a. Monad m => Emitter m a -> m [a]
pushList Emitter m a
e = forall (t :: * -> *) a. Foldable t => t a -> [a]
toList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT forall a. Seq a
Seq.empty (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue forall (m :: * -> *) a. Monad m => Committer (StateT (Seq a) m) a
push (forall (h :: (* -> *) -> * -> *) (f :: * -> *) (g :: * -> *) a.
FFunctor h =>
(forall x. f x -> g x) -> h f a -> h g a
foist forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift Emitter m a
e))
pushListN :: (Monad m) => Int -> Emitter m a -> m [a]
pushListN :: forall (m :: * -> *) a. Monad m => Int -> Emitter m a -> m [a]
pushListN Int
n Emitter m a
e = forall (t :: * -> *) a. Foldable t => t a -> [a]
toList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT forall a. Seq a
Seq.empty (forall (m :: * -> *) a.
Monad m =>
Int -> Committer m a -> Emitter m a -> m ()
glueN Int
n forall (m :: * -> *) a. Monad m => Committer (StateT (Seq a) m) a
push (forall (h :: (* -> *) -> * -> *) (f :: * -> *) (g :: * -> *) a.
FFunctor h =>
(forall x. f x -> g x) -> h f a -> h g a
foist forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift Emitter m a
e))
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 :: forall (m :: * -> *) a.
Monad m =>
(a -> m ()) -> Emitter m a -> m ()
sink1 a -> m ()
f Emitter m a
e = do
Maybe a
a <- forall (m :: * -> *) a. Emitter m a -> m (Maybe a)
emit Emitter m a
e
case Maybe a
a of
Maybe a
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just a
a' -> a -> m ()
f a
a'
sink :: Int -> (a -> IO ()) -> CoCommitter IO a
sink :: forall a. Int -> (a -> IO ()) -> CoCommitter IO a
sink Int
n a -> IO ()
f = forall a. Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith forall a. Queue a
Unbounded Int
n a -> IO ()
f
sinkWith :: Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith :: forall a. Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith Queue a
q Int
n a -> IO ()
f = forall a r. Queue a -> (Emitter IO a -> IO r) -> CoCommitter IO a
commitQ Queue a
q forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
(a -> m ()) -> Emitter m a -> m ()
sink1 a -> IO ()
f
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 :: forall (m :: * -> *) a. Monad m => m a -> Committer m a -> m ()
source1 m a
a Committer m a
c = do
a
a' <- m a
a
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer m a
c a
a'
source :: Int -> IO a -> CoEmitter IO a
source :: forall a. Int -> IO a -> CoEmitter IO a
source Int
n IO a
f = forall a. Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith forall a. Queue a
Unbounded Int
n IO a
f
sourceWith :: Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith :: forall a. Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith Queue a
q Int
n IO a
f = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> Committer m a -> m ()
source1 IO a
f
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit :: forall (m :: * -> *) a.
Monad m =>
Emitter m a -> Committer m a -> Emitter m a
forkEmit Emitter m a
e Committer m a
c =
forall (m :: * -> *) a. m (Maybe a) -> Emitter m a
Emitter forall a b. (a -> b) -> a -> b
$ do
Maybe a
a <- forall (m :: * -> *) a. Emitter m a -> m (Maybe a)
emit Emitter m a
e
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (forall (f :: * -> *) a. Functor f => f a -> f ()
void forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer m a
c) Maybe a
a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
a
bufferCommitter :: Committer IO a -> CoCommitter IO a
bufferCommitter :: forall a. Committer IO a -> CoCommitter IO a
bufferCommitter Committer IO a
c = forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$ \Committer IO a -> IO b
caction -> forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO l
queueL forall a. Queue a
Unbounded Committer IO a -> IO b
caction (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue Committer IO a
c)
bufferEmitter :: Emitter IO a -> CoEmitter IO a
bufferEmitter :: forall a. Emitter IO a -> CoEmitter IO a
bufferEmitter Emitter IO a
e = forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$ \Emitter IO a -> IO b
eaction -> forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO r
queueR forall a. Queue a
Unbounded (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
`glue` Emitter IO a
e) Emitter IO a -> IO b
eaction
concurrentE ::
Queue a ->
Emitter IO a ->
Emitter IO a ->
CoEmitter IO a
concurrentE :: forall a. Queue a -> Emitter IO a -> Emitter IO a -> CoEmitter IO a
concurrentE Queue a
q Emitter IO a
e Emitter IO a
e' =
forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$ \Emitter IO a -> IO b
eaction -> forall a b. (a, b) -> b
snd forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> a
fst forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. IO a -> IO b -> IO (a, b)
concurrently (forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO (l, r)
queue Queue a
q (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
`glue` Emitter IO a
e) Emitter IO a -> IO b
eaction) (forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO (l, r)
queue Queue a
q (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
`glue` Emitter IO a
e') Emitter IO a -> IO b
eaction)
concurrentC :: Queue a -> Committer IO a -> Committer IO a -> CoCommitter IO a
concurrentC :: forall a.
Queue a -> Committer IO a -> Committer IO a -> CoCommitter IO a
concurrentC Queue a
q Committer IO a
c Committer IO a
c' = forall a.
Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a.
Queue a
-> Committer IO a
-> Committer IO a
-> Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC Queue a
q Committer IO a
c Committer IO a
c'
eitherC ::
Queue a ->
Committer IO a ->
Committer IO a ->
Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC :: forall a.
Queue a
-> Committer IO a
-> Committer IO a
-> Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC Queue a
q Committer IO a
cl Committer IO a
cr =
forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$
\Either (Committer IO a) (Committer IO a) -> IO b
kk ->
forall a b. (a, b) -> a
fst
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. IO a -> IO b -> IO (a, b)
concurrently
(forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO l
queueL Queue a
q (Either (Committer IO a) (Committer IO a) -> IO b
kk forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left) (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue Committer IO a
cl))
(forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO l
queueL Queue a
q (Either (Committer IO a) (Committer IO a) -> IO b
kk forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right) (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue Committer IO a
cr))
mergeC :: Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC :: forall a.
Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC Either (Committer IO a) (Committer IO a)
ec =
forall (m :: * -> *) a. (a -> m Bool) -> Committer m a
Committer forall a b. (a -> b) -> a -> b
$ \a
a ->
case Either (Committer IO a) (Committer IO a)
ec of
Left Committer IO a
lc -> forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer IO a
lc a
a
Right Committer IO a
rc -> forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer IO a
rc a
a
takeQ :: Queue a -> Int -> Emitter IO a -> CoEmitter IO a
takeQ :: forall a. Queue a -> Int -> Emitter IO a -> CoEmitter IO a
takeQ Queue a
q Int
n Emitter IO a
e = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q forall a b. (a -> b) -> a -> b
$ \Committer IO a
c -> forall (m :: * -> *) s a.
Monad m =>
s -> Committer m a -> Emitter (StateT s m) a -> m ()
glueES Int
0 Committer IO a
c (forall (m :: * -> *) a.
Monad m =>
Int -> Emitter m a -> Emitter (StateT Int m) a
takeE Int
n Emitter IO a
e)
evalEmitter :: s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitter :: forall s a. s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitter s
s Emitter (StateT s IO) a
e = forall a s.
Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith forall a. Queue a
Unbounded s
s Emitter (StateT s IO) a
e
evalEmitterWith :: Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith :: forall a s.
Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith Queue a
q s
s Emitter (StateT s IO) a
e = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q forall a b. (a -> b) -> a -> b
$ \Committer IO a
c -> forall (m :: * -> *) s a.
Monad m =>
s -> Committer m a -> Emitter (StateT s m) a -> m ()
glueES s
s Committer IO a
c Emitter (StateT s IO) a
e