{-# LANGUAGE ExplicitForAll #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Dataflow.Vertices (
statefulVertex,
statelessVertex,
outputTVar,
trace,
discard
) where
import Control.Concurrent.STM.TVar (TVar, modifyTVar')
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Class (lift)
import Dataflow.Primitives (Dataflow (..), Edge, StateRef,
Timestamp (..), Vertex (..),
newState, registerFinalizer,
registerVertex, send)
import Prelude
import Text.Show.Pretty (pPrint)
-- | Register a vertex that owns a piece of state.
--
-- A state reference is created from @initState@ with 'newState'. The
-- @finalizer@, applied to that reference, is passed to 'registerFinalizer';
-- afterwards a 'StatefulVertex' pairing the same reference with @callback@
-- is passed to 'registerVertex', and the edge it yields is the result.
statefulVertex ::
     state
  -> (StateRef state -> Timestamp -> i -> Dataflow ())
  -> (StateRef state -> Timestamp -> Dataflow ())
  -> Dataflow (Edge i)
statefulVertex initState callback finalizer =
  newState initState >>= \ref ->
    -- The finalizer is registered before the vertex itself.
    registerFinalizer (finalizer ref)
      >> registerVertex (StatefulVertex ref callback)
-- | Register a vertex that keeps no state of its own: the given callback is
-- wrapped in 'StatelessVertex' and passed to 'registerVertex'.
statelessVertex :: (Timestamp -> i -> Dataflow ()) -> Dataflow (Edge i)
statelessVertex = registerVertex . StatelessVertex
{-# NOINLINE outputTVar #-}
-- | Stateless vertex that accumulates its inputs into a 'TVar'.
--
-- For every input @x@ the timestamp is ignored and @register@ is updated
-- with @op x@, using the strict 'modifyTVar'' inside one 'atomically'
-- transaction.
outputTVar :: (o -> w -> w) -> TVar w -> Dataflow (Edge o)
outputTVar op register = statelessVertex accumulate
  where
    -- Combine one input into the register; the timestamp is not used.
    accumulate _ x =
      Dataflow (lift (atomically (modifyTVar' register (op x))))
-- | Insert a printing tap in front of an edge.
--
-- Two vertices are registered: first one that 'pPrint's each
-- @(timestamp, input)@ pair it receives, then the returned one, which sends
-- every input (with its timestamp) to the printing vertex and then to
-- @next@.
trace :: Show i => Edge i -> Dataflow (Edge i)
trace next = do
  printer <- registerVertex (StatelessVertex printPair)
  statelessVertex (fanOut printer)
  where
    -- Pretty-print the timestamp together with the input.
    printPair t x = Dataflow (lift (pPrint (t, x)))

    -- Send to the printing vertex first, then on to the wrapped edge.
    fanOut printer t x = send printer t x >> send next t x
-- | Stateless vertex whose callback ignores both the timestamp and the input
-- and performs no action.
discard :: Dataflow (Edge i)
discard = statelessVertex ignore
  where
    ignore _ _ = return ()