{-| Module : Dataflow Description : Timely Dataflow for Haskell Copyright : (c) Double Crown Gaming Co. 2020 License : BSD3 Maintainer : jesse.kempf@doublecrown.co Stability : experimental Common utility operators for data flows @since 0.1.3.0 -} module Dataflow.Operators ( fanout, map, join2, join3 ) where import Dataflow.Primitives (Dataflow, Edge, StateRef, Timestamp, Vertex (StatefulVertex), newState, registerFinalizer, registerVertex, send) import Dataflow.Vertices (statelessVertex) import Prelude (mapM_, ($), (<$>), (<*>)) -- | Construct a stateless vertex that sends each input to every 'Edge' in the output list. -- -- @since 0.1.3.0 fanout :: [Edge a] -> Dataflow (Edge a) fanout nexts = statelessVertex $ \timestamp x -> mapM_ (\next -> send next timestamp x) nexts -- | Construct a stateless vertex that applies the provided function to every input -- and sends the result to the output. -- -- @since 0.1.3.0 map :: (i -> o) -> Edge o -> Dataflow (Edge i) map f next = statelessVertex $ \timestamp x -> send next timestamp (f x) -- | Construct a stateful vertex with two input edges. -- -- @since 0.1.3.0 join2 :: state -> (StateRef state -> Timestamp -> i -> Dataflow ()) -> (StateRef state -> Timestamp -> j -> Dataflow ()) -> (StateRef state -> Timestamp -> Dataflow ()) -> Dataflow (Edge i, Edge j) join2 initState callbackI callbackJ finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef (,) <$> registerVertex (StatefulVertex stateRef callbackI) <*> registerVertex (StatefulVertex stateRef callbackJ) -- | Construct a stateful vertex with three input edges. -- -- @since 0.1.3.0 join3 :: state -> (StateRef state -> Timestamp -> i -> Dataflow ()) -> (StateRef state -> Timestamp -> j -> Dataflow ()) -> (StateRef state -> Timestamp -> k -> Dataflow ()) -> (StateRef state -> Timestamp -> Dataflow ()) -> Dataflow (Edge i, Edge j, Edge k) join3 initState callbackI callbackJ callbackK finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef (,,) <$> registerVertex (StatefulVertex stateRef callbackI) <*> registerVertex (StatefulVertex stateRef callbackJ) <*> registerVertex (StatefulVertex stateRef callbackK)