{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RebindableSyntax #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}

-- | Various ways to connect things up.
module Box.Connectors
  ( qList,
    qListWith,
    popList,
    pushList,
    pushListN,
    sink,
    sinkWith,
    source,
    sourceWith,
    forkEmit,
    bufferCommitter,
    bufferEmitter,
    concurrentE,
    concurrentC,
    takeQ,
    evalEmitter,
    evalEmitterWith,
  )
where

import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Box.Queue
import Control.Concurrent.Async
import Control.Monad.State.Lazy
import Data.Foldable
import qualified Data.Sequence as Seq
import Prelude

-- $setup
-- >>> :set -XOverloadedStrings
-- >>> import Box
-- >>> import Prelude
-- >>> import Data.Bool
-- >>> import Control.Monad

-- | Queue a list Unbounded.
--
-- >>> pushList <$|> qList [1,2,3]
-- [1,2,3]
qList :: [a] -> CoEmitter IO a
qList :: forall a. [a] -> CoEmitter IO a
qList [a]
xs = forall a. Queue a -> [a] -> CoEmitter IO a
qListWith forall a. Queue a
Unbounded [a]
xs

-- | Queue a list with an explicit 'Queue'.
--
-- >>> pushList <$|> qListWith Single [1,2,3]
-- [1,2,3]
qListWith :: Queue a -> [a] -> CoEmitter IO a
qListWith :: forall a. Queue a -> [a] -> CoEmitter IO a
qListWith Queue a
q [a]
xs = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q (\Committer IO a
c -> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (t :: * -> *). Foldable t => t Bool -> Bool
and (forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer IO a
c) [a]
xs))

-- | Directly supply a list to a committer action, via pop.
--
-- >>> popList [1..3] showStdout
-- 1
-- 2
-- 3
popList :: Monad m => [a] -> Committer m a -> m ()
popList :: forall (m :: * -> *) a. Monad m => [a] -> Committer m a -> m ()
popList [a]
xs Committer m a
c = forall (m :: * -> *) s a.
Monad m =>
s -> Committer m a -> Emitter (StateT s m) a -> m ()
glueES (forall a. [a] -> Seq a
Seq.fromList [a]
xs) Committer m a
c forall (m :: * -> *) a. Monad m => Emitter (StateT (Seq a) m) a
pop

-- | Push an Emitter into a list, via push.
--
-- >>> pushList <$|> qList [1..3]
-- [1,2,3]
pushList :: (Monad m) => Emitter m a -> m [a]
pushList :: forall (m :: * -> *) a. Monad m => Emitter m a -> m [a]
pushList Emitter m a
e = forall (t :: * -> *) a. Foldable t => t a -> [a]
toList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT forall a. Seq a
Seq.empty (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue forall (m :: * -> *) a. Monad m => Committer (StateT (Seq a) m) a
push (forall (h :: (* -> *) -> * -> *) (f :: * -> *) (g :: * -> *) a.
FFunctor h =>
(forall x. f x -> g x) -> h f a -> h g a
foist forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift Emitter m a
e))

-- | Push an Emitter into a list, finitely.
--
-- >>> pushListN 2 <$|> qList [1..3]
-- [1,2]
pushListN :: (Monad m) => Int -> Emitter m a -> m [a]
pushListN :: forall (m :: * -> *) a. Monad m => Int -> Emitter m a -> m [a]
pushListN Int
n Emitter m a
e = forall (t :: * -> *) a. Foldable t => t a -> [a]
toList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT forall a. Seq a
Seq.empty (forall (m :: * -> *) a.
Monad m =>
Int -> Committer m a -> Emitter m a -> m ()
glueN Int
n forall (m :: * -> *) a. Monad m => Committer (StateT (Seq a) m) a
push (forall (h :: (* -> *) -> * -> *) (f :: * -> *) (g :: * -> *) a.
FFunctor h =>
(forall x. f x -> g x) -> h f a -> h g a
foist forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift Emitter m a
e))

-- singleton sink
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 :: forall (m :: * -> *) a.
Monad m =>
(a -> m ()) -> Emitter m a -> m ()
sink1 a -> m ()
f Emitter m a
e = do
  Maybe a
a <- forall (m :: * -> *) a. Emitter m a -> m (Maybe a)
emit Emitter m a
e
  case Maybe a
a of
    Maybe a
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just a
a' -> a -> m ()
f a
a'

-- FIXME: This doctest sometimes fails with the last value not being printed. Hypothesis: the pipe collapses before the console print effect happens.

-- | Create a finite Committer Unbounded Queue.
--
-- > glue <$> sink 2 print <*|> qList [1..3]
-- 1
-- 2
sink :: Int -> (a -> IO ()) -> CoCommitter IO a
sink :: forall a. Int -> (a -> IO ()) -> CoCommitter IO a
sink Int
n a -> IO ()
f = forall a. Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith forall a. Queue a
Unbounded Int
n a -> IO ()
f

-- | Create a finite Committer Queue.
sinkWith :: Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith :: forall a. Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith Queue a
q Int
n a -> IO ()
f = forall a r. Queue a -> (Emitter IO a -> IO r) -> CoCommitter IO a
commitQ Queue a
q forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
(a -> m ()) -> Emitter m a -> m ()
sink1 a -> IO ()
f

-- singleton source
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 :: forall (m :: * -> *) a. Monad m => m a -> Committer m a -> m ()
source1 m a
a Committer m a
c = do
  a
a' <- m a
a
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer m a
c a
a'

-- | Create a finite (Co)Emitter Unbounded Queue.
--
-- >>> glue toStdout <$|> source 2 (pure "hi")
-- hi
-- hi
source :: Int -> IO a -> CoEmitter IO a
source :: forall a. Int -> IO a -> CoEmitter IO a
source Int
n IO a
f = forall a. Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith forall a. Queue a
Unbounded Int
n IO a
f

-- | Create a finite (Co)Emitter Unbounded Queue.
--
-- >>> glue toStdout <$|> sourceWith Single 2 (pure "hi")
-- hi
-- hi
sourceWith :: Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith :: forall a. Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith Queue a
q Int
n IO a
f = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
n forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> Committer m a -> m ()
source1 IO a
f

-- | Glues an emitter to a committer, then resupplies the emitter.
--
-- >>> (c1,l1) <- refCommitter :: IO (Committer IO Int, IO [Int])
-- >>> close $ toListM <$> (forkEmit <$> (qList [1..3]) <*> pure c1)
-- [1,2,3]
--
-- >>> l1
-- [1,2,3]
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit :: forall (m :: * -> *) a.
Monad m =>
Emitter m a -> Committer m a -> Emitter m a
forkEmit Emitter m a
e Committer m a
c =
  forall (m :: * -> *) a. m (Maybe a) -> Emitter m a
Emitter forall a b. (a -> b) -> a -> b
$ do
    Maybe a
a <- forall (m :: * -> *) a. Emitter m a -> m (Maybe a)
emit Emitter m a
e
    forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (forall (f :: * -> *) a. Functor f => f a -> f ()
void forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer m a
c) Maybe a
a
    forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
a

-- | Buffer a committer.
bufferCommitter :: Committer IO a -> CoCommitter IO a
bufferCommitter :: forall a. Committer IO a -> CoCommitter IO a
bufferCommitter Committer IO a
c = forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$ \Committer IO a -> IO b
caction -> forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO l
queueL forall a. Queue a
Unbounded Committer IO a -> IO b
caction (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue Committer IO a
c)

-- | Buffer an emitter.
bufferEmitter :: Emitter IO a -> CoEmitter IO a
bufferEmitter :: forall a. Emitter IO a -> CoEmitter IO a
bufferEmitter Emitter IO a
e = forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$ \Emitter IO a -> IO b
eaction -> forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO r
queueR forall a. Queue a
Unbounded (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
`glue` Emitter IO a
e) Emitter IO a -> IO b
eaction

-- | Concurrently run two emitters.
--
-- This differs to (<>), which is left-biased.
--
-- Note that functions such as toListM, which complete on the first Nothing emitted, will not work as expected.
--
-- >>> close $ (fmap toListM) (join $ concurrentE Single <$> qList [1..3] <*> qList [5..9])
-- [1,2,3]
--
-- In the code below, the ordering is non-deterministic.
--
-- > (c,l) <- refCommitter :: IO (Committer IO Int, IO [Int])
-- > close $ glue c <$> (join $ concurrentE Single <$> qList [1..30] <*> qList [40..60])
concurrentE ::
  Queue a ->
  Emitter IO a ->
  Emitter IO a ->
  CoEmitter IO a
concurrentE :: forall a. Queue a -> Emitter IO a -> Emitter IO a -> CoEmitter IO a
concurrentE Queue a
q Emitter IO a
e Emitter IO a
e' =
  forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$ \Emitter IO a -> IO b
eaction -> forall a b. (a, b) -> b
snd forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> a
fst forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. IO a -> IO b -> IO (a, b)
concurrently (forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO (l, r)
queue Queue a
q (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
`glue` Emitter IO a
e) Emitter IO a -> IO b
eaction) (forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO (l, r)
queue Queue a
q (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
`glue` Emitter IO a
e') Emitter IO a -> IO b
eaction)

-- | Concurrently run two committers.
--
-- >>> import Data.Functor.Contravariant
-- >>> import Data.Text (pack)
-- >>> cFast = witherC (\b -> pure (Just b)) . contramap ("fast: " <>) $ toStdout
-- >>> cSlow = witherC (\b -> sleep 0.1 >> pure (Just b)) . contramap ("slow: " <>) $ toStdout
-- >>> close $ (popList ((pack . show) <$> [1..3]) <$> (concurrentC Unbounded cFast cSlow)) <> pure (sleep 1)
-- fast: 1
-- fast: 2
-- fast: 3
-- slow: 1
-- slow: 2
-- slow: 3
concurrentC :: Queue a -> Committer IO a -> Committer IO a -> CoCommitter IO a
concurrentC :: forall a.
Queue a -> Committer IO a -> Committer IO a -> CoCommitter IO a
concurrentC Queue a
q Committer IO a
c Committer IO a
c' = forall a.
Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a.
Queue a
-> Committer IO a
-> Committer IO a
-> Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC Queue a
q Committer IO a
c Committer IO a
c'

eitherC ::
  Queue a ->
  Committer IO a ->
  Committer IO a ->
  Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC :: forall a.
Queue a
-> Committer IO a
-> Committer IO a
-> Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC Queue a
q Committer IO a
cl Committer IO a
cr =
  forall k (m :: k -> *) a.
(forall (b :: k). (a -> m b) -> m b) -> Codensity m a
Codensity forall a b. (a -> b) -> a -> b
$
    \Either (Committer IO a) (Committer IO a) -> IO b
kk ->
      forall a b. (a, b) -> a
fst
        forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. IO a -> IO b -> IO (a, b)
concurrently
          (forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO l
queueL Queue a
q (Either (Committer IO a) (Committer IO a) -> IO b
kk forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left) (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue Committer IO a
cl))
          (forall a l r.
Queue a
-> (Committer IO a -> IO l) -> (Emitter IO a -> IO r) -> IO l
queueL Queue a
q (Either (Committer IO a) (Committer IO a) -> IO b
kk forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right) (forall (m :: * -> *) a.
Monad m =>
Committer m a -> Emitter m a -> m ()
glue Committer IO a
cr))

mergeC :: Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC :: forall a.
Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC Either (Committer IO a) (Committer IO a)
ec =
  forall (m :: * -> *) a. (a -> m Bool) -> Committer m a
Committer forall a b. (a -> b) -> a -> b
$ \a
a ->
    case Either (Committer IO a) (Committer IO a)
ec of
      Left Committer IO a
lc -> forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer IO a
lc a
a
      Right Committer IO a
rc -> forall (m :: * -> *) a. Committer m a -> a -> m Bool
commit Committer IO a
rc a
a

-- | Take and queue n emits.
--
-- >>> import Control.Monad.State.Lazy
-- >>> toListM <$|> (takeQ Single 4 =<< qList [0..])
-- [0,1,2,3]
takeQ :: Queue a -> Int -> Emitter IO a -> CoEmitter IO a
takeQ :: forall a. Queue a -> Int -> Emitter IO a -> CoEmitter IO a
takeQ Queue a
q Int
n Emitter IO a
e = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q forall a b. (a -> b) -> a -> b
$ \Committer IO a
c -> forall (m :: * -> *) s a.
Monad m =>
s -> Committer m a -> Emitter (StateT s m) a -> m ()
glueES Int
0 Committer IO a
c (forall (m :: * -> *) a.
Monad m =>
Int -> Emitter m a -> Emitter (StateT Int m) a
takeE Int
n Emitter IO a
e)

-- | queue a stateful emitter, supplying initial state
--
-- >>> import Control.Monad.State.Lazy
-- >>> toListM <$|> (evalEmitter 0 <$> takeE 4 =<< qList [0..])
-- [0,1,2,3]
evalEmitter :: s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitter :: forall s a. s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitter s
s Emitter (StateT s IO) a
e = forall a s.
Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith forall a. Queue a
Unbounded s
s Emitter (StateT s IO) a
e

-- | queue a stateful emitter, supplying initial state
evalEmitterWith :: Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith :: forall a s.
Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith Queue a
q s
s Emitter (StateT s IO) a
e = forall a r. Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ Queue a
q forall a b. (a -> b) -> a -> b
$ \Committer IO a
c -> forall (m :: * -> *) s a.
Monad m =>
s -> Committer m a -> Emitter (StateT s m) a -> m ()
glueES s
s Committer IO a
c Emitter (StateT s IO) a
e