module Test.Dataflow ( runDataflow, runDataflowMany ) where import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVarIO) import Control.Monad (foldM_) import Control.Monad.IO.Class (MonadIO (..)) import Control.Monad.STM (atomically) import Control.Monad.Trans.Class (lift) import Dataflow (Edge, compile, execute, modifyState, readState, statefulVertex, writeState) import Dataflow.Primitives (Dataflow (Dataflow)) import Prelude -- | Run a dataflow with a list of inputs. All inputs will be sent as part of -- a single epoch. -- -- @since 0.1.0.0 runDataflow :: MonadIO io => (Edge o -> Dataflow (Edge i)) -> [i] -> io [o] runDataflow dataflow inputs = head <$> runDataflowMany dataflow [inputs] -- | Run a dataflow with a list of lists of inputs. Each outer list will be -- sent as its own epoch. -- -- @since 0.2.2.0 runDataflowMany :: MonadIO io => (Edge o -> Dataflow (Edge i)) -> [[i]] -> io [[o]] runDataflowMany dataflow inputs = liftIO $ do out <- newTVarIO [] program <- compile (dataflow =<< outputTVarNestedList out) foldM_ (flip execute) program inputs reverse <$> readTVarIO out where outputTVarNestedList register = statefulVertex [] (\sref _ x -> modifyState sref (x :)) (\sref _ -> do state <- readState sref Dataflow $ lift $ atomically $ modifyTVar' register (reverse state :) writeState sref [] )