{- 
    Copyright 2008-2009 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

{-# LANGUAGE ScopedTypeVariables, Rank2Types, ImpredicativeTypes, KindSignatures, EmptyDataDecls,
             MultiParamTypeClasses, FunctionalDependencies, FlexibleContexts, FlexibleInstances #-}

-- | The "Combinators" module defines combinators applicable to 'Transducer' and 'Splitter' components defined in the
-- "Control.Concurrent.SCC.ComponentTypes" module.

module Control.Concurrent.SCC.Combinators
   (-- * Consumer, producer, and transducer combinators
    splitterToMarker,
    consumeBy, prepend, append, substitute,
    PipeableComponentPair ((>->)), JoinableComponentPair (join, sequence),
    -- * Pseudo-logic splitter combinators
    -- | Combinators '>&' and '>|' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws hold,
    -- '>&' and '>|' are in general not commutative, associative, nor idempotent. In the special case when all argument
    -- splitters are stateless, such as those produced by 'Components.liftStatelessSplitter', these combinators do satisfy
    -- all laws of Boolean algebra.
    snot, (>&), (>|),
    -- ** Zipping logic combinators
    -- | The '&&' and '||' combinators run the argument splitters in parallel and combine their logical outputs using
    -- the corresponding logical operation on each output pair, in a manner similar to 'Prelude.zipWith'. They fully
    -- satisfy the laws of Boolean algebra.
    (&&), (||),
    -- * Flow-control combinators
    -- | The following combinators resemble the common flow-control programming language constructs. Combinators 
    -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'.
    --
    --    * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Components.asis'
    --
    --    * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Components.asis' /transducer/
    --
    --    * 'select' /splitter/ = 'ifs' /splitter/ 'Components.asis' 'Components.suppress'
    --
    ifs, wherever, unless, select,
    -- ** Recursive
    while, nestedIn,
    -- * Section-based combinators
    -- | All combinators in this section use their 'Splitter' argument to determine the
    -- structure of the input. Every contiguous portion of the input that gets passed to one or the other sink of the
    -- splitter is treated as one section in the logical structure of the input stream. What is done with the section
    -- depends on the combinator, but the sections, and therefore the logical structure of the input stream, are
    -- determined by the argument splitter alone.
    foreach, having, havingOnly, followedBy, even,
    -- ** first and its variants
    first, uptoFirst, prefix,
    -- ** last and its variants
    last, lastAndAfter, suffix,
    -- ** positional splitters
    startOf, endOf,
    -- ** input ranges
    (...),
    -- * parser support
    parseRegions, parseNestedRegions,
    -- * grouping helpers
    groupMarks)
where

import Control.Concurrent.SCC.Foundation
import Control.Concurrent.SCC.ComponentTypes

import Prelude hiding (even, last, sequence, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Data.Maybe (isJust, isNothing, fromJust)
import Data.Typeable (Typeable)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))

import Debug.Trace (trace)

-- | Converts a 'Consumer' into a 'Transducer' with no output.
consumeBy :: forall m x y r. (Monad m, Typeable x) => Consumer m x r -> Transducer m x y
consumeBy c = liftTransducer "consumeBy" (maxUsableThreads c) $
              \threads-> let c' = usingThreads threads c
                         in (ComponentConfiguration [AnyComponent c'] (usedThreads c') (cost c'),
                             \ source _sink -> consume c' source >> return [])

-- | Class 'PipeableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * The input of the result, if any, becomes the input of the first component.
--
--    * The output produced by the first child component is consumed by the second child component.
--
--    * The result output, if any, is the output of the second component.
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
                                                       c1 -> m w, c2 -> m w, c3 -> m
   where (>->) :: c1 -> c2 -> c3

instance (ParallelizableMonad m, Typeable x)
   => PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
   where p >-> c = liftPerformer ">->" (maxUsableThreads p `max` maxUsableThreads c) $
                   \threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
                                  performPipe = (if parallel then pipeP else pipe) (produce p') (consume c') >> return ()
                              in (configuration, performPipe)

instance (ParallelizableMonad m, Typeable x, Typeable y)
   => PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
   where t >-> c = liftConsumer ">->" (maxUsableThreads t `max` maxUsableThreads c) $
                   \threads-> let (configuration, t', c', parallel) = optimalTwoParallelConfigurations threads t c
                                  consumePipe source = liftM snd $ (if parallel then pipeP else pipe)
                                                                      (transduce t' source)
                                                                      (consume c')
                              in (configuration, consumePipe)

instance (ParallelizableMonad m, Typeable x, Typeable y)
   => PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
      where p >-> t = liftProducer ">->" (maxUsableThreads t `max` maxUsableThreads p) $
                      \threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
                                     producePipe sink = liftM fst $ (if parallel then pipeP else pipe)
                                                                       (produce p')
                                                                       (\source-> transduce t' source sink)
                                 in (configuration, producePipe)

instance ParallelizableMonad m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
   where t1 >-> t2 = liftTransducer ">->" (maxUsableThreads t1 + maxUsableThreads t2) $
                     \threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
                                    transducePipe source sink = liftM fst $ (if parallel then pipeP else pipe)
                                                                               (transduce t1' source)
                                                                               (\source-> transduce t2' source sink)
                                in (configuration, transducePipe)

class Component c => CompatibleSignature c cons (m :: * -> *) input output | c -> cons m

class AnyListOrUnit c

instance AnyListOrUnit [x]
instance AnyListOrUnit ()

instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r)    (PerformerType r)  m x y
instance AnyListOrUnit y                    => CompatibleSignature (Consumer m x r)   (ConsumerType r)   m [x] y
instance AnyListOrUnit y                    => CompatibleSignature (Producer m x r)   (ProducerType r)   m y [x]
instance                                       CompatibleSignature (Transducer m x y)  TransducerType    m [x] [y]

data PerformerType r
data ConsumerType r
data ProducerType r
data TransducerType

-- | Class 'JoinableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * if both argument components consume input, the input of the combined component gets distributed to both
--      components in parallel,
--
--    * if both argument components produce output, the output of the combined component is a concatenation of the
--      complete output from the first component followed by the complete output of the second component, and
--
--    * the 'join' method may apply the components in any order, the 'sequence' method makes sure its first argument
--      has completed before using the second one.
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
   => JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
                                                      t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
   where join :: c1 -> c2 -> c3
         sequence :: c1 -> c2 -> c3
         join = sequence

instance forall m x any r1 r2. (Monad m, Typeable x)
   => JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Producer m x r2) (Producer m x r2)
   where sequence p1 p2 = liftProducer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $
                          \threads-> let (configuration, p1', p2') = optimalTwoSequentialConfigurations threads p1 p2
                                         produceJoin sink = produce p1' sink >> produce p2' sink
                                     in (configuration, produceJoin)

instance forall m x any. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] () (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
   where join c1 c2 = liftConsumer "join" (maxUsableThreads c1 + maxUsableThreads c2) $
                      \threads-> let (configuration, c1', c2', parallel) = optimalTwoParallelConfigurations threads c1 c2
                                     consumeJoin source = do (if parallel then pipeP else pipe)
                                                                (\sink1-> pipe (tee source sink1) (consume c2'))
                                                                (consume c1')
                                                             return ()
                                 in (configuration, consumeJoin)
         sequence c1 c2 = liftConsumer "sequence" (maxUsableThreads c1 `max` maxUsableThreads c2) $
                          \threads-> let (configuration, c1', c2') = optimalTwoSequentialConfigurations threads c1 c2
                                         consumeJoin source = pipe
                                                                 (\buffer-> pipe (tee source buffer) (consume c1'))
                                                                 getList
                                                              >>= \(_, list)-> pipe (putList list) (consume c2')
                                                              >> return ()
                                     in (configuration, consumeJoin)

instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y] (Transducer m x y) (Transducer m x y) (Transducer m x y)
   where join t1 t2 = liftTransducer "join" (maxUsableThreads t1 + maxUsableThreads t2) $
                      \threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
                                     transduce' source sink = pipe
                                                                 (\buffer-> (if parallel then pipeP else pipe)
                                                                               (\sink1-> pipe
                                                                                            (\sink2-> tee source sink1 sink2)
                                                                                            (\src-> transduce t2' src buffer))
                                                                               (\source-> transduce t1' source sink))
                                                                 getList
                                                              >>= \(_, list)-> putList list sink
                                                              >> getList source
                                 in (configuration, transduce')
         sequence t1 t2 = liftTransducer "sequence" (maxUsableThreads t1 `max` maxUsableThreads t2) $
                          \threads-> let (configuration, t1', t2') = optimalTwoSequentialConfigurations threads t1 t2
                                         transduce' source sink = pipe
                                                                     (\buffer-> pipe
                                                                                   (tee source buffer)
                                                                                   (\source-> transduce t1 source sink))
                                                                     getList
                                                                  >>= \(_, list)-> pipe
                                                                                      (\sink-> putList list sink
                                                                                               >>= whenNull
                                                                                                      (pour source sink
                                                                                                       >> return []))
                                                                                      (\source-> transduce t2 source sink)
                                                                  >>= return . fst
                                     in (configuration, transduce')


instance forall m r1 r2. ParallelizableMonad m
   => JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () () (Performer m r1) (Performer m r2) (Performer m r2)
   where join p1 p2 = liftPerformer "join" (maxUsableThreads p1 + maxUsableThreads p2) $
                      \threads-> let (configuration, p1', p2', parallel) = optimalTwoParallelConfigurations threads p1 p2
                                 in (configuration, if parallel then liftM snd $ perform p1' `parallelize` perform p2'
                                                    else perform p1' >> perform p2')
         sequence p1 p2 = liftPerformer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $
                          \threads-> let (configuration, p1', p2') = optimalTwoSequentialConfigurations threads p1 p2
                                     in (configuration, perform p1' >> perform p2')

instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Performer m r1) (Producer m x r2) (Producer m x r2)
   where join pe pr = liftProducer "join" (maxUsableThreads pe + maxUsableThreads pr) $
                      \threads-> let (configuration, pe', pr', parallel) = optimalTwoParallelConfigurations threads pe pr
                                     produceJoin sink = if parallel then liftM snd (perform pe' `parallelize` produce pr' sink)
                                                        else perform pe' >> produce pr' sink
                                 in (configuration, produceJoin)
         sequence pe pr = liftProducer "sequence" (maxUsableThreads pe `max` maxUsableThreads pr) $
                          \threads-> let (configuration, pe', pr') = optimalTwoSequentialConfigurations threads pe pr
                                         produceJoin sink = perform pe' >> produce pr' sink
                                     in (configuration, produceJoin)

instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Performer m r2) (Producer m x r2)
   where join pr pe = liftProducer "join" (maxUsableThreads pr + maxUsableThreads pe) $
                      \threads-> let (configuration, pr', pe', parallel) = optimalTwoParallelConfigurations threads pr pe
                                     produceJoin sink = if parallel then liftM snd (produce pr' sink `parallelize` perform pe')
                                                        else produce pr' sink >> perform pe'
                                 in (configuration, produceJoin)
         sequence pr pe = liftProducer "sequence" (maxUsableThreads pr `max` maxUsableThreads pe) $
                          \threads-> let (configuration, pr', pe') = optimalTwoSequentialConfigurations threads pr pe
                                         produceJoin sink = produce pr' sink >> perform pe'
                                     in (configuration, produceJoin)

instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] () (Performer m r1) (Consumer m x r2) (Consumer m x r2)
   where join p c = liftConsumer "join" (maxUsableThreads p + maxUsableThreads c) $
                    \threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
                                   consumeJoin source = if parallel then liftM snd (perform p' `parallelize` consume c' source)
                                                        else perform p' >> consume c' source
                               in (configuration, consumeJoin)
         sequence p c = liftConsumer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $
                        \threads-> let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
                                       consumeJoin source = perform p' >> consume c' source
                                   in (configuration, consumeJoin)

instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] () (Consumer m x r1) (Performer m r2) (Consumer m x r2)
   where join c p = liftConsumer "join" (maxUsableThreads c + maxUsableThreads p) $
                    \threads-> let (configuration, c', p', parallel) = optimalTwoParallelConfigurations threads c p
                                   consumeJoin source = if parallel then liftM snd (consume c' source `parallelize` perform p')
                                                        else consume c' source >> perform p'
                               in (configuration, consumeJoin)
         sequence c p = liftConsumer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $
                        \threads-> let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
                                       consumeJoin source = consume c' source >> perform p'
                                   in (configuration, consumeJoin)

instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y] (Performer m r) (Transducer m x y) (Transducer m x y)
   where join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $
                    \threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
                                   join' source sink = if parallel then liftM snd (perform p'
                                                                                   `parallelize` transduce t' source sink)
                                                       else perform p' >> transduce t' source sink
                               in (configuration, join')
         sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $
                        \threads-> let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
                                       join' source sink = perform p' >> transduce t' source sink
                                   in (configuration, join')

instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y] (Transducer m x y) (Performer m r) (Transducer m x y)
   where join t p = liftTransducer "join" (maxUsableThreads t + maxUsableThreads p) $
                    \threads-> let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
                                   join' source sink = if parallel then liftM fst (transduce t' source sink
                                                                                   `parallelize` perform p')
                                                       else do result <- transduce t' source sink
                                                               perform p'
                                                               return result
                               in (configuration, join')
         sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $
                        \threads-> let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
                                       join' source sink = do result <- transduce t' source sink
                                                              perform p'
                                                              return result
                                   in (configuration, join')

instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y] (Producer m y ()) (Transducer m x y) (Transducer m x y)
   where join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $
                    \threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
                                   join' source sink = if parallel
                                                       then do ((_, rest), out) <- pipe
                                                                                      (\buffer-> produce p' sink `parallelize`
                                                                                                 transduce t' source buffer)
                                                                                      getList
                                                               putList out sink
                                                               return rest 
                                                       else produce p' sink >> transduce t' source sink
                               in (configuration, join')
         sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $
                        \threads-> let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
                                       join' source sink = produce p' sink >> transduce t' source sink
                                   in (configuration, join')

instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y] (Transducer m x y) (Producer m y ()) (Transducer m x y)
   where join t p = liftTransducer "join" (maxUsableThreads t `max` maxUsableThreads p) $
                    \threads-> let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
                                   join' source sink = if parallel
                                                       then do ((rest, ()), out) <- pipe
                                                                                       (\buffer-> transduce t' source sink
                                                                                                  `parallelize` produce p' buffer)
                                                                                       getList
                                                               putList out sink
                                                               return rest 
                                                       else do result <- transduce t' source sink
                                                               produce p' sink
                                                               return result
                               in (configuration, join')
         sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $
                        \threads-> let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
                                       join' source sink = do result <- transduce t' source sink
                                                              produce p' sink
                                                              return result
                                   in (configuration, join')

instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y] (Consumer m x ()) (Transducer m x y) (Transducer m x y)
   where join c t = liftTransducer "join" (maxUsableThreads c + maxUsableThreads t) $
                    \threads-> let (configuration, c', t', parallel) = optimalTwoParallelConfigurations threads c t
                                   join' source sink = liftM (snd . fst) $
                                                       (if parallel then pipeP else pipe)
                                                          (\sink1-> pipe
                                                                       (tee source sink1)
                                                                       (\source-> transduce t' source sink))
                                                          (consume c')
                               in (configuration, join')
         sequence c t = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads t) $
                        \threads-> let (configuration, c', t') = optimalTwoSequentialConfigurations threads c t
                                       sequence' source sink = pipe
                                                                  (\buffer-> pipe
                                                                                (tee source buffer)
                                                                                (consume c'))
                                                                  getList
                                                               >>= \(_, list)-> pipe
                                                                                   (\sink-> putList list sink
                                                                                            >>= whenNull (pour source sink
                                                                                                          >> return []))
                                                                                   (\source-> transduce t' source sink)
                                                               >>= return . fst
                                   in (configuration, sequence')

instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y] (Transducer m x y) (Consumer m x ()) (Transducer m x y)
   where join t c = join c t
         sequence t c = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads c) $
                        \threads-> let (configuration, t', c') = optimalTwoSequentialConfigurations threads t c
                                       sequence' source sink = pipe
                                                                  (\buffer-> pipe
                                                                                (tee source buffer)
                                                                                (\source-> transduce t' source sink))
                                                                  getList
                                                               >>= \(_, list)-> pipe
                                                                                   (\sink-> putList list sink
                                                                                            >>= whenNull (pour source sink
                                                                                                          >> return []))
                                                                                   (consume c')
                                                               >>= return . fst
                                   in (configuration, sequence')

instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y] (Producer m y ()) (Consumer m x ()) (Transducer m x y)
   where join p c = liftTransducer "sequence" (maxUsableThreads p + maxUsableThreads c) $
                    \threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
                                   join' source sink = if parallel then produce p' sink >> consume c' source >> return []
                                                       else parallelize (produce p' sink) (consume c' source) >> return []
                               in (configuration, join')
         sequence p c = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $
                        \threads-> let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
                                       join' source sink = produce p' sink >> consume c' source >> return []
                                   in (configuration, join')

instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y] (Consumer m x ()) (Producer m y ()) (Transducer m x y)
   where join c p = join p c
         sequence c p = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $
                        \threads-> let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
                                       join' source sink = consume c' source >> produce p' sink >> return []
                                   in (configuration, join')

-- | Combinator 'prepend' converts the given producer to transducer that passes all its input through unmodified, except
-- | for prepending the output of the argument producer to it.
-- | 'prepend' /prefix/ = 'join' ('substitute' /prefix/) 'asis'
prepend :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
prepend prefix = liftTransducer "prepend" (maxUsableThreads prefix) $
                 \threads-> let prefix' = usingThreads threads prefix
                                prepend' source sink = produce prefix' sink >> pour source sink >> return []
                            in (ComponentConfiguration [AnyComponent prefix] threads (cost prefix'), prepend')

-- | Combinator 'append' converts the given producer to transducer that passes all its input through unmodified, finally
-- | appending to it the output of the argument producer.
-- | 'append' /suffix/ = 'join' 'asis' ('substitute' /suffix/)
append :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
append suffix = liftTransducer "append" (maxUsableThreads suffix) $
                \threads-> let suffix' = usingThreads threads suffix
                               append' source sink = pour source sink >> produce suffix' sink >> return []
                           in (ComponentConfiguration [AnyComponent suffix] threads (cost suffix'), append')

-- | The 'substitute' combinator converts its argument producer to a transducer that produces the same output, while
-- | consuming its entire input and ignoring it.
substitute :: forall m x y r. (Monad m, Typeable x, Typeable y) => Producer m y r -> Transducer m x y
substitute feed = liftTransducer "substitute" (maxUsableThreads feed) $
                  \threads-> let feed' = usingThreads threads feed
                                 substitute' source sink = consumeAndSuppress source >> produce feed' sink >> return []
                             in (ComponentConfiguration [AnyComponent feed] threads (cost feed'), substitute')

-- | The 'snot' (streaming not) combinator simply reverses the outputs of the argument splitter.
-- In other words, data that the argument splitter sends to its /true/ sink goes to the /false/ sink of the result, and vice versa.
snot :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
snot splitter = liftSplitter "not" (maxUsableThreads splitter) $
                \threads-> let splitter' = usingThreads threads splitter
                               not source true false edge = liftM fst $
                                                            pipe
                                                               (split splitter source false true)
                                                               consumeAndSuppress
                           in (ComponentConfiguration [AnyComponent splitter'] threads (cost splitter'), not)

-- | The '>&' combinator sends the /true/ sink output of its left operand to the input of its right operand for further
-- splitting. Both operands' /false/ sinks are connected to the /false/ sink of the combined splitter, but any input
-- value to reach the /true/ sink of the combined component data must be deemed true by both splitters.
(>&) :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2) => Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
s1 >& s2 = liftSplitter ">&" (maxUsableThreads s1 + maxUsableThreads s2) $
           \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                          s source true false edge = liftM (fst . fst . fst . fst) $
                                                     pipe
                                                        (\edges->
                                                         pipe
                                                            (\edge1-> pipe
                                                                         (\edge2-> (if parallel then pipeP else pipe)
                                                                                      (\true-> split s1' source true false edge1)
                                                                                      (\source-> split s2' source true false edge2))
                                                                         (flip (pourMap Right) edges))
                                                            (flip (pourMap Left) edges))
                                                        (flip intersectRegions edge)
                      in (configuration, s)

intersectRegions source sink = next Nothing Nothing
   where next lastLeft lastRight = get source
                                   >>= maybe
                                          (return ())
                                          (either
                                              (flip pair lastRight . Just)
                                              (pair lastLeft . Just))
         pair l@(Just x) r@(Just y) = put sink (x, y)
                                      >>= flip when (next Nothing Nothing)
         pair l r = next l r

-- | A '>|' combinator's input value can reach its /false/ sink only by going through both argument splitters' /false/
-- sinks.
(>|) :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
        => Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
s1 >| s2 = liftSplitter ">|" (maxUsableThreads s1 + maxUsableThreads s2) $
           \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                          s source true false edge = liftM (fst . fst . fst) $
                                                     pipe
                                                        (\edge1-> pipe
                                                                     (\edge2-> (if parallel then pipeP else pipe)
                                                                                  (\false-> split s1' source true false edge1)
                                                                                  (\source-> split s2' source true false edge2))
                                                                     (flip (pourMap Right) edge))
                                                        (flip (pourMap Left) edge)
                      in (configuration, s)

-- | Combinator '&&' is a pairwise logical conjunction of two splitters run in parallel on the same input.
(&&) :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2) => Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
s1 && s2 = liftSplitter "&&" (maxUsableThreads s1 + maxUsableThreads s2) $
           \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                          s source true false edge = liftM (\(x, y)-> y ++ x) $
                                                     (if parallel then pipeP else pipe)
                                                         (transduce (splittersToPairMarker s1' s2') source)
                                                         (\source-> let split l r = get source
                                                                                    >>= maybe
                                                                                           (return [])
                                                                                           (test l r)
                                                                        test l r (Left (x, t1, t2))
                                                                           = put (if t1 Prelude.&& t2 then true else false) x
                                                                             >>= cond
                                                                                    (split
                                                                                        (if t1 then l else Nothing)
                                                                                        (if t2 then r else Nothing))
                                                                                    (return [x])
                                                                        test _ Nothing (Right (Left l)) = split (Just l) Nothing
                                                                        test _ (Just r) (Right (Left l))
                                                                           = put edge (l, r) >> split (Just l) (Just r)
                                                                        test Nothing _ (Right (Right r)) = split Nothing (Just r)
                                                                        test (Just l) _ (Right (Right r))
                                                                           = put edge (l, r) >> split (Just l) (Just r)
                                                                    in split Nothing Nothing)
                      in (configuration, s)

-- | Combinator '||' is a pairwise logical disjunction of two splitters run in parallel on the same input.
(||) :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
        => Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2)
(||) = zipSplittersWith (Prelude.||) pour

ifs :: (ParallelizableMonad m, Typeable x, Typeable b, BranchComponent cc m x [x]) => Splitter m x b -> cc -> cc -> cc
ifs s = combineBranches "if" (cost s) (\ parallel c1 c2 -> \source-> splitInputToConsumers parallel s source c1 c2)

wherever :: (ParallelizableMonad m, Typeable x, Typeable b) => Transducer m x x -> Splitter m x b -> Transducer m x x
wherever t s = liftTransducer "wherever" (maxUsableThreads s + maxUsableThreads t) $
               \threads-> let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
                              wherever' source sink = splitInputToConsumers parallel s source
                                                         (\source-> transduce t source sink)
                                                         (\source-> pour source sink >> return [])
                          in (configuration, wherever')

unless :: (ParallelizableMonad m, Typeable x, Typeable b) => Transducer m x x -> Splitter m x b -> Transducer m x x
unless t s = liftTransducer "unless" (maxUsableThreads s + maxUsableThreads t) $
             \threads-> let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
                            unless' source sink = splitInputToConsumers parallel s source
                                                     (\source-> pour source sink >> return [])
                                                     (\source-> transduce t source sink)
                        in (configuration, unless')

select :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Transducer m x x
select s = liftTransducer "select" (maxUsableThreads s) $
           \threads-> let s' = usingThreads threads s
                          transduce' source sink = splitInputToConsumers False s' source
                                                      (\source-> pour source sink >> return [])
                                                      (\source-> consumeAndSuppress source >> return [])
                      in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), transduce')

-- | Converts a splitter into a parser.
parseRegions :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Parser m x b
parseRegions s = liftTransducer "parseRegions" (maxUsableThreads s) $
                \threads-> let s' = usingThreads threads s
                               transduce' source sink = liftM (\(x, y)-> y ++ x) $
                                                        pipe
                                                           (transduce (splitterToMarker s') source)
                                                           (\source-> wrapRegions source sink)
                               wrapRegions source sink = let wrap0 mb = get source
                                                                        >>= maybe
                                                                               (maybe (return True) flush mb >> return [])
                                                                               (wrap1 mb)
                                                             wrap1 Nothing (Left (x, _)) = put sink (Content x)
                                                                                           >>= cond (wrap0 Nothing) (return [x])
                                                             wrap1 (Just p) (Left (x, False)) = flush p
                                                                                                >> put sink (Content x)
                                                                                                >>= cond
                                                                                                       (wrap0 Nothing)
                                                                                                       (return [x])
                                                             wrap1 (Just (b, t)) (Left (x, True))
                                                                = (if t then return True else put sink (Markup (Start b)))
                                                                  >> put sink (Content x)
                                                                  >>= cond (wrap0 (Just (b, True))) (return [x])
                                                             wrap1 (Just p) (Right b') = flush p >> wrap0 (Just (b', False))
                                                             wrap1 Nothing (Right b) = wrap0 (Just (b, False))
                                                             flush (b, t) = put sink $ Markup $ (if t then End else Point) b
                                                         in wrap0 Nothing
                           in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), transduce')

-- | Converts a boundary-marking splitter into a parser.
parseNestedRegions :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x (Boundary b) -> Parser m x b
parseNestedRegions s = liftTransducer "parseNestedRegions" (maxUsableThreads s) $
                       \threads-> let s' = usingThreads threads s
                                      transduce' source sink = liftM (\(w, (), (), _)-> w) $
                                                               splitToConsumers s' source
                                                                  (flip (pourMap Content) sink)
                                                                  (flip (pourMap Content) sink)
                                                                  (flip (pourMap Markup) sink)
                                  in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), transduce')

-- | The recursive combinator 'while' feeds the true sink of the argument splitter back to itself, modified by the
-- argument transducer. Data fed to the splitter's false sink is passed on unmodified.
while :: (ParallelizableMonad m, Typeable x, Typeable b) => Transducer m x x -> Splitter m x b -> Transducer m x x
while t s = liftTransducer "while" (maxUsableThreads t + maxUsableThreads s) $
            \threads-> let (configuration, s', while'', parallel) = optimalTwoParallelConfigurations threads s while'
                           transduce' source sink = splitInputToConsumers parallel s' source
                                                       (\source-> transduce while' source sink)
                                                       (\source-> pour source sink >> return [])
                           while' = t >-> while t s
                       in (configuration, transduce')

-- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single splitter.
-- The true  sink of one of the argument splitters and false sink of the other become the true and false sinks of the loop.
-- The other two sinks are bound to the other splitter's source.
-- The use of 'nestedIn' makes sense only on hierarchically structured streams. If we gave it some input containing
-- a flat sequence of values, and assuming both component splitters are deterministic and stateless,
-- an input value would either not loop at all or it would loop forever.
nestedIn :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b -> Splitter m x b
nestedIn s1 s2 = liftSplitter "nestedIn" (maxUsableThreads s1 + maxUsableThreads s2) $
                 \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                                s source true false edge
                                   = liftM fst $
                                     (if parallel then pipeP else pipe)
                                        (\false-> split s1' source true false edge)
                                        (\source-> pipe
                                                      (\true-> pipe (split s2' source true false) consumeAndSuppress)
                                                      (\source-> get source
                                                                 >>= maybe
                                                                        (return ([], []))
                                                                        (\x-> pipe
                                                                                 (\sink-> put sink x
                                                                                          >>= cond
                                                                                                 (pour source sink
                                                                                                  >> return [])
                                                                                                 (return [x]))
                                                                                 (\source-> split
                                                                                               (nestedIn s1' s2')
                                                                                               source true false edge))))
                            in (configuration,s)

-- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two transducers into
-- another transducer. However, in this case the transducers are re-instantiated for each consecutive portion of the
-- input as the splitter chunks it up. Each contiguous portion of the input that the splitter sends to one of its two
-- sinks gets transducered through the appropriate argument transducer as that transducer's whole input. As soon as the
-- contiguous portion is finished, the transducer gets terminated.
foreach :: (ParallelizableMonad m, Typeable x, Typeable b, BranchComponent cc m x [x]) => Splitter m x b -> cc -> cc -> cc
foreach s = combineBranches "foreach" (cost s)
               (\ parallel c1 c2 source-> liftM fst $ (if parallel then pipeP else pipe)
                                                         (transduce (splitterToMarker s) source)
                                                         (\source-> groupMarks source (maybe c2 (const c1))))

-- | The 'having' combinator combines two pure splitters into a pure splitter. One splitter is used to chunk the input
-- into contiguous portions. Its /false/ sink is routed directly to the /false/ sink of the combined splitter. The
-- second splitter is instantiated and run on each portion of the input that goes to first splitter's /true/ sink. If
-- the second splitter sends any output at all to its /true/ sink, the whole input portion is passed on to the /true/
-- sink of the combined splitter, otherwise it goes to its /false/ sink.
having :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
          => Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
having s1 s2 = liftSplitter "having" (maxUsableThreads s1 + maxUsableThreads s2) $
               \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                              s source true false edge = liftM fst $
                                                         (if parallel then pipeP else pipe)
                                                            (transduce (splitterToMarker s1') source)
                                                            (flip groupMarks test)
                                 where test Nothing chunk = pour chunk false >> return []
                                       test (Just mb) chunk = pipe
                                                                 (\sink1-> pipe (tee chunk sink1) getList)
                                                                 (\chunk-> splitToConsumers s2' chunk
                                                                              (liftM isJust . get)
                                                                              consumeAndSuppress
                                                                              (liftM isJust . get))
                                                              >>= \(((), prefix), (_, anyTrue, (), anyEdge))->
                                                                  if anyTrue Prelude.|| anyEdge
                                                                  then maybe (return True) (put edge) mb
                                                                       >> putList prefix true
                                                                       >>= whenNull (pour chunk true >> return [])
                                                                  else putList prefix false
                                                                       >>= whenNull (pour chunk false >> return [])
                            in (configuration, s)

-- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it succeeds and passes each chunk of the
-- input to its /true/ sink only if the second splitter sends no part of it to its /false/ sink.
havingOnly :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
              => Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
havingOnly s1 s2 = liftSplitter "havingOnly" (maxUsableThreads s1 + maxUsableThreads s2) $
                   \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                                  s source true false edge = liftM fst $
                                                             (if parallel then pipeP else pipe)
                                                                (transduce (splitterToMarker s1') source)
                                                                (flip groupMarks test)
                                     where test Nothing chunk = pour chunk false >> return []
                                           test (Just mb) chunk = pipe
                                                                     (\sink1-> pipe (tee chunk sink1) getList)
                                                                     (\chunk-> splitToConsumers s2' chunk
                                                                                  consumeAndSuppress
                                                                                  (liftM isJust . get)
                                                                                  consumeAndSuppress)
                                                                  >>= \(((), prefix), (_, (), anyFalse, ()))->
                                                                      if anyFalse
                                                                      then putList prefix false
                                                                           >>= whenNull (pour chunk false >> return [])
                                                                      else maybe (return True) (put edge) mb
                                                                           >> putList prefix true
                                                                           >>= whenNull (pour chunk true >> return [])
                            in (configuration, s)

-- | The result of combinator 'first' behaves the same as the argument splitter up to and including the first portion of
-- the input which goes into the argument's /true/ sink. All input following the first true portion goes into the
-- /false/ sink.
first :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
first splitter = liftSplitter "first" (maxUsableThreads splitter) $
                 \threads-> let splitter' = usingThreads threads splitter
                                configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                s source true false edge
                                   = liftM (\(x, y)-> y ++ x) $
                                     pipeD "first" (transduce (splitterToMarker splitter') source)
                                     (\source-> let get1 (Left (x, False)) = pass false x get1
                                                    get1 (Left (x, True)) = pass true x get2
                                                    get1 (Right b) = put edge b
                                                                     >> get source
                                                                     >>= maybe (return []) get2
                                                    get2 b@Right{} = get3 b
                                                    get2 (Left (x, True)) = pass true x get2
                                                    get2 (Left (x, False)) = pass false x get3
                                                    get3 (Left (x, _)) = pass false x get3
                                                    get3 (Right _) = get source >>= maybe (return []) get3
                                                    pass sink x next = put sink x
                                                                       >>= cond
                                                                              (get source >>= maybe (return []) next)
                                                                              (return [x])
                                                in get source >>= maybe (return []) get1)
                            in (configuration, s)

-- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes
-- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes
-- into the /false/ sink. The only difference between 'first' and 'uptoFirst' combinators is in where they direct the
-- /false/ portion of the input preceding the first /true/ part.
uptoFirst :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
uptoFirst splitter = liftSplitter "uptoFirst" (maxUsableThreads splitter) $
                     \threads-> let splitter' = usingThreads threads splitter
                                    configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                    s source true false edge
                                       = liftM (\(x, y)-> y ++ x) $
                                         pipeD "uptoFirst" (transduce (splitterToMarker splitter') source)
                                         (\source-> let get1 q (Left (x, False)) = let q' = q |> x
                                                                                   in get source
                                                                                         >>= maybe
                                                                                                (putQueue q' false)
                                                                                                (get1 q')
                                                        get1 q p@(Left (_, True)) = putQueue q true
                                                                                    >>= whenNull (get2 p)
                                                        get1 q (Right b) = putQueue q true
                                                                           >>= whenNull (put edge b
                                                                                         >> get source
                                                                                         >>= maybe (return []) get2)
                                                        get2 b@Right{} = get3 b
                                                        get2 (Left (x, True)) = pass true x get2
                                                        get2 (Left (x, False)) = pass false x get3
                                                        get3 (Left (x, _)) = pass false x get3
                                                        get3 (Right _) = get source >>= maybe (return []) get3
                                                        pass sink x next = put sink x
                                                                           >>= cond
                                                                                  (get source >>= maybe (return []) next)
                                                                                  (return [x])
                                                    in get source >>= maybe (return []) (get1 Seq.empty))
                                in (configuration, s)

-- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last
-- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to
-- the resulting component's /true/ sink.  The splitter returned by the combinator 'last' has to buffer the previous two
-- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of
-- the input or another portion succeeding the previous one.
last :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
last splitter = liftSplitter "last" (maxUsableThreads splitter) $
                \threads-> let splitter' = usingThreads threads splitter
                               configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                               s source true false edge
                                  = liftM (\(x, y)-> y ++ x) $
                                    pipeD "last" (transduce (splitterToMarker splitter') source)
                                    (\source-> let get1 (Left (x, False)) = put false x
                                                                            >>= cond (get source
                                                                                      >>= maybe (return []) get1)
                                                                                   (return [x])
                                                   get1 p@(Left (x, True)) = get2 Nothing Seq.empty p
                                                   get1 (Right b) = pass (get2 (Just b) Seq.empty)
                                                   get2 mb q (Left (x, True)) = let q' = q |> x
                                                                                in get source
                                                                                   >>= maybe
                                                                                          (flush mb q')
                                                                                          (get2 mb q')
                                                   get2 mb q p = get3 mb q Seq.empty p
                                                   get3 mb qt qf (Left (x, False)) = let qf' = qf |> x
                                                                                     in get source
                                                                                        >>= maybe
                                                                                               (flush mb qt >> putQueue qf' false)
                                                                                               (get3 mb qt qf')
                                                   get3 mb qt qf p = do rest1 <- putQueue qt false
                                                                        rest2 <- putQueue qf false 
                                                                        if null rest1 Prelude.&& null rest2
                                                                           then get1 p
                                                                           else return (rest1 ++ rest2)
                                                   flush mb q = maybe (return True) (put edge) mb
                                                                >> putQueue q true
                                                   pass succeed = get source >>= maybe (return []) succeed
                                               in pass get1)
                            in (configuration, s)

-- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the
-- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input is fed
-- to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where they
-- feed the /false/ portion of the input, if any, remaining after the last /true/ part.
lastAndAfter :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
lastAndAfter splitter = liftSplitter "lastAndAfter" (maxUsableThreads splitter) $
                        \threads-> let splitter' = usingThreads threads splitter
                                       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                       s source true false edge
                                          = liftM (\(x, y)-> y ++ x) $
                                            pipe
                                               (transduce (splitterToMarker splitter') source)
                                               (\source-> let get1 (Left (x, False)) = put false x
                                                                                       >>= cond (pass get1) (return [x])
                                                              get1 p@(Left (x, True)) = get2 Nothing Seq.empty p
                                                              get1 (Right b) = pass (get2 (Just b) Seq.empty)
                                                              get2 mb q (Left (x, True)) = let q' = q |> x
                                                                                           in get source
                                                                                              >>= maybe
                                                                                                     (flush mb q')
                                                                                                     (get2 mb q')
                                                              get2 mb q p = get3 mb q p
                                                              get3 mb q (Left (x, False)) = let q' = q |> x
                                                                                            in get source
                                                                                               >>= maybe
                                                                                                      (flush mb q')
                                                                                                      (get3 mb q')
                                                              get3 _ q p@(Left (x, True)) = putQueue q false
                                                                                            >>= whenNull (get1 p)
                                                              get3 _ q b'@Right{} = putQueue q false
                                                                                    >>= whenNull (get1 b')
                                                              flush mb q = maybe (return True) (put edge) mb
                                                                           >> putQueue q true
                                                              pass succeed = get source >>= maybe (return []) succeed
                                                          in pass get1)
                                   in (configuration, s)

-- | The 'prefix' combinator feeds its /true/ sink only the prefix of the input that its argument feeds to its /true/ sink.
-- All the rest of the input is dumped into the /false/ sink of the result.
prefix :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
prefix splitter = liftSplitter "prefix" (maxUsableThreads splitter) $
                  \threads-> let splitter' = usingThreads threads splitter
                                 configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                 s source true false edge
                                    = liftM (\(x, y)-> y ++ x) $
                                      pipeD "prefix" (transduce (splitterToMarker splitter') source)
                                      (\source-> let get0 p@Left{} = get1 p
                                                     get0 (Right b) = put edge b >> get source >>= maybe (return []) get1
                                                     get1 (Left (x, False)) = pass false x get2
                                                     get1 (Left (x, True)) = pass true x get1
                                                     get1 (Right b) = get source >>= maybe (return []) get2
                                                     get2 (Left (x, _)) = pass false x get2
                                                     get2 Right{} = get source >>= maybe (return []) get2
                                                     pass sink x next = put sink x
                                                                        >>= cond
                                                                               (get source >>= maybe (return []) next)
                                                                               (return [x])
                                                 in get source >>= maybe (return []) get0)
                             in (configuration, s)

-- | The 'suffix' combinator feeds its /true/ sink only the suffix of the input that its argument feeds to its /true/ sink.
-- All the rest of the input is dumped into the /false/ sink of the result.
suffix :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
suffix splitter = liftSplitter "suffix" (maxUsableThreads splitter) $
                  \threads-> let splitter' = usingThreads threads splitter
                                 configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                 s source true false edge
                                    = liftM (\(x, y)-> y ++ x) $
                                      pipeD "suffix" (transduce (splitterToMarker splitter') source)
                                      (\source-> let get1 (Left (x, False)) = put false x >>= cond (p get1) (return [x])
                                                     get1 (Left (x, True)) = get2 Nothing (Seq.singleton x)
                                                     get1 (Right b) = get2 (Just b) Seq.empty
                                                     get2 mb q = get source
                                                                 >>= maybe
                                                                        (maybe (return True) (put edge) mb >> putQueue q true)
                                                                        (get3 mb q)
                                                     get3 mb q (Left (x, True)) = get2 mb (q |> x)
                                                     get3 mb q p@(Left (x, False)) = putQueue q false
                                                                                     >>= \rest-> if null rest
                                                                                                 then get1 p
                                                                                                 else return (rest ++ [x])
                                                     get3 mb q (Right b) = putQueue q false
                                                                           >>= whenNull (get2 (Just b) Seq.empty)
                                                     p succeed = get source >>= maybe (return []) succeed
                                                 in p get1)
                             in (configuration, s)

-- | The 'even' combinator takes every input section that its argument /splitter/ deems /true/, and feeds even ones into
-- its /true/ sink. The odd sections and parts of input that are /false/ according to its argument splitter are fed to
-- 'even' splitter's /false/ sink.
even :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x b
even splitter = liftSplitter "even" (maxUsableThreads splitter) $
                   \threads-> let splitter' = usingThreads threads splitter
                                  configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                  s source true false edge
                                     = liftM (\(x, y)-> y ++ x) $
                                       pipeD "even"
                                          (transduce (splitterToMarker splitter') source)
                                          (\source-> let get1 (Left (x, False)) = put false x
                                                                                  >>= cond (next get1) (return [x])
                                                         get1 p@(Left (x, True)) = get2 p
                                                         get1 (Right b) = next get2
                                                         get2 (Left (x, True)) = put false x
                                                                                 >>= cond (next get2) (return [x])
                                                         get2 p@(Left (x, False)) = get3 p
                                                         get2 (Right b) = put edge b >> next get4
                                                         get3 (Left (x, False)) = put false x
                                                                                  >>= cond (next get3) (return [x])
                                                         get3 p@(Left (x, True)) = get4 p
                                                         get3 (Right b) = put edge b >> next get4
                                                         get4 (Left (x, True)) = put true x
                                                                                 >>= cond (next get4) (return [x])
                                                         get4 p@(Left (x, False)) = get1 p
                                                         get4 (Right b) = next get2
                                                         next g = get source >>= maybe (return []) g
                                                     in next get1)
                             in (configuration, s)

-- | Splitter 'startOf' issues an empty /true/ section at the beginning of every section considered /true/ by its
-- | argument splitter, otherwise the entire input goes into its /false/ sink.
startOf :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x (Maybe b)
startOf splitter = liftSplitter "startOf" (maxUsableThreads splitter) $
                   \threads-> let splitter' = usingThreads threads splitter
                                  configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                  s source true false edge = liftM (\(x, y)-> y ++ x) $
                                                             pipeD "startOf"
                                                                (transduce (splitterToMarker splitter') source)
                                                                (\source-> let get1 (Left (x, False)) = put false x
                                                                                                        >>= cond
                                                                                                               (next get1)
                                                                                                               (return [x])
                                                                               get1 p@(Left (x, True)) = put edge Nothing >> get2 p
                                                                               get1 (Right b) = put edge (Just b)
                                                                                                >> next get2
                                                                               get2 (Left (x, True)) = put false x
                                                                                                       >>= cond
                                                                                                              (next get2)
                                                                                                              (return [x])
                                                                               get2 p = get1 p
                                                                               next g = get source >>= maybe (return []) g
                                                                           in next get1)
                              in (configuration, s)

-- | Splitter 'endOf' issues an empty /true/ section at the end of every section considered /true/ by its argument
-- | splitter, otherwise the entire input goes into its /false/ sink.
endOf :: (ParallelizableMonad m, Typeable x, Typeable b) => Splitter m x b -> Splitter m x (Maybe b)
endOf splitter = liftSplitter "endOf" (maxUsableThreads splitter) $
                 \threads-> let splitter' = usingThreads threads splitter
                                configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                s source true false edge = liftM (\(x, y)-> y ++ x) $
                                                           pipeD "endOf"
                                                              (transduce (splitterToMarker splitter') source)
                                                              (\source-> let get1 (Left (x, False)) = put false x
                                                                                                      >>= cond
                                                                                                             (next get1)
                                                                                                             (return [x])
                                                                             get1 p@(Left (x, True)) = get2 Nothing p
                                                                             get1 (Right b) = next (get2 $ Just b)
                                                                             get2 mb (Left (x, True))
                                                                                = put false x
                                                                                  >>= cond (next $ get2 mb) (return [x])
                                                                             get2 mb p@(Left (x, False)) = put edge mb >> get1 p
                                                                             get2 mb (Right b) = put edge mb >> next (get2 $ Just b)
                                                                             next g = get source >>= maybe (return []) g
                                                                         in next get1)
                            in (configuration, s)

-- | Combinator 'followedBy' treats its argument 'Splitter's as patterns components and returns a 'Splitter' that
-- matches their concatenation. A section of input is considered /true/ by the result iff its prefix is considered
-- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/. The splitter /s2/ is started anew
-- after every section split to /true/ sink by /s1/.
followedBy :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
              => Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2)
followedBy s1 s2 = liftSplitter "followedBy" (maxUsableThreads s1 + maxUsableThreads s2) $
                   \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                              in (configuration, followedBy' parallel s1' s2')
   where followedBy' parallel s1 s2 source true false edge
            = liftM (\(x, y)-> y ++ x) $
              (if parallel then pipeP else pipe)
                 (transduce (splitterToMarker s1) source)
                 (\source-> let get0 q = case Seq.viewl q
                                         of Seq.EmptyL -> get source >>= maybe (return []) get1
                                            (Left (x, False)) :< rest -> put false x
                                                                         >>= cond
                                                                                (get0 rest)
                                                                                (return
                                                                                 $ concatMap (either ((:[]) . fst) (const []))
                                                                                      $ Foldable.toList $ Seq.viewl q)
                                            (Left (x, True)) :< rest -> get2 Nothing Seq.empty q
                                            (Right b) :< rest -> get2 (Just b) Seq.empty rest
                                get1 (Left (x, False)) = put false x
                                                         >>= cond (get source >>= maybe (return []) get1)
                                                                  (return [x])
                                get1 p@(Left (x, True)) = get2 Nothing Seq.empty (Seq.singleton p)
                                get1 (Right b) = get2 (Just b) Seq.empty Seq.empty
                                get2 mb q q' = case Seq.viewl q'
                                               of Seq.EmptyL -> get source
                                                                >>= maybe (testEnd mb q) (get2 mb q . Seq.singleton)
                                                  (Left (x, True)) :< rest -> get2 mb (q |> x) rest
                                                  (Left (x, False)) :< rest -> get3 mb q q'
                                                  Right{} :< rest -> get3 mb q q'
                                get3 mb q q' = do ((q1, q2), n) <- pipe (get7 Seq.empty q') (test mb q)
                                                  case n of Nothing -> putQueue q false
                                                                       >>= whenNull (get0 (q1 >< q2))
                                                            Just 0 -> get0 (q1 >< q2)
                                                            Just n -> get8 (Just mb) n (q1 >< q2)
                                get7 q1 q2 sink = canPut sink
                                                  >>= cond (case Seq.viewl q2
                                                            of Seq.EmptyL -> get source
                                                                             >>= maybe (return (q1, q2))
                                                                                    (\p-> either
                                                                                             (put sink . fst)
                                                                                             (const $ return True)
                                                                                             p
                                                                                          >> get7 (q1 |> p) q2 sink)
                                                               p :< rest -> either (put sink . fst) (const $ return True) p
                                                                            >> get7 (q1 |> p) rest sink)
                                                           (return (q1, q2))
                                testEnd mb q = do ((), n) <- pipeD "testEnd" (const $ return ()) (test mb q)
                                                  case n of Nothing -> putQueue q false
                                                            _ -> return []
                                test mb q source = liftM snd $
                                                   pipeD "follower"
                                                      (transduce (splitterToMarker s2) source)
                                                      (\source-> let get4 (Left (_, False)) = return Nothing
                                                                     get4 p@(Left (_, True)) = putQueue q true
                                                                                               >> get5 0 p
                                                                     get4 p@(Right b) = maybe
                                                                                           (return True) (\b1-> put edge (b1, b)) mb
                                                                                        >> putQueue q true
                                                                                        >> get6 0
                                                                     get5 n (Left (x, True)) = put true x >> get6 (succ n)
                                                                     get5 n _ = return (Just n)
                                                                     get6 n = get source
                                                                              >>= maybe
                                                                                     (return $ Just n)
                                                                                     (get5 n)
                                                                 in get source >>= maybe (return Nothing) get4)
                                get8 Nothing 0 q = get0 q
                                get8 (Just mb) 0 q = get2 mb Seq.empty q
                                get8 mmb n q = case Seq.viewl q of Left (x, False) :< rest -> get8 Nothing (pred n) rest
                                                                   Left (x, True) :< rest
                                                                      -> get8 (maybe (Just Nothing) Just mmb) (pred n) rest
                                                                   Right b :< rest -> get8 (Just (Just b)) n rest
                           in get0 Seq.empty)

-- | Combinator '...' tracks the running balance of difference between the number of preceding starts of sections
-- considered /true/ according to its first argument and the ones according to its second argument. The combinator
-- passes to /true/ all input values for which the difference balance is positive. This combinator is typically used
-- with 'startOf' and 'endOf' in order to count entire input sections and ignore their lengths.
(...) :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
         => Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1
s1 ... s2 = liftSplitter "..." (maxUsableThreads s1 + maxUsableThreads s2) $
            \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                           s source true false edge
                              = liftM (\(x, y)-> y ++ x) $
                                (if parallel then pipeP else pipe)
                                   (transduce (splittersToPairMarker s1' s2') source)
                                   (\source-> let next n = get source >>= maybe (return []) (state n)
                                                  pass n x = (if n > 0 then put true x else put false x)
                                                             >>= cond (next n) (return [x])
                                                  pass' n x = (if n >= 0 then put true x else put false x)
                                                              >>= cond (next n) (return [x])
                                                  state n (Left (x, True, False)) = pass (succ n) x
                                                  state n (Left (x, False, True)) = pass' (pred n) x
                                                  state n (Left (x, True, True)) = pass' n x
                                                  state n (Left (x, False, False)) = pass n x
                                                  state 0 (Right (Left b)) = put edge b >> next 1
                                                  state n (Right (Left _)) = next (succ n)
                                                  state n (Right (Right _)) = next (pred n)
                                              in next 0)
                       in (configuration, s)

-- Helper functions

-- | Converts a 'Control.Concurrent.SCC.ComponentTypes.Splitter' into a
-- 'Control.Concurrent.SCC.ComponentTypes.Transducer'.  Every input value @x@ that the argument splitter sends to its
-- /true/ sink is converted to @Left (x, True)@, every @y@ sent to the splitter's /false/ sink becomes @Left (y,
-- False)@, and any value @e@ the splitter puts in its /edge/ sink becomes @Right e@.
splitterToMarker :: forall m x b. (ParallelizableMonad m, Typeable x, Typeable b)
                    => Splitter m x b -> Transducer m x (Either (x, Bool) b)
splitterToMarker s = liftTransducer "splitterToMarker" (maxUsableThreads s) $
                     \threads-> let s' = usingThreads threads s
                                    t source sink = liftM (\(x, y, z, _)-> z ++ y ++ x) $
                                                    splitToConsumers s' source
                                                       (mark (\x-> Left (x, True)))
                                                       (mark (\x-> Left (x, False)))
                                                       (mark Right)
                                       where mark f source = canPut sink
                                                             >>= cond
                                                                    (get source
                                                                     >>= maybe (return [])
                                                                            (\x-> put sink (f x)
                                                                                  >>= cond (mark f source) (return [x])))
                                                                    (return [])
                                in (ComponentConfiguration [AnyComponent s'] threads (cost s' + 1), t)


splittersToPairMarker :: forall m x b1 b2. (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2)
                         => Splitter m x b1 -> Splitter m x b2
                                            -> Transducer m x (Either (x, Bool, Bool) (Either b1 b2))
splittersToPairMarker s1 s2
   = liftTransducer "splittersToPairMarker" (maxUsableThreads s1 + maxUsableThreads s2) $
     \threads-> let (configuration, s1', s2', parallelize) = optimalTwoParallelConfigurations threads s1 s2
                    t source sink = liftM (\(((_, _), (x, _, _, _)), _)-> x) $
                                    pipeD "splittersToPairMarker synchronize"
                                       (\sync-> (if parallelize then pipeP else pipe)
                                                   (\sink1-> pipe
                                                                (tee source sink1)
                                                                (\source2-> splitToConsumers s2' source2
                                                                               (flip (pourMap (\x-> Left ((x, True), False))) sync)
                                                                               (flip (pourMap (\x-> Left ((x, False), False))) sync)
                                                                               (flip (pourMap (Right . Right)) sync)))
                                                   (\source1-> splitToConsumers s1' source1
                                                                  (flip (pourMap (\x-> Left ((x, True), True))) sync)
                                                                  (flip (pourMap (\x-> Left ((x, False), True))) sync)
                                                                  (flip (pourMap (Right. Left)) sync)))
                                        (synchronizeMarks Nothing sink)
                    synchronizeMarks :: Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool)
                                     -> Sink c (Either (x, Bool, Bool) (Either b1 b2))
                                     -> Source c (Either ((x, Bool), Bool) (Either b1 b2))
                                     -> Pipe c m [x]
                    synchronizeMarks state sink source = get source
                                                         >>= maybe
                                                                (assert (isNothing state) (return []))
                                                                (handleMark state sink source)
                    handleMark :: Maybe (Seq (Either (x, Bool) (Either b1 b2)), Bool)
                               -> Sink c (Either (x, Bool, Bool) (Either b1 b2))
                               -> Source c (Either ((x, Bool), Bool) (Either b1 b2))
                               -> Either ((x, Bool), Bool) (Either b1 b2) -> Pipe c m [x]
                    handleMark Nothing sink source (Right b) = put sink (Right b)
                                                               >> synchronizeMarks Nothing sink source
                    handleMark Nothing sink source (Left (p, first))
                       = synchronizeMarks (Just (Seq.singleton (Left p), first)) sink source
                    handleMark state@(Just (q, first)) sink source (Left (p, first')) | first == first'
                       = synchronizeMarks (Just (q |> Left p, first)) sink source
                    handleMark state@(Just (q, True)) sink source (Right b@Left{})
                       = synchronizeMarks (Just (q |> Right b, True)) sink source
                    handleMark state@(Just (q, False)) sink source (Right b@Right{})
                       = synchronizeMarks (Just (q |> Right b, False)) sink source
                    handleMark state sink source (Right b) = put sink (Right b) >> synchronizeMarks state sink source
                    handleMark state@(Just (q, pos')) sink source mark@(Left ((x, t), pos))
                       = case Seq.viewl q
                         of Seq.EmptyL -> synchronizeMarks (Just (Seq.singleton (Left (x, t)), pos)) sink source
                            Right b :< rest -> put sink (Right b)
                                               >>= cond
                                                      (handleMark
                                                          (if Seq.null rest then Nothing else Just (rest, pos'))
                                                          sink
                                                          source
                                                          mark)
                                                      (returnQueuedList q)
                            Left (y, t') :< rest -> put sink (Left $ if pos then (y, t, t') else (y, t', t))
                                                    >>= cond
                                                           (synchronizeMarks
                                                               (if Seq.null rest then Nothing else Just (rest, pos'))
                                                               sink
                                                               source)
                                                           (returnQueuedList q)
                    returnQueuedList q = return $ concatMap (either ((:[]) . fst) (const [])) $ Foldable.toList $ Seq.viewl q
                in (configuration, t)

zipSplittersWith :: (ParallelizableMonad m, Typeable x, Typeable b1, Typeable b2, Typeable b)
                    => (Bool -> Bool -> Bool)
                       -> (forall c. Source c (Either b1 b2) -> Sink c b -> Pipe c m ())
                       -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b
zipSplittersWith f boundaries s1 s2
   = liftSplitter "zip" (maxUsableThreads s1 + maxUsableThreads s2) $
     \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                    s source true false edge = liftM (\((x, y), _)-> y ++ x) $
                                               pipe
                                                  (\edge'->
                                                   (if parallel then pipeP else pipe)
                                                      (transduce (splittersToPairMarker s1' s2') source)
                                                      (\source-> let split = get source
                                                                             >>= maybe
                                                                                    (return [])
                                                                                    (either
                                                                                        test
                                                                                        (\b-> put edge' b >> split))
                                                                     test (x, t1, t2) = put (if f t1 t2 then true else false) x
                                                                                        >>= cond split (return [x])
                                                                 in split))
                                                  (flip boundaries edge)
                in (configuration, s)
-- | Runs the second argument on every contiguous region of input source (typically produced by 'splitterToMarker')
-- whose all values either match @Left (_, True)@ or @Left (_, False)@.
groupMarks :: forall c m x b r. (ParallelizableMonad m, Typeable x, Typeable b)
              => Source c (Either (x, Bool) b) -> (Maybe (Maybe b) -> Source c x -> Pipe c m r) -> Pipe c m ()
groupMarks source getConsumer = start
   where start = getSuccess source (either startContent startRegion)
         startContent (x, False) = pipe (\sink-> pass False sink x) (getConsumer Nothing)
                                   >>= maybe (return ()) (either startContent startRegion) . fst
         startContent (x, True) = pipe (\sink-> pass True sink x) (getConsumer $ Just Nothing)
                                  >>= maybe (return ()) (either startContent startRegion) . fst
         startRegion b = pipe (next True) (getConsumer (Just $ Just b))
                         >>= maybe (return ()) (either startContent startRegion) . fst
         pass t sink x = put sink x >> next t sink
         next t sink = get source >>= maybe (return Nothing) (continue t sink)
         continue t sink (Left (x, t')) | t == t' = pass t sink x
         continue t sink p = return (Just p)