{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}

-- | Various ways to connect things up.
module Box.Connectors
  ( qList,
    popList,
    pushList,
    pushListN,
    sink,
    source,
    forkEmit,
    bufferCommitter,
    bufferEmitter,
    concurrentE,
    concurrentC,
  )
where

import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Box.Queue
import Control.Concurrent.Classy.Async as C
import Control.Monad.Conc.Class (MonadConc)
import Control.Monad.State.Lazy
import Data.Foldable
import qualified Data.Sequence as Seq
import Prelude

-- $setup
-- >>> :set -XOverloadedStrings
-- >>> import Box
-- >>> import Prelude
-- >>> import Data.Bool
-- >>> import Control.Monad

-- | Queue a list.
--
-- Every element of the list is committed, in order, to an 'Unbounded' queue
-- set up by 'emitQ'. The feeding action yields the conjunction ('and') of the
-- individual 'commit' results.
--
-- >>> pushList <$|> qList [1,2,3]
-- [1,2,3]
qList :: (MonadConc m) => [a] -> CoEmitter m a
qList xs = emitQ Unbounded (\c -> fmap and (traverse (commit c) xs))

-- | Directly supply a list to a committer action, via pop.
--
-- The list is loaded into a 'Seq.Seq' that becomes the initial 'StateT' state.
-- The committer is lifted into that 'StateT' layer with 'foist' 'lift' and
-- glued to 'pop'; the final state is discarded ('evalStateT').
--
-- >>> popList [1..3] showStdout
-- 1
-- 2
-- 3
popList :: Monad m => [a] -> Committer m a -> m ()
popList xs c = flip evalStateT (Seq.fromList xs) $ glue (foist lift c) pop

-- | Push an Emitter into a list, via push.
--
-- The emitter is lifted into a 'StateT' layer over a 'Seq.Seq' (initially
-- empty) and glued to 'push'; the final state ('execStateT') is converted
-- to a list.
--
-- >>> pushList <$|> qList [1..3]
-- [1,2,3]
pushList :: (Monad m) => Emitter m a -> m [a]
pushList e = toList <$> flip execStateT Seq.empty (glue push (foist lift e))

-- | Push an Emitter into a list, finitely.
--
-- Same construction as 'pushList', except that the emitter is connected to
-- 'push' with 'glueN' and the supplied count rather than with 'glue'.
--
-- >>> pushListN 2 <$|> qList [1..3]
-- [1,2]
pushListN :: (Monad m) => Int -> Emitter m a -> m [a]
pushListN n e =
  toList <$> flip execStateT Seq.empty (glueN n push (foist lift e))

-- singleton sink: emit once and, if a value was produced, hand it to the
-- supplied action; do nothing when the emitter yields 'Nothing'.
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 f e = do
  a <- emit e
  case a of
    Nothing -> pure ()
    Just a' -> f a'

-- | Create a finite Committer.
--
-- The consuming side handed to 'commitQ' (over an 'Unbounded' queue) performs
-- a single emit-and-apply step @n@ times via 'replicateM_'.
--
-- >>> glue <$> sink 2 print <*|> qList [1..3]
-- 1
-- 2
sink :: (MonadConc m) => Int -> (a -> m ()) -> CoCommitter m a
sink n f = commitQ Unbounded $ replicateM_ n . sink1 f

-- singleton source: run the supplied action once and commit its result,
-- discarding the 'Bool' returned by 'commit'.
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 a c = do
  a' <- a
  void $ commit c a'

-- | Create a finite Emitter.
--
-- The producing side handed to 'emitQ' (over an 'Unbounded' queue) performs
-- a single run-and-commit step @n@ times via 'replicateM_'.
--
-- >>> glue toStdout <$|> source 2 (pure "hi")
-- hi
-- hi
source :: (MonadConc m) => Int -> m a -> CoEmitter m a
source n f = emitQ Unbounded $ replicateM_ n . source1 f

-- | Glues an emitter to a committer, then resupplies the emitter.
--
-- Each emit of the resulting 'Emitter' emits from the original emitter,
-- commits the value (if there was one) to the committer, ignoring the commit
-- result, and then returns the emitted value unchanged.
--
-- >>> (c1,l1) <- refCommitter :: IO (Committer IO Int, IO [Int])
-- >>> close $ toListM <$> (forkEmit <$> (qList [1..3]) <*> pure c1)
-- [1,2,3]
--
-- >>> l1
-- [1,2,3]
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c =
  Emitter $ do
    a <- emit e
    -- A 'Nothing' is passed through without touching the committer.
    maybe (pure ()) (void . commit c) a
    pure a

-- | Buffer a committer.
--
-- The continuation receives the committer side of an 'Unbounded' queue, while
-- the emitter side of that queue is glued to the supplied committer. The
-- result is that of the continuation ('queueL').
bufferCommitter :: (MonadConc m) => Committer m a -> CoCommitter m a
bufferCommitter c = Codensity $ \caction -> queueL Unbounded caction (glue c)

-- | Buffer an emitter.
--
-- The supplied emitter is glued to the committer side of an 'Unbounded' queue,
-- while the continuation receives the emitter side of that queue. The result
-- is that of the continuation ('queueR').
bufferEmitter :: (MonadConc m) => Emitter m a -> CoEmitter m a
bufferEmitter e = Codensity $ \eaction -> queueR Unbounded (`glue` e) eaction

-- | Concurrently run two emitters.
--
-- This differs from (<>), which is left-biased.
--
-- Each emitter is glued into its own queue (both built from the same 'Queue'
-- specification) and the continuation is run against each queue, with the two
-- runs combined by 'C.concurrently'. The value returned is the continuation's
-- result from the run fed by the first emitter.
--
-- Note that functions such as toListM, which complete on the first Nothing emitted, will not work as expected.
--
-- >>> close $ (fmap toListM) (join $ concurrentE Single <$> qList [1..3] <*> qList [5..9])
-- [1,2,3]
--
-- In the code below, the ordering is non-deterministic.
--
-- > (c,l) <- refCommitter :: IO (Committer IO Int, IO [Int])
-- > close $ glue c <$> (join $ concurrentE Single <$> qList [1..30] <*> qList [40..60])
concurrentE ::
  MonadConc f =>
  Queue a ->
  Emitter f a ->
  Emitter f a ->
  CoEmitter f a
concurrentE q e e' =
  Codensity $ \eaction ->
    snd . fst
      <$> C.concurrently
        (queue q (`glue` e) eaction)
        (queue q (`glue` e') eaction)

-- | Concurrently run two committers.
--
-- Built by collapsing the 'Either' of committers produced by the concurrent
-- queue set-up into a single 'Committer'.
--
-- >>> import Data.Functor.Contravariant
-- >>> import Data.Text (pack)
-- >>> cFast = witherC (\b -> pure (Just b)) . contramap ("fast: " <>) $ toStdout
-- >>> cSlow = witherC (\b -> sleep 0.1 >> pure (Just b)) . contramap ("slow: " <>) $ toStdout
-- >>> close $ (popList ((pack . show) <$> [1..3]) <$> (concurrentC Unbounded cFast cSlow)) <> pure (sleep 1)
-- fast: 1
-- fast: 2
-- fast: 3
-- slow: 1
-- slow: 2
-- slow: 3
concurrentC :: (MonadConc m) => Queue a -> Committer m a -> Committer m a -> CoCommitter m a
concurrentC q c c' = mergeC <$> eitherC q c c'

-- | Run a continuation concurrently against two queued committers.
--
-- Two queues are built from the same 'Queue' specification. The continuation
-- is run once with the committer side of the first queue tagged 'Left' (that
-- queue's emitter side is glued to the first committer) and once with the
-- committer side of the second queue tagged 'Right' (glued to the second
-- committer). The two runs are combined with 'C.concurrently' and the result
-- of the 'Left' run is returned.
eitherC ::
  (MonadConc m) =>
  Queue a ->
  Committer m a ->
  Committer m a ->
  Codensity m (Either (Committer m a) (Committer m a))
eitherC q cl cr =
  Codensity $
    \kk ->
      fst
        <$> C.concurrently
          (queueL q (kk . Left) (glue cl))
          (queueL q (kk . Right) (glue cr))

-- | Collapse an 'Either' of committers into a single 'Committer' that commits
-- to whichever side is present.
mergeC :: Either (Committer m a) (Committer m a) -> Committer m a
mergeC ec =
  Committer $ \a ->
    case ec of
      Left lc -> commit lc a
      Right rc -> commit rc a