{- 
    Copyright 2008 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

{-# LANGUAGE ScopedTypeVariables, Rank2Types, KindSignatures, EmptyDataDecls,
             MultiParamTypeClasses, FunctionalDependencies, FlexibleContexts, FlexibleInstances #-}

-- | The "Combinators" module defines combinators applicable to 'Transducer' and 'Splitter' components defined in the
-- "ComponentTypes" module.

module Control.Concurrent.SCC.Combinators
   (-- * Consumer, producer, and transducer combinators
    consumeBy, prepend, append, substitute,
    PipeableComponentPair ((>->)), JoinableComponentPair (join, sequence),
    -- * Pseudo-logic splitter combinators
    -- | Combinators '>&' and '>|' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws hold,
    -- '>&' and '>|' are in general not commutative, associative, nor idempotent. In the special case when all argument
    -- splitters are stateless, such as those produced by 'Components.liftStatelessSplitter', these combinators do satisfy
    -- all laws of Boolean algebra.
    snot, (>&), (>|),
    -- ** Zipping logic combinators
    -- | The '&&' and '||' combinators run the argument splitters in parallel and combine their logical outputs using
    -- the corresponding logical operation on each output pair, in a manner similar to 'Prelude.zipWith'. They fully
    -- satisfy the laws of Boolean algebra.
    (&&), (||),
    -- * Flow-control combinators
    -- | The following combinators resemble the common flow-control programming language constructs. Combinators 
    -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'.
    --
    --    * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Components.asis'
    --
    --    * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Components.asis' /transducer/
    --
    --    * 'select' /splitter/ = 'ifs' /splitter/ 'Components.asis' 'Components.suppress'
    --
    ifs, wherever, unless, select,
    -- ** Recursive
    while, nestedIn,
    -- * Section-based combinators
    -- | All combinators in this section use their 'Splitter' argument to determine the
    -- structure of the input. Every contiguous portion of the input that gets passed to one or the other sink of the
    -- splitter is treated as one section in the logical structure of the input stream. What is done with the section
    -- depends on the combinator, but the sections, and therefore the logical structure of the input stream, are
    -- determined by the argument splitter alone.
    foreach, having, havingOnly, followedBy, even,
    -- ** first and its variants
    first, uptoFirst, prefix,
    -- ** last and its variants
    last, lastAndAfter, suffix,
    -- ** positional splitters
    startOf, endOf,
    -- ** input ranges
    (...))
where

import Control.Concurrent.SCC.Foundation
import Control.Concurrent.SCC.ComponentTypes

import Prelude hiding (even, last, sequence, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Data.Maybe (isJust, isNothing, fromJust)
import Data.Typeable (Typeable)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))

import Debug.Trace (trace)

-- | Converts a 'Consumer' into a 'Transducer' that hands its source to the consumer, never writes to its sink,
-- discards the consumer's result and reports no unconsumed input.
consumeBy :: forall m x y r. (Monad m, Typeable x) => Consumer m x r -> Transducer m x y
consumeBy c = liftTransducer "consumeBy" (maxUsableThreads c) $ \threads->
   let consumer = usingThreads threads c
       configuration = ComponentConfiguration [AnyComponent consumer] (usedThreads consumer) (cost consumer)
       drain source _sink = do
          consume consumer source
          return []
   in (configuration, drain)

-- | Class 'PipeableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * The input of the result, if any, becomes the input of the first component.
--
--    * The output produced by the first child component is consumed by the second child component.
--
--    * The result output, if any, is the output of the second component.
--
-- The type parameters are the monad @m@, the type @w@ of the items flowing between the two components, the two
-- argument component types @c1@ and @c2@, and the resulting component type @c3@.
--
-- NOTE(review): the functional dependency @c2 c3 -> c2@ below is trivially satisfied; @c2 c3 -> c1@ may have been
-- intended. Confirm before changing it.
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
                                                       c1 -> m w, c2 -> m w, c3 -> m
   where (>->) :: c1 -> c2 -> c3

-- A 'Producer' piped into a 'Consumer' yields a 'Performer'. Both components are configured together by
-- 'optimalTwoParallelConfigurations'; they are connected with 'pipeP' when that configuration is parallel and with
-- 'pipe' otherwise, and the result pair of the pipe is discarded.
instance (ParallelizableMonad m, Typeable x)
   => PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
   -- The two components may run in parallel, so the pair can use the sum of their thread counts; this matches the
   -- Transducer-to-Transducer pipe and the parallel 'join' combinators of this module.
   where p >-> c = liftPerformer ">->" (maxUsableThreads p + maxUsableThreads c) $
                   \threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
                                  performPipe = (if parallel then pipeP else pipe) (produce p') (consume c') >> return ()
                              in (configuration, performPipe)

-- A 'Transducer' piped into a 'Consumer' yields a 'Consumer' of the transducer's input type. The result of the
-- combination is the result of the argument consumer, i.e. the second component of the pipe's result pair.
instance (ParallelizableMonad m, Typeable x, Typeable y)
   => PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
   -- The two components may run in parallel, so the pair can use the sum of their thread counts; this matches the
   -- Transducer-to-Transducer pipe and the parallel 'join' combinators of this module.
   where t >-> c = liftConsumer ">->" (maxUsableThreads t + maxUsableThreads c) $
                   \threads-> let (configuration, t', c', parallel) = optimalTwoParallelConfigurations threads t c
                                  consumePipe source = liftM snd $ (if parallel then pipeP else pipe)
                                                                      (transduce t' source)
                                                                      (consume c')
                              in (configuration, consumePipe)

-- A 'Producer' piped into a 'Transducer' yields a 'Producer' of the transducer's output type. The result of the
-- combination is the result of the argument producer, i.e. the first component of the pipe's result pair.
instance (ParallelizableMonad m, Typeable x, Typeable y)
   => PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
      -- The two components may run in parallel, so the pair can use the sum of their thread counts; this matches the
      -- Transducer-to-Transducer pipe and the parallel 'join' combinators of this module.
      where p >-> t = liftProducer ">->" (maxUsableThreads p + maxUsableThreads t) $
                      \threads-> let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
                                     producePipe sink = liftM fst $ (if parallel then pipeP else pipe)
                                                                       (produce p')
                                                                       (\source-> transduce t' source sink)
                                 in (configuration, producePipe)

-- Two 'Transducer's piped together yield a 'Transducer' from the input type of the first to the output type of the
-- second. The result is whatever the first transducer returns, i.e. the first component of the pipe's result pair.
instance ParallelizableMonad m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
   where
      t1 >-> t2 = liftTransducer ">->" (maxUsableThreads t1 + maxUsableThreads t2) $ \threads->
         let (configuration, upstream, downstream, parallel) = optimalTwoParallelConfigurations threads t1 t2
             transducePipe source sink = do
                results <- (if parallel then pipeP else pipe)
                              (transduce upstream source)
                              (\intermediate-> transduce downstream intermediate sink)
                return (fst results)
         in (configuration, transducePipe)

-- Class 'CompatibleSignature' has no methods. It relates a component type @c@ to a tag type @cons@ and a monad @m@,
-- both of which are determined by @c@, and to the @input@ and @output@ signatures the component may be used with.
class Component c => CompatibleSignature c cons (m :: * -> *) input output | c -> cons m

-- Method-less class 'AnyListOrUnit'; the instances declared here cover every list type and the unit type.
class AnyListOrUnit c

instance AnyListOrUnit [x]
instance AnyListOrUnit ()

-- Signatures of the four component types. A 'Performer' is compatible with any list-or-unit input and output, a
-- 'Consumer' fixes the input to @[x]@, a 'Producer' fixes the output to @[x]@, and a 'Transducer' fixes both.
instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r)    (PerformerType r)  m x y
instance AnyListOrUnit y                    => CompatibleSignature (Consumer m x r)   (ConsumerType r)   m [x] y
instance AnyListOrUnit y                    => CompatibleSignature (Producer m x r)   (ProducerType r)   m y [x]
instance                                       CompatibleSignature (Transducer m x y)  TransducerType    m [x] [y]

-- Empty data types used only at the type level, as the tag arguments of the 'CompatibleSignature' and
-- 'JoinableComponentPair' classes. The parameter @r@, where present, is the result type of the tagged component.
data PerformerType r
data ConsumerType r
data ProducerType r
data TransducerType

-- | Class 'JoinableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * if both argument components consume input, the input of the combined component gets distributed to both
--      components in parallel,
--
--    * if both argument components produce output, the output of the combined component is a concatenation of the
--      complete output from the first component followed by the complete output of the second component, and
--
--    * the 'join' method may apply the components in any order, the 'sequence' method makes sure its first argument
--      has completed before using the second one.
--
-- Method 'join' defaults to 'sequence', so an instance needs to define 'join' only when it can do better than
-- running the two components one after the other.
class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y)
   => JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y,
                                                      t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3
   where join :: c1 -> c2 -> c3
         sequence :: c1 -> c2 -> c3
         join = sequence

-- Two 'Producer's: the first one writes its complete output to the sink before the second one starts. Only
-- 'sequence' is defined, so 'join' falls back to it through the class default.
instance forall m x any r1 r2. (Monad m, Typeable x)
   => JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Producer m x r2) (Producer m x r2)
   where
      sequence p1 p2 = liftProducer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $ \threads->
         let (configuration, firstProducer, secondProducer) = optimalTwoSequentialConfigurations threads p1 p2
             produceBoth sink = do
                produce firstProducer sink
                produce secondProducer sink
         in (configuration, produceBoth)

-- Two 'Consumer's of the same input. Both get to consume the input: 'join' feeds them at the same time through
-- 'tee', while 'sequence' lets the first consumer finish before replaying a buffered copy to the second.
instance forall m x any. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] () (Consumer m x ()) (Consumer m x ()) (Consumer m x ())
   where
      join c1 c2 = liftConsumer "join" (maxUsableThreads c1 + maxUsableThreads c2) $ \threads->
         let (configuration, c1', c2', parallel) = optimalTwoParallelConfigurations threads c1 c2
             -- The outer pipe connects one output of 'tee' to c1', the inner pipe connects the other output to c2'.
             consumeBoth source = do
                (if parallel then pipeP else pipe)
                   (\sink1-> pipe (tee source sink1) (consume c2'))
                   (consume c1')
                return ()
         in (configuration, consumeBoth)
      sequence c1 c2 = liftConsumer "sequence" (maxUsableThreads c1 `max` maxUsableThreads c2) $ \threads->
         let (configuration, c1', c2') = optimalTwoSequentialConfigurations threads c1 c2
             -- While c1' consumes the source, a second copy is collected by 'getList' and afterwards handed to c2'.
             consumeInTurn source = do
                (_, buffered) <- pipe (\buffer-> pipe (tee source buffer) (consume c1')) getList
                pipe (putList buffered) (consume c2')
                return ()
         in (configuration, consumeInTurn)

-- Two 'Transducer's with the same input and output types. Both get the same input (duplicated by 'tee'); the output
-- of the first transducer precedes the complete output of the second one in the combined sink.
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y] (Transducer m x y) (Transducer m x y) (Transducer m x y)
   where join t1 t2 = liftTransducer "join" (maxUsableThreads t1 + maxUsableThreads t2) $
                      \threads-> let (configuration, t1', t2', parallel) = optimalTwoParallelConfigurations threads t1 t2
                                     -- t1' writes directly to the sink while the output of t2' is collected in a
                                     -- buffer; the buffered output is appended once both transducers are done.
                                     transduce' source sink = pipe
                                                                 (\buffer-> (if parallel then pipeP else pipe)
                                                                               (\sink1-> pipe
                                                                                            (\sink2-> tee source sink1 sink2)
                                                                                            (\src-> transduce t2' src buffer))
                                                                               (\source-> transduce t1' source sink))
                                                                 getList
                                                              >>= \(_, list)-> putList list sink
                                                              >> getList source
                                 in (configuration, transduce')
         sequence t1 t2 = liftTransducer "sequence" (maxUsableThreads t1 `max` maxUsableThreads t2) $
                          \threads-> let (configuration, t1', t2') = optimalTwoSequentialConfigurations threads t1 t2
                                         -- The configured transducers t1' and t2' must be the ones that run, so that
                                         -- the executed components agree with the reported configuration. t1' runs
                                         -- on the live source while a copy of the input is buffered, then t2' runs
                                         -- on the buffered copy.
                                         transduce' source sink = pipe
                                                                     (\buffer-> pipe
                                                                                   (tee source buffer)
                                                                                   (\source-> transduce t1' source sink))
                                                                     getList
                                                                  >>= \((extra, _), list)-> pipe
                                                                                               (putList list)
                                                                                               (\source-> transduce t2' source sink)
                                                                  >> return extra
                                     in (configuration, transduce')


-- Two 'Performer's. The combined performer returns the result of the second one; 'join' may run the two in
-- parallel through 'parallelize', 'sequence' always runs the first to completion before the second.
instance forall m r1 r2. ParallelizableMonad m
   => JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () () (Performer m r1) (Performer m r2) (Performer m r2)
   where
      join p1 p2 = liftPerformer "join" (maxUsableThreads p1 + maxUsableThreads p2) $ \threads->
         let (configuration, p1', p2', parallel) = optimalTwoParallelConfigurations threads p1 p2
             performJoin
                | parallel = liftM snd (perform p1' `parallelize` perform p2')
                | otherwise = perform p1' >> perform p2'
         in (configuration, performJoin)
      sequence p1 p2 = liftPerformer "sequence" (maxUsableThreads p1 `max` maxUsableThreads p2) $ \threads->
         let (configuration, p1', p2') = optimalTwoSequentialConfigurations threads p1 p2
             performInTurn = do
                perform p1'
                perform p2'
         in (configuration, performInTurn)

-- A 'Performer' followed by a 'Producer' yields a 'Producer' with the result of the argument producer.
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Performer m r1) (Producer m x r2) (Producer m x r2)
   where
      join pe pr = liftProducer "join" (maxUsableThreads pe + maxUsableThreads pr) $ \threads->
         let (configuration, pe', pr', parallel) = optimalTwoParallelConfigurations threads pe pr
             produceJoin sink
                | parallel = liftM snd (perform pe' `parallelize` produce pr' sink)
                | otherwise = perform pe' >> produce pr' sink
         in (configuration, produceJoin)
      sequence pe pr = liftProducer "sequence" (maxUsableThreads pe `max` maxUsableThreads pr) $ \threads->
         let (configuration, pe', pr') = optimalTwoSequentialConfigurations threads pe pr
             produceAfter sink = do
                perform pe'
                produce pr' sink
         in (configuration, produceAfter)

-- A 'Producer' followed by a 'Performer' yields a 'Producer' whose result is the result of the performer.
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Performer m r2) (Producer m x r2)
   where
      join pr pe = liftProducer "join" (maxUsableThreads pr + maxUsableThreads pe) $ \threads->
         let (configuration, pr', pe', parallel) = optimalTwoParallelConfigurations threads pr pe
             produceJoin sink
                | parallel = liftM snd (produce pr' sink `parallelize` perform pe')
                | otherwise = produce pr' sink >> perform pe'
         in (configuration, produceJoin)
      sequence pr pe = liftProducer "sequence" (maxUsableThreads pr `max` maxUsableThreads pe) $ \threads->
         let (configuration, pr', pe') = optimalTwoSequentialConfigurations threads pr pe
             produceBefore sink = do
                produce pr' sink
                perform pe'
         in (configuration, produceBefore)

-- A 'Performer' followed by a 'Consumer' yields a 'Consumer' with the result of the argument consumer.
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] () (Performer m r1) (Consumer m x r2) (Consumer m x r2)
   where
      join p c = liftConsumer "join" (maxUsableThreads p + maxUsableThreads c) $ \threads->
         let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
             consumeJoin source
                | parallel = liftM snd (perform p' `parallelize` consume c' source)
                | otherwise = perform p' >> consume c' source
         in (configuration, consumeJoin)
      sequence p c = liftConsumer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $ \threads->
         let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
             consumeAfter source = do
                perform p'
                consume c' source
         in (configuration, consumeAfter)

-- A 'Consumer' followed by a 'Performer' yields a 'Consumer' whose result is the result of the performer.
instance forall m x r1 r2. (ParallelizableMonad m, Typeable x)
   => JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] () (Consumer m x r1) (Performer m r2) (Consumer m x r2)
   where
      join c p = liftConsumer "join" (maxUsableThreads c + maxUsableThreads p) $ \threads->
         let (configuration, c', p', parallel) = optimalTwoParallelConfigurations threads c p
             consumeJoin source
                | parallel = liftM snd (consume c' source `parallelize` perform p')
                | otherwise = consume c' source >> perform p'
         in (configuration, consumeJoin)
      sequence c p = liftConsumer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $ \threads->
         let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
             consumeBefore source = do
                consume c' source
                perform p'
         in (configuration, consumeBefore)

-- A 'Performer' followed by a 'Transducer' yields a 'Transducer'; the performer's result is dropped and the
-- combination returns what the argument transducer returns.
instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y] (Performer m r) (Transducer m x y) (Transducer m x y)
   where
      join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $ \threads->
         let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
             transduceJoin source sink
                | parallel = liftM snd (perform p' `parallelize` transduce t' source sink)
                | otherwise = perform p' >> transduce t' source sink
         in (configuration, transduceJoin)
      sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $ \threads->
         let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
             transduceAfter source sink = do
                perform p'
                transduce t' source sink
         in (configuration, transduceAfter)

-- A 'Transducer' followed by a 'Performer' yields a 'Transducer'; the performer's result is dropped and the
-- combination returns what the argument transducer returned.
instance forall m x y r. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y] (Transducer m x y) (Performer m r) (Transducer m x y)
   where
      join t p = liftTransducer "join" (maxUsableThreads t + maxUsableThreads p) $ \threads->
         let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
             transduceJoin source sink
                | parallel = liftM fst (transduce t' source sink `parallelize` perform p')
                | otherwise = transduce t' source sink >>= \result-> perform p' >> return result
         in (configuration, transduceJoin)
      sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $ \threads->
         let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
             transduceBefore source sink = transduce t' source sink >>= \result-> perform p' >> return result
         in (configuration, transduceBefore)

-- A 'Producer' followed by a 'Transducer' yields a 'Transducer' whose output is the producer's output followed by
-- the transducer's output. The combination returns what the argument transducer returns.
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y] (Producer m y ()) (Transducer m x y) (Transducer m x y)
   where
      join p t = liftTransducer "join" (maxUsableThreads p + maxUsableThreads t) $ \threads->
         let (configuration, p', t', parallel) = optimalTwoParallelConfigurations threads p t
             -- In the parallel case the transducer's output is held in a buffer until the producer has finished, so
             -- that the producer's output still comes first in the sink.
             transduceJoin source sink
                | parallel = do
                     ((_, rest), buffered) <- pipe
                                                 (\buffer-> produce p' sink `parallelize` transduce t' source buffer)
                                                 getList
                     putList buffered sink
                     return rest
                | otherwise = produce p' sink >> transduce t' source sink
         in (configuration, transduceJoin)
      sequence p t = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads t) $ \threads->
         let (configuration, p', t') = optimalTwoSequentialConfigurations threads p t
             transduceAfter source sink = do
                produce p' sink
                transduce t' source sink
         in (configuration, transduceAfter)

-- A 'Transducer' followed by a 'Producer' yields a 'Transducer' whose output is the transducer's output followed by
-- the producer's output. The combination returns what the argument transducer returned.
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y] (Transducer m x y) (Producer m y ()) (Transducer m x y)
   -- 'join' may run both components in parallel, so it reports the sum of their thread counts like the mirrored
   -- Producer/Transducer instance does; only 'sequence' is limited to the larger of the two.
   where join t p = liftTransducer "join" (maxUsableThreads t + maxUsableThreads p) $
                    \threads-> let (configuration, t', p', parallel) = optimalTwoParallelConfigurations threads t p
                                   -- In the parallel case the producer's output is held in a buffer and appended
                                   -- after the transducer has finished writing to the sink.
                                   join' source sink = if parallel
                                                       then do ((rest, ()), out) <- pipe
                                                                                       (\buffer-> transduce t' source sink
                                                                                                  `parallelize` produce p' buffer)
                                                                                       getList
                                                               putList out sink
                                                               return rest 
                                                       else do result <- transduce t' source sink
                                                               produce p' sink
                                                               return result
                               in (configuration, join')
         sequence t p = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads p) $
                        \threads-> let (configuration, t', p') = optimalTwoSequentialConfigurations threads t p
                                       join' source sink = do result <- transduce t' source sink
                                                              produce p' sink
                                                              return result
                                   in (configuration, join')

-- A 'Consumer' joined with a 'Transducer' yields a 'Transducer'. Both components receive the input (duplicated by
-- 'tee') and only the transducer writes to the sink.
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y] (Consumer m x ()) (Transducer m x y) (Transducer m x y)
   where
      join c t = liftTransducer "join" (maxUsableThreads c + maxUsableThreads t) $ \threads->
         let (configuration, c', t', parallel) = optimalTwoParallelConfigurations threads c t
             -- The nested result is ((tee result, transducer result), consumer result); the transducer's part is
             -- what gets returned.
             transduceJoin source sink = do
                results <- (if parallel then pipeP else pipe)
                              (\sink1-> pipe
                                           (tee source sink1)
                                           (\source'-> transduce t' source' sink))
                              (consume c')
                return (snd (fst results))
         in (configuration, transduceJoin)
      sequence c t = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads t) $ \threads->
         let (configuration, c', t') = optimalTwoSequentialConfigurations threads c t
             -- The consumer runs first on the live source while a copy is buffered; the transducer then runs on the
             -- buffered copy. The value returned is the result of 'tee' from the first stage.
             transduceAfter source sink = do
                ((rest, _), buffered) <- pipe
                                            (\buffer-> pipe (tee source buffer) (consume c'))
                                            getList
                pipe (putList buffered) (\source'-> transduce t' source' sink)
                return rest
         in (configuration, transduceAfter)

-- A 'Transducer' joined with a 'Consumer' yields a 'Transducer'. As 'join' is free to apply its arguments in any
-- order, it simply delegates to the Consumer/Transducer instance with the arguments swapped.
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y] (Transducer m x y) (Consumer m x ()) (Transducer m x y)
   where
      join t c = join c t
      sequence t c = liftTransducer "sequence" (maxUsableThreads t `max` maxUsableThreads c) $ \threads->
         let (configuration, t', c') = optimalTwoSequentialConfigurations threads t c
             -- The transducer runs first on the live source while a copy is buffered; the consumer then gets the
             -- buffered copy. The value returned is the result of 'tee' from the first stage.
             transduceBefore source sink = do
                ((rest, _), buffered) <- pipe
                                            (\buffer-> pipe (tee source buffer) (\source'-> transduce t' source' sink))
                                            getList
                pipe (putList buffered) (consume c')
                return rest
         in (configuration, transduceBefore)

-- A 'Producer' joined with a 'Consumer' yields a 'Transducer': the producer supplies all the output and the consumer
-- takes the input. No unconsumed input is reported.
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y] (Producer m y ()) (Consumer m x ()) (Transducer m x y)
   where join p c = liftTransducer "join" (maxUsableThreads p + maxUsableThreads c) $
                    \threads-> let (configuration, p', c', parallel) = optimalTwoParallelConfigurations threads p c
                                   -- Run the two components through 'parallelize' only when the chosen
                                   -- configuration is parallel; otherwise run them one after the other.
                                   join' source sink = if parallel
                                                       then parallelize (produce p' sink) (consume c' source) >> return []
                                                       else produce p' sink >> consume c' source >> return []
                               in (configuration, join')
         sequence p c = liftTransducer "sequence" (maxUsableThreads p `max` maxUsableThreads c) $
                        \threads-> let (configuration, p', c') = optimalTwoSequentialConfigurations threads p c
                                       join' source sink = produce p' sink >> consume c' source >> return []
                                   in (configuration, join')

-- A 'Consumer' joined with a 'Producer' yields a 'Transducer'. Method 'join' delegates to the Producer/Consumer
-- instance with the arguments swapped; 'sequence' lets the consumer finish before the producer starts.
instance forall m x y. (ParallelizableMonad m, Typeable x, Typeable y)
   => JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y] (Consumer m x ()) (Producer m y ()) (Transducer m x y)
   where
      join c p = join p c
      sequence c p = liftTransducer "sequence" (maxUsableThreads c `max` maxUsableThreads p) $ \threads->
         let (configuration, c', p') = optimalTwoSequentialConfigurations threads c p
             consumeThenProduce source sink = do
                consume c' source
                produce p' sink
                return []
         in (configuration, consumeThenProduce)

-- | Combinator 'prepend' converts the given producer to a transducer that passes all its input through unmodified,
-- except for prepending the output of the argument producer to it.
--
-- 'prepend' /prefix/ = 'join' ('substitute' /prefix/) 'asis'
prepend :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
prepend prefix = liftTransducer "prepend" (maxUsableThreads prefix) $
                 \threads-> let prefix' = usingThreads threads prefix
                                -- The producer writes to the sink first, then the source is poured into the sink.
                                prepend' source sink = produce prefix' sink >> pour source sink >> return []
                            -- The configuration lists the configured producer prefix', the one that actually runs.
                            in (ComponentConfiguration [AnyComponent prefix'] threads (cost prefix'), prepend')

-- | Combinator 'append' converts the given producer to a transducer that passes all its input through unmodified,
-- finally appending to it the output of the argument producer.
--
-- 'append' /suffix/ = 'join' 'asis' ('substitute' /suffix/)
append :: forall m x r. (Monad m, Typeable x) => Producer m x r -> Transducer m x x
append suffix = liftTransducer "append" (maxUsableThreads suffix) $
                \threads-> let suffix' = usingThreads threads suffix
                               -- The source is poured into the sink first, then the producer writes to the sink.
                               append' source sink = pour source sink >> produce suffix' sink >> return []
                           -- The configuration lists the configured producer suffix', the one that actually runs.
                           in (ComponentConfiguration [AnyComponent suffix'] threads (cost suffix'), append')

-- | The 'substitute' combinator converts its argument producer to a transducer that produces the same output, while
-- consuming its entire input and ignoring it.
substitute :: forall m x y r. (Monad m, Typeable x, Typeable y) => Producer m y r -> Transducer m x y
substitute feed = liftTransducer "substitute" (maxUsableThreads feed) $ \threads->
   let feed' = usingThreads threads feed
       -- Discard the input before letting the producer write to the sink.
       substitute' source sink = consumeAndSuppress source >> produce feed' sink >> return []
       -- The configuration reports the thread-configured producer, the one that is actually run.
   in (ComponentConfiguration [AnyComponent feed'] threads (cost feed'), substitute')

-- | The 'snot' (streaming not) combinator simply reverses the outputs of the argument splitter.
-- In other words, data that the argument splitter sends to its /true/ sink goes to the /false/ sink of the result,
-- and vice versa.
snot :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
snot splitter = liftSectionSplitter "not" (maxUsableThreads splitter) $ \threads->
   let splitter' = usingThreads threads splitter
       -- Run the thread-configured splitter (the same one reported in the configuration) with its two sinks
       -- exchanged.
       inverted source true false = splitSections splitter' source false true
   in (ComponentConfiguration [AnyComponent splitter'] threads (cost splitter'), inverted)

-- | The '>&' combinator sends the /true/ sink output of its left operand to the input of its right operand for further
-- splitting. Both operands' /false/ sinks are connected to the /false/ sink of the combined splitter, so any input
-- value that reaches the /true/ sink of the combined splitter must have been deemed true by both operands.
(>&) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >& s2 = liftSimpleSplitter ">&" (maxUsableThreads s1 + maxUsableThreads s2) $ \threads->
   let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
       -- Both stages use the thread-configured splitters that the configuration was computed for.
       s source true false = liftM fst $
                             (if parallel then pipeP else pipe)
                                -- The left operand's /true/ output is fed into the pipe ...
                                (\true-> split s1' source true false)
                                -- ... whose other end is split again by the right operand.
                                (\source-> split s2' source true false)
   in (configuration, s)

-- | A '>|' combinator's input value can reach its /false/ sink only by going through both argument splitters' /false/
-- sinks.
(>|) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >| s2 = liftSimpleSplitter ">|" (maxUsableThreads s1 + maxUsableThreads s2) $ \threads->
   let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
       -- Both stages use the thread-configured splitters that the configuration was computed for.
       s source true false = liftM fst $
                             (if parallel then pipeP else pipe)
                                -- Partially applied: the pipe supplies the left operand's /false/ sink.
                                (split s1' source true)
                                -- The right operand splits whatever the left operand rejected.
                                (\source-> split s2' source true false)
   in (configuration, s)

-- | Combinator '&&' zips two splitters over the same input with 'zipSplittersWith', combining each pair of their
-- logical outputs by conjunction.
(&&) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
(&&) left right = zipSplittersWith (Prelude.&&) left right

-- | Combinator '||' zips two splitters over the same input with 'zipSplittersWith', combining each pair of their
-- logical outputs by disjunction.
(||) :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
(||) left right = zipSplittersWith (Prelude.||) left right

-- | The 'ifs' combinator combines a splitter with two branch components of the same type into a single component of
-- that type. The input is divided by the splitter through 'splitConsumer', and the two branches consume the two
-- resulting parts (presumably the /true/ part goes to the first branch and the /false/ part to the second).
ifs :: (ParallelizableMonad m, Typeable x, BranchComponent cc m x [x]) => Splitter m x -> cc -> cc -> cc
ifs s = combineBranches "if" (cost s)
           (\parallel whenTrue whenFalse source->
               liftM fst3 (splitConsumer "ifs" parallel s whenTrue whenFalse source))

-- | The 'wherever' combinator builds a transducer out of a transducer and a splitter. The input is divided by the
-- splitter through 'splitConsumer'; the part handed to the first consumer (presumably the /true/ part) is run through
-- the argument transducer, while the other part is poured to the output unmodified.
wherever :: (ParallelizableMonad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
wherever t s = liftTransducer "wherever" (maxUsableThreads s + maxUsableThreads t) $ \threads->
   let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
       -- Run the thread-configured components that the configuration was computed for.
       wherever' source sink = liftM fst3 $ splitConsumer "wherever" parallel s'
                                               (\source-> transduce t' source sink)
                                               (\source-> pour source sink)
                                               source
   in (configuration, wherever')

-- | The 'unless' combinator builds a transducer out of a transducer and a splitter. The input is divided by the
-- splitter through 'splitConsumer'; the part handed to the first consumer (presumably the /true/ part) is poured to
-- the output unmodified, while the other part is run through the argument transducer.
unless :: (ParallelizableMonad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
unless t s = liftTransducer "unless" (maxUsableThreads s + maxUsableThreads t) $ \threads->
   let (configuration, s', t', parallel) = optimalTwoParallelConfigurations threads s t
       -- Run the thread-configured components that the configuration was computed for.
       unless' source sink = liftM fst3 $ splitConsumer "unless" parallel s'
                                             (\source-> pour source sink)
                                             (\source-> transduce t' source sink)
                                             source
   in (configuration, unless')

-- | The 'select' combinator turns a splitter into a transducer. The input is divided by the splitter through
-- 'splitConsumer'; the part handed to the first consumer (presumably the /true/ part) is poured to the output and the
-- other part is consumed and suppressed.
select :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Transducer m x x
select s = liftTransducer "select" (maxUsableThreads s) $ \threads->
   let configured = usingThreads threads s
       configuration = ComponentConfiguration [AnyComponent configured] threads (cost configured + 1)
       keep source sink = liftM fst3 (splitConsumer "select" False configured
                                         (\accepted-> pour accepted sink)
                                         consumeAndSuppress
                                         source)
   in (configuration, keep)

-- | The recursive combinator 'while' feeds the true sink of the argument splitter back to itself, modified by the
-- argument transducer. Data fed to the splitter's false sink is passed on unmodified.
while :: (ParallelizableMonad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
while t s = liftTransducer "while" (maxUsableThreads t + maxUsableThreads s) $ \threads->
   let -- One more round of the loop: apply the transducer, then re-enter 'while'.
       iteration = t >-> while t s
       -- NOTE(review): the thread-configured variant of the iteration is bound here but the loop body below runs
       -- the unconfigured one; confirm whether that is intentional for this recursive definition.
       (configuration, s', _configuredIteration, parallel) = optimalTwoParallelConfigurations threads s iteration
       loop source sink = liftM fst3 (splitConsumer "while" parallel s'
                                         (\again-> transduce iteration again sink)
                                         (\done-> pour done sink)
                                         source)
   in (configuration, loop)

-- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single
-- splitter. The /true/ sink of the first argument and the /false/ sink of the second argument become the /true/ and
-- /false/ sinks of the loop. The first argument's /false/ output is fed to the second argument, and the second
-- argument's /true/ output is fed back into the whole loop.
--
-- The use of 'nestedIn' makes sense only on hierarchically structured streams. If we gave it some input containing
-- a flat sequence of values, and assuming both component splitters are deterministic and stateless,
-- a value would either not loop at all or it would loop forever.
nestedIn :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
nestedIn s1 s2 = liftSimpleSplitter "nestedIn" (maxUsableThreads s1 + maxUsableThreads s2) $ \threads->
   let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
       loop source true false =
          liftM fst ((if parallel then pipeP else pipe)
                        (\rejected-> split s1' source true rejected)
                        (\inner-> pipe (\recurse-> split s2' inner recurse false)
                                       (\nested-> split (nestedIn s1' s2') nested true false)))
   in (configuration, loop)

-- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two branch
-- components into another component of the same type. In this case, however, the input is first marked up by the
-- splitter (see 'splitterToMarker') and then grouped by 'groupMarks', so a branch is applied separately to each
-- contiguous portion of the input: the first branch to portions marked /true/, the second to portions marked /false/.
foreach :: (ParallelizableMonad m, Typeable x, BranchComponent cc m x [x]) => Splitter m x -> cc -> cc -> cc
foreach s = combineBranches "foreach" (cost s)
               (\parallel c1 c2 source->
                   liftM fst ((if parallel then pipeP else pipe)
                                 (transduce (splitterToMarker s) source)
                                 (\marked-> groupMarks marked (\isTrue-> if isTrue then c1 else c2))))

-- | The 'having' combinator combines two pure splitters into a pure splitter. One splitter is used to chunk the input
-- into contiguous portions. Its /false/ sink is routed directly to the /false/ sink of the combined splitter. The
-- second splitter is instantiated and run on each portion of the input that goes to first splitter's /true/ sink. If
-- the second splitter sends any output at all to its /true/ sink, the whole input portion is passed on to the /true/
-- sink of the combined splitter, otherwise it goes to its /false/ sink.
having :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
having s1 s2 = liftSectionSplitter "having" (maxUsableThreads s1 + maxUsableThreads s2) $
               \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                              -- The first splitter marks up the input; 'groupMarks' then hands every marked group
                              -- either to 'test' (mark True) or straight to the /false/ sink (mark False).
                              s source true false = liftM fst $
                                                    (if parallel then pipeP else pipe)
                                                       (transduce (splitterToMarker s1') source)
                                                       (\source-> groupMarks source (\b chunk-> if b then test chunk
                                                                                                else pourMaybe chunk false))
                                 -- 'test' duplicates the chunk with 'tee': one copy is collected by 'getList', the
                                 -- other is split by the second splitter, whose /true/ output is collected into
                                 -- truePart while its /false/ output goes to the sink supplied by 'suppressProducer'
                                 -- (presumably discarded).
                                 where test chunk = pipe (\sink1-> pipe (tee chunk sink1) getList)
                                                         (\chunk-> pipe (\sink-> suppressProducer (split s2' chunk sink)) getList)
                                                    -- NOTE(review): the lambda pattern below is partial; it assumes
                                                    -- the 'tee' side always returns an empty list.
                                                    >>= \(([], chunk), (_, truePart))-> let chunk' = if null chunk
                                                                                                     -- an empty chunk is emitted as a single Nothing
                                                                                                     then [Nothing]
                                                                                                     else map Just chunk
                                                                                        -- The whole chunk goes to /true/ only if the
                                                                                        -- second splitter accepted some part of it.
                                                                                        in (if null truePart
                                                                                            then putList chunk' false
                                                                                            else putList chunk' true)
                                                                                           >> return ()
                            in (configuration, s)

-- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it succeeds and passes each chunk of the
-- input to its /true/ sink only if the second splitter sends no part of it to its /false/ sink.
havingOnly :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
havingOnly s1 s2 = liftSectionSplitter "havingOnly" (maxUsableThreads s1 + maxUsableThreads s2) $
                   \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                                  -- The first splitter marks up the input; 'groupMarks' then hands every marked
                                  -- group either to 'test' (mark True) or straight to the /false/ sink (mark False).
                                  s source true false = liftM fst $
                                                        (if parallel then pipeP else pipe)
                                                           (transduce (splitterToMarker s1') source)
                                                           (\source-> groupMarks source (\b chunk-> if b then test chunk
                                                                                                    else pourMaybe chunk false))
                                     -- 'test' duplicates the chunk with 'tee': one copy is collected by 'getList',
                                     -- the other is split by the second splitter. Here the /true/ output goes to the
                                     -- sink supplied by 'suppressProducer' (presumably discarded) and the /false/
                                     -- output is collected into falsePart.
                                     where test chunk = pipe (\sink1-> pipe (tee chunk sink1) getList)
                                                             (\chunk-> pipe (\sink-> suppressProducer
                                                                                        (\suppress-> split s2' chunk suppress sink))
                                                                            getList)
                                                        -- NOTE(review): the lambda pattern below is partial; it
                                                        -- assumes the 'tee' side always returns an empty list.
                                                        >>= \(([], chunk), (_, falsePart))-> let chunk' = if null chunk
                                                                                                          -- an empty chunk is emitted as a single Nothing
                                                                                                          then [Nothing]
                                                                                                          else map Just chunk
                                                                                             -- The whole chunk goes to /true/ only if the
                                                                                             -- second splitter rejected no part of it.
                                                                                             in (if null falsePart
                                                                                                 then putList chunk' true
                                                                                                 else putList chunk' false)
                                                                                                >> return ()
                            in (configuration, s)

-- | The splitter produced by 'first' agrees with its argument splitter up to and including the first portion of the
-- input that the argument sends to its /true/ sink. Everything after that first true portion is sent to the /false/
-- sink.
first :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
first splitter = liftSectionSplitter "first" (maxUsableThreads splitter) $ \threads->
   let splitter' = usingThreads threads splitter
       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
       s source true false =
          liftM (\(upstream, downstream)-> downstream ++ upstream)
                (pipeD "first" (transduce (splitterToMarker splitter') source)
                    (\marked->
                        let -- Before the first true portion: false items pass to the false sink.
                            leading (x, isTrue) = if isTrue then forward true x matching
                                                  else forward false x leading
                            -- Inside the first true portion, until the first false item shows up.
                            matching (x, isTrue) = if isTrue then forward true x matching
                                                   else forward false x trailing
                            -- After the first true portion everything goes to the false sink.
                            trailing (x, _) = forward false x trailing
                            -- Put the item into the sink; on success read the next marked item and continue,
                            -- otherwise return the item that could not be put.
                            forward sink x continue =
                               put sink x >>= cond (get marked >>= maybe (return []) continue)
                                                   (return (maybe [] (:[]) x))
                        in get marked >>= maybe (return []) leading))
   in (configuration, s)

-- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes
-- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes
-- into the /false/ sink. The only difference between 'first' and 'uptoFirst' combinators is in where they direct the
-- /false/ portion of the input preceding the first /true/ part.
uptoFirst :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
uptoFirst splitter = liftSectionSplitter "uptoFirst" (maxUsableThreads splitter) $
                     \threads-> let splitter' = usingThreads threads splitter
                                    configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                    s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
                                                          pipeD "uptoFirst" (transduce (splitterToMarker splitter') source)
                                                          -- get1 queues the leading false-marked items: if the input
                                                          -- ends before any true-marked item the queue goes to the
                                                          -- false sink, otherwise it is put into the true sink ahead
                                                          -- of the first true portion.
                                                          (\source-> let get1 q (x, False) = let q' = q |> x
                                                                                             in get source
                                                                                                >>= maybe
                                                                                                       (putQueue q' false)
                                                                                                       (get1 q')
                                                                         -- here p is the marked pair, shadowing the helper p below
                                                                         get1 q p@(x, True) = putQueue q true
                                                                                              >>= whenNull (get2 p)
                                                                         -- get2 passes on the first true portion, get3 sends
                                                                         -- everything that follows it to the false sink.
                                                                         get2 (x, True) = p true x get2
                                                                         get2 (x, False) = p false x get3
                                                                         get3 (x, _) = p false x get3
                                                                         -- Put x into the sink; depending on the result either
                                                                         -- read the next item and continue, or return [x].
                                                                         p sink x succeed = put sink x
                                                                                            >>= cond (get source
                                                                                                      >>= maybe (return []) succeed)
                                                                                                     (return [x])
                                                                     in get source >>= maybe (return []) (get1 Seq.empty))
                            in (configuration, s)

-- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last
-- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to
-- the resulting component's /true/ sink.  The splitter returned by the combinator 'last' has to buffer the previous two
-- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of
-- the input or another portion succeeding the previous one.
last :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
last splitter = liftSectionSplitter "last" (maxUsableThreads splitter) $ \threads->
   let splitter' = usingThreads threads splitter
       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
       s source true false =
          liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
          pipeD "last" (transduce (splitterToMarker splitter') source)
             (\source->
                 let -- get1: no true portion is buffered; false items go straight to the false sink.
                     get1 (x, False) = put false x
                                       >>= cond (get source
                                                 >>= maybe (return []) get1)
                                                (return [x])
                     get1 p@(x, True) = get2 Seq.empty p
                     -- get2: buffer the current true portion; if the input ends here it was the last one.
                     get2 q (x, True) = let q' = q |> x
                                        in get source
                                           >>= maybe
                                                  (putQueue q' true)
                                                  (get2 q')
                     get2 q p@(x, False) = get3 q Seq.empty p
                     -- get3: buffer the false portion that follows the buffered true portion qt.
                     get3 qt qf (x, False) = let qf' = qf |> x
                                             in get source
                                                >>= maybe
                                                       -- End of input: qt was the last true portion. Return
                                                       -- whatever either sink left over, the same way the
                                                       -- (x, True) clause below does, instead of dropping the
                                                       -- leftovers of the first 'putQueue'.
                                                       (do rest1 <- putQueue qt true
                                                           rest2 <- putQueue qf' false
                                                           return (rest1 ++ rest2))
                                                       (get3 qt qf')
                     -- Another true portion begins, so the buffered one was not the last: flush both buffers
                     -- to the false sink and start buffering anew.
                     get3 qt qf p@(x, True) = do rest1 <- putQueue qt false
                                                 rest2 <- putQueue qf false
                                                 if null rest1 Prelude.&& null rest2
                                                    then get2 Seq.empty p
                                                    else return (rest1 ++ rest2)
                     p succeed = get source >>= maybe (return []) succeed
                 in p get1)
   in (configuration, s)

-- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the
-- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input is fed
-- to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where they
-- feed the /false/ portion of the input, if any, remaining after the last /true/ part.
lastAndAfter :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
lastAndAfter splitter = liftSectionSplitter "lastAndAfter" (maxUsableThreads splitter) $
                        \threads-> let splitter' = usingThreads threads splitter
                                       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                       s source true false = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
                                                             pipeD "lastAndAfter" (transduce (splitterToMarker splitter') source)
                                                             -- get1: nothing is buffered; false-marked items go
                                                             -- straight to the false sink.
                                                             (\source-> let get1 (x, False) = put false x
                                                                                              >>= cond (p get1) (return [x])
                                                                            -- in the p@ clauses p is the marked pair,
                                                                            -- shadowing the helper p defined below
                                                                            get1 p@(x, True) = get2 Seq.empty p
                                                                            -- get2: buffer a true portion; at the end of
                                                                            -- input the buffer goes to the true sink.
                                                                            get2 q (x, True) = let q' = q |> x
                                                                                                    in get source
                                                                                                       >>= maybe
                                                                                                              (putQueue q' true)
                                                                                                              (get2 q')
                                                                            get2 q p@(x, False) = get3 q p
                                                                            -- get3: keep buffering the false portion that
                                                                            -- follows in the same queue; at the end of
                                                                            -- input it all goes to the true sink.
                                                                            get3 q (x, False) = let q' = q |> x
                                                                                                in get source
                                                                                                   >>= maybe
                                                                                                          (putQueue q' true)
                                                                                                          (get3 q')
                                                                            -- Another true portion starts: the buffered
                                                                            -- input goes to the false sink instead.
                                                                            get3 q p@(x, True) = putQueue q false
                                                                                                 >>= whenNull (get1 p)
                                                                            -- Read the next marked item, or finish with
                                                                            -- an empty list at the end of input.
                                                                            p succeed = get source >>= maybe (return []) succeed
                                                                        in p get1)
                                   in (configuration, s)

-- | The splitter produced by 'prefix' sends to its /true/ sink only the leading part of the input that the argument
-- splitter sends to its own /true/ sink. From the first item the argument marks /false/ onwards, all input goes to
-- the /false/ sink of the result.
prefix :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
prefix splitter = liftSectionSplitter "prefix" (maxUsableThreads splitter) $ \threads->
   let splitter' = usingThreads threads splitter
       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
       s source true false =
          liftM (\(upstream, downstream)-> downstream ++ upstream)
                (pipeD "prefix" (transduce (splitterToMarker splitter') source)
                    (\marked->
                        let -- Still inside the leading true part of the input.
                            leading (x, isTrue) = if isTrue then forward true x leading
                                                  else forward false x remainder
                            -- Past the leading part: everything goes to the false sink.
                            remainder (x, _) = forward false x remainder
                            -- Put the item into the sink; on success read the next marked item and continue,
                            -- otherwise return the item that could not be put.
                            forward sink x continue =
                               put sink x >>= cond (get marked >>= maybe (return []) continue)
                                                   (return (maybe [] (:[]) x))
                        in get marked >>= maybe (return []) leading))
   in (configuration, s)

-- | The splitter produced by 'suffix' sends to its /true/ sink only the trailing part of the input that the argument
-- splitter sends to its own /true/ sink. Every true portion is buffered; it reaches the /true/ sink only if the input
-- ends with it, otherwise it is sent to the /false/ sink together with the rest of the input.
suffix :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
suffix splitter = liftSectionSplitter "suffix" (maxUsableThreads splitter) $ \threads->
   let splitter' = usingThreads threads splitter
       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
       s source true false =
          liftM (\(upstream, downstream)-> concatMap (maybe [] (:[])) downstream ++ upstream)
                (pipeD "suffix" (transduce (splitterToMarker splitter') source)
                    (\marked->
                        let -- Read the next marked item, or finish with an empty list at the end of input.
                            next continue = get marked >>= maybe (return []) continue
                            -- Nothing buffered: false items go straight to the false sink.
                            scanning (x, False) = put false x >>= cond (next scanning) (return [x])
                            scanning (x, True) = buffering (Seq.singleton x)
                            -- A true portion is buffered; at the end of input it goes to the true sink.
                            buffering pending = get marked >>= maybe (putQueue pending true) (extend pending)
                            extend pending (x, True) = buffering (pending |> x)
                            -- A false item follows, so the buffered portion was not a suffix.
                            extend pending pair@(_, False) = putQueue pending false >>= whenNull (scanning pair)
                        in next scanning))
   in (configuration, s)

-- | The 'even' combinator takes every input section that its argument splitter deems /true/, and feeds the even ones
-- (the second, fourth, and so on) into its /true/ sink. The odd sections, as well as the parts of the input that are
-- /false/ according to the argument splitter, are fed to the /false/ sink of the result.
even :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
even splitter = liftSectionSplitter "even" (maxUsableThreads splitter) $ \threads->
   let splitter' = usingThreads threads splitter
       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
       s source true false =
          liftM (\(upstream, downstream)-> concatMap (maybe [] (:[])) downstream ++ upstream)
                (pipeD "even" (transduce (splitterToMarker splitter') source)
                    (\marked->
                        let -- Read the next marked item, or finish with an empty list at the end of input.
                            next continue = get marked >>= maybe (return []) continue
                            -- Put the item into the sink, then carry on in the given state, or return the item
                            -- that could not be put.
                            pass sink continue x = put sink x >>= cond (next continue) (return [x])
                            -- False input preceding an odd true section.
                            beforeOdd pair@(x, isTrue) = if isTrue then oddSection pair
                                                         else pass false beforeOdd x
                            -- An odd true section, sent to the false sink.
                            oddSection pair@(x, isTrue) = if isTrue then pass false oddSection x
                                                          else beforeEven pair
                            -- False input preceding an even true section.
                            beforeEven pair@(x, isTrue) = if isTrue then evenSection pair
                                                          else pass false beforeEven x
                            -- An even true section, sent to the true sink.
                            evenSection pair@(x, isTrue) = if isTrue then pass true evenSection x
                                                           else beforeOdd pair
                        in next beforeOdd))
   in (configuration, s)

-- | Splitter 'startOf' issues an empty /true/ section at the beginning of every section considered /true/ by its
-- argument splitter. The entire input itself goes into its /false/ sink.
startOf :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
startOf splitter = liftSectionSplitter "startOf" (maxUsableThreads splitter) $ \threads->
   let splitter' = usingThreads threads splitter
       configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
       s source true false =
          liftM (\(upstream, downstream)-> concatMap (maybe [] (:[])) downstream ++ upstream)
                (pipeD "startOf" (transduce (splitterToMarker splitter') source)
                    (\marked->
                        let -- Read the next marked item, or finish with an empty list at the end of input.
                            next continue = get marked >>= maybe (return []) continue
                            -- Outside of any true section: the input is copied to the false sink.
                            outside (x, False) = put false x >>= cond (next outside) (return [x])
                            -- A true section begins: mark the spot with an empty section on the true sink.
                            outside pair@(_, True) = put true Nothing >> inside pair
                            -- Inside a true section the input is still copied to the false sink.
                            inside (x, True) = put false x >>= cond (next inside) (return [x])
                            inside pair@(_, False) = outside pair
                        in next outside))
   in (configuration, s)

-- | Splitter 'endOf' issues an empty /true/ section at the end of every section considered /true/ by its argument
-- splitter, including a section that is still open when the input runs out. All actual input items go into the
-- /false/ sink.
endOf :: (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x
endOf splitter = liftSectionSplitter "endOf" (maxUsableThreads splitter) $
                 \threads-> let splitter' = usingThreads threads splitter
                                configuration = ComponentConfiguration [AnyComponent splitter'] threads (cost splitter' + 2)
                                -- The marked output of the argument splitter is consumed by a two-state machine:
                                -- get1 runs outside of a section flagged True and get2 inside of one.
                                s source true false
                                   = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
                                     pipeD "endOf"
                                        (transduce (splitterToMarker splitter') source)
                                        (\source-> let get1 (x, False) = put false x
                                                                         >>= cond (next (return []) get1) (return [x])
                                                       get1 p@(_, True) = get2 p
                                                       -- If the input ends while still inside a section flagged True,
                                                       -- that section ends as well and its end must be reported too.
                                                       get2 (x, True) = put false x
                                                                        >>= cond
                                                                               (next (put true Nothing >> return []) get2)
                                                                               (return [x])
                                                       -- The flag dropped back to False: report the end of the section.
                                                       get2 p@(_, False) = put true Nothing >> get1 p
                                                       -- Read the next marked item, or run atEnd if there is none.
                                                       next atEnd g = get source >>= maybe atEnd g
                                                   in next (return []) get1)
                            in (configuration, s)

-- | Combinator 'followedBy' treats its argument 'Splitter's as pattern components and returns a 'Splitter' that
-- matches their concatenation. A section of input is considered /true/ by the result iff its prefix is considered
-- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/. The splitter /s2/ is started anew
-- after every section split to /true/ sink by /s1/.
followedBy :: forall m x. (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
followedBy s1 s2 = liftSectionSplitter "followedBy" (maxUsableThreads s1 + maxUsableThreads s2) $
                   \threads-> let (configuration, s1', s2', parallel) = optimalTwoParallelConfigurations threads s1 s2
                              in (configuration, followedBy' parallel s1' s2')
   -- followedBy' pipes the marked output of s1 into the state machine below. The machine keeps queues of
   -- (item, flag) pairs that have already been read from s1's marker but may have to be examined again.
   where followedBy' parallel s1 s2 source true false
            = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
              (if parallel then pipeP else pipe)
                 (transduce (splitterToMarker s1) source)
                 -- get0 works through a queue of previously read pairs: pairs flagged False go to the false sink,
                 -- and the first pair flagged True hands the whole remaining queue over to get2. Once the queue is
                 -- empty, reading continues from the source with get1.
                 (\source-> let get0 q = case Seq.viewl q
                                         of Seq.EmptyL -> get source >>= maybe (return []) get1
                                            (x, False) :< rest -> put false x
                                                                  >>= cond (get0 rest)
                                                                           (return $ Foldable.toList $ Seq.viewl $ fmap fst q)
                                            (x, True) :< rest -> get2 Seq.empty q
                                -- get1 is the counterpart of get0 for a pair freshly read from the source.
                                get1 (x, False) = put false x
                                                  >>= cond (get source >>= maybe (return []) get1)
                                                           (return [x])
                                get1 p@(x, True) = get2 Seq.empty (Seq.singleton p)
                                -- get2 collects in q the items of the section s1 flags True, taking pairs from the
                                -- pending queue q' first and then from the source. At the end of input the collected
                                -- prefix is checked by testEnd. At the first pair flagged False, s2 is run (test q)
                                -- on the input that follows, which get3 feeds to it.
                                get2 q q' = case Seq.viewl q'
                                            of Seq.EmptyL -> get source
                                                             >>= maybe (testEnd q) (get2 q . Seq.singleton)
                                               (x, True) :< rest -> get2 (q |> x) rest
                                               (x, False) :< rest -> do ((q1, q2), n) <- pipeD "followedBy tail"
                                                                                               (get3 Seq.empty q') (test q)
                                                                        -- Nothing: s2 did not accept the start of the
                                                                        -- tail, so the collected prefix goes to false.
                                                                        -- Just n: s2 put n items to true; those are
                                                                        -- dropped from q1 before it is examined again.
                                                                        -- NOTE(review): whenNull is defined elsewhere;
                                                                        -- presumably it continues only if putQueue
                                                                        -- left nothing unwritten -- confirm.
                                                                        case n of Nothing -> putQueue q false
                                                                                             >>= whenNull (get0 (q1 >< q2))
                                                                                  Just n -> do put false Nothing
                                                                                               get0 (dropJust n q1 >< q2)
                                -- get3 feeds the given sink, for as long as it accepts input, with the items of the
                                -- pairs in q2 and then of the pairs read from the source. Only Just items are put
                                -- into the sink, but every pair taken is recorded in q1. The result is the pair of
                                -- the recorded queue and the unused remainder of q2.
                                get3 q1 q2 sink = canPut sink
                                                  >>= cond (case Seq.viewl q2
                                                            of Seq.EmptyL -> get source
                                                                             >>= maybe (return (q1, q2))
                                                                                       (\p-> maybe (return True) (put sink) (fst p)
                                                                                                >> get3 (q1 |> p) q2 sink)
                                                               p :< rest -> maybe (return True) (put sink) (fst p)
                                                                            >> get3 (q1 |> p) rest sink)
                                                           (return (q1, q2))
                                -- testEnd runs the s2 test with a producer that supplies no input at all.
                                testEnd q = do ((), n) <- pipeD "testEnd" (const $ return ()) (test q)
                                               case n of Nothing -> putQueue q false
                                                         _ -> return []
                                -- test runs s2 over the given source. If the first marked item is flagged False the
                                -- result is Nothing. Otherwise the collected prefix q is written to the true sink,
                                -- followed by the items s2 flags True up to its first pair flagged False or the end
                                -- of the source; the result is then Just the number of items written after q.
                                test q source = liftM snd $
                                                pipeD "follower"
                                                   (transduce (splitterToMarker s2) source)
                                                   (\source-> let get4 (_, False) = return Nothing
                                                                  get4 p@(_, True) = putQueue q true >> get5 0 p
                                                                  get5 n (x, False) = return (Just n)
                                                                  get5 n (Nothing, True) = get6 n
                                                                  get5 n (x, True) = put true x >> get6 (succ n)
                                                                  get6 n = get source
                                                                           >>= maybe
                                                                                  (return $ Just n)
                                                                                  (get5 n)
                                                              in get source >>= maybe (return Nothing) get4)
                                -- dropJust n removes pairs from the front of the queue until n pairs holding a Just
                                -- item have been removed.
                                -- NOTE(review): there is no alternative for an empty queue with n > 0; this relies
                                -- on the queue holding at least n Just items -- confirm.
                                dropJust 0 q = q
                                dropJust n q = case Seq.viewl q of (Nothing, _) :< rest -> dropJust n rest
                                                                   (Just _, _) :< rest -> dropJust (pred n) rest
                           in get0 Seq.empty)

-- | Combinator '...' keeps a running balance: the number of preceding inputs considered /true/ by its first argument
-- minus the number of those considered /true/ by its second argument. Every input value for which this balance is
-- positive is passed to /true/, all the others to /false/. The combinator is typically used with 'startOf' and 'endOf'
-- in order to count entire input sections and ignore their lengths.
(...) :: forall m x. (ParallelizableMonad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 ... s2 = liftSectionSplitter "..." (maxUsableThreads s1 + maxUsableThreads s2) $
   \threads->
      -- Only the configuration and the parallelization flag are used from the optimal configuration; the pair
      -- marker is built from the argument splitters themselves.
      let (configuration, _, _, parallel) = optimalTwoParallelConfigurations threads s1 s2
          splitByBalance source true false
             = liftM (\(unmarked, unsplit)-> concatMap (maybe [] (:[])) unsplit ++ unmarked) $
               (if parallel then pipeP else pipe)
                  (transduce (splittersToPairMarker s1 s2) source)
                  (\marks-> let advance balance = get marks >>= maybe (return []) (step balance)
                                -- Send x to the chosen sink and carry on; a refused x is returned to the caller.
                                route toTrue balance x = put (if toTrue then true else false) x
                                                         >>= cond (advance balance) (return [x])
                                -- An item is /true/ only while the balance is strictly positive...
                                exclusive balance = route (balance > 0) balance
                                -- ...except for an item the second splitter flags: it still counts as /true/
                                -- when it brings the balance down to zero.
                                inclusive balance = route (balance >= 0) balance
                                step balance mark
                                   = case mark
                                     of Left (x, True, False) -> exclusive (succ balance) (Just x)
                                        Left (x, False, True) -> inclusive (pred balance) (Just x)
                                        Left (x, True, True) -> inclusive balance (Just x)
                                        Left (x, False, False) -> exclusive balance (Just x)
                                        Right (Left True) -> exclusive (succ balance) Nothing
                                        Right (Right True) -> exclusive (pred balance) Nothing
                                        Right _ -> advance balance
                            in advance 0)
      in (configuration, splitByBalance)

-- Helper functions

-- | A 'Marker' is a 'Transducer' whose every output value is a pair of an optional input item and a 'Bool' flag.
-- In the output of 'splitterToMarker' the flag is 'True' for values the splitter has put into its /true/ sink and
-- 'False' for values from its /false/ sink.
type Marker m x = Transducer m x (Maybe x, Bool)

-- | Converts a 'Splitter' into a 'Marker': everything the splitter puts into its /true/ sink is passed on paired
-- with 'True', and everything it puts into its /false/ sink is passed on paired with 'False'.
splitterToMarker :: forall m x. (ParallelizableMonad m, Typeable x) => Splitter m x -> Marker m x
splitterToMarker s = liftTransducer "splitterToMarker" (maxUsableThreads s) $
   \threads->
      let configured = usingThreads threads s
          configuration = ComponentConfiguration [AnyComponent configured] threads (cost configured + 1)
          -- The result collects whatever the three parties could not pass on, in the order: the items the /true/
          -- tagger could not write, those of the /false/ tagger, and the leftovers of the splitter itself.
          markAll source sink
             = liftM (\((unsplit, unmarkedFalse), unmarkedTrue)-> unmarkedTrue ++ unmarkedFalse ++ unsplit) $
               pipeD "splitterToMarker true"
                  (\trueSink-> pipeD "splitterToMarker false"
                                  (splitSections configured source trueSink)
                                  (tag False))
                  (tag True)
             -- tag copies its input into the shared sink, pairing every value with the given flag, for as long as
             -- the sink accepts output.
             where tag flag input = canPut sink >>= cond (get input >>= maybe (return []) forward) (return [])
                      where forward item = put sink (item, flag)
                                           >>= cond (tag flag input) (return $ maybe [] (: []) item)
      in (configuration, markAll)


-- | Runs two splitters over the same input and merges their verdicts into a single stream. An input item is
-- reported as @Left (item, flag1, flag2)@, where the flags tell whether the first and the second splitter,
-- respectively, put the item into its /true/ sink. A section boundary (a 'Nothing' in the sectioned output of a
-- splitter) is reported as @Right (Left flag)@ if it comes from the first splitter and as @Right (Right flag)@ if it
-- comes from the second.
splittersToPairMarker :: forall m x. (ParallelizableMonad m, Typeable x)
                         => Splitter m x -> Splitter m x -> Transducer m x (Either (x, Bool, Bool) (Either Bool Bool))
splittersToPairMarker s1 s2
   = liftTransducer "splittersToPairMarker" (maxUsableThreads s1 + maxUsableThreads s2) $
     \threads-> let (configuration, s1', s2', parallelize) = optimalTwoParallelConfigurations threads s1 s2
                    -- The input is duplicated by tee into both splitters. Each of their four sinks is drained by a
                    -- 'mark' consumer that tags the values with their origin and forwards them into the single
                    -- 'sync' pipe, where synchronizeMarks pairs up the two splitters' verdicts.
                    -- NOTE(review): the lambda pattern below requires the result of tee to be the empty list --
                    -- confirm that tee can never return anything else.
                    t source sink = liftM (\((((((([], l1), l2), l3), l4), l5), l6), l7)-> l7 ++ l6 ++ l5 ++ l4 ++ l3 ++ l2 ++ l1) $
                                    pipeD "splittersToPairMarker synchronize"
                                    (\sync->
                                     pipeD "splittersToPairMarker true1"
                                     (\true1->
                                      pipeD "splittersToPairMarker false1"
                                      (\false1->
                                       pipeD "splittersToPairMarker true2"
                                       (\true2->
                                        pipeD "splittersToPairMarker false2"
                                        (\false2->
                                         pipeD "splittersToPairMarker sink1"
                                         (\sink1->
                                          (if parallelize then pipeP else pipe)
                                          (\sink2-> tee source sink1 sink2)
                                          -- Run the splitters as configured for the given number of threads, the
                                          -- same ones that the returned configuration describes.
                                          (\source2-> splitSections s2' source2 true2 false2))
                                         (\source1-> splitSections s1' source1 true1 false1))
                                        (mark sync False False))
                                       (mark sync False True))
                                      (mark sync True False))
                                     (mark sync True True))
                                    (synchronizeMarks Nothing sink)
                    -- The state is either Nothing or a queue of marks that all came from one splitter and are
                    -- still waiting for their counterparts from the other one; the Bool next to the queue is True
                    -- if the queued marks came from the first splitter.
                    synchronizeMarks :: Maybe (Seq (Maybe x, Bool), Bool)
                                     -> Sink c (Either (x, Bool, Bool) (Either Bool Bool)) -> Source c (Maybe x, Bool, Bool)
                                     -> Pipe c m [x]
                    synchronizeMarks state sink source = get source
                                                         >>= maybe
                                                                (assert (isNothing state) (return []))
                                                                (handleMark state sink source)
                    handleMark :: Maybe (Seq (Maybe x, Bool), Bool)
                               -> Sink c (Either (x, Bool, Bool) (Either Bool Bool)) -> Source c (Maybe x, Bool, Bool)
                               -> (Maybe x, Bool, Bool) -> Pipe c m [x]
                    -- With nothing queued, a section boundary is reported at once and an item starts a new queue.
                    handleMark Nothing sink source (x, pos, b)
                       = case x of Nothing -> put sink (Right $ if pos then Left b else Right b)
                                              >> synchronizeMarks Nothing sink source
                                   _ -> synchronizeMarks (Just (Seq.singleton (x, b), pos)) sink source
                    -- With marks queued: a mark from the same splitter as the queue is appended to the queue; a
                    -- section boundary from the other splitter is reported at once; an item from the other
                    -- splitter is matched against the head of the queue.
                    handleMark state@(Just (q, pos')) sink source mark@(x, pos, b)
                       | pos == pos' = synchronizeMarks (Just (q |> (x, b), pos')) sink source
                       | isNothing x = put sink (Right $ if pos then Left b else Right b)
                                       >> synchronizeMarks state sink source
                       | otherwise = case Seq.viewl q
                                     of Seq.EmptyL -> synchronizeMarks (Just (Seq.singleton (x, b), pos)) sink source
                                        -- A queued section boundary is reported first, then the same item is
                                        -- matched against the rest of the queue.
                                        (Nothing, b') :< rest -> put sink (Right $ if pos then Right b' else Left b')
                                                                 >>= cond
                                                                        (handleMark
                                                                           (if Seq.null rest then Nothing else Just (rest, pos'))
                                                                           sink
                                                                           source
                                                                           mark)
                                                                        (returnQueuedList q)
                                        -- A queued item is reported together with both flags, the first
                                        -- splitter's flag always coming first.
                                        (Just y, b') :< rest -> put sink (Left $ if pos then (y, b, b') else (y, b', b))
                                                                >>= cond
                                                                       (synchronizeMarks
                                                                           (if Seq.null rest then Nothing else Just (rest, pos'))
                                                                           sink
                                                                           source)
                                                                       (returnQueuedList q)
                    -- The items still held in the queue, to be returned when the sink accepts no more output.
                    returnQueuedList q = return $ concatMap (maybe [] (:[]) . fst) $ Foldable.toList $ Seq.viewl q
                    -- mark copies the source into the sink for as long as the sink accepts output, tagging every
                    -- value with its origin: 'first' tells if it comes from the first splitter and 'b' if it comes
                    -- from a /true/ sink.
                    mark sink first b source = let mark' = canPut sink
                                                           >>= cond
                                                                  (get source
                                                                   >>= maybe
                                                                          (return [])
                                                                          (\x-> put sink (x, first, b)
                                                                                   >>= cond mark' (return $ maybe [] (: []) x)))
                                                                  (return [])
                                               in mark'
                in (configuration, t)

-- | Converts the output of a pair marker, such as the one built by 'splittersToPairMarker', into a stream of
-- triples @(item, flag1, flag2)@. An input item @Left (x, l, r)@ becomes @(Just x, l, r)@. The boundary marks
-- @Right (Left l)@ and @Right (Right r)@ update the remembered flag of the first or the second splitter,
-- respectively, and are reported as triples with 'Nothing' in place of the item.
pairMarkerToMaybePairMarker :: forall m x. (ParallelizableMonad m, Typeable x)
                               => Transducer m x (Either (x, Bool, Bool) (Either Bool Bool)) -> Transducer m x (Maybe x, Bool, Bool)
pairMarkerToMaybePairMarker t = liftTransducer "pairMarkerToMaybePairMarker" (maxUsableThreads t + 1) $
   -- The argument transducer is configured either to use all the threads, with the conversion running in the same
   -- thread, or to use one thread less, with the conversion running in parallel; the latter is chosen if more than
   -- one thread is available and it does not cost more.
   \threads-> let t's = usingThreads threads t
                  t'p = usingThreads (threads - 1) t
                  parallel = threads > 1 Prelude.&& cost t'p <= cost t's
                  t' = if parallel then t'p else t's
                  cost' = if parallel then (cost t'p `max` 1) + 1 else cost t's + 1
                  transduce' source sink
                     = liftM (\(x, y)-> y ++ x) $
                       (if parallel then pipeP else pipe)
                          -- Run the transducer as configured above, the one the returned configuration describes.
                          (transduce t' source)
                          (\source-> let next state = get source >>= maybe (return []) state
                                         -- A boundary mark is pending: if the input ends here, report it.
                                         nextState2 l r d = get source
                                                            >>= maybe (put sink (Nothing, l, r) >> return []) (state2 l r d)
                                         -- state0 handles the first value of the input, before any flags are known.
                                         state0 (Left (x, l, r)) = put sink (Just x, l, r)
                                                                   >>= cond (next $ state1 l r) (return [x])
                                         state0 v@(Right d) = state2 False False d v
                                         -- state1 carries the flags of the last value; the flags of an item replace
                                         -- them.
                                         state1 _ _ (Left (x, l, r)) = put sink (Just x, l, r)
                                                                       >>= cond (next $ state1 l r) (return [x])
                                         state1 l r v@(Right d) = state2 l r d v
                                         -- state2 additionally carries the boundary mark d that is being handled;
                                         -- only the side it came from ('Left' or 'Right') matters. Note that the
                                         -- pattern variable t in the item equations shadows the argument transducer.
                                         state2 l r Left{} (Right d@(Left l')) = nextState2 l' r d
                                         state2 l r Left{} (Right (Right r')) = put sink (Nothing, l, r')
                                                                                >>= cond (next $ state1 l r') (return [])
                                         state2 l r Left{} t@(Left (x, l', r')) | l == l' = state1 l r t
                                                                                | otherwise = put sink (Nothing, l, r)
                                                                                              >>= cond
                                                                                                     (state1 l' r' t)
                                                                                                     (return [])
                                         state2 l r Right{} (Right d@(Right r')) = nextState2 l r' d
                                         state2 l r Right{} (Right (Left l')) = put sink (Nothing, l', r)
                                                                                >>= cond (next $ state1 l' r) (return [])
                                         state2 l r Right{} t@(Left (x, l', r')) | r == r' = state1 l r t
                                                                                 | otherwise = put sink (Nothing, l, r)
                                                                                               >>= cond
                                                                                                      (state1 l' r' t)
                                                                                                      (return [])
                                     in next state0)
              in (ComponentConfiguration [AnyComponent t'] threads cost', transduce')

-- | Runs both argument splitters over the same input and sends every value to the /true/ sink if the given function
-- maps the pair of the two splitters' verdicts on that value to 'True', and to the /false/ sink otherwise.
zipSplittersWith :: (ParallelizableMonad m, Typeable x) => (Bool -> Bool -> Bool) -> Splitter m x -> Splitter m x -> Splitter m x
zipSplittersWith f s1 s2
   = liftSectionSplitter "zip" (maxUsableThreads s1 + maxUsableThreads s2) $
     -- Only the configuration and the parallelization flag are used from the optimal configuration; the pair
     -- marker is built from the argument splitters themselves.
     \threads-> let (configuration, _, _, parallel) = optimalTwoParallelConfigurations threads s1 s2
                    zipped source true false
                       = liftM (\(unmarked, unsplit)-> unsplit ++ unmarked) $
                         (if parallel then pipeP else pipe)
                            (transduce (pairMarkerToMaybePairMarker $ splittersToPairMarker s1 s2) source)
                            (\marks-> let loop = get marks >>= maybe (return []) dispatch
                                          -- A value the chosen sink refuses is returned, if it is an actual item.
                                          dispatch (x, b1, b2) = put (if f b1 b2 then true else false) x
                                                                 >>= cond loop (return $ maybe [] (:[]) x)
                                      in loop)
                in (configuration, zipped)

-- | Divides a source of marked values into maximal runs of consecutive values that carry equal marks, and for
-- every run invokes the given consumer with the mark of the run and a source of the run's items. Values of the form
-- @(Nothing, mark)@ take part in the grouping but contribute no item. The results of the consumers are discarded.
groupMarks :: forall c m x y z. (ParallelizableMonad m, Typeable x, Typeable y, Eq y)
              => Source c (Maybe x, y) -> (y -> Source c x -> Pipe c m z) -> Pipe c m ()
groupMarks source getConsumer = getSuccess source startNew
   where startNew (mx, y) = do (nextPair, _) <- pipeD "groupMarks" (\sink-> pass sink mx y) (getConsumer y)
                               -- The producer hands back the first pair of the following run, if there is one.
                               maybe (return ()) startNew nextPair
         pass sink Nothing y = next sink y
         pass sink (Just x) y = put sink x >> next sink y
         next sink y = get source >>= maybe (return Nothing) (continue sink y)
         -- The run goes on for as long as the mark stays the same; any other pair ends it.
         continue sink y p@(x, y')
            | y == y' = pass sink x y
            | otherwise = return (Just p)

-- | Builds a consumer that splits its source with the given splitter, feeding the /true/ part of the input to the
-- first argument consumer and the /false/ part to the second. The result is a triple of whatever the split itself
-- returns and the results of the two consumers. The 'Bool' argument selects 'pipeP' over 'pipe' for attaching the
-- /false/ consumer.
splitConsumer :: forall c m x r1 r2. (ParallelizableMonad m, Typeable x)
                 => String -> Bool -> Splitter m x -> (Source c x -> Pipe c m r1) -> (Source c x -> Pipe c m r2)
                           -> (Source c x -> Pipe c m ([x], r1, r2))
splitConsumer description parallel s trueConsumer falseConsumer source
   = do ((extra, r1), r2) <- (if parallel then pipeP else pipe)
                                (\false-> pipeD (description ++ " true") (\true-> split s source true false) trueConsumer)
                                falseConsumer
        return (extra, r1, r2)

-- | Builds a 'Consumer' that splits its input into sections with the given splitter, feeding the /true/ sections to
-- the first argument consumer and the /false/ sections to the second. The result is a triple of whatever the split
-- itself returns and the results of the two consumers.
splitConsumerSections :: forall m x r1 r2. (ParallelizableMonad m, Typeable x) =>
                         String -> Splitter m x -> Consumer m (Maybe x) r1 -> Consumer m (Maybe x) r2 -> Consumer m x ([x], r1, r2)
splitConsumerSections description s trueConsumer falseConsumer
   = liftConsumer description (maxUsableThreads s + maxUsableThreads trueConsumer + maxUsableThreads falseConsumer) configure
   where configure :: Int -> (ComponentConfiguration, forall c. Source c x -> Pipe c m ([x], r1, r2))
         configure threadCount = (configuration', consumer')
            where (configuration', (splitter', forkSplitter), (trueConsumer', forkTrue), (falseConsumer', forkFalse))
                     = optimalThreeParallelConfigurations threadCount s trueConsumer falseConsumer
                  -- Run the three components as configured for the given number of threads, the same ones that
                  -- configuration' describes, rather than the unconfigured arguments.
                  consumer' source = (if forkFalse then pipeP else pipe)
                                        (\false-> (if forkTrue Prelude.|| forkSplitter then pipeP else pipe)
                                                     (\true-> splitSections splitter' source true false)
                                                     (consume trueConsumer'))
                                        (consume falseConsumer')
                                     >>= \((extra, r1), r2)-> return (extra, r1, r2)

-- | Writes the contents of the queue into the sink, front to back, by means of 'putList'.
putQueue :: forall c m x. (Monad m, Typeable x) => Seq x -> Sink c x -> Pipe c m [x]
putQueue queue sink = putList (Foldable.toList queue) sink

-- | Reads the source until it is exhausted and returns everything read as a queue, in the order of reading.
getQueue :: forall c m x. (Monad m, Typeable x) => Source c x -> Pipe c m (Seq x)
getQueue source = collect Seq.empty
   where collect queue = get source >>= maybe (return queue) (collect . (queue |>))

-- | Copies the source into the sink, wrapping every item in 'Just', for as long as the sink accepts output. If the
-- source yields no item on the very first read, a single 'Nothing' is written instead.
pourMaybe :: forall c x m. (Monad m, Typeable x) => Source c x -> Sink c (Maybe x) -> Pipe c m ()
pourMaybe source sink = pourFirst
   where pourFirst = do writable <- canPut sink
                        when writable (get source >>= maybe (put sink Nothing >> return ()) forward)
         pourRest = do writable <- canPut sink
                       when writable (getSuccess source forward)
         forward x = put sink (Just x) >> pourRest


-- | Runs the given producer with its sink attached to 'consumeAndSuppress', and returns the producer's own result.
suppressProducer :: forall c m x r. (ParallelizableMonad m, Typeable x) => (Sink c x -> Pipe c m r) -> Pipe c m r
suppressProducer p = pipeD "suppress" p consumeAndSuppress >>= return . fst

-- | Extracts the first component of a triple.
fst3 :: (a, b, c) -> a
fst3 (first, _, _) = first