{-# LANGUAGE ExplicitForAll #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE ScopedTypeVariables #-} module Dataflow.Vertices ( statefulVertex, statelessVertex, outputTVar, trace, discard ) where import Control.Concurrent.STM.TVar (TVar, modifyTVar') import Control.Monad.STM (atomically) import Control.Monad.Trans.Class (lift) import Dataflow.Primitives (Dataflow (..), Edge, StateRef, Timestamp (..), Vertex (..), newState, registerFinalizer, registerVertex, send) import Prelude import Text.Show.Pretty (pPrint) -- | Construct a vertex with internal state. Like 'statelessVertex', 'statefulVertex' -- requires a procedure to invoke on each input. It also needs an initial 'state' value -- and a procedure to call when all inputs for a given 'Timestamp' value have been -- delivered. -- -- NB: Until the finalizer has been called for a particular timestamp, a stateful vertex -- must be capable of accepting data for multiple timestamps simultaneously. -- -- @since 0.1.0.0 statefulVertex :: state -- ^ The initial state value. -> (StateRef state -> Timestamp -> i -> Dataflow ()) -- ^ The input handler. -> (StateRef state -> Timestamp -> Dataflow ()) -- ^ The finalizer. -> Dataflow (Edge i) statefulVertex initState callback finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef registerVertex $ StatefulVertex stateRef callback -- | Construct a vertex with no internal state. The given procedure is invoked on each input. -- -- `send`ing to a stateless vertex is effectively a function call and will execute in the -- caller's thread. By design this is a cheap operation. -- -- @since 0.1.0.0 statelessVertex :: (Timestamp -> i -> Dataflow ()) -> Dataflow (Edge i) statelessVertex callback = registerVertex $ StatelessVertex callback {-# NOINLINE outputTVar #-} -- | Construct an output vertex that stores items into the provided 'TVar'. The first argument -- is an update function so that, for example, the 'TVar' could contain a list of 'o's and the update -- function could then `cons` new items onto the list. -- -- @since 0.1.0.0 outputTVar :: (o -> w -> w) -> TVar w -> Dataflow (Edge o) outputTVar op register = statelessVertex $ \_ x -> Dataflow $ lift $ atomically $ modifyTVar' register (op x) -- | Construct a vertex that pretty-prints items and passes them through unchanged. -- -- @since 0.1.2.0 trace :: Show i => Edge i -> Dataflow (Edge i) trace next = do trace' <- ioVertex $ curry pPrint statelessVertex $ \t x -> do send trace' t x send next t x where ioVertex callback = registerVertex $ StatelessVertex $ \t i -> Dataflow $ lift $ callback t i -- | Construct a vertex that discards anything sent to it. -- -- @since 0.1.2.0 discard :: Dataflow (Edge i) discard = statelessVertex $ \_ _ -> return ()