{- 
    Copyright 2010-2011 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

-- | This module defines 'Source' and 'Sink' types and 'pipe' functions that create them. The method 'get' on 'Source'
-- abstracts away 'Control.Monad.Coroutine.SuspensionFunctors.await', and the method 'put' on 'Sink' is a
-- higher-level abstraction of 'Control.Monad.Coroutine.SuspensionFunctors.yield'. With this arrangement, a single
-- coroutine can yield values to multiple sinks and await values from multiple sources with no need to change the
-- 'Control.Monad.Coroutine.Coroutine' functor; the only requirement is for each functor of the sources and sinks
-- the coroutine uses to be an 'Control.Monad.Coroutine.Nested.AncestorFunctor' of the coroutine's functor. For
-- example, coroutine /zip/ that takes two sources and one sink would be declared like this:
-- 
-- @
-- zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()
-- @
-- 
-- Sources, sinks, and coroutines communicating through them are all created using the 'pipe' function or one of its
-- variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
-- new 'Sink' to write to and the consumer a new 'Source' to read from, in addition to all the streams that are visible
-- in the original coroutine. The following function, for example, uses the /zip/ coroutine above to add together the
-- values from two Integer sources:
--
-- @
-- add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
-- add source1 source2 sink = do pipe
--                                  (\pairSink-> zip source1 source2 pairSink)              -- producer coroutine
--                                  (\pairSource-> mapStream (uncurry (+)) pairSource sink) -- consumer coroutine
--                               return ()
-- @

{-# LANGUAGE ScopedTypeVariables, Rank2Types, TypeFamilies, KindSignatures #-}
{-# OPTIONS_HADDOCK hide #-}

module Control.Concurrent.SCC.Streams
   (
    -- * Sink and Source types
    Sink, Source, SinkFunctor, SourceFunctor, AncestorFunctor,
    -- * Sink and Source constructors
    pipe, pipeP, pipeG, nullSink, nullSource,
    -- * Operations on sinks and sources
    -- ** Singleton operations
    get, getWith, peek, put, tryPut,
    -- ** Lifting functions
    liftSink, liftSource,
    -- ** Bulk operations
    -- *** Fetching and moving data
    pour, tee, teeSink,
    getList, putList, putQueue,
    getTicked, getWhile, getUntil, 
    pourTicked, pourParsed, pourWhile, pourUntil,
    -- *** Stream transformations
    mapSink, mapStream,
    mapMaybeStream, concatMapStream,
    mapStreamChunks, foldStream, mapAccumStream, concatMapAccumStream, partitionStream,
    -- *** Monadic stream transformations
    mapMStream, mapMStream_, mapMStreamChunks_,
    filterMStream, foldMStream, foldMStream_, unfoldMStream, unmapMStream_, unmapMStreamChunks_,
    zipWithMStream, parZipWithMStream,
   )
where

import Prelude hiding (takeWhile)
  
import qualified Control.Monad
import Control.Monad (liftM, when, unless, foldM)
import Data.Foldable (toList)
import Data.Monoid (Monoid, mempty, First(First, getFirst))
import Data.Monoid.Null (MonoidNull)
import Data.Maybe (mapMaybe)
import Data.List (mapAccumL)
import Data.Sequence (Seq, viewl)
import Text.ParserCombinators.Incremental

import Control.Monad.Parallel (MonadParallel(..))
import Control.Monad.Coroutine
import Control.Monad.Coroutine.SuspensionFunctors (EitherFunctor(..), Request, request, ParseRequest, requestParse,
                                                   nestedLazyParserRequestResolver)
import Control.Monad.Coroutine.Nested (AncestorFunctor(..), liftAncestor, seesawNested)

-- | The functor of a coroutine that reads from a 'Source' created by 'pipe': the enclosing functor /a/ extended with
-- a 'ParseRequest' over items of type /x/.
type SourceFunctor a x = EitherFunctor a (ParseRequest x)
-- | The functor of a coroutine that writes to a 'Sink' created by 'pipe': the enclosing functor /a/ extended with a
-- 'Request' that sends a list of /x/ and receives a list of /x/ in reply.
type SinkFunctor a x = EitherFunctor a (Request [x] [x])

-- | A 'Sink' can be used to yield values from any nested `Coroutine` computation whose functor provably descends from
-- the functor /a/. It's the write-only end of a communication channel created by 'pipe'.
newtype Sink (m :: * -> *) a x =
   Sink
   {
   -- | This method puts a list of values into the `Sink`. The intervening 'Coroutine' computations suspend up to the
   -- 'pipe' invocation that has created the argument sink. The method returns all values that could not make it into
   -- the sink because of the sibling coroutine's death; an empty result therefore means the whole list was accepted
   -- (this is how 'tryPut' decides whether the sink is still active).
   putChunk :: forall d. AncestorFunctor a d => [x] -> Coroutine d m [x]
   }

-- | A 'Source' can be used to read values into any nested `Coroutine` computation whose functor provably descends from
-- the functor /a/. It's the read-only end of a communication channel created by 'pipe'.
newtype Source (m :: * -> *) a x =
   Source
   {
   -- | This method runs the argument parser over the input available from the 'Source'. The parser determines how much
   -- of the input gets consumed. The result pairs the value parsed so far with an optional parser; the bulk operations
   -- in this module treat 'Nothing' as \"the parser is done\" and @'Just' p@ as \"call again with /p/ to continue\"
   -- (see 'getTicked'). The intervening 'Coroutine' computations suspend all the way to the 'pipe' function invocation
   -- that created the source.
   --
   -- NOTE(review): the exact chunking behaviour is decided by the request resolver used in 'pipeG', which is defined
   -- outside this module - confirm there before relying on chunk boundaries.
   foldChunk :: forall d y. (AncestorFunctor a d, MonoidNull y) => 
                Parser [x] y -> Coroutine d m (y, Maybe (Parser [x] y))
   }

-- | A sink connected to nothing: every chunk 'put' into it is dropped, and the empty list it returns reports that
-- nothing was left over.
nullSink :: forall m a x. Monad m => Sink m a x
nullSink = Sink (\_-> return [])

-- | An empty source: it never supplies any input, so every parse request is answered with the empty result ('mempty')
-- and the unchanged parser. In particular 'get' on it always returns 'Nothing'.
--
-- NOTE(review): the second component of the answer is always 'Just', and 'getTicked', 'getUntil' and 'pourUntil' keep
-- looping for as long as they receive 'Just' - confirm whether those functions are expected to terminate on a
-- 'nullSource'.
nullSource :: forall m a x. Monad m => Source m a x
nullSource = Source (\parser-> return (mempty, Just parser))

-- | Re-targets a 'Sink' declared on the ancestor functor /a/ so that it can be used as a sink on the descendant
-- functor /d/. Every chunk is put into the original sink at level /d/ and the resulting computation is then lifted
-- with 'liftAncestor' to whichever descendant of /d/ the caller runs in.
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSink s = Sink {putChunk= liftedPut}
   where liftedPut :: forall d'. AncestorFunctor d d' => [x] -> Coroutine d' m [x]
         liftedPut chunk = liftAncestor (putChunk s chunk :: Coroutine d m [x])

-- | Re-targets a 'Source' declared on the ancestor functor /a/ so that it can be used as a source on the descendant
-- functor /d/. Every parse request is issued against the original source at level /d/ and the resulting computation
-- is then lifted with 'liftAncestor' to whichever descendant of /d/ the caller runs in.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
liftSource s = Source {foldChunk= liftedFold}
   where liftedFold :: forall d' y. (AncestorFunctor d d', MonoidNull y) =>
                       Parser [x] y -> Coroutine d' m (y, Maybe (Parser [x] y))
         liftedFold parser = liftAncestor (foldChunk s parser :: Coroutine d m (y, Maybe (Parser [x] y)))

-- | The 'pipe' function splits the computation into two parts, /producer/ and /consumer/. The /producer/ receives a
-- fresh 'Sink' to put values into and the /consumer/ a fresh 'Source' to get those values from. The result pairs the
-- producer's result with the consumer's. This is 'pipeG' specialised to the 'sequentialBinder'.
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
        (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipe producer consumer = pipeG sequentialBinder producer consumer

-- | The 'pipeP' function is the same as 'pipe' except that the steps of the /producer/ and the /consumer/ are combined
-- with 'bindM2' of the underlying 'MonadParallel' monad, which lets them run in parallel.
pipeP :: forall m a a1 a2 x r1 r2. 
         (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP producer consumer = pipeG bindM2 producer consumer

-- | A generic version of 'pipe'. The first argument is the binder used to combine two computation steps, one of the
-- /producer/ and one of the /consumer/. The two coroutines are coupled with 'seesawNested'; the consumer is passed to
-- it first, so the resulting pair is swapped to put the producer's result first.
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2)
      -> Coroutine a m (r1, r2)
pipeG run2 producer consumer =
   liftM swapResults (seesawNested run2 nestedLazyParserRequestResolver (consumer source) (producer sink))
   where -- The pattern is lazy so the pair is not forced any earlier than the caller forces it.
         swapResults ~(consumed, produced) = (produced, consumed)
         sink = Sink {putChunk= sendChunk}
         -- An empty chunk never suspends the producer; anything else becomes a request to the sibling coroutine.
         sendChunk :: forall d. AncestorFunctor a1 d => [x] -> Coroutine d m [x]
         sendChunk [] = return []
         sendChunk xs = liftAncestor (mapSuspension RightF (request xs) :: Coroutine a1 m [x])
         source = Source {foldChunk= askParse}
         -- Every read is a parse request that suspends the consumer.
         askParse :: forall d y. (AncestorFunctor a2 d, MonoidNull y) =>
                     Parser [x] y -> Coroutine d m (y, Maybe (Parser [x] y))
         askParse t = liftAncestor (mapSuspension RightF (requestParse t) :: Coroutine a2 m (y, Maybe (Parser [x] y)))

-- | Function 'get' tries to get a single value from the given 'Source' by running the 'anyToken' parser on it. The
-- intervening 'Coroutine' computations suspend all the way to the 'pipe' function invocation that created the source.
-- The function returns 'Nothing' if the parser result is empty, i.e. the source had nothing to give.
get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m (Maybe x)
get source = do
      (chunk, _) <- foldChunk source anyToken
      return (firstOf chunk)
   where firstOf [] = Nothing
         firstOf ~[x] = Just x

-- | Function 'peek' acts like 'get' but wraps the 'anyToken' parser in 'lookAhead', so the value is reported without
-- being consumed from the source; sequential calls to 'peek' will always return the same value.
peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m (Maybe x)
peek source = do
      (chunk, _) <- foldChunk source (lookAhead anyToken)
      return (firstOf chunk)
   where firstOf [] = Nothing
         firstOf ~[x] = Just x

-- | 'getList' collects everything the source generates into one list, by running 'getTicked' with the 'acceptAll'
-- parser.
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getList source = getTicked acceptAll source

-- | Tries to 'get' one value from the source and, if there is one, runs the /consumer/ on it. Does nothing when the
-- source has no value to give.
getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
getWith consumer source = do
   next <- get source
   case next of
      Nothing -> return ()
      Just x -> consumer x

-- | Consumes values from the /source/ as long as the /parser/ accepts them and returns the concatenation of all the
-- chunks the parser has produced. The source is asked repeatedly, each time with the parser returned by the previous
-- request, until a request comes back without a parser to continue with.
getTicked :: forall m a d x. (Monad m, AncestorFunctor a d) => Parser [x] [x] -> Source m a x -> Coroutine d m [x]
getTicked parser source = go id parser
   where -- 'prefix' prepends all the chunks collected so far.
         go prefix p = do
            (chunk, rest) <- foldChunk source p
            case rest of
               Nothing -> return (prefix chunk)
               Just p' -> go (prefix . (chunk ++)) p'

-- | Consumes values from the /source/ as long as each satisfies the predicate, then returns their list. Implemented as
-- 'getTicked' with a 'takeWhile' parser.
getWhile :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m [x]
getWhile predicate source = getTicked (takeWhile accepted) source
   where -- The parser tests list-shaped pieces of input; presumably they are singletons, so 'head' is safe - TODO
         -- confirm against the incremental-parser documentation of 'takeWhile'.
         accepted piece = predicate (head piece)

-- | Consumes values from the /source/ until one of them satisfies the predicate or the source is emptied, then returns
-- the pair of the list of preceding values and maybe the one value that satisfied the predicate. The latter is not
-- consumed, because it is only inspected through 'lookAhead'.
getUntil :: forall m a d x. (Monad m, AncestorFunctor a d) => 
            (x -> Bool) -> Source m a x -> Coroutine d m ([x], Maybe x)
getUntil f source = loop id
   where loop cont = foldChunk source untilMatch >>= extract cont
         -- Take everything that fails the predicate, then peek at the following item, if there is one.
         untilMatch = takeWhile (not . f . head)
                      `andThen` lookAhead (fmap (First . Just . head) anyToken <<|> return (First Nothing))
         -- The parser is finished: report whatever the look-ahead has found.
         extract cont ((chunk, First mx), Nothing) = return (cont chunk, mx)
         -- The parser is not finished and no following item has been seen yet: keep reading.
         extract cont ((chunk, First Nothing), Just{}) = loop (cont . (chunk ++))
         -- The following item is already known even though a parser came back. The original code had no clause for
         -- this combination and would have failed with a pattern-match error; the answer is complete, so return it.
         extract cont ((chunk, First mx), Just{}) = return (cont chunk, mx)

-- | Copies all data from the /source/ argument into the /sink/ argument, one chunk at a time, until the source
-- yields an empty chunk. Whatever the sink reports as not accepted is ignored.
pour :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour source sink = copy
   where copy = do
            chunk <- getChunk source
            if null chunk then return () else putChunk sink chunk >> copy

-- | Like 'pour', copies data from the /source/ to the /sink/, but only the part of it that the /parser/ accepts. The
-- copying stops as soon as a request to the source produces an empty chunk or returns no parser to continue with.
pourTicked :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => Parser [x] [x] -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pourTicked parser source sink = go parser
   where go p = do
            (chunk, rest) <- foldChunk source p
            unless (null chunk) (putChunk sink chunk >> continue rest)
         continue Nothing = return ()
         continue (Just p') = go p'

-- | Parses the input data from the /source/ using the given parser and copies the parsed results to the /sink/. The
-- copying stops as soon as a request to the source produces an empty result or returns no parser to continue with.
--
-- NOTE(review): an empty parsed result ends the loop even when a parser to continue with is returned - confirm that
-- the parser cannot legitimately produce an empty intermediate result.
pourParsed :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => Parser [x] [y] -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourParsed parser source sink = go parser
   where go p = do
            (parsed, rest) <- foldChunk source p
            unless (null parsed) (putChunk sink parsed >> continue rest)
         continue Nothing = return ()
         continue (Just p') = go p'

-- | Like 'pour', copies data from the /source/ to the /sink/, but only as long as the values satisfy the predicate.
-- Implemented as 'pourTicked' with a 'takeWhile' parser.
pourWhile :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pourWhile f source sink = pourTicked (takeWhile accepted) source sink
   where accepted piece = f (head piece)

-- | Like 'pour', copies data from the /source/ to the /sink/, but only until one value satisfies the predicate. That
-- value is returned rather than copied; it is only inspected through 'lookAhead'. The result is 'Nothing' if no value
-- satisfying the predicate has been found.
pourUntil :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)
pourUntil f source sink = loop
   where loop = foldChunk source untilMatch >>= extract
         -- Take everything that fails the predicate, then peek at the following item, if there is one.
         untilMatch = takeWhile (not . f . head)
                      `andThen` lookAhead (fmap (First . Just . head) anyToken <<|> return (First Nothing))
         -- The parser is finished: flush the last chunk and report whatever the look-ahead has found.
         extract ((chunk, First mx), Nothing) = putList chunk sink >> return mx
         -- The parser is not finished and no following item has been seen yet: copy the chunk and keep reading.
         extract ((chunk, First Nothing), Just{}) = putChunk sink chunk >> loop
         -- The following item is already known even though a parser came back. The original code had no clause for
         -- this combination and would have failed with a pattern-match error; the answer is complete, so finish.
         extract ((chunk, First mx), Just{}) = putList chunk sink >> return mx

-- | 'mapStream' is like 'pour' except that the function /f/ is applied to every value on its way from the /source/
-- into the /sink/. The copying ends when the source yields an empty chunk.
mapStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
           => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStream f source sink = go
   where go = do
            chunk <- getChunk source
            if null chunk then return () else putChunk sink (map f chunk) >> go

-- | An equivalent of 'Data.List.map' that works on a 'Sink' instead of a list. The argument function is applied to
-- every value before it's written to the sink argument. If the underlying sink rejects the last /n/ mapped values, the
-- last /n/ of the original, unmapped values are returned.
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
mapSink f sink = Sink{putChunk= putMapped}
   where putMapped :: forall d. AncestorFunctor a d => [x] -> Coroutine d m [x]
         putMapped xs = do
            rejected <- putChunk sink (map f xs)
            return (keepLast (length rejected) xs)
         -- Keeps only the last /n/ items of the list; the list is not inspected at all when /n/ is zero.
         keepLast :: forall z. Int -> [z] -> [z]
         keepLast 0 _ = []
         keepLast n list = drop (length list - n) list
         

-- | 'mapMaybeStream' is to 'mapStream' like 'Data.Maybe.mapMaybe' is to 'Data.List.map': only the 'Just' results of
-- the function /f/ are written into the /sink/.
mapMaybeStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMaybeStream f source sink = mapMStreamChunks_ putJusts source
   where putJusts chunk = putChunk sink (mapMaybe f chunk) >> return ()

-- | 'concatMapStream' is to 'mapStream' like 'Data.List.concatMap' is to 'Data.List.map': every input value is
-- expanded by /f/ into a list of values, all of which are written into the /sink/.
concatMapStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => (x -> [y]) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
concatMapStream f source sink = go
   where go = do
            chunk <- getChunk source
            if null chunk then return () else putChunk sink (concatMap f chunk) >> go

-- | 'mapAccumStream' is similar to 'mapAccumL' except it reads the values from a 'Source' instead of a list
-- and writes the mapped values into a 'Sink' instead of returning another list. The final accumulator is returned.
mapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
mapAccumStream f acc source sink = foldMStreamChunks step acc source
   where -- Maps one whole chunk, writes out the mapped values, and hands the accumulator on to the next chunk.
         step a xs = case mapAccumL f a xs of (a', ys) -> putChunk sink ys >> return a'

-- | 'concatMapAccumStream' combines 'concatMapStream' and 'mapAccumStream': it threads the accumulator like the
-- latter, but its argument function returns not a single value, but a list of values to write into the sink. The
-- final accumulator is returned.
concatMapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, [y])) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
concatMapAccumStream f acc source sink = foldMStreamChunks step acc source
   where -- Maps one whole chunk, writes out the concatenated results, and hands the accumulator on to the next chunk.
         step a xs = case mapAccumL f a xs of (a', yss) -> putChunk sink (concat yss) >> return a'

-- | Like 'mapStream' except it runs the argument function on whole chunks read from the input. The mapped chunk is
-- written with 'putList', so an empty mapped chunk is not sent to the sink.
mapStreamChunks :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => ([x] -> [y]) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStreamChunks f source sink = go
   where go = do
            chunk <- getChunk source
            if null chunk then return () else putList (f chunk) sink >> go

-- | 'mapMStream' is similar to 'Control.Monad.mapM'. It draws the values from a 'Source' instead of a list, runs the
-- monadic function /f/ on each of them, and writes the results to a 'Sink'. A whole chunk is mapped before any of its
-- results are written.
mapMStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMStream f source sink = go
   where go = do
            chunk <- getChunk source
            if null chunk then return () else mapM f chunk >>= putChunk sink >> go

-- | 'mapMStream_' is similar to 'Control.Monad.mapM_' except it draws the values from a 'Source' instead of a list and
-- works with 'Coroutine' instead of an arbitrary monad. The action /f/ is run on every value of every chunk.
mapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
              => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMStream_ f source = mapMStreamChunks_ (Control.Monad.mapM_ f) source

-- | Like 'mapMStream_' except it runs the argument function on whole chunks read from the input. The function is never
-- given an empty chunk: an empty chunk ends the loop.
mapMStreamChunks_ :: forall m a d x . (Monad m, AncestorFunctor a d)
              => ([x] -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMStreamChunks_ f source = go
   where go = do
            chunk <- getChunk source
            if null chunk then return () else f chunk >> go

-- | An equivalent of 'Control.Monad.filterM'. Draws the values from a 'Source' instead of a list, writes into the
-- 'Sink' only those values for which the monadic predicate returns 'True', and returns a 'Coroutine'.
filterMStream :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
filterMStream f source sink = mapMStream_ keep source
   where keep x = do
            accepted <- f x
            when accepted (put sink x)

-- | Similar to 'Data.List.foldl', but reads the values from a 'Source' instead of a list. The accumulator is folded
-- lazily over each chunk and returned once the source yields an empty chunk.
foldStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
              => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
foldStream f acc source = go acc
   where go s = do
            chunk <- getChunk source
            if null chunk then return s else go (foldl f s chunk)

-- | 'foldMStream' is similar to 'Control.Monad.foldM' except it draws the values from a 'Source' instead of a list and
-- works with 'Coroutine' instead of an arbitrary monad. The accumulator is returned once the source yields an empty
-- chunk.
foldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
              => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream f acc source = go acc
   where go a = do
            chunk <- getChunk source
            if null chunk then return a else foldM f a chunk >>= go

-- | A variant of 'foldMStream' that runs the same fold but throws the final accumulator away.
foldMStream_ :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
foldMStream_ f acc = (>> return ()) . foldMStream f acc

-- | Like 'foldMStream' but the step function is given whole chunks from the argument source rather than single values.
-- It is never given an empty chunk: an empty chunk ends the fold.
foldMStreamChunks :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                     => (acc -> [x] -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStreamChunks f acc source = go acc
   where go a = do
            chunk <- getChunk source
            if null chunk then return a else f a chunk >>= go

-- | 'unfoldMStream' is a version of 'Data.List.unfoldr' that writes the generated values into a 'Sink' instead of
-- returning a list. The generator is run repeatedly until it returns 'Nothing'; the accumulator it was last given is
-- the result.
unfoldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                 => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
unfoldMStream f acc sink = go acc
   where go a = do
            step <- f a
            case step of
               Nothing -> return a
               Just (x, a') -> put sink x >> go a'

-- | 'unmapMStream_' is opposite of 'mapMStream_'; it takes a 'Sink' instead of a 'Source' argument and writes into it
-- the values generated by repeatedly running the argument computation, until that computation returns 'Nothing'.
unmapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                 => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
unmapMStream_ f sink = go
   where go = do
            next <- f
            case next of
               Nothing -> return ()
               Just x -> put sink x >> go

-- | Like 'unmapMStream_' but writing whole chunks of generated data into the argument sink. The loop ends when the
-- generator returns an empty chunk or when the sink fails to accept a whole chunk.
unmapMStreamChunks_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                       => Coroutine d m [x] -> Sink m a x -> Coroutine d m ()
unmapMStreamChunks_ f sink = loop >> return ()
   where loop = do
            chunk <- f
            if null chunk
               then return []
               else do
                  rest <- putChunk sink chunk
                  -- A non-empty remainder means the sink did not take everything, so stop generating.
                  if null rest then loop else return rest

-- | Equivalent to 'Data.List.partition'. Takes a 'Source' instead of a list argument and partitions its contents into
-- the two 'Sink' arguments: values satisfying the predicate go into the first sink, the others into the second. Each
-- input chunk is split into maximal runs of like values and every run is written as one chunk.
partitionStream :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                   => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
partitionStream f source true false = mapMStreamChunks_ partitionChunk source
   where partitionChunk [] = error "Chunks cannot be empty!"
         partitionChunk (x:rest) = if f x then runOfTrues x rest else runOfFalses x rest
         -- The head /x/ is already known to satisfy the predicate; extend the run and switch to the other sink.
         runOfTrues x chunk = putChunk true (x:matching) >> continue runOfFalses others
            where (matching, others) = span f chunk
         -- The head /x/ is already known to fail the predicate; extend the run and switch to the other sink.
         runOfFalses x chunk = putChunk false (x:failing) >> continue runOfTrues others
            where (failing, others) = break f chunk
         continue _ [] = return ()
         continue next (y:ys) = next y ys

-- | 'zipWithMStream' is similar to 'Control.Monad.zipWithM' except it draws the values from two 'Source' arguments
-- instead of two lists, sends the results into a 'Sink', and works with 'Coroutine' instead of an arbitrary monad.
-- One value is read from the first source and then one from the second; the loop ends as soon as either read comes
-- back empty.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                  => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
zipWithMStream f source1 source2 sink = loop
   where loop = get source1 >>= \mx-> get source2 >>= \my-> zipMaybe mx my
         zipMaybe (Just x) (Just y) = f x y >>= put sink >> loop
         zipMaybe _ _ = return ()

-- | 'parZipWithMStream' is equivalent to 'zipWithMStream', but the two reads of every step are combined with 'bindM2',
-- so the two sources are consumed in parallel.
parZipWithMStream :: forall m a1 a2 a3 d x y z.
                     (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                     => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
parZipWithMStream f source1 source2 sink = loop
   where loop = bindM2 combine (get source1) (get source2)
         combine mx my = case (mx, my) of
                            (Just x, Just y) -> f x y >>= put sink >> loop
                            _ -> return ()

-- | 'tee' is similar to 'pour' except that every chunk read from the source argument is written into both sink
-- arguments, the first sink before the second.
tee :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee source sink1 sink2 = distribute
   where distribute = do
            chunk <- getChunk source
            if null chunk then return () else putChunk sink1 chunk >> putChunk sink2 chunk >> distribute

-- | Every value 'put' into a 'teeSink' result sink goes into its both argument sinks: @put (teeSink s1 s2) x@ is
-- equivalent to @put s1 x >> put s2 x@. The 'putChunk' method returns the list of values that couldn't fit into the
-- second sink; whatever the first sink reports is discarded.
teeSink :: forall m a1 a2 a3 x . (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3)
           => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
teeSink s1 s2 = Sink{putChunk= teeChunk}
   where teeChunk :: forall d. AncestorFunctor a3 d => [x] -> Coroutine d m [x]
         teeChunk chunk = putChunk (liftSink s1 :: Sink m a3 x) chunk >> putChunk (liftSink s2 :: Sink m a3 x) chunk

-- | This function puts a single value into the given `Sink` as a one-element chunk and ignores whether the sink has
-- accepted it. The intervening 'Coroutine' computations suspend up to the 'pipe' invocation that has created the
-- argument sink.
put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> x -> Coroutine d m ()
put sink = (>> return ()) . putChunk sink . singleton
   where singleton x = [x]

-- | Like 'put', but returns a Bool that tells if the sink is still active: 'True' when the sink has reported nothing
-- left over, 'False' when the value has come back unaccepted.
tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> x -> Coroutine d m Bool
tryPut sink x = null `liftM` putChunk sink [x]

-- | 'putList' puts an entire list into its /sink/ argument. If the coroutine fed by the /sink/ dies, the remainder of
-- the argument list is returned. An empty list is never passed on to the sink.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m [x]
putList [] _ = return []
putList l sink = putChunk sink l

-- | Reads from the source with the 'acceptAll' parser and returns the parsed list, dropping the second component of
-- the 'foldChunk' result. The bulk operations in this module treat an empty result as the end of the stream.
getChunk :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getChunk source = fst `liftM` foldChunk source acceptAll

-- | Like 'putList', except it puts the contents of the given 'Data.Sequence.Seq' into the sink, in order from the left
-- end of the sequence.
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m [x]
putQueue = putList . toList . viewl

-- | A list eliminator: yields the first argument when the list is empty, and otherwise applies the second argument to
-- the whole (non-empty) list.
nullOrElse :: a -> ([x] -> a) -> [x] -> a
nullOrElse nullCase f list
   | null list = nullCase
   | otherwise = f list