{-# LANGUAGE AllowAmbiguousTypes  #-}
{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module      : DynamicPipeline.Stage
-- Copyright   : (c) 2021 Juan Pablo Royo Sales
--
-- License     : BSD3
-- Maintainer  : juanpablo.royo@gmail.com
-- Stability   : experimental
-- Portability : GHC
--
module DynamicPipeline.Stage
  ( DynamicPipeline,
    Filter,
    Actor,
    GeneratorStage,
    Stage,
    ValidDP,
    IsDP,
    DP,
    UnFoldFilter,
    withDP,
    mkGenerator,
    mkFilter,
    single,
    actor,
    (|>>>),
    (|>>),
    withSource,
    withGenerator,
    withSink,
    mkDP,
    runDP,
    unfoldF,
    mkUnfoldFilter,
    mkUnfoldFilter',
    mkUnfoldFilterForAll,
    mkUnfoldFilterForAll'
  ) where

import           Control.Concurrent.Async
import           Control.Lens             hiding ((<|))
import           Data.HList
import           Data.List.NonEmpty
import           DynamicPipeline.Channel hiding (mapM_)
import           DynamicPipeline.Flow
import           GHC.TypeLits
import           Relude                   as R


-- | Type-level boolean conjunction.
--
-- Closed type family: reduces to @'True@ only when both arguments are already
-- known to be @'True@; every other fully-known combination falls through to
-- the second equation and reduces to @'False@.
type family And (a :: Bool) (b :: Bool) :: Bool where
  And 'True 'True = 'True
  And a b         = 'False

-- | Type-level predicate that validates a /DP/ flow definition against the
-- grammar of the language.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- The first two equations split a whole definition (with or without a
-- 'FeedbackChannel') into its 'Source' and 'Generator' parts and combine both
-- checks with 'And'. The remaining equations walk the channel list of each part.
-- Anything that matches no equation is rejected with @'False@.
type family IsDP (dpDefinition :: k) :: Bool where
  -- Plain definition: Source :=> Generator :=> Sink.
  IsDP (Source (Channel inToGen)
        :=> Generator (Channel genToOut)
        :=> Sink)
                                            = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut)))
  -- Definition with a feedback channel. Only the Source and Generator parts
  -- are checked here; @toSource@ itself is not inspected by this equation.
  -- NOTE(review): confirm that leaving @toSource@ unvalidated is intended.
  IsDP ( Source (Channel inToGen)
         :=> Generator (Channel genToOut)
         :=> FeedbackChannel toSource 
         :=> Sink 
        )
                                            = And (IsDP (Source (Channel inToGen))) (IsDP (Generator (Channel genToOut)))
  -- A Source channel list is valid only if it ends in 'Eof'.
  IsDP (Source (Channel (a :<+> more)))     = IsDP (Source (Channel more))
  IsDP (Source (Channel Eof))               = 'True
  -- A Generator channel list is accepted with any terminal type, not only 'Eof'.
  -- NOTE(review): this is more permissive than the Source case — confirm intended.
  IsDP (Generator (Channel (a :<+> more)))  = IsDP (Generator (Channel more))
  IsDP (Generator (Channel a))              = 'True
  -- Everything else is not a valid definition.
  IsDP x                                    = 'False


-- | Constraint-level guard over the result of 'IsDP'.
--
-- [@a@]: Result of @IsDP dpDefinition@
--
-- Reduces to the empty constraint when @a@ is @'True@, and to a custom
-- 'TypeError' describing the language grammar when @a@ is @'False@.
--
-- The example shown in the error message terminates every channel list with
-- @Eof@, because 'IsDP' rejects a 'Source' channel list that does not.
type family ValidDP (a :: Bool) :: Constraint where
  ValidDP 'True = ()
  ValidDP 'False = TypeError
                    ( 'Text "Invalid Semantic for Building DP Program"
                      ':$$: 'Text "Language Grammar:"
                      ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> Sink"
                      ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink"
                      ':$$: 'Text "CHANS    -> Channel CH"
                      ':$$: 'Text "FEEDBACK -> FeedbackChannel CH"
                      ':$$: 'Text "CH       -> Type :<+> CH | Eof"
                      ':$$: 'Text "Example: 'Source (Channel (Int :<+> Int :<+> Eof)) :=> Generator (Channel (Int :<+> Int :<+> Eof)) :=> Sink'"
                    )

-- Inductive Type Family for Expanding and building Source, Generator, Filter and Sink Functions Signatures
--
-- 'WithSource' computes the function type of the Source stage from a /DP/ definition:
--
-- * without feedback: one 'WriteChannel' argument per element of @inToGen@;
--
-- * with feedback: one 'ReadChannel' argument per element of @toSource@, followed by
--   one 'WriteChannel' argument per element of @inToGen@.
--
-- In both cases the result is @monadicAction ()@. A channel list that does not end in
-- @Eof@ (or any other unexpected shape) reaches the last equation and yields a 'TypeError'.
type family WithSource (dpDefinition :: Type) (monadicAction :: Type -> Type) :: Type where
  WithSource (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> Sink) monadicAction
                                                          = WithSource (ChanIn inToGen) monadicAction
  WithSource (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> FeedbackChannel toSource :=> Sink) monadicAction
                                                          = WithSource (ChanOutIn toSource inToGen) monadicAction
  -- Write ends, one per element, until 'Eof'.
  WithSource (ChanIn (a :<+> more)) monadicAction         = WriteChannel a -> WithSource (ChanIn more) monadicAction
  WithSource (ChanIn Eof) monadicAction                   = monadicAction ()
  -- Read ends first; once they are exhausted continue with the write ends.
  WithSource (ChanOutIn (a :<+> more) ins) monadicAction  = ReadChannel a -> WithSource (ChanOutIn more ins) monadicAction
  WithSource (ChanOutIn Eof ins) monadicAction            = WithSource (ChanIn ins) monadicAction
  WithSource dpDefinition _                               = TypeError
                                                              ( 'Text "Invalid Semantic for Source Stage"
                                                                ':$$: 'Text "in the DP Definition '"
                                                                ':<>: 'ShowType dpDefinition
                                                                ':<>: 'Text "'"
                                                                ':$$: 'Text "Language Grammar:"
                                                                ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> Sink"
                                                                ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink"
                                                                ':$$: 'Text "CHANS    -> Channel CH"
                                                                ':$$: 'Text "FEEDBACK -> FeedbackChannel CH"
                                                                ':$$: 'Text "CH       -> Type :<+> CH | Eof"
                                                                ':$$: 'Text "Example: 'Source (Channel (Int :<+> Int :<+> Eof)) :=> Generator (Channel (Int :<+> Int :<+> Eof)) :=> Sink'"
                                                              )

-- 'WithGenerator' computes the function type of the Generator stage from a /DP/ definition.
--
-- The resulting function takes, in order: the @filter@ template, one 'ReadChannel' per
-- element of @inToGen@, one 'WriteChannel' per element of @genToOut@ and, when a
-- 'FeedbackChannel' is present, one 'WriteChannel' per element of @toSource@.
-- The result is @monadicAction ()@. Channel lists must end in @Eof@; any other shape
-- reaches the last equation and yields a 'TypeError'.
type family WithGenerator (a :: Type) (filter :: Type) (monadicAction :: Type -> Type) :: Type where
  WithGenerator (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> Sink) filter monadicAction
                                                                         = filter -> WithGenerator (ChanOutIn inToGen genToOut) filter monadicAction
  WithGenerator (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> FeedbackChannel toSource :=> Sink) filter monadicAction
                                                                         = filter -> WithGenerator (ChanOutIn inToGen (ChanInIn genToOut toSource)) filter monadicAction
  -- Two consecutive groups of write ends (outputs, then feedback).
  WithGenerator (ChanInIn (a :<+> more) ins) filter monadicAction        = WriteChannel a -> WithGenerator (ChanInIn more ins) filter monadicAction
  WithGenerator (ChanInIn Eof ins) filter monadicAction                  = WithGenerator (ChanIn ins) filter monadicAction
  -- A single group of write ends.
  WithGenerator (ChanIn (a :<+> more)) filter monadicAction              = WriteChannel a -> WithGenerator (ChanIn more) filter monadicAction
  WithGenerator (ChanIn Eof) filter monadicAction                        = monadicAction ()
  -- Read ends first. The 'ChanInIn' equation must precede the generic one so the
  -- feedback variant is not treated as a plain list of write ends.
  WithGenerator (ChanOutIn (a :<+> more) ins) filter monadicAction       = ReadChannel a -> WithGenerator (ChanOutIn more ins) filter monadicAction
  WithGenerator (ChanOutIn Eof (ChanInIn ins ins2)) filter monadicAction = WithGenerator (ChanInIn ins ins2) filter monadicAction
  WithGenerator (ChanOutIn Eof ins) filter monadicAction                 = WithGenerator (ChanIn ins) filter monadicAction
  WithGenerator dpDefinition _ _                                         = TypeError
                                                                             ( 'Text "Invalid Semantic for Generator Stage"
                                                                               ':$$: 'Text "in the DP Definition '"
                                                                               ':<>: 'ShowType dpDefinition
                                                                               ':<>: 'Text "'"
                                                                               ':$$: 'Text "Language Grammar:"
                                                                               ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> Sink"
                                                                               ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink"
                                                                               ':$$: 'Text "CHANS    -> Channel CH"
                                                                               ':$$: 'Text "FEEDBACK -> FeedbackChannel CH"
                                                                               ':$$: 'Text "CH       -> Type :<+> CH | Eof"
                                                                               ':$$: 'Text "Example: 'Source (Channel (Int :<+> Int :<+> Eof)) :=> Generator (Channel (Int :<+> Int :<+> Eof)) :=> Sink'"
                                                                             )
     
-- 'WithFilter' computes the function type of a Filter 'Actor' from a /DP/ definition.
--
-- The resulting function takes, in order: the filter parameter @param@, one 'ReadChannel'
-- per element of @inToGen@, one 'WriteChannel' per element of @genToOut@ and, when a
-- 'FeedbackChannel' is present, one 'WriteChannel' per element of @toSource@.
-- The result is @monadicAction ()@. Channel lists must end in @Eof@; any other shape
-- reaches the last equation and yields a 'TypeError'.
type family WithFilter (dpDefinition :: Type) (param :: Type) (monadicAction :: Type -> Type) :: Type where
  WithFilter (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> Sink) param monadicAction
                                                    = param -> WithFilter (ChanOutIn inToGen genToOut) param monadicAction
  WithFilter (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> FeedbackChannel toSource :=> Sink) param monadicAction
                                                    = param -> WithFilter (ChanOutIn inToGen (ChanInIn genToOut toSource)) param monadicAction
  -- Two consecutive groups of write ends (outputs, then feedback).
  WithFilter (ChanInIn (a :<+> more) ins) param monadicAction
                                                    = WriteChannel a -> WithFilter (ChanInIn more ins) param monadicAction
  WithFilter (ChanInIn Eof ins) param monadicAction
                                                    = WithFilter (ChanIn ins) param monadicAction
  -- A single group of write ends.
  WithFilter (ChanIn (a :<+> more)) param monadicAction
                                                    = WriteChannel a -> WithFilter (ChanIn more) param monadicAction
  WithFilter (ChanIn Eof) param monadicAction       = monadicAction ()
  -- Read ends first. The 'ChanInIn' equation must precede the generic one so the
  -- feedback variant is not treated as a plain list of write ends.
  WithFilter (ChanOutIn (a :<+> more) ins) param monadicAction
                                                    = ReadChannel a -> WithFilter (ChanOutIn more ins) param monadicAction
  WithFilter (ChanOutIn Eof (ChanInIn ins ins2)) param monadicAction
                                                    = WithFilter (ChanInIn ins ins2) param monadicAction
  WithFilter (ChanOutIn Eof ins) param monadicAction
                                                    = WithFilter (ChanIn ins) param monadicAction
  WithFilter dpDefinition _ _                       = TypeError
                                                        ( 'Text "Invalid Semantic for Filter Stage"
                                                          ':$$: 'Text "in the DP Definition '"
                                                          ':<>: 'ShowType dpDefinition
                                                          ':<>: 'Text "'"
                                                          ':$$: 'Text "Language Grammar:"
                                                          ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> Sink"
                                                          ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink"
                                                          ':$$: 'Text "CHANS    -> Channel CH"
                                                          ':$$: 'Text "FEEDBACK -> FeedbackChannel CH"
                                                          ':$$: 'Text "CH       -> Type :<+> CH | Eof"
                                                          ':$$: 'Text "Example: 'Source (Channel (Int :<+> Int :<+> Eof)) :=> Generator (Channel (Int :<+> Int :<+> Eof)) :=> Sink'"
                                                        )

-- 'WithSink' computes the function type of the Sink stage from a /DP/ definition:
-- one 'ReadChannel' argument per element of @genToOut@, resulting in @monadicAction ()@.
-- A 'FeedbackChannel', when present, does not contribute any argument to the Sink.
-- The channel list must end in @Eof@; any other shape reaches the last equation and
-- yields a 'TypeError'.
type family WithSink (dpDefinition :: Type) (monadicAction :: Type -> Type) :: Type where
  WithSink (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> Sink) monadicAction
                                                   = WithSink (ChanOut genToOut) monadicAction
  WithSink (Source (Channel inToGen) :=> Generator (Channel genToOut) :=> FeedbackChannel toSource :=> Sink) monadicAction
                                                   = WithSink (ChanOut genToOut) monadicAction
  -- Read ends, one per element, until 'Eof'.
  WithSink (ChanOut (a :<+> more)) monadicAction   = ReadChannel a -> WithSink (ChanOut more) monadicAction
  WithSink (ChanOut Eof) monadicAction             = monadicAction ()
  WithSink dpDefinition _                          = TypeError
                                                       ( 'Text "Invalid Semantic for Sink Stage"
                                                         ':$$: 'Text "in the DP Definition '"
                                                         ':<>: 'ShowType dpDefinition
                                                         ':<>: 'Text "'"
                                                         ':$$: 'Text "Language Grammar:"
                                                         ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> Sink"
                                                         ':$$: 'Text "DP       -> Source CHANS :=> Generator CHANS :=> FEEDBACK :=> Sink"
                                                         ':$$: 'Text "CHANS    -> Channel CH"
                                                         ':$$: 'Text "FEEDBACK -> FeedbackChannel CH"
                                                         ':$$: 'Text "CH       -> Type :<+> CH | Eof"
                                                         ':$$: 'Text "Example: 'Source (Channel (Int :<+> Int :<+> Eof)) :=> Generator (Channel (Int :<+> Int :<+> Eof)) :=> Sink'"
                                                       )


-- | 'DP' is the only Monadic Action allowed to run a /DP/ Defined Flow.
-- It is a @newtype@ over 'IO' carrying an extra phantom type @st@, which is used to
-- restrict the scope of the computation so that it does not escape this Monadic Context.
--
-- [@st@]: Phantom (existential) Type to Ensure context of Monadic 'DP'
--
-- [@a@]: Result type of the computation
--
-- The 'Functor', 'Applicative', 'Monad' and 'MonadIO' instances are the ones of the
-- underlying 'IO', obtained through newtype deriving.
newtype DP st a = DP
  { runStage :: IO a -- ^ Unwrap to the underlying 'IO' action
  } deriving newtype (Functor, Applicative, Monad, MonadIO)

-- | Smart Constructor of 'DP' from 'IO' action.
-- Wraps the given action without running it.
withDP :: IO a -> DP s a
withDP = DP

-- Defunctionalization
-- A 'Stage' pairs a value of type @a@ with a 'Proxy' for that same type.
-- In this module @a@ is instantiated with the function types computed by the
-- @With*@ type families.
data Stage a where
  Stage :: Proxy a -> a -> Stage a

-- Build a 'Stage' from an explicit 'Proxy' and the wrapped value.
{-# INLINE mkStage #-}
mkStage :: forall a. Proxy a -> a -> Stage a
mkStage = Stage @a

-- Build a 'Stage' from the wrapped value alone; the 'Proxy' is supplied here
-- through a type application on @a@.
{-# INLINE mkStage' #-}
mkStage' :: forall a. a -> Stage a
mkStage' = Stage (Proxy @a)

-- Evaluation class for defunctionalized values: 'run' turns a wrapper of type @l@
-- into the value of type @t@ it stands for. The functional dependency @l -> t@
-- states that the wrapper type determines the result type.
class EvalC l t | l -> t where
  run :: l -> t

-- Evaluating a 'Stage' returns the value it wraps and ignores the 'Proxy'.
-- The instance head is written with a separate @b@ plus the equality constraint
-- @a ~ b@, so it matches any requested result type and then unifies it with @a@.
instance forall a b. (a ~ b) => EvalC (Stage a) b where
  run (Stage _ f) = f

-- Run a 'Stage' in a new asynchronous thread.
--
-- The function wrapped in @fn@ is applied to the channels in @cIns@ (one argument per
-- 'HList' element, via 'hUncurry'); once it finishes, those same channels are closed
-- with 'closeList'. The returned 'Async' handle refers to that whole computation.
--
-- NOTE(review): 'closeList' is sequenced with '>>', so it is skipped when the stage
-- throws — confirm whether closing on failure is expected.
{-# INLINE runStageWith #-}
runStageWith :: forall (n :: HNat) f (xs :: [*]) s.
            (HCurry' n f xs (DP s ()), ArityFwd f n, ArityRev f n, CloseList xs)
            => Stage f -> HList xs -> DP s (Async ())
runStageWith fn cIns = withDP $ async (runStage (hUncurry (run fn) cIns) >> closeList cIns)

-- Variant of 'runStageWith' where the channels to close differ from the channels
-- passed to the stage.
--
-- The function wrapped in @fn@ is applied to the channels in @cIns@ (via 'hUncurry');
-- once it finishes, the channels in @cClose@ are closed with 'closeList'. The returned
-- 'Async' handle refers to that whole computation.
--
-- NOTE(review): 'closeList' is sequenced with '>>', so it is skipped when the stage
-- throws — confirm whether closing on failure is expected.
{-# INLINE runStageWith' #-}
runStageWith' :: forall (n :: HNat) f (xs :: [*]) (ss :: [*]) s.
            (HCurry' n f xs (DP s ()), ArityFwd f n, ArityRev f n, CloseList ss)
            => Stage f -> HList xs -> HList ss -> DP s (Async ())
runStageWith' fn cIns cClose = withDP (async (runStage (hUncurry (run fn) cIns) >> closeList cClose))

-- | 'DynamicPipeline' data type which contains all the three Stages definitions that have been generated by other combinators like 'withSource',
-- 'withGenerator' and 'withSink'.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@filterState@]: State of the 'StateT' 'Monad' that is the local State of the Filter execution
--
-- [@filterParam@]: Type of the First Parameter that is passed to the Filter when it is created by the Generator /Anamorphism/. Generator can change the type received from the Reader Channels.
--
-- [@st@]: Existential Scope of 'DP' 'Monad'.
data DynamicPipeline dpDefinition filterState filterParam st =
  DynamicPipeline
    { source    :: Stage (WithSource dpDefinition (DP st)) -- ^ Source stage
    , generator :: GeneratorStage dpDefinition filterState filterParam st -- ^ Generator stage together with its 'Filter' template
    , sink      :: Stage (WithSink dpDefinition (DP st)) -- ^ Sink stage
    }

-- | 'GeneratorStage' is a special 'Stage' data type according to /DPP/ Definition which contains a 'Filter' template definition,
-- in order to know how to spawn a new 'Filter' if it is needed, and the 'Stage' of the Generator to allow the user to perform some computation
-- in that case.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@filterState@]: State of the 'StateT' 'Monad' that is the local State of the Filter execution
--
-- [@filterParam@]: Type of the First Parameter that is passed to the Filter when it is created by the Generator /Anamorphism/. Generator can change the type received from the Reader Channels.
--
-- [@st@]: Existential Scope of 'DP' 'Monad'.
data GeneratorStage dpDefinition filterState filterParam st = GeneratorStage
  { _gsGenerator      :: Stage (WithGenerator dpDefinition (Filter dpDefinition filterState filterParam st) (DP st)) -- ^ Generator 'Stage'
  , _gsFilterTemplate :: Filter dpDefinition filterState filterParam st -- ^ 'Filter' template
  }

-- | 'Filter' is the template definition of the 'Filter' that may be spawned when reading elements on the Stream.
--
-- * 'Filter' is a 'NonEmpty' List of 'Actor's.
--
-- * Each 'Actor' is executed sequentially in that List when an Element arrives to that 'Filter' instance.
--
-- * All the 'Filter' execution (a.k.a. @forM_ actors runStage@) executes in a 'StateT' 'Monad' to share an internal state among 'Actor's.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@filterState@]: State of the 'StateT' 'Monad' that is the local State of the Filter execution
--
-- [@filterParam@]: Type of the First Parameter that is passed to the Filter when it is created by the Generator /Anamorphism/. Generator can change the type received from the Reader Channels.
--
-- [@st@]: Existential Scope of 'DP' 'Monad'.
newtype Filter dpDefinition filterState filterParam st =
  Filter { unFilter :: NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st))) }
  deriving Generic

-- Lens 'Wrapped' instance for 'Filter'. The instance body is empty, so it relies
-- entirely on the class's default definitions (presumably the 'Generic'-based
-- defaults, which would be why 'Filter' derives 'Generic' — confirm against lens).
instance Wrapped (Filter s' s a param)

-- | 'Actor' is a particular 'Stage' computation inside a 'Filter'.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@filterState@]: State of the 'StateT' 'Monad' that is the local State of the Filter execution
--
-- [@filterParam@]: Type of the First Parameter that is passed to the Filter when it is created by the Generator /Anamorphism/. Generator can change the type received from the Reader Channels.
--
-- [@monadicAction@]: 'Monad' Wrapped in 'StateT'.
--
-- The wrapped 'Stage' is only available under a @MonadState filterState monadicAction@ constraint.
newtype Actor dpDefinition filterState filterParam monadicAction =
  Actor { unActor :: MonadState filterState monadicAction => Stage (WithFilter dpDefinition filterParam monadicAction) }


-- | Smart Constructor of 'GeneratorStage'.
-- Pairs the Generator 'Stage' with the 'Filter' template.
{-# INLINE mkGenerator #-}
mkGenerator :: Stage (WithGenerator dpDefinition (Filter dpDefinition filterState filterParam st) (DP st)) -- ^Generator 'Stage'
            -> Filter dpDefinition filterState filterParam st -- ^'Filter' template
            -> GeneratorStage dpDefinition filterState filterParam st
mkGenerator = GeneratorStage

-- | Smart Constructor of 'Filter'.
-- Builds a 'Filter' that contains exactly one 'Actor', created from the given function with 'single'.
{-# INLINE mkFilter #-}
mkFilter :: forall dpDefinition filterState filterParam st. 
            WithFilter dpDefinition filterParam (StateT filterState (DP st)) -- ^Associated type family to Generate Function Signature
         -> Filter dpDefinition filterState filterParam st
mkFilter = Filter . single

-- | Smart Constructor of Single 'Actor' Wrapped in 'NonEmpty' List.
-- The 'Actor' is created from the given function with 'actor'.
{-# INLINE single #-}
single :: forall dpDefinition filterState filterParam st. 
          WithFilter dpDefinition filterParam (StateT filterState (DP st)) -- ^Associated type family to Generate Function Signature
       -> NonEmpty (Actor dpDefinition filterState filterParam (StateT filterState (DP st)))
single = one . actor

-- | Smart Constructor of 'Actor'.
-- Wraps the given function in a 'Stage' and then in an 'Actor'; the type application
-- fixes the 'Stage' type to the signature computed by 'WithFilter'.
{-# INLINE actor #-}
actor :: forall dpDefinition filterState filterParam st.
         WithFilter dpDefinition filterParam (StateT filterState (DP st)) -- ^Associated type family to Generate Function Signature
      -> Actor dpDefinition filterState filterParam (StateT filterState (DP st))
actor = Actor . mkStage' @(WithFilter dpDefinition filterParam (StateT filterState (DP st)))

-- | Combinator to build 'Filter' in a /DSL/ approach.
-- Add a new 'Actor' to an already existing 'Filter'.
--
-- The new 'Actor' is consed ('<|') onto the front of the 'NonEmpty' list that
-- the 'Filter' wraps; the list is reached through the '_Wrapped'' iso.
{-# INLINE (|>>>) #-}
(|>>>) :: forall dpDefinition filterState filterParam st.
          Actor dpDefinition filterState filterParam (StateT filterState (DP st)) -- ^New 'Actor' to put on front
       -> Filter dpDefinition filterState filterParam st -- ^Existing 'Filter'
       -> Filter dpDefinition filterState filterParam st
(|>>>) a f = f & _Wrapped' %~ (a <|)
infixr 5 |>>>

-- | Combinator to build 'Filter' in a /DSL/ approach.
-- Given 2 'Actor's build a 'Filter'.
--
-- The resulting 'Filter' holds the first 'Actor' followed by the second one.
{-# INLINE (|>>) #-}
(|>>) :: forall dpDefinition filterState filterParam st.
         Actor dpDefinition filterState filterParam (StateT filterState (DP st)) -- ^'Actor' 1
      -> Actor dpDefinition filterState filterParam (StateT filterState (DP st)) -- ^'Actor' 2
      -> Filter dpDefinition filterState filterParam st
(|>>) a1 a2 = Filter (a1 <| one a2)
infixr 5 |>>

-- Run a single 'Actor': unwrap it to its 'Stage' ('unActor'), extract the
-- underlying function ('run') and apply that function to the arguments packed
-- in the given 'HList' ('hUncurry').
{-# INLINE runActor #-}
runActor :: ( MonadState filterState monadicAction
            , HCurry' n (WithFilter dpDefinition filterParam monadicAction) xs r
            , ArityFwd (WithFilter dpDefinition filterParam monadicAction) n
            , ArityRev (WithFilter dpDefinition filterParam monadicAction) n
            ) => Actor dpDefinition filterState filterParam monadicAction -> HList xs -> r
-- The 'Actor' argument is bound explicitly (instead of composing with '.')
-- because 'unActor' returns a constraint-qualified type, which cannot be
-- composed point-free under simplified subsumption (GHC >= 9.0).
runActor a = hUncurry (run (unActor a))

-- Run a whole 'Filter' asynchronously.
--
-- Arguments, in order: the 'Filter' to run, the initial state for the
-- 'StateT' computation, the 'HList' of arguments handed to every 'Actor', and
-- the 'HList' of channels to close once all actors have finished.
--
-- Inside the spawned 'Async' the actors of the filter are executed one after
-- another ('mapM_') sharing a single 'StateT' run started from the initial
-- state; afterwards 'closeList' is called on the channels to close.
-- The returned 'Async' handle is not waited for here.
{-# INLINE runFilter #-}
runFilter :: ( CloseList ss
             , HCurry' n (WithFilter dpDefinition filterParam (StateT filterState (DP st))) xs (StateT filterState2 (DP st) ())
             , ArityFwd (WithFilter dpDefinition filterParam (StateT filterState (DP st))) n
             , ArityRev (WithFilter dpDefinition filterParam (StateT filterState (DP st))) n
             ) => Filter dpDefinition filterState filterParam st -> filterState2 -> HList xs -> HList ss -> DP st (Async ())
runFilter f s clist cClose = DP $ async $ do
  void . runStage . flip evalStateT s . mapM_ (`runActor` clist) . unFilter $ f
  closeList cClose

-- | Combinator for Building a 'Source' Stage. It uses an Associated Type Class to deduce the Function Signature required to the user
-- taken from /DP/ Type Level Flow Definition. The given function is wrapped in a 'Stage'.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@st@]: Existential Scope of 'DP' 'Monad'.
{-# INLINE withSource #-}
withSource :: forall (dpDefinition :: Type) st.
              WithSource dpDefinition (DP st) -- ^Associated type family to Generate Function Signature
           -> Stage (WithSource dpDefinition (DP st))
withSource = mkStage' @(WithSource dpDefinition (DP st))

-- | Combinator for Building a 'Generator' Stage. It uses an Associated Type Class to deduce the Function Signature required to the user
-- taken from /DP/ Type Level Flow Definition. The given function is wrapped in a 'Stage'.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@filter@]: 'Filter' template type
--
-- [@st@]: Existential Scope of 'DP' 'Monad'.
{-# INLINE withGenerator #-}
withGenerator :: forall (dpDefinition :: Type) (filter :: Type) st.
                 WithGenerator dpDefinition filter (DP st) -- ^Associated type family to Generate Function Signature
              -> Stage (WithGenerator dpDefinition filter (DP st))
withGenerator = mkStage' @(WithGenerator dpDefinition filter (DP st))

-- | Combinator for Building a 'Sink' Stage. It uses an Associated Type Class to deduce the Function Signature required to the user
-- taken from /DP/ Type Level Flow Definition. The given function is wrapped in a 'Stage'.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@st@]: Existential Scope of 'DP' 'Monad'.
{-# INLINE withSink #-}
withSink :: forall (dpDefinition :: Type) st.
            WithSink dpDefinition (DP st)  -- ^Associated type family to Generate Function Signature
         -> Stage (WithSink dpDefinition (DP st))
withSink = mkStage' @(WithSink dpDefinition (DP st))

-- Internal constructor of 'DynamicPipeline': packs the source, generator and
-- sink stages into the record. The type application pins @dpDefinition@ to the
-- one named in the signature.
{-# INLINE mkDP' #-}
mkDP' :: forall dpDefinition filterState filterParam st.
         Stage (WithSource dpDefinition (DP st))
      -> GeneratorStage dpDefinition filterState filterParam st
      -> Stage (WithSink dpDefinition (DP st))
      -> DynamicPipeline dpDefinition filterState filterParam st
mkDP' = DynamicPipeline @dpDefinition

-- Hiding DP Constraint for running DP.
--
-- Constraint synonym shared by 'buildDPProg' and 'mkDP' so that their
-- signatures do not have to spell out every requirement. It bundles:
--
--   * channel creation for the definition ('MkChans') and the shape of the
--     created channels (@HChan dpDefinition ~ ChanRecord ..@);
--   * the ability to append the read/write channel lists of source, generator
--     and sink ('HAppendList') and to close the appended lists ('CloseList');
--   * the equalities that name the user function types of each stage
--     (@iparams@, @gparams@, @oparams@) and the filter type (@filter@);
--   * the arity / uncurrying constraints ('ArityRev', 'ArityFwd', 'HCurry'')
--     needed to apply each stage function to its 'HList' of channels, each
--     yielding a @DP st ()@ action.
type DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi 
  = ( MkChans dpDefinition
    , HChan dpDefinition ~ ChanRecord slr slw glr glw silr silw
    , HAppendList slr slw
    , HAppendList glr glw
    , HAppendList silr silw
    , Filter dpDefinition filterState filterParam st ~ filter
    , ls ~ HAppendListR slr slw
    , lsi ~ HAppendListR silr silw
    , CloseList ls
    , CloseList lsi
    , CloseList (HAppendListR glr glw)
    , iparams ~ WithSource dpDefinition (DP st)
    , gparams ~ WithGenerator dpDefinition filter (DP st)
    , oparams ~ WithSink dpDefinition (DP st)
    , ArityRev iparams (HLength (ExpandSourceToCh dpDefinition))
    , ArityFwd iparams (HLength (ExpandSourceToCh dpDefinition))
    , HCurry' (HLength (ExpandSourceToCh dpDefinition)) iparams ls (DP st ())
    , ArityRev gparams (HLength (ExpandGenToCh dpDefinition filter))
    , ArityFwd gparams (HLength (ExpandGenToCh dpDefinition filter))
    , HCurry' (HLength (ExpandGenToCh dpDefinition filter)) gparams (filter ': HAppendListR glr glw) (DP st ())
    , ArityRev oparams (HLength (ExpandSinkToCh dpDefinition))
    , ArityFwd oparams (HLength (ExpandSinkToCh dpDefinition))
    , HCurry' (HLength (ExpandSinkToCh dpDefinition)) oparams lsi (DP st ())
    )

-- Turn a 'DynamicPipeline' record into the 'DP' program that executes it.
--
-- Steps:
--
--   1. Create all channels for @dpDefinition@ ('makeChans') and split them
--      into the channel lists of source, generator and sink ('inGenOut').
--   2. Prepend the filter template of the generator stage to the generator
--      channels, since the generator function receives the filter as its
--      first argument.
--   3. Launch source, generator and sink, in that order, each returning an
--      'Async' handle.
--   4. Wait only for the sink's 'Async'; the handles of the source and the
--      generator are discarded by '>>'.
{-# INLINE buildDPProg #-}
buildDPProg :: forall dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi.
            DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi =>
            DynamicPipeline dpDefinition filterState filterParam st -> DP st ()
buildDPProg DynamicPipeline{..} = do
  (cIns, cGen, cOut) <- inGenOut <$> withDP (makeChans @dpDefinition)
  let genWithFilter   = _gsFilterTemplate generator .*. cGen
  runStageWith source cIns
    >> runStageWith' @(HLength (ExpandGenToCh dpDefinition filter)) @gparams (_gsGenerator generator) genWithFilter cGen
    >> runStageWith sink cOut >>= DP . wait

-- | Smart constructor for 'DynamicPipeline' Definition.
--
-- Packs the three stages into a 'DynamicPipeline' and immediately converts it
-- into the 'DP' program that runs it.
{-# INLINE mkDP #-}
mkDP :: forall dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi.
        DPConstraint dpDefinition filterState st filterParam filter gparams slr slw glr glw silr silw iparams oparams ls lsi
     => Stage (WithSource dpDefinition (DP st)) -- ^ 'Source' Stage generated by 'withSource' combinator
     -> GeneratorStage dpDefinition filterState filterParam st  -- ^ 'Generator' Stage generated by 'withGenerator' combinator
     -> Stage (WithSink dpDefinition (DP st))   -- ^ 'Sink' Stage generated by 'withSink' combinator
     -> DP st ()
mkDP inS gS oS = buildDPProg (mkDP' inS gS oS)

-- | Run 'DP' 'Monad' to final 'IO' result.
--
-- The argument must be polymorphic in the scope parameter @st@.
{-# INLINE runDP #-}
runDP :: (forall st. DP st a) -> IO a
-- The argument is bound explicitly: with a rank-2 parameter the point-free
-- form @runDP = runStage@ is rejected under simplified subsumption
-- (GHC >= 9.0), while this form is accepted by old and new compilers alike.
runDP dp = runStage dp

-- Closable Automatic Write Channels

-- Empty data type (no constructors) with a phantom type parameter.
-- NOTE(review): it is not referenced by the closing machinery visible below;
-- confirm whether it is still used elsewhere.
data NotClose (a :: Type)

-- Heterogeneous lists whose elements can all be closed.
class CloseList xs where
  -- Close every element of the list.
  closeList :: HList xs -> IO ()

-- Non-empty list: close the head first, then the remaining elements.
instance (IsClosable x, CloseList xs) => CloseList (x ': xs) where
  closeList (HCons x xs) = close x >> closeList xs

-- Empty list: nothing to close.
instance CloseList '[] where
  closeList _ = pure ()

-- Things that can be closed with an 'IO' action.
class IsClosable f where
  -- Close the given value.
  close :: f -> IO ()

-- Closing a 'WriteChannel' delegates to 'end'.
instance IsClosable (WriteChannel a) where
  close = end

-- Closing a 'ReadChannel' is a no-op.
instance IsClosable (ReadChannel a) where
  close = const $ pure ()

-- | 'SpawnFilterConstraint' Constraint type alias.
--
-- Bundles the constraints required to spawn a 'Filter' instance:
--
--   * channels can be created for the filter part of the definition
--     (@MkChans (ChansFilter dpDefinition)@) and have the shape
--     @InOutChan (ReadChannel readElem : l) l3@;
--   * @l4@ is the concatenation of @l@ and @l3@, and @ReadChannel readElem ': l4@
--     can be closed ('CloseList');
--   * @l2@, the argument list given to the filter function, is the read element,
--     its 'ReadChannel' and then @l4@;
--   * the filter function ('WithFilter') has the shape @b2 -> ReadChannel b2 -> b3@
--     and can be applied to @l2@ ('HCurry'', 'ArityFwd', 'ArityRev'), yielding a
--     @StateT filterState (DP st) ()@ action.
type SpawnFilterConstraint dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4 =
                ( MkChans (ChansFilter dpDefinition)
                , l1 ~ l
                , CloseList (ReadChannel readElem ': l4)
                , HAppendList l l3
                , l4 ~ HAppendListR l l3
                , l2 ~ (readElem ': ReadChannel readElem ': l4)
                , HChan (ChansFilter dpDefinition) ~ InOutChan (ReadChannel readElem : l) l3
                , WithFilter dpDefinition filterParam (StateT filterState (DP st)) ~ (b2 -> ReadChannel b2 -> b3)
                , HLength (ExpandFilterToCh dpDefinition filterParam) ~ HLength l2
                , HCurry' (HLength l2) (WithFilter dpDefinition filterParam (StateT filterState (DP st))) l2 (StateT filterState (DP st) ())
                , ArityFwd (WithFilter dpDefinition filterParam (StateT filterState (DP st))) (HLength (ExpandFilterToCh dpDefinition filterParam))
                , ArityRev b3 (HLength l4)
                )

-- | 'UnFoldFilter' is a wrapper Data Type that contains all the information needed to spawn 'Filter' instances according to /DPP/.
-- The user will have the capability to select how those filters are going to be spawned, for example on each read element, how to setup
-- initial states of 'StateT' Monad on 'Actor' computations in filters, among others.
--
-- [@dpDefinition ~ 'Source' ('Channel' ..) ':=>' 'Generator' ('Channel' ..) ':=>' 'Sink'@]: /DP/ Type level Flow Definition
--
-- [@readElem@]: Type of the element that is being read from the Selected Channel in the 'Generator' Stage
--
-- [@st@]: Existential Scope of 'DP' 'Monad'.
--
-- [@filterState@]: State of the 'StateT' 'Monad' that is the local State of the Filter execution
--
-- [@filterParam@]: Type of the First Parameter that is passed to the Filter when it is created by the Generator /Anamorphism/. Generator can change the type received from the Reader Channels.
--
-- [@l@]: Types of the remaining channels kept in '_ufRsChannels'.
--
data UnFoldFilter dpDefinition readElem st filterState filterParam l =
  UnFoldFilter
    { _ufSpawnIf :: readElem -> Bool -- ^Given a new Element determine if we need to interpose a new Filter or not
    , _ufOnElem :: readElem -> DP st () -- ^For each element that the Filter is consuming allow to do something outside the filter with that element. For example trace or debug
    , _ufFilter :: Filter dpDefinition filterState filterParam st  -- ^'Filter' Template
    , _ufInitState :: readElem -> filterState -- ^Given the First element in this Filter Instance how to Initiate Internal 'Filter' 'StateT' (Memory)
    , _ufReadChannel :: ReadChannel readElem -- ^Main 'ReadChannel' to feed filter
    , _ufRsChannels :: HList l -- ^'HList' with the rest of the ReadChannels if they are needed or 'HNil' if it only contains 1 read channel
    }
    
-- | Smart Constructor for 'UnFoldFilter'. It is the record constructor itself, taking the fields in declaration order.
mkUnfoldFilter :: (readElem -> Bool) -- ^Given a new Element determine if we need to interpose a new Filter or not
               -> (readElem -> DP st ()) -- ^For each element that the Filter is consuming allow to do something outside the filter with that element. For example trace or debug
               -> Filter dpDefinition filterState filterParam st  -- ^'Filter' Template
               -> (readElem -> filterState) -- ^Given the First element in this Filter Instance how to Initiate Internal 'Filter' 'StateT' (Memory)
               -> ReadChannel readElem -- ^Main 'ReadChannel' to feed filter
               -> HList l -- ^'HList' with the rest of the ReadChannels if they are needed or 'HNil' if it only contains 1 read channel
               -> UnFoldFilter dpDefinition readElem st filterState filterParam l
mkUnfoldFilter = UnFoldFilter

-- | Smart Constructor for 'UnFoldFilter' which bypasses doing something externally on each read element:
-- the per-element action is fixed to @const (pure ())@.
mkUnfoldFilter' :: (readElem -> Bool) -- ^Given a new Element determine if we need to interpose a new Filter or not
                -> Filter dpDefinition filterState filterParam st -- ^'Filter' Template
                -> (readElem -> filterState) -- ^Given the First element in this Filter Instance how to Initiate Internal 'Filter' 'StateT' (Memory)
                -> ReadChannel readElem -- ^Main 'ReadChannel' to feed filter
                -> HList l -- ^'HList' with the rest of the ReadChannels, or 'HNil' if there is only 1 read channel
                -> UnFoldFilter dpDefinition readElem st filterState filterParam l
mkUnfoldFilter' spawnIf = mkUnfoldFilter spawnIf (const $ pure ())

-- | Smart constructor for 'UnFoldFilter' that spawns a 'Filter' for every element on the read channel,
-- interposing each new 'Filter' between the last 'Filter' and the 'Generator' stage.
--
-- @ Source ---> Filter1 ---> Filter2 ... ---> FilterN ---> Generator ---> Sink @
--
-- It is 'mkUnfoldFilter'' with a spawn predicate that is always 'True', so no external action
-- is performed on the elements.
mkUnfoldFilterForAll :: Filter dpDefinition filterState filterParam st -- ^ 'Filter' to run for each read element
                     -> (readElem -> filterState) -- ^ Computes the initial state of the spawned 'Filter' from the element that triggered it
                     -> ReadChannel readElem -- ^ Channel from which the elements are pulled
                     -> HList l -- ^ Result channels handed back by 'unfoldF' once the read channel yields no more elements
                     -> UnFoldFilter dpDefinition readElem st filterState filterParam l
mkUnfoldFilterForAll = mkUnfoldFilter' (const True)

-- | Same as 'mkUnfoldFilterForAll', but additionally runs an external action on each read element.
--
-- It is 'mkUnfoldFilter' with a spawn predicate that is always 'True'.
mkUnfoldFilterForAll' :: (readElem -> DP st ()) -- ^ Action run on each read element before its 'Filter' is spawned
                      -> Filter dpDefinition filterState filterParam st -- ^ 'Filter' to run for each read element
                      -> (readElem -> filterState) -- ^ Computes the initial state of the spawned 'Filter' from the element that triggered it
                      -> ReadChannel readElem -- ^ Channel from which the elements are pulled
                      -> HList l -- ^ Result channels handed back by 'unfoldF' once the read channel yields no more elements
                      -> UnFoldFilter dpDefinition readElem st filterState filterParam l
mkUnfoldFilterForAll' = mkUnfoldFilter (const True)

-- | Run an 'UnFoldFilter'.
--
-- Elements are pulled one at a time from the current read channel. For each element the
-- on-element action is run and, when the spawn predicate holds, a new 'Filter' is started on
-- freshly created channels; the loop then keeps reading from the head of those new read channels.
-- When the read channel yields no more elements, the current result channels are returned.
{-# INLINE unfoldF #-}
unfoldF :: forall dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4.
           SpawnFilterConstraint dpDefinition readElem st filterState filterParam l l1 l2 l3 b2 b3 l4
        => UnFoldFilter dpDefinition readElem st filterState filterParam l -- ^ 'UnFoldFilter'
        -> DP st (HList l) -- ^ Returns the list of 'ReadChannel's with the results to be read for the 'Generator' at the end. You can use this to pass the results to 'Sink'
unfoldF = loopSpawn

  where
    -- Pull the next element from the current read channel:
    --   * 'Nothing' -> stop and return the current result channels;
    --   * 'Just e'  -> process @e@ with 'doOnElem', then loop on the (possibly updated) 'UnFoldFilter'.
    loopSpawn uf@UnFoldFilter{..} =
      maybe (pure _ufRsChannels) (loopSpawn <=< doOnElem uf) =<< DP (pull _ufReadChannel)

    -- Handle a single element; yields the 'UnFoldFilter' to use for the next iteration.
    doOnElem uf@UnFoldFilter{..} elem' = do
      -- The external action always runs, whether or not a filter is spawned.
      _ufOnElem elem'
      if _ufSpawnIf elem'
        then do
          -- Fresh channels for the new filter. The annotation on writes' pins the write side to l3.
          (reads', writes' :: HList l3) <- getFilterChannels <$> DP (makeChansF @(ChansFilter dpDefinition))
          -- Filter arguments: the triggering element, the current read channel, the current
          -- result channels and the new write channels.
          let hlist = elem' .*. _ufReadChannel .*. (_ufRsChannels `hAppendList` writes')
          -- runFilter yields an 'Async' handle which is discarded here. The last argument is the
          -- same channel list without the element.
          -- NOTE(review): presumably the channels runFilter closes when the filter ends — confirm against runFilter.
          void $ runFilter _ufFilter (_ufInitState elem') hlist (_ufReadChannel .*. (_ufRsChannels `hAppendList` writes'))
          -- Subsequent elements are pulled from the head of the new read channels; the tail
          -- becomes the result channels.
          return $ uf { _ufReadChannel = hHead reads', _ufRsChannels = hTail reads' }
        else return uf