module Control.Concurrent.SCC.Combinators
(
(->>), (<<-),
(>->), join,
snot, (>&), (>|),
(&&), (||),
ifs, wherever, unless, select,
while, nestedIn,
foreach, having, havingOnly, followedBy, even,
first, uptoFirst, prefix,
last, lastAndAfter, suffix,
between, (...))
where
import Control.Concurrent.SCC.Foundation
import Control.Concurrent.SCC.ComponentTypes
import Prelude hiding (even, last, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Data.Maybe (isJust, isNothing, fromJust)
import Data.Typeable (Typeable)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))
import Debug.Trace (trace)
infixr ->>
-- | Prepends a transducer to a consumer. The transducer reads the combined consumer's source and its
-- output is piped (through a pipe labelled @"->>"@) into the given consumer, whose result is returned.
(->>) :: forall x y m r. (Monad m, Typeable x, Typeable y) => Transducer m x y -> Consumer m y r -> Consumer m x r
Transducer t ->> consumer = transduced
  where
    transduced source = do
      piped <- pipeD "->>" (t source) consumer
      -- Only the consumer's half of the pipe result is of interest.
      return (snd piped)
-- | Appends a transducer to a producer. The producer's output is piped (through a pipe labelled
-- @"<<-"@) into the transducer, which writes into the combined producer's sink. The producer's own
-- result is returned.
(<<-) :: forall x y m r c c1. (Monad m, Typeable x, Typeable y) => Transducer m x y -> Producer m x r -> Producer m y r
Transducer t <<- producer = transduced
  where
    transduced sink = do
      piped <- pipeD "<<-" producer (\produced -> t produced sink)
      -- Only the producer's half of the pipe result is of interest.
      return (fst piped)
-- | Sequential composition of two transducers: the output of the first is piped (through a pipe
-- labelled @">->"@) into the second. The result of the first transducer is returned.
(>->) :: forall m x y z. Monad m => Transducer m x y -> Transducer m y z -> Transducer m x z
Transducer t1 >-> Transducer t2 = Transducer composed
  where
    composed source sink = do
      piped <- pipeD ">->" (t1 source) (\intermediate -> t2 intermediate sink)
      return (fst piped)
-- | Feeds the same input to two transducers, one after the other, writing both outputs to the same sink.
-- The input is duplicated with 'tee': one copy is consumed by the first transducer while the other is
-- collected into a list, which is then replayed into the second transducer. The value returned is the
-- result of the first transducer only.
join :: (Monad m, Typeable x) => Transducer m x y -> Transducer m x y -> Transducer m x y
join (Transducer t1) (Transducer t2) = Transducer joined
  where
    joined source sink = do
      (((), buffered), extra) <-
        pipeD "join 1"
          (\forFirst -> pipeD "join 2" (\forBuffer -> tee source forFirst forBuffer) getList)
          (\input1 -> t1 input1 sink)
      -- Replay the buffered copy of the input through the second transducer.
      _ <- pipeD "join 3" (putList buffered) (\input2 -> t2 input2 sink)
      return extra
-- | Negates a splitter by swapping its true and false sinks.
snot :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
snot splitter = liftSectionSplitter inverted
  where
    inverted source true false = splitSections splitter source false true
-- | Chains two splitters conjunctively: whatever the first splitter sends to its true sink is piped
-- (through a pipe labelled @">&"@) into the second splitter, while whatever the first splitter rejects
-- goes directly to the false sink. The result of the first splitter is returned.
(>&) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >& s2 = liftSimpleSplitter (\source true false -> do
  piped <- pipeD ">&"
             (\accepted -> split s1 source accepted false)
             (\candidates -> split s2 candidates true false)
  return (fst piped))
-- | Chains two splitters disjunctively: whatever the first splitter accepts goes directly to the true
-- sink, while whatever it rejects is piped (through a pipe labelled @">|"@) into the second splitter.
-- The result of the first splitter is returned.
(>|) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >| s2 = liftSimpleSplitter (\source true false -> do
  piped <- pipeD ">|"
             (\rejected -> split s1 source true rejected)
             (\leftovers -> split s2 leftovers true false)
  return (fst piped))
-- | Combines two splitters running over the same input: an item is sent to the true sink where
-- 'Prelude.&&' of the two splitters' marks holds (see 'zipSplittersWith').
(&&) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 && s2 = zipSplittersWith (Prelude.&&) s1 s2
-- | Combines two splitters running over the same input: an item is sent to the true sink where
-- 'Prelude.||' of the two splitters' marks holds (see 'zipSplittersWith').
(||) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 || s2 = zipSplittersWith (Prelude.||) s1 s2
-- | Conditional transducer: the splitter divides the input, the true part is fed to the first
-- transducer and the false part to the second; both write into the same sink.
ifs :: (Monad m, Typeable x) => Splitter m x -> Transducer m x y -> Transducer m x y -> Transducer m x y
ifs s (Transducer t1) (Transducer t2) = Transducer branch
  where
    branch source sink = do
      outcome <- splitConsumer "ifs" s (\trues -> t1 trues sink) (\falses -> t2 falses sink) source
      return (fst3 outcome)
-- | Applies the transducer only to the part of the input the splitter sends to its true sink; the
-- false part is poured into the output sink unchanged.
wherever :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
wherever (Transducer t) s = Transducer applied
  where
    applied source sink = do
      outcome <- splitConsumer "wherever" s (\trues -> t trues sink) (\falses -> pour falses sink) source
      return (fst3 outcome)
-- | Applies the transducer only to the part of the input the splitter sends to its false sink; the
-- true part is poured into the output sink unchanged.
unless :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
unless (Transducer t) s = Transducer applied
  where
    applied source sink = do
      outcome <- splitConsumer "unless" s (\trues -> pour trues sink) (\falses -> t falses sink) source
      return (fst3 outcome)
-- | Turns a splitter into a filter: the true part of the input is poured into the output sink and
-- the false part is handed to 'consumeAndSuppress'.
select :: (Monad m, Typeable x) => Splitter m x -> Transducer m x x
select s = Transducer selected
  where
    selected source sink = do
      outcome <- splitConsumer "select" s (\trues -> pour trues sink) consumeAndSuppress source
      return (fst3 outcome)
-- | Recursive application of a transducer: the part of the input accepted by the splitter is passed
-- through the transducer and then through @while t s@ again before reaching the output sink; the
-- rejected part is poured into the output sink directly.
while :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
while t s = Transducer looped
  where
    looped source sink = do
      outcome <- splitConsumer "while" s
                   (t ->> while t s ->> (\transformed -> pour transformed sink))
                   (\falses -> pour falses sink)
                   source
      return (fst3 outcome)
-- | Recursive combination of two splitters. Whatever the first splitter accepts goes to the true
-- sink; what it rejects is piped into the second splitter. Whatever the second splitter rejects goes
-- to the false sink, and what it accepts is piped into @nestedIn s1 s2@ again.
-- The result of the outermost application of the first splitter is returned.
nestedIn :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
nestedIn s1 s2 = liftSimpleSplitter (\source true false -> do
  piped <- pipe (\outside -> split s1 source true outside)
                (\remainder -> pipe (\inside -> split s2 remainder inside false)
                                    (\deeper -> split (nestedIn s1 s2) deeper true false))
  return (fst piped))
-- | Applies a transducer separately to every section of the input: the splitter's output is turned
-- into a marked stream, which 'groupMarks' cuts into runs of equal marks. Each run marked true is
-- passed through the first transducer and each run marked false through the second; all of them
-- write into the same sink.
foreach :: (Monad m, Typeable x, Typeable y) => Splitter m x -> Transducer m x y -> Transducer m x y -> Transducer m x y
foreach s t1 t2 = Transducer perSection
  where
    perSection source sink = do
      piped <- pipeD "foreach"
                 (transduce (splitterToMarker s) source)
                 (\marked -> groupMarks marked (\mark chunk -> transduce (transducerFor mark) chunk sink))
      return (fst piped)
    transducerFor True = t1
    transducerFor False = t2
-- | Keeps only those true sections of the first splitter in which the second splitter accepts
-- something. Every section marked true by @s1@ is buffered and simultaneously run through @s2@;
-- if @s2@ sent anything to its true sink the whole section goes to the true sink, otherwise to the
-- false sink. Input that @s1@ marks false is forwarded to the false sink with 'pourMaybe'.
having :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
having s1 s2 = liftSectionSplitter s
  where
    s source true false = do
      piped <- pipeD "having"
                 (transduce (splitterToMarker s1) source)
                 (\marked -> groupMarks marked
                               (\isCandidate chunk -> if isCandidate then test chunk else pourMaybe chunk false))
      return (fst piped)
      where
        test chunk = do
          (((), buffered), (_, truePart)) <-
            pipe (\sink1 -> pipe (\sink2 -> tee chunk sink1 sink2) getList)
                 (\copy -> pipe (\sink -> suppressProducer (split s2 copy sink)) getList)
          -- An empty section is represented by a lone section marker.
          let section = if null buffered then [Nothing] else map Just buffered
          _ <- putList section (if null truePart then false else true)
          return ()
-- | Keeps only those true sections of the first splitter that the second splitter accepts entirely.
-- Every section marked true by @s1@ is buffered and simultaneously run through @s2@; if @s2@ sent
-- nothing to its false sink the whole section goes to the true sink, otherwise to the false sink.
-- Input that @s1@ marks false is forwarded to the false sink with 'pourMaybe'.
havingOnly :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
havingOnly s1 s2 = liftSectionSplitter s
  where
    s source true false = do
      piped <- pipeD "havingOnly"
                 (transduce (splitterToMarker s1) source)
                 (\marked -> groupMarks marked
                               (\isCandidate chunk -> if isCandidate then test chunk else pourMaybe chunk false))
      return (fst piped)
      where
        test chunk = do
          (((), buffered), (_, falsePart)) <-
            pipe (\sink1 -> pipe (\sink2 -> tee chunk sink1 sink2) getList)
                 (\copy -> pipe (\sink -> suppressProducer (\suppress -> split s2 copy suppress sink))
                                getList)
          -- An empty section is represented by a lone section marker.
          let section = if null buffered then [Nothing] else map Just buffered
          _ <- putList section (if null falsePart then true else false)
          return ()
-- | Passes only the first true section of the argument splitter to the true sink. Everything before
-- that section and everything after its end goes to the false sink. The returned list contains the
-- item that could not be written (if any) followed by whatever the marker stage reported as left over.
first :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
first splitter = liftSectionSplitter s
  where
    s source true false = do
      (unread, unsent) <-
        pipeD "first" (transduce (splitterToMarker splitter) source)
          (\marks ->
             let -- No true mark seen so far.
                 beforeFirst (x, isTrue)
                   | isTrue = forward true x insideFirst
                   | otherwise = forward false x beforeFirst
                 -- Inside the first true section.
                 insideFirst (x, isTrue)
                   | isTrue = forward true x insideFirst
                   | otherwise = forward false x afterFirst
                 -- The first true section has ended; nothing else is accepted.
                 afterFirst (x, _) = forward false x afterFirst
                 forward sink x continue =
                   put sink x
                   >>= cond (get marks >>= maybe (return []) continue)
                            (return $ maybe [] (:[]) x)
             in get marks >>= maybe (return []) beforeFirst)
      return (unsent ++ unread)
-- | Passes everything up to and including the first true section of the argument splitter to the
-- true sink, and everything after that section to the false sink.
--
-- Items marked false are queued (@get1@) until the first true mark arrives, at which point the queue
-- is written to the true sink. If the input ends before any true mark is seen, the queue is written
-- to the false sink instead. After the first true section (@get2@) ends, all remaining input is sent
-- to the false sink (@get3@).
uptoFirst :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
uptoFirst splitter = liftSectionSplitter s
-- The consumer's leftover list holds 'Maybe' values; section markers ('Nothing') are dropped from it
-- before it is prepended to the marker stage's result.
where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "uptoFirst" (transduce (splitterToMarker splitter) source)
(\source-> let get1 q (x, False) = let q' = q |> x
in get source
>>= maybe
(putQueue q' false)
(get1 q')
-- First true mark: flush the queued prefix to the true sink, and carry on only if all of it was written.
get1 q p@(x, True) = do rest <- putQueue q true
if null rest then get2 p else return rest
get2 (x, True) = p true x get2
get2 (x, False) = p false x get3
get3 (x, _) = p false x get3
-- Write one item, then either continue with the next state or report the unwritten item.
p sink x succeed = put sink x
>>= cond (get source >>= maybe (return []) succeed)
(return [x])
in get source >>= maybe (return []) (get1 Seq.empty))
-- | Passes only the last true section of the argument splitter to the true sink; everything else
-- goes to the false sink.
--
-- Since it cannot be known whether a true section is the last one until the input ends or another
-- true section starts, the current true section (@qt@) and the false items following it (@qf@) are
-- queued. When another true section starts, both queues are flushed to the false sink; when the input
-- ends, the true queue goes to the true sink and the trailing false queue to the false sink.
--
-- The returned list consists of the items that could not be written (section markers removed),
-- followed by whatever the marker stage reported as left over. At end of input the leftovers of
-- /both/ queue flushes are reported, consistently with the mid-stream flush.
last :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
last splitter = liftSectionSplitter s
  where
    s source true false = liftM (\(x, y) -> concatMap (maybe [] (:[])) y ++ x) $
      pipeD "last" (transduce (splitterToMarker splitter) source)
        (\source ->
           let -- No true section is pending: false items are written through immediately.
               get1 (x, False) = put false x
                                 >>= cond (get source >>= maybe (return []) get1)
                                          (return [x])
               get1 p@(_, True) = get2 Seq.empty p
               -- Accumulating a true section that may turn out to be the last one.
               get2 q (x, True) = let q' = q |> x
                                  in get source
                                     >>= maybe (putQueue q' true) (get2 q')
               get2 q p@(_, False) = get3 q Seq.empty p
               -- Accumulating the false items that follow the pending true section.
               get3 qt qf (x, False) = let qf' = qf |> x
                                       in get source
                                          >>= maybe
                                                (Monad.liftM2 (++) (putQueue qt true) (putQueue qf' false))
                                                (get3 qt qf')
               -- Another true section starts, so the pending one was not the last.
               get3 qt qf p@(_, True) = do rest1 <- putQueue qt false
                                           rest2 <- putQueue qf false
                                           if null rest1 Prelude.&& null rest2
                                             then get2 Seq.empty p
                                             else return (rest1 ++ rest2)
               p succeed = get source >>= maybe (return []) succeed
           in p get1)
-- | Passes the last true section of the argument splitter, together with all input following it, to
-- the true sink; everything before it goes to the false sink.
--
-- A true section and the false items following it are accumulated in a single queue. If the input
-- ends, the whole queue is written to the true sink; if another true section starts, the queue is
-- written to the false sink and accumulation restarts with the new section.
lastAndAfter :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
lastAndAfter splitter = liftSectionSplitter s
-- Section markers ('Nothing') are dropped from the consumer's leftover list before it is prepended
-- to the marker stage's result.
where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "lastAndAfter" (transduce (splitterToMarker splitter) source)
-- get1: no pending section, false items are written through immediately.
(\source-> let get1 (x, False) = put false x >>= cond (p get1) (return [x])
get1 p@(x, True) = get2 Seq.empty p
-- get2: queueing the items of a true section.
get2 q (x, True) = let q' = q |> x
in get source
>>= maybe
(putQueue q' true)
(get2 q')
get2 q p@(x, False) = get3 q p
-- get3: queueing the false items that follow the pending true section.
get3 q (x, False) = let q' = q |> x
in get source
>>= maybe
(putQueue q' true)
(get3 q')
-- A new true section starts: the queued input was not the last section after all.
get3 q p@(x, True) = putQueue q false >>= whenNull (get1 p)
p succeed = get source >>= maybe (return []) succeed
in p get1)
-- | Passes to the true sink only the true section found at the very beginning of the input. As soon
-- as an item marked false is seen, that item and all following input go to the false sink. The
-- returned list contains the item that could not be written (if any) followed by whatever the marker
-- stage reported as left over.
prefix :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
prefix splitter = liftSectionSplitter s
  where
    s source true false = do
      (unread, unsent) <-
        pipeD "prefix" (transduce (splitterToMarker splitter) source)
          (\marks ->
             let -- Still inside the leading true section.
                 leading (x, isTrue)
                   | isTrue = forward true x leading
                   | otherwise = forward false x trailing
                 -- The leading section is over; nothing else is accepted.
                 trailing (x, _) = forward false x trailing
                 forward sink x continue =
                   put sink x
                   >>= cond (get marks >>= maybe (return []) continue)
                            (return $ maybe [] (:[]) x)
             in get marks >>= maybe (return []) leading)
      return (unsent ++ unread)
-- | Passes to the true sink only the true section found at the very end of the input; everything
-- else goes to the false sink.
--
-- Each true section is queued. If the input ends while the section is still open, the queue is
-- written to the true sink; if an item marked false follows, the queue is written to the false sink
-- and processing continues with that item.
suffix :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
suffix splitter = liftSectionSplitter s
-- Section markers ('Nothing') are dropped from the consumer's leftover list before it is prepended
-- to the marker stage's result.
where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "suffix" (transduce (splitterToMarker splitter) source)
(\source-> let get1 (x, False) = put false x >>= cond (p get1) (return [x])
get1 (x, True) = get2 (Seq.singleton x)
-- End of input while a true section is queued: the section is a suffix.
get2 q = get source
>>= maybe (putQueue q true) (get3 q)
get3 q (x, True) = get2 (q |> x)
-- The queued true section is followed by more input, so it is not a suffix.
get3 q p@(x, False) = putQueue q false >>= whenNull (get1 p)
p succeed = get source >>= maybe (return []) succeed
in p get1)
-- | Passes every second true section of the argument splitter to the true sink; everything else,
-- including the odd-numbered true sections, goes to the false sink.
--
-- The consumer cycles through four states: @get1@ (false input before an odd section), @get2@ (inside
-- an odd true section, written to false), @get3@ (false input before an even section) and @get4@
-- (inside an even true section, written to true). A state hands the current item over to the next
-- state, without consuming it, when the item's mark changes.
even :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
even splitter = liftSectionSplitter s
-- Section markers ('Nothing') are dropped from the consumer's leftover list before it is prepended
-- to the marker stage's result.
where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "even"
(transduce (splitterToMarker splitter) source)
(\source-> let get1 (x, False) = put false x
>>= cond (get source >>= maybe (return []) get1)
(return [x])
get1 p@(x, True) = get2 p
get2 (x, True) = put false x
>>= cond (get source >>= maybe (return []) get2)
(return [x])
get2 p@(x, False) = get3 p
get3 (x, False) = put false x
>>= cond (get source >>= maybe (return []) get3)
(return [x])
get3 p@(x, True) = get4 p
get4 (x, True) = put true x
>>= cond (get source >>= maybe (return []) get4)
(return [x])
get4 p@(x, False) = get1 p
in get source >>= maybe (return []) get1)
-- | Combines two splitters so that a true section of the first splitter is accepted only if the
-- input immediately following it is accepted by the second splitter.
--
-- Outline of the local helpers, as far as the code shows:
--
-- * @get0 q@ processes a queue @q@ of already-read marked items before reading any more from the
--   source: false-marked items are written to the false sink, a true-marked item starts @get2@.
--
-- * @get1@ handles items read directly from the source while no true section is pending.
--
-- * @get2 q q'@ accumulates the items of a true section in @q@, taking them from the pending queue
--   @q'@ first and from the source afterwards. When the section ends, the second splitter is tried
--   on the following input by piping @get3@ into @test@.
--
-- * @get3 q1 q2 sink@ feeds the pending queue @q2@ and then fresh source items to @sink@ for as long
--   as the sink accepts input, remembering in @q1@ every marked item it has passed on.
--
-- * @test q@ runs the second splitter over its source. If the first mark it produces is false the
--   result is 'Nothing'; otherwise @q@ is written to the true sink and the items of the leading true
--   section are written to the true sink too, the result being 'Just' their count.
--
-- * @testEnd q@ performs the same test against an empty input when the source is exhausted.
--
-- * @dropJust n@ removes from the front of a queue the first @n@ proper items and the section
--   markers in front of them. NOTE(review): it has no case for an empty queue, so it relies on the
--   queue containing at least @n@ proper items - confirm.
followedBy :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
followedBy s1 s2 = liftSectionSplitter s
where s source true false
= liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "followedBy"
(transduce (splitterToMarker s1) source)
(\source-> let get0 q = case Seq.viewl q
of Seq.EmptyL -> get source >>= maybe (return []) get1
(x, False) :< rest -> put false x
>>= cond (get0 rest)
(return $ Foldable.toList $ Seq.viewl $ fmap fst q)
(x, True) :< rest -> get2 Seq.empty q
get1 (x, False) = put false x
>>= cond (get source >>= maybe (return []) get1)
(return [x])
get1 p@(x, True) = get2 Seq.empty (Seq.singleton p)
get2 q q' = case Seq.viewl q'
of Seq.EmptyL -> get source
>>= maybe (testEnd q) (get2 q . Seq.singleton)
(x, True) :< rest -> get2 (q |> x) rest
(x, False) :< rest -> do ((q1, q2), n) <- pipeD "followedBy tail"
(get3 Seq.empty q') (test q)
-- No follower found: the section goes to false and the look-ahead is reprocessed.
case n of Nothing -> putQueue q false
>>= whenNull (get0 (q1 >< q2))
-- Follower found: skip the n items already written to true and reprocess the rest.
Just n -> do put false Nothing
get0 (dropJust n q1 >< q2)
get3 q1 q2 sink = canPut sink
>>= cond (case Seq.viewl q2
of Seq.EmptyL -> get source
>>= maybe (return (q1, q2))
(\p-> maybe (return True) (put sink) (fst p)
>> get3 (q1 |> p) q2 sink)
p :< rest -> maybe (return True) (put sink) (fst p)
>> get3 (q1 |> p) rest sink)
(return (q1, q2))
testEnd q = do ((), n) <- pipeD "testEnd" (const $ return ()) (test q)
case n of Nothing -> putQueue q false
_ -> return []
test q source = liftM snd $
pipeD "follower"
(transduce (splitterToMarker s2) source)
(\source-> let get4 (_, False) = return Nothing
get4 p@(_, True) = putQueue q true >> get5 0 p
get5 n (x, False) = return (Just n)
get5 n (Nothing, True) = get6 n
get5 n (x, True) = put true x >> get6 (succ n)
get6 n = get source
>>= maybe
(return $ Just n)
(get5 n)
in get source >>= maybe (return Nothing) get4)
dropJust 0 q = q
dropJust n q = case Seq.viewl q of (Nothing, _) :< rest -> dropJust n rest
(Just _, _) :< rest -> dropJust (pred n) rest
in get0 Seq.empty)
-- | Combines two splitters into one that works on the stream of triples produced by
-- 'splittersToPairMarker' and 'pairMarkerToMaybePairMarker', where the first 'Bool' is the mark
-- given by @s1@ and the second the mark given by @s2@.
--
-- The consumer is a state machine: @state0@ and @state1@ write to the false sink, while @state2@,
-- @state3@ and @state4@ write to the true sink and carry a counter @n@ that is incremented when an
-- item marked only by @s1@ is met and decremented when an item marked by @s2@ is met; @state4@
-- returns to @state0@ once the counter reaches zero.
-- NOTE(review): this looks like nesting-depth tracking of regions opened by @s1@ and closed by @s2@,
-- with the delimiting sections themselves excluded - confirm against the intended specification.
between :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
between s1 s2 = liftSectionSplitter s
-- Section markers ('Nothing') are dropped from the consumer's leftover list before it is prepended
-- to the marker stage's result.
where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "between"
(transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source)
-- next: read one triple and hand it to the given state; pass: write an item, then continue.
(\source-> let next state = get source >>= maybe (return []) state
pass sink x state = put sink x >>= cond (next state) (return [x])
state0 t@(x, True, False) = state1 t
state0 (x, _, _) = pass false x state0
state1 t@(x, _, True) = state0 t
state1 (x, True, False) = pass false x state1
state1 t@(x, False, False) = state2 1 t
state2 n (x, False, False) = pass true x (state2 n)
state2 n t@(x, _, True) = state4 (pred n) t
state2 n t@(x, True, False) = state3 (succ n) t
state3 n (x, True, _) = pass true x (state3 n)
state3 n t@(x, False, False) = state2 n t
state3 n t@(x, False, True) = state4 (pred n) t
state4 0 t = state0 t
state4 n (x, _, True) = pass true x (state4 n)
state4 n t@(x, True, False) = state3 (succ n) t
state4 n t@(x, False, False) = state2 n t
in next state0)
-- | Combines two splitters into one that works on the stream of triples produced by
-- 'splittersToPairMarker' and 'pairMarkerToMaybePairMarker', where the first 'Bool' is the mark
-- given by @s1@ and the second the mark given by @s2@.
--
-- The consumer is a state machine: @state0@ writes to the false sink until an item marked by @s1@
-- arrives; @state1@, @state2@ and @state3@ write to the true sink and carry a counter @n@ that is
-- incremented when a new @s1@-marked section starts and decremented when an @s2@-marked item is met.
-- Once the counter is zero, control returns to @state0@.
-- NOTE(review): unlike 'between', the sections marked by @s1@ and @s2@ themselves appear to be
-- included in the true output - confirm against the intended specification.
(...) :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 ... s2 = liftSectionSplitter s
-- Section markers ('Nothing') are dropped from the consumer's leftover list before it is prepended
-- to the marker stage's result.
where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
pipeD "..."
(transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source)
-- next: read one triple and hand it to the given state; pass: write an item, then continue.
(\source-> let next state = get source >>= maybe (return []) state
pass sink x state = put sink x >>= cond (next state) (return [x])
state0 (x, False, _) = pass false x state0
state0 t@(x, True, _) = state1 1 t
state1 0 t = state0 t
state1 n (x, True, False) = pass true x (state1 n)
state1 n t@(x, False, False) = state2 n t
state1 n t@(x, _, True) = state3 (pred n) t
state2 n (x, False, False) = pass true x (state2 n)
state2 n t@(x, _, True) = state3 (pred n) t
state2 n t@(x, True, False) = state1 (succ n) t
state3 n (x, _, True) = pass true x (state3 n)
-- A section marker is written to the false sink before a new s1-marked section is entered.
state3 n t@(x, True, False) = put false Nothing >> state1 (succ n) t
state3 0 t@(x, False, False) = state0 t
state3 n t@(x, False, False) = state2 n t
in next state0)
-- | A transducer that pairs every input item, or a section marker ('Nothing'), with a 'Bool' mark.
type Marker m x = Transducer m x (Maybe x, Bool)
-- | Converts a splitter into a 'Marker': everything the splitter writes to its true sink is emitted
-- paired with 'True', and everything it writes to its false sink is emitted paired with 'False'.
-- Both parts are merged into the single output sink.
--
-- The result is the concatenation of the leftovers reported by the true-side marker, the false-side
-- marker and the splitter itself, in that order.
splitterToMarker :: forall m x. (Monad m, Typeable x) => Splitter m x -> Marker m x
splitterToMarker s = Transducer t
where t source sink = liftM (\((x, y), z)-> z ++ y ++ x) $
pipeD "splitterToMarker true"
(\trueSink-> pipeD "splitterToMarker false" (splitSections s source trueSink) (mark False))
(mark True)
-- Copies items from its source to the output sink, tagging each with b, while the sink accepts
-- input; an item that could not be written is returned unless it is a section marker.
where mark b source = canPut sink
>>= cond (get source
>>= maybe (return [])
(\x-> put sink (x, b) >>= cond (mark b source) (return $ maybe [] (: []) x)))
(return [])
-- | Runs two splitters over the same input (duplicated with 'tee') and merges their verdicts into
-- one stream.
--
-- Each of the four sinks (true and false of either splitter) is drained by @mark@, which tags every
-- value with the originating splitter (@first@) and the sink's truth value (@b@) and writes it to a
-- shared synchronization pipe. @synchronizeMarks@ reads that pipe and emits:
--
-- * @Left (x, b1, b2)@ for a proper item, where @b1@ is the mark given by @s1@ and @b2@ the mark
--   given by @s2@. Items delivered by one splitter are queued until the other splitter delivers its
--   matching item.
--
-- * @Right (Left b)@ for a section marker coming from @s1@ and @Right (Right b)@ for one from @s2@.
--
-- NOTE(review): when pairing, the item delivered last is the one emitted and the queued item is
-- discarded, which assumes both splitters deliver the same items in the same order - confirm.
splittersToPairMarker :: forall m x. (Monad m, Typeable x)
=> Splitter m x -> Splitter m x -> Transducer m x (Either (x, Bool, Bool) (Either Bool Bool))
splittersToPairMarker s1 s2 = Transducer t
-- The leftovers of all seven nested pipes are concatenated, outermost consumer first.
where t source sink = liftM (\((((((((), l1), l2), l3), l4), l5), l6), l7)-> l7 ++ l6 ++ l5 ++ l4 ++ l3 ++ l2 ++ l1) $
pipeD "splittersToMarker synchronize"
(\sync->
pipeD "splittersToMarker true1"
(\true1->
pipeD "splittersToMarker false1"
(\false1->
pipeD "splitterssToMarker true2"
(\true2->
pipeD "splittersToMarker false2"
(\false2->
pipeD "splittersToMarker sink1"
(\sink1->
pipeD "splittersToMarker sink2"
(\sink2-> tee source sink1 sink2)
(\source2-> splitSections s2 source2 true2 false2))
(\source1-> splitSections s1 source1 true1 false1))
(mark sync False False))
(mark sync False True))
(mark sync True False))
(mark sync True True))
(synchronizeMarks Nothing)
-- The state is the queue of items received from the splitter that is currently ahead, together
-- with the flag telling which splitter that is ('True' for the first one).
where synchronizeMarks :: Maybe (Seq (x, Bool), Bool) -> Source c (Maybe x, Bool, Bool) -> Pipe c m [x]
synchronizeMarks state source
= get source
>>= maybe
(assert (isNothing state) (return []))
(\(x, pos, b) ->
maybe
(put sink (Right $ if pos then Left b else Right b)
>> synchronizeMarks state source)
(\x-> case state
of Nothing -> synchronizeMarks (Just (Seq.singleton (x, b), pos)) source
Just (q, pos') -> if pos == pos'
then synchronizeMarks (Just (q |> (x, b), pos')) source
else case Seq.viewl q
of Seq.EmptyL -> synchronizeMarks
(Just (Seq.singleton (x, b), pos))
source
(y, b') :< rest -> put sink (Left $ if pos
then (x, b, b')
else (x, b', b))
>>= cond
(synchronizeMarks
(if Seq.null rest
then Nothing
else Just (rest, pos'))
source)
(returnQueuedList q))
x)
returnQueuedList q = return $ map fst $ Foldable.toList $ Seq.viewl q
-- Tags every value read from the source with (first, b) and forwards it while the sink accepts input.
mark sink first b source = let mark' = canPut sink
>>= cond
(get source
>>= maybe
(return [])
(\x-> put sink (x, first, b)
>>= cond mark' (return $ maybe [] (: []) x)))
(return [])
in mark'
-- | Converts the output of a pair marker (see 'splittersToPairMarker') into a stream of triples
-- @(Maybe x, Bool, Bool)@: a proper item @Left (x, l, r)@ becomes @(Just x, l, r)@, while section
-- markers (@Right@ values) may be turned into @(Nothing, l, r)@ triples.
--
-- The consumer remembers the most recent pair of marks @l@ and @r@ (@state1@) and, after a section
-- marker has been received (@state2@), the kind of that marker.
-- NOTE(review): a marker seems to be emitted as a @Nothing@ triple only when the stream ends right
-- after it, when a marker of the other splitter follows it, or when the next proper item does not
-- already carry the changed mark - confirm.
pairMarkerToMaybePairMarker :: forall m x. (Monad m, Typeable x)
=> Transducer m x (Either (x, Bool, Bool) (Either Bool Bool)) -> Transducer m x (Maybe x, Bool, Bool)
pairMarkerToMaybePairMarker t = Transducer t'
where t' source sink = liftM (\(x, y)-> y ++ x) $
pipeD "pairMarkerToMaybePairMarker"
(transduce t source)
(\source-> let next state = get source >>= maybe (return []) state
-- The input ended right after a section marker: emit the marker before finishing.
nextState2 l r d = get source
>>= maybe (put sink (Nothing, l, r) >> return []) (state2 l r d)
state0 (Left (x, l, r)) = put sink (Just x, l, r)
>>= cond (next $ state1 l r) (return [x])
state0 v@(Right d) = state2 False False d v
state1 _ _ (Left (x, l, r)) = put sink (Just x, l, r)
>>= cond (next $ state1 l r) (return [x])
state1 l r v@(Right d) = state2 l r d v
-- The pending marker came from the first splitter.
state2 l r Left{} (Right d@(Left l')) = nextState2 l' r d
state2 l r Left{} (Right (Right r')) = put sink (Nothing, l, r')
>>= cond (next $ state1 l r') (return [])
state2 l r Left{} t@(Left (x, l', r')) | l == l' = state1 l r t
| otherwise = put sink (Nothing, l, r)
>>= cond
(state1 l' r' t)
(return [])
-- The pending marker came from the second splitter.
state2 l r Right{} (Right d@(Right r')) = nextState2 l r' d
state2 l r Right{} (Right (Left l')) = put sink (Nothing, l', r)
>>= cond (next $ state1 l' r) (return [])
state2 l r Right{} t@(Left (x, l', r')) | r == r' = state1 l r t
| otherwise = put sink (Nothing, l, r)
>>= cond
(state1 l' r' t)
(return [])
in next state0)
-- | Runs two splitters over the same input and decides the destination of every item by applying
-- the given function to the pair of marks the two splitters assign to it: the item goes to the true
-- sink if the function yields 'True' and to the false sink otherwise. The returned list contains the
-- item that could not be written (if any) followed by the leftovers of the marker stage.
zipSplittersWith :: (Monad m, Typeable x) => (Bool -> Bool -> Bool) -> Splitter m x -> Splitter m x -> Splitter m x
zipSplittersWith f s1 s2 = liftSectionSplitter zipped
  where
    zipped source true false = do
      (unread, unsent) <-
        pipeD "&"
          (transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source)
          (\marks ->
             let loop = get marks >>= maybe (return []) route
                 route (x, b1, b2) =
                   put (if f b1 b2 then true else false) x
                   >>= cond loop (return $ maybe [] (:[]) x)
             in loop)
      return (unsent ++ unread)
-- | Cuts a marked stream into maximal runs of items carrying equal marks and feeds every run to a
-- fresh consumer obtained from the mark by means of the second argument.
--
-- For each run a new pipe labelled @"groupMarks"@ is opened: its producer forwards the proper items
-- of the run (section markers, i.e. 'Nothing' values, are skipped) and stops at the first item whose
-- mark differs, which is then used to start the next run. The consumers' results are discarded.
groupMarks :: forall c1 c m x y z. (Monad m, Typeable x, Typeable y, Eq y)
           => Source c1 (Maybe x, y) -> (y -> Consumer m x z) -> Pipe c m ()
groupMarks source getConsumer = getSuccess source startNew
  where
    startNew (mx, y) = do
      (nextPair, _) <- pipeD "groupMarks" (\sink -> pass sink mx y) (getConsumer y)
      case nextPair of
        Just p -> startNew p
        Nothing -> return ()
    pass sink Nothing y = next sink y
    pass sink (Just x) y = put sink x >> next sink y
    next sink y = get source >>= maybe (return Nothing) (continue sink y)
    -- A single equation with an 'otherwise' fallback keeps the function total and compares only once.
    continue sink y p@(x, y')
      | y == y' = pass sink x y
      | otherwise = return (Just p)
-- | Splits the source with the given splitter and feeds the true part to the first consumer and the
-- false part to the second one. The pipes are labelled with the description followed by @" true"@
-- and @" false"@. Returns the splitter's result together with both consumers' results.
splitConsumer :: forall x m r1 r2 c c1. (Monad m, Typeable x)
              => String -> Splitter m x -> Consumer m x r1 -> Consumer m x r2 -> Source c1 x -> Pipe c m ([x], r1, r2)
splitConsumer description s trueConsumer falseConsumer = splitAndConsume
  where
    splitAndConsume source = do
      ((extra, r1), r2) <-
        pipeD (description ++ " false")
          (\false -> pipeD (description ++ " true") (\true -> split s source true false) trueConsumer)
          falseConsumer
      return (extra, r1, r2)
-- | Like 'splitConsumer', but uses 'splitSections', so the two consumers receive 'Maybe' values.
-- The pipes are labelled with the description followed by @" true"@ and @" false"@. Returns the
-- splitter's result together with both consumers' results.
splitConsumerSections :: forall x m r1 r2 c c1. (Monad m, Typeable x)
                      => String -> Splitter m x -> Consumer m (Maybe x) r1 -> Consumer m (Maybe x) r2 -> Source c1 x
                      -> Pipe c m ([x], r1, r2)
splitConsumerSections description s trueConsumer falseConsumer = splitAndConsume
  where
    splitAndConsume source = do
      ((extra, r1), r2) <-
        pipeD (description ++ " false")
          (\false -> pipeD (description ++ " true") (\true -> splitSections s source true false) trueConsumer)
          falseConsumer
      return (extra, r1, r2)
-- | Writes all elements of the sequence, front to back, to the sink by means of 'putList' and
-- returns whatever 'putList' returns.
putQueue :: forall context r m x. (Monad m, Typeable x) => Seq x -> Sink context x -> Pipe r m [x]
putQueue q sink = putList queued sink
  where
    -- Folding the sequence directly yields the same front-to-back order as folding its left view.
    queued = Foldable.toList q
-- | Reads the source until 'get' yields 'Nothing' and returns everything read as a sequence, in
-- the order of arrival.
getQueue :: forall x c c1 m. (Monad m, Typeable x) => Source c1 x -> Pipe c m (Seq x)
getQueue source = collect Seq.empty
  where
    collect acc = get source >>= maybe (return acc) (\item -> collect (acc |> item))
-- | Copies the source into a sink of 'Maybe' values, wrapping every item in 'Just', for as long as
-- the sink accepts input. If the source yields no item at all, a single 'Nothing' is written instead.
pourMaybe :: forall c c1 c2 x m. (Monad m, Typeable x) => Source c1 x -> Sink c2 (Maybe x) -> Pipe c m ()
pourMaybe source sink = pour0
  where
    -- Nothing has been copied yet: an exhausted source is reported with a 'Nothing'.
    pour0 = do
      writable <- canPut sink
      when writable (get source >>= maybe (put sink Nothing >> return ()) pass)
    -- At least one item has been copied.
    pour1 = do
      writable <- canPut sink
      when writable (getSuccess source pass)
    pass x = put sink (Just x) >> pour1
-- | Runs the producer with its output piped (through a pipe labelled @"suppress"@) into
-- 'consumeAndSuppress' and returns the producer's result.
suppressProducer :: forall x c m r. (Monad m, Typeable x) => Producer m x r -> Pipe c m r
suppressProducer producer = do
  piped <- pipeD "suppress" producer consumeAndSuppress
  return (fst piped)
-- | Extracts the first component of a triple.
fst3 :: (a, b, c) -> a
fst3 (x, _, _) = x