{- 
    Copyright 2008-2010 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

{-# LANGUAGE ScopedTypeVariables, RankNTypes, KindSignatures, EmptyDataDecls,
             MultiParamTypeClasses, FlexibleContexts, FlexibleInstances, FunctionalDependencies, TypeFamilies #-}
{-# OPTIONS_HADDOCK hide #-}

-- | The "Combinators" module defines combinators applicable to values of the 'Transducer' and 'Splitter' types defined
-- in the "Control.Concurrent.SCC.Types" module.

module Control.Concurrent.SCC.Combinators (
   -- * Consumer, producer, and transducer combinators
   consumeBy, prepend, append, substitute,
   PipeableComponentPair (compose), JoinableComponentPair (join, sequence),
   -- * Splitter combinators
   sNot,
   -- ** Pseudo-logic flow combinators
   -- | Combinators 'sAnd' and 'sOr' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws
   -- hold, 'sAnd' and 'sOr' are in general not commutative, associative, nor idempotent. In the special case when all
   -- argument splitters are stateless, such as those produced by 'Control.Concurrent.SCC.Types.statelessSplitter',
   -- these combinators do satisfy all laws of Boolean algebra.
   sAnd, sOr,
   -- ** Zipping logic combinators
   -- | The 'pAnd' and 'pOr' combinators run the argument splitters in parallel and combine their logical outputs using
   -- the corresponding logical operation on each output pair, in a manner similar to 'Data.List.zipWith'. They fully
   -- satisfy the laws of Boolean algebra.
   pAnd, pOr,
   -- * Flow-control combinators
   -- | The following combinators resemble the common flow-control programming language constructs. Combinators 
   -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'.
   --
   --    * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Control.Category.id'
   --
   --    * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Control.Category.id' /transducer/
   --
   --    * 'select' /splitter/ = 'ifs' /splitter/ 'Control.Category.id'
   --    'Control.Concurrent.SCC.Primitives.suppress'
   --
   ifs, wherever, unless, select,
   -- ** Recursive
   while, nestedIn,
   -- * Section-based combinators
   -- | All combinators in this section use their 'Control.Concurrent.SCC.Splitter' argument to determine the structure
   -- of the input. Every contiguous portion of the input that gets passed to one or the other sink of the splitter is
   -- treated as one section in the logical structure of the input stream. What is done with the section depends on the
   -- combinator, but the sections, and therefore the logical structure of the input stream, are determined by the
   -- argument splitter alone.
   foreach, having, havingOnly, followedBy, even,
   -- ** first and its variants
   first, uptoFirst, prefix,
   -- ** last and its variants
   last, lastAndAfter, suffix,
   -- ** positional splitters
   startOf, endOf, between,
   -- * Parser support
   splitterToMarker, parserToSplitter, parseRegions, parseNestedRegions, parseEachNestedRegion,
   -- * Helper functions
   groupMarks, findsTrueIn, findsFalseIn, teeConsumers
   )
where

import Prelude hiding (even, last, sequence, head)
import Control.Monad (liftM, when)
import Control.Monad.Trans.Class (lift)
import Data.Maybe (isJust, mapMaybe)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))

import Control.Monad.Coroutine

import Control.Concurrent.SCC.Streams
import Control.Concurrent.SCC.Types
import Control.Concurrent.SCC.Coercions

-- | Converts a 'Consumer' into a 'Transducer' with no output.
consumeBy :: forall m x y r. (Monad m) => Consumer m x r -> Transducer m x y
consumeBy c = Transducer $ \ source _sink -> consume c source >> return ()

-- | Class 'PipeableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * The input of the result, if any, becomes the input of the first component.
--
--    * The output produced by the first child component is consumed by the second child component.
--
--    * The result output, if any, is the output of the second component.
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
                                                       c1 -> m w, c2 -> m w, c3 -> m
   where compose :: PairBinder m -> c1 -> c2 -> c3

instance forall m x. Monad m => PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
   where compose binder p c = let performPipe :: Coroutine Naught m ((), ())
                                  performPipe = pipeG binder (produce p) (consume c)
                              in Performer (runCoroutine performPipe >> return ())

instance Monad m => PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
   where compose binder t c = isolateConsumer $ \source-> 
                              liftM snd $
                              pipeG binder
                                 (transduce t source)
                                 (consume c)

instance Monad m => PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
   where compose binder p t = isolateProducer $ \sink-> 
                              liftM fst $
                              pipeG binder
                                 (produce p)
                                 (\source-> transduce t source sink)

instance Monad m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
   where compose binder t1 t2 = 
            isolateTransducer $ \source sink-> 
            pipeG binder (transduce t1 source) (\source'-> transduce t2 source' sink)
            >> return ()

class CompatibleSignature c cons (m :: * -> *) input output | c -> cons m

class AnyListOrUnit c

instance AnyListOrUnit [x]
instance AnyListOrUnit ()

instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r)    (PerformerType r)  m x y
instance AnyListOrUnit y                    => CompatibleSignature (Consumer m x r)   (ConsumerType r)   m [x] y
instance AnyListOrUnit y                    => CompatibleSignature (Producer m x r)   (ProducerType r)   m y [x]
instance                                       CompatibleSignature (Transducer m x y)  TransducerType    m [x] [y]

data PerformerType r
data ConsumerType r
data ProducerType r
data TransducerType

-- | Class 'JoinableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * if both argument components consume input, the input of the combined component gets distributed to both
--      components in parallel, and
--
--    * if both argument components produce output, the output of the combined component is a concatenation of the
--      complete output from the first component followed by the complete output of the second component.
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
   => JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
                                                      t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
   where 
      -- | The 'join' combinator may apply the components in any order.
      join :: PairBinder m -> c1 -> c2 -> c3
      join = const sequence
      -- | The 'sequence' combinator makes sure its first argument has completed before using the second one.
      sequence :: c1 -> c2 -> c3

instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Producer m x r2) (Producer m x r2)
   where sequence p1 p2 = Producer $ \sink-> produce p1 sink >> produce p2 sink

instance forall m x. Monad m =>
   JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] ()
                         (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
   where join binder c1 c2 = Consumer (liftM (const ()) . teeConsumers binder (consume c1) (consume c2))
         sequence c1 c2 = Consumer $ \source->
                          teeConsumers sequentialBinder (consume c1) getList source
                          >>= \((), list)-> pipe (putList list) (consume c2)
                          >> return ()

instance forall m x y. Monad m =>
   JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y]
                         (Transducer m x y) (Transducer m x y) (Transducer m x y)
   where join binder t1 t2 = isolateTransducer $ \source sink->
                             pipe
                                (\buffer-> teeConsumers binder
                                              (\source'-> transduce t1 source' sink)
                                              (\source'-> transduce t2 source' buffer)
                                              source)
                                getList
                             >>= \(_, list)-> putList list sink
                             >> return ()
         sequence t1 t2 = isolateTransducer $ \source sink->
                          teeConsumers sequentialBinder (flip (transduce t1) sink) getList source
                          >>= \(_, list)-> pipe (putList list) (\source'-> transduce t2 source' sink)
                          >> return ()

instance forall m r1 r2. Monad m =>
   JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () ()
                         (Performer m r1) (Performer m r2) (Performer m r2)
   where join binder p1 p2 = Performer $ binder (const return) (perform p1) (perform p2)
         sequence p1 p2 = Performer $ perform p1 >> perform p2

instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x]
                         (Performer m r1) (Producer m x r2) (Producer m x r2)
   where join binder pe pr = Producer $ \sink-> liftBinder binder (const return) (lift (perform pe)) (produce pr sink)
         sequence pe pr = Producer $ \sink-> lift (perform pe) >> produce pr sink

instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x]
                         (Producer m x r1) (Performer m r2) (Producer m x r2)
   where join binder pr pe = Producer $ \sink-> liftBinder binder (const return) (produce pr sink) (lift (perform pe))
         sequence pr pe = Producer $ \sink-> produce pr sink >> lift (perform pe)

instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] ()
                         (Performer m r1) (Consumer m x r2) (Consumer m x r2)
   where join binder p c = Consumer $ \source-> liftBinder binder (const return) (lift (perform p)) (consume c source)
         sequence p c = Consumer $ \source-> lift (perform p) >> consume c source

instance forall m x r1 r2. Monad m =>
   JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] ()
                         (Consumer m x r1) (Performer m r2) (Consumer m x r2)
   where join binder c p = Consumer $ \source-> liftBinder binder (const return) (consume c source) (lift (perform p))
         sequence c p = Consumer $ \source-> consume c source >> lift (perform p)

instance forall m x y r. Monad m =>
   JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y]
                         (Performer m r) (Transducer m x y) (Transducer m x y)
   where join binder p t = 
            Transducer $ \ source sink -> 
            liftBinder binder (const return) (lift (perform p)) (transduce t source sink)
         sequence p t = Transducer $ \ source sink -> lift (perform p) >> transduce t source sink

instance forall m x y r. Monad m
   => JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y]
                            (Transducer m x y) (Performer m r) (Transducer m x y)
   where join binder t p = 
            Transducer $ \ source sink -> 
            liftBinder binder (const . return) (transduce t source sink) (lift (perform p))
         sequence t p = Transducer $ \ source sink -> do result <- transduce t source sink
                                                         _ <- lift (perform p)
                                                         return result

instance forall m x y. Monad m =>
   JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y]
                         (Producer m y ()) (Transducer m x y) (Transducer m x y)
   where join binder p t = 
            isolateTransducer $ \source sink->
            pipe (\buffer-> liftBinder binder (const return) (produce p sink) (transduce t source buffer)) getList
            >>= \(_, out)-> putList out sink >> return ()
         sequence p t = Transducer $ \ source sink -> produce p sink >> transduce t source sink

instance forall m x y. Monad m =>
   JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Producer m y ()) (Transducer m x y)
   where join binder t p =
            isolateTransducer $ \source sink->
            pipe (\buffer-> liftBinder binder (const . return) (transduce t source sink) (produce p buffer)) getList
            >>= \(_, out)-> putList out sink >> return ()
         sequence t p = Transducer $ \ source sink -> do result <- transduce t source sink
                                                         produce p sink
                                                         return result

instance forall m x y. Monad m =>
   JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y]
                         (Consumer m x ()) (Transducer m x y) (Transducer m x y)
   where join binder c t = 
            isolateTransducer $ \source sink->
            teeConsumers binder (consume c) (\source'-> transduce t source' sink) source
            >> return ()
         sequence c t = isolateTransducer $ \source sink->
                        teeConsumers sequentialBinder (consume c) getList source
                        >>= \(_, list)-> pipe (putList list) (\source'-> transduce t source' sink)
                        >> return ()

instance forall m x y. Monad m =>
   JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y]
                         (Transducer m x y) (Consumer m x ()) (Transducer m x y)
   where join binder t c = join binder c t
         sequence t c = isolateTransducer $ \source sink->
                        teeConsumers sequentialBinder (\source'-> transduce t source' sink) getList source
                        >>= \(_, list)-> pipe (putList list) (consume c)
                        >> return ()

instance forall m x y. Monad m =>
   JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y]
                         (Producer m y ()) (Consumer m x ()) (Transducer m x y)
   where join binder p c = Transducer $ 
                           \ source sink -> liftBinder binder (\ _ _ -> return ()) (produce p sink) (consume c source)
         sequence p c = Transducer $ \ source sink -> produce p sink >> consume c source

instance forall m x y. Monad m =>
   JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y]
                         (Consumer m x ()) (Producer m y ()) (Transducer m x y)
   where join binder c p = join binder p c
         sequence c p = Transducer $ \ source sink -> consume c source >> produce p sink

-- | Combinator 'prepend' converts the given producer to a 'Control.Concurrent.SCC.Types.Transducer' that passes all its
-- input through unmodified, except for prepending the output of the argument producer to it. The following law holds: @
-- 'prepend' /prefix/ = 'join' ('substitute' /prefix/) 'Control.Category.id' @
prepend :: forall m x r. Monad m => Producer m x r -> Transducer m x x
prepend prefixProducer = Transducer $ \ source sink -> produce prefixProducer sink >> pour source sink

-- | Combinator 'append' converts the given producer to a 'Control.Concurrent.SCC.Types.Transducer' that passes all its
-- input through unmodified, finally appending the output of the argument producer to it. The following law holds: @
-- 'append' /suffix/ = 'join' 'Control.Category.id' ('substitute' /suffix/) @
append :: forall m x r. Monad m => Producer m x r -> Transducer m x x
append suffixProducer = Transducer $ \ source sink -> pour source sink >> produce suffixProducer sink >> return ()

-- | The 'substitute' combinator converts its argument producer to a 'Control.Concurrent.SCC.Types.Transducer' that
-- produces the same output, while consuming its entire input and ignoring it.
substitute :: forall m x y r. Monad m => Producer m y r -> Transducer m x y
substitute feed = Transducer $ \ source sink -> mapMStream_ (const $ return ()) source >> produce feed sink >> return ()

-- | The 'sNot' (streaming not) combinator simply reverses the outputs of the argument splitter. In other words, data
-- that the argument splitter sends to its /true/ sink goes to the /false/ sink of the result, and vice versa.
sNot :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
sNot splitter = isolateSplitter s
   where s :: forall d. Functor d => Source m d x -> Sink m d x -> Sink m d x -> Sink m d b -> Coroutine d m ()
         s source true false _edge = split splitter source false true (nullSink :: Sink m d b)

-- | The 'sAnd' combinator sends the /true/ sink output of its left operand to the input of its right operand for
-- further splitting. Both operands' /false/ sinks are connected to the /false/ sink of the combined splitter, but any
-- input value to reach the /true/ sink of the combined component data must be deemed true by both splitters.
sAnd :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
sAnd binder s1 s2 =
   isolateSplitter $ \ source true false edge ->
   liftM (fst . fst) $
   pipe
      (\edges-> pipeG binder
                   (\true'-> split s1 source true' false (mapSink Left edges))
                   (\source'-> split s2 source' true false (mapSink Right edges)))
      (flip intersectRegions edge)

intersectRegions :: forall m a1 a2 d b1 b2. Monad m => OpenTransducer m a1 a2 d (Either b1 b2) (b1, b2) ()
intersectRegions source sink = next Nothing Nothing
   where next lastLeft lastRight = getWith
                                      (either
                                          (flip pair lastRight . Just)
                                          (pair lastLeft . Just))
                                      source
         pair (Just x) (Just y) = put sink (x, y)
                                  >> next Nothing Nothing
         pair l r = next l r

-- | A 'sOr' combinator's input value can reach its /false/ sink only by going through both argument splitters' /false/
-- sinks.
sOr :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
sOr binder s1 s2 = 
   isolateSplitter $ \ source true false edge ->
   liftM fst $
   pipeG binder
      (\false'-> split s1 source true false' (mapSink Left edge))
      (\source'-> split s2 source' true false (mapSink Right edge))

-- | Combinator 'pAnd' is a pairwise logical conjunction of two splitters run in parallel on the same input.
pAnd :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
pAnd binder s1 s2 = 
   isolateSplitter $ \ source true false edge ->
   pipeG binder
      (transduce (splittersToPairMarker binder s1 s2) source)
      (\source'-> let next l r = getWith (test l r) source'
                      test l r (Left (x, t1, t2)) = 
                         (if t1 && t2 then put true x else put false x)
                         >> next (if t1 then l else Nothing) (if t2 then r else Nothing)
                      test _ Nothing (Right (Left l)) = next (Just l) Nothing
                      test _ (Just r) (Right (Left l)) = put edge (l, r) >> next (Just l) (Just r)
                      test Nothing _ (Right (Right r)) = next Nothing (Just r)
                      test (Just l) _ (Right (Right r)) = put edge (l, r) >> next (Just l) (Just r)
                  in next Nothing Nothing)
      >> return ()

-- | Combinator 'pOr' is a pairwise logical disjunction of two splitters run in parallel on the same input.
pOr :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
pOr = zipSplittersWith (||) pour

ifs :: forall c m x b. (Monad m, Branching c m x ()) => PairBinder m -> Splitter m x b -> c -> c -> c
ifs binder s c1 c2 = combineBranches if' binder c1 c2
   where if' :: forall d. PairBinder m -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                forall a. OpenConsumer m a d x ()
         if' binder' c1' c2' source = splitInputToConsumers binder' s source c1' c2'

wherever :: forall m x b. Monad m => PairBinder m -> Transducer m x x -> Splitter m x b -> Transducer m x x
wherever binder t s = isolateTransducer wherever'
   where wherever' :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m ()
         wherever' source sink = pipeG binder
                                    (\true-> split s source true sink (nullSink :: Sink m d b))
                                    (flip (transduce t) sink)
                                 >> return ()

unless :: forall m x b. Monad m => PairBinder m -> Transducer m x x -> Splitter m x b -> Transducer m x x
unless binder t s = wherever binder t (sNot s)

select :: forall m x b. Monad m => Splitter m x b -> Transducer m x x
select s = isolateTransducer t 
   where t :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m ()
         t source sink = split s source sink (nullSink :: Sink m d x) (nullSink :: Sink m d b)

-- | Converts a splitter into a parser.
parseRegions :: forall m x b. Monad m => Splitter m x b -> Parser m x b
parseRegions s = isolateTransducer $ \source sink->
                    pipe
                       (transduce (splitterToMarker s) source)
                       (\source'-> concatMapAccumStream wrap Nothing source' sink 
                                   >>= maybe (return ()) (put sink . flush))
                    >> return ()
   where wrap Nothing (Left (x, _)) = (Nothing, [Content x])
         wrap (Just p) (Left (x, False)) = (Nothing, [flush p, Content x])
         wrap (Just (b, t)) (Left (x, True)) = (Just (b, True), if t then [Content x] else [Markup (Start b), Content x])
         wrap (Just p) (Right b') = (Just (b', False), [flush p])
         wrap Nothing (Right b) = (Just (b, False), [])
         flush (b, t) = Markup $ (if t then End else Point) b

-- | Converts a boundary-marking splitter into a parser.
parseNestedRegions :: forall m x b. Monad m => Splitter m x (Boundary b) -> Parser m x b
parseNestedRegions s = isolateTransducer $ \source sink->
                       split s source (mapSink Content sink) (mapSink Content sink) (mapSink Markup sink)

-- | Converts a boundary-marking splitter into a parser.
parseEachNestedRegion :: forall m x y b. Monad m =>
                         PairBinder m -> Splitter m x (Boundary b) -> Transducer m x y -> Transducer m x (Markup b y)
parseEachNestedRegion binder s t =
   isolateTransducer $ \source sink->
   let transformContent contentSource = transduce t contentSource (mapSink Content sink)
   in pipeG binder
         (transduce (splitterToMarker s) source)
         (flip groupMarks (maybe transformContent (\mark group-> maybe (return ()) (put sink . Markup) mark
                                                                 >> transformContent group)))
      >> return ()

-- | The recursive combinator 'while' feeds the true sink of the argument splitter back to itself, modified by the
-- argument transducer. Data fed to the splitter's false sink is passed on unmodified.
while :: forall m x b. Monad m => 
         PairBinder m -> Transducer m x x -> Splitter m x b -> Transducer m x x -> Transducer m x x
while binder t s whileRest = isolateTransducer while'
   where while' :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m ()
         while' source sink =
            pipeG binder
               (\true'-> split s source true' sink (nullSink :: Sink m d b))
               (\source'-> peek source'
                           >>= maybe 
                                  (return ())
                                  (\_-> transduce (compose binder t whileRest) source' sink))
            >> return ()

-- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single
-- splitter. The true sink of one of the argument splitters and false sink of the other become the true and false sinks
-- of the loop. The other two sinks are bound to the other splitter's source. The use of 'nestedIn' makes sense only on
-- hierarchically structured streams. If we gave it some input containing a flat sequence of values, and assuming both
-- component splitters are deterministic and stateless, an input value would either not loop at all or it would loop
-- forever.
nestedIn :: forall m x b. Monad m => 
            PairBinder m -> Splitter m x b -> Splitter m x b -> Splitter m x b -> Splitter m x b
nestedIn binder s1 s2 nestedRest =
   isolateSplitter $ \ source true false edge ->
   liftM fst $
      pipeG binder
         (\false'-> split s1 source true false' edge)
         (\source'-> pipe
                        (\true'-> splitInput s2 source' true' false)
                        (\source''-> peek source''
                                     >>= maybe
                                            (return ())
                                            (\_-> split nestedRest source'' true false edge)))

-- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two transducers into
-- another transducer. However, in this case the transducers are re-instantiated for each consecutive portion of the
-- input as the splitter chunks it up. Each contiguous portion of the input that the splitter sends to one of its two
-- sinks gets transducered through the appropriate argument transducer as that transducer's whole input. As soon as the
-- contiguous portion is finished, the transducer gets terminated.
foreach :: forall m x b c. (Monad m, Branching c m x ()) => PairBinder m -> Splitter m x b -> c -> c -> c
foreach binder s c1 c2 = combineBranches foreach' binder c1 c2
   where foreach' :: forall d. PairBinder m -> 
                     (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                     (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) ->
                     forall a. OpenConsumer m a d x ()
         foreach' binder' c1' c2' source =
            liftM fst $
            pipeG binder'
               (transduce (splitterToMarker s) (liftSource source :: Source m d x))
               (\source'-> groupMarks source' (maybe c2' (const c1')))

-- | The 'having' combinator combines two pure splitters into a pure splitter. One splitter is used to chunk the input
-- into contiguous portions. Its /false/ sink is routed directly to the /false/ sink of the combined splitter. The
-- second splitter is instantiated and run on each portion of the input that goes to first splitter's /true/ sink. If
-- the second splitter sends any output at all to its /true/ sink, the whole input portion is passed on to the /true/
-- sink of the combined splitter, otherwise it goes to its /false/ sink.
having :: forall m x y b1 b2. (Monad m, Coercible x y) =>
          PairBinder m -> Splitter m x b1 -> Splitter m y b2 -> Splitter m x b1
having binder s1 s2 = isolateSplitter s
   where s :: forall d. Functor d => Source m d x -> Sink m d x -> Sink m d x -> Sink m d b1 -> Coroutine d m ()
         s source true false edge = pipeG binder
                                       (transduce (splitterToMarker s1) source)
                                       (flip groupMarks test)
                                    >> return ()
            where test Nothing chunk = pour chunk false
                  test (Just mb) chunk = 
                     do chunkBuffer <- getList chunk
                        (_, maybeFound) <- 
                           pipe (produce $ adaptProducer $ Producer $ putList chunkBuffer) (findsTrueIn s2)
                        if isJust maybeFound 
                           then maybe (return ()) (put edge) mb >> putList chunkBuffer true >> return ()
                           else putList chunkBuffer false >> return ()

-- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it succeeds and passes each chunk of the
-- input to its /true/ sink only if the second splitter sends no part of it to its /false/ sink.
havingOnly :: forall m x y b1 b2. (Monad m, Coercible x y) =>
              PairBinder m -> Splitter m x b1 -> Splitter m y b2 -> Splitter m x b1
havingOnly binder s1 s2 = isolateSplitter s
   where s :: forall d. Functor d => Source m d x -> Sink m d x -> Sink m d x -> Sink m d b1 -> Coroutine d m ()
         s source true false edge = pipeG binder
                                       (transduce (splitterToMarker s1) source)
                                       (flip groupMarks test)
                                    >> return ()
            where test Nothing chunk = pour chunk false
                  test (Just mb) chunk = 
                     do chunkBuffer <- getList chunk
                        (_, anyFalse) <- 
                           pipe (produce $ adaptProducer $ Producer $ putList chunkBuffer) (findsFalseIn s2)
                        if anyFalse
                           then putList chunkBuffer false >> return ()
                           else maybe (return ()) (put edge) mb >> putList chunkBuffer true >> return ()

-- | The result of combinator 'first' behaves the same as the argument splitter up to and including the first portion of
-- the input which goes into the argument's /true/ sink. All input following the first true portion goes into the
-- /false/ sink.
first :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
first splitter = wrapMarkedSplitter splitter $
                 \source true false edge-> 
                 let true' = mapSink (\(Left (x, True))-> x) true
                 in pourUntil (either snd (const True)) source (mapSink (\(Left (x, False))-> x) false)
                    >>= maybe
                           (return ())
                           (\x-> either (const $ return ()) (\b-> put edge b >> get source >> return ()) x
                                 >> pourWhile (either snd (const False)) source true'
                                 >> mapMaybeStream (either (Just . fst) (const Nothing)) source false)

-- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes
-- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes
-- into the /false/ sink. The only difference between 'first' and 'uptoFirst' combinators is in where they direct the
-- /false/ portion of the input preceding the first /true/ part.
uptoFirst :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
uptoFirst splitter = wrapMarkedSplitter splitter $
                     \source true false edge->
                     do (pfx, mx) <- getUntil (either snd (const True)) source
                        let prefix' = map (\(Left (x, False))-> x) pfx
                            true' = mapSink (\(Left (x, True))-> x) true
                        maybe
                           (putList prefix' false >> return ())
                           (\x-> putList prefix' true
                                 >> either (const $ return ()) (\b-> put edge b >> get source >> return ()) x
                                 >> pourWhile (either snd (const False)) source true'
                                 >> mapMaybeStream (either (Just . fst) (const Nothing)) source false)
                           mx

-- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last
-- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to
-- the resulting component's /true/ sink.  The splitter returned by the combinator 'last' has to buffer the previous two
-- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of
-- the input or another portion succeeding the previous one.
last :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
last splitter = 
  wrapMarkedSplitter splitter $ \source true false edge->
  let true' = mapSink (\(Left (x, _))-> x) true
      false' = mapSink (\(Left (x, _))-> x) false
      split1 Nothing = return []
      split1 (Just (Left ~(_, True))) = split2 Nothing
      split1 (Just (Right b)) = get source >> split2 (Just b)
      split2 mb = getUntil (either (not . snd) (const True)) source >>= split3 mb
      split3 mb (trues, Nothing) = maybe (return ()) (put edge) mb >> putList trues true'
      split3 mb (trues, Just (Left ~(_, False))) = getUntil (either snd (const True)) source >>= split4 mb trues
      split3 _ (trues, b@(Just Right{})) = putList trues false' >> split1 b
      split4 mb ts (fs, Nothing) = maybe (return ()) (put edge) mb >> putList ts true' >> putList fs false'
      split4 _ ts (fs, x@Just{}) = putList ts false' >> putList fs false' >> split1 x
  in pourUntil (either snd (const True)) source false' >>= split1 >> return ()

-- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the
-- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input is
-- fed to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where
-- they feed the /false/ portion of the input, if any, remaining after the last /true/ part.
lastAndAfter :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
lastAndAfter splitter = 
   wrapMarkedSplitter splitter $
   \source true false edge->
   let true' = mapSink (\(Left (x, _))-> x) true
       false' = mapSink (\(Left (x, _))-> x) false
       split1 Nothing = return []
       split1 (Just (Left ~(_, True))) = split2 Nothing
       split1 (Just (Right b)) = get source >> split2 (Just b)
       split2 mb = getUntil (either (not . snd) (const True)) source >>= split3 mb
       split3 mb (trues, Nothing) = maybe (return ()) (put edge) mb >> putList trues true'
       split3 mb (trues, Just (Left ~(_, False))) = getUntil (either snd (const True)) source >>= split4 mb trues
       split3 _ (trues, b@(Just Right{})) = putList trues false' >> split1 b
       split4 mb ts (fs, Nothing) = maybe (return ()) (put edge) mb >> putList ts true' >> putList fs true'
       split4 _ ts (fs, x@Just{}) = putList ts false' >> putList fs false' >> split1 x
   in pourUntil (either snd (const True)) source false' >>= split1 >> return ()

-- | The 'prefix' combinator feeds its /true/ sink only the prefix of the input that its argument feeds to its /true/
-- sink.  All the rest of the input is dumped into the /false/ sink of the result.
prefix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
prefix splitter = wrapMarkedSplitter splitter $
                  \source true false edge->
                  peek source
                  >>= maybe
                         (return ())
                         (\x0-> either (return . snd) (\x-> put edge x >> get source >> return True) x0
                                >>= flip when (pourWhile (either snd (const False))
                                                         source 
                                                         (mapSink (\(Left (x, True))-> x) true))
                                >> mapMaybeStream (either (Just . fst) (const Nothing)) source false)

-- | The 'suffix' combinator feeds its /true/ sink only the suffix of the input that its argument feeds to its /true/
-- sink.  All the rest of the input is dumped into the /false/ sink of the result.
suffix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
suffix splitter = 
   wrapMarkedSplitter splitter $
   \source true false edge->
   let true' = mapSink (\(Left (x, _))-> x) true
       false' = mapSink (\(Left (x, _))-> x) false
       split0 = pourUntil (either snd (const True)) source false' >>= split1
       split1 Nothing = return []
       split1 (Just Left{}) = split2 Nothing
       split1 (Just (Right b)) = get source >> split2 (Just b)
       split2 mb = getUntil (either (not . snd) (const True)) source >>= split3 mb
       split3 mb (trues, Nothing) = maybe (return ()) (put edge) mb >> putList trues true'
       split3 _ (trues, Just{}) = putList trues false' >> split0
   in split0 >> return ()

-- | The 'even' combinator takes every input section that its argument /splitter/ deems /true/, and feeds even ones into
-- its /true/ sink. The odd sections and parts of input that are /false/ according to its argument splitter are fed to
-- 'even' splitter's /false/ sink.
even :: forall m x b. Monad m => Splitter m x b -> Splitter m x b
even splitter = wrapMarkedSplitter splitter $
                \source true false edge->
                let true' = mapSink (\(Left (x, _))-> x) true
                    false' = mapSink (\(Left (x, _))-> x) false
                    split0 = pourUntil (either snd (const True)) source false' >>= split1
                    split1 Nothing = return ()
                    split1 (Just (Left ~(_, True))) = split2
                    split1 (Just Right{}) = get source >> split2
                    split2 = pourUntil (either (not . snd) (const True)) source false' >>= split3
                    split3 Nothing = return ()
                    split3 (Just (Left ~(_, False))) = pourUntil (either snd (const True)) source false' >>= split4
                    split3 r@(Just Right{}) = split4 r
                    split4 Nothing = return ()
                    split4 (Just (Left ~(_, True))) = split5
                    split4 (Just (Right b)) = put edge b >> get source >> split5
                    split5 = pourWhile (either snd (const False)) source true' >> split0
                in split0

-- | Splitter 'startOf' issues an empty /true/ section at the beginning of every section considered /true/ by its
-- argument splitter, otherwise the entire input goes into its /false/ sink.
startOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b)
startOf splitter = wrapMarkedSplitter splitter $
                   \source _true false edge->
                   let false' = mapSink (\(Left (x, _))-> x) false
                       split0 = pourUntil (either snd (const True)) source false' >>= split1
                       split1 Nothing = return ()
                       split1 (Just (Left ~(_, True))) = put edge Nothing >> split2
                       split1 (Just (Right b)) = put edge (Just b) >> get source >> split2
                       split2 = pourUntil (either (not . snd) (const True)) source false' >>= split3
                       split3 Nothing = return ()
                       split3 (Just (Left ~(_, False))) = split0
                       split3 mb@(Just Right{}) = split1 mb
                   in split0

-- | Splitter 'endOf' issues an empty /true/ section at the end of every section considered /true/ by its argument
-- splitter, otherwise the entire input goes into its /false/ sink.
endOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b)
endOf splitter = wrapMarkedSplitter splitter $
                 \source _true false edge->
                 let false' = mapSink (\(Left (x, _))-> x) false
                     split0 = pourUntil (either snd (const True)) source false' >>= split1
                     split1 Nothing = return ()
                     split1 (Just (Left ~(_, True))) = split2 Nothing
                     split1 (Just (Right b)) = get source >> split2 (Just b)
                     split2 mb = pourUntil (either (not . snd) (const True)) source false' 
                                 >>= (put edge mb >>) . split3
                     split3 Nothing = return ()
                     split3 (Just (Left ~(_, False))) = split0
                     split3 mb@(Just Right{}) = split1 mb
                 in split0

-- | Combinator 'followedBy' treats its argument 'Splitter's as patterns components and returns a 'Splitter' that
-- matches their concatenation. A section of input is considered /true/ by the result iff its prefix is considered
-- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/. The splitter /s2/ is started anew
-- after every section split to /true/ sink by /s1/.
followedBy :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
followedBy binder s1 s2 = 
   isolateSplitter $ \ source true false edge ->
   pipeG binder
      (transduce (splitterToMarker s1) source)
      (\source'-> let get0 q = case Seq.viewl q
                               of Seq.EmptyL -> split0
                                  (Left (x, False)) :< rest -> put false x >> get0 rest
                                  (Left (_, True)) :< _ -> get2 Nothing Seq.empty q
                                  (Right b) :< rest -> get2 (Just b) Seq.empty rest
                      false' = mapSink (\(Left (x, _))-> x) false
                      true' = mapSink (\(Left (x, _))-> x) true
                      split0 = pourUntil (either snd (const True)) source' false'
                               >>= maybe 
                                      (return ()) 
                                      (either (const $ split1 Nothing) (\b-> get source' >> split1 (Just b)))
                      split1 mb = do (list, mx) <- getUntil (either (not . snd) (const True)) source'
                                     let list' = Seq.fromList $ map (\(Left (x, True))-> x) list
                                     maybe
                                        (testEnd mb (Seq.fromList $ map (\(Left (x, True))-> x) list))
                                        ((get source' >>) . get3 mb list' . Seq.singleton)
                                        mx
                      get2 mb q q' = case Seq.viewl q'
                                     of Seq.EmptyL -> get source'
                                                      >>= maybe (testEnd mb q) (get2 mb q . Seq.singleton)
                                        (Left (x, True)) :< rest -> get2 mb (q |> x) rest
                                        (Left (_, False)) :< _ -> get3 mb q q'
                                        Right{} :< _ -> get3 mb q q'
                      get3 mb q q' = do let list = mapMaybe 
                                                      (either (Just . fst) (const Nothing)) 
                                                      (Foldable.toList $ Seq.viewl q')
                                        (q'', mn) <- pipe (\sink-> putList list sink >> get7 q' sink) (test mb q)
                                        case mn of Nothing -> putQueue q false >> get0 q''
                                                   Just 0 -> get0 q''
                                                   Just n -> get8 (Just mb) n q''
                      get7 q sink = do list <- getWhile (either (const True) (const False)) source'
                                       rest <- putList (map (\(Left (x, _))-> x) list) sink
                                       let q' = q >< Seq.fromList list
                                       if null rest 
                                          then get source' >>= maybe (return q') (\x-> get7 (q' |> x) sink)
                                          else return q'
                      testEnd mb q = do ((), n) <- pipe (const $ return ()) (test mb q)
                                        case n of Nothing -> putQueue q false >> return ()
                                                  _ -> return ()
                      test mb q source'' = liftM snd $
                                           pipe
                                              (transduce (splitterToMarker s2) source'')
                                              (\source'''-> let test0 (Left (_, False)) = get source''' 
                                                                                          >> return Nothing
                                                                test0 (Left (_, True)) = test1
                                                                test0 (Right b') = maybe 
                                                                                      (return ()) 
                                                                                      (\b-> put edge (b, b')) 
                                                                                      mb
                                                                                   >> get source'''
                                                                                   >> test1
                                                                test1 = putQueue q true
                                                                        >> getWhile (either snd (const False)) source'''
                                                                        >>= \list-> putList list true'
                                                                        >> get source'''
                                                                        >> return (Just $ length list)
                                                            in peek source''' >>= maybe (return Nothing) test0)
                      get8 Nothing 0 q = get0 q
                      get8 (Just mb) 0 q = get2 mb Seq.empty q
                      get8 mmb n q = case Seq.viewl q 
                                     of Left (_, False) :< rest -> get8 Nothing (pred n) rest
                                        Left (_, True) :< rest -> get8 (maybe (Just Nothing) Just mmb) (pred n) rest
                                        Right b :< rest -> get8 (Just (Just b)) n rest
                                        EmptyL -> error "Expecting a non-empty queue!" 
                 in split0)
   >> return ()

-- | Combinator 'between' tracks the running balance of difference between the number of preceding starts of sections
-- considered /true/ according to its first argument and the ones according to its second argument. The combinator
-- passes to /true/ all input values for which the difference balance is positive. This combinator is typically used
-- with 'startOf' and 'endOf' in order to count entire input sections and ignore their lengths.
between :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
between binder s1 s2 = isolateSplitter $ \ source true false edge ->
                         pipeG binder
                            (transduce (splittersToPairMarker binder s1 s2) source)
                            (let pass n x = (if n > 0 then put true x else put false x)
                                            >> return n
                                 pass' n x = (if n >= 0 then put true x else put false x)
                                             >> return n
                                 state n (Left (x, True, False)) = pass (succ n) x
                                 state n (Left (x, False, True)) = pass' (pred n) x
                                 state n (Left (x, True, True)) = pass' n x
                                 state n (Left (x, False, False)) = pass n x
                                 state 0 (Right (Left b)) = put edge b >> return 1
                                 state n (Right (Left _)) = return (succ n)
                                 state n (Right (Right _)) = return (pred n)
                             in foldMStream_ state (0 :: Int))
                         >> return ()

-- Helper functions

wrapMarkedSplitter ::
   forall m x b1 b2. Monad m =>
   Splitter m x b1
   -> (forall a1 a2 a3 a4 d. (AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d, AncestorFunctor a4 d) =>
       Source m a1 (Either (x, Bool) b1) -> Sink m a2 x -> Sink m a3 x -> Sink m a4 b2 -> Coroutine d m ())
   -> Splitter m x b2
wrapMarkedSplitter splitter splitMarked = isolateSplitter $ \ source true false edge ->
                                          pipe
                                             (transduce (splitterToMarker splitter) source)
                                             (\source'-> splitMarked source' true false edge)
                                          >> return ()

splitterToMarker :: forall m x b. Monad m => Splitter m x b -> Transducer m x (Either (x, Bool) b)
splitterToMarker s = isolateTransducer $ \source sink->
                     split s source
                        (mapSink (\x-> Left (x, True)) sink)
                        (mapSink (\x-> Left (x, False)) sink)
                        (mapSink Right sink)

parserToSplitter :: forall m x b. Monad m => Parser m x b -> Splitter m x (Boundary b)
parserToSplitter t = isolateSplitter $ \ source true false edge ->
                     pipe
                        (transduce t source)
                        (\source-> let true' = mapSink fromContent true
                                       false' = mapSink fromContent false
                                       topLevel = pourUntil isMarkup source false'
                                                  >>= maybe (return ()) (\x-> handleMarkup x >> topLevel)
                                       handleMarkup (Markup p@Point{}) = put edge p >> return True
                                       handleMarkup (Markup s@Start{}) = put edge s >> handleRegion >> return True
                                       handleMarkup (Markup e@End{}) = put edge e >> return False
                                       handleRegion = pourUntil isMarkup source true'
                                                      >>= maybe (return ()) (\x -> handleMarkup x 
                                                                                   >>= flip when handleRegion)
                                   in topLevel)
                        >> return ()
   where isMarkup Markup{} = True
         isMarkup Content{} = False
         fromContent (Content x) = x

splittersToPairMarker :: forall m x b1 b2. Monad m => PairBinder m -> Splitter m x b1 -> Splitter m x b2 ->
                         Transducer m x (Either (x, Bool, Bool) (Either b1 b2))
splittersToPairMarker binder s1 s2 =
   let synchronizeMarks :: forall a1 a2 d. (AncestorFunctor a1 d, AncestorFunctor a2 d) =>
                           Sink m a1 (Either (x, Bool, Bool) (Either b1 b2))
                        -> Source m a2 (Either ((x, Bool), Bool) (Either b1 b2))
                        -> Coroutine d m (Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool))
       synchronizeMarks sink source = foldMStream handleMark Nothing source where
          handleMark Nothing (Right b) = put sink (Right b) >> return Nothing
          handleMark Nothing (Left (p, head)) = return (Just (Seq.singleton (Left p), head))
          handleMark (Just (q, head)) (Left (p, head')) | head == head' = return (Just (q |> Left p, head))
          handleMark (Just (q, True)) (Right b@Left{}) = return (Just (q |> Right b, True))
          handleMark (Just (q, False)) (Right b@Right{}) = return (Just (q |> Right b, False))
          handleMark state (Right b) = put sink (Right b) >> return state
          handleMark (Just (q, pos')) mark@(Left (p@(_, t), pos))
             = case Seq.viewl q
               of Seq.EmptyL -> return (Just (Seq.singleton (Left p), pos))
                  Right b :< rest -> put sink (Right b)
                                     >> handleMark (if Seq.null rest then Nothing else Just (rest, pos')) mark
                  Left (y, t') :< rest -> put sink (Left $ if pos then (y, t, t') else (y, t', t))
                                          >> return (if Seq.null rest then Nothing else Just (rest, pos'))
   in isolateTransducer $
      \source sink->
      pipe
         (\sync-> teeConsumers binder
                     (\source1-> split s1 source1
                                    (mapSink (\x-> Left ((x, True), True)) sync)
                                    (mapSink (\x-> Left ((x, False), True)) sync)
                                    (mapSink (Right. Left) sync))
                     (\source2-> split s2 source2
                                    (mapSink (\x-> Left ((x, True), False)) sync)
                                    (mapSink (\x-> Left ((x, False), False)) sync)
                                    (mapSink (Right . Right) sync))
                     source)
          (synchronizeMarks sink)
      >> return ()

zipSplittersWith :: forall m x b1 b2 b. Monad m => 
                    (Bool -> Bool -> Bool) -> 
                    (forall a1 a2 d. (AncestorFunctor a1 d, AncestorFunctor a2 d) =>
                     Source m a1 (Either b1 b2) -> Sink m a2 b -> Coroutine d m ()) -> 
                    PairBinder m -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b
zipSplittersWith f boundaries binder s1 s2
   = isolateSplitter $ \ source true false edge ->
     pipe
        (\edge'->
         pipeG binder
            (transduce (splittersToPairMarker binder s1 s2) source)
            (mapMStream_
                (either
                    (\(x, t1, t2)-> if f t1 t2 then put true x else put false x)
                    (put edge'))))
        (flip boundaries edge)
     >> return ()

-- | Runs the second argument on every contiguous region of input source (typically produced by 'splitterToMarker')
-- whose all values either match @Left (_, True)@ or @Left (_, False)@.
groupMarks :: (Monad m, AncestorFunctor a d, AncestorFunctor a (SinkFunctor d x)) =>
              Source m a (Either (x, Bool) b) ->
              (Maybe (Maybe b) -> Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m r) ->
              Coroutine d m ()
groupMarks source getConsumer = peek source >>= loop
   where loop = maybe (return ()) ((>>= loop . fst) . either startContent startRegion)
         startContent (_, False) = pipe (next False) (getConsumer Nothing)
         startContent (_, True) = pipe (next True) (getConsumer $ Just Nothing)
         startRegion b = get source >> pipe (next True) (getConsumer (Just $ Just b))
         next t sink = pourUntil (either (\(_, t')-> t /= t') (const True)) source (mapSink (\(Left (x, _))-> x) sink)

splitInput :: forall m a1 a2 a3 d x b. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) =>
              Splitter m x b -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
splitInput splitter source true false = split splitter source true false (nullSink :: Sink m d b)

findsTrueIn :: forall m a d x b. (Monad m, AncestorFunctor a d)
               => Splitter m x b -> Source m a x -> Coroutine d m (Maybe (Maybe b))
findsTrueIn splitter source = pipe
                                 (\testTrue-> pipe
                                                 (split splitter (liftSource source :: Source m d x)
                                                     testTrue
                                                     (nullSink :: Sink m d x))
                                                 get)
                                 get
                              >>= \(((), maybeEdge), maybeTrue)-> return $
                                                                  case maybeEdge
                                                                  of Nothing -> fmap (const Nothing) maybeTrue
                                                                     _ -> Just maybeEdge

findsFalseIn :: forall m a d x b. (Monad m, AncestorFunctor a d) => Splitter m x b -> Source m a x -> Coroutine d m Bool
findsFalseIn splitter source = pipe
                                  (\testFalse-> split splitter (liftSource source :: Source m d x)
                                                   (nullSink :: Sink m d x)
                                                   testFalse
                                                   (nullSink :: Sink m d b))
                                  get
                               >>= \((), maybeFalse)-> return (isJust maybeFalse)

teeConsumers :: forall m a d x r1 r2. Monad m => 
                PairBinder m 
                -> (forall a'. OpenConsumer m a' (SourceFunctor (SinkFunctor d x) x) x r1)
                -> (forall a'. OpenConsumer m a' (SourceFunctor d x) x r2)
                -> OpenConsumer m a d x (r1, r2)
teeConsumers binder c1 c2 source = pipeG binder consume1 c2
   where consume1 sink = liftM snd $ pipe (tee source' sink) c1
         source' :: Source m d x
         source' = liftSource source

-- | Given a 'Splitter', a 'Source', and two consumer functions, 'splitInputToConsumers' runs the splitter on the source
-- and feeds the splitter's /true/ and /false/ outputs, respectively, to the two consumers.
splitInputToConsumers :: forall m a d d1 x b. (Monad m, d1 ~ SinkFunctor d x, AncestorFunctor a d) =>
                         PairBinder m -> Splitter m x b -> Source m a x ->
                         (Source m (SourceFunctor d1 x) x -> Coroutine (SourceFunctor d1 x) m ()) ->
                         (Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m ()) ->
                         Coroutine d m ()
splitInputToConsumers binder s source trueConsumer falseConsumer
   = pipeG binder
        (\false-> pipeG binder
                     (\true-> split s source' true false (nullSink :: Sink m d b))
                     trueConsumer)
        falseConsumer
     >> return ()
   where source' :: Source m d x
         source' = liftSource source