{-
    Copyright 2008-2009 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    http://www.gnu.org/licenses/.
-}

{-# LANGUAGE ScopedTypeVariables, Rank2Types, KindSignatures, EmptyDataDecls,
             MultiParamTypeClasses, FlexibleContexts, FlexibleInstances, FunctionalDependencies, TypeFamilies #-}

-- | The "Combinators" module defines combinators applicable to values of the 'Transducer' and 'Splitter' types defined
-- in the "Control.Concurrent.SCC.Types" module.

module Control.Concurrent.SCC.Combinators
   (-- * Consumer, producer, and transducer combinators
    splitterToMarker,
    consumeBy, prepend, append, substitute,
    PipeableComponentPair (connect), JoinableComponentPair (join, sequence),
    -- * Pseudo-logic splitter combinators
    -- | Combinators '>&' and '>|' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws hold,
    -- '>&' and '>|' are in general not commutative, associative, nor idempotent. In the special case when all argument
    -- splitters are stateless, such as those produced by 'Components.liftStatelessSplitter', these combinators do satisfy
    -- all laws of Boolean algebra.
    sNot, sAnd, sOr,
    -- ** Zipping logic combinators
    -- | The '&&' and '||' combinators run the argument splitters in parallel and combine their logical outputs using
    -- the corresponding logical operation on each output pair, in a manner similar to 'Prelude.zipWith'. They fully
    -- satisfy the laws of Boolean algebra.
    pAnd, pOr,
    -- * Flow-control combinators
    -- | The following combinators resemble the common flow-control programming language constructs. Combinators
    -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'.
    --
    --    * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Components.asis'
    --
    --    * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Components.asis' /transducer/
    --
    --    * 'select' /splitter/ = 'ifs' /splitter/ 'Components.asis' 'Components.suppress'
    --
    ifs, wherever, unless, select,
    -- ** Recursive
    while, nestedIn,
    -- * Section-based combinators
    -- | All combinators in this section use their 'Splitter' argument to determine the structure of the input. Every
    -- contiguous portion of the input that gets passed to one or the other sink of the splitter is treated as one
    -- section in the logical structure of the input stream. What is done with the section depends on the combinator,
    -- but the sections, and therefore the logical structure of the input stream, are determined by the argument
    -- splitter alone.
    foreach, having, havingOnly, followedBy, even,
    -- ** first and its variants
    first, uptoFirst, prefix,
    -- ** last and its variants
    last, lastAndAfter, suffix,
    -- ** positional splitters
    startOf, endOf,
    -- ** input ranges
    between,
    -- * parser support
    parseRegions, parseNestedRegions,
    -- * grouping helpers
    groupMarks)
where

import Control.Concurrent.Coroutine
import Control.Concurrent.SCC.Streams
import Control.Concurrent.SCC.Types

import Prelude hiding (even, last, sequence, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Control.Monad.Trans (lift)
import Data.Maybe (isJust, isNothing, fromJust)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))
import Debug.Trace (trace)

-- | Wraps a 'Consumer' as a 'Transducer': the consumer is run on the transducer's source, nothing is ever written to
-- the sink, and the consumer's own result is discarded in favour of an empty list.
consumeBy :: forall m x y r. (Monad m) => Consumer m x r -> Transducer m x y
consumeBy consumer = Transducer (\ source _sink -> do consume consumer source
                                                      return [])

-- | Class 'PipeableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * The input of the result, if any, becomes the input of the first component.
--
--    * The output produced by the first child component is consumed by the second child component.
--
--    * The result output, if any, is the output of the second component.
--
-- The 'Bool' argument of 'connect' is handed on to 'pipePS' by every instance below.
--
-- NOTE(review): the functional dependency @c2 c3 -> c2@ is trivially satisfied; @c2 c3 -> c1@ may have been intended.
-- It is left untouched here because changing it would alter type inference for callers -- confirm before editing.
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
                                                        c1 -> m w, c2 -> m w, c3 -> m
   where connect :: Bool -> c1 -> c2 -> c3

-- A producer piped into a consumer leaves neither input nor output, so the result is a 'Performer'.
instance forall m x. (ParallelizableMonad m) =>
         PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
   where connect parallel producer consumer = Performer (runCoroutine joined >> return ())
            where joined :: Coroutine Naught m ((), ())
                  joined = pipePS parallel (produce producer) (consume consumer)

-- A transducer feeding a consumer is a consumer of the transducer's input; the consumer's result is kept.
instance (ParallelizableMonad m) =>
         PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
   where connect parallel transducer consumer =
            isolateConsumer (\source-> liftM snd (pipePS parallel (transduce transducer source) (consume consumer)))

-- A producer feeding a transducer is a producer of the transducer's output; the producer's result is kept.
instance (ParallelizableMonad m) =>
         PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
   where connect parallel producer transducer =
            isolateProducer (\sink-> liftM fst (pipePS parallel
                                                   (produce producer)
                                                   (\middle-> transduce transducer middle sink)))

-- Two transducers in a row; the result of the first (upstream) transducer is the one returned.
instance ParallelizableMonad m =>
         PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
   where connect parallel upstream downstream =
            isolateTransducer (\source sink-> liftM fst (pipePS parallel
                                                            (transduce upstream source)
                                                            (\middle-> transduce downstream middle sink)))

-- | Relates a component type @c@ to its phantom tag @cons@, its monad @m@, and the input/output shapes it may have.
class CompatibleSignature c cons (m :: * -> *) input output | c -> cons m

-- | Shapes admitted where a component has no input or output of its own: any list type, or unit.
class AnyListOrUnit c

instance AnyListOrUnit [x]
instance AnyListOrUnit ()

instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r) (PerformerType r) m x y
instance AnyListOrUnit y => CompatibleSignature (Consumer m x r) (ConsumerType r) m [x] y
instance AnyListOrUnit y => CompatibleSignature (Producer m x r) (ProducerType r) m y [x]
instance CompatibleSignature (Transducer m x y) TransducerType m [x] [y]

-- Uninhabited tag types used as the @cons@ parameter of 'CompatibleSignature'.
data PerformerType r
data ConsumerType r
data ProducerType r
data TransducerType

-- | Class 'JoinableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * if both argument components consume input, the input of the combined component gets distributed to both
--      components in parallel,
--
--    * if both argument components produce output, the output of the combined component
--      is a concatenation of the complete output from the first component followed by the complete output of the
--      second component, and
--
--    * the 'join' method may apply the components in any order, the 'sequence' method makes sure its first argument
--      has completed before using the second one.
--
-- Unless an instance overrides it, 'join' ignores its 'Bool' argument and falls back on 'sequence'.
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
   => JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
                                                       t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
   where join :: Bool -> c1 -> c2 -> c3
         sequence :: c1 -> c2 -> c3
         join = const sequence

-- Producer + producer: the first producer runs to completion, then the second, both into the same sink.
instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Producer m x r2) (Producer m x r2)
   where sequence p1 p2 = Producer $ \sink-> produce p1 sink >> produce p2 sink

-- Consumer + consumer: 'join' tees the source to both consumers at once; 'sequence' tees the source into a buffer
-- while the first consumer runs and then replays the buffered list to the second consumer.
instance forall m x. ParallelizableMonad m =>
   JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] ()
                         (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
   where join parallel c1 c2 = isolateConsumer $ \source->
                               pipePS parallel
                                  (\sink1-> pipe (tee source sink1) (consume c2))
                                  (consume c1)
                               >> return ()
         sequence c1 c2 = isolateConsumer $ \source->
                          pipe
                             (\buffer-> pipe (tee source buffer) (consume c1))
                             getList
                          >>= \(_, list)-> pipe (putList list) (consume c2)
                          >> return ()

-- Transducer + transducer: in 'join' the first transducer writes directly to the sink while the output of the second
-- is buffered and written afterwards; 'sequence' buffers the input instead and replays it to the second transducer.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y]
                         (Transducer m x y) (Transducer m x y) (Transducer m x y)
   where join parallel t1 t2 = isolateTransducer $ \source sink->
                               pipe
                                  (\buffer-> pipePS parallel
                                                (\sink1-> pipe
                                                             (\sink2-> tee source sink1 sink2)
                                                             (\src-> transduce t2 src buffer))
                                                (\source-> transduce t1 source sink))
                                  getList
                               >>= \(_, list)-> putList list sink
                               >> getList source
         sequence t1 t2 = isolateTransducer $ \source sink->
                          pipe
                             (\buffer-> pipe (tee source buffer) (\source-> transduce t1 source sink))
                             getList
                          >>= \(_, list)-> pipe
                                              (\sink-> putList list sink >>= whenNull (pour source sink >> return []))
                                              (\source-> transduce t2 source sink)
                          >>= return . fst

-- Performer + performer: the result of the second performer is the result of the pair.
instance forall m r1 r2. ParallelizableMonad m =>
   JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () ()
                         (Performer m r1) (Performer m r2) (Performer m r2)
   where join parallel p1 p2 = Performer $ if parallel
                                           then bindM2 (const return) (perform p1) (perform p2)
                                           else perform p1 >> perform p2
         sequence p1 p2 = Performer $ perform p1 >> perform p2

-- Performer + producer: the performer is lifted into the producer's coroutine.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Performer m r1) (Producer m x r2) (Producer m x r2)
   where join parallel pe pr = Producer $ \sink-> if parallel
                                                  then bindM2 (const return) (lift (perform pe)) (produce pr sink)
                                                  else lift (perform pe) >> produce pr sink
         sequence pe pr = Producer $ \sink-> lift (perform pe) >> produce pr sink

-- Producer + performer: mirror image of the instance above; the performer's result is returned.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Performer m r2) (Producer m x r2)
   where join parallel pr pe = Producer $ \sink-> if parallel
                                                  then bindM2 (const return) (produce pr sink) (lift (perform pe))
                                                  else produce pr sink >> lift (perform pe)
         sequence pr pe = Producer $ \sink-> produce pr sink >> lift (perform pe)

-- Performer + consumer: the consumer's result is returned.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] ()
                         (Performer m r1) (Consumer m x r2) (Consumer m x r2)
   where join parallel p c = Consumer $ \source-> if parallel
                                                  then bindM2 (const return) (lift (perform p)) (consume c source)
                                                  else lift (perform p) >> consume c source
         sequence p c = Consumer $ \source-> lift (perform p) >> consume c source

-- Consumer + performer: the performer's result is returned.
instance forall m x r1 r2. (ParallelizableMonad m) =>
   JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] ()
                         (Consumer m x r1) (Performer m r2) (Consumer m x r2)
   where join parallel c p = Consumer $ \source-> if parallel
                                                  then bindM2 (const return) (consume c source) (lift (perform p))
                                                  else consume c source >> lift (perform p)
         sequence c p = Consumer $ \source-> consume c source >> lift (perform p)

-- Performer + transducer: the transducer's result is returned.
instance forall m x y r. (ParallelizableMonad m) =>
   JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y]
                         (Performer m r) (Transducer m x y) (Transducer m x y)
   where join parallel p t = Transducer $ \ source sink ->
                             if parallel
                             then bindM2 (const return) (lift (perform p)) (transduce t source sink)
                             else lift (perform p) >> transduce t source sink
         sequence p t = Transducer $ \ source sink -> lift (perform p) >> transduce t source sink

-- Transducer + performer: the transducer's result is kept even though the performer runs last.
instance forall m x y r. (ParallelizableMonad m) =>
   JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y]
                         (Transducer m x y) (Performer m r) (Transducer m x y)
   where join parallel t p = Transducer $ \ source sink ->
                             if parallel
                             then bindM2 (const . return) (transduce t source sink) (lift (perform p))
                             else do result <- transduce t source sink
                                     lift (perform p)
                                     return result
         sequence t p = Transducer $ \ source sink -> do result <- transduce t source sink
                                                         lift (perform p)
                                                         return result

-- Producer + transducer: in the parallel 'join' the transducer's output is buffered and written to the sink only after
-- both components have finished, so that the producer's output comes first.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y]
                         (Producer m y ()) (Transducer m x y) (Transducer m x y)
   where join parallel p t = if parallel
                             then isolateTransducer $ \source sink->
                                     do (rest, out) <- pipe
                                                          (\buffer-> bindM2 (const return)
                                                                        (produce p sink)
                                                                        (transduce t source buffer))
                                                          getList
                                        putList out sink
                                        return rest
                             else sequence p t
         sequence p t = Transducer $ \ source sink -> produce p sink >> transduce t source sink

-- Transducer + producer: in the parallel 'join' it is the producer's output that gets buffered and written last.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Producer m y ()) (Transducer m x y)
   where join parallel t p = if parallel
                             then isolateTransducer $ \source sink->
                                     do (rest, out) <- pipe
                                                          (\buffer-> bindM2 (const . return)
                                                                        (transduce t source sink)
                                                                        (produce p buffer))
                                                          getList
                                        putList out sink
                                        return rest
                             else sequence t p
         sequence t p = Transducer $ \ source sink -> do result <- transduce t source sink
                                                         produce p sink
                                                         return result

-- Consumer + transducer: 'join' tees the source to both; 'sequence' buffers the input while the consumer runs and then
-- replays it, followed by whatever is left in the source, to the transducer.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y]
                         (Consumer m x ()) (Transducer m x y) (Transducer m x y)
   where join parallel c t = isolateTransducer $ \source sink->
                             liftM (snd . fst) $
                             pipePS parallel
                                (\sink1-> pipe (tee source sink1) (\source-> transduce t source sink))
                                (consume c)
         sequence c t = isolateTransducer $ \source sink->
                        pipe
                           (\buffer-> pipe (tee source buffer) (consume c))
                           getList
                        >>= \(_, list)-> pipe
                                            (\sink-> putList list sink >>= whenNull (pour source sink >> return []))
                                            (\source-> transduce t source sink)
                        >>= return . fst

-- Transducer + consumer: 'join' delegates to the consumer + transducer instance with the arguments swapped.
instance forall m x y. ParallelizableMonad m =>
   JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Consumer m x ()) (Transducer m x y)
   where join parallel t c = join parallel c t
         sequence t c = isolateTransducer $ \source sink->
                        pipe
                           (\buffer-> pipe (tee source buffer) (\source-> transduce t source sink))
                           getList
                        >>= \(_, list)-> pipe
                                            (\sink-> putList list sink >>= whenNull (pour source sink >> return []))
                                            (consume c)
                        >>= return . fst

-- Producer + consumer: the producer supplies the output, the consumer takes the input; the result is always [].
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y]
                         (Producer m y ()) (Consumer m x ()) (Transducer m x y)
   where join parallel p c = Transducer $ \ source sink ->
                             if parallel
                             then bindM2 (\ _ _ -> return []) (produce p sink) (consume c source)
                             else produce p sink >> consume c source >> return []
         sequence p c = Transducer $ \ source sink -> produce p sink >> consume c source >> return []

-- Consumer + producer: 'join' delegates to the producer + consumer instance with the arguments swapped.
instance forall m x y. (ParallelizableMonad m) =>
   JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y]
                         (Consumer m x ()) (Producer m y ()) (Transducer m x y)
   where join parallel c p = join parallel p c
         sequence c p = Transducer $ \ source sink -> consume c source >> produce p sink >> return []

-- | Combinator 'prepend' converts the given producer to transducer that passes all its input through unmodified, except
-- for prepending the output of the argument producer to it.
--
-- 'prepend' /prefix/ = 'join' ('substitute' /prefix/) 'asis'
prepend :: forall m x r. (Monad m) => Producer m x r -> Transducer m x x
prepend prefix = Transducer $ \ source sink -> produce prefix sink >> pour source sink >> return []

-- | Combinator 'append' converts the given producer to transducer that passes all its input through unmodified, finally
-- appending to it the output of the argument producer.
--
-- 'append' /suffix/ = 'join' 'asis' ('substitute' /suffix/)
append :: forall m x r. (Monad m) => Producer m x r -> Transducer m x x
append suffix = Transducer $ \ source sink -> pour source sink >> produce suffix sink >> return []

-- | The 'substitute' combinator converts its argument producer to a transducer that produces the same output, while
-- consuming its entire input and ignoring it.
substitute :: forall m x y r. (Monad m) => Producer m y r -> Transducer m x y
substitute feed = Transducer $ \ source sink -> consumeAndSuppress source >> produce feed sink >> return []

-- | The 'sNot' (streaming not) combinator simply reverses the outputs of the argument splitter. In other words, data
-- that the argument splitter sends to its /true/ sink goes to the /false/ sink of the result, and vice versa.
-- The @edge@ sink of the result is never written to: the argument's edge output goes through 'suppressProducer'.
sNot :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
sNot splitter = isolateSplitter $ \ source true false edge -> suppressProducer (split splitter source false true)

-- | The '>&' combinator, implemented here as 'sAnd', sends the /true/ sink output of its left operand to the input of
-- its right operand for further splitting. Both operands' /false/ sinks are connected to the /false/ sink of the
-- combined splitter, but any input value that reaches the /true/ sink of the combined component must be deemed true by
-- both splitters. Edge values of the two operands are tagged 'Left' and 'Right' and paired up by 'intersectRegions'.
sAnd :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
sAnd parallel s1 s2 = isolateSplitter $ \ source true false edge ->
   liftM (fst . fst . fst . fst) $
   pipe
      (\edges-> pipe
                   (\edge1-> pipe
                                (\edge2-> pipePS parallel
                                             (\true-> split s1 source true false edge1)
                                             (\source-> split s2 source true false edge2))
                                (flip (pourMap Right) edges))
                   (flip (pourMap Left) edges))
      (flip intersectRegions edge)

-- Reads a stream of 'Either' values, remembering the latest 'Left' and the latest 'Right'. As soon as both are known
-- the pair is put into the sink and both memories are cleared; the loop stops at the end of the source or when the
-- 'put' reports failure.
intersectRegions source sink = next Nothing Nothing
   where next lastLeft lastRight = get source
                                   >>= maybe
                                          (return ())
                                          (either
                                              (flip pair lastRight . Just)
                                              (pair lastLeft . Just))
         pair l@(Just x) r@(Just y) = put sink (x, y) >>= flip when (next Nothing Nothing)
         pair l r = next l r

-- | A '>|' combinator's input value can reach its /false/ sink only by going through both argument splitters' /false/
-- sinks. The combinator is implemented here as 'sOr'; edge values of the left and right operand are passed on tagged
-- as 'Left' and 'Right' respectively.
sOr :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
sOr parallel s1 s2 = isolateSplitter $ \ source true false edge ->
   liftM (fst . fst . fst) $
   pipe
      (\edge1-> pipe
                   (\edge2-> pipePS parallel
                                (\false-> split s1 source true false edge1)
                                (\source-> split s2 source true false edge2))
                   (flip (pourMap Right) edge))
      (flip (pourMap Left) edge)

-- | Combinator '&&', implemented here as 'pAnd', is a pairwise logical conjunction of two splitters run in parallel on
-- the same input. An item goes to the /true/ sink only when both splitters mark it true. An edge pair is emitted
-- whenever one splitter reports an edge while an edge of the other splitter is still remembered.
pAnd :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
pAnd parallel s1 s2 = isolateSplitter $ \ source true false edge ->
   liftM (\(x, y)-> y ++ x) $
   pipePS parallel
      (transduce (splittersToPairMarker parallel s1 s2) source)
      (\source-> let -- l and r hold the most recent edge of the left and right splitter, if still current
                     split l r = get source >>= maybe (return []) (test l r)
                     test l r (Left (x, t1, t2)) = (if t1 Prelude.&& t2 then put true x else put false x)
                                                   >>= cond
                                                          (split (if t1 then l else Nothing) (if t2 then r else Nothing))
                                                          (return [x])
                     test _ Nothing (Right (Left l)) = split (Just l) Nothing
                     test _ (Just r) (Right (Left l)) = put edge (l, r) >> split (Just l) (Just r)
                     test Nothing _ (Right (Right r)) = split Nothing (Just r)
                     test (Just l) _ (Right (Right r)) = put edge (l, r) >> split (Just l) (Just r)
                 in split Nothing Nothing)

-- | Combinator '||', implemented here as 'pOr', is a pairwise logical disjunction of two splitters run in parallel on
-- the same input.
pOr :: forall c m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
pOr = zipSplittersWith (Prelude.||) pour

-- | Combinator 'ifs' routes the portion of the input that the splitter deems /true/ to the first branch component and
-- the /false/ portion to the second one, recombining the branches through 'combineBranches'.
ifs :: forall c m x b. (ParallelizableMonad m, Branching c m x [x]) => Bool -> Splitter m x b -> c -> c -> c
ifs parallel s c1 c2 = combineBranches if' parallel c1 c2
   where if' :: forall d. Bool -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x])
                -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x])
                -> forall a. OpenConsumer m a d x [x]
         if' parallel c1 c2 source = splitInputToConsumers parallel s source c1 c2

-- | Applies the transducer to the /true/ portion of the input, as determined by the splitter, and passes the /false/
-- portion through unchanged.
wherever :: forall m x b. ParallelizableMonad m => Bool -> Transducer m x x -> Splitter m x b -> Transducer m x x
wherever parallel t s = isolateTransducer $ \source sink->
                        splitInputToConsumers parallel s source
                           (\source-> transduce t source sink)
                           (\source-> pour source sink >> return [])

-- | Applies the transducer to the /false/ portion of the input, as determined by the splitter, and passes the /true/
-- portion through unchanged.
unless :: forall m x b. ParallelizableMonad m => Bool -> Transducer m x x -> Splitter m x b -> Transducer m x x
unless parallel t s = isolateTransducer $ \source sink->
                      splitInputToConsumers parallel s source
                         (\source-> pour source sink >> return [])
                         (\source-> transduce t source sink)

-- | Keeps only what the splitter sends to its /true/ sink; its /false/ and edge outputs both go through
-- 'suppressProducer'.
select :: forall m x b. Monad m => Splitter m x b -> Transducer m x x
select s = isolateTransducer $ \source sink-> suppressProducer (suppressProducer . split s source sink)

-- | Converts a splitter into a parser. Items are re-emitted as 'Content'; a marked region that received at least one
-- true item is bracketed by 'Start' and 'End' markup, a region that received none is reported as a 'Point'.
parseRegions :: forall m x b. Monad m => Splitter m x b -> Parser m x b
parseRegions s = isolateTransducer $ \source sink->
                    liftM (\(x, y)-> y ++ x) $
                    pipe
                       (transduce (splitterToMarker s) source)
                       (\source-> wrapRegions source sink)
   where wrapRegions source sink =
            let -- the state is the pending region marker, if any, paired with a flag telling if 'Start' was emitted
                wrap0 mb = get source >>= maybe (maybe (return True) flush mb >> return []) (wrap1 mb)
                wrap1 Nothing (Left (x, _)) = put sink (Content x) >>= cond (wrap0 Nothing) (return [x])
                wrap1 (Just p) (Left (x, False)) = flush p >> put sink (Content x) >>= cond (wrap0 Nothing) (return [x])
                wrap1 (Just (b, t)) (Left (x, True)) = (if t then return True else put sink (Markup (Start b)))
                                                       >> put sink (Content x)
                                                       >>= cond (wrap0 (Just (b, True))) (return [x])
                wrap1 (Just p) (Right b') = flush p >> wrap0 (Just (b', False))
                wrap1 Nothing (Right b) = wrap0 (Just (b, False))
                flush (b, t) = put sink $ Markup $ (if t then End else Point) b
            in wrap0 Nothing

-- | Converts a boundary-marking splitter into a parser. Both the /true/ and the /false/ output of the splitter are
-- wrapped in 'Content', its edge output in 'Markup'.
parseNestedRegions :: forall m x b. Monad m => Splitter m x (Boundary b) -> Parser m x b
parseNestedRegions s = isolateTransducer $ \source sink->
                       liftM (\(w, (), (), _)-> w) $
                       splitToConsumers s source
                          (flip (pourMap Content) sink)
                          (flip (pourMap Content) sink)
                          (flip (pourMap Markup) sink)

-- | The recursive combinator 'while' feeds the true sink of the argument splitter back to itself, modified by the
-- argument transducer. Data fed to the splitter's false sink is passed on unmodified.
--
-- Every list element supplies the parallelism flag, transducer, and splitter for one level of the recursion; the
-- next level is built from the tail of the list only when some input actually reaches the /true/ sink. Running out
-- of list elements at that point is reported with 'error'.
while :: forall m x b. ParallelizableMonad m => [(Bool, (Transducer m x x, Splitter m x b))] -> Transducer m x x
while ((parallel, (t, s)) : rest) = isolateTransducer $ \source sink->
   splitInputToConsumers parallel s source
      (\source-> get source
                 >>= maybe
                        (return [])
                        (\x-> liftM (uncurry (++)) $
                              pipe
                                 (\sink-> put sink x >>= cond (pour source sink >> return []) (return [x]))
                                 (\source-> transduce while' source sink)))
      (\source-> pour source sink >> return [])
   where while' = connect parallel t (while rest)
-- An explicit clause replaces the anonymous pattern-match failure of the original definition.
while [] = error "Control.Concurrent.SCC.Combinators.while: ran out of (transducer, splitter) levels"

-- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single
-- splitter. The true sink of one of the argument splitters and false sink of the other become the true and false sinks
-- of the loop. The other two sinks are bound to the other splitter's source. The use of 'nestedIn' makes sense only
-- on hierarchically structured streams. If we gave it some input containing a flat sequence of values, and assuming
-- both component splitters are deterministic and stateless, an input value would either not loop at all or it would
-- loop forever.
--
-- As with 'while', each list element describes one level of the recursion, and an exhausted list is reported with
-- 'error'.
nestedIn :: forall m x b. ParallelizableMonad m => [(Bool, (Splitter m x b, Splitter m x b))] -> Splitter m x b
nestedIn ((parallel, (s1, s2)) : rest) = isolateSplitter $ \ source true false edge ->
   liftM fst $
   pipePS parallel
      (\false-> split s1 source true false edge)
      (\source-> pipe
                    (\true-> pipe (split s2 source true false) consumeAndSuppress)
                    (\source-> get source
                               >>= maybe
                                      (return ([], []))
                                      (\x-> pipe
                                               (\sink-> put sink x >>= cond (pour source sink >> return []) (return [x]))
                                               (\source-> split (nestedIn rest) source true false edge))))
-- An explicit clause replaces the anonymous pattern-match failure of the original definition.
nestedIn [] = error "Control.Concurrent.SCC.Combinators.nestedIn: ran out of splitter pair levels"

-- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two transducers into
-- another transducer. However, in this case the transducers are re-instantiated for each consecutive portion of the
-- input as the splitter chunks it up.
-- Each contiguous portion of the input that the splitter sends to one of its two sinks gets transducered through the
-- appropriate argument transducer as that transducer's whole input. As soon as the contiguous portion is finished, the
-- transducer gets terminated.
foreach :: forall m x b c. (ParallelizableMonad m, Branching c m x [x]) => Bool -> Splitter m x b -> c -> c -> c
foreach parallel s c1 c2 = combineBranches foreach' parallel c1 c2
   where foreach' :: forall d. Bool -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x])
                     -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x [x])
                     -> forall a. OpenConsumer m a d x [x]
         foreach' inParallel onTrue onFalse input =
            liftM fst
               (pipePS inParallel
                   (transduce (splitterToMarker s) (liftSource input :: Source m d x))
                   -- a group carrying a marker is a true section, a group without one is a false section
                   (\marked-> groupMarks marked (maybe onFalse (const onTrue))))

-- | The 'having' combinator combines two pure splitters into a pure splitter. The first splitter chunks the input
-- into contiguous portions, and its /false/ sink is routed directly to the /false/ sink of the combined splitter. The
-- second splitter is instantiated and run on each portion of the input that goes to the first splitter's /true/ sink.
-- If the second splitter sends any output at all to its /true/ sink, the whole input portion is passed on to the
-- /true/ sink of the combined splitter, otherwise it goes to its /false/ sink.
having :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
having parallel s1 s2 = isolateSplitter s
   where s source true false edge =
            liftM fst $
            pipePS parallel
               (transduce (splitterToMarker s1) source)
               (\marked-> groupMarks marked test)
            where test Nothing chunk = pour chunk false >> return []
                  test (Just mb) chunk =
                     -- the chunk is copied into a list while the second splitter probes it
                     pipe
                        (\sink1-> pipe (tee chunk sink1) getList)
                        (\probe-> splitToConsumers s2 probe (liftM isJust . get) consumeAndSuppress (liftM isJust . get))
                     >>= \(((), prefix), (_, anyTrue, (), anyEdge))->
                            if anyTrue Prelude.|| anyEdge
                            then maybe (return True) (put edge) mb
                                 >> putList prefix true
                                 >>= whenNull (pour chunk true >> return [])
                            else putList prefix false >>= whenNull (pour chunk false >> return [])

-- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it succeeds and passes each chunk of the
-- input to its /true/ sink only if the second splitter sends no part of it to its /false/ sink.
havingOnly :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
havingOnly parallel s1 s2 = isolateSplitter s
   where s source true false edge =
            liftM fst $
            pipePS parallel
               (transduce (splitterToMarker s1) source)
               (\marked-> groupMarks marked test)
            where test Nothing chunk = pour chunk false >> return []
                  test (Just mb) chunk =
                     pipe
                        (\sink1-> pipe (tee chunk sink1) getList)
                        (\probe-> splitToConsumers s2 probe consumeAndSuppress (liftM isJust . get) consumeAndSuppress)
                     >>= \(((), prefix), (_, (), anyFalse, ()))->
                            if anyFalse
                            then putList prefix false >>= whenNull (pour chunk false >> return [])
                            else maybe (return True) (put edge) mb
                                 >> putList prefix true
                                 >>= whenNull (pour chunk true >> return [])

-- | The result of combinator 'first' behaves the same as the argument splitter up to and including the first portion of
-- the input which goes into the argument's /true/ sink. All input following the first true portion goes into the
-- /false/ sink.
first :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
first splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            forward sink x state = put sink x >>= cond (advance state) (return [x])
            -- nothing true seen yet
            before (Left (x, False)) = forward false x before
            before (Left (x, True)) = forward true x inside
            before (Right b) = put edge b >> get marks >>= maybe (return []) inside
            -- inside the first true portion
            inside mark@Right{} = after mark
            inside (Left (x, True)) = forward true x inside
            inside (Left (x, False)) = forward false x after
            -- past the first true portion: everything is false and markers are dropped
            after (Left (x, _)) = forward false x after
            after (Right _) = advance after
        in advance before)

-- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes
-- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes
-- into the /false/ sink. The only difference between 'first' and 'uptoFirst' combinators is in where they direct the
-- /false/ portion of the input preceding the first /true/ part.
uptoFirst :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
uptoFirst splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            forward sink x state = put sink x >>= cond (advance state) (return [x])
            -- leading false items are queued until it is known whether a true portion follows
            leading q (Left (x, False)) = let q' = q |> x
                                          in get marks >>= maybe (putQueue q' false) (leading q')
            leading q mark@(Left (_, True)) = putQueue q true >>= whenNull (inside mark)
            leading q (Right b) = putQueue q true
                                  >>= whenNull (put edge b >> get marks >>= maybe (return []) inside)
            inside mark@Right{} = after mark
            inside (Left (x, True)) = forward true x inside
            inside (Left (x, False)) = forward false x after
            after (Left (x, _)) = forward false x after
            after (Right _) = advance after
        in advance (leading Seq.empty))

-- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last
-- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to
-- the resulting component's /true/ sink. The splitter returned by the combinator 'last' has to buffer the previous two
-- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of
-- the input or another portion succeeding the previous one.
last :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
last splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            emitTrue mb q = maybe (return True) (put edge) mb >> putQueue q true
            idle (Left (x, False)) = put false x >>= cond (get marks >>= maybe (return []) idle) (return [x])
            idle mark@(Left (x, True)) = candidate Nothing Seq.empty mark
            idle (Right b) = advance (candidate (Just b) Seq.empty)
            -- collecting a true portion that may turn out to be the last one
            candidate mb q (Left (x, True)) = let q' = q |> x
                                              in get marks >>= maybe (emitTrue mb q') (candidate mb q')
            candidate mb q mark = trailing mb q Seq.empty mark
            -- collecting the false items that follow the candidate portion
            trailing mb qt qf (Left (x, False)) =
               let qf' = qf |> x
               in get marks >>= maybe (emitTrue mb qt >> putQueue qf' false) (trailing mb qt qf')
            trailing mb qt qf mark = do rest1 <- putQueue qt false
                                        rest2 <- putQueue qf false
                                        if null rest1 Prelude.&& null rest2
                                           then idle mark
                                           else return (rest1 ++ rest2)
        in advance idle)

-- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the
-- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input is
-- fed to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where
-- they feed the /false/ portion of the input, if any, remaining after the last /true/ part.
lastAndAfter :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
lastAndAfter splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            emitTrue mb q = maybe (return True) (put edge) mb >> putQueue q true
            idle (Left (x, False)) = put false x >>= cond (advance idle) (return [x])
            idle mark@(Left (x, True)) = candidate Nothing Seq.empty mark
            idle (Right b) = advance (candidate (Just b) Seq.empty)
            candidate mb q (Left (x, True)) = let q' = q |> x
                                              in get marks >>= maybe (emitTrue mb q') (candidate mb q')
            candidate mb q mark = trailing mb q mark
            -- false items after the candidate share its queue: at the end of input they go to the true sink too
            trailing mb q (Left (x, False)) = let q' = q |> x
                                              in get marks >>= maybe (emitTrue mb q') (trailing mb q')
            trailing _ q mark@(Left (x, True)) = putQueue q false >>= whenNull (idle mark)
            trailing _ q mark@Right{} = putQueue q false >>= whenNull (idle mark)
        in advance idle)

-- | The 'prefix' combinator feeds its /true/ sink only the prefix of the input that its argument feeds to its /true/
-- sink. All the rest of the input is dumped into the /false/ sink of the result.
prefix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
prefix splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            forward sink x state = put sink x >>= cond (advance state) (return [x])
            -- only a marker arriving as the very first mark is passed on to the edge sink
            start mark@Left{} = leading mark
            start (Right b) = put edge b >> get marks >>= maybe (return []) leading
            leading (Left (x, False)) = forward false x rest
            leading (Left (x, True)) = forward true x leading
            leading (Right b) = advance rest
            rest (Left (x, _)) = forward false x rest
            rest Right{} = advance rest
        in advance start)

-- | The 'suffix' combinator feeds its /true/ sink only the suffix of the input that its argument feeds to its /true/
-- sink. All the rest of the input is dumped into the /false/ sink of the result.
suffix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
suffix splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            idle (Left (x, False)) = put false x >>= cond (advance idle) (return [x])
            idle (Left (x, True)) = collect Nothing (Seq.singleton x)
            idle (Right b) = collect (Just b) Seq.empty
            -- a true portion is held back; it reaches the true sink only if the input ends while it is queued
            collect mb q = get marks
                           >>= maybe
                                  (maybe (return True) (put edge) mb >> putQueue q true)
                                  (extend mb q)
            extend mb q (Left (x, True)) = collect mb (q |> x)
            extend mb q mark@(Left (x, False)) = putQueue q false
                                                 >>= \rest-> if null rest then idle mark else return (rest ++ [x])
            extend mb q (Right b) = putQueue q false >>= whenNull (collect (Just b) Seq.empty)
        in advance idle)

-- | The 'even' combinator takes every input section that its argument /splitter/ deems /true/, and feeds even ones into
-- its /true/ sink. The odd sections and parts of input that are /false/ according to its argument splitter are fed to
-- 'even' splitter's /false/ sink.
even :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
even splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            -- waiting for an odd-numbered true section
            beforeOdd (Left (x, False)) = put false x >>= cond (advance beforeOdd) (return [x])
            beforeOdd mark@(Left (x, True)) = inOdd mark
            beforeOdd (Right b) = advance inOdd
            -- inside an odd-numbered true section: its items go to the false sink
            inOdd (Left (x, True)) = put false x >>= cond (advance inOdd) (return [x])
            inOdd mark@(Left (x, False)) = beforeEven mark
            inOdd (Right b) = put edge b >> advance inEven
            -- waiting for an even-numbered true section
            beforeEven (Left (x, False)) = put false x >>= cond (advance beforeEven) (return [x])
            beforeEven mark@(Left (x, True)) = inEven mark
            beforeEven (Right b) = put edge b >> advance inEven
            -- inside an even-numbered true section: its items go to the true sink
            inEven (Left (x, True)) = put true x >>= cond (advance inEven) (return [x])
            inEven mark@(Left (x, False)) = beforeOdd mark
            inEven (Right b) = advance inOdd
        in advance beforeOdd)

-- | Splitter 'startOf' issues an empty /true/ section at the beginning of every section considered /true/ by its
-- argument splitter, otherwise the entire input goes into its /false/ sink. The edge value is 'Just' the argument's
-- own edge when one was marked and 'Nothing' when the true section started without a marker.
startOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b)
startOf splitter = isolateSplitter $ \ source true false edge ->
   liftM (\(unread, leftover)-> leftover ++ unread) $
   pipe
      (transduce (splitterToMarker splitter) source)
      (\marks->
        let advance state = get marks >>= maybe (return []) state
            outside (Left (x, False)) = put false x >>= cond (advance outside) (return [x])
            outside mark@(Left (x, True)) = put edge Nothing >> inside mark
            outside (Right b) = put edge (Just b) >> advance inside
            inside (Left (x, True)) = put false x >>= cond (advance inside) (return [x])
            inside mark = outside mark
        in advance outside)

-- | Splitter 'endOf' issues an empty /true/ section at the end of every section considered /true/ by its argument
-- splitter, otherwise the entire input goes into its /false/ sink.
endOf :: forall m x b.
Monad m => Splitter m x b -> Splitter m x (Maybe b) endOf splitter = isolateSplitter $ \ source true false edge -> liftM (\(x, y)-> y ++ x) $ pipe (transduce (splitterToMarker splitter) source) (\source-> let get1 (Left (x, False)) = put false x >>= cond (next get1) (return [x]) get1 p@(Left (x, True)) = get2 Nothing p get1 (Right b) = next (get2 $ Just b) get2 mb (Left (x, True)) = put false x >>= cond (next $ get2 mb) (return [x]) get2 mb p@(Left (x, False)) = put edge mb >> get1 p get2 mb (Right b) = put edge mb >> next (get2 $ Just b) next g = get source >>= maybe (return []) g in next get1) -- | Combinator 'followedBy' treats its argument 'Splitter's as patterns components and returns a 'Splitter' that -- matches their concatenation. A section of input is considered /true/ by the result iff its prefix is considered -- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/. The splitter /s2/ is started anew -- after every section split to /true/ sink by /s1/. followedBy :: forall m x b1 b2. ParallelizableMonad m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2) followedBy parallel s1 s2 = isolateSplitter $ \ source true false edge -> liftM (\(x, y)-> y ++ x) $ pipePS parallel (transduce (splitterToMarker s1) source) (\source-> let get0 q = case Seq.viewl q of Seq.EmptyL -> get source >>= maybe (return []) get1 (Left (x, False)) :< rest -> put false x >>= cond (get0 rest) (return $ concatMap (either ((:[]) . fst) (const [])) $ Foldable.toList $ Seq.viewl q) (Left (x, True)) :< rest -> get2 Nothing Seq.empty q (Right b) :< rest -> get2 (Just b) Seq.empty rest get1 (Left (x, False)) = put false x >>= cond (get source >>= maybe (return []) get1) (return [x]) get1 p@(Left (x, True)) = get2 Nothing Seq.empty (Seq.singleton p) get1 (Right b) = get2 (Just b) Seq.empty Seq.empty get2 mb q q' = case Seq.viewl q' of Seq.EmptyL -> get source >>= maybe (testEnd mb q) (get2 mb q . 
Seq.singleton) (Left (x, True)) :< rest -> get2 mb (q |> x) rest (Left (x, False)) :< rest -> get3 mb q q' Right{} :< rest -> get3 mb q q' get3 mb q q' = do ((q1, q2), n) <- pipe (get7 Seq.empty q') (test mb q) case n of Nothing -> putQueue q false >>= whenNull (get0 (q1 >< q2)) Just 0 -> get0 (q1 >< q2) Just n -> get8 (Just mb) n (q1 >< q2) get7 q1 q2 sink = canPut sink >>= cond (case Seq.viewl q2 of Seq.EmptyL -> get source >>= maybe (return (q1, q2)) (\p-> either (put sink . fst) (const $ return True) p >> get7 (q1 |> p) q2 sink) p :< rest -> either (put sink . fst) (const $ return True) p >> get7 (q1 |> p) rest sink) (return (q1, q2)) testEnd mb q = do ((), n) <- pipe (const $ return ()) (test mb q) case n of Nothing -> putQueue q false _ -> return [] test mb q source = liftM snd $ pipe (transduce (splitterToMarker s2) source) (\source-> let get4 (Left (_, False)) = return Nothing get4 p@(Left (_, True)) = putQueue q true >> get5 0 p get4 p@(Right b) = maybe (return True) (\b1-> put edge (b1, b)) mb >> putQueue q true >> get6 0 get5 n (Left (x, True)) = put true x >> get6 (succ n) get5 n _ = return (Just n) get6 n = get source >>= maybe (return $ Just n) (get5 n) in get source >>= maybe (return Nothing) get4) get8 Nothing 0 q = get0 q get8 (Just mb) 0 q = get2 mb Seq.empty q get8 mmb n q = case Seq.viewl q of Left (x, False) :< rest -> get8 Nothing (pred n) rest Left (x, True) :< rest -> get8 (maybe (Just Nothing) Just mmb) (pred n) rest Right b :< rest -> get8 (Just (Just b)) n rest in get0 Seq.empty) -- | Combinator '...' tracks the running balance of difference between the number of preceding starts of sections -- considered /true/ according to its first argument and the ones according to its second argument. The combinator -- passes to /true/ all input values for which the difference balance is positive. This combinator is typically used -- with 'startOf' and 'endOf' in order to count entire input sections and ignore their lengths. 
between :: forall m x b1 b2. ParallelizableMonad m =>
           Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
between parallel s1 s2 = isolateSplitter $ \ source true false edge ->
   liftM (\(x, y)-> y ++ x) $
   pipePS parallel
      (transduce (splittersToPairMarker parallel s1 s2) source)
      (\source->
          let
             -- n is the running balance. Each item carries the verdicts of /s1/ and /s2/, in that order.
             advance n = get source >>= maybe (return []) (step n)
             -- Writes x to /true/ or /false/, then carries on with balance n; hands x back if the put is refused.
             deliver toTrue n x = (if toTrue then put true x else put false x) >>= cond (advance n) (return [x])
             emit n x = deliver (n > 0) n x
             emit' n x = deliver (n >= 0) n x
             step n (Left (x, True, False)) = emit (succ n) x
             step n (Left (x, False, True)) = emit' (pred n) x
             step n (Left (x, True, True)) = emit' n x
             step n (Left (x, False, False)) = emit n x
             -- An edge of /s1/ is reported only when it raises the balance from zero.
             step 0 (Right (Left b)) = put edge b >> advance 1
             step n (Right (Left _)) = advance (succ n)
             step n (Right (Right _)) = advance (pred n)
          in advance 0)

-- Helper functions

-- Turns a splitter into a transducer whose output pairs every input item with the sink it was written to, as
-- @Left (x, True)@ or @Left (x, False)@, and passes every edge value b on as @Right b@.
splitterToMarker :: forall m x b. Monad m => Splitter m x b -> Transducer m x (Either (x, Bool) b)
splitterToMarker s = isolateTransducer $ \source sink->
   let -- Copies a source into the output sink, wrapping each value with f, for as long as the sink accepts.
       mark f source = canPut sink >>= cond (get source >>= maybe (return []) forward) (return [])
          where forward x = put sink (f x) >>= cond (mark f source) (return [x])
   in liftM (\(x, y, z, _)-> z ++ y ++ x) $
      splitToConsumers s source
         (mark (\x-> Left (x, True)))
         (mark (\x-> Left (x, False)))
         (mark Right)

-- Runs both splitters over the same input and merges their marks: every item is emitted once, as
-- @Left (x, t1, t2)@ with the verdicts of /s1/ and /s2/, and the edges of /s1/ and /s2/ are emitted as
-- @Right (Left b1)@ and @Right (Right b2)@ respectively.
splittersToPairMarker :: forall m x b1 b2. (ParallelizableMonad m) =>
                         Bool -> Splitter m x b1 -> Splitter m x b2
                      -> Transducer m x (Either (x, Bool, Bool) (Either b1 b2))
splittersToPairMarker parallel s1 s2 = isolateTransducer t
   where -- Both splitters write their marks into the shared sink sync. The outer Bool of an item mark records
         -- its origin: True for /s1/, False for /s2/.
         t source sink =
            liftM (\(((_, _), (x, _, _, _)), _)-> x) $
            pipe
               (\sync->
                   pipePS parallel
                      (\sink1->
                          pipe
                             (tee source sink1)
                             (\source2->
                                 splitToConsumers s2 source2
                                    (flip (pourMap (\x-> Left ((x, True), False))) sync)
                                    (flip (pourMap (\x-> Left ((x, False), False))) sync)
                                    (flip (pourMap (Right . Right)) sync)))
                      (\source1->
                          splitToConsumers s1 source1
                             (flip (pourMap (\x-> Left ((x, True), True))) sync)
                             (flip (pourMap (\x-> Left ((x, False), True))) sync)
                             (flip (pourMap (Right . Left)) sync)))
               (synchronizeMarks Nothing sink)
         -- The state is Nothing when the two splitters are level; otherwise it holds the queue of marks from
         -- the splitter that is ahead, and the flag telling which splitter that is.
         --
         -- synchronizeMarks :: Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool)
         --                  -> Sink m c (Either (x, Bool, Bool) (Either b1 b2))
         --                  -> Source m c (Either ((x, Bool), Bool) (Either b1 b2))
         --                  -> Coroutine c m [x]
         synchronizeMarks state sink source =
            get source >>= maybe (assert (isNothing state) (return [])) (handleMark state sink source)
         -- handleMark :: Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool)
         --            -> Sink m c (Either (x, Bool, Bool) (Either b1 b2))
         --            -> Source m c (Either ((x, Bool), Bool) (Either b1 b2))
         --            -> Either ((x, Bool), Bool) (Either b1 b2) -> Coroutine c m [x]
         --
         -- Nothing is queued: an edge passes through, an item mark starts the queue.
         handleMark Nothing sink source (Right b) = put sink (Right b) >> synchronizeMarks Nothing sink source
         handleMark Nothing sink source (Left (p, first)) =
            synchronizeMarks (Just (Seq.singleton (Left p), first)) sink source
         -- Marks from the splitter that is already ahead are queued behind the others.
         handleMark (Just (q, first)) sink source (Left (p, first'))
            | first == first' = synchronizeMarks (Just (q |> Left p, first)) sink source
         handleMark (Just (q, True)) sink source (Right b@Left{}) =
            synchronizeMarks (Just (q |> Right b, True)) sink source
         handleMark (Just (q, False)) sink source (Right b@Right{}) =
            synchronizeMarks (Just (q |> Right b, False)) sink source
         -- An edge from the splitter that is behind passes through at once.
         handleMark state sink source (Right b) = put sink (Right b) >> synchronizeMarks state sink source
         -- An item mark from the splitter that is behind is paired with the oldest queued item mark; queued
         -- edges in front of that one are emitted first.
         handleMark (Just (q, pos')) sink source mark@(Left ((x, t), pos)) =
            case Seq.viewl q of
               Seq.EmptyL -> synchronizeMarks (Just (Seq.singleton (Left (x, t)), pos)) sink source
               Right b :< rest ->
                  put sink (Right b)
                  >>= cond (handleMark (remaining rest) sink source mark) (returnQueuedList q)
               Left (y, t') :< rest ->
                  put sink (Left $ if pos then (y, t, t') else (y, t', t))
                  >>= cond (synchronizeMarks (remaining rest) sink source) (returnQueuedList q)
            where remaining rest = if Seq.null rest then Nothing else Just (rest, pos')
         -- The items still waiting in the queue, with edge marks left out.
         returnQueuedList q = return $ concatMap (either ((:[]) . fst) (const [])) $ Foldable.toList $ Seq.viewl q

-- Combines two splitters by applying f to their verdicts on every input item; the edge values of the two
-- splitters are handed over to the boundaries argument, which writes to the edge sink of the result.
zipSplittersWith :: forall m x b1 b2 b. ParallelizableMonad m =>
                    (Bool -> Bool -> Bool)
                 -> (forall a1 a2 d. (AncestorFunctor a1 d, AncestorFunctor a2 d) =>
                     Source m a1 (Either b1 b2) -> Sink m a2 b -> Coroutine d m ())
                 -> Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b
zipSplittersWith f boundaries parallel s1 s2 = isolateSplitter $ \ source true false edge ->
   liftM (\((x, y), _)-> y ++ x) $
   pipe
      (\edge->
          pipePS parallel
             (transduce (splittersToPairMarker parallel s1 s2) source)
             (\source->
                 let loop = get source >>= maybe (return []) (either route (\b-> put edge b >> loop))
                     route (x, t1, t2) = (if f t1 t2 then put true x else put false x) >>= cond loop (return [x])
                 in loop))
      (flip boundaries edge)

-- | Runs the second argument on every contiguous region of input source (typically produced by 'splitterToMarker')
-- all of whose values match either @Left (_, True)@ or @Left (_, False)@.
groupMarks :: (Monad m, AncestorFunctor a d, AncestorFunctor a (SinkFunctor d x)) =>
              Source m a (Either (x, Bool) b)
           -> (Maybe (Maybe b) -> Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m r)
           -> Coroutine d m ()
groupMarks source getConsumer = start
   where start = getSuccess source (either startContent startRegion)
         -- The consumer is given Nothing for a False region, Just Nothing for a True region that starts without
         -- an edge, and Just (Just b) for a region introduced by the edge b.
         startContent (x, flag) =
            pipe (\sink-> pass flag sink x) (getConsumer (if flag then Just Nothing else Nothing)) >>= resume
         startRegion b = pipe (next True) (getConsumer (Just $ Just b)) >>= resume
         -- Starts the next region with the mark that ended the previous one, or stops at the end of input.
         resume result = maybe (return ()) (either startContent startRegion) (fst result)
         -- Copies the items of the current region into the sink, returning the first mark that is not part of it.
         pass t sink x = put sink x >> next t sink
         next t sink = get source >>= maybe (return Nothing) (continue t sink)
         continue t sink (Left (x, t')) | t == t' = pass t sink x
         continue _ _ p = return (Just p)

-- | 'suppressProducer' runs the /producer/ argument with a new sink, suppressing everything 'put' in the sink.
suppressProducer :: forall m a x r. (Functor a, Monad m) =>
                    (Sink m (SinkFunctor a x) x -> Coroutine (SinkFunctor a x) m r) -> Coroutine a m r
suppressProducer producer = liftM fst (pipe producer consumeAndSuppress)