-- | Combinators over the streaming component types used by this package ('Producer', 'Consumer', 'Transducer',
-- 'Performer' and 'Splitter'). The exports cover piping and joining of components, logical combinations of
-- splitters, conditional and iterative application of components, positional splitter modifiers
-- ('first', 'last', 'prefix', 'suffix', ...), and helpers that turn splitter output into marked-up streams.
module Control.Concurrent.SCC.Combinators
(
splitterToMarker,
consumeBy, prepend, append, substitute,
PipeableComponentPair (connect), JoinableComponentPair (join, sequence),
sNot, sAnd, sOr,
pAnd, pOr,
ifs, wherever, unless, select,
while, nestedIn,
foreach, having, havingOnly, followedBy, even,
first, uptoFirst, prefix,
last, lastAndAfter, suffix,
startOf, endOf,
between,
parseRegions, parseNestedRegions,
groupMarks)
where
import Control.Concurrent.Coroutine
import Control.Concurrent.SCC.Streams
import Control.Concurrent.SCC.Types
import Prelude hiding (even, last, sequence, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Control.Monad.Trans (lift)
import Data.Maybe (isJust, isNothing, fromJust)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))
import Debug.Trace (trace)
-- | Wraps a 'Consumer' as a 'Transducer': the transducer's source is handed to the consumer, the sink is never
-- written to, the consumer's own result is dropped and an empty list is returned in its place.
consumeBy :: forall m x y r. (Monad m) => Consumer m x r -> Transducer m x y
consumeBy consumer =
   Transducer $ \input _output -> do
      consume consumer input
      return []
-- | Class of component pairs @c1@, @c2@ that can be piped together, the output of the first feeding the input of
-- the second, giving a combined component @c3@. The type @w@ is the type of the items flowing between the two.
--
-- The 'Bool' argument of 'connect' is forwarded by the instances in this module to @pipePS@ (presumably selecting
-- parallel versus sequential execution -- confirm against @pipePS@).
--
-- NOTE(review): the functional dependency @c2 c3 -> c2@ is vacuous; it looks like a typo for @c2 c3 -> c1@.
-- Confirm against all instances (including any outside this module) before changing it.
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
c1 -> m w, c2 -> m w, c3 -> m
where connect :: Bool -> c1 -> c2 -> c3
-- | A 'Producer' piped into a 'Consumer' forms a closed 'Performer'. The piped coroutine is run with
-- 'runCoroutine' and the pair of unit results is replaced by a single unit.
instance forall m x. (ParallelizableMonad m) =>
   PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
   where connect parallel producer consumer = Performer (runCoroutine piped >> return ())
            where piped :: Coroutine Naught m ((), ())
                  piped = pipePS parallel (produce producer) (consume consumer)
-- | A 'Transducer' piped into a 'Consumer' is a 'Consumer' of the transducer's input type. The combined
-- consumer returns the inner consumer's result; the transducer's own result is discarded.
instance (ParallelizableMonad m)
   => PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
   where connect parallel transducer consumer =
            isolateConsumer $ \input ->
               liftM snd (pipePS parallel (transduce transducer input) (consume consumer))
-- | A 'Producer' piped into a 'Transducer' is a 'Producer' of the transducer's output type. The combined
-- producer returns the original producer's result; the transducer's own result is discarded.
instance (ParallelizableMonad m) => PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
   where connect parallel producer transducer =
            isolateProducer $ \output ->
               liftM fst (pipePS parallel
                             (produce producer)
                             (\feed -> transduce transducer feed output))
-- | Two 'Transducer's piped one into the other form a single 'Transducer'. The result of the combination is the
-- result of the first (upstream) transducer; the downstream transducer's result is discarded.
instance ParallelizableMonad m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
   where connect parallel upstream downstream =
            isolateTransducer $ \input output ->
               liftM fst (pipePS parallel
                             (transduce upstream input)
                             (\between' -> transduce downstream between' output))
-- | Relates a component type @c@ to a tag type @cons@ and its monad @m@ (both determined by @c@), together with
-- the @input@ and @output@ shapes the component is compatible with. The class has no methods; it is only used
-- as a superclass constraint of 'JoinableComponentPair'.
class CompatibleSignature c cons (m :: * -> *) input output | c -> cons m
-- | Method-less class whose only instances are list types and the unit type. It constrains the shape
-- parameters of 'CompatibleSignature' that a given component leaves open.
class AnyListOrUnit c
instance AnyListOrUnit [x]
instance AnyListOrUnit ()
-- A performer is compatible with any list-or-unit input and output.
instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r) (PerformerType r) m x y
-- A consumer fixes the input to a list of its item type and leaves the output open.
instance AnyListOrUnit y => CompatibleSignature (Consumer m x r) (ConsumerType r) m [x] y
-- A producer fixes the output to a list of its item type and leaves the input open.
instance AnyListOrUnit y => CompatibleSignature (Producer m x r) (ProducerType r) m y [x]
-- A transducer fixes both the input and the output.
instance CompatibleSignature (Transducer m x y) TransducerType m [x] [y]
-- Constructor-less tag types. They are used only at the type level, as the @cons@ parameter of
-- 'CompatibleSignature' and the @t1@, @t2@, @t3@ parameters of 'JoinableComponentPair'.
-- | Tag for 'Performer' components with result type @r@.
data PerformerType r
-- | Tag for 'Consumer' components with result type @r@.
data ConsumerType r
-- | Tag for 'Producer' components with result type @r@.
data ProducerType r
-- | Tag for 'Transducer' components.
data TransducerType
-- | Class of component pairs @c1@, @c2@ that can be combined side by side into a component @c3@. All three
-- components must have signatures compatible with the same input shape @x@ and output shape @y@.
--
-- 'sequence' combines the two components one after the other. 'join' takes an additional 'Bool' flag; its
-- default definition ignores the flag and behaves as 'sequence'. Instances in this module override 'join' to
-- consult the flag (presumably to run both components in parallel when it is 'True' -- confirm against
-- @bindM2@ and @pipePS@).
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
=> JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
where join :: Bool -> c1 -> c2 -> c3
sequence :: c1 -> c2 -> c3
join = const sequence
-- | Two producers of the same item type are sequenced by running the first and then the second against the same
-- sink. The combined result is the second producer's result. 'join' keeps the class default.
instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Producer m x r2) (Producer m x r2)
   where sequence earlier later =
            Producer $ \output -> do
               produce earlier output
               produce later output
-- | Two unit-returning consumers of the same item type.
--
-- 'join' duplicates the source with @tee@: one copy is piped (through @pipePS@) to the first consumer while the
-- other copy is piped to the second consumer.
--
-- 'sequence' lets the first consumer read a @tee@-ed copy of the source while the other copy is collected into
-- a list with @getList@; that list is afterwards replayed to the second consumer with @putList@.
instance forall m x. ParallelizableMonad m =>
   JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] ()
                         (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
   where join parallel c1 c2 =
            isolateConsumer $ \input -> do
               pipePS parallel
                  (\copy -> pipe (tee input copy) (consume c2))
                  (consume c1)
               return ()
         sequence c1 c2 =
            isolateConsumer $ \input -> do
               (_, recorded) <- pipe
                                   (\buffer -> pipe (tee input buffer) (consume c1))
                                   getList
               pipe (putList recorded) (consume c2)
               return ()
-- | Two transducers with identical input and output item types.
--
-- 'join' duplicates the source with @tee@. The first transducer writes straight to the sink, while the second
-- transducer's output is collected into a buffer list and written to the sink only afterwards, so the first
-- transducer's output precedes the second's. The result is whatever @getList@ still finds in the source.
--
-- 'sequence' runs the first transducer on a @tee@-ed copy of the source while recording the items; the recorded
-- list is then replayed into the second transducer, followed (when the whole list was accepted) by the rest
-- of the source.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y]
                         (Transducer m x y) (Transducer m x y) (Transducer m x y)
   where join parallel t1 t2 =
            isolateTransducer $ \input output -> do
               (_, delayed) <- pipe
                                  (\buffer -> pipePS parallel
                                                 (\toFirst -> pipe
                                                                 (\toSecond -> tee input toFirst toSecond)
                                                                 (\feed2 -> transduce t2 feed2 buffer))
                                                 (\feed1 -> transduce t1 feed1 output))
                                  getList
               putList delayed output
               getList input
         sequence t1 t2 =
            isolateTransducer $ \input output -> do
               (_, recorded) <- pipe
                                   (\buffer -> pipe
                                                  (tee input buffer)
                                                  (\feed1 -> transduce t1 feed1 output))
                                   getList
               liftM fst $
                  pipe
                     (\replay -> putList recorded replay
                                 >>= whenNull (pour input replay >> return []))
                     (\feed2 -> transduce t2 feed2 output)
-- | Two performers combine into a performer returning the second one's result. With the flag set, 'join'
-- combines them through @bindM2@; otherwise, and in 'sequence', the first is simply performed before the second.
instance forall m r1 r2. ParallelizableMonad m =>
   JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () ()
                         (Performer m r1) (Performer m r2) (Performer m r2)
   where join parallel p1 p2 = Performer (if parallel then together else inTurn)
            where together = bindM2 (const return) (perform p1) (perform p2)
                  inTurn = perform p1 >> perform p2
         sequence p1 p2 =
            Performer $ do
               perform p1
               perform p2
-- | A performer followed by a producer is a producer returning the producer's result. The performer's action is
-- lifted into the coroutine; with the flag set, 'join' combines the two through @bindM2@.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Performer m r1) (Producer m x r2) (Producer m x r2)
   where join parallel pe pr =
            Producer $ \output ->
               case parallel of
                  True -> bindM2 (const return) (lift (perform pe)) (produce pr output)
                  False -> lift (perform pe) >> produce pr output
         sequence pe pr =
            Producer $ \output -> do
               lift (perform pe)
               produce pr output
-- | A producer followed by a performer is a producer returning the performer's result. With the flag set,
-- 'join' combines the two through @bindM2@; otherwise the producer runs to completion before the performer.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Performer m r2) (Producer m x r2)
   where join parallel pr pe =
            Producer $ \output ->
               let emitting = produce pr output
                   acting = lift (perform pe)
               in if parallel
                  then bindM2 (const return) emitting acting
                  else emitting >> acting
         sequence pr pe =
            Producer $ \output -> do
               produce pr output
               lift (perform pe)
-- | A performer followed by a consumer is a consumer returning the consumer's result. With the flag set,
-- 'join' combines the two through @bindM2@; otherwise the performer runs before the consumer starts.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] ()
                         (Performer m r1) (Consumer m x r2) (Consumer m x r2)
   where join parallel p c =
            Consumer $ \input ->
               case parallel of
                  True -> bindM2 (const return) (lift (perform p)) (consume c input)
                  False -> lift (perform p) >> consume c input
         sequence p c =
            Consumer $ \input -> do
               lift (perform p)
               consume c input
-- | A consumer followed by a performer is a consumer returning the performer's result. With the flag set,
-- 'join' combines the two through @bindM2@; otherwise the consumer finishes before the performer runs.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] ()
                         (Consumer m x r1) (Performer m r2) (Consumer m x r2)
   where join parallel c p =
            Consumer $ \input ->
               let reading = consume c input
                   acting = lift (perform p)
               in if parallel
                  then bindM2 (const return) reading acting
                  else reading >> acting
         sequence c p =
            Consumer $ \input -> do
               consume c input
               lift (perform p)
-- | A performer followed by a transducer is a transducer returning the transducer's own result. With the flag
-- set, 'join' combines the two through @bindM2@; otherwise the performer runs before the transducer starts.
instance forall m x y r. (ParallelizableMonad m) =>
   JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y]
                         (Performer m r) (Transducer m x y) (Transducer m x y)
   where join parallel p t =
            Transducer $ \input output ->
               case parallel of
                  True -> bindM2 (const return) (lift (perform p)) (transduce t input output)
                  False -> lift (perform p) >> transduce t input output
         sequence p t =
            Transducer $ \input output -> do
               lift (perform p)
               transduce t input output
-- | A transducer followed by a performer is a transducer that still returns the transducer's result; the
-- performer's result is dropped. With the flag set, 'join' combines the two through @bindM2@.
instance forall m x y r. (ParallelizableMonad m)
   => JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y]
                            (Transducer m x y) (Performer m r) (Transducer m x y)
   where join parallel t p =
            Transducer $ \input output ->
               case parallel of
                  True -> bindM2 (const . return) (transduce t input output) (lift (perform p))
                  False -> transduce t input output
                           >>= \leftover -> lift (perform p) >> return leftover
         sequence t p =
            Transducer $ \input output ->
               transduce t input output
               >>= \leftover -> lift (perform p) >> return leftover
-- | A producer of output items followed by a transducer. In the sequential case the producer writes to the sink
-- before the transducer starts. With the flag set, the two are combined through @bindM2@: the producer writes
-- directly to the sink while the transducer's output is collected in a buffer list, which is copied to the sink
-- afterwards. Either way the transducer's result is returned.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y]
                         (Producer m y ()) (Transducer m x y) (Transducer m x y)
   where join parallel p t
            | parallel = isolateTransducer $ \input output ->
                            pipe
                               (\buffer -> bindM2 (const return) (produce p output) (transduce t input buffer))
                               getList
                            >>= \(leftover, delayed) -> putList delayed output >> return leftover
            | otherwise = sequence p t
         sequence p t =
            Transducer $ \input output -> do
               produce p output
               transduce t input output
-- | A transducer followed by a producer of output items. In the sequential case the transducer finishes before
-- the producer writes to the sink. With the flag set, the two are combined through @bindM2@: the transducer
-- writes directly to the sink while the producer's output is collected in a buffer list, which is copied to the
-- sink afterwards. Either way the transducer's result is returned.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Producer m y ()) (Transducer m x y)
   where join parallel t p
            | parallel = isolateTransducer $ \input output ->
                            pipe
                               (\buffer -> bindM2 (const . return) (transduce t input output) (produce p buffer))
                               getList
                            >>= \(leftover, delayed) -> putList delayed output >> return leftover
            | otherwise = sequence t p
         sequence t p =
            Transducer $ \input output ->
               transduce t input output
               >>= \leftover -> produce p output >> return leftover
-- | A unit-returning consumer joined with a transducer over the same input.
--
-- 'join' duplicates the source with @tee@: one copy goes to the transducer (which writes to the sink), the other
-- to the consumer; the transducer's result is returned.
--
-- 'sequence' first lets the consumer read a @tee@-ed copy of the source while the items are recorded; the
-- recorded list, followed (when it was accepted whole) by the rest of the source, is then fed to the transducer.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y]
                         (Consumer m x ()) (Transducer m x y) (Transducer m x y)
   where join parallel c t =
            isolateTransducer $ \input output ->
               liftM (\outcome -> snd (fst outcome)) $
                  pipePS parallel
                     (\copy -> pipe
                                  (tee input copy)
                                  (\feed -> transduce t feed output))
                     (consume c)
         sequence c t =
            isolateTransducer $ \input output -> do
               (_, recorded) <- pipe
                                   (\buffer -> pipe
                                                  (tee input buffer)
                                                  (consume c))
                                   getList
               liftM fst $
                  pipe
                     (\replay -> putList recorded replay
                                 >>= whenNull (pour input replay >> return []))
                     (\feed -> transduce t feed output)
-- | A transducer joined with a unit-returning consumer over the same input.
--
-- 'join' delegates to the (consumer, transducer) instance with the arguments swapped.
--
-- 'sequence' first runs the transducer on a @tee@-ed copy of the source while the items are recorded; the
-- recorded list, followed (when it was accepted whole) by the rest of the source, is then fed to the consumer.
instance forall m x y. ParallelizableMonad m =>
   JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Consumer m x ()) (Transducer m x y)
   where join parallel t c = join parallel c t
         sequence t c =
            isolateTransducer $ \input output -> do
               (_, recorded) <- pipe
                                   (\buffer -> pipe
                                                  (tee input buffer)
                                                  (\feed -> transduce t feed output))
                                   getList
               liftM fst $
                  pipe
                     (\replay -> putList recorded replay
                                 >>= whenNull (pour input replay >> return []))
                     (consume c)
-- | A producer of output items and a consumer of input items together form a transducer: the producer writes to
-- the transducer's sink and the consumer reads the transducer's source. The result is always the empty list.
-- With the flag set, 'join' combines the two through @bindM2@; otherwise the producer runs first.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y]
                         (Producer m y ()) (Consumer m x ()) (Transducer m x y)
   where join parallel p c =
            Transducer $ \input output ->
               case parallel of
                  True -> bindM2 (\_ _ -> return []) (produce p output) (consume c input)
                  False -> do produce p output
                              consume c input
                              return []
         sequence p c =
            Transducer $ \input output -> do
               produce p output
               consume c input
               return []
-- | A consumer of input items and a producer of output items together form a transducer returning the empty
-- list. 'join' delegates to the (producer, consumer) instance with the arguments swapped; 'sequence' lets the
-- consumer finish before the producer writes to the sink.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y]
                         (Consumer m x ()) (Producer m y ()) (Transducer m x y)
   where join parallel c p = join parallel p c
         sequence c p =
            Transducer $ \input output -> do
               consume c input
               produce p output
               return []
-- | Builds a transducer that first writes everything the given producer emits to the sink and then pours the
-- source into the sink unchanged. The producer's result is dropped; the transducer returns the empty list.
prepend :: forall m x r. (Monad m) => Producer m x r -> Transducer m x x
prepend leading =
   Transducer $ \input output -> do
      produce leading output
      pour input output
      return []
-- | Builds a transducer that pours the source into the sink unchanged and then writes everything the given
-- producer emits to the sink. The producer's result is dropped; the transducer returns the empty list.
append :: forall m x r. (Monad m) => Producer m x r -> Transducer m x x
append trailing =
   Transducer $ \input output -> do
      pour input output
      produce trailing output
      return []
-- | Builds a transducer that runs @consumeAndSuppress@ on its source and then writes the given producer's output
-- to the sink instead. The producer's result is dropped; the transducer returns the empty list.
substitute :: forall m x y r. (Monad m) => Producer m y r -> Transducer m x y
substitute replacement =
   Transducer $ \input output -> do
      consumeAndSuppress input
      produce replacement output
      return []
-- | Negates a splitter by swapping its sinks: what the argument sends to its /true/ sink goes to the /false/
-- sink of the result and vice versa. The argument's edge output is routed into 'suppressProducer' and the
-- result's own edge sink is never written to.
sNot :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
sNot splitter =
   isolateSplitter $ \source true false _edge ->
      suppressProducer (\ignoredEdges -> split splitter source false true ignoredEdges)
-- | Sequential conjunction of two splitters. The first splitter's /true/ output becomes the second splitter's
-- input, so an item reaches the /true/ sink only when both accept it; the /false/ output of either splitter goes
-- to the /false/ sink. The edges of the first splitter are tagged 'Left', those of the second 'Right', and the
-- merged stream is handed to 'intersectRegions', which writes the paired edges to the edge sink.
-- The 'Bool' flag is forwarded to @pipePS@.
sAnd :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
sAnd parallel s1 s2 =
   isolateSplitter $ \source true false edge ->
      liftM (fst . fst . fst . fst) $
         pipe
            (\merged ->
                pipe
                   (\leftEdges ->
                       pipe
                          (\rightEdges ->
                              pipePS parallel
                                 (\accepted -> split s1 source accepted false leftEdges)
                                 (\candidates -> split s2 candidates true false rightEdges))
                          (\found -> pourMap Right found merged))
                   (\found -> pourMap Left found merged))
            (\tagged -> intersectRegions tagged edge)
-- Reads a stream of 'Either' values and remembers the most recent 'Left' and the most recent 'Right' payload.
-- As soon as both are known the pair is written to the sink and both are forgotten. The loop stops when the
-- source is exhausted or when a 'put' reports 'False'.
intersectRegions source sink = await Nothing Nothing
   where await pendingLeft pendingRight =
            do item <- get source
               case item of
                  Nothing -> return ()
                  Just (Left l) -> emit (Just l) pendingRight
                  Just (Right r) -> emit pendingLeft (Just r)
         emit (Just l) (Just r) =
            do accepted <- put sink (l, r)
               when accepted (await Nothing Nothing)
         emit l r = await l r
-- | Sequential disjunction of two splitters. The first splitter's /false/ output becomes the second splitter's
-- input, so an item reaches the /false/ sink only when both reject it; the /true/ output of either splitter goes
-- to the /true/ sink. Edges of the first splitter are written to the edge sink wrapped in 'Left', those of the
-- second wrapped in 'Right'. The 'Bool' flag is forwarded to @pipePS@.
sOr :: forall m x b1 b2. ParallelizableMonad m =>
       Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
sOr parallel s1 s2 =
   isolateSplitter $ \source true false edge ->
      liftM (fst . fst . fst) $
         pipe
            (\leftEdges ->
                pipe
                   (\rightEdges ->
                       pipePS parallel
                          (\rejected -> split s1 source true rejected leftEdges)
                          (\candidates -> split s2 candidates true false rightEdges))
                   (\found -> pourMap Right found edge))
            (\found -> pourMap Left found edge)
-- | Conjunction of two splitters that both see the whole input. The input is marked by 'splittersToPairMarker'
-- and the marks are interpreted by the local @split@/@test@ loop:
--
-- * an item goes to the /true/ sink only if both of its flags are set, otherwise to the /false/ sink;
--
-- * the loop state is the most recent edge of each splitter (@l@ and @r@); an edge is kept across an item only
--   while that splitter's flag for the item is set;
--
-- * when an edge of one splitter arrives while an edge of the other is remembered, the pair is written to the
--   edge sink.
--
-- If writing an item fails, that item is returned as the unconsumed remainder. The 'Bool' flag is forwarded to
-- @pipePS@ and 'splittersToPairMarker'.
pAnd :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
pAnd parallel s1 s2 = isolateSplitter $ \ source true false edge ->
-- The results of the marker and of the loop are both lists; the loop's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipePS parallel
(transduce (splittersToPairMarker parallel s1 s2) source)
(\source-> let split l r = get source
>>= maybe
(return [])
(test l r)
-- A content item with the flags of both splitters.
test l r (Left (x, t1, t2))
= (if t1 Prelude.&& t2 then put true x else put false x)
>>= cond
(split
(if t1 then l else Nothing)
(if t2 then r else Nothing))
(return [x])
-- An edge reported by the first splitter.
test _ Nothing (Right (Left l)) = split (Just l) Nothing
test _ (Just r) (Right (Left l))
= put edge (l, r) >> split (Just l) (Just r)
-- An edge reported by the second splitter.
test Nothing _ (Right (Right r)) = split Nothing (Just r)
test (Just l) _ (Right (Right r))
= put edge (l, r) >> split (Just l) (Just r)
in split Nothing Nothing)
-- | Disjunction of two splitters that both see the whole input, defined through 'zipSplittersWith': an item
-- goes to the /true/ sink when at least one of the two splitters marks it true, and the tagged edges of both
-- splitters are copied to the edge sink with @pour@.
pOr :: forall c m x b1 b2. ParallelizableMonad m =>
       Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
pOr parallel s1 s2 = zipSplittersWith (Prelude.||) pour parallel s1 s2
-- | Conditional combinator over any 'Branching' component type. The two branch components are combined by
-- @combineBranches@ using a function that splits the input with the given splitter through
-- @splitInputToConsumers@, passing the first branch and then the second as its consumers (presumably for the
-- /true/ and /false/ portions respectively -- confirm against @splitInputToConsumers@).
ifs :: forall c m x b. (ParallelizableMonad m, Branching c m x [x]) => Bool -> Splitter m x b -> c -> c -> c
ifs parallel s c1 c2 = combineBranches branch parallel c1 c2
   where branch :: forall d. Bool -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x]) ->
                   (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x]) ->
                   forall a. OpenConsumer m a d x [x]
         branch threaded onFirst onSecond input = splitInputToConsumers threaded s input onFirst onSecond
-- | Applies the transducer to one part of the input as divided by the splitter and copies the other part to the
-- sink unchanged. The transducer is given as the first consumer of @splitInputToConsumers@ (presumably the one
-- receiving the /true/ portion -- confirm); the pass-through consumer returns the empty list.
wherever :: forall m x b. ParallelizableMonad m => Bool -> Transducer m x x -> Splitter m x b -> Transducer m x x
wherever parallel t s =
   isolateTransducer $ \input output ->
      splitInputToConsumers parallel s input
         (\selected -> transduce t selected output)
         (\others -> do pour others output
                        return [])
-- | Mirror image of 'wherever': the pass-through consumer is given as the first consumer of
-- @splitInputToConsumers@ and the transducer as the second (presumably the one receiving the /false/ portion --
-- confirm). The pass-through consumer returns the empty list.
unless :: forall m x b. ParallelizableMonad m => Bool -> Transducer m x x -> Splitter m x b -> Transducer m x x
unless parallel t s =
   isolateTransducer $ \input output ->
      splitInputToConsumers parallel s input
         (\others -> do pour others output
                        return [])
         (\selected -> transduce t selected output)
-- | Turns a splitter into a transducer that keeps only what the splitter sends to its /true/ sink: the
-- transducer's sink is used as the /true/ sink, while the /false/ and edge outputs are each routed into
-- 'suppressProducer'.
select :: forall m x b. Monad m => Splitter m x b -> Transducer m x x
select s =
   isolateTransducer $ \source sink ->
      suppressProducer (\rejected ->
         suppressProducer (\edges -> split s source sink rejected edges))
-- | Converts a splitter into a parser that emits every input item as @Content@ and brackets the splitter's true
-- regions with @Markup@ boundaries.
--
-- The local loop keeps a pending boundary @Maybe (b, Bool)@: the edge value seen last and whether its @Start@
-- markup has already been written. A pending boundary is opened (@Start@) by the first /true/ item that
-- follows it; @flush@ closes it with @End@ if it was opened and with @Point@ if no true item followed the edge.
-- True items that arrive with no pending boundary are emitted as plain @Content@.
--
-- If writing a content item fails, that item is returned as the unconsumed remainder.
parseRegions :: forall m x b. Monad m => Splitter m x b -> Parser m x b
parseRegions s = isolateTransducer $ \source sink->
-- Both piped coroutines return lists; the wrapper's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker s) source)
(\source-> wrapRegions source sink)
where wrapRegions source sink = let wrap0 mb = get source
>>= maybe
-- End of input: close whatever boundary is still pending.
(maybe (return True) flush mb >> return [])
(wrap1 mb)
wrap1 Nothing (Left (x, _)) = put sink (Content x)
>>= cond (wrap0 Nothing) (return [x])
-- A false item ends the pending region.
wrap1 (Just p) (Left (x, False)) = flush p
>> put sink (Content x)
>>= cond
(wrap0 Nothing)
(return [x])
-- A true item inside a pending region: write Start once, then the content.
wrap1 (Just (b, t)) (Left (x, True))
= (if t then return True else put sink (Markup (Start b)))
>> put sink (Content x)
>>= cond (wrap0 (Just (b, True))) (return [x])
-- A new edge closes the pending region and becomes the pending one.
wrap1 (Just p) (Right b') = flush p >> wrap0 (Just (b', False))
wrap1 Nothing (Right b) = wrap0 (Just (b, False))
flush (b, t) = put sink $ Markup $ (if t then End else Point) b
in wrap0 Nothing
-- | Converts a boundary-marking splitter into a parser: everything the splitter sends to its /true/ or /false/
-- sink is written to the output wrapped in @Content@, and every edge value is written wrapped in @Markup@.
-- The result is the first component returned by @splitToConsumers@.
parseNestedRegions :: forall m x b. Monad m => Splitter m x (Boundary b) -> Parser m x b
parseNestedRegions s =
   isolateTransducer $ \source sink ->
      liftM (\(leftover, (), (), _) -> leftover) $
         splitToConsumers s source
            (\trues -> pourMap Content trues sink)
            (\falses -> pourMap Content falses sink)
            (\edges -> pourMap Markup edges sink)
-- | Repeatedly applies a transducer to the part of the input selected by a splitter. Each list element supplies
-- the parallelism flag, transducer and splitter for one level of iteration.
--
-- At each level the input is divided with @splitInputToConsumers@. The second consumer copies its portion to
-- the sink unchanged. The first consumer, if its portion is not empty, pipes that portion through the level's
-- transducer connected to 'while' applied to the remaining levels.
--
-- The list must be long enough for the deepest iteration actually reached (typically infinite). Demanding a
-- level from an exhausted list raises an explicit error instead of an anonymous pattern-match failure.
while :: forall m x b. ParallelizableMonad m => [(Bool, (Transducer m x x, Splitter m x b))] -> Transducer m x x
while [] = error "Control.Concurrent.SCC.Combinators.while: the list of iteration levels is exhausted"
while ((parallel, (t, s)) : rest) =
   isolateTransducer $ \source sink->
      splitInputToConsumers parallel s source
         (\source-> get source
                    >>= maybe
                           (return [])
                           -- Re-insert the item used to probe for non-emptiness ahead of the rest of the portion.
                           (\x-> liftM (uncurry (++)) $
                                 pipe
                                    (\sink-> put sink x >>= cond (pour source sink >> return []) (return [x]))
                                    (\source-> transduce while' source sink)))
         (\source-> pour source sink >> return [])
   where while' = connect parallel t (while rest)
-- | Builds a splitter from a list of levels, each giving a parallelism flag and a pair of splitters @(s1, s2)@.
--
-- At each level @s1@ splits the input; its /true/ and edge outputs go straight to the result's sinks, while its
-- /false/ output is split again by @s2@. The /false/ and edge outputs of @s2@ go to the result's /false/ sink
-- and to @consumeAndSuppress@ respectively; the /true/ output of @s2@, if not empty, is split by 'nestedIn'
-- applied to the remaining levels.
--
-- The list must be long enough for the deepest nesting actually reached (typically infinite). Demanding a level
-- from an exhausted list raises an explicit error instead of an anonymous pattern-match failure.
nestedIn :: forall m x b. ParallelizableMonad m => [(Bool, (Splitter m x b, Splitter m x b))] -> Splitter m x b
nestedIn [] = error "Control.Concurrent.SCC.Combinators.nestedIn: the list of nesting levels is exhausted"
nestedIn ((parallel, (s1, s2)) : rest) =
   isolateSplitter $ \ source true false edge ->
      liftM fst $
         pipePS parallel
            (\false-> split s1 source true false edge)
            (\source-> pipe
                          (\true-> pipe (split s2 source true false) consumeAndSuppress)
                          (\source-> get source
                                     >>= maybe
                                            (return ([], []))
                                            -- Re-insert the item used to probe for non-emptiness.
                                            (\x-> pipe
                                                     (\sink-> put sink x
                                                              >>= cond
                                                                     (pour source sink >> return [])
                                                                     (return [x]))
                                                     (\source-> split (nestedIn rest) source true false edge))))
-- | Applies one of two branch components to each chunk of the input, over any 'Branching' component type. The
-- input is marked with 'splitterToMarker' and grouped by 'groupMarks'; every chunk that 'groupMarks' reports with
-- a 'Just' tag is consumed by the first branch and every chunk reported with 'Nothing' by the second. The
-- branches are combined with @combineBranches@ and the 'Bool' flag is forwarded to @pipePS@.
foreach :: forall m x b c. (ParallelizableMonad m, Branching c m x [x]) => Bool -> Splitter m x b -> c -> c -> c
foreach parallel s c1 c2 = combineBranches perChunk parallel c1 c2
   where perChunk :: forall d. Bool ->
                     (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x]) ->
                     (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x]) ->
                     forall a. OpenConsumer m a d x [x]
         perChunk threaded inside outside input =
            liftM fst $
               pipePS threaded
                  (transduce (splitterToMarker s) (liftSource input :: Source m d x))
                  (\marks -> groupMarks marks (maybe outside (const inside)))
-- | Keeps a true region of the first splitter as true only if the second splitter finds something in it.
--
-- The input is marked by @s1@ and grouped into chunks by 'groupMarks'. Chunks outside any region go to the
-- /false/ sink. For a chunk inside a region, a @tee@-ed copy is split by @s2@ and probed: the chunk counts as
-- a match if @s2@ writes at least one item to its /true/ sink or at least one edge. A matching chunk is written
-- to the /true/ sink, preceded by the region's edge value (if it has one) on the edge sink; a non-matching
-- chunk is written to the /false/ sink. In both cases the prefix collected while probing is written first and,
-- if it was accepted whole, the rest of the chunk is poured after it.
having :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
having parallel s1 s2 = isolateSplitter s
where s source true false edge = liftM fst $
pipePS parallel
(transduce (splitterToMarker s1) source)
(flip groupMarks test)
where test Nothing chunk = pour chunk false >> return []
test (Just mb) chunk = pipe
-- Record what is read from the chunk while s2 probes a copy of it.
(\sink1-> pipe (tee chunk sink1) getList)
(\chunk-> splitToConsumers s2 chunk
(liftM isJust . get)
consumeAndSuppress
(liftM isJust . get))
>>= \(((), prefix), (_, anyTrue, (), anyEdge))->
if anyTrue Prelude.|| anyEdge
then maybe (return True) (put edge) mb
>> putList prefix true
>>= whenNull (pour chunk true >> return [])
else putList prefix false
>>= whenNull (pour chunk false >> return [])
-- | Keeps a true region of the first splitter as true only if the second splitter rejects nothing in it.
--
-- The input is marked by @s1@ and grouped into chunks by 'groupMarks'. Chunks outside any region go to the
-- /false/ sink. For a chunk inside a region, a @tee@-ed copy is split by @s2@ and probed for at least one item
-- on the /false/ sink of @s2@. If there is one, the chunk is written to the /false/ sink; otherwise it is
-- written to the /true/ sink, preceded by the region's edge value (if it has one) on the edge sink. In both
-- cases the prefix collected while probing is written first and, if it was accepted whole, the rest of the chunk
-- is poured after it.
havingOnly :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
havingOnly parallel s1 s2 = isolateSplitter s
where s source true false edge = liftM fst $
pipePS parallel
(transduce (splitterToMarker s1) source)
(flip groupMarks test)
where test Nothing chunk = pour chunk false >> return []
test (Just mb) chunk = pipe
-- Record what is read from the chunk while s2 probes a copy of it.
(\sink1-> pipe (tee chunk sink1) getList)
(\chunk-> splitToConsumers s2 chunk
consumeAndSuppress
(liftM isJust . get)
consumeAndSuppress)
>>= \(((), prefix), (_, (), anyFalse, ()))->
if anyFalse
then putList prefix false
>>= whenNull (pour chunk false >> return [])
else maybe (return True) (put edge) mb
>> putList prefix true
>>= whenNull (pour chunk true >> return [])
-- | Restricts a splitter to its first true region: that region alone is written to the /true/ sink and
-- everything before and after it goes to the /false/ sink.
--
-- The marked input is processed by a three-state loop:
--
-- * @get1@ -- before the first region: false items are passed on; an edge is forwarded to the edge sink;
--
-- * @get2@ -- inside the first region: true items go to /true/; a false item or another edge ends the region;
--
-- * @get3@ -- after the first region: every item goes to /false/ and later edges are dropped.
--
-- If writing an item fails, that item is returned as the unconsumed remainder.
first :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
first splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (Left (x, False)) = pass false x get1
get1 (Left (x, True)) = pass true x get2
get1 (Right b) = put edge b
>> get source
>>= maybe (return []) get2
get2 b@Right{} = get3 b
get2 (Left (x, True)) = pass true x get2
get2 (Left (x, False)) = pass false x get3
get3 (Left (x, _)) = pass false x get3
get3 (Right _) = get source >>= maybe (return []) get3
-- Write x to the sink and continue with the next mark in state `next`.
pass sink x next = put sink x
>>= cond
(get source
>>= maybe (return []) next)
(return [x])
in get source >>= maybe (return []) get1)
-- | Like 'first', but the input that precedes the first true region is treated as true as well.
--
-- While no region has been seen (@get1@), false items are accumulated in a queue. When the first true item or
-- edge arrives the queue is written to the /true/ sink and processing continues as in 'first' (@get2@ inside
-- the region, @get3@ after it, where everything goes to /false/ and edges are dropped). If the input ends
-- before any region is found, the queued items are written to the /false/ sink instead.
--
-- If writing fails, the items that could not be written are returned as the unconsumed remainder.
uptoFirst :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
uptoFirst splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 q (Left (x, False)) = let q' = q |> x
in get source
>>= maybe
(putQueue q' false)
(get1 q')
get1 q p@(Left (_, True)) = putQueue q true
>>= whenNull (get2 p)
get1 q (Right b) = putQueue q true
>>= whenNull (put edge b
>> get source
>>= maybe (return []) get2)
get2 b@Right{} = get3 b
get2 (Left (x, True)) = pass true x get2
get2 (Left (x, False)) = pass false x get3
get3 (Left (x, _)) = pass false x get3
get3 (Right _) = get source >>= maybe (return []) get3
-- Write x to the sink and continue with the next mark in state `next`.
pass sink x next = put sink x
>>= cond
(get source
>>= maybe (return []) next)
(return [x])
in get source >>= maybe (return []) (get1 Seq.empty))
-- | Restricts a splitter to its last true region. Because it cannot be known in advance whether a region is
-- the last one, each candidate region is buffered.
--
-- * @get1@ -- outside a region: false items are written to /false/ immediately.
--
-- * @get2@ -- inside a candidate region: true items are queued. If the input ends here, the region's edge (if
--   any) and the queue are written to the edge and /true/ sinks by @flush@.
--
-- * @get3@ -- after a candidate region: false items are queued separately. If the input ends here, the candidate
--   is flushed as true and the trailing items are written to /false/. If another true item or edge arrives,
--   both queues are written to /false/ and processing restarts in @get1@ with that mark.
--
-- If writing fails, the items that could not be written are returned as the unconsumed remainder.
last :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
last splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond (get source
>>= maybe (return []) get1)
(return [x])
get1 p@(Left (x, True)) = get2 Nothing Seq.empty p
get1 (Right b) = pass (get2 (Just b) Seq.empty)
get2 mb q (Left (x, True)) = let q' = q |> x
in get source
>>= maybe
(flush mb q')
(get2 mb q')
get2 mb q p = get3 mb q Seq.empty p
get3 mb qt qf (Left (x, False)) =
let qf' = qf |> x
in get source
>>= maybe
(flush mb qt >> putQueue qf' false)
(get3 mb qt qf')
-- The candidate was not the last region after all: demote both queues to false.
get3 mb qt qf p = do rest1 <- putQueue qt false
rest2 <- putQueue qf false
if null rest1 Prelude.&& null rest2
then get1 p
else return (rest1 ++ rest2)
flush mb q = maybe (return True) (put edge) mb
>> putQueue q true
pass succeed = get source >>= maybe (return []) succeed
in pass get1)
-- | Like 'last', but the input that follows the last true region is treated as true as well.
--
-- * @get1@ -- outside a region: false items are written to /false/ immediately.
--
-- * @get2@ -- inside a candidate region: true items are queued.
--
-- * @get3@ -- after a candidate region: false items are appended to the same queue. If another true item or
--   edge arrives, the whole queue is written to /false/ and processing restarts in @get1@ with that mark.
--
-- When the input ends in @get2@ or @get3@, @flush@ writes the region's edge (if any) to the edge sink and the
-- whole queue to the /true/ sink. If writing fails, the items that could not be written are returned as the
-- unconsumed remainder.
lastAndAfter :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
lastAndAfter splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond
(pass get1)
(return [x])
get1 p@(Left (x, True)) = get2 Nothing Seq.empty p
get1 (Right b) = pass (get2 (Just b) Seq.empty)
get2 mb q (Left (x, True)) = let q' = q |> x
in get source
>>= maybe
(flush mb q')
(get2 mb q')
get2 mb q p = get3 mb q p
get3 mb q (Left (x, False)) = let q' = q |> x
in get source
>>= maybe
(flush mb q')
(get3 mb q')
get3 _ q p@(Left (x, True)) = putQueue q false
>>= whenNull (get1 p)
get3 _ q b'@Right{} = putQueue q false
>>= whenNull (get1 b')
flush mb q = maybe (return True) (put edge) mb
>> putQueue q true
pass succeed = get source >>= maybe (return []) succeed
in pass get1)
-- | Restricts a splitter to a true region that begins at the very start of the input.
--
-- * @get0@ -- looks at the first mark only: if it is an edge, the edge is forwarded to the edge sink.
--
-- * @get1@ -- while the leading items are true they are written to /true/; the first false item or edge ends
--   the prefix.
--
-- * @get2@ -- everything after the prefix goes to /false/ and later edges are dropped.
--
-- If writing an item fails, that item is returned as the unconsumed remainder.
prefix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
prefix splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get0 p@Left{} = get1 p
get0 (Right b) = put edge b
>> get source
>>= maybe (return []) get1
get1 (Left (x, False)) = pass false x get2
get1 (Left (x, True)) = pass true x get1
get1 (Right b) = get source >>= maybe (return []) get2
get2 (Left (x, _)) = pass false x get2
get2 Right{} = get source >>= maybe (return []) get2
-- Write x to the sink and continue with the next mark in state `next`.
pass sink x next = put sink x
>>= cond
(get source
>>= maybe (return []) next)
(return [x])
in get source >>= maybe (return []) get0)
-- | Restricts a splitter to a true region that extends to the very end of the input. Each true region is
-- buffered until it is known whether the input ends with it.
--
-- * @get1@ -- outside a region: false items are written to /false/ immediately; a true item or an edge starts a
--   new queue.
--
-- * @get2@ / @get3@ -- inside a region: true items are queued. If the input ends, the region's edge (if any) is
--   written to the edge sink and the queue to /true/. A false item demotes the queue to /false/ and returns to
--   @get1@; a new edge demotes the queue to /false/ and starts a fresh queue for the new region.
--
-- If writing fails, the items that could not be written are returned as the unconsumed remainder.
suffix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
suffix splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond (p get1) (return [x])
get1 (Left (x, True)) = get2 Nothing (Seq.singleton x)
get1 (Right b) = get2 (Just b) Seq.empty
get2 mb q = get source
>>= maybe
(maybe (return True) (put edge) mb
>> putQueue q true)
(get3 mb q)
get3 mb q (Left (x, True)) = get2 mb (q |> x)
get3 mb q p@(Left (x, False)) =
putQueue q false
>>= \rest-> if null rest
then get1 p
else return (rest ++ [x])
get3 mb q (Right b) = putQueue q false
>>= whenNull (get2 (Just b) Seq.empty)
-- Fetch the next mark and hand it to `succeed`, or finish with an empty remainder.
p succeed = get source >>= maybe (return []) succeed
in p get1)
-- | Restricts a splitter to every second of its true regions: the items of the second, fourth, ... region are
-- written to the /true/ sink, and all other items go to the /false/ sink.
--
-- * @get1@ -- before an odd-numbered region; its edge, if any, is dropped.
--
-- * @get2@ -- inside an odd-numbered region: true items go to /false/.
--
-- * @get3@ -- between an odd-numbered region and the following even-numbered one.
--
-- * @get4@ -- inside an even-numbered region: true items go to /true/.
--
-- An edge that starts an even-numbered region (seen in @get2@ or @get3@) is forwarded to the edge sink. If
-- writing an item fails, that item is returned as the unconsumed remainder.
even :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
even splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond (next get1) (return [x])
get1 p@(Left (x, True)) = get2 p
get1 (Right b) = next get2
get2 (Left (x, True)) = put false x
>>= cond (next get2) (return [x])
get2 p@(Left (x, False)) = get3 p
get2 (Right b) = put edge b >> next get4
get3 (Left (x, False)) = put false x
>>= cond (next get3) (return [x])
get3 p@(Left (x, True)) = get4 p
get3 (Right b) = put edge b >> next get4
get4 (Left (x, True)) = put true x
>>= cond (next get4) (return [x])
get4 p@(Left (x, False)) = get1 p
get4 (Right b) = next get2
next g = get source >>= maybe (return []) g
in next get1)
-- | Marks the start of every true region of the argument splitter with an edge and nothing else: every input
-- item is written to the /false/ sink and nothing is ever written to the /true/ sink.
--
-- The edge value is @Just b@ when the region was introduced by the argument's edge @b@, and @Nothing@ when the
-- region started with a true item that had no edge in front of it.
--
-- * @get1@ -- outside a region; emits the edge when a region begins.
--
-- * @get2@ -- inside a region; any mark other than a true item is handed back to @get1@.
--
-- If writing an item fails, that item is returned as the unconsumed remainder.
startOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b)
startOf splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond
(next get1)
(return [x])
get1 p@(Left (x, True)) = put edge Nothing >> get2 p
get1 (Right b) = put edge (Just b)
>> next get2
get2 (Left (x, True)) = put false x
>>= cond
(next get2)
(return [x])
get2 p = get1 p
next g = get source >>= maybe (return []) g
in next get1)
-- | Marks the end of every true region of the argument splitter with an edge and nothing else: every input item
-- is written to the /false/ sink and nothing is ever written to the /true/ sink.
--
-- The edge value emitted at the end of a region is @Just b@ when the region was introduced by the argument's
-- edge @b@, and @Nothing@ otherwise.
--
-- * @get1@ -- outside a region.
--
-- * @get2 mb@ -- inside a region that was introduced by @mb@; a false item or a new edge ends the region and
--   emits @mb@ on the edge sink.
--
-- NOTE(review): when the input ends while still in @get2@, @next@ returns without writing an edge, so a region
-- that extends to the end of the input gets no end mark. Confirm whether that is intended.
--
-- If writing an item fails, that item is returned as the unconsumed remainder.
endOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b)
endOf splitter = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond
(next get1)
(return [x])
get1 p@(Left (x, True)) = get2 Nothing p
get1 (Right b) = next (get2 $ Just b)
get2 mb (Left (x, True))
= put false x
>>= cond (next $ get2 mb) (return [x])
get2 mb p@(Left (x, False)) = put edge mb >> get1 p
get2 mb (Right b) = put edge mb >> next (get2 $ Just b)
next g = get source >>= maybe (return []) g
in next get1)
-- | Combines two splitters so that a true region of the first counts only when the second splitter reports a
-- region (a true item or an edge) as the very first thing in the input that follows it.
--
-- The input is marked by @s1@; marks that have been read ahead but not yet processed are kept in queues and
-- re-examined by @get0@.
--
-- * @get0@ / @get1@ -- outside a region: false items are written to /false/ immediately (@get0@ works from the
--   look-ahead queue, @get1@ from a freshly read mark).
--
-- * @get2 mb q q'@ -- inside a region of @s1@ introduced by @mb@: true items are accumulated in @q@.
--
-- * @get3@ -- the region has ended: the following marks are fed by @get7@ into @test@, which runs @s2@ on them.
--   @test@ returns 'Nothing' if @s2@ starts with a false item (or sees no input); the queued region is then
--   written to /false/. Otherwise @test@ writes the pair of edges (when the @s1@ region had one and @s2@ starts
--   with an edge), the queued region and the leading true items of @s2@ to the output, and returns how many
--   items it wrote; @get8@ then skips that many items in the look-ahead queue before processing resumes.
--
-- * @testEnd@ -- the input ended inside a region: @s2@ is run on empty input, so the region goes to /false/
--   unless @test@ reports a match.
--
-- The 'Bool' flag is forwarded to @pipePS@. This block depends on the exact order of reads and writes; treat
-- the summary above as a guide and verify details against the code.
followedBy :: forall m x b1 b2. ParallelizableMonad m =>
Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
followedBy parallel s1 s2 =
isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipePS parallel
(transduce (splitterToMarker s1) source)
(\source-> let get0 q = case Seq.viewl q
of Seq.EmptyL -> get source >>= maybe (return []) get1
(Left (x, False)) :< rest -> put false x
>>= cond
(get0 rest)
-- On a failed put, return the content items still queued.
(return
$ concatMap (either ((:[]) . fst) (const []))
$ Foldable.toList $ Seq.viewl q)
(Left (x, True)) :< rest -> get2 Nothing Seq.empty q
(Right b) :< rest -> get2 (Just b) Seq.empty rest
get1 (Left (x, False)) = put false x
>>= cond (get source >>= maybe (return []) get1)
(return [x])
get1 p@(Left (x, True)) = get2 Nothing Seq.empty (Seq.singleton p)
get1 (Right b) = get2 (Just b) Seq.empty Seq.empty
get2 mb q q' = case Seq.viewl q'
of Seq.EmptyL -> get source
>>= maybe (testEnd mb q) (get2 mb q . Seq.singleton)
(Left (x, True)) :< rest -> get2 mb (q |> x) rest
(Left (x, False)) :< rest -> get3 mb q q'
Right{} :< rest -> get3 mb q q'
get3 mb q q' = do ((q1, q2), n) <- pipe (get7 Seq.empty q') (test mb q)
case n of Nothing -> putQueue q false
>>= whenNull (get0 (q1 >< q2))
Just 0 -> get0 (q1 >< q2)
Just n -> get8 (Just mb) n (q1 >< q2)
-- Feed content items to the test sink for as long as it accepts them, remembering every mark passed on in q1.
get7 q1 q2 sink = canPut sink
>>= cond (case Seq.viewl q2
of Seq.EmptyL -> get source
>>= maybe (return (q1, q2))
(\p-> either
(put sink . fst)
(const $ return True)
p
>> get7 (q1 |> p) q2 sink)
p :< rest -> either
(put sink . fst)
(const $ return True) p
>> get7 (q1 |> p) rest sink)
(return (q1, q2))
testEnd mb q = do ((), n) <- pipe (const $ return ()) (test mb q)
case n of Nothing -> putQueue q false
_ -> return []
test mb q source = liftM snd $
pipe
(transduce (splitterToMarker s2) source)
(\source-> let get4 (Left (_, False)) = return Nothing
get4 p@(Left (_, True)) = putQueue q true
>> get5 0 p
get4 p@(Right b) = maybe
(return True)
(\b1-> put edge (b1, b)) mb
>> putQueue q true
>> get6 0
get5 n (Left (x, True)) = put true x
>> get6 (succ n)
get5 n _ = return (Just n)
get6 n = get source
>>= maybe
(return $ Just n)
(get5 n)
in get source >>= maybe (return Nothing) get4)
-- Skip the n content items already written by `test`, tracking whether a region of s1 is open at that point.
get8 Nothing 0 q = get0 q
get8 (Just mb) 0 q = get2 mb Seq.empty q
get8 mmb n q = case Seq.viewl q of Left (x, False) :< rest -> get8 Nothing (pred n) rest
Left (x, True) :< rest
-> get8 (maybe (Just Nothing) Just mmb) (pred n) rest
Right b :< rest -> get8 (Just (Just b)) n rest
in get0 Seq.empty)
-- | Combines an opening splitter @s1@ and a closing splitter @s2@ using a nesting counter @n@ that starts at 0.
-- The input is marked by 'splittersToPairMarker' and each mark is handled by @state@:
--
-- * an edge of @s1@ increments the counter; when the counter was 0 the edge value is also forwarded to the edge
--   sink;
--
-- * an edge of @s2@ decrements the counter;
--
-- * an item true for @s1@ only increments the counter and is then written with @pass@ (to /true/ if the new
--   counter is positive, otherwise to /false/);
--
-- * an item true for @s2@ only decrements the counter and is then written with @pass'@ (to /true/ if the new
--   counter is not negative, otherwise to /false/);
--
-- * an item true for both is written with @pass'@ and an item true for neither with @pass@, the counter
--   staying unchanged.
--
-- If writing an item fails, that item is returned as the unconsumed remainder. The 'Bool' flag is forwarded
-- to @pipePS@ and 'splittersToPairMarker'.
between :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
between parallel s1 s2 = isolateSplitter $ \ source true false edge ->
-- Both piped coroutines return lists; the state machine's list is placed first.
liftM (\(x, y)-> y ++ x) $
pipePS parallel
(transduce (splittersToPairMarker parallel s1 s2) source)
(\source-> let next n = get source >>= maybe (return []) (state n)
pass n x = (if n > 0 then put true x else put false x)
>>= cond (next n) (return [x])
pass' n x = (if n >= 0 then put true x else put false x)
>>= cond (next n) (return [x])
state n (Left (x, True, False)) = pass (succ n) x
state n (Left (x, False, True)) = pass' (pred n) x
state n (Left (x, True, True)) = pass' n x
state n (Left (x, False, False)) = pass n x
state 0 (Right (Left b)) = put edge b >> next 1
state n (Right (Left _)) = next (succ n)
state n (Right (Right _)) = next (pred n)
in next 0)
-- | Converts a splitter into a transducer that merges the splitter's three outputs into a single stream of
-- marks: an item from the /true/ sink becomes @Left (x, True)@, an item from the /false/ sink becomes
-- @Left (x, False)@ and an edge value @b@ becomes @Right b@.
--
-- Each of the three consumers checks @canPut@ on the output before reading; if writing a mark fails, the
-- item it was made from is returned. The transducer's result concatenates the list returned by the /false/
-- consumer, the list returned by the /true/ consumer and the first component returned by @splitToConsumers@,
-- in that order.
splitterToMarker :: forall m x b. Monad m => Splitter m x b -> Transducer m x (Either (x, Bool) b)
splitterToMarker s =
   isolateTransducer $ \source sink ->
      let mark wrap input = canPut sink >>= cond (forward wrap input) (return [])
          forward wrap input =
             do item <- get input
                case item of
                   Nothing -> return []
                   Just x -> put sink (wrap x) >>= cond (mark wrap input) (return [x])
      in liftM (\(leftover, fromTrue, fromFalse, _) -> fromFalse ++ fromTrue ++ leftover) $
            splitToConsumers s source
               (mark (\x -> Left (x, True)))
               (mark (\x -> Left (x, False)))
               (mark Right)
-- | Runs two splitters over the same input and merges their outputs into one stream of marks. A content mark
-- @Left (x, t1, t2)@ carries an input item together with the verdict of the first and of the second splitter;
-- an edge of the first splitter appears as @Right (Left b1)@ and an edge of the second as @Right (Right b2)@.
--
-- The input is duplicated with @tee@. Both splitters write tagged marks into a shared @sync@ stream: marks of
-- @s1@ carry the tag 'True' and marks of @s2@ the tag 'False'. @synchronizeMarks@ / @handleMark@ read that
-- stream and keep a queue of marks from whichever splitter is currently ahead, together with that splitter's
-- tag:
--
-- * a mark from the splitter that is ahead is appended to the queue;
--
-- * an edge from the other splitter is written straight to the output;
--
-- * a content mark from the other splitter is paired with the oldest queued content mark (queued edges in
--   front of it are written out first) and the combined mark is written to the output.
--
-- If writing fails, the content items still in the queue are returned (@returnQueuedList@). At the end of
-- the stream the queue is asserted to be empty.
splittersToPairMarker :: forall m x b1 b2. (ParallelizableMonad m) => Bool -> Splitter m x b1 -> Splitter m x b2 ->
Transducer m x (Either (x, Bool, Bool) (Either b1 b2))
splittersToPairMarker parallel s1 s2 =
let t source sink =
liftM (\(((_, _), (x, _, _, _)), _)-> x) $
pipe
(\sync-> pipePS parallel
(\sink1-> pipe
(tee source sink1)
(\source2-> splitToConsumers s2 source2
(flip (pourMap (\x-> Left ((x, True), False))) sync)
(flip (pourMap (\x-> Left ((x, False), False))) sync)
(flip (pourMap (Right . Right)) sync)))
(\source1-> splitToConsumers s1 source1
(flip (pourMap (\x-> Left ((x, True), True))) sync)
(flip (pourMap (\x-> Left ((x, False), True))) sync)
(flip (pourMap (Right. Left)) sync)))
(synchronizeMarks Nothing sink)
synchronizeMarks state sink source = get source
>>= maybe
(assert (isNothing state) (return []))
(handleMark state sink source)
-- Nothing is queued: edges pass through, a content mark starts a new queue.
handleMark Nothing sink source (Right b) = put sink (Right b)
>> synchronizeMarks Nothing sink source
handleMark Nothing sink source (Left (p, first))
= synchronizeMarks (Just (Seq.singleton (Left p), first)) sink source
-- Another mark from the splitter that is already ahead: extend the queue.
handleMark state@(Just (q, first)) sink source (Left (p, first')) | first == first'
= synchronizeMarks (Just (q |> Left p, first)) sink source
handleMark state@(Just (q, True)) sink source (Right b@Left{})
= synchronizeMarks (Just (q |> Right b, True)) sink source
handleMark state@(Just (q, False)) sink source (Right b@Right{})
= synchronizeMarks (Just (q |> Right b, False)) sink source
-- An edge from the splitter that is behind is written out immediately.
handleMark state sink source (Right b) = put sink (Right b) >> synchronizeMarks state sink source
-- A content mark from the splitter that is behind: pair it with the head of the queue.
handleMark state@(Just (q, pos')) sink source mark@(Left ((x, t), pos))
= case Seq.viewl q
of Seq.EmptyL -> synchronizeMarks (Just (Seq.singleton (Left (x, t)), pos)) sink source
Right b :< rest -> put sink (Right b)
>>= cond
(handleMark
(if Seq.null rest then Nothing else Just (rest, pos'))
sink
source
mark)
(returnQueuedList q)
-- `pos` tells which splitter the incoming verdict `t` belongs to, fixing the order of the two flags.
Left (y, t') :< rest -> put sink (Left $ if pos then (y, t, t') else (y, t', t))
>>= cond
(synchronizeMarks
(if Seq.null rest then Nothing else Just (rest, pos'))
sink
source)
(returnQueuedList q)
returnQueuedList q = return $ concatMap (either ((:[]) . fst) (const [])) $ Foldable.toList $ Seq.viewl q
in isolateTransducer t
-- | Generic combination of two splitters that both see the whole input.
--
-- The input is marked by 'splittersToPairMarker'. Every content mark @(x, t1, t2)@ is written to the /true/
-- sink when @f t1 t2@ holds and to the /false/ sink otherwise. The edges of both splitters, tagged with
-- 'Left' or 'Right', are written to an intermediate stream that the @boundaries@ argument translates into the
-- edge output of the result.
--
-- If writing an item fails, that item is returned as the unconsumed remainder. The 'Bool' flag is forwarded
-- to @pipePS@ and 'splittersToPairMarker'.
zipSplittersWith :: forall m x b1 b2 b. ParallelizableMonad m =>
(Bool -> Bool -> Bool) ->
(forall a1 a2 d. (AncestorFunctor a1 d, AncestorFunctor a2 d) =>
Source m a1 (Either b1 b2) -> Sink m a2 b -> Coroutine d m ()) ->
Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b
zipSplittersWith f boundaries parallel s1 s2
= isolateSplitter $ \ source true false edge ->
-- The inner pipe returns a pair of lists; the routing loop's list is placed first.
liftM (\((x, y), _)-> y ++ x) $
pipe
-- This inner `edge` is the intermediate stream of tagged edges and shadows the result's edge sink.
(\edge->
pipePS parallel
(transduce (splittersToPairMarker parallel s1 s2) source)
(\source-> let split = get source
>>= maybe
(return [])
(either
test
(\b-> put edge b >> split))
test (x, t1, t2) = (if f t1 t2 then put true x else put false x)
>>= cond split (return [x])
in split))
(flip boundaries edge)
-- | Groups a stream of marks (as produced by 'splitterToMarker') into chunks and runs a consumer on each chunk.
--
-- A chunk is a maximal run of content marks carrying the same 'Bool' flag; an edge mark always starts a new
-- chunk. For every chunk @getConsumer@ is called with a tag and a source of the chunk's items:
--
-- * 'Nothing' for a run of false items;
--
-- * @Just Nothing@ for a run of true items that was not introduced by an edge;
--
-- * @Just (Just b)@ for the run of true items that follows the edge @b@ (the run may be empty).
--
-- The consumer's results are discarded. Processing stops when the mark source is exhausted.
groupMarks :: (Monad m, AncestorFunctor a d, AncestorFunctor a (SinkFunctor d x)) =>
Source m a (Either (x, Bool) b) ->
(Maybe (Maybe b) -> Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m r) ->
Coroutine d m ()
groupMarks source getConsumer = start
where start = getSuccess source (either startContent startRegion)
-- Each start* function pipes one chunk into its consumer; the producer side returns the first mark that does
-- not belong to the chunk (if any), which then starts the next chunk.
startContent (x, False) = pipe (\sink-> pass False sink x) (getConsumer Nothing)
>>= maybe (return ()) (either startContent startRegion) . fst
startContent (x, True) = pipe (\sink-> pass True sink x) (getConsumer $ Just Nothing)
>>= maybe (return ()) (either startContent startRegion) . fst
startRegion b = pipe (next True) (getConsumer (Just $ Just b))
>>= maybe (return ()) (either startContent startRegion) . fst
pass t sink x = put sink x >> next t sink
next t sink = get source >>= maybe (return Nothing) (continue t sink)
-- Keep going while the flag matches; any other mark ends the chunk and is returned to the caller.
continue t sink (Left (x, t')) | t == t' = pass t sink x
continue t sink p = return (Just p)
-- Runs a producer with its sink piped into @consumeAndSuppress@ and returns the producer's result alone.
suppressProducer :: forall m a x r. (Functor a, Monad m) =>
                    (Sink m (SinkFunctor a x) x -> Coroutine (SinkFunctor a x) m r) -> Coroutine a m r
suppressProducer producer =
   do outcome <- pipe producer consumeAndSuppress
      return (fst outcome)