{- Copyright 2008 Mario Blazevic This file is part of the Streaming Component Combinators (SCC) project. The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SCC. If not, see . -} -- | The "Combinators" module defines combinators applicable to 'Transducer' and 'Splitter' components defined in the -- "ComponentTypes" module. {-# LANGUAGE ScopedTypeVariables, Rank2Types #-} module Control.Concurrent.SCC.Combinators (-- * Consumer and producer combinators (->>), (<<-), -- * Transducer combinators (>->), join, -- * Pseudo-logic splitter combinators -- | Combinators '>&' and '>|' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws hold, -- '>&' and '>|' are in general not commutative, associative, nor idempotent. In the special case when all argument -- splitters are stateless, such as those produced by 'Components.liftStatelessSplitter', these combinators do satisfy -- all laws of Boolean algebra. snot, (>&), (>|), -- ** Zipping logic combinators -- | The '&&' and '||' combinators run the argument splitters in parallel and combine their logical outputs using -- the corresponding logical operation on each output pair, in a manner similar to 'Prelude.zipWith'. (&&), (||), -- * Flow-control combinators -- | The following combinators resemble the common flow-control programming language constructs. Combinators -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'. -- -- * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Components.asis' -- -- * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Components.asis' /transducer/ -- -- * 'select' /splitter/ = 'ifs' /splitter/ 'Components.asis' 'Components.suppress' -- ifs, wherever, unless, select, -- ** Recursive while, nestedIn, -- * Section-based combinators -- | All combinators in this section use their 'Splitter' argument to determine the -- structure of the input. Every contiguous portion of the input that gets passed to one or the other sink of the -- splitter is treated as one section in the logical structure of the input stream. What is done with the section -- depends on the combinator, but the sections, and therefore the logical structure of the input stream, are -- determined by the argument splitter alone. foreach, having, havingOnly, followedBy, even, -- ** first and its variants first, uptoFirst, prefix, -- ** last and its variants last, lastAndAfter, suffix, -- ** input ranges between, (...)) where import Control.Concurrent.SCC.Foundation import Control.Concurrent.SCC.ComponentTypes import Prelude hiding (even, last, (||), (&&)) import qualified Prelude import Control.Exception (assert) import Control.Monad (liftM, when) import qualified Control.Monad as Monad import Data.Maybe (isJust, isNothing, fromJust) import Data.Typeable (Typeable) import qualified Data.Foldable as Foldable import qualified Data.Sequence as Seq import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<))) import Debug.Trace (trace) infixr ->> -- | The result of combinator '->>' is a consumer that acts as a composition of the given transducer and consumer -- arguments. (->>) :: forall x y m r. (Monad m, Typeable x, Typeable y) => Transducer m x y -> Consumer m y r -> Consumer m x r Transducer t ->> consumer = consumer' where consumer' source = liftM snd $ pipeD "->>" (t source) consumer -- | The result of combinator '<<-' is a producer that acts as a composition of the given transducer and producer -- arguments. (<<-) :: forall x y m r c c1. (Monad m, Typeable x, Typeable y) => Transducer m x y -> Producer m x r -> Producer m y r Transducer t <<- producer = producer' where producer' sink = liftM fst $ pipeD "<<-" producer (flip t sink) -- | The '>->' combinator composes its argument transducers. The resulting composition /t1 >-> t2/ passes its input through the -- first transducer /t1/, the output of /t1/ is passed to the other transducer /t2/, and its output becomes the output of the -- composition. (>->) :: forall m x y z. Monad m => Transducer m x y -> Transducer m y z -> Transducer m x z Transducer t1 >-> Transducer t2 = Transducer t where t source sink = liftM fst $ pipeD ">->" (t1 source) (flip t2 sink) -- | The 'join' combinator arranges the two transducer arguments in parallel. The input of the resulting transducer is replicated -- to both component transducers in parallel, and the output of the resulting transducer is a concatenation of the two component -- transducers' outputs. join :: (Monad m, Typeable x) => Transducer m x y -> Transducer m x y -> Transducer m x y join (Transducer t1) (Transducer t2) = Transducer t where t source sink = do (((), l), extra) <- pipeD "join 1" (\sink1-> pipeD "join 2" (\sink2-> tee source sink1 sink2) getList) (flip t1 sink) pipeD "join 3" (putList l) (flip t2 sink) return extra -- | The 'snot' (streaming not) combinator simply reverses the outputs of the argument splitter. -- In other words, data that the argument splitter sends to its /true/ sink goes to the /false/ sink of the result, and vice versa. snot :: (Monad m, Typeable x) => Splitter m x -> Splitter m x snot splitter = liftSectionSplitter (\source true false-> splitSections splitter source false true) -- | The '>&' combinator sends the /true/ sink output of its left operand to the input of its right operand for further -- splitting. Both operands' /false/ sinks are connected to the /false/ sink of the combined splitter, but any input -- value to reach the /true/ sink of the combined component data must be deemed true by both splitters. (>&) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x s1 >& s2 = liftSimpleSplitter (\source true false-> liftM fst $ pipeD ">&" (\true-> split s1 source true false) (\source-> split s2 source true false)) -- | A '>|' combinator's input value can reach its /false/ sink only by going through both argument splitters' /false/ -- sinks. (>|) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x s1 >| s2 = liftSimpleSplitter (\source true false-> liftM fst $ pipeD ">|" (split s1 source true) (\source-> split s2 source true false)) -- | Combinator '&&' is a pairwise logical conjunction of two splitters run in parallel on the same input. (&&) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x (&&) = zipSplittersWith (Prelude.&&) -- | Combinator '||' is a pairwise logical disjunction of two splitters run in parallel on the same input. (||) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x (||) = zipSplittersWith (Prelude.||) -- | The result of the combinator 'ifs' is a transducer that applies one argument transducer to one portion of -- the input and the other transducer to the other portion of input, depending on where the splitter argument routes the data. ifs :: (Monad m, Typeable x) => Splitter m x -> Transducer m x y -> Transducer m x y -> Transducer m x y ifs s (Transducer t1) (Transducer t2) = Transducer t where t source sink = liftM fst3 $ splitConsumer "ifs" s (flip t1 sink) (flip t2 sink) source wherever :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x wherever (Transducer t) s = Transducer wherever' where wherever' source sink = liftM fst3 $ splitConsumer "wherever" s (flip t sink) (flip pour sink) source unless :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x unless (Transducer t) s = Transducer unless' where unless' source sink = liftM fst3 $ splitConsumer "unless" s (flip pour sink) (flip t sink) source select :: (Monad m, Typeable x) => Splitter m x -> Transducer m x x select s = Transducer (\source sink-> liftM fst3 $ splitConsumer "select" s (flip pour sink) consumeAndSuppress source) -- | The recursive combinator 'while' feeds the true sink of the argument splitter back to itself, modified by the -- argument transducer. Data fed to the splitter's false sink is passed on unmodified. while :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x while t s = Transducer while' where while' source sink = liftM fst3 $ splitConsumer "while" s (t ->> while t s ->> flip pour sink) (flip pour sink) source -- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single splitter. -- The true sink of one of the argument splitters and false sink of the other become the true and false sinks of the loop. -- The other two sinks are bound to the other splitter's source. -- The use of 'nestedIn' makes sense only on hierarchically structured streams. If we gave it some input containing -- a flat sequence of values, and assuming both component splitters are deterministic and stateless, -- a value would either not loop at all or it would loop forever. nestedIn :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x nestedIn s1 s2 = s where s = liftSimpleSplitter (\source true false-> liftM fst $ pipe (\false-> split s1 source true false) (\source-> pipe (\true-> split s2 source true false) (\source-> split (nestedIn s1 s2) source true false))) -- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two transducers into -- another transducer. However, in this case the transducers are re-instantiated for each consecutive portion of the -- input as the splitter chunks it up. Each contiguous portion of the input that the splitter sends to one of its two -- sinks gets transducered through the appropriate argument transducer as that transducer's whole input. As soon as the -- contiguous portion is finished, the transducer gets terminated. foreach :: (Monad m, Typeable x, Typeable y) => Splitter m x -> Transducer m x y -> Transducer m x y -> Transducer m x y foreach s t1 t2 = Transducer t where t source sink = liftM fst $ pipeD "foreach" (transduce (splitterToMarker s) source) (\source-> groupMarks source (\b chunk-> transduce (if b then t1 else t2) chunk sink)) -- | The 'having' combinator combines two pure splitters into a pure splitter. One splitter is used to chunk the input -- into contiguous portions. Its /false/ sink is routed directly to the /false/ sink of the combined splitter. The -- second splitter is instantiated and run on each portion of the input that goes to first splitter's /true/ sink. If -- the second splitter sends any output at all to its /true/ sink, the whole input portion is passed on to the /true/ -- sink of the combined splitter, otherwise it goes to its /false/ sink. having :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x having s1 s2 = liftSectionSplitter s where s source true false = liftM fst $ pipeD "having" (transduce (splitterToMarker s1) source) (\source-> groupMarks source (\b chunk-> if b then test chunk else pourMaybe chunk false)) where test chunk = pipe (\sink1-> pipe (\sink2-> tee chunk sink1 sink2) getList) (\chunk-> pipe (\sink-> suppressProducer (split s2 chunk sink)) getList) >>= \(((), chunk), (_, truePart))-> let chunk' = if null chunk then [Nothing] else map Just chunk in (if null truePart then putList chunk' false else putList chunk' true) >> return () -- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it succeeds and passes each chunk of the -- input to its /true/ sink only if the second splitter sends no part of it to its /false/ sink. havingOnly :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x havingOnly s1 s2 = liftSectionSplitter s where s source true false = liftM fst $ pipeD "havingOnly" (transduce (splitterToMarker s1) source) (\source-> groupMarks source (\b chunk-> if b then test chunk else pourMaybe chunk false)) where test chunk = pipe (\sink1-> pipe (\sink2-> tee chunk sink1 sink2) getList) (\chunk-> pipe (\sink-> suppressProducer (\suppress-> split s2 chunk suppress sink)) getList) >>= \(((), chunk), (_, falsePart))-> let chunk' = if null chunk then [Nothing] else map Just chunk in (if null falsePart then putList chunk' true else putList chunk' false) >> return () -- | The result of combinator 'first' behaves the same as the argument splitter up to and including the first portion of -- the input which goes into the argument's /true/ sink. All input following the first true portion goes into the -- /false/ sink. first :: (Monad m, Typeable x) => Splitter m x -> Splitter m x first splitter = liftSectionSplitter s where s source true false = liftM (\(x, y)-> y ++ x) $ pipeD "first" (transduce (splitterToMarker splitter) source) (\source-> let get1 (x, False) = p false x get1 get1 (x, True) = p true x get2 get2 (x, True) = p true x get2 get2 (x, False) = p false x get3 get3 (x, _) = p false x get3 p sink x succeed = put sink x >>= cond (get source >>= maybe (return []) succeed) (return $ maybe [] (:[]) x) in get source >>= maybe (return []) get1) -- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes -- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes -- into the /false/ sink. The only difference between 'last' and 'lastAndAfter' combinators is in where they direct the -- /false/ portion of the input preceding the first /true/ part. uptoFirst :: (Monad m, Typeable x) => Splitter m x -> Splitter m x uptoFirst splitter = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "uptoFirst" (transduce (splitterToMarker splitter) source) (\source-> let get1 q (x, False) = let q' = q |> x in get source >>= maybe (putQueue q' false) (get1 q') get1 q p@(x, True) = do rest <- putQueue q true if null rest then get2 p else return rest get2 (x, True) = p true x get2 get2 (x, False) = p false x get3 get3 (x, _) = p false x get3 p sink x succeed = put sink x >>= cond (get source >>= maybe (return []) succeed) (return [x]) in get source >>= maybe (return []) (get1 Seq.empty)) -- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last -- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to -- the resulting component's /true/ sink. The splitter returned by the combinator 'last' has to buffer the previous two -- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of -- the input or another portion succeeding the previous one. last :: (Monad m, Typeable x) => Splitter m x -> Splitter m x last splitter = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "last" (transduce (splitterToMarker splitter) source) (\source-> let get1 (x, False) = put false x >>= cond (get source >>= maybe (return []) get1) (return [x]) get1 p@(x, True) = get2 Seq.empty p get2 q (x, True) = let q' = q |> x in get source >>= maybe (putQueue q' true) (get2 q') get2 q p@(x, False) = get3 q Seq.empty p get3 qt qf (x, False) = let qf' = qf |> x in get source >>= maybe (putQueue qt true >> putQueue qf' false) (get3 qt qf') get3 qt qf p@(x, True) = do rest1 <- putQueue qt false rest2 <- putQueue qf false if null rest1 Prelude.&& null rest2 then get2 Seq.empty p else return (rest1 ++ rest2) p succeed = get source >>= maybe (return []) succeed in p get1) -- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the -- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input is fed -- to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where they -- feed the /false/ portion of the input, if any, remaining after the last /true/ part. lastAndAfter :: (Monad m, Typeable x) => Splitter m x -> Splitter m x lastAndAfter splitter = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "lastAndAfter" (transduce (splitterToMarker splitter) source) (\source-> let get1 (x, False) = put false x >>= cond (p get1) (return [x]) get1 p@(x, True) = get2 Seq.empty p get2 q (x, True) = let q' = q |> x in get source >>= maybe (putQueue q' true) (get2 q') get2 q p@(x, False) = get3 q p get3 q (x, False) = let q' = q |> x in get source >>= maybe (putQueue q' true) (get3 q') get3 q p@(x, True) = putQueue q false >>= whenNull (get1 p) p succeed = get source >>= maybe (return []) succeed in p get1) -- | The 'prefix' combinator feeds its /true/ sink only the prefix of the input that its argument feeds to its /true/ sink. -- All the rest of the input is dumped into the /false/ sink of the result. prefix :: (Monad m, Typeable x) => Splitter m x -> Splitter m x prefix splitter = liftSectionSplitter s where s source true false = liftM (\(x, y)-> y ++ x) $ pipeD "prefix" (transduce (splitterToMarker splitter) source) (\source-> let get1 (x, False) = p false x get2 get1 (x, True) = p true x get1 get2 (x, _) = p false x get2 p sink x succeed = put sink x >>= cond (get source >>= maybe (return []) succeed) (return $ maybe [] (:[]) x) in get source >>= maybe (return []) get1) -- | The 'suffix' combinator feeds its /true/ sink only the suffix of the input that its argument feeds to its /true/ sink. -- All the rest of the input is dumped into the /false/ sink of the result. suffix :: (Monad m, Typeable x) => Splitter m x -> Splitter m x suffix splitter = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "suffix" (transduce (splitterToMarker splitter) source) (\source-> let get1 (x, False) = put false x >>= cond (p get1) (return [x]) get1 (x, True) = get2 (Seq.singleton x) get2 q = get source >>= maybe (putQueue q true) (get3 q) get3 q (x, True) = get2 (q |> x) get3 q p@(x, False) = putQueue q false >>= whenNull (get1 p) p succeed = get source >>= maybe (return []) succeed in p get1) -- | The 'even' combinator takes every input section that its argument splitters deems /true/, and feeds even ones into -- its /true/ sink. The odd sections and parts of input that are /false/ according to its argument splitter are fed to -- 'even' splitter's /false/ sink. even :: (Monad m, Typeable x) => Splitter m x -> Splitter m x even splitter = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "even" (transduce (splitterToMarker splitter) source) (\source-> let get1 (x, False) = put false x >>= cond (get source >>= maybe (return []) get1) (return [x]) get1 p@(x, True) = get2 p get2 (x, True) = put false x >>= cond (get source >>= maybe (return []) get2) (return [x]) get2 p@(x, False) = get3 p get3 (x, False) = put false x >>= cond (get source >>= maybe (return []) get3) (return [x]) get3 p@(x, True) = get4 p get4 (x, True) = put true x >>= cond (get source >>= maybe (return []) get4) (return [x]) get4 p@(x, False) = get1 p in get source >>= maybe (return []) get1) -- | Combinator 'followedBy' treats its argument 'Splitter's as patterns components and returns a 'Splitter' that -- matches their concatenation. A section of input is considered /true/ by the result iff its prefix is considered -- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/. The splitter /s2/ is started anew -- after every section split to /true/ sink by /s1/. followedBy :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x followedBy s1 s2 = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "followedBy" (transduce (splitterToMarker s1) source) (\source-> let get0 q = case Seq.viewl q of Seq.EmptyL -> get source >>= maybe (return []) get1 (x, False) :< rest -> put false x >>= cond (get0 rest) (return $ Foldable.toList $ Seq.viewl $ fmap fst q) (x, True) :< rest -> get2 Seq.empty q get1 (x, False) = put false x >>= cond (get source >>= maybe (return []) get1) (return [x]) get1 p@(x, True) = get2 Seq.empty (Seq.singleton p) get2 q q' = case Seq.viewl q' of Seq.EmptyL -> get source >>= maybe (testEnd q) (get2 q . Seq.singleton) (x, True) :< rest -> get2 (q |> x) rest (x, False) :< rest -> do ((q1, q2), n) <- pipeD "followedBy tail" (get3 Seq.empty q') (test q) case n of Nothing -> putQueue q false >>= whenNull (get0 (q1 >< q2)) Just n -> do put false Nothing get0 (dropJust n q1 >< q2) get3 q1 q2 sink = canPut sink >>= cond (case Seq.viewl q2 of Seq.EmptyL -> get source >>= maybe (return (q1, q2)) (\p-> maybe (return True) (put sink) (fst p) >> get3 (q1 |> p) q2 sink) p :< rest -> maybe (return True) (put sink) (fst p) >> get3 (q1 |> p) rest sink) (return (q1, q2)) testEnd q = do ((), n) <- pipeD "testEnd" (const $ return ()) (test q) case n of Nothing -> putQueue q false _ -> return [] test q source = liftM snd $ pipeD "follower" (transduce (splitterToMarker s2) source) (\source-> let get4 (_, False) = return Nothing get4 p@(_, True) = putQueue q true >> get5 0 p get5 n (x, False) = return (Just n) get5 n (Nothing, True) = get6 n get5 n (x, True) = put true x >> get6 (succ n) get6 n = get source >>= maybe (return $ Just n) (get5 n) in get source >>= maybe (return Nothing) get4) dropJust 0 q = q dropJust n q = case Seq.viewl q of (Nothing, _) :< rest -> dropJust n rest (Just _, _) :< rest -> dropJust (pred n) rest in get0 Seq.empty) -- | Combinator 'between' passes to its /true/ sink all input that follows a section considered true by its first -- argument splitter but not a section considered true by its second argument. The section delimiter pairs can nest to -- arbitrary depth. between :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x between s1 s2 = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "between" (transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source) (\source-> let next state = get source >>= maybe (return []) state pass sink x state = put sink x >>= cond (next state) (return [x]) state0 t@(x, True, False) = state1 t state0 (x, _, _) = pass false x state0 state1 t@(x, _, True) = state0 t state1 (x, True, False) = pass false x state1 state1 t@(x, False, False) = state2 1 t state2 n (x, False, False) = pass true x (state2 n) state2 n t@(x, _, True) = state4 (pred n) t state2 n t@(x, True, False) = state3 (succ n) t state3 n (x, True, _) = pass true x (state3 n) state3 n t@(x, False, False) = state2 n t state3 n t@(x, False, True) = state4 (pred n) t state4 0 t = state0 t state4 n (x, _, True) = pass true x (state4 n) state4 n t@(x, True, False) = state3 (succ n) t state4 n t@(x, False, False) = state2 n t in next state0) -- | Combinator '...' is similar to 'between', except it passes to /true/ the delimiting sections as well -- as all input between them. (...) :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x s1 ... s2 = liftSectionSplitter s where s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $ pipeD "..." (transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source) (\source-> let next state = get source >>= maybe (return []) state pass sink x state = put sink x >>= cond (next state) (return [x]) state0 (x, False, _) = pass false x state0 state0 t@(x, True, _) = state1 1 t state1 0 t = state0 t state1 n (x, True, False) = pass true x (state1 n) state1 n t@(x, False, False) = state2 n t state1 n t@(x, _, True) = state3 (pred n) t state2 n (x, False, False) = pass true x (state2 n) state2 n t@(x, _, True) = state3 (pred n) t state2 n t@(x, True, False) = state1 (succ n) t state3 n (x, _, True) = pass true x (state3 n) state3 n t@(x, True, False) = put false Nothing >> state1 (succ n) t state3 0 t@(x, False, False) = state0 t state3 n t@(x, False, False) = state2 n t in next state0) -- Helper functions type Marker m x = Transducer m x (Maybe x, Bool) splitterToMarker :: forall m x. (Monad m, Typeable x) => Splitter m x -> Marker m x splitterToMarker s = Transducer t where t source sink = liftM (\((x, y), z)-> z ++ y ++ x) $ pipeD "splitterToMarker true" (\trueSink-> pipeD "splitterToMarker false" (splitSections s source trueSink) (mark False)) (mark True) where mark b source = canPut sink >>= cond (get source >>= maybe (return []) (\x-> put sink (x, b) >>= cond (mark b source) (return $ maybe [] (: []) x))) (return []) splittersToPairMarker :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Transducer m x (Either (x, Bool, Bool) (Either Bool Bool)) splittersToPairMarker s1 s2 = Transducer t where t source sink = liftM (\((((((((), l1), l2), l3), l4), l5), l6), l7)-> l7 ++ l6 ++ l5 ++ l4 ++ l3 ++ l2 ++ l1) $ pipeD "splittersToMarker synchronize" (\sync-> pipeD "splittersToMarker true1" (\true1-> pipeD "splittersToMarker false1" (\false1-> pipeD "splitterssToMarker true2" (\true2-> pipeD "splittersToMarker false2" (\false2-> pipeD "splittersToMarker sink1" (\sink1-> pipeD "splittersToMarker sink2" (\sink2-> tee source sink1 sink2) (\source2-> splitSections s2 source2 true2 false2)) (\source1-> splitSections s1 source1 true1 false1)) (mark sync False False)) (mark sync False True)) (mark sync True False)) (mark sync True True)) (synchronizeMarks Nothing) where synchronizeMarks :: Maybe (Seq (x, Bool), Bool) -> Source c (Maybe x, Bool, Bool) -> Pipe c m [x] synchronizeMarks state source = get source -- >>= \t-> trace (show t ++ "@" ++ show state) (return t) >>= maybe (assert (isNothing state) (return [])) (\(x, pos, b) -> maybe (put sink (Right $ if pos then Left b else Right b) >> synchronizeMarks state source) (\x-> case state of Nothing -> synchronizeMarks (Just (Seq.singleton (x, b), pos)) source Just (q, pos') -> if pos == pos' then synchronizeMarks (Just (q |> (x, b), pos')) source else case Seq.viewl q of Seq.EmptyL -> synchronizeMarks (Just (Seq.singleton (x, b), pos)) source (y, b') :< rest -> put sink (Left $ if pos then (x, b, b') else (x, b', b)) >>= cond (synchronizeMarks (if Seq.null rest then Nothing else Just (rest, pos')) source) (returnQueuedList q)) x) returnQueuedList q = return $ map fst $ Foldable.toList $ Seq.viewl q mark sink first b source = let mark' = canPut sink >>= cond (get source >>= maybe (return []) (\x-> put sink (x, first, b) >>= cond mark' (return $ maybe [] (: []) x))) (return []) in mark' pairMarkerToMaybePairMarker :: forall m x. (Monad m, Typeable x) => Transducer m x (Either (x, Bool, Bool) (Either Bool Bool)) -> Transducer m x (Maybe x, Bool, Bool) pairMarkerToMaybePairMarker t = Transducer t' where t' source sink = liftM (\(x, y)-> y ++ x) $ pipeD "pairMarkerToMaybePairMarker" (transduce t source) (\source-> let next state = get source >>= maybe (return []) state nextState2 l r d = get source >>= maybe (put sink (Nothing, l, r) >> return []) (state2 l r d) state0 (Left (x, l, r)) = put sink (Just x, l, r) >>= cond (next $ state1 l r) (return [x]) state0 v@(Right d) = state2 False False d v state1 _ _ (Left (x, l, r)) = put sink (Just x, l, r) >>= cond (next $ state1 l r) (return [x]) state1 l r v@(Right d) = state2 l r d v state2 l r Left{} (Right d@(Left l')) = nextState2 l' r d state2 l r Left{} (Right (Right r')) = put sink (Nothing, l, r') >>= cond (next $ state1 l r') (return []) state2 l r Left{} t@(Left (x, l', r')) | l == l' = state1 l r t | otherwise = put sink (Nothing, l, r) >>= cond (state1 l' r' t) (return []) state2 l r Right{} (Right d@(Right r')) = nextState2 l r' d state2 l r Right{} (Right (Left l')) = put sink (Nothing, l', r) >>= cond (next $ state1 l' r) (return []) state2 l r Right{} t@(Left (x, l', r')) | r == r' = state1 l r t | otherwise = put sink (Nothing, l, r) >>= cond (state1 l' r' t) (return []) in next state0) zipSplittersWith :: (Monad m, Typeable x) => (Bool -> Bool -> Bool) -> Splitter m x -> Splitter m x -> Splitter m x zipSplittersWith f s1 s2 = liftSectionSplitter (\source true false-> liftM (\(x, y)-> y ++ x) $ pipeD "&" (transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source) (\source-> let split = get source >>= maybe (return []) test test (x, b1, b2) = (if f b1 b2 then put true x else put false x) >>= cond split (return $ maybe [] (:[]) x) in split)) groupMarks :: forall c1 c m x y z. (Monad m, Typeable x, Typeable y, Eq y) => Source c1 (Maybe x, y) -> (y -> Consumer m x z) -> Pipe c m () groupMarks source getConsumer = getSuccess source startNew where startNew (mx, y) = do (nextPair, _) <- pipeD "groupMarks" (\sink-> pass sink mx y) (getConsumer y) case nextPair of Just p -> startNew p Nothing -> return () pass sink Nothing y = next sink y pass sink (Just x) y = put sink x >> next sink y next sink y = get source >>= maybe (return Nothing) (continue sink y) continue sink y (x, y') | y == y' = pass sink x y continue sink y p@(x, y') | y /= y' = return (Just p) splitConsumer :: forall x m r1 r2 c c1. (Monad m, Typeable x) => String -> Splitter m x -> Consumer m x r1 -> Consumer m x r2 -> Source c1 x -> Pipe c m ([x], r1, r2) splitConsumer description s trueConsumer falseConsumer = consumer' where consumer' source = pipeD (description ++ " false") (\false-> pipeD (description ++ " true") (\true-> split s source true false) trueConsumer) falseConsumer >>= \((extra, r1), r2)-> return (extra, r1, r2) splitConsumerSections :: forall x m r1 r2 c c1. (Monad m, Typeable x) => String -> Splitter m x -> Consumer m (Maybe x) r1 -> Consumer m (Maybe x) r2 -> Source c1 x -> Pipe c m ([x], r1, r2) splitConsumerSections description s trueConsumer falseConsumer = consumer' where consumer' source = pipeD (description ++ " false") (\false-> pipeD (description ++ " true") (\true-> splitSections s source true false) trueConsumer) falseConsumer >>= \((extra, r1), r2)-> return (extra, r1, r2) putQueue :: forall context r m x. (Monad m, Typeable x) => Seq x -> Sink context x -> Pipe r m [x] putQueue q sink = putList (Foldable.toList (Seq.viewl q)) sink getQueue :: forall x c c1 m. (Monad m, Typeable x) => Source c1 x -> Pipe c m (Seq x) getQueue source = let getOne q = get source >>= maybe (return q) (\x-> getOne (q |> x)) in getOne Seq.empty pourMaybe :: forall c c1 c2 x m. (Monad m, Typeable x) => Source c1 x -> Sink c2 (Maybe x) -> Pipe c m () pourMaybe source sink = pour0 where pour0 = canPut sink >>= flip when (get source >>= maybe (put sink Nothing >> return ()) pass) pour1 = canPut sink >>= flip when (getSuccess source pass) pass x = put sink (Just x) >> pour1 suppressProducer :: forall x c m r. (Monad m, Typeable x) => Producer m x r -> Pipe c m r suppressProducer producer = liftM fst $ pipeD "suppress" producer consumeAndSuppress fst3 :: (a, b, c) -> a fst3 (a, b, c) = a