Copyright  (c) 2021 Juan Pablo Royo Sales 

License  BSD3 
Maintainer  juanpablo.royo@gmail.com 
Stability  experimental 
Portability  GHC 
Safe Haskell  None 
Language  Haskell2010 
DynamicPipeline is a Type Safe Dynamic and Parallel Streaming Library, which is an implementation of Dynamic Pipeline Paradigm (DPP) proposed in this paper DPP. The aim of this Library is to provide all the Type level constructs to guide the user in building a DPP flow to solve any algorithm that fits on this computational model.
This implementation has been developed using Type Level Programming techniques like Type families
, Defunctionalization
, Existential Types
and
Dynamic Record Tagged Types
among others.
Using all this techniques, we provide a High Level and Type Safe DynamicPipeline Library to build a Data Flow Algorithm avoiding as much as possible
boilerplate code, but maintaining safety and robustness.
Example of Filtering Repeated elements of a Stream
import DynamicPipeline type DPExample =Source
(Channel
(Int:<+>
Eof
)):=>
Generator
(Channel
(Int:<+>
Eof
)):=>
Sink
source' ::Stage
(WriteChannel
Int >DP
s ()) source' =withSource
DPExample $ cout >
DPExample genAction inunfoldT
([1 .. 1000] <> [1 .. 1000]) cout identity generator' ::GeneratorStage
DPExample (Maybe Int) Int s generator' = let gen =withGenerator
mkGenerator
gen filterTemp genAction ::Filter
DPExample (Maybe Int) Int s >ReadChannel
Int >WriteChannel
Int >DP
s () genAction filter' cin cout = let unfoldFilter =mkUnfoldFilterForAll'
(`push
` cout) filter' Just cinHNil
in void $unfoldF
unfoldFilter filterTemp ::Filter
DPExample (Maybe Int) Int s filterTemp =mkFilter
actorRepeted actorRepeted :: Int >ReadChannel
Int >WriteChannel
Int > StateT (Maybe Int) (DP
s) () actorRepeted i rc wc = do liftIO $foldM
rc $ e > if e /= i thenpush
e wc else pure () sink' ::Stage
(ReadChannel
Int >DP
s ()) sink' =withSink
DPExample $ flip
DPExample source' generator' sink'foldM
print program :: IO () program =runDP
$mkDP
Some visual representation of DP and how this is happening under the hood
Source
>>Filter
_1(runStateT $ actor_1 >> actor_2 >> ... actor_n) ..... >>Generator
>>Sink
    Async Async Async Async
Synopsis
 data Eof
 data Sink
 data Generator (a :: Type)
 data Source (a :: Type)
 data Channel (a :: Type)
 data FeedbackChannel (a :: Type)
 data a :=> b = a :=> b
 data chann1 :<+> chann2 = chann1 :<+> chann2
 data DynamicPipeline dpDefinition filterState filterParam st
 data Filter dpDefinition filterState filterParam st
 data Actor dpDefinition filterState filterParam monadicAction
 data GeneratorStage dpDefinition filterState filterParam st
 data Stage a
 type family ValidDP (a :: Bool) :: Constraint where ...
 type family IsDP (dpDefinition :: k) :: Bool where ...
 data DP st a
 data UnFoldFilter dpDefinition readElem st filterState filterParam l
 withDP :: IO a > DP s a
 mkGenerator :: Stage (WithGenerator dpDefinition (Filter dpDefinition filterState filterParam st) (DP st)) > Filter dpDefinition filterState filterParam st > GeneratorStage dpDefinition filterState filterParam st
 mkFilter :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) > Filter dpDefinition filterState filterParam st
 single :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) > NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st)))
 actor :: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st)) > Actor dpDefinition filterState filterParam (StateT filterState (DP st))
 (>>>) :: forall dpDefinition filterState filterParam st. Actor dpDefinition filterState filterParam (StateT filterState (DP st)) > Filter dpDefinition filterState filterParam st > Filter dpDefinition filterState filterParam st
 (>>) :: forall dpDefinition filterState filterParam st. Actor dpDefinition filterState filterParam (StateT filterState (DP st)) > Actor dpDefinition filterState filterParam (StateT filterState (DP st)) > Filter dpDefinition filterState filterParam st
 withSource :: forall (dpDefinition :: Type) st. WithSource dpDefinition (DP st) > Stage (WithSource dpDefinition (DP st))
 withGenerator :: forall (dpDefinition :: Type) (filter :: Type) st. WithGenerator dpDefinition filter (DP st) > Stage (WithGenerator dpDefinition filter (DP st))
 withSink :: forall (dpDefinition :: Type) st. WithSink dpDefinition (DP st) > Stage (WithSink dpDefinition (DP st))
 mkDP :: forall dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi. DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi => Stage (WithSource dpDefinition (DP st)) > GeneratorStage dpDefinition filterState filterParam st > Stage (WithSink dpDefinition (DP st)) > DP st ()
 runDP :: (forall st. DP st a) > IO a
 unfoldF :: forall dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4. SpawnFilterConstraint dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4 => UnFoldFilter dpDefinition readElem st filterState filterParam l > DP st (HList l)
 mkUnfoldFilter :: (readElem > Bool) > (readElem > DP st ()) > Filter dpDefinition filterState filterParam st > (readElem > filterState) > ReadChannel readElem > HList l > UnFoldFilter dpDefinition readElem st filterState filterParam l
 mkUnfoldFilter' :: (readElem > Bool) > Filter dpDefinition filterState filterParam st > (readElem > filterState) > ReadChannel readElem > HList l > UnFoldFilter dpDefinition readElem st filterState filterParam l
 mkUnfoldFilterForAll :: Filter dpDefinition filterState filterParam st > (readElem > filterState) > ReadChannel readElem > HList l > UnFoldFilter dpDefinition readElem st filterState filterParam l
 mkUnfoldFilterForAll' :: (readElem > DP st ()) > Filter dpDefinition filterState filterParam st > (readElem > filterState) > ReadChannel readElem > HList l > UnFoldFilter dpDefinition readElem st filterState filterParam l
 (.*.) :: HExtend e l => e > l > HExtendR e l
 data family HList (l :: [Type])
 hHead :: forall e (l :: [Type]). HList (e ': l) > e
 hTail :: forall e (l :: [Type]). HList (e ': l) > HList l
 data ReadChannel a
 data WriteChannel a
 (=>) :: MonadIO m => ReadChannel a > WriteChannel a > m ()
 (=>) :: MonadIO m => ReadChannel a > WriteChannel b > (a > b) > m ()
 (>=>) :: MonadIO m => ReadChannel a > WriteChannel b > (a > m (Maybe b)) > m ()
 (>=>) :: MonadIO m => ReadChannel a > WriteChannel b > (a > m (Maybe b)) > m ()
 mapF_ :: MonadIO m => ReadChannel a > WriteChannel b > (a > b) > m ()
 map_ :: MonadIO m => ReadChannel a > WriteChannel b > (a > b) > m ()
 mapM_ :: MonadIO m => ReadChannel a > WriteChannel b > (a > m (Maybe b)) > m ()
 mapMF_ :: MonadIO m => ReadChannel a > WriteChannel b > (a > m (Maybe b)) > m ()
 foldM_ :: MonadIO m => ReadChannel a > (a > m ()) > m ()
 foldWithM_ :: MonadIO m => ReadChannel a > m () > (a > m ()) > m ()
 push :: MonadIO m => a > WriteChannel a > m ()
 pull :: MonadIO m => ReadChannel a > m (Maybe a)
 finish :: MonadIO m => WriteChannel a > m ()
 unfoldM :: forall m a b. MonadIO m => m a > (a > b) > m Bool > WriteChannel b > m ()
 unfoldFile :: MonadIO m => FilePath > WriteChannel b > (ByteString > b) > m ()
 unfoldT :: (MonadIO m, Foldable t) => t a > WriteChannel b > (a > b) > m ()
DP Flow Grammar
The following is the Context Free Grammar allowed to build a DPP Flow definition:
DP >Source
CHANS:=>
Generator
CHANS:=>
Sink
DP >Source
CHANS:=>
Generator
CHANS:=>
FEEDBACK:=>
Sink
CHANS >Channel
CH FEEDBACK >FeedbackChannel
CH CH >Type
:<+>
CH Eof
Example:
Source
(Channel
(Int:<+>
Int)):=>
Generator
(Channel
(Int:<+>
Int)):=>
Sink
Or with Feedback Channel to retrofit Streamming
Source
(Channel
(Int:<+>
Int)):=>
Generator
(Channel
(Int:<+>
Int)):=>
FeedbackChannel
(String
:<+>
Eof
):=>
Sink
Building
DynamicPipeline
DynamicPipeline
Data type is the point where all the information is contained in order the library can run our DP Algorithm.
This Data type contains three fundamental pieces: Source
, Generator
and Sink
. But all these are dynamic based on the
defined Flow.
One of the fundamental feature of this Library is to provide several combinators that deduce from the Definition Flow, what are the
Function Signatures the user must fulfill according to his definition.
All these combinators work in the same manner which based on the flow definition present to the user at compile time what is the function that must be provided. Lets see an example based on the Misc.RepeatedDP, which basically filter out repeated elements in a stream.
>>>
import Relude
>>>
import DynamicPipeline
>>>
type DPEx = Source (Channel (Int :<+> Eof)) :=> Generator (Channel (Int :<+> Eof)) :=> Sink
>>>
:t withSource @DPEx
withSource @DPEx :: forall k (st :: k). (WriteChannel Int > DP st ()) > Stage (WriteChannel Int > DP st ())
In type DPEx = ..
we are defining a Flow which contains a Source
that is going to have an Int
Channel that is going to feed the Generator
.
Therefore the Source
should write on that channel and because of that we are being asked to provide a Function that WriteChannel Int > DP st ()
.
Remember that our Monadic context is always DP
.
Having that we can provide that function and have all the pieces together for Source
.
>>>
let source' = withSource @DPEx $ \wc > unfoldT ([1..10] <> [1..10] <> [1..10] <> [1..10]) wc identity
>>>
:t source'
source' :: forall k (st :: k). Stage (WriteChannel Int > DP st ())
So we are done. we provide that function.
Now we can do the same for Sink
which is the other opposite part of the Stream because Generator
is a little different as we can see in the documentation.
>>>
let sink' = withSink @DPEx $ \rc > foldM rc $ putStr . show
>>>
:t sink'
sink' :: forall k (st :: k). Stage (ReadChannel Int > DP st ())
Done with Sink
.
Generator and Filter
Now we reach to the last piece which needs more work to be done because it is the core of DPP which dynamically adds Parallel computations between the Generator
Stage
and previous Filter
s and Source
.
Fortunately we have the same combinator withGenerator
but it is not so straightforward what to put there. So, lets go step by step.
>>>
:t withGenerator @DPEx
withGenerator @DPEx :: forall k filter (st :: k). (filter > ReadChannel Int > WriteChannel Int > DP st ()) > Stage (filter > ReadChannel Int > WriteChannel Int > DP st ())
At the first Glance it is asking for some similar function that is going to return our desired Stage
but there is some type parameter which is
not so obvious filter
.
Fortunately we have combinators for that as well that can save us a lot of time and effort.
Note: We could have done a Generator with an Empty Filter
but we are not taking advantage of DPP in building a Pipeline Parallelization Computational Algorithm
In the case of Filter
we have several combinators at our disposal.
 Use
mkFilter
if your DPP contains 1 actor per Filter  Use
>>
and>>>
if your DPP contains more than 1 actor
In our example we are going to use 1 actor only that is going to discard repeated elements >>> :t mkFilter @DPEx actor1 Variable not in scope: actor1 :: filterParam > ReadChannel Int > WriteChannel Int > StateT filterState (DP st) ()
First lets fill in the gaps.
>>>
let filter' = mkFilter @DPEx (\i rc wc > foldM rc $ \e > if e /= i then push e wc else pure ())
>>>
:t filter'
filter' :: forall k filterState (st :: k). Filter DPEx filterState Int st
Basically we are checking if the element that we are reding from the Channel (Remember that we can have multiple Filter
on front writing to us),
is equal to the First Element that was read by the Generator
and on which this Filter
was instantiated with (a.k.a. filterParam
).
If the element is not equal we push
it to the next Filter
or Generator
, otherwise we discarded.
>>>
let gen' = mkGenerator (withGenerator @DPEx $ \f r w > let unf = mkUnfoldFilterForAll' (`push` w) f Just r HNil in void $ unfoldF unf) filter'
>>>
:t gen'
gen' :: forall k (st :: k). GeneratorStage DPEx (Maybe Int) Int st
Now we have everything in place we only need to call runDP
and mkDP
>>>
runDP $ mkDP @DPEx source' gen' sink'
12345678910
Types Flow definition
Eof
is the End of Channel mark in the DP Definition Flow
Instances
data Generator (a :: Type) Source #
Generator
contains the Generator
Stage its Channels definitions in the DP definition Flow.
a ~ Channel
Instances
data Source (a :: Type) Source #
Instances
data Channel (a :: Type) Source #
Channel
is the Container Type of Open Union Type which is going to be defined with :<+>
.
a ~ (Type:<+>
Type:<+>
...:<+>
Eof)
Instances
data FeedbackChannel (a :: Type) Source #
FeedbackChannel
is the Container Type of Open Union Type which is going to be defined with :<+>
and indicates that this
 Channel is for feedback to Source
a ~ (Type:<+>
Type:<+>
...:<+>
Eof)
Instances
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #  
Defined in DynamicPipeline.Flow  
(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #  
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #  
Defined in DynamicPipeline.Flow  
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #  
data a :=> b infixr 5 Source #
This is the Type level function of the Open Union Type for Stages.
This should have the form:
Source
(Channel
..):=>
Generator
(Channel
..):=>
Sink
a :=> b infixr 5 
Instances
Functor ((:=>) a) Source #  
Foldable ((:=>) a) Source #  
Defined in DynamicPipeline.Flow fold :: Monoid m => (a :=> m) > m # foldMap :: Monoid m => (a0 > m) > (a :=> a0) > m # foldMap' :: Monoid m => (a0 > m) > (a :=> a0) > m # foldr :: (a0 > b > b) > b > (a :=> a0) > b # foldr' :: (a0 > b > b) > b > (a :=> a0) > b # foldl :: (b > a0 > b) > b > (a :=> a0) > b # foldl' :: (b > a0 > b) > b > (a :=> a0) > b # foldr1 :: (a0 > a0 > a0) > (a :=> a0) > a0 # foldl1 :: (a0 > a0 > a0) > (a :=> a0) > a0 # toList :: (a :=> a0) > [a0] # elem :: Eq a0 => a0 > (a :=> a0) > Bool # maximum :: Ord a0 => (a :=> a0) > a0 # minimum :: Ord a0 => (a :=> a0) > a0 #  
Traversable ((:=>) a) Source #  
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #  
Defined in DynamicPipeline.Flow  
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #  
(Bounded a, Bounded b) => Bounded (a :=> b) Source #  
(Eq a, Eq b) => Eq (a :=> b) Source #  
(Show a, Show b) => Show (a :=> b) Source #  
(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #  
(MkCh inToGen, MkCh genToOut) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source #  
Defined in DynamicPipeline.Flow  
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #  
Defined in DynamicPipeline.Flow  
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #  
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #  
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source #  
data chann1 :<+> chann2 infixr 5 Source #
This is the Type level function of the Open Union Type for Channels.
Channels forms an Open Union Type in each stage because according to DPP we can have multiple In and Out Channels in a Single Stage.
Eof
should be the last Channel of the Open Union Type to indicate termination of the Grammar.
chann1 ~ Type
chann2 ~ Type
chann1 :<+> chann2 infixr 5 
Instances
Functor ((:<+>) chann1) Source #  
Foldable ((:<+>) chann1) Source #  
Defined in DynamicPipeline.Flow fold :: Monoid m => (chann1 :<+> m) > m # foldMap :: Monoid m => (a > m) > (chann1 :<+> a) > m # foldMap' :: Monoid m => (a > m) > (chann1 :<+> a) > m # foldr :: (a > b > b) > b > (chann1 :<+> a) > b # foldr' :: (a > b > b) > b > (chann1 :<+> a) > b # foldl :: (b > a > b) > b > (chann1 :<+> a) > b # foldl' :: (b > a > b) > b > (chann1 :<+> a) > b # foldr1 :: (a > a > a) > (chann1 :<+> a) > a # foldl1 :: (a > a > a) > (chann1 :<+> a) > a # toList :: (chann1 :<+> a) > [a] # null :: (chann1 :<+> a) > Bool # length :: (chann1 :<+> a) > Int # elem :: Eq a => a > (chann1 :<+> a) > Bool # maximum :: Ord a => (chann1 :<+> a) > a # minimum :: Ord a => (chann1 :<+> a) > a #  
Traversable ((:<+>) chann1) Source #  
Defined in DynamicPipeline.Flow  
(Bounded chann1, Bounded chann2) => Bounded (chann1 :<+> chann2) Source #  
(Eq chann1, Eq chann2) => Eq (chann1 :<+> chann2) Source #  
(Show chann1, Show chann2) => Show (chann1 :<+> chann2) Source #  
MkCh more => MkCh (a :<+> more) Source #  
type HChI (a :<+> more) Source #  
Defined in DynamicPipeline.Flow  
type HChO (a :<+> more) Source #  
Defined in DynamicPipeline.Flow 
Smart Constructors
data DynamicPipeline dpDefinition filterState filterParam st Source #
DynamicPipeline
data type which contains all the three Stages definitions that have been generated by other combinators like withSource
,
withGenerator
and withSink
.
dpDefinition ~
Source
(Channel
..):=>
Generator
(Channel
..):=>
Sink
 DP Type level Flow Definition
filterState
 State of the
StateT
Monad
that is the local State of the Filter execution filterParam
 Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
st
 Existential Scope of
DP
Monad
.
data Filter dpDefinition filterState filterParam st Source #
Filter
Is the template definition of the Filter
that may be spawned when reading elements on the Stream.
Filter
is aNonEmpty
List ofActor
s. Each
Actor
is executed sequentially on the that List when an Element arrive to thatFilter
instance.  All the
Filter
execution (a.k.a.forM_ actors runStage
) executes in aStateT
Monad
to share an internal state amongActor
s.
dpDefinition ~
Source
(Channel
..):=>
Generator
(Channel
..):=>
Sink
 DP Type level Flow Definition
filterState
 State of the
StateT
Monad
that is the local State of the Filter execution filterParam
 Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
st
 Existential Scope of
DP
Monad
.
Instances
Generic (Filter dpDefinition filterState filterParam st) Source #  
Defined in DynamicPipeline.Stage  
Wrapped (Filter s' s a param) Source #  
Defined in DynamicPipeline.Stage type Unwrapped (Filter s' s a param)  
type Rep (Filter dpDefinition filterState filterParam st) Source #  
Defined in DynamicPipeline.Stage type Rep (Filter dpDefinition filterState filterParam st) = D1 ('MetaData "Filter" "DynamicPipeline.Stage" "dynamicpipeline0.3.1.3inplace" 'True) (C1 ('MetaCons "Filter" 'PrefixI 'True) (S1 ('MetaSel ('Just "unFilter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st)))))))  
type Unwrapped (Filter s' s a param) Source #  
Defined in DynamicPipeline.Stage 
data Actor dpDefinition filterState filterParam monadicAction Source #
Actor
Is a particular Stage
computation inside a Filter
.
dpDefinition ~
Source
(Channel
..):=>
Generator
(Channel
..):=>
Sink
 DP Type level Flow Definition
filterState
 State of the
StateT
Monad
that is the local State of the Filter execution filterParam
 Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
monadicAction
Monad
Wrapped inStateT
.
data GeneratorStage dpDefinition filterState filterParam st Source #
GeneartorStage
is a special Stage
data type according to DPP Definition which contains a Filter
template definition,
in orther to know how to spawn a new Filter
if it is needed, and the Stage
of the Generator to allow the user to perform some computation
in that case.
dpDefinition ~
Source
(Channel
..):=>
Generator
(Channel
..):=>
Sink
 DP Type level Flow Definition
filterState
 State of the
StateT
Monad
that is the local State of the Filter execution filterParam
 Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
st
 Existential Scope of
DP
Monad
.
type family ValidDP (a :: Bool) :: Constraint where ... Source #
FCF  Type Level Defunctionalization
ValidDP
Check if IsDP
is True
a
IsDP dpDefinition ~ 'True
Throw a TypeError
if Grammar is not correct
ValidDP 'True = ()  
ValidDP 'False = TypeError ((((((('Text "Invalid Semantic for Building DP Program" :$$: 'Text "Language Grammar:") :$$: 'Text "DP > Source CHANS :=> Generator CHANS :=> Sink") :$$: 'Text "DP > Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink") :$$: 'Text "CHANS > Channel CH") :$$: 'Text "FEEDBACK > FeedbackChannel CH") :$$: 'Text "CH > Type :<+> CH  Eof") :$$: 'Text "Example: 'Source (Channel (Int :<+> Int)) :=> Generator (Channel (Int :<+> Int)) :=> Sink'") 
type family IsDP (dpDefinition :: k) :: Bool where ... Source #
FCF  Type Level Defunctionalization
IsDP
Validates if DP Flow Type Level Definition is Correct according to the Grammar
IsDP (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut)))  
IsDP (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut)))  
IsDP (Source (Channel (a :<+> more))) = IsDP (Source (Channel more))  
IsDP (Source (Channel Eof)) = 'True  
IsDP (Generator (Channel (a :<+> more))) = IsDP (Generator (Channel more))  
IsDP (Generator (Channel a)) = 'True  
IsDP x = 'False 
DP
is the only Monadic Action allowed to run a DP Defined Flow.
It is restricted on Scope by its Existential Type st
in order to not escape out from this Monadic Context.
data UnFoldFilter dpDefinition readElem st filterState filterParam l Source #
UnFoldFilter
is a wrapper Data Type that contains all the information needed to spawn Filter
instances according to DPP.
The user will have the capability to select how those filters are going to be spawned, for example on each read element, how to setup
initial states of StateT
Monad on Actor
computations in filters, among others.
dpDefinition ~
Source
(Channel
..):=>
Generator
(Channel
..):=>
Sink
 DP Type level Flow Definition
readElem
 Type of the element that is being read from the Selected Channel in the
Generator
Stage st
 Existential Scope of
DP
Monad
. filterState
 State of the
StateT
Monad
that is the local State of the Filter execution filterParam
 Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
:: Stage (WithGenerator dpDefinition (Filter dpDefinition filterState filterParam st) (DP st))  Generator 
> Filter dpDefinition filterState filterParam st 

> GeneratorStage dpDefinition filterState filterParam st 
Smart Constructor of GeneratorStage
.
:: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st))  Associated type family to Generate Function Signature 
> Filter dpDefinition filterState filterParam st 
Smart Constructor of Filter
.
:: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st))  Associated type family to Generate Function Signature 
> Actor dpDefinition filterState filterParam (StateT filterState (DP st)) 
Smart Constructor of Actor
.
:: forall (dpDefinition :: Type) st. WithSource dpDefinition (DP st)  Associated type family to Generate Function Signature 
> Stage (WithSource dpDefinition (DP st)) 
:: forall (dpDefinition :: Type) (filter :: Type) st. WithGenerator dpDefinition filter (DP st)  Associated type family to Generate Function Signature 
> Stage (WithGenerator dpDefinition filter (DP st)) 
:: forall (dpDefinition :: Type) st. WithSink dpDefinition (DP st)  Associated type family to Generate Function Signature 
> Stage (WithSink dpDefinition (DP st)) 
:: forall dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi. DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi  
=> Stage (WithSource dpDefinition (DP st)) 

> GeneratorStage dpDefinition filterState filterParam st 

> Stage (WithSink dpDefinition (DP st))  
> DP st () 
Smart constructor for DynamicPipeline
Definition
:: forall dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4. SpawnFilterConstraint dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4  
=> UnFoldFilter dpDefinition readElem st filterState filterParam l  
> DP st (HList l)  Return the list of 
Run UnFoldFilter
:: (readElem > Bool)  Given a new Element determine if we need to interpose a new Filter or not 
> (readElem > DP st ())  For each element that the Filter is consuming allow to do something outside the filter with that element. For example trace or debug 
> Filter dpDefinition filterState filterParam st 

> (readElem > filterState)  Given the First element in this Filter Instance how to Initiate Internal 
> ReadChannel readElem  Main 
> HList l 

> UnFoldFilter dpDefinition readElem st filterState filterParam l 
Smart Constructor for UnFoldFilter
:: (readElem > Bool)  
> Filter dpDefinition filterState filterParam st  
> (readElem > filterState)  
> ReadChannel readElem  
> HList l  
> UnFoldFilter dpDefinition readElem st filterState filterParam l 
Smart Constructor for UnFoldFilter
which bypass to do something externally on each read element
:: Filter dpDefinition filterState filterParam st  
> (readElem > filterState)  
> ReadChannel readElem  
> HList l  
> UnFoldFilter dpDefinition readElem st filterState filterParam l 
Smart Constructor for UnFoldFilter
That creates a Filter
for each element on the Read Channel and interpose on Front of Generator
Stage
and Last Filter
Source > Filter1 > Filter2 ... > FilterN > Generator > Sink
mkUnfoldFilterForAll' Source #
:: (readElem > DP st ())  
> Filter dpDefinition filterState filterParam st  
> (readElem > filterState)  
> ReadChannel readElem  
> HList l  
> UnFoldFilter dpDefinition readElem st filterState filterParam l 
Idem for mkUnfoldFilterForAll
but do something on each Element externally
data family HList (l :: [Type]) #
Instances
(SameLengths '[x, y, xy], HZipList x y xy) => HUnzip HList x y xy  
Defined in Data.HList.HList  
(SameLengths '[x, y, xy], HZipList x y xy) => HZip HList x y xy  
Defined in Data.HList.HList  
HMapAux HList f ('[] :: [Type]) ('[] :: [Type])  
Defined in Data.HList.HList  
(HSpanEqBy f a as fst snd, HGroupBy f snd gs) => HGroupBy (f :: t) (a ': as) (HList (a ': fst) ': gs)  
(ApplyAB f e e', HMapAux HList f l l', SameLength l l') => HMapAux HList f (e ': l) (e' ': l')  
Defined in Data.HList.HList  
HReverse l l' => HBuild' l (HList l')  
Defined in Data.HList.HList  
HExtend e (HList l)  
HInits1 a b => HInits a (HList ('[] :: [Type]) ': b)  
Defined in Data.HList.HList  
(Bounded x, Bounded (HList xs)) => Bounded (HList (x ': xs))  
Bounded (HList ('[] :: [Type]))  
(Eq x, Eq (HList xs)) => Eq (HList (x ': xs))  
Eq (HList ('[] :: [Type]))  
(Ord x, Ord (HList xs)) => Ord (HList (x ': xs))  
Defined in Data.HList.HList compare :: HList (x ': xs) > HList (x ': xs) > Ordering # (<) :: HList (x ': xs) > HList (x ': xs) > Bool # (<=) :: HList (x ': xs) > HList (x ': xs) > Bool # (>) :: HList (x ': xs) > HList (x ': xs) > Bool # (>=) :: HList (x ': xs) > HList (x ': xs) > Bool # max :: HList (x ': xs) > HList (x ': xs) > HList (x ': xs) # min :: HList (x ': xs) > HList (x ': xs) > HList (x ': xs) #  
Ord (HList ('[] :: [Type]))  
Defined in Data.HList.HList  
(HProxies l, Read e, HSequence ReadP (ReadP e ': readP_l) (e ': l), HMapCxt HList ReadElement (AddProxy l) readP_l) => Read (HList (e ': l))  
Read (HList ('[] :: [Type]))  
(Show e, Show (HList l)) => Show (HList (e ': l))  
Show (HList ('[] :: [Type]))  
(Ix x, Ix (HList xs)) => Ix (HList (x ': xs))  
Defined in Data.HList.HList range :: (HList (x ': xs), HList (x ': xs)) > [HList (x ': xs)] # index :: (HList (x ': xs), HList (x ': xs)) > HList (x ': xs) > Int # unsafeIndex :: (HList (x ': xs), HList (x ': xs)) > HList (x ': xs) > Int # inRange :: (HList (x ': xs), HList (x ': xs)) > HList (x ': xs) > Bool # rangeSize :: (HList (x ': xs), HList (x ': xs)) > Int # unsafeRangeSize :: (HList (x ': xs), HList (x ': xs)) > Int #  
Ix (HList ('[] :: [Type]))  
Defined in Data.HList.HList range :: (HList '[], HList '[]) > [HList '[]] # index :: (HList '[], HList '[]) > HList '[] > Int # unsafeIndex :: (HList '[], HList '[]) > HList '[] > Int # inRange :: (HList '[], HList '[]) > HList '[] > Bool # rangeSize :: (HList '[], HList '[]) > Int # unsafeRangeSize :: (HList '[], HList '[]) > Int #  
(HZip HList a a aa, HMapCxt HList UncurryMappend aa a) => Semigroup (HList a)  
(HProxies a, HMapCxt HList ConstMempty (AddProxy a) a, HZip HList a a aa, HMapCxt HList UncurryMappend aa a) => Monoid (HList a)  
(TypeRepsList (HList xs), Typeable x) => TypeRepsList (HList (x ': xs))  
Defined in Data.HList.Data typeRepsList :: HList (x ': xs) > [TypeRep]  
TypeRepsList (HList ('[] :: [Type]))  
Defined in Data.HList.Data typeRepsList :: HList '[] > [TypeRep]  
HAppendList l1 l2 => HAppend (HList l1) (HList l2)  
ApplyAB f e e' => ApplyAB (MapCar f) (e, HList l) (HList (e' ': l))  
Defined in Data.HList.HList  
HInits1 ('[] :: [Type]) '[HList ('[] :: [Type])]  
Defined in Data.HList.HList  
HTails ('[] :: [Type]) '[HList ('[] :: [Type])]  
Defined in Data.HList.HList  
Apply (FHUProj sel ns) (HList l, Proxy ('HSucc n)) => Apply (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n)  
Apply (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n)  
(ch ~ Proxy (HBoolEQ sel (KMember n ns)), Apply (ch, FHUProj sel ns) (HList (e ': l), Proxy n)) => Apply (FHUProj sel ns) (HList (e ': l), Proxy n)  
Apply (FHUProj sel ns) (HList ('[] :: [Type]), n)  
Defined in Data.HList.HArray type ApplyR (FHUProj sel ns) (HList '[], n)  
(HConcatFD as bs, HAppendFD a bs cs) => HConcatFD (HList a ': as) cs  
Defined in Data.HList.HList  
(HInits1 xs ys, HMapCxt HList (FHCons2 x) ys ys', HMapCons x ys ~ ys', HMapTail ys' ~ ys) => HInits1 (x ': xs) (HList '[x] ': ys')  
Defined in Data.HList.HList  
HTails xs ys => HTails (x ': xs) (HList (x ': xs) ': ys)  
Defined in Data.HList.HList  
HMapUnboxF xs us => HMapUnboxF (HList x ': xs) (RecordU x ': us)  
Defined in Data.HList.RecordU  
(HList (x ': y) ~ z, HZip3 xs ys zs) => HZip3 (x ': xs) (HList y ': ys) (z ': zs)  
type HExtendR e (HList l)  
Defined in Data.HList.HList  
type HMapCons x (HList a ': b)  
Defined in Data.HList.HList  
data HList ('[] :: [Type])  
Defined in Data.HList.HList  
type UnHList (HList a)  
Defined in Data.HList.HList type UnHList (HList a) = a  
type HAppendR (HList l1 :: Type) (HList l2 :: Type)  
type ApplyR (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n)  
type ApplyR (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n)  
type ApplyR (FHUProj sel ns) (HList ('[] :: [Type]), n)  
Defined in Data.HList.HArray  
type ApplyR (FHUProj sel ns) (HList (e ': l), Proxy n)  
data HList (x ': xs)  
Defined in Data.HList.HList  
type HMapTail (HList (a ': as) ': bs)  
Defined in Data.HList.HList 
Channels
data ReadChannel a Source #
ReadChannel
can only read values of a previously written Channel. It is connected to a WriteChannel
but hidden for the user
a
 Type that this Channel can read
data WriteChannel a Source #
WriteChannel
can only write values into some Channel Queue
a
 Type that this Channel can write
(=>) :: MonadIO m => ReadChannel a > WriteChannel a > m () infixl 5 Source #
(=>) :: MonadIO m => ReadChannel a > WriteChannel b > (a > b) > m () infixl 5 Source #
Alias mapF_
(>=>) :: MonadIO m => ReadChannel a > WriteChannel b > (a > m (Maybe b)) > m () infixr 5 Source #
Alias mapM_
(>=>) :: MonadIO m => ReadChannel a > WriteChannel b > (a > m (Maybe b)) > m () infixr 5 Source #
Alias mapMF_
:: MonadIO m  
=> ReadChannel a  
> WriteChannel b  
> (a > b)  Monadic Transformation to do with read element 
> m () 
Same as map_
but mark Eof Channel after all processing
:: MonadIO m  
=> ReadChannel a  
> WriteChannel b  
> (a > b)  Monadic Transformation to do with read element 
> m () 
map_
is a Natural Transformation from consumer ReadChannel
to some producer WriteChannel
applying a transformation with function f
:: MonadIO m  
=> ReadChannel a  
> WriteChannel b  
> (a > m (Maybe b))  Monadic Transformation to do with read element 
> m () 
Same as map_
But applying a Monadic mapping
:: MonadIO m  
=> ReadChannel a  
> WriteChannel b  
> (a > m (Maybe b))  Monadic Transformation to do with read element 
> m () 
Same as mapM_
but mark Eof Channel after all processing
:: MonadIO m  
=> ReadChannel a  
> (a > m ())  Computation to do with read element 
> m () 
foldM_
is a Catamorphism for consuming a ReadChannel
and do some Monadic m
computation with each element
:: MonadIO m  
=> ReadChannel a  
> m ()  Computation to do at the end of the channel 
> (a > m ())  Computation to do with read element 
> m () 
Idem foldM_
but allows pass a monadic computation to perform at the end of the Channel
push :: MonadIO m => a > WriteChannel a > m () Source #
Push element a
into WriteChannel
pull :: MonadIO m => ReadChannel a > m (Maybe a) Source #
Pull element Maybe a
from ReadChannel
finish :: MonadIO m => WriteChannel a > m () Source #
Finalize Channel to indicate EOF mark and allow progress on following consumers
:: forall m a b. MonadIO m  
=> m a  Monadic Seed 
> (a > b)  Map input from seed to something to be written in Channel 
> m Bool  When stop unfolding 
> WriteChannel b 

> m () 
Coalgebra with Monadic computation to Feed some WriteChannel
m
 Monadic computation wrapping Coalgebra
a
 Element get from some Source and to be write in some Channel
 unfold from a Monadic seed m a
to a WriteChannel
:: MonadIO m  
=> FilePath  Seed 
> WriteChannel b 

> (ByteString > b)  Transform 
> m () 
Using unfoldM
, unfold from file