{- Copyright 2008-2010 Mario Blazevic This file is part of the Streaming Component Combinators (SCC) project. The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SCC. If not, see <http://www.gnu.org/licenses/>. -} {-# LANGUAGE ScopedTypeVariables, RankNTypes, KindSignatures, EmptyDataDecls, MultiParamTypeClasses, FlexibleContexts, FlexibleInstances, FunctionalDependencies, TypeFamilies #-} {-# OPTIONS_HADDOCK hide #-} -- | The "Combinators" module defines combinators applicable to values of the 'Transducer' and 'Splitter' types defined -- in the "Control.Concurrent.SCC.Types" module. module Control.Concurrent.SCC.Combinators ( -- * Consumer, producer, and transducer combinators consumeBy, prepend, append, substitute, PipeableComponentPair (compose), JoinableComponentPair (join, sequence), -- * Splitter combinators sNot, -- ** Pseudo-logic flow combinators -- | Combinators 'sAnd' and 'sOr' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws -- hold, 'sAnd' and 'sOr' are in general not commutative, associative, nor idempotent. In the special case when all -- argument splitters are stateless, such as those produced by 'Control.Concurrent.SCC.Types.statelessSplitter', -- these combinators do satisfy all laws of Boolean algebra. sAnd, sOr, -- ** Zipping logic combinators -- | The 'pAnd' and 'pOr' combinators run the argument splitters in parallel and combine their logical outputs using -- the corresponding logical operation on each output pair, in a manner similar to 'Data.List.zipWith'. 
They fully -- satisfy the laws of Boolean algebra. pAnd, pOr, -- * Flow-control combinators -- | The following combinators resemble the common flow-control programming language constructs. Combinators -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'. -- -- * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Control.Category.id' -- -- * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Control.Category.id' /transducer/ -- -- * 'select' /splitter/ = 'ifs' /splitter/ 'Control.Category.id' -- 'Control.Concurrent.SCC.Primitives.suppress' -- ifs, wherever, unless, select, -- ** Recursive while, nestedIn, -- * Section-based combinators -- | All combinators in this section use their 'Control.Concurrent.SCC.Splitter' argument to determine the structure -- of the input. Every contiguous portion of the input that gets passed to one or the other sink of the splitter is -- treated as one section in the logical structure of the input stream. What is done with the section depends on the -- combinator, but the sections, and therefore the logical structure of the input stream, are determined by the -- argument splitter alone. 
foreach, having, havingOnly, followedBy, even,
   -- ** first and its variants
   first, uptoFirst, prefix,
   -- ** last and its variants
   last, lastAndAfter, suffix,
   -- ** positional splitters
   startOf, endOf, between,
   -- * Parser support
   splitterToMarker, parseRegions, parseNestedRegions, parseEachNestedRegion,
   -- * Helper functions
   groupMarks, findsTrueIn, findsFalseIn, teeConsumers
   )
where

import Prelude hiding (even, last, sequence, head)

import Control.Monad (liftM, when)
import Control.Monad.Trans.Class (lift)
import Data.Maybe (isJust, mapMaybe)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))

import Control.Monad.Coroutine

import Control.Concurrent.SCC.Streams
import Control.Concurrent.SCC.Types
import Control.Concurrent.SCC.Coercions

-- | Converts a 'Consumer' into a 'Transducer' with no output. The consumer's result is discarded and the
-- transducer's sink is never written to.
consumeBy :: forall m x y r. (Monad m) => Consumer m x r -> Transducer m x y
consumeBy c = Transducer $ \ source _sink -> consume c source >> return ()

-- | Class 'PipeableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * The input of the result, if any, becomes the input of the first component.
--
--    * The output produced by the first child component is consumed by the second child component.
--
--    * The result output, if any, is the output of the second component.
--
-- Any two of the three component types determine the third one. The dependency of the first component on the other
-- two used to be spelled @c2 c3 -> c2@, which is vacuous; it is now @c2 c3 -> c1@, in line with the other two.
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c1,
                                                       c1 -> m w, c2 -> m w, c3 -> m
   where
      -- | Pipes the first component into the second one, using the given 'PairBinder' to run the two.
      compose :: PairBinder m -> c1 -> c2 -> c3

instance forall m x.
Monad m => PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
   where
      -- A producer piped into a consumer is run to completion and wrapped as a 'Performer'.
      compose binder p c = Performer (runCoroutine performPipe >> return ())
         where
            performPipe :: Coroutine Naught m ((), ())
            performPipe = pipeG binder (produce p) (consume c)

-- A transducer piped into a consumer is a consumer of the transducer's input; only the consumer's result is kept.
instance Monad m => PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
   where
      compose binder t c = isolateConsumer $ \source->
         liftM snd $ pipeG binder (transduce t source) (consume c)

-- A producer piped into a transducer is a producer of the transducer's output; only the producer's result is kept.
instance Monad m => PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
   where
      compose binder p t = isolateProducer $ \sink->
         liftM fst $ pipeG binder (produce p) (\source-> transduce t source sink)

-- Two transducers piped together form a transducer from the input of the first to the output of the second.
instance Monad m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
   where
      compose binder t1 t2 = isolateTransducer $ \source sink->
         pipeG binder (transduce t1 source) (\source'-> transduce t2 source' sink)
         >> return ()

-- Relates a component type @c@ to its type tag @cons@, its monad, and the list-or-unit shapes of its input and output.
class CompatibleSignature c cons (m :: * -> *) input output | c -> cons m

-- Inhabited only by list types and by the unit type.
class AnyListOrUnit c

instance AnyListOrUnit [x]
instance AnyListOrUnit ()

instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r) (PerformerType r) m x y
instance AnyListOrUnit y => CompatibleSignature (Consumer m x r) (ConsumerType r) m [x] y
instance AnyListOrUnit y => CompatibleSignature (Producer m x r) (ProducerType r) m y [x]
instance CompatibleSignature (Transducer m x y) TransducerType m [x] [y]

-- Empty type tags, one per component kind.
data PerformerType r
data ConsumerType r
data ProducerType r
data TransducerType

-- | Class 'JoinableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * if both argument components consume input, the input of the combined component gets distributed to both
--      components in parallel, and
--
--    * if both argument components produce output, the output of the combined component is a concatenation of the
--      complete output from the first component
-- followed by the complete output of the second component.
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
   => JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
                                                      t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
   where
      -- | The 'join' combinator may apply the components in any order. Unless an instance overrides it, it ignores
      -- the binder and falls back on 'sequence'.
      join :: PairBinder m -> c1 -> c2 -> c3
      join = const sequence
      -- | The 'sequence' combinator makes sure its first argument has completed before using the second one.
      sequence :: c1 -> c2 -> c3

-- Two producers: the first one writes to the sink, then the second one does.
instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Producer m x r2) (Producer m x r2)
   where
      sequence p1 p2 = Producer $ \sink-> produce p1 sink >> produce p2 sink

-- Two consumers: 'join' tees the input to both; 'sequence' runs the first one while collecting the input with
-- 'getList', then replays the collected list to the second one.
instance forall m x. Monad m =>
   JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] ()
                         (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
   where
      join binder c1 c2 = Consumer (liftM (const ()) . teeConsumers binder (consume c1) (consume c2))
      sequence c1 c2 = Consumer $ \source->
         teeConsumers sequentialBinder (consume c1) getList source
         >>= \((), list)-> pipe (putList list) (consume c2)
                           >> return ()

-- Two transducers: the output of the second one is always emitted after the complete output of the first one.
instance forall m x y. Monad m =>
   JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y]
                         (Transducer m x y) (Transducer m x y) (Transducer m x y)
   where
      -- Both transducers see the same input; the second one writes into a buffer that is flushed to the sink last.
      join binder t1 t2 = isolateTransducer $ \source sink-> do
         (_, list) <- pipe
                         (\buffer-> teeConsumers binder
                                       (\source'-> transduce t1 source' sink)
                                       (\source'-> transduce t2 source' buffer)
                                       source)
                         getList
         _ <- putList list sink
         return ()
      -- The first transducer runs while the input is collected; the collected input then feeds the second one.
      sequence t1 t2 = isolateTransducer $ \source sink-> do
         (_, list) <- teeConsumers sequentialBinder (flip (transduce t1) sink) getList source
         _ <- pipe (putList list) (\source'-> transduce t2 source' sink)
         return ()

instance forall m r1 r2.
Monad m =>
   JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () ()
                         (Performer m r1) (Performer m r2) (Performer m r2)
   where
      -- Both performers run under the binder; the result of the second one is kept.
      join binder p1 p2 = Performer $ binder (const return) (perform p1) (perform p2)
      sequence p1 p2 = Performer $ perform p1 >> perform p2

-- A performer followed by a producer: the producer's result is kept.
instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Performer m r1) (Producer m x r2) (Producer m x r2)
   where
      join binder pe pr = Producer $ \sink->
         liftBinder binder (const return) (lift (perform pe)) (produce pr sink)
      sequence pe pr = Producer $ \sink->
         lift (perform pe) >> produce pr sink

-- A producer followed by a performer: the performer's result is kept.
instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Performer m r2) (Producer m x r2)
   where
      join binder pr pe = Producer $ \sink->
         liftBinder binder (const return) (produce pr sink) (lift (perform pe))
      sequence pr pe = Producer $ \sink->
         produce pr sink >> lift (perform pe)

-- A performer followed by a consumer: the consumer's result is kept.
instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] ()
                         (Performer m r1) (Consumer m x r2) (Consumer m x r2)
   where
      join binder p c = Consumer $ \source->
         liftBinder binder (const return) (lift (perform p)) (consume c source)
      sequence p c = Consumer $ \source->
         lift (perform p) >> consume c source

-- A consumer followed by a performer: the performer's result is kept.
instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] ()
                         (Consumer m x r1) (Performer m r2) (Consumer m x r2)
   where
      join binder c p = Consumer $ \source->
         liftBinder binder (const return) (consume c source) (lift (perform p))
      sequence c p = Consumer $ \source->
         consume c source >> lift (perform p)

instance forall m x y r.
Monad m =>
   JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y]
                         (Performer m r) (Transducer m x y) (Transducer m x y)
   where
      -- The performer's result is dropped; the transducer's result is returned.
      join binder p t = Transducer $ \ source sink ->
         liftBinder binder (const return) (lift (perform p)) (transduce t source sink)
      sequence p t = Transducer $ \ source sink ->
         lift (perform p) >> transduce t source sink

-- A transducer followed by a performer: the transducer's result is returned, the performer's is dropped.
instance forall m x y r. Monad m =>
   JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y]
                         (Transducer m x y) (Performer m r) (Transducer m x y)
   where
      join binder t p = Transducer $ \ source sink ->
         liftBinder binder (const . return) (transduce t source sink) (lift (perform p))
      sequence t p = Transducer $ \ source sink -> do
         result <- transduce t source sink
         _ <- lift (perform p)
         return result

-- A producer followed by a transducer: the producer's output precedes the transducer's output.
instance forall m x y. Monad m =>
   JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y]
                         (Producer m y ()) (Transducer m x y) (Transducer m x y)
   where
      -- The transducer writes into a buffer which is copied to the sink once both components are done.
      join binder p t = isolateTransducer $ \source sink-> do
         (_, out) <- pipe
                        (\buffer-> liftBinder binder (const return) (produce p sink) (transduce t source buffer))
                        getList
         _ <- putList out sink
         return ()
      sequence p t = Transducer $ \ source sink ->
         produce p sink >> transduce t source sink

-- A transducer followed by a producer: the transducer's output precedes the producer's output.
instance forall m x y. Monad m =>
   JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Producer m y ()) (Transducer m x y)
   where
      -- The producer writes into a buffer which is copied to the sink once both components are done.
      join binder t p = isolateTransducer $ \source sink-> do
         (_, out) <- pipe
                        (\buffer-> liftBinder binder (const . return) (transduce t source sink) (produce p buffer))
                        getList
         _ <- putList out sink
         return ()
      sequence t p = Transducer $ \ source sink -> do
         result <- transduce t source sink
         produce p sink
         return result

instance forall m x y.
Monad m =>
   JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y]
                         (Consumer m x ()) (Transducer m x y) (Transducer m x y)
   where
      -- Consumer and transducer are both fed the same input.
      join binder c t = isolateTransducer $ \source sink->
         teeConsumers binder (consume c) (\source'-> transduce t source' sink) source
         >> return ()
      -- The consumer runs while the input is collected; the collected input is then replayed to the transducer.
      sequence c t = isolateTransducer $ \source sink-> do
         (_, list) <- teeConsumers sequentialBinder (consume c) getList source
         _ <- pipe (putList list) (\source'-> transduce t source' sink)
         return ()

-- A transducer followed by a consumer.
instance forall m x y. Monad m =>
   JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Consumer m x ()) (Transducer m x y)
   where
      -- Delegates to the consumer/transducer instance with the arguments swapped.
      join binder t c = join binder c t
      -- The transducer runs while the input is collected; the collected input is then replayed to the consumer.
      sequence t c = isolateTransducer $ \source sink-> do
         (_, list) <- teeConsumers sequentialBinder (\source'-> transduce t source' sink) getList source
         _ <- pipe (putList list) (consume c)
         return ()

-- A producer and a consumer together form a transducer: the consumer takes the input, the producer makes the output.
instance forall m x y. Monad m =>
   JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y]
                         (Producer m y ()) (Consumer m x ()) (Transducer m x y)
   where
      join binder p c = Transducer $ \ source sink ->
         liftBinder binder (\ _ _ -> return ()) (produce p sink) (consume c source)
      sequence p c = Transducer $ \ source sink ->
         produce p sink >> consume c source

-- A consumer and a producer together form a transducer.
instance forall m x y. Monad m =>
   JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y]
                         (Consumer m x ()) (Producer m y ()) (Transducer m x y)
   where
      -- Delegates to the producer/consumer instance with the arguments swapped.
      join binder c p = join binder p c
      sequence c p = Transducer $ \ source sink ->
         consume c source >> produce p sink

-- | Combinator 'prepend' converts the given producer to a 'Control.Concurrent.SCC.Types.Transducer' that passes all its
-- input through unmodified, except for prepending the output of the argument producer to it. The following law holds: @
-- 'prepend' /prefix/ = 'join' ('substitute' /prefix/) 'Control.Category.id' @
prepend :: forall m x r.
Monad m => Producer m x r -> Transducer m x x
prepend prefixProducer = Transducer $ \ source sink ->
   -- emit the prefix first, then copy the whole input
   produce prefixProducer sink >> pour source sink

-- | Combinator 'append' converts the given producer to a 'Control.Concurrent.SCC.Types.Transducer' that passes all its
-- input through unmodified, finally appending the output of the argument producer to it. The following law holds: @
-- 'append' /suffix/ = 'join' 'Control.Category.id' ('substitute' /suffix/) @
append :: forall m x r. Monad m => Producer m x r -> Transducer m x x
append suffixProducer = Transducer $ \ source sink ->
   -- copy the whole input first, then emit the suffix
   pour source sink >> produce suffixProducer sink >> return ()

-- | The 'substitute' combinator converts its argument producer to a 'Control.Concurrent.SCC.Types.Transducer' that
-- produces the same output, while consuming its entire input and ignoring it.
substitute :: forall m x y r. Monad m => Producer m y r -> Transducer m x y
substitute feed = Transducer $ \ source sink ->
   -- drain the input before producing anything
   mapMStream_ (const $ return ()) source >> produce feed sink >> return ()

-- | The 'sNot' (streaming not) combinator simply reverses the outputs of the argument splitter. In other words, data
-- that the argument splitter sends to its /true/ sink goes to the /false/ sink of the result, and vice versa.
sNot :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
sNot splitter = isolateSplitter inverted
   where inverted :: forall d. Functor d =>
                     Source m d x -> Sink m d x -> Sink m d x -> Sink m d b -> Coroutine d m ()
         -- the /true/ and /false/ sinks trade places; the argument's edge output goes to a null sink
         inverted source true false _edge = split splitter source false true (nullSink :: Sink m d b)

-- | The 'sAnd' combinator sends the /true/ sink output of its left operand to the input of its right operand for
-- further splitting. Both operands' /false/ sinks are connected to the /false/ sink of the combined splitter, but any
-- input value to reach the /true/ sink of the combined component data must be deemed true by both splitters.
sAnd :: forall m x b1 b2.
Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
sAnd binder s1 s2 =
   isolateSplitter $ \ source true false edge ->
   liftM (fst . fst) $
   pipe
      -- the edge marks of both splitters are merged into one stream, tagged 'Left' for s1 and 'Right' for s2 ...
      (\edges-> pipeG binder
                   (\true'-> split s1 source true' false (mapSink Left edges))
                   (\source'-> split s2 source' true false (mapSink Right edges)))
      -- ... and paired up before reaching the combined edge sink
      (flip intersectRegions edge)

-- Pairs up 'Left' and 'Right' marks read from the source: the most recent mark of each side is remembered, and as
-- soon as both sides have one the pair is written to the sink and both are forgotten.
intersectRegions :: forall m a1 a2 d b1 b2. Monad m => OpenTransducer m a1 a2 d (Either b1 b2) (b1, b2) ()
intersectRegions source sink = next Nothing Nothing
   where next lastLeft lastRight = getWith advance source
            where advance (Left l)  = pair (Just l) lastRight
                  advance (Right r) = pair lastLeft (Just r)
         pair (Just x) (Just y) = put sink (x, y) >> next Nothing Nothing
         pair l r = next l r

-- | A 'sOr' combinator's input value can reach its /false/ sink only by going through both argument splitters' /false/
-- sinks.
sOr :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
sOr binder s1 s2 =
   isolateSplitter $ \ source true false edge ->
   liftM fst $
   pipeG binder
      -- whatever s1 rejects becomes the input of s2
      (\false'-> split s1 source true false' (mapSink Left edge))
      (\source'-> split s2 source' true false (mapSink Right edge))

-- | Combinator 'pAnd' is a pairwise logical conjunction of two splitters run in parallel on the same input.
pAnd :: forall m x b1 b2.
Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
pAnd binder s1 s2 =
   isolateSplitter $ \ source true false edge ->
   pipeG binder
      (transduce (splittersToPairMarker binder s1 s2) source)
      (\source'-> let -- l and r hold the pending edge mark of the left and the right splitter, if any
                      next l r = getWith (test l r) source'
                      -- a data item goes to the true sink only if both splitters mark it true
                      test l r (Left (x, t1, t2)) =
                         put (if t1 && t2 then true else false) x
                         >> next (if t1 then l else Nothing) (if t2 then r else Nothing)
                      -- an edge is reported once both splitters have a pending mark
                      test _ Nothing (Right (Left l)) = next (Just l) Nothing
                      test _ (Just r) (Right (Left l)) = put edge (l, r) >> next (Just l) (Just r)
                      test Nothing _ (Right (Right r)) = next Nothing (Just r)
                      test (Just l) _ (Right (Right r)) = put edge (l, r) >> next (Just l) (Just r)
                  in next Nothing Nothing)
   >> return ()

-- | Combinator 'pOr' is a pairwise logical disjunction of two splitters run in parallel on the same input.
pOr :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
pOr = zipSplittersWith (||) pour

-- | Combinator 'ifs' combines a splitter with two branch components of the same type into one component of that
-- type. The input is divided by the splitter and handed to the two branches through 'splitInputToConsumers'.
ifs :: forall c m x b. (Monad m, Branching c m x ()) => PairBinder m -> Splitter m x b -> c -> c -> c
ifs binder s c1 c2 = combineBranches if' binder c1 c2
   where if' :: forall d. PairBinder m ->
                (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                forall a. OpenConsumer m a d x ()
         if' binder' c1' c2' source = splitInputToConsumers binder' s source c1' c2'

-- | Combinator 'wherever' applies the argument transducer to the portions of the input that the splitter sends to
-- its /true/ sink. The /false/ portions are passed to the output unmodified, and the splitter's edge marks are
-- dropped.
wherever :: forall m x b. Monad m => PairBinder m -> Transducer m x x -> Splitter m x b -> Transducer m x x
wherever binder t s = isolateTransducer wherever'
   where wherever' :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m ()
         wherever' source sink =
            pipeG binder
               (\true-> split s source true sink (nullSink :: Sink m d b))
               (flip (transduce t) sink)
            >> return ()

-- | Combinator 'unless' is 'wherever' applied to the negated splitter: the argument transducer processes only the
-- portions of the input that the splitter sends to its /false/ sink.
unless :: forall m x b. Monad m => PairBinder m -> Transducer m x x -> Splitter m x b -> Transducer m x x
unless binder t s = wherever binder t (sNot s)

-- | Combinator 'select' passes on only the portions of the input that the argument splitter sends to its /true/
-- sink; everything else is discarded.
select :: forall m x b.
Monad m => Splitter m x b -> Transducer m x x
select s = isolateTransducer selected
   where selected :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m ()
         -- the true sink is the output; the false sink and the edge sink are both null sinks
         selected source sink = split s source sink (nullSink :: Sink m d x) (nullSink :: Sink m d b)

-- | Converts a splitter into a parser.
parseRegions :: forall m x b. Monad m => Splitter m x b -> Parser m x b
parseRegions s =
   isolateTransducer $ \source sink->
   pipe
      (transduce (splitterToMarker s) source)
      -- a mark still pending when the input ends gets flushed at the very end
      (\source'-> concatMapAccumStream wrap Nothing source' sink
                  >>= maybe (return ()) (put sink . flush))
   >> return ()
   where -- The state is the pending mark, if any, paired with a flag telling if any true content has followed it.
         wrap Nothing (Left (x, _)) = (Nothing, [Content x])
         wrap (Just p) (Left (x, False)) = (Nothing, [flush p, Content x])
         wrap (Just (b, t)) (Left (x, True)) = (Just (b, True), if t then [Content x] else [Markup (Start b), Content x])
         wrap (Just p) (Right b') = (Just (b', False), [flush p])
         wrap Nothing (Right b) = (Just (b, False), [])
         -- A mark followed by content is closed with 'End'; one followed by no content becomes a 'Point'.
         flush (b, t) = Markup $ (if t then End else Point) b

-- | Converts a boundary-marking splitter into a parser.
parseNestedRegions :: forall m x b. Monad m => Splitter m x (Boundary b) -> Parser m x b
parseNestedRegions s = isolateTransducer $ \source sink->
   -- true and false portions alike become content; the boundary marks become markup
   split s source (mapSink Content sink) (mapSink Content sink) (mapSink Markup sink)

-- | Combines a boundary-marking splitter and a transducer into a transducer with marked-up output. Every group of
-- input delimited by the splitter is run through the argument transducer, whose output is emitted as 'Content'. The
-- mark that introduces a group, if present, is emitted as 'Markup' ahead of that group's content.
parseEachNestedRegion :: forall m x y b. Monad m =>
                         PairBinder m -> Splitter m x (Boundary b) -> Transducer m x y -> Transducer m x (Markup b y)
parseEachNestedRegion binder s t =
   isolateTransducer $ \source sink->
   let transformContent contentSource = transduce t contentSource (mapSink Content sink)
   in pipeG binder
         (transduce (splitterToMarker s) source)
         (flip groupMarks (maybe
                              transformContent
                              (\mark group-> maybe (return ()) (put sink . Markup) mark >> transformContent group)))
      >> return ()

-- | The recursive combinator 'while' feeds the true sink of the argument splitter back to itself, modified by the
-- argument transducer.
-- Data fed to the splitter's false sink is passed on unmodified.
while :: forall m x b. Monad m =>
         PairBinder m -> Transducer m x x -> Splitter m x b -> Transducer m x x -> Transducer m x x
while binder t s whileRest = isolateTransducer while'
   where while' :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m ()
         while' source sink =
            pipeG binder
               (\true'-> split s source true' sink (nullSink :: Sink m d b))
               -- the rest of the loop is started only if the splitter sends anything at all to its true sink
               (\source'-> peek source'
                           >>= maybe (return ()) (\_-> transduce (compose binder t whileRest) source' sink))
            >> return ()

-- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single
-- splitter. The true sink of one of the argument splitters and false sink of the other become the true and false sinks
-- of the loop. The other two sinks are bound to the other splitter's source. The use of 'nestedIn' makes sense only on
-- hierarchically structured streams. If we gave it some input containing a flat sequence of values, and assuming both
-- component splitters are deterministic and stateless, an input value would either not loop at all or it would loop
-- forever.
nestedIn :: forall m x b. Monad m =>
            PairBinder m -> Splitter m x b -> Splitter m x b -> Splitter m x b -> Splitter m x b
nestedIn binder s1 s2 nestedRest =
   isolateSplitter $ \ source true false edge ->
   liftM fst $
   pipeG binder
      (\false'-> split s1 source true false' edge)
      (\source'-> pipe
                     (\true'-> splitInput s2 source' true' false)
                     -- the rest of the loop is started only if the second splitter finds anything true
                     (\source''-> peek source''
                                  >>= maybe (return ()) (\_-> split nestedRest source'' true false edge)))

-- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two transducers into
-- another transducer. However, in this case the transducers are re-instantiated for each consecutive portion of the
-- input as the splitter chunks it up.
-- Each contiguous portion of the input that the splitter sends to one of its two
-- sinks gets passed through the appropriate argument transducer as that transducer's whole input. As soon as the
-- contiguous portion is finished, the transducer gets terminated.
foreach :: forall m x b c. (Monad m, Branching c m x ()) => PairBinder m -> Splitter m x b -> c -> c -> c
foreach binder s c1 c2 = combineBranches foreach' binder c1 c2
   where foreach' :: forall d. PairBinder m ->
                     (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                     (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                     forall a. OpenConsumer m a d x ()
         foreach' binder' c1' c2' source =
            liftM fst $
            pipeG binder'
               (transduce (splitterToMarker s) (liftSource source :: Source m d x))
               -- a group with no mark is given to the second branch, a marked group to the first one
               (\source'-> groupMarks source' (maybe c2' (const c1')))

-- | The 'having' combinator combines two pure splitters into a pure splitter. One splitter is used to chunk the input
-- into contiguous portions. Its /false/ sink is routed directly to the /false/ sink of the combined splitter. The
-- second splitter is instantiated and run on each portion of the input that goes to first splitter's /true/ sink. If
-- the second splitter sends any output at all to its /true/ sink, the whole input portion is passed on to the /true/
-- sink of the combined splitter, otherwise it goes to its /false/ sink.
having :: forall m x y b1 b2. (Monad m, Coercible x y) =>
          PairBinder m -> Splitter m x b1 -> Splitter m y b2 -> Splitter m x b1
having binder s1 s2 = isolateSplitter s
   where s :: forall d. Functor d => Source m d x -> Sink m d x -> Sink m d x -> Sink m d b1 -> Coroutine d m ()
         s source true false edge =
            pipeG binder
               (transduce (splitterToMarker s1) source)
               (flip groupMarks test)
            >> return ()
            where test Nothing chunk = pour chunk false
                  -- a true chunk is buffered whole, so it can be tested first and routed afterwards
                  test (Just mb) chunk =
                     do chunkBuffer <- getList chunk
                        (_, maybeFound) <- pipe
                                              (produce $ adaptProducer $ Producer $ putList chunkBuffer)
                                              (findsTrueIn s2)
                        if isJust maybeFound
                           then maybe (return ()) (put edge) mb >> putList chunkBuffer true >> return ()
                           else putList chunkBuffer false >> return ()

-- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it succeeds and passes each chunk of the
-- input to its /true/ sink only if the second splitter sends no part of it to its /false/ sink.
havingOnly :: forall m x y b1 b2. (Monad m, Coercible x y) =>
              PairBinder m -> Splitter m x b1 -> Splitter m y b2 -> Splitter m x b1
havingOnly binder s1 s2 = isolateSplitter s
   where s :: forall d. Functor d => Source m d x -> Sink m d x -> Sink m d x -> Sink m d b1 -> Coroutine d m ()
         s source true false edge =
            pipeG binder
               (transduce (splitterToMarker s1) source)
               (flip groupMarks test)
            >> return ()
            where test Nothing chunk = pour chunk false
                  -- a true chunk is buffered whole, so it can be tested first and routed afterwards
                  test (Just mb) chunk =
                     do chunkBuffer <- getList chunk
                        (_, anyFalse) <- pipe
                                            (produce $ adaptProducer $ Producer $ putList chunkBuffer)
                                            (findsFalseIn s2)
                        if anyFalse
                           then putList chunkBuffer false >> return ()
                           else maybe (return ()) (put edge) mb >> putList chunkBuffer true >> return ()

-- | The result of combinator 'first' behaves the same as the argument splitter up to and including the first portion of
-- the input which goes into the argument's /true/ sink. All input following the first true portion goes into the
-- /false/ sink.
first :: forall m x b.
Monad m => Splitter m x b -> Splitter m x b
first splitter =
   wrapMarkedSplitter splitter $ \source true false edge->
   let true' = mapSink (\(Left (x, True))-> x) true
   in -- everything ahead of the first true item or edge mark is false
      pourUntil (either snd (const True)) source (mapSink (\(Left (x, False))-> x) false)
      >>= maybe
             (return ())
             (\x-> -- an edge mark gets reported and removed from the source
                   either (const $ return ()) (\b-> put edge b >> get source >> return ()) x
                   -- the first true portion
                   >> pourWhile (either snd (const False)) source true'
                   -- all remaining data goes to the false sink, further edge marks are dropped
                   >> mapMaybeStream (either (Just . fst) (const Nothing)) source false)

-- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes
-- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes
-- into the /false/ sink. The only difference between 'first' and 'uptoFirst' combinators is in where they direct the
-- /false/ portion of the input preceding the first /true/ part.
uptoFirst :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
uptoFirst splitter =
   wrapMarkedSplitter splitter $ \source true false edge->
   do -- the leading false portion is held back until it is known if a true portion follows it
      (pfx, mx) <- getUntil (either snd (const True)) source
      let prefix' = map (\(Left (x, False))-> x) pfx
          true' = mapSink (\(Left (x, True))-> x) true
      maybe
         (putList prefix' false >> return ())
         (\x-> putList prefix' true
               >> either (const $ return ()) (\b-> put edge b >> get source >> return ()) x
               >> pourWhile (either snd (const False)) source true'
               >> mapMaybeStream (either (Just . fst) (const Nothing)) source false)
         mx

-- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last
-- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to
-- the resulting component's /true/ sink.
-- The splitter returned by the combinator 'last' has to buffer the previous two
-- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of
-- the input or another portion succeeding the previous one.
last :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
last splitter =
   wrapMarkedSplitter splitter $ \source true false edge->
   let true' = mapSink (\(Left (x, _))-> x) true
       false' = mapSink (\(Left (x, _))-> x) false
       -- split1: a true section begins here, with or without an edge mark in front of it
       split1 Nothing = return []
       split1 (Just (Left ~(_, True))) = split2 Nothing
       split1 (Just (Right b)) = get source >> split2 (Just b)
       -- split2: buffer the true section
       split2 mb = getUntil (either (not . snd) (const True)) source >>= split3 mb
       -- split3: the buffered section is the last one if the input ends right after it
       split3 mb (trues, Nothing) = maybe (return ()) (put edge) mb >> putList trues true'
       split3 mb (trues, Just (Left ~(_, False))) = getUntil (either snd (const True)) source >>= split4 mb trues
       split3 _ (trues, b@(Just Right{})) = putList trues false' >> split1 b
       -- split4: a false section has been buffered as well; if the input ends here it stays false
       split4 mb ts (fs, Nothing) = maybe (return ()) (put edge) mb >> putList ts true' >> putList fs false'
       split4 _ ts (fs, x@Just{}) = putList ts false' >> putList fs false' >> split1 x
   in pourUntil (either snd (const True)) source false' >>= split1 >> return ()

-- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the
-- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input is
-- fed to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where
-- they feed the /false/ portion of the input, if any, remaining after the last /true/ part.
lastAndAfter :: forall m x b.
Monad m => Splitter m x b -> Splitter m x b
lastAndAfter splitter =
   wrapMarkedSplitter splitter $ \source true false edge->
   let true' = mapSink (\(Left (x, _))-> x) true
       false' = mapSink (\(Left (x, _))-> x) false
       -- split1: a true section begins here, with or without an edge mark in front of it
       split1 Nothing = return []
       split1 (Just (Left ~(_, True))) = split2 Nothing
       split1 (Just (Right b)) = get source >> split2 (Just b)
       -- split2: buffer the true section
       split2 mb = getUntil (either (not . snd) (const True)) source >>= split3 mb
       -- split3: the buffered section is the last one if the input ends right after it
       split3 mb (trues, Nothing) = maybe (return ()) (put edge) mb >> putList trues true'
       split3 mb (trues, Just (Left ~(_, False))) = getUntil (either snd (const True)) source >>= split4 mb trues
       split3 _ (trues, b@(Just Right{})) = putList trues false' >> split1 b
       -- split4: a false section has been buffered as well; if the input ends here it goes to the true sink, too
       split4 mb ts (fs, Nothing) = maybe (return ()) (put edge) mb >> putList ts true' >> putList fs true'
       split4 _ ts (fs, x@Just{}) = putList ts false' >> putList fs false' >> split1 x
   in pourUntil (either snd (const True)) source false' >>= split1 >> return ()

-- | The 'prefix' combinator feeds its /true/ sink only the prefix of the input that its argument feeds to its /true/
-- sink. All the rest of the input is dumped into the /false/ sink of the result.
prefix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
prefix splitter =
   wrapMarkedSplitter splitter $ \source true false edge->
   peek source
   >>= maybe
          (return ())
          (\x0-> -- only the very first item decides whether a true prefix exists at all
                 either (return . snd) (\x-> put edge x >> get source >> return True) x0
                 >>= flip when (pourWhile (either snd (const False)) source (mapSink (\(Left (x, True))-> x) true))
                 -- all remaining data goes to the false sink, further edge marks are dropped
                 >> mapMaybeStream (either (Just . fst) (const Nothing)) source false)

-- | The 'suffix' combinator feeds its /true/ sink only the suffix of the input that its argument feeds to its /true/
-- sink. All the rest of the input is dumped into the /false/ sink of the result.
suffix :: forall m x b.
Monad m => Splitter m x b -> Splitter m x b
suffix splitter =
   wrapMarkedSplitter splitter $ \source true false edge->
   let true' = mapSink (\(Left (x, _))-> x) true
       false' = mapSink (\(Left (x, _))-> x) false
       -- split0: pass the false data on, up to the next true item or edge mark
       split0 = pourUntil (either snd (const True)) source false' >>= split1
       split1 Nothing = return []
       split1 (Just Left{}) = split2 Nothing
       split1 (Just (Right b)) = get source >> split2 (Just b)
       -- split2: buffer the true section
       split2 mb = getUntil (either (not . snd) (const True)) source >>= split3 mb
       -- split3: the buffered section is a suffix only if the input ends with it
       split3 mb (trues, Nothing) = maybe (return ()) (put edge) mb >> putList trues true'
       split3 _ (trues, Just{}) = putList trues false' >> split0
   in split0 >> return ()

-- | The 'even' combinator takes every input section that its argument /splitter/ deems /true/, and feeds even ones into
-- its /true/ sink. The odd sections and parts of input that are /false/ according to its argument splitter are fed to
-- 'even' splitter's /false/ sink.
even :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
even splitter =
   wrapMarkedSplitter splitter $ \source true false edge->
   let true' = mapSink (\(Left (x, _))-> x) true
       false' = mapSink (\(Left (x, _))-> x) false
       -- split0: pass the false data on, up to the start of an odd true section
       split0 = pourUntil (either snd (const True)) source false' >>= split1
       split1 Nothing = return ()
       split1 (Just (Left ~(_, True))) = split2
       split1 (Just Right{}) = get source >> split2
       -- split2: the odd true section goes to the false sink
       split2 = pourUntil (either (not . snd) (const True)) source false' >>= split3
       -- split3: skip the false data separating the odd section from the even one, if any
       split3 Nothing = return ()
       split3 (Just (Left ~(_, False))) = pourUntil (either snd (const True)) source false' >>= split4
       split3 r@(Just Right{}) = split4 r
       -- split4: start of an even true section; only these sections have their edge marks reported
       split4 Nothing = return ()
       split4 (Just (Left ~(_, True))) = split5
       split4 (Just (Right b)) = put edge b >> get source >> split5
       -- split5: the even true section goes to the true sink
       split5 = pourWhile (either snd (const False)) source true' >> split0
   in split0

-- | Splitter 'startOf' issues an empty /true/ section at the beginning of every section considered /true/ by its
-- argument splitter, otherwise the entire input goes into its /false/ sink.
startOf :: forall m x b.
Monad m => Splitter m x b -> Splitter m x (Maybe b)
startOf splitter =
   wrapMarkedSplitter splitter $ \source _true false edge->
   let -- All payload goes to the false sink; only edges are ever emitted for a /true/ section.
       toFalse = mapSink (\(Left (x, _))-> x) false
       outside = pourUntil (either snd (const True)) source toFalse >>= entering
       -- A section without an edge of its own is announced with 'Nothing', otherwise with the wrapped edge value.
       entering Nothing = return ()
       entering (Just (Left ~(_, True))) = put edge Nothing >> inside
       entering (Just (Right b)) = put edge (Just b) >> get source >> inside
       inside = pourUntil (either (not . snd) (const True)) source toFalse >>= leaving
       leaving Nothing = return ()
       leaving (Just (Left ~(_, False))) = outside
       leaving stop@(Just Right{}) = entering stop
   in outside

-- | Splitter 'endOf' emits an empty /true/ section at the end of every section that its argument splitter considers
-- /true/.  The whole input is passed on to the /false/ sink of the result.
endOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b)
endOf splitter =
   wrapMarkedSplitter splitter $ \source _true false edge->
   let toFalse = mapSink (\(Left (x, _))-> x) false
       outside = pourUntil (either snd (const True)) source toFalse >>= entering
       -- Remembers the section's own edge, if any, so that it can be reported once the section is over.
       entering Nothing = return ()
       entering (Just (Left ~(_, True))) = inside Nothing
       entering (Just (Right b)) = get source >> inside (Just b)
       -- The edge is emitted after the section's content has been passed to the false sink.
       inside mb = do stop <- pourUntil (either (not . snd) (const True)) source toFalse
                      put edge mb
                      leaving stop
       leaving Nothing = return ()
       leaving (Just (Left ~(_, False))) = outside
       leaving stop@(Just Right{}) = entering stop
   in outside

-- | Combinator 'followedBy' treats its argument 'Splitter's as pattern components and returns a 'Splitter' that
-- matches their concatenation.  A section of input is considered /true/ by the result iff its prefix is considered
-- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/.  The splitter /s2/ is started anew
-- after every section split to /true/ sink by /s1/.
followedBy :: forall m x b1 b2.
Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
followedBy binder s1 s2 =
   isolateSplitter $ \ source true false edge ->
   pipeG binder
      (transduce (splitterToMarker s1) source)
      (\source'->
       -- @source'@ carries the input as marked by /s1/: @Left (x, flag)@ for content, @Right b@ for an edge.
       let -- Replays a queue of already-read marked items.  Leading /false/ items go to the false sink; the first
           -- /true/ item or edge opens a new candidate match in 'get2'; an exhausted queue resumes reading the
           -- source in 'split0'.
           get0 q = case Seq.viewl q
                    of Seq.EmptyL -> split0
                       (Left (x, False)) :< rest -> put false x >> get0 rest
                       (Left (_, True)) :< _ -> get2 Nothing Seq.empty q
                       (Right b) :< rest -> get2 (Just b) Seq.empty rest
           -- Sinks that accept marked items and forward only the payload.
           false' = mapSink (\(Left (x, _))-> x) false
           true' = mapSink (\(Left (x, _))-> x) true
           -- Passes /false/ input straight through, then starts a candidate match at the next /true/ item or edge.
           split0 = pourUntil (either snd (const True)) source' false'
                    >>= maybe
                           (return ())
                           (either (const $ split1 Nothing) (\b-> get source' >> split1 (Just b)))
           -- Collects the section /s1/ marked /true/ (@mb@ is its edge, if any).  If the input ends there, /s2/ is
           -- tested against empty input; otherwise the item that ended the section is consumed and becomes the
           -- first queued item for 'get3'.
           split1 mb = do (list, mx) <- getUntil (either (not . snd) (const True)) source'
                          let list' = Seq.fromList $ map (\(Left (x, True))-> x) list
                          maybe
                             (testEnd mb list')
                             ((get source' >>) . get3 mb list' . Seq.singleton)
                             mx
           -- Same job as 'split1', but the marked items come from the queue @q'@ first.  @q@ accumulates the
           -- payload of the /s1/ section.
           get2 mb q q' = case Seq.viewl q'
                          of Seq.EmptyL -> get source' >>= maybe (testEnd mb q) (get2 mb q . Seq.singleton)
                             (Left (x, True)) :< rest -> get2 mb (q |> x) rest
                             (Left (_, False)) :< _ -> get3 mb q q'
                             Right{} :< _ -> get3 mb q q'
           -- Runs /s2/ (through 'test') over the content queued in @q'@ followed by further input supplied by
           -- 'get7'.  @q''@ is every marked item read in the process; @mn@ is the verdict of 'test'.
           get3 mb q q' = do let list = mapMaybe (either (Just . fst) (const Nothing)) (Foldable.toList $ Seq.viewl q')
                             (q'', mn) <- pipe (\sink-> putList list sink >> get7 q' sink) (test mb q)
                             case mn of -- No match: the /s1/ section is false after all, and the queue is replayed.
                                        Nothing -> putQueue q false >> get0 q''
                                        Just 0 -> get0 q''
                                        -- 'test' already emitted @n@ items, so they are dropped from the queue.
                                        Just n -> get8 (Just mb) n q''
           -- Feeds content items from the source into @sink@, recording every marked item in the queue.  It stops
           -- when 'putList' returns a non-empty remainder or the source is exhausted.  An item that ends a run of
           -- content (an edge, given the 'getWhile' predicate) is queued but not forwarded.
           get7 q sink = do list <- getWhile (either (const True) (const False)) source'
                            rest <- putList (map (\(Left (x, _))-> x) list) sink
                            let q' = q >< Seq.fromList list
                            if null rest
                               then get source' >>= maybe (return q') (\x-> get7 (q' |> x) sink)
                               else return q'
           -- Tests /s2/ against empty input.  Without a match the /s1/ section goes to the false sink.
           testEnd mb q = do ((), n) <- pipe (const $ return ()) (test mb q)
                             case n of Nothing -> putQueue q false >> return ()
                                       _ -> return ()
           -- Runs /s2/ on @source''@ and inspects the first item it emits.  If that item is /false/, or there is
           -- none, the result is 'Nothing'.  Otherwise the /s1/ section @q@ and the leading /true/ run of /s2/ are
           -- written to the true sink, preceded by the paired edge when both splitters supplied one.  The result
           -- is then 'Just' the length of that run.
           test mb q source'' = liftM snd $
                                pipe
                                   (transduce (splitterToMarker s2) source'')
                                   (\source'''->
                                    let test0 (Left (_, False)) = get source''' >> return Nothing
                                        test0 (Left (_, True)) = test1
                                        test0 (Right b') = maybe (return ()) (\b-> put edge (b, b')) mb
                                                           >> get source''' >> test1
                                        test1 = putQueue q true
                                                >> getWhile (either snd (const False)) source'''
                                                >>= \list-> putList list true' >> get source''' >> return (Just $ length list)
                                    in peek source''' >>= maybe (return Nothing) test0)
           -- Drops the first @n@ content items from the queue; edges are skipped without being counted.  The
           -- first argument tracks what the last dropped item opened: 'Nothing' after a /false/ item, @Just mb@
           -- inside a /true/ section of /s1/ whose edge is @mb@.  Processing resumes accordingly.
           get8 Nothing 0 q = get0 q
           get8 (Just mb) 0 q = get2 mb Seq.empty q
           get8 mmb n q = case Seq.viewl q
                          of Left (_, False) :< rest -> get8 Nothing (pred n) rest
                             Left (_, True) :< rest -> get8 (maybe (Just Nothing) Just mmb) (pred n) rest
                             Right b :< rest -> get8 (Just (Just b)) n rest
                             Seq.EmptyL -> error "Expecting a non-empty queue!"
       in split0)
   >> return ()

-- | Combinator 'between' tracks the running balance of difference between the number of preceding starts of sections
-- considered /true/ according to its first argument and the ones according to its second argument.  The combinator
-- passes to /true/ all input values for which the difference balance is positive.  This combinator is typically used
-- with 'startOf' and 'endOf' in order to count entire input sections and ignore their lengths.
between :: forall m x b1 b2.
Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
between binder s1 s2 =
   isolateSplitter $ \ source true false edge ->
   pipeG binder
      (transduce (splittersToPairMarker binder s1 s2) source)
      (let -- Both helpers route @x@ by the balance @n@ and return @n@ unchanged.  They differ only in whether a
           -- zero balance still counts as /true/.
           whenPositive n x
              | n > 0 = put true x >> return n
              | otherwise = put false x >> return n
           whenNonNegative n x
              | n >= 0 = put true x >> return n
              | otherwise = put false x >> return n
           -- One fold step per marked item; the accumulator is the running balance.
           step n (Left (x, True, False)) = whenPositive (succ n) x
           step n (Left (x, False, True)) = whenNonNegative (pred n) x
           step n (Left (x, True, True)) = whenNonNegative n x
           step n (Left (x, False, False)) = whenPositive n x
           -- Only an edge of /s1/ arriving at balance zero is forwarded to the edge sink.
           step 0 (Right (Left b)) = put edge b >> return 1
           step n (Right (Left _)) = return (succ n)
           step n (Right (Right _)) = return (pred n)
       in foldMStream_ step (0 :: Int))
   >> return ()

-- Helper functions

-- | Builds a splitter from the argument @splitter@ and a continuation.  The continuation receives the input as marked
-- by 'splitterToMarker', together with the /true/, /false/, and edge sinks of the resulting splitter.
wrapMarkedSplitter ::
   forall m x b1 b2. Monad m =>
   Splitter m x b1
   -> (forall a1 a2 a3 a4 d. (AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d, AncestorFunctor a4 d) =>
       Source m a1 (Either (x, Bool) b1) -> Sink m a2 x -> Sink m a3 x -> Sink m a4 b2 -> Coroutine d m ())
   -> Splitter m x b2
wrapMarkedSplitter splitter splitMarked =
   isolateSplitter $ \ source true false edge ->
   do _ <- pipe
              (transduce (splitterToMarker splitter) source)
              (\marked-> splitMarked marked true false edge)
      return ()

-- | Turns a splitter into a transducer with a single output stream.  Every item the splitter sends to its /true/ or
-- /false/ sink is emitted as @Left (x, True)@ or @Left (x, False)@ respectively, and every edge @b@ as @Right b@.
splitterToMarker :: forall m x b. Monad m => Splitter m x b -> Transducer m x (Either (x, Bool) b)
splitterToMarker s =
   isolateTransducer $ \source sink->
   let tagged flag = mapSink (\x-> Left (x, flag)) sink
   in split s source (tagged True) (tagged False) (mapSink Right sink)

-- | Runs both splitters over the same input and merges their marks into one stream.  Every content item is emitted
-- once as @Left (x, t1, t2)@, where the two flags are the marks given to it by /s1/ and /s2/ respectively.  Edges
-- are emitted as @Right (Left b1)@ or @Right (Right b2)@.
splittersToPairMarker :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 ->
                         Transducer m x (Either (x, Bool, Bool) (Either b1 b2))
splittersToPairMarker binder s1 s2 =
   let synchronizeMarks :: forall a1 a2 d. (AncestorFunctor a1 d, AncestorFunctor a2 d) =>
                           Sink m a1 (Either (x, Bool, Bool) (Either b1 b2))
                        -> Source m a2 (Either ((x, Bool), Bool) (Either b1 b2))
                        -> Coroutine d m (Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool))
       -- The fold state is 'Nothing' when neither splitter is ahead.  Otherwise it holds the marks queued by the
       -- splitter that is ahead, plus a flag that is 'True' when that splitter is /s1/.
       synchronizeMarks sink source = foldMStream handleMark Nothing source where
          handleMark Nothing (Right b) = put sink (Right b) >> return Nothing
          handleMark Nothing (Left (item, side)) = return (Just (Seq.singleton (Left item), side))
          -- More output from the splitter that is already ahead is queued behind what is pending.
          handleMark (Just (pending, side)) (Left (item, side'))
             | side == side' = return (Just (pending |> Left item, side))
          handleMark (Just (pending, True)) (Right b@Left{}) = return (Just (pending |> Right b, True))
          handleMark (Just (pending, False)) (Right b@Right{}) = return (Just (pending |> Right b, False))
          -- An edge from the splitter that is behind is forwarded immediately.
          handleMark state (Right b) = put sink (Right b) >> return state
          -- A content mark from the splitter that is behind is paired with the oldest queued content mark.
          -- Queued edges ahead of that mark are emitted first.
          handleMark (Just (pending, queuedSide)) mark@(Left (item@(_, t), side)) =
             case Seq.viewl pending
             of Seq.EmptyL -> return (Just (Seq.singleton (Left item), side))
                Right b :< rest -> put sink (Right b) >> handleMark (remaining rest queuedSide) mark
                Left (y, t') :< rest ->
                   put sink (Left $ if side then (y, t, t') else (y, t', t)) >> return (remaining rest queuedSide)
          remaining rest queuedSide = if Seq.null rest then Nothing else Just (rest, queuedSide)
   in isolateTransducer $ \source sink->
      pipe
         (\sync-> teeConsumers binder
                     (\input1-> split s1 input1
                                   (mapSink (\x-> Left ((x, True), True)) sync)
                                   (mapSink (\x-> Left ((x, False), True)) sync)
                                   (mapSink (Right . Left) sync))
                     (\input2-> split s2 input2
                                   (mapSink (\x-> Left ((x, True), False)) sync)
                                   (mapSink (\x-> Left ((x, False), False)) sync)
                                   (mapSink (Right . Right) sync))
                     source)
         (synchronizeMarks sink)
      >> return ()

zipSplittersWith :: forall m x b1 b2 b. Monad m => (Bool -> Bool -> Bool) -> (forall a1 a2 d.
(AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 (Either b1 b2) -> Sink m a2 b -> Coroutine d m ())
                    -> PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b
zipSplittersWith f boundaries binder s1 s2 =
   isolateSplitter $ \ source true false edge ->
   pipe
      (\edges->
        let -- Content goes to the true or false sink according to @f@ applied to the two marks.  Edges of
            -- either splitter are handed over to @boundaries@ through @edges@.
            route (Left (x, t1, t2))
               | f t1 t2 = put true x
               | otherwise = put false x
            route (Right b) = put edges b
        in pipeG binder
              (transduce (splittersToPairMarker binder s1 s2) source)
              (mapMStream_ route))
      (\pairedEdges-> boundaries pairedEdges edge)
   >> return ()

-- | Applies the second argument to every contiguous region of the marked input source (typically the output of
-- 'splitterToMarker') whose items all match @Left (_, True)@ or all match @Left (_, False)@.  The first argument of
-- the consumer is 'Nothing' for a @False@ region, @Just Nothing@ for a @True@ region that was not introduced by an
-- edge, and @Just (Just b)@ for the region that follows the edge @b@.
groupMarks :: (Monad m, AncestorFunctor a d, AncestorFunctor a (SinkFunctor d x)) =>
              Source m a (Either (x, Bool) b)
              -> (Maybe (Maybe b) -> Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m r)
              -> Coroutine d m ()
groupMarks source getConsumer = peek source >>= loop
   where loop Nothing = return ()
         loop (Just first) = do outcome <- either contentRegion edgeRegion first
                                -- The producer half of the pair is the item that ended the region, if any.
                                loop (fst outcome)
         contentRegion (_, False) = pipe (region False) (getConsumer Nothing)
         contentRegion (_, True) = pipe (region True) (getConsumer $ Just Nothing)
         edgeRegion b = get source >> pipe (region True) (getConsumer (Just $ Just b))
         -- Copies payloads into @sink@ until an edge or an item with the opposite mark shows up.
         region flag sink = pourUntil
                               (either (\(_, flag')-> flag /= flag') (const True))
                               source
                               (mapSink (\(Left (x, _))-> x) sink)

-- | Runs the splitter over the source with the given /true/ and /false/ sinks, discarding its edges into a 'nullSink'.
splitInput :: forall m a1 a2 a3 d x b. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) =>
              Splitter m x b -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
splitInput splitter source true false = split splitter source true false noEdges
   where noEdges :: Sink m d b
         noEdges = nullSink

findsTrueIn :: forall m a d x b.
(Monad m, AncestorFunctor a d) => Splitter m x b -> Source m a x -> Coroutine d m (Maybe (Maybe b))
findsTrueIn splitter source =
   -- The inner 'get' reads the first edge the splitter emits; the outer one reads its first /true/ item.
   do (((), firstEdge), firstTrue) <-
         pipe
            (\trueProbe->
              pipe
                 (split splitter (liftSource source :: Source m d x) trueProbe (nullSink :: Sink m d x))
                 get)
            get
      -- An edge takes precedence: @Just (Just b)@.  Otherwise the result is @Just Nothing@ if a true item was
      -- seen, and 'Nothing' if not.
      return $ maybe (fmap (const Nothing) firstTrue) (Just . Just) firstEdge

-- | Runs the splitter over the source and reports whether reading its /false/ output yields an item.  The /true/
-- output and the edges are discarded.
findsFalseIn :: forall m a d x b. (Monad m, AncestorFunctor a d) => Splitter m x b -> Source m a x -> Coroutine d m Bool
findsFalseIn splitter source =
   do ((), firstFalse) <-
         pipe
            (\falseProbe->
              split splitter (liftSource source :: Source m d x) (nullSink :: Sink m d x) falseProbe
                    (nullSink :: Sink m d b))
            get
      return (isJust firstFalse)

-- | Combines two consumers into one.  The first consumer reads the source through 'teeSource', whose sink end is
-- piped into the second consumer.  The result is the pair of the two consumers' results.
teeConsumers :: forall m a d x r1 r2. Monad m =>
                PairBinder m
                -> (forall a'. OpenConsumer m a' (SinkFunctor d x) x r1)
                -> (forall a'. OpenConsumer m a' (SourceFunctor d x) x r2)
                -> OpenConsumer m a d x (r1, r2)
teeConsumers binder c1 c2 source =
   pipeG binder
      (\copy-> c1 (teeSource copy lifted :: Source m (SinkFunctor d x) x))
      c2
   where lifted :: Source m d x
         lifted = liftSource source

-- | Runs the splitter on the given 'Source', piping the splitter's /true/ output into the first consumer function and
-- its /false/ output into the second.  The splitter's edges are discarded.
splitInputToConsumers :: forall m a d d1 x b. (Monad m, d1 ~ SinkFunctor d x, AncestorFunctor a d) =>
                         PairBinder m -> Splitter m x b -> Source m a x
                         -> (Source m (SourceFunctor d1 x) x -> Coroutine (SourceFunctor d1 x) m ())
                         -> (Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m ())
                         -> Coroutine d m ()
splitInputToConsumers binder s source trueConsumer falseConsumer =
   pipeG binder
      (\falseSink->
        pipeG binder
           (\trueSink-> split s lifted trueSink falseSink (nullSink :: Sink m d b))
           trueConsumer)
      falseConsumer
   >> return ()
   where lifted :: Source m d x
         lifted = liftSource source