module Control.Concurrent.SCC.Combinators
(
splitterToMarker,
consumeBy, prepend, append, substitute,
PipeableComponentPair ((>->)), JoinableComponentPair (join, sequence),
snot, (>&), (>|),
(&&), (||),
ifs, wherever, unless, select,
while, nestedIn,
foreach, having, havingOnly, followedBy, even,
first, uptoFirst, prefix,
last, lastAndAfter, suffix,
startOf, endOf,
(...),
parseRegions, parseNestedRegions,
groupMarks)
where
import Control.Concurrent.SCC.Foundation
import Control.Concurrent.SCC.ComponentTypes
import Prelude hiding (even, last, sequence, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Data.Maybe (isJust, isNothing, fromJust)
import Data.Typeable (Typeable)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))
import Debug.Trace (trace)
consumeBy :: forall m x y r. (Monad m, Typeable x) => Consumer m x r -> Transducer m x y
consumeBy c = liftTransducer "consumeBy" (maxUsableThreads c) $
\threads-> let c' = usingThreads threads c
in (ComponentConfiguration [AnyComponent c'] (usedThreads c') (cost c'),
\ source _sink -> consume c' source >> return [])
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
c1 -> m w, c2 -> m w, c3 -> m
where (>->) :: c1 -> c2 -> c3
instance (ParallelizableMonad m, Typeable x)
=> PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
where p >-> c = liftPerformer ">->" (maxUsableThreads p `max` maxUsableThreads c) $
\threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
performPipe = (if parallel then pipeP else pipe) (produce p') (consume c') >> return ()
in (configuration, performPipe)
instance (ParallelizableMonad m, Typeable x, Typeable y)
=> PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
where t >-> c = liftConsumer ">->" (maxUsableThreads t `max` maxUsableThreads c) $
\threads-> let (configuration, t', c', parallel) = optimalTwoParallelConfigurations threads t c
consumePipe source = liftM snd $ (if parallel then pipeP else pipe)
(transduce t' source)
(consume c')
in (configuration, consumePipe)
instance (ParallelizableMonad m, Typeable x, Typeable y)
=> PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
where p >-> t = liftProducer ">->" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
producePipe sink = liftM fst $ (if parallel then pipeP else pipe)
(produce p')
(\source-> transduce t' source sink)
in (configuration, producePipe)
instance ParallelizableMonad m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
where t1 >-> t2 = liftTransducer ">->" (maxUsableThreads t1 + maxUsableThreads t2) $
\threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
transducePipe source sink = liftM fst $ (if parallel then pipeP else pipe)
(transduce t1' source)
(\source-> transduce t2' source sink)
in (configuration, transducePipe)
class Component c => CompatibleSignature c cons (m :: * -> *) input output | c -> cons m
class AnyListOrUnit c
instance AnyListOrUnit [x]
instance AnyListOrUnit ()
instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r) (PerformerType r) m x y
instance AnyListOrUnit y => CompatibleSignature (Consumer m x r) (ConsumerType r) m [x] y
instance AnyListOrUnit y => CompatibleSignature (Producer m x r) (ProducerType r) m y [x]
instance CompatibleSignature (Transducer m x y) TransducerType m [x] [y]
data PerformerType r
data ConsumerType r
data ProducerType r
data TransducerType
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
=> JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
where join :: c1 -> c2 -> c3
sequence :: c1 -> c2 -> c3
join = sequence
instance forall m x any r1 r2. (Monad m, Typeable x)
=> JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Producer m x r2) (Producer m x r2)
where sequence p1 p2 = liftProducer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $
\threads-> let (configuration, p1', p2') = optimalTwoSequentialConfigurations threads p1 p2
produceJoin sink = produce p1' sink >> produce p2' sink
in (configuration, produceJoin)
instance forall m x any. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] () (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
where join c1 c2 = liftConsumer "join" (maxUsableThreads c1 + maxUsableThreads c2) $
\threads-> let (configuration, c1', c2', parallel) = optimalTwoParallelConfigurations threads c1 c2
consumeJoin source = do (if parallel then pipeP else pipe)
(\sink1-> pipe (tee source sink1) (consume c2'))
(consume c1')
return ()
in (configuration, consumeJoin)
sequence c1 c2 = liftConsumer "sequence" (maxUsableThreads c1 `max` maxUsableThreads c2) $
\threads-> let (configuration, c1', c2') = optimalTwoSequentialConfigurations threads c1 c2
consumeJoin source = pipe
(\buffer-> pipe (tee source buffer) (consume c1'))
getList
>>= \(_, list)-> pipe (putList list) (consume c2')
>> return ()
in (configuration, consumeJoin)
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y] (Transducer m x y) (Transducer m x y) (Transducer m x y)
where join t1 t2 = liftTransducer "join" (maxUsableThreads t1 + maxUsableThreads t2) $
\threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
transduce' source sink = pipe
(\buffer-> (if parallel then pipeP else pipe)
(\sink1-> pipe
(\sink2-> tee source sink1 sink2)
(\src-> transduce t2' src buffer))
(\source-> transduce t1' source sink))
getList
>>= \(_, list)-> putList list sink
>> getList source
in (configuration, transduce')
sequence t1 t2 = liftTransducer "sequence" (maxUsableThreads t1 `max` maxUsableThreads t2) $
\threads-> let (configuration, t1', t2') = optimalTwoSequentialConfigurations threads t1 t2
transduce' source sink = pipe
(\buffer-> pipe
(tee source buffer)
(\source-> transduce t1 source sink))
getList
>>= \(_, list)-> pipe
(\sink-> putList list sink
>>= whenNull
(pour source sink
>> return []))
(\source-> transduce t2 source sink)
>>= return . fst
in (configuration, transduce')
instance forall m r1 r2. ParallelizableMonad m
=> JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () () (Performer m r1) (Performer m r2) (Performer m r2)
where join p1 p2 = liftPerformer "join" (maxUsableThreads p1 + maxUsableThreads p2) $
\threads-> let (configuration, p1', p2', parallel) = optimalTwoParallelConfigurations threads p1 p2
in (configuration, if parallel then liftM snd $ perform p1' `parallelize` perform p2'
else perform p1' >> perform p2')
sequence p1 p2 = liftPerformer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $
\threads-> let (configuration, p1', p2') = optimalTwoSequentialConfigurations threads p1 p2
in (configuration, perform p1' >> perform p2')
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Performer m r1) (Producer m x r2) (Producer m x r2)
where join pe pr = liftProducer "join" (maxUsableThreads pe + maxUsableThreads pr) $
\threads-> let (configuration, pe', pr', parallel) = optimalTwoParallelConfigurations threads pe pr
produceJoin sink = if parallel then liftM snd (perform pe' `parallelize` produce pr' sink)
else perform pe' >> produce pr' sink
in (configuration, produceJoin)
sequence pe pr = liftProducer "sequence" (maxUsableThreads pe `max` maxUsableThreads pr) $
\threads-> let (configuration, pe', pr') = optimalTwoSequentialConfigurations threads pe pr
produceJoin sink = perform pe' >> produce pr' sink
in (configuration, produceJoin)
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Performer m r2) (Producer m x r2)
where join pr pe = liftProducer "join" (maxUsableThreads pr + maxUsableThreads pe) $
\threads-> let (configuration, pr', pe', parallel) = optimalTwoParallelConfigurations threads pr pe
produceJoin sink = if parallel then liftM snd (produce pr' sink `parallelize` perform pe')
else produce pr' sink >> perform pe'
in (configuration, produceJoin)
sequence pr pe = liftProducer "sequence" (maxUsableThreads pr `max` maxUsableThreads pe) $
\threads-> let (configuration, pr', pe') = optimalTwoSequentialConfigurations threads pr pe
produceJoin sink = produce pr' sink >> perform pe'
in (configuration, produceJoin)
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] () (Performer m r1) (Consumer m x r2) (Consumer m x r2)
where join p c = liftConsumer "join" (maxUsableThreads p + maxUsableThreads c) $
\threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
consumeJoin source = if parallel then liftM snd (perform p' `parallelize` consume c' source)
else perform p' >> consume c' source
in (configuration, consumeJoin)
sequence p c = liftConsumer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $
\threads-> let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
consumeJoin source = perform p' >> consume c' source
in (configuration, consumeJoin)
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] () (Consumer m x r1) (Performer m r2) (Consumer m x r2)
where join c p = liftConsumer "join" (maxUsableThreads c + maxUsableThreads p) $
\threads-> let (configuration, c', p', parallel) = optimalTwoParallelConfigurations threads c p
consumeJoin source = if parallel then liftM snd (consume c' source `parallelize` perform p')
else consume c' source >> perform p'
in (configuration, consumeJoin)
sequence c p = liftConsumer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $
\threads-> let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
consumeJoin source = consume c' source >> perform p'
in (configuration, consumeJoin)
instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y] (Performer m r) (Transducer m x y) (Transducer m x y)
where join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $
\threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
join' source sink = if parallel then liftM snd (perform p'
`parallelize` transduce t' source sink)
else perform p' >> transduce t' source sink
in (configuration, join')
sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $
\threads-> let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
join' source sink = perform p' >> transduce t' source sink
in (configuration, join')
instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y] (Transducer m x y) (Performer m r) (Transducer m x y)
where join t p = liftTransducer "join" (maxUsableThreads t + maxUsableThreads p) $
\threads-> let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
join' source sink = if parallel then liftM fst (transduce t' source sink
`parallelize` perform p')
else do result <- transduce t' source sink
perform p'
return result
in (configuration, join')
sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
join' source sink = do result <- transduce t' source sink
perform p'
return result
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y] (Producer m y ()) (Transducer m x y) (Transducer m x y)
where join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $
\threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
join' source sink = if parallel
then do ((_, rest), out) <- pipe
(\buffer-> produce p' sink `parallelize`
transduce t' source buffer)
getList
putList out sink
return rest
else produce p' sink >> transduce t' source sink
in (configuration, join')
sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $
\threads-> let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
join' source sink = produce p' sink >> transduce t' source sink
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y] (Transducer m x y) (Producer m y ()) (Transducer m x y)
where join t p = liftTransducer "join" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
join' source sink = if parallel
then do ((rest, ()), out) <- pipe
(\buffer-> transduce t' source sink
`parallelize` produce p' buffer)
getList
putList out sink
return rest
else do result <- transduce t' source sink
produce p' sink
return result
in (configuration, join')
sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
join' source sink = do result <- transduce t' source sink
produce p' sink
return result
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y] (Consumer m x ()) (Transducer m x y) (Transducer m x y)
where join c t = liftTransducer "join" (maxUsableThreads c + maxUsableThreads t) $
\threads-> let (configuration, c', t', parallel) = optimalTwoParallelConfigurations threads c t
join' source sink = liftM (snd . fst) $
(if parallel then pipeP else pipe)
(\sink1-> pipe
(tee source sink1)
(\source-> transduce t' source sink))
(consume c')
in (configuration, join')
sequence c t = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads t) $
\threads-> let (configuration, c', t') = optimalTwoSequentialConfigurations threads c t
sequence' source sink = pipe
(\buffer-> pipe
(tee source buffer)
(consume c'))
getList
>>= \(_, list)-> pipe
(\sink-> putList list sink
>>= whenNull (pour source sink
>> return []))
(\source-> transduce t' source sink)
>>= return . fst
in (configuration, sequence')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y] (Transducer m x y) (Consumer m x ()) (Transducer m x y)
where join t c = join c t
sequence t c = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads c) $
\threads-> let (configuration, t', c') = optimalTwoSequentialConfigurations threads t c
sequence' source sink = pipe
(\buffer-> pipe
(tee source buffer)
(\source-> transduce t' source sink))
getList
>>= \(_, list)-> pipe
(\sink-> putList list sink
>>= whenNull (pour source sink
>> return []))
(consume c')
>>= return . fst
in (configuration, sequence')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y] (Producer m y ()) (Consumer m x ()) (Transducer m x y)
where join p c = liftTransducer "sequence" (maxUsableThreads p + maxUsableThreads c) $
\threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
join' source sink = if parallel then produce p' sink >> consume c' source >> return []
else parallelize (produce p' sink) (consume c' source) >> return []
in (configuration, join')
sequence p c = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $
\threads-> let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
join' source sink = produce p' sink >> consume c' source >> return []
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y] (Consumer m x ()) (Producer m y ()) (Transducer m x y)
where join c p = join p c
sequence c p = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $
\threads-> let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
join' source sink = consume c' source >> produce p' sink >> return []
in (configuration, join')
prepend :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
prepend prefix = liftTransducer "prepend" (maxUsableThreads prefix) $
\threads-> let prefix' = usingThreads threads prefix
prepend' source sink = produce prefix' sink >> pour source sink >> return []
in (ComponentConfiguration [AnyComponent prefix] threads (cost prefix'), prepend')
append :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
append suffix = liftTransducer "append" (maxUsableThreads suffix) $
\threads-> let suffix' = usingThreads threads suffix
append' source sink = pour source sink >> produce suffix' sink >> return []
in (ComponentConfiguration [AnyComponent suffix] threads (cost suffix'), append')
substitute :: forall m x y r. (Monad m, Typeable x, Typeable y) => Producer m y r -> Transducer m x y
substitute feed = liftTransducer "substitute" (maxUsableThreads feed) $
\threads-> let feed' = usingThreads threads feed
substitute' source sink = consumeAndSuppress source >> produce feed' sink >> return []
in (ComponentConfiguration [AnyComponent feed] threads (cost feed'), substitute')
snot :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
snot splitter = liftSplitter "not" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
not source true false edge = liftM fst $
pipe
(split splitter source false true)
consumeAndSuppress
in (ComponentConfiguration [AnyComponent splitter'] threads (cost splitter'), not)
(>&) :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2) => Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
s1 >& s2 = liftSplitter ">&" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge = liftM (fst . fst . fst . fst) $
pipe
(\edges->
pipe
(\edge1-> pipe
(\edge2-> (if parallel then pipeP else pipe)
(\true-> split s1' source true false edge1)
(\source-> split s2' source true false edge2))
(flip (pourMap Right) edges))
(flip (pourMap Left) edges))
(flip intersectRegions edge)
in (configuration, s)
intersectRegions source sink = next Nothing Nothing
where next lastLeft lastRight = get source
>>= maybe
(return ())
(either
(flip pair lastRight . Just)
(pair lastLeft . Just))
pair l@(Just x) r@(Just y) = put sink (x, y)
>>= flip when (next Nothing Nothing)
pair l r = next l r
(>|) :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
=> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
s1 >| s2 = liftSplitter ">|" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge = liftM (fst . fst . fst) $
pipe
(\edge1-> pipe
(\edge2-> (if parallel then pipeP else pipe)
(\false-> split s1' source true false edge1)
(\source-> split s2' source true false edge2))
(flip (pourMap Right) edge))
(flip (pourMap Left) edge)
in (configuration, s)
(&&) :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2) => Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
s1 && s2 = liftSplitter "&&" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge = liftM (\(x, y)-> y ++ x) $
(if parallel then pipeP else pipe)
(transduce (splittersToPairMarker s1' s2') source)
(\source-> let split l r = get source
>>= maybe
(return [])
(test l r)
test l r (Left (x, t1, t2))
= put (if t1 Prelude.&& t2 then true else false) x
>>= cond
(split
(if t1 then l else Nothing)
(if t2 then r else Nothing))
(return [x])
test _ Nothing (Right (Left l)) = split (Just l) Nothing
test _ (Just r) (Right (Left l))
= put edge (l, r) >> split (Just l) (Just r)
test Nothing _ (Right (Right r)) = split Nothing (Just r)
test (Just l) _ (Right (Right r))
= put edge (l, r) >> split (Just l) (Just r)
in split Nothing Nothing)
in (configuration, s)
(||) :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
=> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
(||) = zipSplittersWith (Prelude.||) pour
ifs :: (ParallelizableMonad m, Typeable x, Typeable b, BranchComponent cc m x [x]) => Splitter m x b -> cc -> cc -> cc
ifs s = combineBranches "if" (cost s) (\ parallel c1 c2 -> \source-> splitInputToConsumers parallel s source c1 c2)
wherever :: (ParallelizableMonad m, Typeable x, Typeable b) => Transducer m x x -> Splitter m x b -> Transducer m x x
wherever t s = liftTransducer "wherever" (maxUsableThreads s + maxUsableThreads t) $
\threads-> let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
wherever' source sink = splitInputToConsumers parallel s source
(\source-> transduce t source sink)
(\source-> pour source sink >> return [])
in (configuration, wherever')
unless :: (ParallelizableMonad m, Typeable x, Typeable b) => Transducer m x x -> Splitter m x b -> Transducer m x x
unless t s = liftTransducer "unless" (maxUsableThreads s + maxUsableThreads t) $
\threads-> let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
unless' source sink = splitInputToConsumers parallel s source
(\source-> pour source sink >> return [])
(\source-> transduce t source sink)
in (configuration, unless')
select :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Transducer m x x
select s = liftTransducer "select" (maxUsableThreads s) $
\threads-> let s' = usingThreads threads s
transduce' source sink = splitInputToConsumers False s' source
(\source-> pour source sink >> return [])
(\source-> consumeAndSuppress source >> return [])
in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), transduce')
parseRegions :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Parser m x b
parseRegions s = liftTransducer "parseRegions" (maxUsableThreads s) $
\threads-> let s' = usingThreads threads s
transduce' source sink = liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker s') source)
(\source-> wrapRegions source sink)
wrapRegions source sink = let wrap0 mb = get source
>>= maybe
(maybe (return True) flush mb >> return [])
(wrap1 mb)
wrap1 Nothing (Left (x, _)) = put sink (Content x)
>>= cond (wrap0 Nothing) (return [x])
wrap1 (Just p) (Left (x, False)) = flush p
>> put sink (Content x)
>>= cond
(wrap0 Nothing)
(return [x])
wrap1 (Just (b, t)) (Left (x, True))
= (if t then return True else put sink (Markup (Start b)))
>> put sink (Content x)
>>= cond (wrap0 (Just (b, True))) (return [x])
wrap1 (Just p) (Right b') = flush p >> wrap0 (Just (b', False))
wrap1 Nothing (Right b) = wrap0 (Just (b, False))
flush (b, t) = put sink $ Markup $ (if t then End else Point) b
in wrap0 Nothing
in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), transduce')
parseNestedRegions :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x (Boundary b) -> Parser m x b
parseNestedRegions s = liftTransducer "parseNestedRegions" (maxUsableThreads s) $
\threads-> let s' = usingThreads threads s
transduce' source sink = liftM (\(w, (), (), _)-> w) $
splitToConsumers s' source
(flip (pourMap Content) sink)
(flip (pourMap Content) sink)
(flip (pourMap Markup) sink)
in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), transduce')
while :: (ParallelizableMonad m, Typeable x, Typeable b) => Transducer m x x -> Splitter m x b -> Transducer m x x
while t s = liftTransducer "while" (maxUsableThreads t + maxUsableThreads s) $
\threads-> let (configuration, s', while'', parallel) = optimalTwoParallelConfigurations threads s while'
transduce' source sink = splitInputToConsumers parallel s' source
(\source-> transduce while' source sink)
(\source-> pour source sink >> return [])
while' = t >-> while t s
in (configuration, transduce')
nestedIn :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b -> Splitter m x b
nestedIn s1 s2 = liftSplitter "nestedIn" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge
= liftM fst $
(if parallel then pipeP else pipe)
(\false-> split s1' source true false edge)
(\source-> pipe
(\true-> pipe (split s2' source true false) consumeAndSuppress)
(\source-> get source
>>= maybe
(return ([], []))
(\x-> pipe
(\sink-> put sink x
>>= cond
(pour source sink
>> return [])
(return [x]))
(\source-> split
(nestedIn s1' s2')
source true false edge))))
in (configuration,s)
foreach :: (ParallelizableMonad m, Typeable x, Typeable b, BranchComponent cc m x [x]) => Splitter m x b -> cc -> cc -> cc
foreach s = combineBranches "foreach" (cost s)
(\ parallel c1 c2 source-> liftM fst $ (if parallel then pipeP else pipe)
(transduce (splitterToMarker s) source)
(\source-> groupMarks source (maybe c2 (const c1))))
having :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
=> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
having s1 s2 = liftSplitter "having" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge = liftM fst $
(if parallel then pipeP else pipe)
(transduce (splitterToMarker s1') source)
(flip groupMarks test)
where test Nothing chunk = pour chunk false >> return []
test (Just mb) chunk = pipe
(\sink1-> pipe (tee chunk sink1) getList)
(\chunk-> splitToConsumers s2' chunk
(liftM isJust . get)
consumeAndSuppress
(liftM isJust . get))
>>= \(((), prefix), (_, anyTrue, (), anyEdge))->
if anyTrue Prelude.|| anyEdge
then maybe (return True) (put edge) mb
>> putList prefix true
>>= whenNull (pour chunk true >> return [])
else putList prefix false
>>= whenNull (pour chunk false >> return [])
in (configuration, s)
havingOnly :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
=> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
havingOnly s1 s2 = liftSplitter "havingOnly" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge = liftM fst $
(if parallel then pipeP else pipe)
(transduce (splitterToMarker s1') source)
(flip groupMarks test)
where test Nothing chunk = pour chunk false >> return []
test (Just mb) chunk = pipe
(\sink1-> pipe (tee chunk sink1) getList)
(\chunk-> splitToConsumers s2' chunk
consumeAndSuppress
(liftM isJust . get)
consumeAndSuppress)
>>= \(((), prefix), (_, (), anyFalse, ()))->
if anyFalse
then putList prefix false
>>= whenNull (pour chunk false >> return [])
else maybe (return True) (put edge) mb
>> putList prefix true
>>= whenNull (pour chunk true >> return [])
in (configuration, s)
first :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
first splitter = liftSplitter "first" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge
= liftM (\(x, y)-> y ++ x) $
pipeD "first" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (Left (x, False)) = pass false x get1
get1 (Left (x, True)) = pass true x get2
get1 (Right b) = put edge b
>> get source
>>= maybe (return []) get2
get2 b@Right{} = get3 b
get2 (Left (x, True)) = pass true x get2
get2 (Left (x, False)) = pass false x get3
get3 (Left (x, _)) = pass false x get3
get3 (Right _) = get source >>= maybe (return []) get3
pass sink x next = put sink x
>>= cond
(get source >>= maybe (return []) next)
(return [x])
in get source >>= maybe (return []) get1)
in (configuration, s)
uptoFirst :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
uptoFirst splitter = liftSplitter "uptoFirst" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge
= liftM (\(x, y)-> y ++ x) $
pipeD "uptoFirst" (transduce (splitterToMarker splitter') source)
(\source-> let get1 q (Left (x, False)) = let q' = q |> x
in get source
>>= maybe
(putQueue q' false)
(get1 q')
get1 q p@(Left (_, True)) = putQueue q true
>>= whenNull (get2 p)
get1 q (Right b) = putQueue q true
>>= whenNull (put edge b
>> get source
>>= maybe (return []) get2)
get2 b@Right{} = get3 b
get2 (Left (x, True)) = pass true x get2
get2 (Left (x, False)) = pass false x get3
get3 (Left (x, _)) = pass false x get3
get3 (Right _) = get source >>= maybe (return []) get3
pass sink x next = put sink x
>>= cond
(get source >>= maybe (return []) next)
(return [x])
in get source >>= maybe (return []) (get1 Seq.empty))
in (configuration, s)
last :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
last splitter = liftSplitter "last" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge
= liftM (\(x, y)-> y ++ x) $
pipeD "last" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond (get source
>>= maybe (return []) get1)
(return [x])
get1 p@(Left (x, True)) = get2 Nothing Seq.empty p
get1 (Right b) = pass (get2 (Just b) Seq.empty)
get2 mb q (Left (x, True)) = let q' = q |> x
in get source
>>= maybe
(flush mb q')
(get2 mb q')
get2 mb q p = get3 mb q Seq.empty p
get3 mb qt qf (Left (x, False)) = let qf' = qf |> x
in get source
>>= maybe
(flush mb qt >> putQueue qf' false)
(get3 mb qt qf')
get3 mb qt qf p = do rest1 <- putQueue qt false
rest2 <- putQueue qf false
if null rest1 Prelude.&& null rest2
then get1 p
else return (rest1 ++ rest2)
flush mb q = maybe (return True) (put edge) mb
>> putQueue q true
pass succeed = get source >>= maybe (return []) succeed
in pass get1)
in (configuration, s)
lastAndAfter :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
lastAndAfter splitter = liftSplitter "lastAndAfter" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge
= liftM (\(x, y)-> y ++ x) $
pipe
(transduce (splitterToMarker splitter') source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond (pass get1) (return [x])
get1 p@(Left (x, True)) = get2 Nothing Seq.empty p
get1 (Right b) = pass (get2 (Just b) Seq.empty)
get2 mb q (Left (x, True)) = let q' = q |> x
in get source
>>= maybe
(flush mb q')
(get2 mb q')
get2 mb q p = get3 mb q p
get3 mb q (Left (x, False)) = let q' = q |> x
in get source
>>= maybe
(flush mb q')
(get3 mb q')
get3 _ q p@(Left (x, True)) = putQueue q false
>>= whenNull (get1 p)
get3 _ q b'@Right{} = putQueue q false
>>= whenNull (get1 b')
flush mb q = maybe (return True) (put edge) mb
>> putQueue q true
pass succeed = get source >>= maybe (return []) succeed
in pass get1)
in (configuration, s)
prefix :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
prefix splitter = liftSplitter "prefix" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge
= liftM (\(x, y)-> y ++ x) $
pipeD "prefix" (transduce (splitterToMarker splitter') source)
(\source-> let get0 p@Left{} = get1 p
get0 (Right b) = put edge b >> get source >>= maybe (return []) get1
get1 (Left (x, False)) = pass false x get2
get1 (Left (x, True)) = pass true x get1
get1 (Right b) = get source >>= maybe (return []) get2
get2 (Left (x, _)) = pass false x get2
get2 Right{} = get source >>= maybe (return []) get2
pass sink x next = put sink x
>>= cond
(get source >>= maybe (return []) next)
(return [x])
in get source >>= maybe (return []) get0)
in (configuration, s)
suffix :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
suffix splitter = liftSplitter "suffix" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge
= liftM (\(x, y)-> y ++ x) $
pipeD "suffix" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (Left (x, False)) = put false x >>= cond (p get1) (return [x])
get1 (Left (x, True)) = get2 Nothing (Seq.singleton x)
get1 (Right b) = get2 (Just b) Seq.empty
get2 mb q = get source
>>= maybe
(maybe (return True) (put edge) mb >> putQueue q true)
(get3 mb q)
get3 mb q (Left (x, True)) = get2 mb (q |> x)
get3 mb q p@(Left (x, False)) = putQueue q false
>>= \rest-> if null rest
then get1 p
else return (rest ++ [x])
get3 mb q (Right b) = putQueue q false
>>= whenNull (get2 (Just b) Seq.empty)
p succeed = get source >>= maybe (return []) succeed
in p get1)
in (configuration, s)
even :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
even splitter = liftSplitter "even" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge
= liftM (\(x, y)-> y ++ x) $
pipeD "even"
(transduce (splitterToMarker splitter') source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond (next get1) (return [x])
get1 p@(Left (x, True)) = get2 p
get1 (Right b) = next get2
get2 (Left (x, True)) = put false x
>>= cond (next get2) (return [x])
get2 p@(Left (x, False)) = get3 p
get2 (Right b) = put edge b >> next get4
get3 (Left (x, False)) = put false x
>>= cond (next get3) (return [x])
get3 p@(Left (x, True)) = get4 p
get3 (Right b) = put edge b >> next get4
get4 (Left (x, True)) = put true x
>>= cond (next get4) (return [x])
get4 p@(Left (x, False)) = get1 p
get4 (Right b) = next get2
next g = get source >>= maybe (return []) g
in next get1)
in (configuration, s)
startOf :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x (Maybe b)
startOf splitter = liftSplitter "startOf" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge = liftM (\(x, y)-> y ++ x) $
pipeD "startOf"
(transduce (splitterToMarker splitter') source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond
(next get1)
(return [x])
get1 p@(Left (x, True)) = put edge Nothing >> get2 p
get1 (Right b) = put edge (Just b)
>> next get2
get2 (Left (x, True)) = put false x
>>= cond
(next get2)
(return [x])
get2 p = get1 p
next g = get source >>= maybe (return []) g
in next get1)
in (configuration, s)
endOf :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x (Maybe b)
endOf splitter = liftSplitter "endOf" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false edge = liftM (\(x, y)-> y ++ x) $
pipeD "endOf"
(transduce (splitterToMarker splitter') source)
(\source-> let get1 (Left (x, False)) = put false x
>>= cond
(next get1)
(return [x])
get1 p@(Left (x, True)) = get2 Nothing p
get1 (Right b) = next (get2 $ Just b)
get2 mb (Left (x, True))
= put false x
>>= cond (next $ get2 mb) (return [x])
get2 mb p@(Left (x, False)) = put edge mb >> get1 p
get2 mb (Right b) = put edge mb >> next (get2 $ Just b)
next g = get source >>= maybe (return []) g
in next get1)
in (configuration, s)
followedBy :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
=> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
followedBy s1 s2 = liftSplitter "followedBy" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
in (configuration, followedBy' parallel s1' s2')
where followedBy' parallel s1 s2 source true false edge
= liftM (\(x, y)-> y ++ x) $
(if parallel then pipeP else pipe)
(transduce (splitterToMarker s1) source)
(\source-> let get0 q = case Seq.viewl q
of Seq.EmptyL -> get source >>= maybe (return []) get1
(Left (x, False)) :< rest -> put false x
>>= cond
(get0 rest)
(return
$ concatMap (either ((:[]) . fst) (const []))
$ Foldable.toList $ Seq.viewl q)
(Left (x, True)) :< rest -> get2 Nothing Seq.empty q
(Right b) :< rest -> get2 (Just b) Seq.empty rest
get1 (Left (x, False)) = put false x
>>= cond (get source >>= maybe (return []) get1)
(return [x])
get1 p@(Left (x, True)) = get2 Nothing Seq.empty (Seq.singleton p)
get1 (Right b) = get2 (Just b) Seq.empty Seq.empty
get2 mb q q' = case Seq.viewl q'
of Seq.EmptyL -> get source
>>= maybe (testEnd mb q) (get2 mb q . Seq.singleton)
(Left (x, True)) :< rest -> get2 mb (q |> x) rest
(Left (x, False)) :< rest -> get3 mb q q'
Right{} :< rest -> get3 mb q q'
get3 mb q q' = do ((q1, q2), n) <- pipe (get7 Seq.empty q') (test mb q)
case n of Nothing -> putQueue q false
>>= whenNull (get0 (q1 >< q2))
Just 0 -> get0 (q1 >< q2)
Just n -> get8 (Just mb) n (q1 >< q2)
get7 q1 q2 sink = canPut sink
>>= cond (case Seq.viewl q2
of Seq.EmptyL -> get source
>>= maybe (return (q1, q2))
(\p-> either
(put sink . fst)
(const $ return True)
p
>> get7 (q1 |> p) q2 sink)
p :< rest -> either (put sink . fst) (const $ return True) p
>> get7 (q1 |> p) rest sink)
(return (q1, q2))
testEnd mb q = do ((), n) <- pipeD "testEnd" (const $ return ()) (test mb q)
case n of Nothing -> putQueue q false
_ -> return []
test mb q source = liftM snd $
pipeD "follower"
(transduce (splitterToMarker s2) source)
(\source-> let get4 (Left (_, False)) = return Nothing
get4 p@(Left (_, True)) = putQueue q true
>> get5 0 p
get4 p@(Right b) = maybe
(return True) (\b1-> put edge (b1, b)) mb
>> putQueue q true
>> get6 0
get5 n (Left (x, True)) = put true x >> get6 (succ n)
get5 n _ = return (Just n)
get6 n = get source
>>= maybe
(return $ Just n)
(get5 n)
in get source >>= maybe (return Nothing) get4)
get8 Nothing 0 q = get0 q
get8 (Just mb) 0 q = get2 mb Seq.empty q
get8 mmb n q = case Seq.viewl q of Left (x, False) :< rest -> get8 Nothing (pred n) rest
Left (x, True) :< rest
-> get8 (maybe (Just Nothing) Just mmb) (pred n) rest
Right b :< rest -> get8 (Just (Just b)) n rest
in get0 Seq.empty)
(...) :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
=> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
s1 ... s2 = liftSplitter "..." (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge
= liftM (\(x, y)-> y ++ x) $
(if parallel then pipeP else pipe)
(transduce (splittersToPairMarker s1' s2') source)
(\source-> let next n = get source >>= maybe (return []) (state n)
pass n x = (if n > 0 then put true x else put false x)
>>= cond (next n) (return [x])
pass' n x = (if n >= 0 then put true x else put false x)
>>= cond (next n) (return [x])
state n (Left (x, True, False)) = pass (succ n) x
state n (Left (x, False, True)) = pass' (pred n) x
state n (Left (x, True, True)) = pass' n x
state n (Left (x, False, False)) = pass n x
state 0 (Right (Left b)) = put edge b >> next 1
state n (Right (Left _)) = next (succ n)
state n (Right (Right _)) = next (pred n)
in next 0)
in (configuration, s)
splitterToMarker :: forall m x b. (ParallelizableMonad m, Typeable x, Typeable b)
=> Splitter m x b -> Transducer m x (Either (x, Bool) b)
splitterToMarker s = liftTransducer "splitterToMarker" (maxUsableThreads s) $
\threads-> let s' = usingThreads threads s
t source sink = liftM (\(x, y, z, _)-> z ++ y ++ x) $
splitToConsumers s' source
(mark (\x-> Left (x, True)))
(mark (\x-> Left (x, False)))
(mark Right)
where mark f source = canPut sink
>>= cond
(get source
>>= maybe (return [])
(\x-> put sink (f x)
>>= cond (mark f source) (return [x])))
(return [])
in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), t)
splittersToPairMarker :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
=> Splitter m x b1 -> Splitter m x b2
-> Transducer m x (Either (x, Bool, Bool) (Either b1 b2))
splittersToPairMarker s1 s2
= liftTransducer "splittersToPairMarker" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallelize) = optimalTwoParallelConfigurations threads s1 s2
t source sink = liftM (\(((_, _), (x, _, _, _)), _)-> x) $
pipeD "splittersToPairMarker synchronize"
(\sync-> (if parallelize then pipeP else pipe)
(\sink1-> pipe
(tee source sink1)
(\source2-> splitToConsumers s2' source2
(flip (pourMap (\x-> Left ((x, True), False))) sync)
(flip (pourMap (\x-> Left ((x, False), False))) sync)
(flip (pourMap (Right . Right)) sync)))
(\source1-> splitToConsumers s1' source1
(flip (pourMap (\x-> Left ((x, True), True))) sync)
(flip (pourMap (\x-> Left ((x, False), True))) sync)
(flip (pourMap (Right. Left)) sync)))
(synchronizeMarks Nothing sink)
synchronizeMarks :: Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool)
-> Sink c (Either (x, Bool, Bool) (Either b1 b2))
-> Source c (Either ((x, Bool), Bool) (Either b1 b2))
-> Pipe c m [x]
synchronizeMarks state sink source = get source
>>= maybe
(assert (isNothing state) (return []))
(handleMark state sink source)
handleMark :: Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool)
-> Sink c (Either (x, Bool, Bool) (Either b1 b2))
-> Source c (Either ((x, Bool), Bool) (Either b1 b2))
-> Either ((x, Bool), Bool) (Either b1 b2) -> Pipe c m [x]
handleMark Nothing sink source (Right b) = put sink (Right b)
>> synchronizeMarks Nothing sink source
handleMark Nothing sink source (Left (p, first))
= synchronizeMarks (Just (Seq.singleton (Left p), first)) sink source
handleMark state@(Just (q, first)) sink source (Left (p, first')) | first == first'
= synchronizeMarks (Just (q |> Left p, first)) sink source
handleMark state@(Just (q, True)) sink source (Right b@Left{})
= synchronizeMarks (Just (q |> Right b, True)) sink source
handleMark state@(Just (q, False)) sink source (Right b@Right{})
= synchronizeMarks (Just (q |> Right b, False)) sink source
handleMark state sink source (Right b) = put sink (Right b) >> synchronizeMarks state sink source
handleMark state@(Just (q, pos')) sink source mark@(Left ((x, t), pos))
= case Seq.viewl q
of Seq.EmptyL -> synchronizeMarks (Just (Seq.singleton (Left (x, t)), pos)) sink source
Right b :< rest -> put sink (Right b)
>>= cond
(handleMark
(if Seq.null rest then Nothing else Just (rest, pos'))
sink
source
mark)
(returnQueuedList q)
Left (y, t') :< rest -> put sink (Left $ if pos then (y, t, t') else (y, t', t))
>>= cond
(synchronizeMarks
(if Seq.null rest then Nothing else Just (rest, pos'))
sink
source)
(returnQueuedList q)
returnQueuedList q = return $ concatMap (either ((:[]) . fst) (const [])) $ Foldable.toList $ Seq.viewl q
in (configuration, t)
zipSplittersWith :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2, Typeable b)
=> (Bool -> Bool -> Bool)
-> (forall c. Source c (Either b1 b2) -> Sink c b -> Pipe c m ())
-> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b
zipSplittersWith f boundaries s1 s2
= liftSplitter "zip" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false edge = liftM (\((x, y), _)-> y ++ x) $
pipe
(\edge'->
(if parallel then pipeP else pipe)
(transduce (splittersToPairMarker s1' s2') source)
(\source-> let split = get source
>>= maybe
(return [])
(either
test
(\b-> put edge' b >> split))
test (x, t1, t2) = put (if f t1 t2 then true else false) x
>>= cond split (return [x])
in split))
(flip boundaries edge)
in (configuration, s)
groupMarks :: forall c m x b r. (ParallelizableMonad m, Typeable x, Typeable b)
=> Source c (Either (x, Bool) b) -> (Maybe (Maybe b) -> Source c x -> Pipe c m r) -> Pipe c m ()
groupMarks source getConsumer = start
where start = getSuccess source (either startContent startRegion)
startContent (x, False) = pipe (\sink-> pass False sink x) (getConsumer Nothing)
>>= maybe (return ()) (either startContent startRegion) . fst
startContent (x, True) = pipe (\sink-> pass True sink x) (getConsumer $ Just Nothing)
>>= maybe (return ()) (either startContent startRegion) . fst
startRegion b = pipe (next True) (getConsumer (Just $ Just b))
>>= maybe (return ()) (either startContent startRegion) . fst
pass t sink x = put sink x >> next t sink
next t sink = get source >>= maybe (return Nothing) (continue t sink)
continue t sink (Left (x, t')) | t == t' = pass t sink x
continue t sink p = return (Just p)