dynamic-pipeline-0.3.1.2: Library Type Safe implementation of Dynamic Pipeline Paradigm (DPP).
Copyright(c) 2021 Juan Pablo Royo Sales
LicenseBSD3
Maintainerjuanpablo.royo@gmail.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

DynamicPipeline

Description

DynamicPipeline is a Type Safe Dynamic and Parallel Streaming Library, which is an implementation of Dynamic Pipeline Paradigm (DPP) proposed in this paper DPP. The aim of this Library is to provide all the Type level constructs to guide the user in building a DPP flow to solve any algorithm that fits on this computational model.

This implementation has been developed using Type Level Programming techniques like Type families, Defunctionalization, Existential Types and Dynamic Record Tagged Types among others. Using all this techniques, we provide a High Level and Type Safe DynamicPipeline Library to build a Data Flow Algorithm avoiding as much as possible boilerplate code, but maintaining safety and robustness.

Example of Filtering Repeated elements of a Stream

import DynamicPipeline

type DPExample = Source (Channel (Int :<+> Eof)) :=> Generator (Channel (Int :<+> Eof)) :=> Sink

source' :: Stage (WriteChannel Int -> DP s ())
source' = withSource DPExample $ cout -> unfoldT ([1 .. 1000] <> [1 .. 1000]) cout identity

generator' :: GeneratorStage DPExample (Maybe Int) Int s
generator' =
  let gen = withGenerator DPExample genAction
   in mkGenerator gen filterTemp

genAction :: Filter DPExample (Maybe Int) Int s 
          -> ReadChannel Int
          -> WriteChannel Int
          -> DP s ()
genAction filter' cin cout = 
  let unfoldFilter = mkUnfoldFilterForAll' (`push` cout) filter' Just cin HNil 
   in void $ unfoldF unfoldFilter

filterTemp :: Filter DPExample (Maybe Int) Int s 
filterTemp = mkFilter actorRepeted

actorRepeted :: Int
             -> ReadChannel Int
             -> WriteChannel Int
             -> StateT (Maybe Int) (DP s) ()
actorRepeted i rc wc = do
  liftIO $ foldM rc $ e -> if e /= i then push e wc else pure ()

sink' :: Stage (ReadChannel Int -> DP s ())
sink' = withSink DPExample $ flip foldM print

program :: IO ()
program = runDP $ mkDP DPExample source' generator' sink'
Synopsis

DP Flow Grammar

The following is the Context Free Grammar allowed to build a DPP Flow definition:

DP       -> Source CHANS :=> Generator CHANS :=> Sink
DP       -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink
CHANS    -> Channel CH
FEEDBACK -> FeedbackChannel CH
CH       -> Type :<+> CH | Eof

Example:

 Source (Channel (Int :<+> Int)) :=> Generator (Channel (Int :<+> Int)) :=> Sink

Or with Feedback Channel to retrofit Streamming

 Source (Channel (Int :<+> Int)) :=> Generator (Channel (Int :<+> Int)) :=> FeedbackChannel (String :<+> Eof) :=> Sink

Building DynamicPipeline

DynamicPipeline Data type is the point where all the information is contained in order the library can run our DP Algorithm.

This Data type contains three fundamental pieces: Source, Generator and Sink. But all these are dynamic based on the defined Flow. One of the fundamental feature of this Library is to provide several combinators that deduce from the Definition Flow, what are the Function Signatures the user must fulfill according to his definition.

All these combinators work in the same manner which based on the flow definition present to the user at compile time what is the function that must be provided. Lets see an example based on the Misc.RepeatedDP, which basically filter out repeated elements in a stream.

>>> import Relude
>>> import DynamicPipeline
>>> type DPEx = Source (Channel (Int :<+> Eof)) :=> Generator (Channel (Int :<+> Eof)) :=> Sink
>>> :t withSource @DPEx
withSource @DPEx
  :: forall k (st :: k).
     (WriteChannel Int -> DP st ())
     -> Stage (WriteChannel Int -> DP st ())

In type DPEx = .. we are defining a Flow which contains a Source that is going to have an Int Channel that is going to feed the Generator. Therefore the Source should write on that channel and because of that we are being asked to provide a Function that WriteChannel Int -> DP st (). Remember that our Monadic context is always DP.

Having that we can provide that function and have all the pieces together for Source.

>>> let source' = withSource @DPEx $ \wc -> unfoldT ([1..10] <> [1..10] <> [1..10] <> [1..10]) wc identity
>>> :t source'
source' :: forall k (st :: k). Stage (WriteChannel Int -> DP st ())

So we are done. we provide that function. Now we can do the same for Sink which is the other opposite part of the Stream because Generator is a little different as we can see in the documentation.

>>> let sink' = withSink @DPEx $ \rc -> foldM rc $ putStr . show
>>> :t sink'
sink' :: forall k (st :: k). Stage (ReadChannel Int -> DP st ())

Done with Sink.

Generator and Filter

Now we reach to the last piece which needs more work to be done because it is the core of DPP which dynamically adds Parallel computations between the Generator Stage and previous Filters and Source.

Fortunately we have the same combinator withGenerator but it is not so straightforward what to put there. So, lets go step by step.

>>> :t withGenerator @DPEx
withGenerator @DPEx
  :: forall k filter (st :: k).
     (filter -> ReadChannel Int -> WriteChannel Int -> DP st ())
     -> Stage
          (filter -> ReadChannel Int -> WriteChannel Int -> DP st ())

At the first Glance it is asking for some similar function that is going to return our desired Stage but there is some type parameter which is not so obvious filter. Fortunately we have combinators for that as well that can save us a lot of time and effort.

Note: We could have done a Generator with an Empty Filter but we are not taking advantage of DPP in building a Pipeline Parallelization Computational Algorithm

In the case of Filter we have several combinators at our disposal.

  • Use mkFilter if your DPP contains 1 actor per Filter
  • Use |>> and |>>> if your DPP contains more than 1 actor

In our example we are going to use 1 actor only that is going to discard repeated elements >>> :t mkFilter @DPEx actor1 Variable not in scope: actor1 :: filterParam -> ReadChannel Int -> WriteChannel Int -> StateT filterState (DP st) ()

First lets fill in the gaps.

>>> let filter' = mkFilter @DPEx (\i rc wc -> foldM rc $ \e -> if e /= i then push e wc else pure ())
>>> :t filter'
filter' :: forall k filterState (st :: k). Filter DPEx filterState Int st

Basically we are checking if the element that we are reding from the Channel (Remember that we can have multiple Filter on front writing to us), is equal to the First Element that was read by the Generator and on which this Filter was instantiated with (a.k.a. filterParam). If the element is not equal we push it to the next Filter or Generator, otherwise we discarded.

>>> let gen' = mkGenerator (withGenerator @DPEx $ \f r w -> let unf = mkUnfoldFilterForAll' (`push` w) f Just r HNil in void $ unfoldF unf) filter'
>>> :t gen'
gen' :: forall k (st :: k). GeneratorStage DPEx (Maybe Int) Int st

Now we have everything in place we only need to call runDP and mkDP

>>> runDP $ mkDP @DPEx source' gen' sink'
12345678910

Types Flow definition

data Eof Source #

Eof is the End of Channel mark in the DP Definition Flow

Instances

Instances details
MkCh Eof Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChI Eof :: [Type] Source #

type HChO Eof :: [Type] Source #

Methods

mkCh :: Proxy Eof -> IO (HList (HChI Eof), HList (HChO Eof)) Source #

type HChI Eof Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChI Eof = '[] :: [Type]
type HChO Eof Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChO Eof = '[] :: [Type]

data Sink Source #

Sink contains the Sink Stage end of Flow of DP definition.

Instances

Instances details
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))))) Source #

MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = ChanRecord (HChI toSource) (HChO inToGen) (HChI inToGen) (HAppendListR (HChO genToOut) (HChO toSource)) (HChI genToOut) ('[] :: [Type])
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = ChanRecord ('[] :: [Type]) (HChO inToGen) (HChI inToGen) (HChO genToOut) (HChI genToOut) ('[] :: [Type])

data Generator (a :: Type) Source #

Generator contains the Generator Stage its Channels definitions in the DP definition Flow.

 a ~ Channel

Instances

Instances details
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))))) Source #

MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = ChanRecord (HChI toSource) (HChO inToGen) (HChI inToGen) (HAppendListR (HChO genToOut) (HChO toSource)) (HChI genToOut) ('[] :: [Type])
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = ChanRecord ('[] :: [Type]) (HChO inToGen) (HChI inToGen) (HChO genToOut) (HChI genToOut) ('[] :: [Type])

data Source (a :: Type) Source #

Source contains the Source Stage its Channels definitions in the DP definition Flow.

 a ~ Channel

Instances

Instances details
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))))) Source #

MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = ChanRecord (HChI toSource) (HChO inToGen) (HChI inToGen) (HAppendListR (HChO genToOut) (HChO toSource)) (HChI genToOut) ('[] :: [Type])
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = ChanRecord ('[] :: [Type]) (HChO inToGen) (HChI inToGen) (HChO genToOut) (HChI genToOut) ('[] :: [Type])

data Channel (a :: Type) Source #

Channel is the Container Type of Open Union Type which is going to be defined with :<+>.

 a ~ (Type :<+> Type :<+> ... :<+> Eof)

Instances

Instances details
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))))) Source #

MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = ChanRecord (HChI toSource) (HChO inToGen) (HChI inToGen) (HAppendListR (HChO genToOut) (HChO toSource)) (HChI genToOut) ('[] :: [Type])
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = ChanRecord ('[] :: [Type]) (HChO inToGen) (HChI inToGen) (HChO genToOut) (HChI genToOut) ('[] :: [Type])

data FeedbackChannel (a :: Type) Source #

FeedbackChannel is the Container Type of Open Union Type which is going to be defined with :<+> and indicates that this | Channel is for feedback to Source

 a ~ (Type :<+> Type :<+> ... :<+> Eof)

Instances

Instances details
MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))))) Source #

(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = ChanRecord (HChI toSource) (HChO inToGen) (HChI inToGen) (HAppendListR (HChO genToOut) (HChO toSource)) (HChI genToOut) ('[] :: [Type])

data a :=> b infixr 5 Source #

This is the Type level function of the Open Union Type for Stages.

This should have the form:

 Source (Channel ..) :=> Generator (Channel ..) :=> Sink

Constructors

a :=> b infixr 5 

Instances

Instances details
Functor ((:=>) a) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

fmap :: (a0 -> b) -> (a :=> a0) -> a :=> b #

(<$) :: a0 -> (a :=> b) -> a :=> a0 #

Foldable ((:=>) a) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

fold :: Monoid m => (a :=> m) -> m #

foldMap :: Monoid m => (a0 -> m) -> (a :=> a0) -> m #

foldMap' :: Monoid m => (a0 -> m) -> (a :=> a0) -> m #

foldr :: (a0 -> b -> b) -> b -> (a :=> a0) -> b #

foldr' :: (a0 -> b -> b) -> b -> (a :=> a0) -> b #

foldl :: (b -> a0 -> b) -> b -> (a :=> a0) -> b #

foldl' :: (b -> a0 -> b) -> b -> (a :=> a0) -> b #

foldr1 :: (a0 -> a0 -> a0) -> (a :=> a0) -> a0 #

foldl1 :: (a0 -> a0 -> a0) -> (a :=> a0) -> a0 #

toList :: (a :=> a0) -> [a0] #

null :: (a :=> a0) -> Bool #

length :: (a :=> a0) -> Int #

elem :: Eq a0 => a0 -> (a :=> a0) -> Bool #

maximum :: Ord a0 => (a :=> a0) -> a0 #

minimum :: Ord a0 => (a :=> a0) -> a0 #

sum :: Num a0 => (a :=> a0) -> a0 #

product :: Num a0 => (a :=> a0) -> a0 #

Traversable ((:=>) a) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

traverse :: Applicative f => (a0 -> f b) -> (a :=> a0) -> f (a :=> b) #

sequenceA :: Applicative f => (a :=> f a0) -> f (a :=> a0) #

mapM :: Monad m => (a0 -> m b) -> (a :=> a0) -> m (a :=> b) #

sequence :: Monad m => (a :=> m a0) -> m (a :=> a0) #

MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))))) Source #

MkCh inToGen => MkChans (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

Methods

mkChans :: Proxy (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) -> IO (HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)))) Source #

(Bounded a, Bounded b) => Bounded (a :=> b) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

minBound :: a :=> b #

maxBound :: a :=> b #

(Eq a, Eq b) => Eq (a :=> b) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

(==) :: (a :=> b) -> (a :=> b) -> Bool #

(/=) :: (a :=> b) -> (a :=> b) -> Bool #

(Show a, Show b) => Show (a :=> b) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

showsPrec :: Int -> (a :=> b) -> ShowS #

show :: (a :=> b) -> String #

showList :: [a :=> b] -> ShowS #

(MkCh inToGen, MkCh genToOut, MkCh toSource, HAppendList (HChO genToOut) (HChO toSource)) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source #

(MkCh inToGen, MkCh genToOut) => MkChans (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source #

Methods

mkChans :: Proxy (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) -> IO (HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source #

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink)))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (ChansFilter (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink))) = InOutChan (HChI inToGen) (HChO inToGen)
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = ChanRecord (HChI toSource) (HChO inToGen) (HChI inToGen) (HAppendListR (HChO genToOut) (HChO toSource)) (HChI genToOut) ('[] :: [Type])
type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChan (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = ChanRecord ('[] :: [Type]) (HChO inToGen) (HChI inToGen) (HChO genToOut) (HChI genToOut) ('[] :: [Type])

data chann1 :<+> chann2 infixr 5 Source #

This is the Type level function of the Open Union Type for Channels.

Channels forms an Open Union Type in each stage because according to DPP we can have multiple In and Out Channels in a Single Stage.

Eof should be the last Channel of the Open Union Type to indicate termination of the Grammar.

 chann1 ~ Type
 chann2 ~ Type

Constructors

chann1 :<+> chann2 infixr 5 

Instances

Instances details
Functor ((:<+>) chann1) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

fmap :: (a -> b) -> (chann1 :<+> a) -> chann1 :<+> b #

(<$) :: a -> (chann1 :<+> b) -> chann1 :<+> a #

Foldable ((:<+>) chann1) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

fold :: Monoid m => (chann1 :<+> m) -> m #

foldMap :: Monoid m => (a -> m) -> (chann1 :<+> a) -> m #

foldMap' :: Monoid m => (a -> m) -> (chann1 :<+> a) -> m #

foldr :: (a -> b -> b) -> b -> (chann1 :<+> a) -> b #

foldr' :: (a -> b -> b) -> b -> (chann1 :<+> a) -> b #

foldl :: (b -> a -> b) -> b -> (chann1 :<+> a) -> b #

foldl' :: (b -> a -> b) -> b -> (chann1 :<+> a) -> b #

foldr1 :: (a -> a -> a) -> (chann1 :<+> a) -> a #

foldl1 :: (a -> a -> a) -> (chann1 :<+> a) -> a #

toList :: (chann1 :<+> a) -> [a] #

null :: (chann1 :<+> a) -> Bool #

length :: (chann1 :<+> a) -> Int #

elem :: Eq a => a -> (chann1 :<+> a) -> Bool #

maximum :: Ord a => (chann1 :<+> a) -> a #

minimum :: Ord a => (chann1 :<+> a) -> a #

sum :: Num a => (chann1 :<+> a) -> a #

product :: Num a => (chann1 :<+> a) -> a #

Traversable ((:<+>) chann1) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

traverse :: Applicative f => (a -> f b) -> (chann1 :<+> a) -> f (chann1 :<+> b) #

sequenceA :: Applicative f => (chann1 :<+> f a) -> f (chann1 :<+> a) #

mapM :: Monad m => (a -> m b) -> (chann1 :<+> a) -> m (chann1 :<+> b) #

sequence :: Monad m => (chann1 :<+> m a) -> m (chann1 :<+> a) #

(Bounded chann1, Bounded chann2) => Bounded (chann1 :<+> chann2) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

minBound :: chann1 :<+> chann2 #

maxBound :: chann1 :<+> chann2 #

(Eq chann1, Eq chann2) => Eq (chann1 :<+> chann2) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

(==) :: (chann1 :<+> chann2) -> (chann1 :<+> chann2) -> Bool #

(/=) :: (chann1 :<+> chann2) -> (chann1 :<+> chann2) -> Bool #

(Show chann1, Show chann2) => Show (chann1 :<+> chann2) Source # 
Instance details

Defined in DynamicPipeline.Flow

Methods

showsPrec :: Int -> (chann1 :<+> chann2) -> ShowS #

show :: (chann1 :<+> chann2) -> String #

showList :: [chann1 :<+> chann2] -> ShowS #

MkCh more => MkCh (a :<+> more) Source # 
Instance details

Defined in DynamicPipeline.Flow

Associated Types

type HChI (a :<+> more) :: [Type] Source #

type HChO (a :<+> more) :: [Type] Source #

Methods

mkCh :: Proxy (a :<+> more) -> IO (HList (HChI (a :<+> more)), HList (HChO (a :<+> more))) Source #

type HChI (a :<+> more) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChI (a :<+> more) = ReadChannel a ': HChI more
type HChO (a :<+> more) Source # 
Instance details

Defined in DynamicPipeline.Flow

type HChO (a :<+> more) = WriteChannel a ': HChO more

Smart Constructors

data DynamicPipeline dpDefinition filterState filterParam st Source #

DynamicPipeline data type which contains all the three Stages definitions that have been generated by other combinators like withSource, withGenerator and withSink.

dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink
DP Type level Flow Definition
filterState
State of the StateT Monad that is the local State of the Filter execution
filterParam
Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
st
Existential Scope of DP Monad.

data Filter dpDefinition filterState filterParam st Source #

Filter Is the template definition of the Filter that may be spawned when reading elements on the Stream.

  • Filter is a NonEmpty List of Actors.
  • Each Actor is executed sequentially on the that List when an Element arrive to that Filter instance.
  • All the Filter execution (a.k.a. forM_ actors runStage) executes in a StateT Monad to share an internal state among Actors.
dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink
DP Type level Flow Definition
filterState
State of the StateT Monad that is the local State of the Filter execution
filterParam
Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
st
Existential Scope of DP Monad.

Instances

Instances details
Generic (Filter dpDefinition filterState filterParam st) Source # 
Instance details

Defined in DynamicPipeline.Stage

Associated Types

type Rep (Filter dpDefinition filterState filterParam st) :: Type -> Type #

Methods

from :: Filter dpDefinition filterState filterParam st -> Rep (Filter dpDefinition filterState filterParam st) x #

to :: Rep (Filter dpDefinition filterState filterParam st) x -> Filter dpDefinition filterState filterParam st #

Wrapped (Filter s' s a param) Source # 
Instance details

Defined in DynamicPipeline.Stage

Associated Types

type Unwrapped (Filter s' s a param)

Methods

_Wrapped' :: Iso' (Filter s' s a param) (Unwrapped (Filter s' s a param))

type Rep (Filter dpDefinition filterState filterParam st) Source # 
Instance details

Defined in DynamicPipeline.Stage

type Rep (Filter dpDefinition filterState filterParam st) = D1 ('MetaData "Filter" "DynamicPipeline.Stage" "dynamic-pipeline-0.3.1.2-inplace" 'True) (C1 ('MetaCons "Filter" 'PrefixI 'True) (S1 ('MetaSel ('Just "unFilter") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st)))))))
type Unwrapped (Filter s' s a param) Source # 
Instance details

Defined in DynamicPipeline.Stage

type Unwrapped (Filter s' s a param) = GUnwrapped (Rep (Filter s' s a param))

data Actor dpDefinition filterState filterParam monadicAction Source #

Actor Is a particular Stage computation inside a Filter.

dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink
DP Type level Flow Definition
filterState
State of the StateT Monad that is the local State of the Filter execution
filterParam
Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
monadicAction
Monad Wrapped in StateT.

data GeneratorStage dpDefinition filterState filterParam st Source #

GeneartorStage is a special Stage data type according to DPP Definition which contains a Filter template definition, in orther to know how to spawn a new Filter if it is needed, and the Stage of the Generator to allow the user to perform some computation in that case.

dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink
DP Type level Flow Definition
filterState
State of the StateT Monad that is the local State of the Filter execution
filterParam
Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.
st
Existential Scope of DP Monad.

data Stage a Source #

type family ValidDP (a :: Bool) :: Constraint where ... Source #

FCF - Type Level Defunctionalization ValidDP Check if IsDP is True

a
IsDP dpDefinition ~ 'True

Throw a TypeError if Grammar is not correct

Equations

ValidDP 'True = () 
ValidDP 'False = TypeError ((((((('Text "Invalid Semantic for Building DP Program" :$$: 'Text "Language Grammar:") :$$: 'Text "DP -> Source CHANS :=> Generator CHANS :=> Sink") :$$: 'Text "DP -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink") :$$: 'Text "CHANS -> Channel CH") :$$: 'Text "FEEDBACK -> FeedbackChannel CH") :$$: 'Text "CH -> Type :<+> CH | Eof") :$$: 'Text "Example: 'Source (Channel (Int :<+> Int)) :=> Generator (Channel (Int :<+> Int)) :=> Sink'") 

type family IsDP (dpDefinition :: k) :: Bool where ... Source #

FCF - Type Level Defunctionalization IsDP Validates if DP Flow Type Level Definition is Correct according to the Grammar

dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink
DP Type level Flow Definition

Equations

IsDP (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> Sink)) = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut))) 
IsDP (Source (Channel inToGen) :=> (Generator (Channel genToOut) :=> (FeedbackChannel toSource :=> Sink))) = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut))) 
IsDP (Source (Channel (a :<+> more))) = IsDP (Source (Channel more)) 
IsDP (Source (Channel Eof)) = 'True 
IsDP (Generator (Channel (a :<+> more))) = IsDP (Generator (Channel more)) 
IsDP (Generator (Channel a)) = 'True 
IsDP x = 'False 

data DP st a Source #

DP is the only Monadic Action allowed to run a DP Defined Flow. It is restricted on Scope by its Existential Type st in order to not escape out from this Monadic Context.

st
Existential Type to Ensure context of Monadic DP
a
Any Type that carries the Monadic Context DP

Instances

Instances details
Monad (DP st) Source # 
Instance details

Defined in DynamicPipeline.Stage

Methods

(>>=) :: DP st a -> (a -> DP st b) -> DP st b #

(>>) :: DP st a -> DP st b -> DP st b #

return :: a -> DP st a #

Functor (DP st) Source # 
Instance details

Defined in DynamicPipeline.Stage

Methods

fmap :: (a -> b) -> DP st a -> DP st b #

(<$) :: a -> DP st b -> DP st a #

Applicative (DP st) Source # 
Instance details

Defined in DynamicPipeline.Stage

Methods

pure :: a -> DP st a #

(<*>) :: DP st (a -> b) -> DP st a -> DP st b #

liftA2 :: (a -> b -> c) -> DP st a -> DP st b -> DP st c #

(*>) :: DP st a -> DP st b -> DP st b #

(<*) :: DP st a -> DP st b -> DP st a #

MonadIO (DP st) Source # 
Instance details

Defined in DynamicPipeline.Stage

Methods

liftIO :: IO a -> DP st a #

data UnFoldFilter dpDefinition readElem st filterState filterParam l Source #

UnFoldFilter is a wrapper Data Type that contains all the information needed to spawn Filter instances according to DPP. The user will have the capability to select how those filters are going to be spawned, for example on each read element, how to setup initial states of StateT Monad on Actor computations in filters, among others.

dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink
DP Type level Flow Definition
readElem
Type of the element that is being read from the Selected Channel in the Generator Stage
st
Existential Scope of DP Monad.
filterState
State of the StateT Monad that is the local State of the Filter execution
filterParam
Type of the First Parameter that is pass to the Filter when it is created by the Generator Anamorphism. Generator can change the type received from the Reader Channels.

withDP :: IO a -> DP s a Source #

Smart Constructor of DP from IO action

mkGenerator Source #

Arguments

:: Stage (WithGenerator dpDefinition (Filter dpDefinition filterState filterParam st) (DP st))

Generator Stage

-> Filter dpDefinition filterState filterParam st

Filter template

-> GeneratorStage dpDefinition filterState filterParam st 

Smart Constructor of GeneratorStage.

mkFilter Source #

Arguments

:: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st))

Associated type family to Generate Function Signature

-> Filter dpDefinition filterState filterParam st 

Smart Constructor of Filter.

single Source #

Arguments

:: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st))

Associated type family to Generate Function Signature

-> NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st))) 

Smart Constructor of Single Actor Wrapped in NonEmpty List.

actor Source #

Arguments

:: forall dpDefinition filterState filterParam st. WithFilter dpDefinition filterParam (StateT filterState (DP st))

Associated type family to Generate Function Signature

-> Actor dpDefinition filterState filterParam (StateT filterState (DP st)) 

Smart Constructor of Actor.

(|>>>) infixr 5 Source #

Arguments

:: forall dpDefinition filterState filterParam st. Actor dpDefinition filterState filterParam (StateT filterState (DP st))

New Actor to put on front

-> Filter dpDefinition filterState filterParam st

Existing Filter

-> Filter dpDefinition filterState filterParam st 

Combinator to build Filter in a DSL approach. Add a new Actor to an already existing Filter.

(|>>) infixr 5 Source #

Arguments

:: forall dpDefinition filterState filterParam st. Actor dpDefinition filterState filterParam (StateT filterState (DP st))

Actor 1

-> Actor dpDefinition filterState filterParam (StateT filterState (DP st))

Actor 2

-> Filter dpDefinition filterState filterParam st 

Combinator to build Filter in a DSL approach . Given 2 Actors build a Filter.

withSource Source #

Arguments

:: forall (dpDefinition :: Type) st. WithSource dpDefinition (DP st)

Associated type family to Generate Function Signature

-> Stage (WithSource dpDefinition (DP st)) 

Combinator for Building a Source Stage. It uses an Associated Type Class to deduce the Function Signature required to the user taken from DP Type Level Flow Definition [dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink]: DP Type level Flow Definition

st
Existential Scope of DP Monad.

withGenerator Source #

Arguments

:: forall (dpDefinition :: Type) (filter :: Type) st. WithGenerator dpDefinition filter (DP st)

Associated type family to Generate Function Signature

-> Stage (WithGenerator dpDefinition filter (DP st)) 

Combinator for Building a Generator Stage. It uses an Associated Type Class to deduce the Function Signature required to the user taken from DP Type Level Flow Definition [dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink]: DP Type level Flow Definition

filter
Filter template type
st
Existential Scope of DP Monad.

withSink Source #

Arguments

:: forall (dpDefinition :: Type) st. WithSink dpDefinition (DP st)

Associated type family to Generate Function Signature

-> Stage (WithSink dpDefinition (DP st)) 

Combinator for Building a Sink Stage. It uses an Associated Type Class to deduce the Function Signature required to the user taken from DP Type Level Flow Definition [dpDefinition ~ Source (Channel ..) :=> Generator (Channel ..) :=> Sink]: DP Type level Flow Definition

st
Existential Scope of DP Monad.

mkDP Source #

Arguments

:: forall dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi. DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi 
=> Stage (WithSource dpDefinition (DP st))

Source Stage generated by withSource combinator

-> GeneratorStage dpDefinition filterState filterParam st

Generator Stage generated by withGenerator combinator

-> Stage (WithSink dpDefinition (DP st))

Sink Stage generated by withSink combinator

-> DP st () 

Smart constructor for DynamicPipeline Definition

runDP :: (forall st. DP st a) -> IO a Source #

Run DP Monad to final IO result

unfoldF Source #

Arguments

:: forall dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4. SpawnFilterConstraint dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4 
=> UnFoldFilter dpDefinition readElem st filterState filterParam l

UnFoldFilter

-> DP st (HList l)

Return the list of ReadChannels with the results to be read for the Generator at the end. You can use this to pass the results to Sink

mkUnfoldFilter Source #

Arguments

:: (readElem -> Bool)

Given a new Element determine if we need to interpose a new Filter or not

-> (readElem -> DP st ())

For each element that the Filter is consuming allow to do something outside the filter with that element. For example trace or debug

-> Filter dpDefinition filterState filterParam st

Filter Template

-> (readElem -> filterState)

Given the First element in this Filter Instance how to Initiate Internal Filter StateT (Memory)

-> ReadChannel readElem

Main ReadChannel to feed filter

-> HList l

HList with the rest of the ReadChannels if There are needed or HNil if it only contians 1 read channel

-> UnFoldFilter dpDefinition readElem st filterState filterParam l 

Smart Constructor for UnFoldFilter

mkUnfoldFilter' Source #

Arguments

:: (readElem -> Bool) 
-> Filter dpDefinition filterState filterParam st 
-> (readElem -> filterState) 
-> ReadChannel readElem 
-> HList l 
-> UnFoldFilter dpDefinition readElem st filterState filterParam l 

Smart Constructor for UnFoldFilter which bypass to do something externally on each read element

mkUnfoldFilterForAll Source #

Arguments

:: Filter dpDefinition filterState filterParam st 
-> (readElem -> filterState) 
-> ReadChannel readElem 
-> HList l 
-> UnFoldFilter dpDefinition readElem st filterState filterParam l 

Smart Constructor for UnFoldFilter That creates a Filter for each element on the Read Channel and interpose on Front of Generator Stage and Last Filter

 Source ---> Filter1 ---> Filter2 ... ---> FilterN ---> Generator ---> Sink

mkUnfoldFilterForAll' Source #

Arguments

:: (readElem -> DP st ()) 
-> Filter dpDefinition filterState filterParam st 
-> (readElem -> filterState) 
-> ReadChannel readElem 
-> HList l 
-> UnFoldFilter dpDefinition readElem st filterState filterParam l 

Idem for mkUnfoldFilterForAll but do something on each Element externally

(.*.) :: HExtend e l => e -> l -> HExtendR e l #

data family HList (l :: [Type]) #

Instances

Instances details
(SameLengths '[x, y, xy], HZipList x y xy) => HUnzip HList x y xy 
Instance details

Defined in Data.HList.HList

Methods

hUnzip :: HList xy -> (HList x, HList y)

(SameLengths '[x, y, xy], HZipList x y xy) => HZip HList x y xy 
Instance details

Defined in Data.HList.HList

Methods

hZip :: HList x -> HList y -> HList xy

HMapAux HList f ('[] :: [Type]) ('[] :: [Type]) 
Instance details

Defined in Data.HList.HList

Methods

hMapAux :: f -> HList '[] -> HList '[]

(HSpanEqBy f a as fst snd, HGroupBy f snd gs) => HGroupBy (f :: t) (a ': as) (HList (a ': fst) ': gs) 
Instance details

Defined in Data.HList.HList

Methods

hGroupBy :: Proxy f -> HList (a ': as) -> HList (HList (a ': fst) ': gs)

(ApplyAB f e e', HMapAux HList f l l', SameLength l l') => HMapAux HList f (e ': l) (e' ': l') 
Instance details

Defined in Data.HList.HList

Methods

hMapAux :: f -> HList (e ': l) -> HList (e' ': l')

HReverse l l' => HBuild' l (HList l') 
Instance details

Defined in Data.HList.HList

Methods

hBuild' :: HList l -> HList l'

HExtend e (HList l) 
Instance details

Defined in Data.HList.HList

Associated Types

type HExtendR e (HList l)

Methods

(.*.) :: e -> HList l -> HExtendR e (HList l) #

HInits1 a b => HInits a (HList ('[] :: [Type]) ': b) 
Instance details

Defined in Data.HList.HList

Methods

hInits :: HList a -> HList (HList '[] ': b)

(Bounded x, Bounded (HList xs)) => Bounded (HList (x ': xs)) 
Instance details

Defined in Data.HList.HList

Methods

minBound :: HList (x ': xs) #

maxBound :: HList (x ': xs) #

Bounded (HList ('[] :: [Type])) 
Instance details

Defined in Data.HList.HList

Methods

minBound :: HList '[] #

maxBound :: HList '[] #

(Eq x, Eq (HList xs)) => Eq (HList (x ': xs)) 
Instance details

Defined in Data.HList.HList

Methods

(==) :: HList (x ': xs) -> HList (x ': xs) -> Bool #

(/=) :: HList (x ': xs) -> HList (x ': xs) -> Bool #

Eq (HList ('[] :: [Type])) 
Instance details

Defined in Data.HList.HList

Methods

(==) :: HList '[] -> HList '[] -> Bool #

(/=) :: HList '[] -> HList '[] -> Bool #

(Ord x, Ord (HList xs)) => Ord (HList (x ': xs)) 
Instance details

Defined in Data.HList.HList

Methods

compare :: HList (x ': xs) -> HList (x ': xs) -> Ordering #

(<) :: HList (x ': xs) -> HList (x ': xs) -> Bool #

(<=) :: HList (x ': xs) -> HList (x ': xs) -> Bool #

(>) :: HList (x ': xs) -> HList (x ': xs) -> Bool #

(>=) :: HList (x ': xs) -> HList (x ': xs) -> Bool #

max :: HList (x ': xs) -> HList (x ': xs) -> HList (x ': xs) #

min :: HList (x ': xs) -> HList (x ': xs) -> HList (x ': xs) #

Ord (HList ('[] :: [Type])) 
Instance details

Defined in Data.HList.HList

Methods

compare :: HList '[] -> HList '[] -> Ordering #

(<) :: HList '[] -> HList '[] -> Bool #

(<=) :: HList '[] -> HList '[] -> Bool #

(>) :: HList '[] -> HList '[] -> Bool #

(>=) :: HList '[] -> HList '[] -> Bool #

max :: HList '[] -> HList '[] -> HList '[] #

min :: HList '[] -> HList '[] -> HList '[] #

(HProxies l, Read e, HSequence ReadP (ReadP e ': readP_l) (e ': l), HMapCxt HList ReadElement (AddProxy l) readP_l) => Read (HList (e ': l)) 
Instance details

Defined in Data.HList.HList

Methods

readsPrec :: Int -> ReadS (HList (e ': l)) #

readList :: ReadS [HList (e ': l)] #

readPrec :: ReadPrec (HList (e ': l)) #

readListPrec :: ReadPrec [HList (e ': l)] #

Read (HList ('[] :: [Type])) 
Instance details

Defined in Data.HList.HList

(Show e, Show (HList l)) => Show (HList (e ': l)) 
Instance details

Defined in Data.HList.HList

Methods

showsPrec :: Int -> HList (e ': l) -> ShowS #

show :: HList (e ': l) -> String #

showList :: [HList (e ': l)] -> ShowS #

Show (HList ('[] :: [Type])) 
Instance details

Defined in Data.HList.HList

Methods

showsPrec :: Int -> HList '[] -> ShowS #

show :: HList '[] -> String #

showList :: [HList '[]] -> ShowS #

(Ix x, Ix (HList xs)) => Ix (HList (x ': xs)) 
Instance details

Defined in Data.HList.HList

Methods

range :: (HList (x ': xs), HList (x ': xs)) -> [HList (x ': xs)] #

index :: (HList (x ': xs), HList (x ': xs)) -> HList (x ': xs) -> Int #

unsafeIndex :: (HList (x ': xs), HList (x ': xs)) -> HList (x ': xs) -> Int #

inRange :: (HList (x ': xs), HList (x ': xs)) -> HList (x ': xs) -> Bool #

rangeSize :: (HList (x ': xs), HList (x ': xs)) -> Int #

unsafeRangeSize :: (HList (x ': xs), HList (x ': xs)) -> Int #

Ix (HList ('[] :: [Type])) 
Instance details

Defined in Data.HList.HList

Methods

range :: (HList '[], HList '[]) -> [HList '[]] #

index :: (HList '[], HList '[]) -> HList '[] -> Int #

unsafeIndex :: (HList '[], HList '[]) -> HList '[] -> Int #

inRange :: (HList '[], HList '[]) -> HList '[] -> Bool #

rangeSize :: (HList '[], HList '[]) -> Int #

unsafeRangeSize :: (HList '[], HList '[]) -> Int #

(HZip HList a a aa, HMapCxt HList UncurryMappend aa a) => Semigroup (HList a) 
Instance details

Defined in Data.HList.HList

Methods

(<>) :: HList a -> HList a -> HList a #

sconcat :: NonEmpty (HList a) -> HList a #

stimes :: Integral b => b -> HList a -> HList a #

(HProxies a, HMapCxt HList ConstMempty (AddProxy a) a, HZip HList a a aa, HMapCxt HList UncurryMappend aa a) => Monoid (HList a) 
Instance details

Defined in Data.HList.HList

Methods

mempty :: HList a #

mappend :: HList a -> HList a -> HList a #

mconcat :: [HList a] -> HList a #

(TypeRepsList (HList xs), Typeable x) => TypeRepsList (HList (x ': xs)) 
Instance details

Defined in Data.HList.Data

Methods

typeRepsList :: HList (x ': xs) -> [TypeRep]

TypeRepsList (HList ('[] :: [Type])) 
Instance details

Defined in Data.HList.Data

Methods

typeRepsList :: HList '[] -> [TypeRep]

HAppendList l1 l2 => HAppend (HList l1) (HList l2) 
Instance details

Defined in Data.HList.HList

Methods

hAppend :: HList l1 -> HList l2 -> HAppendR (HList l1) (HList l2)

ApplyAB f e e' => ApplyAB (MapCar f) (e, HList l) (HList (e' ': l)) 
Instance details

Defined in Data.HList.HList

Methods

applyAB :: MapCar f -> (e, HList l) -> HList (e' ': l)

HInits1 ('[] :: [Type]) '[HList ('[] :: [Type])] 
Instance details

Defined in Data.HList.HList

Methods

hInits1 :: HList '[] -> HList '[HList '[]]

HTails ('[] :: [Type]) '[HList ('[] :: [Type])] 
Instance details

Defined in Data.HList.HList

Methods

hTails :: HList '[] -> HList '[HList '[]]

Apply (FHUProj sel ns) (HList l, Proxy ('HSucc n)) => Apply (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n) 
Instance details

Defined in Data.HList.HArray

Associated Types

type ApplyR (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n)

Methods

apply :: (Proxy 'False, FHUProj sel ns) -> (HList (e ': l), Proxy n) -> ApplyR (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n)

Apply (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n) 
Instance details

Defined in Data.HList.HArray

Associated Types

type ApplyR (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n)

Methods

apply :: (Proxy 'True, FHUProj sel ns) -> (HList (e ': l), Proxy n) -> ApplyR (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n)

(ch ~ Proxy (HBoolEQ sel (KMember n ns)), Apply (ch, FHUProj sel ns) (HList (e ': l), Proxy n)) => Apply (FHUProj sel ns) (HList (e ': l), Proxy n) 
Instance details

Defined in Data.HList.HArray

Associated Types

type ApplyR (FHUProj sel ns) (HList (e ': l), Proxy n)

Methods

apply :: FHUProj sel ns -> (HList (e ': l), Proxy n) -> ApplyR (FHUProj sel ns) (HList (e ': l), Proxy n)

Apply (FHUProj sel ns) (HList ('[] :: [Type]), n) 
Instance details

Defined in Data.HList.HArray

Associated Types

type ApplyR (FHUProj sel ns) (HList '[], n)

Methods

apply :: FHUProj sel ns -> (HList '[], n) -> ApplyR (FHUProj sel ns) (HList '[], n)

(HConcatFD as bs, HAppendFD a bs cs) => HConcatFD (HList a ': as) cs 
Instance details

Defined in Data.HList.HList

Methods

hConcatFD :: HList (HList a ': as) -> HList cs

(HInits1 xs ys, HMapCxt HList (FHCons2 x) ys ys', HMapCons x ys ~ ys', HMapTail ys' ~ ys) => HInits1 (x ': xs) (HList '[x] ': ys') 
Instance details

Defined in Data.HList.HList

Methods

hInits1 :: HList (x ': xs) -> HList (HList '[x] ': ys')

HTails xs ys => HTails (x ': xs) (HList (x ': xs) ': ys) 
Instance details

Defined in Data.HList.HList

Methods

hTails :: HList (x ': xs) -> HList (HList (x ': xs) ': ys)

HMapUnboxF xs us => HMapUnboxF (HList x ': xs) (RecordU x ': us) 
Instance details

Defined in Data.HList.RecordU

(HList (x ': y) ~ z, HZip3 xs ys zs) => HZip3 (x ': xs) (HList y ': ys) (z ': zs) 
Instance details

Defined in Data.HList.HZip

Methods

hZip3 :: HList (x ': xs) -> HList (HList y ': ys) -> HList (z ': zs)

type HExtendR e (HList l) 
Instance details

Defined in Data.HList.HList

type HExtendR e (HList l) = HList (e ': l)
type HMapCons x (HList a ': b) 
Instance details

Defined in Data.HList.HList

type HMapCons x (HList a ': b) = HList (x ': a) ': HMapCons x b
data HList ('[] :: [Type]) 
Instance details

Defined in Data.HList.HList

data HList ('[] :: [Type]) = HNil
type UnHList (HList a) 
Instance details

Defined in Data.HList.HList

type UnHList (HList a) = a
type HAppendR (HList l1 :: Type) (HList l2 :: Type) 
Instance details

Defined in Data.HList.HList

type HAppendR (HList l1 :: Type) (HList l2 :: Type) = HList (HAppendListR l1 l2)
type ApplyR (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n) 
Instance details

Defined in Data.HList.HArray

type ApplyR (Proxy 'False, FHUProj sel ns) (HList (e ': l), Proxy n) = ApplyR (FHUProj sel ns) (HList l, Proxy ('HSucc n))
type ApplyR (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n) 
Instance details

Defined in Data.HList.HArray

type ApplyR (Proxy 'True, FHUProj sel ns) (HList (e ': l), Proxy n) = HJust (e, (HList l, Proxy ('HSucc n)))
type ApplyR (FHUProj sel ns) (HList ('[] :: [Type]), n) 
Instance details

Defined in Data.HList.HArray

type ApplyR (FHUProj sel ns) (HList ('[] :: [Type]), n) = HNothing
type ApplyR (FHUProj sel ns) (HList (e ': l), Proxy n) 
Instance details

Defined in Data.HList.HArray

type ApplyR (FHUProj sel ns) (HList (e ': l), Proxy n) = ApplyR (Proxy (HBoolEQ sel (KMember n ns)), FHUProj sel ns) (HList (e ': l), Proxy n)
data HList (x ': xs) 
Instance details

Defined in Data.HList.HList

data HList (x ': xs) = x `HCons` (HList xs)
type HMapTail (HList (a ': as) ': bs) 
Instance details

Defined in Data.HList.HList

type HMapTail (HList (a ': as) ': bs) = HList as ': HMapTail bs

hHead :: forall e (l :: [Type]). HList (e ': l) -> e #

hTail :: forall e (l :: [Type]). HList (e ': l) -> HList l #

Channels

data ReadChannel a Source #

ReadChannel can only read values of a previously written Channel. It is connected to a WriteChannel but hidden for the user

a
Type that this Channel can read

data WriteChannel a Source #

WriteChannel can only write values into some Channel Queue

a
Type that this Channel can write

(|=>) :: MonadIO m => ReadChannel a -> WriteChannel a -> m () infixl 5 Source #

Same as map_ but with id combinator

(|=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> b) -> m () infixl 5 Source #

Alias mapF_

(|>=>) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m () infixr 5 Source #

Alias mapM_

(|>=>|) :: MonadIO m => ReadChannel a -> WriteChannel b -> (a -> m (Maybe b)) -> m () infixr 5 Source #

Alias mapMF_

mapF_ Source #

Arguments

:: MonadIO m 
=> ReadChannel a

ReadChannel

-> WriteChannel b

ReadChannel

-> (a -> b)

Monadic Transformation to do with read element

-> m () 

Same as map_ but mark Eof Channel after all processing

map_ Source #

Arguments

:: MonadIO m 
=> ReadChannel a

ReadChannel

-> WriteChannel b

ReadChannel

-> (a -> b)

Monadic Transformation to do with read element

-> m () 

map_ is a Natural Transformation from consumer ReadChannel to some producer WriteChannel applying a transformation with function f

mapM_ Source #

Arguments

:: MonadIO m 
=> ReadChannel a

ReadChannel

-> WriteChannel b

ReadChannel

-> (a -> m (Maybe b))

Monadic Transformation to do with read element

-> m () 

Same as map_ But applying a Monadic mapping

mapMF_ Source #

Arguments

:: MonadIO m 
=> ReadChannel a

ReadChannel

-> WriteChannel b

ReadChannel

-> (a -> m (Maybe b))

Monadic Transformation to do with read element

-> m () 

Same as mapM_ but mark Eof Channel after all processing

foldM_ Source #

Arguments

:: MonadIO m 
=> ReadChannel a

ReadChannel

-> (a -> m ())

Computation to do with read element

-> m () 

foldM_ is a Catamorphism for consuming a ReadChannel and do some Monadic m computation with each element

foldWithM_ Source #

Arguments

:: MonadIO m 
=> ReadChannel a

ReadChannel

-> m ()

Computation to do at the end of the channel

-> (a -> m ())

Computation to do with read element

-> m () 

Idem foldM_ but allows pass a monadic computation to perform at the end of the Channel

push :: MonadIO m => a -> WriteChannel a -> m () Source #

Push element a into WriteChannel

pull :: MonadIO m => ReadChannel a -> m (Maybe a) Source #

Pull element Maybe a from ReadChannel

finish :: MonadIO m => WriteChannel a -> m () Source #

Finalize Channel to indicate EOF mark and allow progress on following consumers

unfoldM Source #

Arguments

:: forall m a b. MonadIO m 
=> m a

Monadic Seed

-> (a -> b)

Map input from seed to something to be written in Channel

-> m Bool

When stop unfolding

-> WriteChannel b

WriteChannel to write input seed elements

-> m () 

Coalgebra with Monadic computation to Feed some WriteChannel

m
Monadic computation wrapping Coalgebra
a
Element get from some Source and to be write in some Channel

| unfold from a Monadic seed m a to a WriteChannel

unfoldFile Source #

Arguments

:: MonadIO m 
=> FilePath

Seed FilePath to read from

-> WriteChannel b

WriteChannel to write File contents

-> (ByteString -> b)

Transform ByteString read from File to something meaningful for your App

-> m () 

Using unfoldM, unfold from file

unfoldT :: (MonadIO m, Foldable t) => t a -> WriteChannel b -> (a -> b) -> m () Source #

Idem unfoldM but for Foldable, for example a List [a]. Useful for testing purpose