{-| Module : Dataflow Description : Timely Dataflow for Haskell Copyright : (c) Double Crown Gaming Co. 2020 License : BSD3 Maintainer : jesse.kempf@doublecrown.co Stability : experimental Common utility operators for data flows @since 0.1.3.0 -} module Dataflow.Operators ( fanout, map, join2, join3, join4, join5, join6 ) where import Dataflow.Primitives (Dataflow, Edge, StateRef, Timestamp, Vertex (StatefulVertex), newState, registerFinalizer, registerVertex, send) import Dataflow.Vertices (statelessVertex) import Prelude (mapM_, ($), (<$>), (<*>)) -- | Construct a stateless vertex that sends each input to every 'Edge' in the output list. -- -- @since 0.1.3.0 fanout :: [Edge a] -> Dataflow (Edge a) fanout nexts = statelessVertex $ \timestamp x -> mapM_ (\next -> send next timestamp x) nexts -- | Construct a stateless vertex that applies the provided function to every input -- and sends the result to the output. -- -- @since 0.1.3.0 map :: (i -> o) -> Edge o -> Dataflow (Edge i) map f next = statelessVertex $ \timestamp x -> send next timestamp (f x) -- | Construct a stateful vertex with two input edges. -- -- @since 0.1.3.0 join2 :: state -> (StateRef state -> Timestamp -> i -> Dataflow ()) -> (StateRef state -> Timestamp -> j -> Dataflow ()) -> (StateRef state -> Timestamp -> Dataflow ()) -> Dataflow (Edge i, Edge j) join2 initState callbackI callbackJ finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef (,) <$> registerVertex (StatefulVertex stateRef callbackI) <*> registerVertex (StatefulVertex stateRef callbackJ) -- | Construct a stateful vertex with three input edges. -- -- @since 0.1.3.0 join3 :: state -> (StateRef state -> Timestamp -> i -> Dataflow ()) -> (StateRef state -> Timestamp -> j -> Dataflow ()) -> (StateRef state -> Timestamp -> k -> Dataflow ()) -> (StateRef state -> Timestamp -> Dataflow ()) -> Dataflow (Edge i, Edge j, Edge k) join3 initState callbackI callbackJ callbackK finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef (,,) <$> registerVertex (StatefulVertex stateRef callbackI) <*> registerVertex (StatefulVertex stateRef callbackJ) <*> registerVertex (StatefulVertex stateRef callbackK) -- | Construct a stateful vertex with four input edges. -- -- @since 0.2.1.0 join4 :: state -> (StateRef state -> Timestamp -> i1 -> Dataflow ()) -> (StateRef state -> Timestamp -> i2 -> Dataflow ()) -> (StateRef state -> Timestamp -> i3 -> Dataflow ()) -> (StateRef state -> Timestamp -> i4 -> Dataflow ()) -> (StateRef state -> Timestamp -> Dataflow ()) -> Dataflow (Edge i1, Edge i2, Edge i3, Edge i4) join4 initState callback1 callback2 callback3 callback4 finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef (,,,) <$> registerVertex (StatefulVertex stateRef callback1) <*> registerVertex (StatefulVertex stateRef callback2) <*> registerVertex (StatefulVertex stateRef callback3) <*> registerVertex (StatefulVertex stateRef callback4) -- | Construct a stateful vertex with five input edges. -- -- @since 0.2.1.0 join5 :: state -> (StateRef state -> Timestamp -> i1 -> Dataflow ()) -> (StateRef state -> Timestamp -> i2 -> Dataflow ()) -> (StateRef state -> Timestamp -> i3 -> Dataflow ()) -> (StateRef state -> Timestamp -> i4 -> Dataflow ()) -> (StateRef state -> Timestamp -> i5 -> Dataflow ()) -> (StateRef state -> Timestamp -> Dataflow ()) -> Dataflow (Edge i1, Edge i2, Edge i3, Edge i4, Edge i5) join5 initState callback1 callback2 callback3 callback4 callback5 finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef (,,,,) <$> registerVertex (StatefulVertex stateRef callback1) <*> registerVertex (StatefulVertex stateRef callback2) <*> registerVertex (StatefulVertex stateRef callback3) <*> registerVertex (StatefulVertex stateRef callback4) <*> registerVertex (StatefulVertex stateRef callback5) -- | Construct a stateful vertex with six input edges. -- -- @since 0.2.1.0 join6 :: state -> (StateRef state -> Timestamp -> i1 -> Dataflow ()) -> (StateRef state -> Timestamp -> i2 -> Dataflow ()) -> (StateRef state -> Timestamp -> i3 -> Dataflow ()) -> (StateRef state -> Timestamp -> i4 -> Dataflow ()) -> (StateRef state -> Timestamp -> i5 -> Dataflow ()) -> (StateRef state -> Timestamp -> i6 -> Dataflow ()) -> (StateRef state -> Timestamp -> Dataflow ()) -> Dataflow (Edge i1, Edge i2, Edge i3, Edge i4, Edge i5, Edge i6) join6 initState callback1 callback2 callback3 callback4 callback5 callback6 finalizer = do stateRef <- newState initState registerFinalizer $ finalizer stateRef (,,,,,) <$> registerVertex (StatefulVertex stateRef callback1) <*> registerVertex (StatefulVertex stateRef callback2) <*> registerVertex (StatefulVertex stateRef callback3) <*> registerVertex (StatefulVertex stateRef callback4) <*> registerVertex (StatefulVertex stateRef callback5) <*> registerVertex (StatefulVertex stateRef callback6)