{- 
    Copyright 2009-2016 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

-- | This module defines various 'Control.Concurrent.SCC.Coroutine' types that operate on
-- 'Control.Concurrent.SCC.Streams.Sink' and 'Control.Concurrent.SCC.Streams.Source' values. The simplest of the bunch
-- are 'Consumer' and 'Producer' types, which respectively operate on a single source or sink. A 'Transducer' has access
-- both to a 'Control.Concurrent.SCC.Streams.Source' to read from and a 'Control.Concurrent.SCC.Streams.Sink' to write
-- into. Finally, a 'Splitter' reads from a single source and writes all of the input, without any modifications, into
-- two sinks of the same type.
-- 

{-# LANGUAGE ScopedTypeVariables, KindSignatures, RankNTypes,
             MultiParamTypeClasses, FlexibleContexts, FlexibleInstances,
             FunctionalDependencies, TypeFamilies #-}

module Control.Concurrent.SCC.Types (
   -- * Component types
   Performer(..),
   OpenConsumer, Consumer(..), OpenProducer, Producer(..),
   OpenTransducer, Transducer(..), OpenSplitter, Splitter(..),
   Boundary(..), Markup(..), Parser,
   PipeableComponentPair (compose), Branching (combineBranches),
   -- * Component constructors
   isolateConsumer, isolateProducer, isolateTransducer, isolateSplitter,
   oneToOneTransducer, statelessTransducer, statelessChunkTransducer, statefulTransducer,
   statelessSplitter, statefulSplitter,
   )
where

import Control.Category (Category(id), (>>>))
import qualified Control.Category as Category
import Control.Monad (liftM)
import Data.Monoid (Monoid(..))

import Control.Monad.Coroutine
import Data.Monoid.Null (MonoidNull)
import Data.Monoid.Factorial (FactorialMonoid)

import Control.Concurrent.SCC.Streams

-- | The type of a function that takes a 'Source' of @x@ bound to functor @a@ and yields a coroutine over functor @d@
-- with result @r@. The constraint @AncestorFunctor a d@ relates the source's functor to the coroutine's functor.
type OpenConsumer m a d x r = (AncestorFunctor a d, Monoid x) => Source m a x -> Coroutine d m r
-- | The type of a function that takes a 'Sink' of @x@ bound to functor @a@ and yields a coroutine over functor @d@
-- with result @r@.
type OpenProducer m a d x r = (AncestorFunctor a d, Monoid x) => Sink m a x -> Coroutine d m r
-- | The type of a function that takes a 'Source' of @x@ and a 'Sink' of @y@, each bound to its own functor (@a1@ and
-- @a2@), and yields a coroutine over functor @d@ with result @r@.
type OpenTransducer m a1 a2 d x y r = 
   (AncestorFunctor a1 d, AncestorFunctor a2 d, Monoid x, Monoid y) => Source m a1 x -> Sink m a2 y -> Coroutine d m r
-- | The type of a function that takes one 'Source' and two 'Sink's, all carrying the same item type @x@ but each bound
-- to its own functor (@a1@, @a2@, @a3@), and yields a coroutine over functor @d@ with result @r@.
type OpenSplitter m a1 a2 a3 d x r =
   (AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d, Monoid x) =>
   Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m r

-- | A coroutine that has neither inputs nor outputs - and therefore may not suspend at all, which means it's not
-- really a /co/routine. It simply wraps an action in the underlying monad @m@; 'perform' unwraps it.
newtype Performer m r = Performer {perform :: m r}

-- | A coroutine that consumes values from a 'Control.Concurrent.SCC.Streams.Source'. The wrapped 'consume' function
-- is an 'OpenConsumer' polymorphic in both the source's functor @a@ and the coroutine's functor @d@.
newtype Consumer m x r = Consumer {consume :: forall a d. OpenConsumer m a d x r}

-- | A coroutine that produces values and puts them into a 'Control.Concurrent.SCC.Streams.Sink'. The wrapped
-- 'produce' function is an 'OpenProducer' polymorphic in both the sink's functor @a@ and the coroutine's functor @d@.
newtype Producer m x r = Producer {produce :: forall a d. OpenProducer m a d x r}

-- | The 'Transducer' type represents coroutines that transform a data stream.  Execution of 'transduce' must continue
-- consuming the given 'Control.Concurrent.SCC.Streams.Source' and feeding the 'Control.Concurrent.SCC.Streams.Sink' as
-- long as there is any data in the source. The wrapped function is an 'OpenTransducer' with a unit result.
newtype Transducer m x y = Transducer {transduce :: forall a1 a2 d. OpenTransducer m a1 a2 d x y ()}

-- | The 'Splitter' type represents coroutines that distribute the input stream according to some criteria. A splitter
-- should distribute only the original input data, and feed it into the sinks in the same order it has been read from
-- the source. Furthermore, the input source should be entirely consumed and fed into the two sinks.
-- 
-- A splitter can be used in two ways: as a predicate to determine which portions of its input stream satisfy a certain
-- property, or as a chunker to divide the input stream into chunks. In the former case, the predicate is considered
-- true for exactly those parts of the input that are written to its /true/ sink. In the latter case, a chunk is a
-- contiguous section of the input stream that is written exclusively to one sink, either true or false. A 'mempty'
-- value written to either of the two sinks can also terminate the chunk written to the other sink.
newtype Splitter m x = Splitter {split :: forall a1 a2 a3 d. OpenSplitter m a1 a2 a3 d x ()}

-- | A 'Boundary' value is produced to mark either the 'Start' or the 'End' of a region of data, or an arbitrary
-- 'Point' in data. A 'Point' is semantically equivalent to a 'Start' immediately followed by 'End'.
data Boundary y = Start y | End y | Point y deriving (Eq, Show)

-- | Type of values in a marked-up stream. The 'Content' constructor wraps the actual data, while the 'Markup'
-- constructor wraps a 'Boundary' marker.
data Markup y x = Content x | Markup (Boundary y) deriving (Eq)

-- | A parser is a transducer that marks up its input: it consumes a stream of @x@ and emits lists of
-- @'Markup' b x@ values, where @b@ is the type carried by the 'Boundary' markers.
type Parser m x b = Transducer m x [Markup b x]

-- | Maps the function over the value carried by the boundary, keeping the constructor unchanged.
instance Functor Boundary where
   fmap f (Start b) = Start (f b)
   fmap f (End b) = End (f b)
   fmap f (Point b) = Point (f b)

-- | Maps the function over 'Content' only; 'Markup' boundaries pass through untouched.
instance Functor (Markup y) where
   fmap f (Content x) = Content (f x)
   fmap _ (Markup b) = Markup b

-- | 'Content' is shown as the bare content value, while a 'Markup' boundary is shown enclosed in square brackets.
-- The precedence argument is ignored.
instance (Show x , Show y) => Show (Markup y x) where
   showsPrec _ (Content x) s = shows x s
   showsPrec _ (Markup b) s = '[' : shows b (']' : s)

-- instance Monad m => Category (Transducer m) where
--    id = Transducer pour
--    t1 . t2 = isolateTransducer $ \source sink-> 
--              pipe (transduce t2 source) (\source'-> transduce t1 source' sink)
--              >> return ()

-- | Creates a proper 'Consumer' from a function that is, but can't be proven to be, an 'OpenConsumer'. The source
-- handed to the resulting consumer is lifted with 'liftSource' into the functor @d@ of the coroutine being built, so
-- the argument function only ever sees a source and a coroutine that share one functor.
isolateConsumer :: forall m x r. (Monad m, Monoid x) =>
                   (forall d. Functor d => Source m d x -> Coroutine d m r) -> Consumer m x r
isolateConsumer c = Consumer consume'
   where consume' :: forall a d. OpenConsumer m a d x r
         consume' source = let source' :: Source m d x
                               source' = liftSource source
                           in c source'

-- | Creates a proper 'Producer' from a function that is, but can't be proven to be, an 'OpenProducer'. The sink
-- handed to the resulting producer is lifted with 'liftSink' into the functor @d@ of the coroutine being built, so
-- the argument function only ever sees a sink and a coroutine that share one functor.
isolateProducer :: forall m x r. (Monad m, Monoid x) =>
                   (forall d. Functor d => Sink m d x -> Coroutine d m r) -> Producer m x r
isolateProducer p = Producer produce'
   where produce' :: forall a d. OpenProducer m a d x r
         produce' sink = let sink' :: Sink m d x
                             sink' = liftSink sink
                         in p sink'

-- | Creates a proper 'Transducer' from a function that is, but can't be proven to be, an 'OpenTransducer'. Both the
-- source and the sink handed to the resulting transducer are lifted ('liftSource', 'liftSink') into the functor @d@
-- of the coroutine being built before they are passed to the argument function.
isolateTransducer :: forall m x y. (Monad m, Monoid x) =>
                     (forall d. Functor d => Source m d x -> Sink m d y -> Coroutine d m ()) -> Transducer m x y
isolateTransducer t = Transducer transduce'
   where transduce' :: forall a1 a2 d. OpenTransducer m a1 a2 d x y ()
         transduce' source sink = let source' :: Source m d x
                                      source' = liftSource source
                                      sink' :: Sink m d y
                                      sink' = liftSink sink
                                  in t source' sink'

-- | Creates a proper 'Splitter' from a function that is, but can't be proven to be, an 'OpenSplitter'. The source and
-- both sinks handed to the resulting splitter are lifted ('liftSource', 'liftSink') into the functor @d@ of the
-- coroutine being built before they are passed to the argument function.
--
-- NOTE(review): the quantified variable @b@ is not used anywhere in the signature; it is kept only so that the
-- declared type stays exactly as it was.
isolateSplitter :: forall m x b. (Monad m, Monoid x) =>
                   (forall d. Functor d => Source m d x -> Sink m d x -> Sink m d x -> Coroutine d m ())
                   -> Splitter m x
isolateSplitter s = Splitter split'
   where split' :: forall a1 a2 a3 d. OpenSplitter m a1 a2 a3 d x ()
         split' source true false = let source' :: Source m d x
                                        source' = liftSource source
                                        true' :: Sink m d x
                                        true' = liftSink true
                                        false' :: Sink m d x
                                        false' = liftSink false
                                    in s source' true' false'


-- | Class 'PipeableComponentPair' applies to any two components that can be combined into a third component with the
-- following properties:
--
--    * The input of the result, if any, becomes the input of the first component.
--
--    * The output produced by the first child component is consumed by the second child component.
--
--    * The result output, if any, is the output of the second component.
--
-- The single method 'compose' takes a 'PairBinder' for the monad @m@ together with the two components and returns
-- the combined component. The class parameter @w@ is the type of the items passed between the two components in the
-- instances defined in this module.
--
-- NOTE(review): the functional dependency @c2 c3 -> c2@ is trivially satisfied and so constrains nothing; it looks
-- like @c2 c3 -> c1@ may have been intended. Confirm against all instances before changing it.
class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2,
                                                       c1 -> m w, c2 -> m w, c3 -> m
   where compose :: PairBinder m -> c1 -> c2 -> c3

-- | A producer piped into a consumer, both with unit results, forms a 'Performer' with a unit result. The pipe is
-- built with 'pipeG' over the 'Naught' functor and run with 'runCoroutine'; the resulting pair of units is discarded.
instance {-# OVERLAPS #-} forall m x. (Monad m, Monoid x) =>
         PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ())
   where compose binder p c = let performPipe :: Coroutine Naught m ((), ())
                                  performPipe = pipeG binder (produce p) (consume c)
                              in Performer (runCoroutine performPipe >> return ())

-- | A unit-result producer piped into a consumer forms a 'Performer' that returns the consumer's result, i.e. the
-- second component of the pair produced by 'pipeG'.
instance  {-# OVERLAPS #-} forall m x r. (Monad m, Monoid x) =>
         PipeableComponentPair m x (Producer m x ()) (Consumer m x r) (Performer m r)
   where compose binder p c = let performPipe :: Coroutine Naught m ((), r)
                                  performPipe = pipeG binder (produce p) (consume c)
                              in Performer (liftM snd $ runCoroutine performPipe)

-- | A producer piped into a unit-result consumer forms a 'Performer' that returns the producer's result, i.e. the
-- first component of the pair produced by 'pipeG'.
instance  {-# OVERLAPS #-} forall m x r. (Monad m, Monoid x) =>
         PipeableComponentPair m x (Producer m x r) (Consumer m x ()) (Performer m r)
   where compose binder p c = let performPipe :: Coroutine Naught m (r, ())
                                  performPipe = pipeG binder (produce p) (consume c)
                              in Performer (liftM fst $ runCoroutine performPipe)

-- | A transducer piped into a consumer forms a consumer of the transducer's input. The transducer reads from the
-- outer source and writes into the pipe built by 'pipeG'; the consumer reads from that pipe, and its result (the
-- second component of the pair) becomes the result of the composition.
instance (Monad m, Monoid x, Monoid y) => 
         PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r)
   where compose binder t c = isolateConsumer $ \source-> 
                              liftM snd $
                              pipeG binder
                                 (transduce t source)
                                 (consume c)

-- | A producer piped into a transducer forms a producer of the transducer's output. The producer writes into the pipe
-- built by 'pipeG'; the transducer reads from that pipe and writes into the outer sink. The producer's result (the
-- first component of the pair) becomes the result of the composition.
instance (Monad m, Monoid x, Monoid y) => 
         PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r)
   where compose binder p t = isolateProducer $ \sink-> 
                              liftM fst $
                              pipeG binder
                                 (produce p)
                                 (\source-> transduce t source sink)

-- | Two transducers piped together form a transducer from the input of the first to the output of the second. The
-- first transducer reads the outer source and writes into the pipe built by 'pipeG'; the second reads from that pipe
-- and writes into the outer sink. The pair of unit results is discarded.
instance (Monad m, Monoid x, Monoid y, Monoid z) => 
         PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z)
   where compose binder t1 t2 = 
            isolateTransducer $ \source sink-> 
            pipeG binder (transduce t1 source) (\source'-> transduce t2 source' sink)
            >> return ()

-- | 'Branching' is a type class representing all types that can act as consumers, namely 'Consumer',
-- 'Transducer', and 'Splitter'.
class Branching c (m :: * -> *) x r | c -> m x where
   -- | 'combineBranches' is used to combine two values of a 'Branching' type into one, using the given 'Consumer'
   -- binary combinator. The combinator receives a 'PairBinder' and the two branches, each as an open consumer usable
   -- in any functor @d'@ with @AncestorFunctor d d'@, and must itself yield an open consumer in functor @d@.
   combineBranches :: (forall d. (PairBinder m ->
                                  (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x r) ->
                                  (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x r) ->
                                  (forall a. OpenConsumer m a d x r))) ->
                      PairBinder m -> c -> c -> c

-- | Two 'Consumer' branches are combined by passing both of their 'consume' functions to the combinator and wrapping
-- the resulting open consumer back into a 'Consumer'.
instance forall m x r. Monad m => Branching (Consumer m x r) m x r where
   combineBranches combinator binder c1 c2 = Consumer (combinator binder (consume c1) (consume c2))

-- | Two 'Transducer' branches are combined by treating each one as a consumer of the shared input; both branches
-- write into the same output sink.
instance forall m x y. Monad m => Branching (Transducer m x y) m x () where
   combineBranches combinator binder t1 t2 = Transducer combined
      where combined :: forall a1 a2 d. OpenTransducer m a1 a2 d x y ()
            combined input output = combinator binder
                                       (\branchInput-> transduce t1 branchInput sharedOutput)
                                       (\branchInput-> transduce t2 branchInput sharedOutput)
                                       input
               where -- The caller's sink is lifted once to the functor @d@ of the combined coroutine and then
                     -- shared by both branches.
                     sharedOutput :: Sink m d y
                     sharedOutput = liftSink output

-- | Two 'Splitter' branches are combined by treating each one as a consumer of the shared input; both branches
-- write into the same pair of @true@ and @false@ sinks.
instance forall m x. Monad m => Branching (Splitter m x) m x () where
   combineBranches combinator binder s1 s2 = Splitter combined
      where combined :: forall a1 a2 a3 d. OpenSplitter m a1 a2 a3 d x ()
            combined input true false = combinator binder
                                           (\branchInput-> split s1 branchInput sharedTrue sharedFalse)
                                           (\branchInput-> split s2 branchInput sharedTrue sharedFalse)
                                           input
               where -- Both of the caller's sinks are lifted once to the functor @d@ of the combined coroutine
                     -- and then shared by both branches.
                     sharedTrue :: Sink m d x
                     sharedTrue = liftSink true
                     sharedFalse :: Sink m d x
                     sharedFalse = liftSink false

-- | Function 'oneToOneTransducer' lifts a pure function that maps one input value to one output value into a
-- 'Transducer'. The function is applied to the input stream by 'mapStream', which is where the 'FactorialMonoid'
-- requirement on the input type comes from.
oneToOneTransducer :: (Monad m, FactorialMonoid x, Monoid y) => (x -> y) -> Transducer m x y
oneToOneTransducer f = Transducer (\source sink-> mapStream f source sink)

-- | Function 'statelessTransducer' lifts a function that maps a single input item into a chunk of output into a
-- 'Transducer' over lists of items. The function handed to 'mapStream' applies the given function to every element
-- of a list of input items and joins the results with 'mconcat'.
statelessTransducer :: Monad m => (x -> y) -> Transducer m [x] y
statelessTransducer f = Transducer (\source sink-> mapStream (mconcat . map f) source sink)

-- | Function 'statelessChunkTransducer' lifts a pure function from input to output into a 'Transducer' by handing it
-- to 'mapStreamChunks'. Presumably the function is applied to whole input chunks as they arrive rather than to
-- individual items; confirm against the definition of 'mapStreamChunks'.
statelessChunkTransducer :: Monad m => (x -> y) -> Transducer m x y
statelessChunkTransducer f = Transducer (\source sink-> mapStreamChunks f source sink)

-- | Function 'statefulTransducer' builds a 'Transducer' out of a state-transition function and an initial state.
-- The state is threaded through the input by 'foldMStream_'; at every step the transition function yields the next
-- state together with an arbitrary chunk of output, which is written to the sink with 'putAll'.
statefulTransducer :: (Monad m, MonoidNull y) => (state -> x -> (state, y)) -> state -> Transducer m [x] y
statefulTransducer f s0 =
   Transducer (\source sink->
                let -- One transition: emit the output chunk, then continue with the successor state.
                    step current item = putAll output sink >> return next
                       where (next, output) = f current item
                in foldMStream_ step s0 source)

-- | Function 'statelessSplitter' lifts a predicate on individual input items into a 'Splitter' over lists of items
-- by delegating to 'partitionStream'.
statelessSplitter :: Monad m => (x -> Bool) -> Splitter m [x]
statelessSplitter f = Splitter (partitionStream f)

-- | Function 'statefulSplitter' builds a 'Splitter' out of a state-transition function and an initial state. The
-- state is threaded through the input by 'foldMStream_'; at every step the transition function yields the next state
-- together with a Boolean verdict, and the item is 'put' into the @true@ sink if the verdict is 'True' and into the
-- @false@ sink otherwise.
statefulSplitter :: Monad m => (state -> x -> (state, Bool)) -> state -> Splitter m [x]
statefulSplitter f s0 =
   Splitter (\source true false->
              let -- One transition: route the item by its verdict, then continue with the successor state.
                  route current item = (if verdict then put true item else put false item) >> return next
                     where (next, verdict) = f current item
              in foldMStream_ route s0 source)