module Control.Concurrent.SCC.Types
(
Performer(..),
OpenConsumer, Consumer(..), OpenProducer, Producer(..),
OpenTransducer, Transducer(..), OpenSplitter, Splitter(..),
Boundary(..), Markup(..), Parser,
Branching (combineBranches),
isolateConsumer, isolateProducer, isolateTransducer, isolateSplitter,
oneToOneTransducer, statelessTransducer, foldingTransducer, statefulTransducer,
statelessSplitter, statefulSplitter,
splitToConsumers, splitInputToConsumers, pipePS
)
where
import Control.Concurrent.Coroutine
import Control.Concurrent.SCC.Streams
import Control.Monad (liftM, when)
import Data.Maybe (maybe)
-- | The shape of a consumer body: given a 'Source' of @x@ items living in
-- functor @a@, it yields a coroutine over functor @d@ that results in @r@.
-- The @AncestorFunctor a d@ constraint ties the two functors together.
type OpenConsumer m a d x r =
  AncestorFunctor a d =>
  Source m a x ->
  Coroutine d m r
-- | The shape of a producer body: given a 'Sink' of @x@ items living in
-- functor @a@, it yields a coroutine over functor @d@ that results in @r@.
-- The @AncestorFunctor a d@ constraint ties the two functors together.
type OpenProducer m a d x r =
  AncestorFunctor a d =>
  Sink m a x ->
  Coroutine d m r
-- | The shape of a transducer body: it takes a 'Source' of @x@ and a 'Sink'
-- of @y@ (each in its own functor, both ancestors of @d@) and yields a
-- coroutine over @d@ whose result is a list of input-type items.
-- NOTE(review): the list presumably carries input that was read but not
-- processed -- confirm against the callers.
type OpenTransducer m a1 a2 d x y =
  (AncestorFunctor a1 d, AncestorFunctor a2 d) =>
  Source m a1 x ->
  Sink m a2 y ->
  Coroutine d m [x]
-- | The shape of a splitter body: it takes one 'Source' of @x@, two sinks of
-- @x@ and one sink of @b@ (each in its own functor, all ancestors of @d@)
-- and yields a coroutine over @d@ whose result is a list of input-type items.
type OpenSplitter m a1 a2 a3 a4 d x b =
  ( AncestorFunctor a1 d,
    AncestorFunctor a2 d,
    AncestorFunctor a3 d,
    AncestorFunctor a4 d
  ) =>
  Source m a1 x ->
  Sink m a2 x ->
  Sink m a3 x ->
  Sink m a4 b ->
  Coroutine d m [x]
-- | A thin wrapper around a plain action of type @m r@; 'perform' unwraps it.
newtype Performer m r = Performer
  { perform :: m r
  }
-- | A consumer packaged so that its body ('consume') stays polymorphic in
-- both the source functor @a@ and the coroutine functor @d@.
newtype Consumer m x r = Consumer
  { consume :: forall a d. OpenConsumer m a d x r
  }
-- | A producer packaged so that its body ('produce') stays polymorphic in
-- both the sink functor @a@ and the coroutine functor @d@.
newtype Producer m x r = Producer
  { produce :: forall a d. OpenProducer m a d x r
  }
-- | A transducer from @x@ items to @y@ items, packaged so that its body
-- ('transduce') stays polymorphic in the source, sink and coroutine functors.
newtype Transducer m x y = Transducer
  { transduce :: forall a1 a2 d. OpenTransducer m a1 a2 d x y
  }
-- | A splitter of @x@ items with a third output of @b@ items, packaged so
-- that its body ('split') stays polymorphic in all of its source, sink and
-- coroutine functors.
newtype Splitter m x b = Splitter
  { split :: forall a1 a2 a3 a4 d. OpenSplitter m a1 a2 a3 a4 d x b
  }
-- | A boundary marker carrying a payload of type @y@, tagged as one of
-- 'Start', 'End' or 'Point'.
data Boundary y
  = Start y
  | End y
  | Point y
  deriving (Eq, Show)
-- | An item of a marked-up stream: either a 'Content' item of type @x@ or a
-- 'Markup' item wrapping a 'Boundary' with payload type @y@.
data Markup y x
  = Content x
  | Markup (Boundary y)
  deriving (Eq)
-- | A parser is a 'Transducer' from @x@ items to 'Markup' items whose content
-- type is the same @x@ and whose boundary payload type is @b@.
type Parser m x b = Transducer m x (Markup b x)
-- | Maps over the payload and keeps the constructor unchanged.
instance Functor Boundary where
  fmap f boundary = case boundary of
    Start payload -> Start (f payload)
    End payload -> End (f payload)
    Point payload -> Point (f payload)
-- | Maps over 'Content' items only; 'Markup' items pass through untouched.
instance Functor (Markup y) where
  fmap f item = case item of
    Content x -> Content (f x)
    -- Rebuilt rather than returned as-is because the result has a different
    -- content type.
    Markup boundary -> Markup boundary
-- | Character content is shown as the bare character; a markup item is shown
-- as its 'Boundary' enclosed in square brackets. The precedence argument is
-- ignored.
instance (Show y) => Show (Markup y Char) where
  showsPrec _ item rest = case item of
    Content c -> c : rest
    Markup boundary -> showChar '[' (shows boundary (showChar ']' rest))
-- | Builds a 'Consumer' from a body that expects its source to live in the
-- very functor @d@ its coroutine runs in. The resulting consumer accepts a
-- source in any ancestor functor and lifts it with 'liftSource' before
-- handing it to the body.
isolateConsumer ::
  forall m x r.
  Monad m =>
  (forall d. Functor d => Source m d x -> Coroutine d m r) ->
  Consumer m x r
isolateConsumer body = Consumer isolated
  where
    isolated :: forall a d. OpenConsumer m a d x r
    isolated outer = body (liftSource outer :: Source m d x)
-- | Builds a 'Producer' from a body that expects its sink to live in the
-- very functor @d@ its coroutine runs in. The resulting producer accepts a
-- sink in any ancestor functor and lifts it with 'liftSink' before handing
-- it to the body.
isolateProducer ::
  forall m x r.
  Monad m =>
  (forall d. Functor d => Sink m d x -> Coroutine d m r) ->
  Producer m x r
isolateProducer body = Producer isolated
  where
    isolated :: forall a d. OpenProducer m a d x r
    isolated outer = body (liftSink outer :: Sink m d x)
-- | Builds a 'Transducer' from a body that expects both its source and its
-- sink to live in the functor @d@ its coroutine runs in. The resulting
-- transducer accepts them in any ancestor functors and lifts them with
-- 'liftSource' and 'liftSink' before handing them to the body.
isolateTransducer ::
  forall m x y.
  Monad m =>
  (forall d. Functor d => Source m d x -> Sink m d y -> Coroutine d m [x]) ->
  Transducer m x y
isolateTransducer body = Transducer isolated
  where
    isolated :: forall a1 a2 d. OpenTransducer m a1 a2 d x y
    isolated input output =
      body
        (liftSource input :: Source m d x)
        (liftSink output :: Sink m d y)
-- | Builds a 'Splitter' from a body that expects its source and all three
-- sinks to live in the functor @d@ its coroutine runs in. The resulting
-- splitter accepts them in any ancestor functors and lifts each one with
-- 'liftSource' or 'liftSink' before handing them to the body.
isolateSplitter ::
  forall m x b.
  Monad m =>
  ( forall d.
    Functor d =>
    Source m d x ->
    Sink m d x ->
    Sink m d x ->
    Sink m d b ->
    Coroutine d m [x]
  ) ->
  Splitter m x b
isolateSplitter body = Splitter isolated
  where
    isolated :: forall a1 a2 a3 a4 d. OpenSplitter m a1 a2 a3 a4 d x b
    isolated input yes no boundaries =
      body
        (liftSource input :: Source m d x)
        (liftSink yes :: Sink m d x)
        (liftSink no :: Sink m d x)
        (liftSink boundaries :: Sink m d b)
-- | Components of type @c@ that can be merged pairwise by a combinator
-- written in terms of open consumers of @x@ resulting in @r@. The component
-- type determines the monad @m@ and the item type @x@.
class Branching c (m :: * -> *) x r | c -> m x where
  -- | Lifts a combinator of two open consumers to a combinator of two
  -- components. The 'Bool' argument is passed through to the consumer
  -- combinator unchanged.
  -- NOTE(review): the instances name that flag @parallel@; presumably it
  -- selects parallel execution -- confirm against the combinators.
  combineBranches ::
    ( forall d.
      ( Bool ->
        (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x r) ->
        (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x r) ->
        (forall a. OpenConsumer m a d x r)
      )
    ) ->
    Bool ->
    c ->
    c ->
    c
-- | Consumers branch directly: the combinator is applied to the two unwrapped
-- consumer bodies and the result is wrapped again.
instance forall m x r. Monad m => Branching (Consumer m x r) m x r where
  combineBranches combinator parallel (Consumer first) (Consumer second) =
    Consumer (combinator parallel first second)
-- | Lets unit-returning consumers be used with a combinator that works on
-- consumers returning @[x]@: each branch runs its consumer and then returns
-- the empty list, and the combined list result is replaced by @()@.
instance forall m x. Monad m => Branching (Consumer m x ()) m x [x] where
  combineBranches combinator parallel c1 c2 =
    Consumer
      ( \input ->
          liftM (const ()) $
            combinator
              parallel
              (\source -> consume c1 source >> return [])
              (\source -> consume c2 source >> return [])
              input
      )
-- | Transducers branch by sharing one output: both branch transducers write
-- to the same sink (lifted with 'liftSink'), and the combinator decides how
-- the input source is handed to each of them.
instance forall m x y. Monad m => Branching (Transducer m x y) m x [x] where
  combineBranches combinator parallel t1 t2 = Transducer combined
    where
      combined :: forall a1 a2 d. OpenTransducer m a1 a2 d x y
      combined input output =
        let shared :: Sink m d y
            shared = liftSink output
         in combinator
              parallel
              (\branchInput -> transduce t1 branchInput shared)
              (\branchInput -> transduce t2 branchInput shared)
              input
-- | Splitters branch by sharing all three outputs: both branch splitters
-- write to the same true, false and edge sinks (each lifted with 'liftSink'),
-- and the combinator decides how the input source is handed to each of them.
instance forall m x b. (ParallelizableMonad m) => Branching (Splitter m x b) m x [x] where
  combineBranches combinator parallel s1 s2 = Splitter combined
    where
      combined :: forall a1 a2 a3 a4 d. OpenSplitter m a1 a2 a3 a4 d x b
      combined input yes no boundaries =
        let sharedTrue :: Sink m d x
            sharedTrue = liftSink yes
            sharedFalse :: Sink m d x
            sharedFalse = liftSink no
            sharedEdge :: Sink m d b
            sharedEdge = liftSink boundaries
         in combinator
              parallel
              (\branchInput -> split s1 branchInput sharedTrue sharedFalse sharedEdge)
              (\branchInput -> split s2 branchInput sharedTrue sharedFalse sharedEdge)
              input
-- | A transducer that writes @f x@ to the sink for every item @x@ handed over
-- by 'getSuccess'. Before each read it asks 'canPut' and stops as soon as the
-- answer is 'False'. The transducer always results in the empty list.
oneToOneTransducer :: Monad m => (x -> y) -> Transducer m x y
oneToOneTransducer f =
  Transducer
    ( \source sink ->
        let relay = do
              writable <- canPut sink
              when writable (getSuccess source (\x -> put sink (f x) >> relay))
         in relay >> return []
    )
-- | A transducer that writes the whole list @f x@ to the sink (via 'putList')
-- for every item @x@ handed over by 'getSuccess'. Before each read it asks
-- 'canPut' and stops as soon as the answer is 'False'. The transducer always
-- results in the empty list.
statelessTransducer :: Monad m => (x -> [y]) -> Transducer m x y
statelessTransducer f =
  Transducer
    ( \source sink ->
        let relay = do
              writable <- canPut sink
              when writable (getSuccess source (\x -> putList (f x) sink >> relay))
         in relay >> return []
    )
-- | A transducer that folds its input with @f@, starting from @s0@. When
-- 'get' reports no further item, it writes @w@ applied to the final
-- accumulator to the sink. 'canPut' is asked before every read; if it
-- answers 'False' the loop ends without writing anything. The transducer
-- always results in the empty list.
foldingTransducer :: Monad m => (s -> x -> s) -> s -> (s -> y) -> Transducer m x y
foldingTransducer f s0 w =
  Transducer
    ( \source sink ->
        let loop acc = do
              writable <- canPut sink
              when writable $ do
                next <- get source
                case next of
                  Nothing -> put sink (w acc) >> return ()
                  Just x -> loop (f acc x)
         in loop s0 >> return []
    )
-- | A transducer that threads a state through its input. For every item @x@
-- handed over by 'getSuccess', @f@ maps the current state and @x@ to the next
-- state and a list of outputs, which is written with 'putList'. 'canPut' is
-- asked before every read and the loop stops once it answers 'False'. The
-- transducer always results in the empty list.
statefulTransducer :: Monad m => (state -> x -> (state, [y])) -> state -> Transducer m x y
statefulTransducer f s0 =
  Transducer
    ( \source sink ->
        let loop current = do
              writable <- canPut sink
              when writable $
                getSuccess source $ \x ->
                  let (next, ys) = f current x
                   in putList ys sink >> loop next
         in loop s0 >> return []
    )
-- | A splitter that routes every item by a predicate: items satisfying @f@
-- go to the true sink, the rest to the false sink. The edge sink is never
-- written to. The outcome of each 'put' is passed to 'cond', which chooses
-- between continuing the loop and stopping with @[x]@.
-- NOTE(review): presumably 'cond' continues when the put succeeded -- confirm
-- its argument order. The result is the empty list once 'get' reports no
-- further item.
statelessSplitter :: Monad m => (x -> Bool) -> Splitter m x b
statelessSplitter f =
  Splitter $ \source true false _edge ->
    let loop = do
          next <- get source
          case next of
            Nothing -> return []
            Just x -> do
              outcome <- if f x then put true x else put false x
              cond loop (return [x]) outcome
     in loop
-- | A splitter that routes every item according to a state-threading
-- function: @f@ maps the current state and the item to the next state and a
-- 'Bool'; the item goes to the true sink when that 'Bool' is 'True' and to
-- the false sink otherwise. The outcome of each 'put' is passed to 'cond',
-- which chooses between continuing with the next state and stopping with
-- @[x]@.
-- NOTE(review): presumably 'cond' continues when the put succeeded -- confirm
-- its argument order. The result is the empty list once 'get' reports no
-- further item.
--
-- The edge sink is never written to, so the edge item type is left fully
-- polymorphic (as in 'statelessSplitter') instead of being pinned to @()@;
-- every existing use at @Splitter m x ()@ remains well-typed.
statefulSplitter :: Monad m => (state -> x -> (state, Bool)) -> state -> Splitter m x b
statefulSplitter f s0 =
  Splitter
    ( \source true false _edge ->
        let loop s = do
              next <- get source
              case next of
                Nothing -> return []
                Just x ->
                  -- Lazy pattern, as the state may not be needed when the
                  -- loop stops after this item.
                  let (s', truth) = f s x
                   in (if truth then put true x else put false x)
                        >>= cond (loop s') (return [x])
         in loop s0
    )
-- | Runs a splitter against three consumers, one per splitter output. Three
-- nested 'pipe' calls connect the splitter's true, false and edge sinks to
-- @trueConsumer@, @falseConsumer@ and @edgeConsumer@ respectively. The result
-- combines the splitter's own list result with the three consumers' results.
splitToConsumers ::
  ( Functor d,
    Monad m,
    d1 ~ SinkFunctor d x,
    AncestorFunctor a (SinkFunctor (SinkFunctor d1 x) b)
  ) =>
  Splitter m x b ->
  Source m a x ->
  (Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m r1) ->
  (Source m (SourceFunctor d1 x) x -> Coroutine (SourceFunctor d1 x) m r2) ->
  ( Source m (SourceFunctor (SinkFunctor d1 x) b) b ->
    Coroutine (SourceFunctor (SinkFunctor d1 x) b) m r3
  ) ->
  Coroutine d m ([x], r1, r2, r3)
splitToConsumers s source trueConsumer falseConsumer edgeConsumer = do
  -- The nesting order of the pipes dictates the nesting of the result pairs:
  -- innermost is (splitter, edge), then false, then true.
  (((extra, r3), r2), r1) <-
    pipe
      ( \true ->
          pipe
            (\false -> pipe (split s source true false) edgeConsumer)
            falseConsumer
      )
      trueConsumer
  return (extra, r1, r2, r3)
-- | Runs a splitter against a true consumer and a false consumer, connecting
-- them with nested 'pipePS' calls that all receive the same @parallel@ flag.
-- The splitter's edge output is fed to 'consumeAndSuppress' and its result is
-- ignored. The source is lifted into functor @d@ with 'liftSource' first.
--
-- The result takes items from the true consumer's list for as long as both
-- consumers' lists have items, followed by the splitter's own list result.
splitInputToConsumers ::
  forall m a d d1 x b.
  (ParallelizableMonad m, d1 ~ SinkFunctor d x, AncestorFunctor a d) =>
  Bool ->
  Splitter m x b ->
  Source m a x ->
  (Source m (SourceFunctor d1 x) x -> Coroutine (SourceFunctor d1 x) m [x]) ->
  (Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m [x]) ->
  Coroutine d m [x]
splitInputToConsumers parallel s source trueConsumer falseConsumer = do
  (((extra, _), xs1), xs2) <-
    pipePS
      parallel
      ( \false ->
          pipePS
            parallel
            (\true -> pipePS parallel (split s source' true false) consumeAndSuppress)
            trueConsumer
      )
      falseConsumer
  -- zipWith const keeps xs1's items, truncated to the shorter of the two lists.
  return (zipWith const xs1 xs2 ++ extra)
  where
    source' :: Source m d x
    source' = liftSource source