{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
module Box.Connectors
( fuse_
, fuseSTM_
, fuse
, fuseSTM
, forkEmit
, feedback
, feedbackE
, fuseEmit
, fuseEmitM
, fuseCommit
, emerge
, emergeM
, splitCommit
, contCommit
) where
import Control.Category
import Control.Lens hiding ((:>), (.>), (<|), (|>))
import Data.Semigroup hiding (First, getFirst)
import Box.Box
import Box.Queue
import Box.Committer
import Box.Cont
import Box.Emitter
import Protolude hiding (STM, (.), (<>))
import Control.Monad.Conc.Class as C
import Control.Concurrent.Classy.Async as C
-- | Connect an emitter directly to a committer and run until either side stops.
--
-- Each iteration emits one value from @e@ and commits it to @c@.  The loop
-- ends as soon as the emitter yields 'Nothing' or the commit returns 'False'.
fuse_ :: (Monad m) => Emitter m a -> Committer m a -> m ()
fuse_ e c = pump
  where
    pump = do
      emitted <- emit e
      keepGoing <- case emitted of
        -- Nothing emitted: treat it the same as a refused commit.
        Nothing -> pure False
        Just a -> commit c a
      when keepGoing pump
-- | Variant of 'fuse_' for an emitter and committer that live in @STM m@.
--
-- Every emit-then-commit step is executed as a single 'C.atomically'
-- transaction.  The loop ends when a step reports 'False', i.e. when the
-- emitter yields 'Nothing' or the commit returns 'False'.
fuseSTM_ :: (MonadConc m) => Emitter (STM m) a -> Committer (STM m) a -> m ()
fuseSTM_ e c = pump
  where
    -- One transactional step: emit, then commit whatever was emitted.
    transfer = emit e >>= maybe (pure False) (commit c)
    pump = C.atomically transfer >>= \keepGoing -> when keepGoing pump
-- | Run a continuation-wrapped box by fusing its emitter into its own committer.
--
-- The box's emitter is first transformed with @'emap' f@, then connected to
-- the box's committer via 'fuse_'.
-- NOTE(review): presumably 'emap' drops values for which @f@ returns
-- 'Nothing' -- confirm against "Box.Emitter".
fuse :: (Monad m) => (a -> m (Maybe b)) -> Cont m (Box m b a) -> m ()
fuse f box = with box connect
  where
    connect (Box c e) = fuse_ (emap f e) c
-- | 'STM' counterpart of 'fuse': the box lives in @STM m@ and is run with
-- 'fuseSTM_' after its emitter has been transformed with @'emap' f@.
fuseSTM :: (MonadConc m) => (a -> (STM m) (Maybe b)) -> Cont m (Box (STM m) b a) -> m ()
fuseSTM f box = with box connect
  where
    connect (Box c e) = fuseSTM_ (emap f e) c
-- | Wrap an emitter so that every value it emits is also committed to @c@.
--
-- The result of the commit is discarded, and whatever the underlying emitter
-- produced (including 'Nothing') is passed through unchanged.
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c = Emitter tee
  where
    tee = do
      emitted <- emit e
      case emitted of
        Nothing -> pure ()
        -- Side-channel copy; the commit's Bool result is deliberately ignored.
        Just a -> void (commit c a)
      pure emitted
-- | Put a committer behind 'queueC'.
--
-- The continuation receives the committer supplied by 'queueC', while the
-- matching emitter is fused into the original committer @c@ with 'fuseSTM_'.
-- NOTE(review): presumably 'queueC' creates a queue-backed committer/emitter
-- pair and runs both actions concurrently -- confirm against "Box.Queue".
fuseCommit :: (MonadConc m) => Committer (STM m) a -> Cont m (Committer (STM m) a)
fuseCommit c = Cont (\caction -> queueC caction drain)
  where
    -- Forward everything coming out of the queue's emitter into @c@.
    drain queued = fuseSTM_ queued c
-- | Put an emitter behind 'queueE'.
--
-- The original emitter @e@ is fused (with 'fuseSTM_') into the committer
-- supplied by 'queueE', and the continuation receives the matching emitter.
-- NOTE(review): presumably 'queueE' creates a queue-backed committer/emitter
-- pair and runs both actions concurrently -- confirm against "Box.Queue".
fuseEmit :: (MonadConc m) => Emitter (STM m) a -> Cont m (Emitter (STM m) a)
fuseEmit e = Cont (\eaction -> queueE fill eaction)
  where
    -- Feed the queue's committer from @e@.
    fill = fuseSTM_ e
-- | Like 'fuseEmit', but for an emitter in the base monad @m@: it uses
-- 'queueEM' and 'fuse_' in place of 'queueE' and 'fuseSTM_'.
fuseEmitM :: (MonadConc m) => Emitter m a -> Cont m (Emitter m a)
fuseEmitM e = Cont (\eaction -> queueEM fill eaction)
  where
    -- Feed the queue's committer from @e@.
    fill = fuse_ e
-- | Combine a pair of 'STM' emitters into a single continuation-wrapped emitter.
--
-- Each emitter of the pair is fed through its own 'queueE' (using
-- 'fuseSTM_'), and the two resulting actions run under 'C.concurrently'.
-- The continuation is therefore invoked once per source emitter; only the
-- result belonging to the first emitter is returned.
emerge ::
  (MonadConc m) =>
  Cont m (Emitter (STM m) a, Emitter (STM m) a) ->
  Cont m (Emitter (STM m) a)
emerge e =
  Cont $ \eaction ->
    with e $ \sources ->
      let feed source = queueE (fuseSTM_ source) eaction
       in fst <$> C.concurrently (feed (fst sources)) (feed (snd sources))
-- | Base-monad counterpart of 'emerge'.
--
-- Each emitter of the pair is fed through its own 'queueEM' (using 'fuse_'),
-- and the two resulting actions run under 'C.concurrently'.  The
-- continuation is invoked once per source emitter; only the result belonging
-- to the first emitter is returned.
emergeM ::
  (MonadConc m) =>
  Cont m (Emitter m a, Emitter m a) ->
  Cont m (Emitter m a)
emergeM e =
  Cont $ \eaction ->
    with e $ \sources ->
      let feed source = queueEM (fuse_ source) eaction
       in fst <$> C.concurrently (feed (fst sources)) (feed (snd sources))
-- | Split one committer into a 'Left'-tagged and a 'Right'-tagged committer.
--
-- Two 'queueC' pipelines are run under 'C.concurrently'; both drain into the
-- same underlying committer via 'fuseSTM_'.  The continuation is invoked
-- once with a 'Left' committer and once with a 'Right' committer, and the
-- result of the 'Left' invocation is returned.
splitCommit :: (MonadConc m) =>
  Cont m (Committer (STM m) a)
  -> Cont m (Either (Committer (STM m) a) (Committer (STM m) a))
splitCommit c =
  Cont $ \kk ->
    with c $ \target ->
      let drain queued = fuseSTM_ queued target
          leftSide = queueC (kk . Left) drain
          rightSide = queueC (kk . Right) drain
       in fst <$> C.concurrently leftSide rightSide
-- | Collapse an 'Either' of text committers into a single committer.
--
-- Values sent through a 'Left' committer are prefixed with @"left "@ (via
-- 'contramap'); values sent through a 'Right' committer are committed as-is.
contCommit :: Either (Committer m Text) (Committer m Text) -> Committer m Text
contCommit ec = Committer (either labelled commit ec)
  where
    labelled lc = commit (contramap ("left " <>) lc)
-- | A box modifier that first feeds the box's emitter (transformed with
-- @'emap' f@) back into its own committer, then hands the box on.
--
-- NOTE(review): 'fuse_' is sequenced /before/ the continuation, so the
-- continuation only runs once the fuse loop has terminated -- confirm that
-- this ordering (rather than running the two concurrently) is intended.
feedback ::
  (MonadConc m) =>
  (a -> m (Maybe b)) ->
  Cont m (Box m b a) ->
  Cont m (Box m b a)
feedback f box =
  Cont $ \bio ->
    with box $ \whole@(Box c e) ->
      fuse_ (emap f e) c >> bio whole
-- | An emitter modifier that merges an emitter with a transformed copy of itself.
--
-- The original emitter @e@ is paired with a second emitter obtained by
-- passing @'emap' f e@ through 'fuseEmitM'; the pair is then combined with
-- 'emergeM'.
feedbackE ::
  (MonadConc m) =>
  (a -> m (Maybe a)) ->
  Emitter m a ->
  Cont m (Emitter m a)
feedbackE f e = emergeM sources
  where
    -- The fed-back stream: @e@ mapped through @f@ and placed behind a queue.
    looped = fuseEmitM (emap f e)
    sources = (,) <$> pure e <*> looped