{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Transducer
( Transducer(..)
, etc
, etcM
, asPipe
) where
import Control.Category
import Control.Lens hiding ((:>), (.>), (<|), (|>))
import Control.Monad.Base (MonadBase, liftBase)
import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Stream
import qualified Pipes
import qualified Pipes.Prelude as Pipes
import Protolude hiding ((.), (<>))
import Streaming (Of(..), Stream)
import qualified Streaming.Prelude as S
import Control.Monad.Conc.Class as C
newtype Transducer s a b = Transducer
{ transduce :: forall m. Monad m =>
Stream (Of a) (StateT s m) () -> Stream (Of b) (StateT s m) ()
}
instance Category (Transducer s) where
(Transducer t1) . (Transducer t2) = Transducer (t1 . t2)
id = Transducer id
asPipe ::
(Monad m)
=> Pipes.Pipe a b (StateT s m) ()
-> (Stream (Of a) (StateT s m) () -> Stream (Of b) (StateT s m) ())
asPipe p s = ((s & Pipes.unfoldr S.next) Pipes.>-> p) & S.unfoldr Pipes.next
etc :: (MonadConc m) => s -> Transducer s a b -> Cont m (Box (C.STM m) b a) -> m s
etc st t box =
with box $ \(Box c e) ->
(e & toStream & transduce t & fromStream) c & flip execStateT st
etcM :: (MonadConc m, MonadBase m m) => s -> Transducer s a b -> Cont m (Box m b a) -> m s
etcM st t box =
with box $ \(Box c e) ->
(liftE' e & toStreamM & transduce t & fromStreamM) (liftC' c) & flip execStateT st
where
liftC' c = Committer $ liftBase . commit c
liftE' = Emitter . liftBase . emit