module Control.Concurrent.SCC.Combinators
(
consumeBy, prepend, append, substitute,
PipeableComponentPair ((>->)), JoinableComponentPair (join, sequence),
snot, (>&), (>|),
(&&), (||),
ifs, wherever, unless, select,
while, nestedIn,
foreach, having, havingOnly, followedBy, even,
first, uptoFirst, prefix,
last, lastAndAfter, suffix,
startOf, endOf,
(...))
where
import Control.Concurrent.SCC.Foundation
import Control.Concurrent.SCC.ComponentTypes
import Prelude hiding (even, last, sequence, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Data.Maybe (isJust, isNothing, fromJust)
import Data.Typeable (Typeable)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))
import Debug.Trace (trace)
consumeBy :: forall m x y r. (Monad m, Typeable x) => Consumer m x r -> Transducer m x y
consumeBy c = liftTransducer "consumeBy" (maxUsableThreads c) $
\threads-> let c' = usingThreads threads c
in (ComponentConfiguration [AnyComponent c'] (usedThreads c') (cost c'),
\ source _sink -> consume c' source >> return [])
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
c1 -> m w, c2 -> m w, c3 -> m
where (>->) :: c1 -> c2 -> c3
instance (ParallelizableMonad m, Typeable x)
=> PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
where p >-> c = liftPerformer ">->" (maxUsableThreads p `max` maxUsableThreads c) $
\threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
performPipe = (if parallel then pipeP else pipe) (produce p') (consume c') >> return ()
in (configuration, performPipe)
instance (ParallelizableMonad m, Typeable x, Typeable y)
=> PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
where t >-> c = liftConsumer ">->" (maxUsableThreads t `max` maxUsableThreads c) $
\threads-> let (configuration, t', c', parallel) = optimalTwoParallelConfigurations threads t c
consumePipe source = liftM snd $ (if parallel then pipeP else pipe)
(transduce t' source)
(consume c')
in (configuration, consumePipe)
instance (ParallelizableMonad m, Typeable x, Typeable y)
=> PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
where p >-> t = liftProducer ">->" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
producePipe sink = liftM fst $ (if parallel then pipeP else pipe)
(produce p')
(\source-> transduce t' source sink)
in (configuration, producePipe)
instance ParallelizableMonad m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
where t1 >-> t2 = liftTransducer ">->" (maxUsableThreads t1 + maxUsableThreads t2) $
\threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
transducePipe source sink = liftM fst $ (if parallel then pipeP else pipe)
(transduce t1' source)
(\source-> transduce t2' source sink)
in (configuration, transducePipe)
class Component c => CompatibleSignature c cons (m :: * -> *) input output | c -> cons m
class AnyListOrUnit c
instance AnyListOrUnit [x]
instance AnyListOrUnit ()
instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r) (PerformerType r) m x y
instance AnyListOrUnit y => CompatibleSignature (Consumer m x r) (ConsumerType r) m [x] y
instance AnyListOrUnit y => CompatibleSignature (Producer m x r) (ProducerType r) m y [x]
instance CompatibleSignature (Transducer m x y) TransducerType m [x] [y]
data PerformerType r
data ConsumerType r
data ProducerType r
data TransducerType
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
=> JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
where join :: c1 -> c2 -> c3
sequence :: c1 -> c2 -> c3
join = sequence
instance forall m x any r1 r2. (Monad m, Typeable x)
=> JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Producer m x r2) (Producer m x r2)
where sequence p1 p2 = liftProducer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $
\threads-> let (configuration, p1', p2') = optimalTwoSequentialConfigurations threads p1 p2
produceJoin sink = produce p1' sink >> produce p2' sink
in (configuration, produceJoin)
instance forall m x any. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] () (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
where join c1 c2 = liftConsumer "join" (maxUsableThreads c1 + maxUsableThreads c2) $
\threads-> let (configuration, c1', c2', parallel) = optimalTwoParallelConfigurations threads c1 c2
consumeJoin source = do (if parallel then pipeP else pipe)
(\sink1-> pipe (tee source sink1) (consume c2'))
(consume c1')
return ()
in (configuration, consumeJoin)
sequence c1 c2 = liftConsumer "sequence" (maxUsableThreads c1 `max` maxUsableThreads c2) $
\threads-> let (configuration, c1', c2') = optimalTwoSequentialConfigurations threads c1 c2
consumeJoin source = pipe
(\buffer-> pipe (tee source buffer) (consume c1'))
getList
>>= \(_, list)-> pipe (putList list) (consume c2')
>> return ()
in (configuration, consumeJoin)
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y] (Transducer m x y) (Transducer m x y) (Transducer m x y)
where join t1 t2 = liftTransducer "join" (maxUsableThreads t1 + maxUsableThreads t2) $
\threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
transduce' source sink = pipe
(\buffer-> (if parallel then pipeP else pipe)
(\sink1-> pipe
(\sink2-> tee source sink1 sink2)
(\src-> transduce t2' src buffer))
(\source-> transduce t1' source sink))
getList
>>= \(_, list)-> putList list sink
>> getList source
in (configuration, transduce')
sequence t1 t2 = liftTransducer "sequence" (maxUsableThreads t1 `max` maxUsableThreads t2) $
\threads-> let (configuration, t1', t2') = optimalTwoSequentialConfigurations threads t1 t2
transduce' source sink = pipe
(\buffer-> pipe
(tee source buffer)
(\source-> transduce t1 source sink))
getList
>>= \((extra, _), list)-> pipe
(putList list)
(\source-> transduce t2 source sink)
>> return extra
in (configuration, transduce')
instance forall m r1 r2. ParallelizableMonad m
=> JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () () (Performer m r1) (Performer m r2) (Performer m r2)
where join p1 p2 = liftPerformer "join" (maxUsableThreads p1 + maxUsableThreads p2) $
\threads-> let (configuration, p1', p2', parallel) = optimalTwoParallelConfigurations threads p1 p2
in (configuration, if parallel then liftM snd $ perform p1' `parallelize` perform p2'
else perform p1' >> perform p2')
sequence p1 p2 = liftPerformer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $
\threads-> let (configuration, p1', p2') = optimalTwoSequentialConfigurations threads p1 p2
in (configuration, perform p1' >> perform p2')
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Performer m r1) (Producer m x r2) (Producer m x r2)
where join pe pr = liftProducer "join" (maxUsableThreads pe + maxUsableThreads pr) $
\threads-> let (configuration, pe', pr', parallel) = optimalTwoParallelConfigurations threads pe pr
produceJoin sink = if parallel then liftM snd (perform pe' `parallelize` produce pr' sink)
else perform pe' >> produce pr' sink
in (configuration, produceJoin)
sequence pe pr = liftProducer "sequence" (maxUsableThreads pe `max` maxUsableThreads pr) $
\threads-> let (configuration, pe', pr') = optimalTwoSequentialConfigurations threads pe pr
produceJoin sink = perform pe' >> produce pr' sink
in (configuration, produceJoin)
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Performer m r2) (Producer m x r2)
where join pr pe = liftProducer "join" (maxUsableThreads pr + maxUsableThreads pe) $
\threads-> let (configuration, pr', pe', parallel) = optimalTwoParallelConfigurations threads pr pe
produceJoin sink = if parallel then liftM snd (produce pr' sink `parallelize` perform pe')
else produce pr' sink >> perform pe'
in (configuration, produceJoin)
sequence pr pe = liftProducer "sequence" (maxUsableThreads pr `max` maxUsableThreads pe) $
\threads-> let (configuration, pr', pe') = optimalTwoSequentialConfigurations threads pr pe
produceJoin sink = produce pr' sink >> perform pe'
in (configuration, produceJoin)
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] () (Performer m r1) (Consumer m x r2) (Consumer m x r2)
where join p c = liftConsumer "join" (maxUsableThreads p + maxUsableThreads c) $
\threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
consumeJoin source = if parallel then liftM snd (perform p' `parallelize` consume c' source)
else perform p' >> consume c' source
in (configuration, consumeJoin)
sequence p c = liftConsumer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $
\threads-> let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
consumeJoin source = perform p' >> consume c' source
in (configuration, consumeJoin)
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
=> JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] () (Consumer m x r1) (Performer m r2) (Consumer m x r2)
where join c p = liftConsumer "join" (maxUsableThreads c + maxUsableThreads p) $
\threads-> let (configuration, c', p', parallel) = optimalTwoParallelConfigurations threads c p
consumeJoin source = if parallel then liftM snd (consume c' source `parallelize` perform p')
else consume c' source >> perform p'
in (configuration, consumeJoin)
sequence c p = liftConsumer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $
\threads-> let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
consumeJoin source = consume c' source >> perform p'
in (configuration, consumeJoin)
instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y] (Performer m r) (Transducer m x y) (Transducer m x y)
where join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $
\threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
join' source sink = if parallel then liftM snd (perform p'
`parallelize` transduce t' source sink)
else perform p' >> transduce t' source sink
in (configuration, join')
sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $
\threads-> let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
join' source sink = perform p' >> transduce t' source sink
in (configuration, join')
instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y] (Transducer m x y) (Performer m r) (Transducer m x y)
where join t p = liftTransducer "join" (maxUsableThreads t + maxUsableThreads p) $
\threads-> let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
join' source sink = if parallel then liftM fst (transduce t' source sink
`parallelize` perform p')
else do result <- transduce t' source sink
perform p'
return result
in (configuration, join')
sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
join' source sink = do result <- transduce t' source sink
perform p'
return result
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y] (Producer m y ()) (Transducer m x y) (Transducer m x y)
where join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $
\threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
join' source sink = if parallel
then do ((_, rest), out) <- pipe
(\buffer-> produce p' sink `parallelize`
transduce t' source buffer)
getList
putList out sink
return rest
else produce p' sink >> transduce t' source sink
in (configuration, join')
sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $
\threads-> let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
join' source sink = produce p' sink >> transduce t' source sink
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y] (Transducer m x y) (Producer m y ()) (Transducer m x y)
where join t p = liftTransducer "join" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
join' source sink = if parallel
then do ((rest, ()), out) <- pipe
(\buffer-> transduce t' source sink
`parallelize` produce p' buffer)
getList
putList out sink
return rest
else do result <- transduce t' source sink
produce p' sink
return result
in (configuration, join')
sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $
\threads-> let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
join' source sink = do result <- transduce t' source sink
produce p' sink
return result
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y] (Consumer m x ()) (Transducer m x y) (Transducer m x y)
where join c t = liftTransducer "join" (maxUsableThreads c + maxUsableThreads t) $
\threads-> let (configuration, c', t', parallel) = optimalTwoParallelConfigurations threads c t
join' source sink = liftM (snd . fst) $
(if parallel then pipeP else pipe)
(\sink1-> pipe
(tee source sink1)
(\source-> transduce t' source sink))
(consume c')
in (configuration, join')
sequence c t = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads t) $
\threads-> let (configuration, c', t') = optimalTwoSequentialConfigurations threads c t
sequence' source sink = pipe
(\buffer-> pipe
(tee source buffer)
(consume c'))
getList
>>= \((rest, _), list)-> pipe
(putList list)
(\source-> transduce t' source sink)
>> return rest
in (configuration, sequence')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y] (Transducer m x y) (Consumer m x ()) (Transducer m x y)
where join t c = join c t
sequence t c = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads c) $
\threads-> let (configuration, t', c') = optimalTwoSequentialConfigurations threads t c
sequence' source sink = pipe
(\buffer-> pipe
(tee source buffer)
(\source-> transduce t' source sink))
getList
>>= \((rest, _), list)-> pipe
(putList list)
(consume c')
>> return rest
in (configuration, sequence')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y] (Producer m y ()) (Consumer m x ()) (Transducer m x y)
where join p c = liftTransducer "sequence" (maxUsableThreads p + maxUsableThreads c) $
\threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
join' source sink = if parallel then produce p' sink >> consume c' source >> return []
else parallelize (produce p' sink) (consume c' source) >> return []
in (configuration, join')
sequence p c = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $
\threads-> let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
join' source sink = produce p' sink >> consume c' source >> return []
in (configuration, join')
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
=> JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y] (Consumer m x ()) (Producer m y ()) (Transducer m x y)
where join c p = join p c
sequence c p = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $
\threads-> let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
join' source sink = consume c' source >> produce p' sink >> return []
in (configuration, join')
prepend :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
prepend prefix = liftTransducer "prepend" (maxUsableThreads prefix) $
\threads-> let prefix' = usingThreads threads prefix
prepend' source sink = produce prefix' sink >> pour source sink >> return []
in (ComponentConfiguration [AnyComponent prefix] threads (cost prefix'), prepend')
append :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
append suffix = liftTransducer "append" (maxUsableThreads suffix) $
\threads-> let suffix' = usingThreads threads suffix
append' source sink = pour source sink >> produce suffix' sink >> return []
in (ComponentConfiguration [AnyComponent suffix] threads (cost suffix'), append')
substitute :: forall m x y r. (Monad m, Typeable x, Typeable y) => Producer m y r -> Transducer m x y
substitute feed = liftTransducer "substitute" (maxUsableThreads feed) $
\threads-> let feed' = usingThreads threads feed
substitute' source sink = consumeAndSuppress source >> produce feed' sink >> return []
in (ComponentConfiguration [AnyComponent feed] threads (cost feed'), substitute')
snot :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
snot splitter = liftSectionSplitter "not" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
not source true false = splitSections splitter source false true
in (ComponentConfiguration [AnyComponent splitter'] threads (cost splitter'), not)
(>&) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >& s2 = liftSimpleSplitter ">&" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false = liftM fst $
(if parallel then pipeP else pipe)
(\true-> split s1 source true false)
(\source-> split s2 source true false)
in (configuration, s)
(>|) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >| s2 = liftSimpleSplitter ">|" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false = liftM fst $
(if parallel then pipeP else pipe)
(split s1 source true)
(\source-> split s2 source true false)
in (configuration, s)
(&&) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
(&&) = zipSplittersWith (Prelude.&&)
(||) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
(||) = zipSplittersWith (Prelude.||)
ifs :: (ParallelizableMonad m, Typeable x, BranchComponent cc m x [x]) => Splitter m x -> cc -> cc -> cc
ifs s = combineBranches "if" (cost s) (\ parallel c1 c2 -> \source-> liftM fst3 $ splitConsumer "ifs" parallel s c1 c2 source)
wherever :: (ParallelizableMonad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
wherever t s = liftTransducer "wherever" (maxUsableThreads s + maxUsableThreads t) $
\threads-> let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
wherever' source sink = liftM fst3 $ splitConsumer "wherever" parallel s
(\source-> transduce t source sink)
(\source-> pour source sink)
source
in (configuration, wherever')
unless :: (ParallelizableMonad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
unless t s = liftTransducer "unless" (maxUsableThreads s + maxUsableThreads t) $
\threads-> let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
unless' source sink = liftM fst3 $ splitConsumer "unless" parallel s
(\source-> pour source sink)
(\source-> transduce t source sink)
source
in (configuration, unless')
select :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Transducer m x x
select s = liftTransducer "select" (maxUsableThreads s) $
\threads-> let s' = usingThreads threads s
transduce' source sink = liftM fst3 $ splitConsumer "select" False s'
(\source-> pour source sink)
consumeAndSuppress
source
in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), transduce')
while :: (ParallelizableMonad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
while t s = liftTransducer "while" (maxUsableThreads t + maxUsableThreads s) $
\threads-> let (configuration, s', while'', parallel) = optimalTwoParallelConfigurations threads s while'
transduce' source sink = liftM fst3 $ splitConsumer "while" parallel s'
(\source-> transduce while' source sink)
(\source-> pour source sink)
source
while' = t >-> while t s
in (configuration, transduce')
nestedIn :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
nestedIn s1 s2 = liftSimpleSplitter "nestedIn" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false = liftM fst $
(if parallel then pipeP else pipe)
(\false-> split s1' source true false)
(\source-> pipe (\true-> split s2' source true false)
(\source-> split (nestedIn s1' s2') source true false))
in (configuration,s)
foreach :: (ParallelizableMonad m, Typeable x, BranchComponent cc m x [x]) => Splitter m x -> cc -> cc -> cc
foreach s = combineBranches "foreach" (cost s)
(\ parallel c1 c2 source-> liftM fst $ (if parallel then pipeP else pipe)
(transduce (splitterToMarker s) source)
(\source-> groupMarks source (\b-> if b then c1 else c2)))
having :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
having s1 s2 = liftSectionSplitter "having" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false = liftM fst $
(if parallel then pipeP else pipe)
(transduce (splitterToMarker s1') source)
(\source-> groupMarks source (\b chunk-> if b then test chunk
else pourMaybe chunk false))
where test chunk = pipe (\sink1-> pipe (tee chunk sink1) getList)
(\chunk-> pipe (\sink-> suppressProducer (split s2' chunk sink)) getList)
>>= \(([], chunk), (_, truePart))-> let chunk' = if null chunk
then [Nothing]
else map Just chunk
in (if null truePart
then putList chunk' false
else putList chunk' true)
>> return ()
in (configuration, s)
havingOnly :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
havingOnly s1 s2 = liftSectionSplitter "havingOnly" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false = liftM fst $
(if parallel then pipeP else pipe)
(transduce (splitterToMarker s1') source)
(\source-> groupMarks source (\b chunk-> if b then test chunk
else pourMaybe chunk false))
where test chunk = pipe (\sink1-> pipe (tee chunk sink1) getList)
(\chunk-> pipe (\sink-> suppressProducer
(\suppress-> split s2' chunk suppress sink))
getList)
>>= \(([], chunk), (_, falsePart))-> let chunk' = if null chunk
then [Nothing]
else map Just chunk
in (if null falsePart
then putList chunk' true
else putList chunk' false)
>> return ()
in (configuration, s)
first :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
first splitter = liftSectionSplitter "first" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> y ++ x) $
pipeD "first" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = p false x get1
get1 (x, True) = p true x get2
get2 (x, True) = p true x get2
get2 (x, False) = p false x get3
get3 (x, _) = p false x get3
p sink x succeed = put sink x
>>= cond (get source
>>= maybe (return []) succeed)
(return $ maybe [] (:[]) x)
in get source >>= maybe (return []) get1)
in (configuration, s)
uptoFirst :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
uptoFirst splitter = liftSectionSplitter "uptoFirst" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "uptoFirst" (transduce (splitterToMarker splitter') source)
(\source-> let get1 q (x, False) = let q' = q |> x
in get source
>>= maybe
(putQueue q' false)
(get1 q')
get1 q p@(x, True) = putQueue q true
>>= whenNull (get2 p)
get2 (x, True) = p true x get2
get2 (x, False) = p false x get3
get3 (x, _) = p false x get3
p sink x succeed = put sink x
>>= cond (get source
>>= maybe (return []) succeed)
(return [x])
in get source >>= maybe (return []) (get1 Seq.empty))
in (configuration, s)
last :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
last splitter = liftSectionSplitter "last" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "last" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = put false x
>>= cond (get source
>>= maybe (return []) get1)
(return [x])
get1 p@(x, True) = get2 Seq.empty p
get2 q (x, True) = let q' = q |> x
in get source
>>= maybe
(putQueue q' true)
(get2 q')
get2 q p@(x, False) = get3 q Seq.empty p
get3 qt qf (x, False) = let qf' = qf |> x
in get source
>>= maybe
(putQueue qt true
>> putQueue qf' false)
(get3 qt qf')
get3 qt qf p@(x, True) = do rest1 <- putQueue qt false
rest2 <- putQueue qf false
if null rest1 Prelude.&& null rest2
then get2 Seq.empty p
else return (rest1 ++ rest2)
p succeed = get source >>= maybe (return []) succeed
in p get1)
in (configuration, s)
lastAndAfter :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
lastAndAfter splitter = liftSectionSplitter "lastAndAfter" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "lastAndAfter" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = put false x
>>= cond (p get1) (return [x])
get1 p@(x, True) = get2 Seq.empty p
get2 q (x, True) = let q' = q |> x
in get source
>>= maybe
(putQueue q' true)
(get2 q')
get2 q p@(x, False) = get3 q p
get3 q (x, False) = let q' = q |> x
in get source
>>= maybe
(putQueue q' true)
(get3 q')
get3 q p@(x, True) = putQueue q false
>>= whenNull (get1 p)
p succeed = get source >>= maybe (return []) succeed
in p get1)
in (configuration, s)
prefix :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
prefix splitter = liftSectionSplitter "prefix" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> y ++ x) $
pipeD "prefix" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = p false x get2
get1 (x, True) = p true x get1
get2 (x, _) = p false x get2
p sink x succeed = put sink x
>>= cond (get source
>>= maybe (return []) succeed)
(return $ maybe [] (:[]) x)
in get source >>= maybe (return []) get1)
in (configuration, s)
suffix :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
suffix splitter = liftSectionSplitter "suffix" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "suffix" (transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = put false x >>= cond (p get1) (return [x])
get1 (x, True) = get2 (Seq.singleton x)
get2 q = get source
>>= maybe (putQueue q true) (get3 q)
get3 q (x, True) = get2 (q |> x)
get3 q p@(x, False) = putQueue q false >>= whenNull (get1 p)
p succeed = get source >>= maybe (return []) succeed
in p get1)
in (configuration, s)
even :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
even splitter = liftSectionSplitter "even" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "even"
(transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = put false x
>>= cond (next get1) (return [x])
get1 p@(x, True) = get2 p
get2 (x, True) = put false x
>>= cond (next get2) (return [x])
get2 p@(x, False) = get3 p
get3 (x, False) = put false x
>>= cond (next get3) (return [x])
get3 p@(x, True) = get4 p
get4 (x, True) = put true x
>>= cond (next get4) (return [x])
get4 p@(x, False) = get1 p
next g = get source >>= maybe (return []) g
in next get1)
in (configuration, s)
startOf :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
startOf splitter = liftSectionSplitter "startOf" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "startOf"
(transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = put false x
>>= cond (next get1) (return [x])
get1 p@(x, True) = put true Nothing >> get2 p
get2 (x, True) = put false x
>>= cond (next get2) (return [x])
get2 p@(x, False) = get1 p
next g = get source >>= maybe (return []) g
in next get1)
in (configuration, s)
endOf :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
endOf splitter = liftSectionSplitter "endOf" (maxUsableThreads splitter) $
\threads-> let splitter' = usingThreads threads splitter
configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "endOf"
(transduce (splitterToMarker splitter') source)
(\source-> let get1 (x, False) = put false x
>>= cond (next get1) (return [x])
get1 p@(x, True) = get2 p
get2 (x, True) = put false x
>>= cond (next get2) (return [x])
get2 p@(x, False) = put true Nothing >> get1 p
next g = get source >>= maybe (return []) g
in next get1)
in (configuration, s)
followedBy :: forall m x. (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
followedBy s1 s2 = liftSectionSplitter "followedBy" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
in (configuration, followedBy' parallel s1' s2')
where followedBy' parallel s1 s2 source true false
= liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
(if parallel then pipeP else pipe)
(transduce (splitterToMarker s1) source)
(\source-> let get0 q = case Seq.viewl q
of Seq.EmptyL -> get source >>= maybe (return []) get1
(x, False) :< rest -> put false x
>>= cond (get0 rest)
(return $ Foldable.toList $ Seq.viewl $ fmap fst q)
(x, True) :< rest -> get2 Seq.empty q
get1 (x, False) = put false x
>>= cond (get source >>= maybe (return []) get1)
(return [x])
get1 p@(x, True) = get2 Seq.empty (Seq.singleton p)
get2 q q' = case Seq.viewl q'
of Seq.EmptyL -> get source
>>= maybe (testEnd q) (get2 q . Seq.singleton)
(x, True) :< rest -> get2 (q |> x) rest
(x, False) :< rest -> do ((q1, q2), n) <- pipeD "followedBy tail"
(get3 Seq.empty q') (test q)
case n of Nothing -> putQueue q false
>>= whenNull (get0 (q1 >< q2))
Just n -> do put false Nothing
get0 (dropJust n q1 >< q2)
get3 q1 q2 sink = canPut sink
>>= cond (case Seq.viewl q2
of Seq.EmptyL -> get source
>>= maybe (return (q1, q2))
(\p-> maybe (return True) (put sink) (fst p)
>> get3 (q1 |> p) q2 sink)
p :< rest -> maybe (return True) (put sink) (fst p)
>> get3 (q1 |> p) rest sink)
(return (q1, q2))
testEnd q = do ((), n) <- pipeD "testEnd" (const $ return ()) (test q)
case n of Nothing -> putQueue q false
_ -> return []
test q source = liftM snd $
pipeD "follower"
(transduce (splitterToMarker s2) source)
(\source-> let get4 (_, False) = return Nothing
get4 p@(_, True) = putQueue q true >> get5 0 p
get5 n (x, False) = return (Just n)
get5 n (Nothing, True) = get6 n
get5 n (x, True) = put true x >> get6 (succ n)
get6 n = get source
>>= maybe
(return $ Just n)
(get5 n)
in get source >>= maybe (return Nothing) get4)
dropJust 0 q = q
dropJust n q = case Seq.viewl q of (Nothing, _) :< rest -> dropJust n rest
(Just _, _) :< rest -> dropJust (pred n) rest
in get0 Seq.empty)
(...) :: forall m x. (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 ... s2 = liftSectionSplitter "..." (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
(if parallel then pipeP else pipe)
(transduce (splittersToPairMarker s1 s2) source)
(\source-> let next n = get source >>= maybe (return []) (state n)
pass n x = (if n > 0 then put true x else put false x)
>>= cond (next n) (return [x])
pass' n x = (if n >= 0 then put true x else put false x)
>>= cond (next n) (return [x])
state n (Left (x, True, False)) = pass (succ n) (Just x)
state n (Left (x, False, True)) = pass' (pred n) (Just x)
state n (Left (x, True, True)) = pass' n (Just x)
state n (Left (x, False, False)) = pass n (Just x)
state n (Right (Left True)) = pass (succ n) Nothing
state n (Right (Right True)) = pass (pred n) Nothing
state n (Right _) = next n
in next 0)
in (configuration, s)
type Marker m x = Transducer m x (Maybe x, Bool)
splitterToMarker :: forall m x. (ParallelizableMonad m, Typeable x) => Splitter m x -> Marker m x
splitterToMarker s = liftTransducer "splitterToMarker" (maxUsableThreads s) $
\threads-> let s' = usingThreads threads s
t source sink = liftM (\((x, y), z)-> z ++ y ++ x) $
pipeD "splitterToMarker true"
(\trueSink-> pipeD "splitterToMarker false"
(splitSections s' source trueSink)
(mark False))
(mark True)
where mark b source = canPut sink
>>= cond (get source
>>= maybe (return [])
(\x-> put sink (x, b)
>>= cond
(mark b source)
(return $ maybe [] (: []) x)))
(return [])
in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), t)
splittersToPairMarker :: forall m x. (ParallelizableMonad m, Typeable x)
=> Splitter m x -> Splitter m x -> Transducer m x (Either (x, Bool, Bool) (Either Bool Bool))
splittersToPairMarker s1 s2
= liftTransducer "splittersToPairMarker" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallelize) = optimalTwoParallelConfigurations threads s1 s2
t source sink = liftM (\((((((([], l1), l2), l3), l4), l5), l6), l7)-> l7 ++ l6 ++ l5 ++ l4 ++ l3 ++ l2 ++ l1) $
pipeD "splittersToPairMarker synchronize"
(\sync->
pipeD "splittersToPairMarker true1"
(\true1->
pipeD "splittersToPairMarker false1"
(\false1->
pipeD "splittersToPairMarker true2"
(\true2->
pipeD "splittersToPairMarker false2"
(\false2->
pipeD "splittersToPairMarker sink1"
(\sink1->
(if parallelize then pipeP else pipe)
(\sink2-> tee source sink1 sink2)
(\source2-> splitSections s2 source2 true2 false2))
(\source1-> splitSections s1 source1 true1 false1))
(mark sync False False))
(mark sync False True))
(mark sync True False))
(mark sync True True))
(synchronizeMarks Nothing sink)
synchronizeMarks :: Maybe (Seq (Maybe x, Bool), Bool)
-> Sink c (Either (x, Bool, Bool) (Either Bool Bool)) -> Source c (Maybe x, Bool, Bool)
-> Pipe c m [x]
synchronizeMarks state sink source = get source
>>= maybe
(assert (isNothing state) (return []))
(handleMark state sink source)
handleMark :: Maybe (Seq (Maybe x, Bool), Bool)
-> Sink c (Either (x, Bool, Bool) (Either Bool Bool)) -> Source c (Maybe x, Bool, Bool)
-> (Maybe x, Bool, Bool) -> Pipe c m [x]
handleMark Nothing sink source (x, pos, b)
= case x of Nothing -> put sink (Right $ if pos then Left b else Right b)
>> synchronizeMarks Nothing sink source
_ -> synchronizeMarks (Just (Seq.singleton (x, b), pos)) sink source
handleMark state@(Just (q, pos')) sink source mark@(x, pos, b)
| pos == pos' = synchronizeMarks (Just (q |> (x, b), pos')) sink source
| isNothing x = put sink (Right $ if pos then Left b else Right b)
>> synchronizeMarks state sink source
| otherwise = case Seq.viewl q
of Seq.EmptyL -> synchronizeMarks (Just (Seq.singleton (x, b), pos)) sink source
(Nothing, b') :< rest -> put sink (Right $ if pos then Right b' else Left b')
>>= cond
(handleMark
(if Seq.null rest then Nothing else Just (rest, pos'))
sink
source
mark)
(returnQueuedList q)
(Just y, b') :< rest -> put sink (Left $ if pos then (y, b, b') else (y, b', b))
>>= cond
(synchronizeMarks
(if Seq.null rest then Nothing else Just (rest, pos'))
sink
source)
(returnQueuedList q)
returnQueuedList q = return $ concatMap (maybe [] (:[]) . fst) $ Foldable.toList $ Seq.viewl q
mark sink first b source = let mark' = canPut sink
>>= cond
(get source
>>= maybe
(return [])
(\x-> put sink (x, first, b)
>>= cond mark' (return $ maybe [] (: []) x)))
(return [])
in mark'
in (configuration, t)
pairMarkerToMaybePairMarker :: forall m x. (ParallelizableMonad m, Typeable x)
=> Transducer m x (Either (x, Bool, Bool) (Either Bool Bool)) -> Transducer m x (Maybe x, Bool, Bool)
pairMarkerToMaybePairMarker t = liftTransducer "pairMarkerToMaybePairMarker" (maxUsableThreads t + 1) $
\threads-> let t's = usingThreads threads t
t'p = usingThreads (threads 1) t
parallel = threads > 1 Prelude.&& cost t'p <= cost t's
t' = if parallel then t'p else t's
cost' = if parallel then (cost t'p `max` 1) + 1 else cost t's + 1
transduce' source sink
= liftM (\(x, y)-> y ++ x) $
(if parallel then pipeP else pipe)
(transduce t source)
(\source-> let next state = get source >>= maybe (return []) state
nextState2 l r d = get source
>>= maybe (put sink (Nothing, l, r) >> return []) (state2 l r d)
state0 (Left (x, l, r)) = put sink (Just x, l, r)
>>= cond (next $ state1 l r) (return [x])
state0 v@(Right d) = state2 False False d v
state1 _ _ (Left (x, l, r)) = put sink (Just x, l, r)
>>= cond (next $ state1 l r) (return [x])
state1 l r v@(Right d) = state2 l r d v
state2 l r Left{} (Right d@(Left l')) = nextState2 l' r d
state2 l r Left{} (Right (Right r')) = put sink (Nothing, l, r')
>>= cond (next $ state1 l r') (return [])
state2 l r Left{} t@(Left (x, l', r')) | l == l' = state1 l r t
| otherwise = put sink (Nothing, l, r)
>>= cond
(state1 l' r' t)
(return [])
state2 l r Right{} (Right d@(Right r')) = nextState2 l r' d
state2 l r Right{} (Right (Left l')) = put sink (Nothing, l', r)
>>= cond (next $ state1 l' r) (return [])
state2 l r Right{} t@(Left (x, l', r')) | r == r' = state1 l r t
| otherwise = put sink (Nothing, l, r)
>>= cond
(state1 l' r' t)
(return [])
in next state0)
in (ComponentConfiguration [AnyComponent t'] threads cost', transduce')
zipSplittersWith :: (ParallelizableMonad m, Typeable x) => (Bool -> Bool -> Bool) -> Splitter m x -> Splitter m x -> Splitter m x
zipSplittersWith f s1 s2
= liftSectionSplitter "zip" (maxUsableThreads s1 + maxUsableThreads s2) $
\threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
s source true false = liftM (\(x, y)-> y ++ x) $
(if parallel then pipeP else pipe)
(transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source)
(\source-> let split = get source >>= maybe (return []) test
test (x, b1, b2) = (if f b1 b2 then put true x else put false x)
>>= cond split (return $ maybe [] (:[]) x)
in split)
in (configuration, s)
groupMarks :: forall c m x y z. (ParallelizableMonad m, Typeable x, Typeable y, Eq y)
=> Source c (Maybe x, y) -> (y -> Source c x -> Pipe c m z) -> Pipe c m ()
groupMarks source getConsumer = getSuccess source startNew
where startNew (mx, y) = do (nextPair, _) <- pipeD "groupMarks" (\sink-> pass sink mx y) (getConsumer y)
case nextPair of Just p -> startNew p
Nothing -> return ()
pass sink Nothing y = next sink y
pass sink (Just x) y = put sink x >> next sink y
next sink y = get source >>= maybe (return Nothing) (continue sink y)
continue sink y (x, y') | y == y' = pass sink x y
continue sink y p@(x, y') | y /= y' = return (Just p)
splitConsumer :: forall c m x r1 r2. (ParallelizableMonad m, Typeable x)
=> String -> Bool -> Splitter m x -> (Source c x -> Pipe c m r1) -> (Source c x -> Pipe c m r2)
-> (Source c x -> Pipe c m ([x], r1, r2))
splitConsumer description parallel s trueConsumer falseConsumer = consumer'
where consumer' source = (if parallel then pipeP else pipe)
(\false-> pipeD (description ++ " true") (\true-> split s source true false) trueConsumer)
falseConsumer
>>= \((extra, r1), r2)-> return (extra, r1, r2)
splitConsumerSections :: forall m x r1 r2. (ParallelizableMonad m, Typeable x) =>
String -> Splitter m x -> Consumer m (Maybe x) r1 -> Consumer m (Maybe x) r2 -> Consumer m x ([x], r1, r2)
splitConsumerSections description s trueConsumer falseConsumer
= liftConsumer description (maxUsableThreads s + maxUsableThreads trueConsumer + maxUsableThreads falseConsumer) usingThreads
where usingThreads :: Int -> (ComponentConfiguration, forall c. Source c x -> Pipe c m ([x], r1, r2))
usingThreads threadCount = (configuration', consumer')
where (configuration', (splitter', forkSplitter), (trueConsumer', forkTrue), (falseConsumer', forkFalse))
= optimalThreeParallelConfigurations threadCount s trueConsumer falseConsumer
consumer' source = (if forkFalse then pipeP else pipe)
(\false-> (if forkTrue Prelude.|| forkSplitter then pipeP else pipe)
(\true-> splitSections s source true false)
(consume trueConsumer))
(consume falseConsumer)
>>= \((extra, r1), r2)-> return (extra, r1, r2)
putQueue :: forall c m x. (Monad m, Typeable x) => Seq x -> Sink c x -> Pipe c m [x]
putQueue q sink = putList (Foldable.toList (Seq.viewl q)) sink
getQueue :: forall c m x. (Monad m, Typeable x) => Source c x -> Pipe c m (Seq x)
getQueue source = let getOne q = get source >>= maybe (return q) (\x-> getOne (q |> x))
in getOne Seq.empty
pourMaybe :: forall c x m. (Monad m, Typeable x) => Source c x -> Sink c (Maybe x) -> Pipe c m ()
pourMaybe source sink = pour0
where pour0 = canPut sink >>= flip when (get source >>= maybe (put sink Nothing >> return ()) pass)
pour1 = canPut sink >>= flip when (getSuccess source pass)
pass x = put sink (Just x) >> pour1
suppressProducer :: forall c m x r. (ParallelizableMonad m, Typeable x) => (Sink c x -> Pipe c m r) -> Pipe c m r
suppressProducer p = liftM fst $ pipeD "suppress" p consumeAndSuppress
fst3 :: (a, b, c) -> a
fst3 (a, b, c) = a