module Data.Repa.Flow.Generic.Eval ( drainS , drainP) where import Data.Repa.Flow.Generic.Base import Data.Repa.Eval.Gang as Eval import GHC.Exts #include "repa-flow.h" -- | Pull all available values from the sources and push them to the sinks. -- Streams in the bundle are processed sequentially, from first to last. -- -- * If the `Sources` and `Sinks` have different numbers of streams then -- we only evaluate the common subset. -- drainS :: (Next i, Monad m) => Sources i m a -> Sinks i m a -> m () drainS (Sources nSources ipull) (Sinks nSinks opush oeject) = loop_drain first where n = min nSources nSinks loop_drain !ix = ipull ix eat_drain eject_drain where eat_drain v = do opush ix v loop_drain ix {-# INLINE eat_drain #-} eject_drain = do oeject ix case next ix n of Nothing -> return () Just ix' -> loop_drain ix' {-# INLINE eject_drain #-} {-# INLINE loop_drain #-} {-# INLINE_FLOW drainS #-} -- | Pull all available values from the sources and push them to the sinks, -- in parallel. We fork a thread for each of the streams and evaluate -- them all in parallel. -- -- * If the `Sources` and `Sinks` have different numbers of streams then -- we only evaluate the common subset. -- drainP :: Sources Int IO a -> Sinks Int IO a -> IO () drainP (Sources nSources ipull) (Sinks nSinks opush oeject) = do -- Create a new gang. gang <- Eval.forkGang n -- Evalaute all the streams in different threads. Eval.gangIO gang drainMe where !n = min nSources nSinks drainMe !ix = ipull (I# ix) eat_drain eject_drain where eat_drain v = do opush (I# ix) v drainMe ix {-# INLINE eat_drain #-} eject_drain = oeject (I# ix) {-# INLINE eject_drain #-} {-# INLINE drainMe #-} {-# INLINE_FLOW drainP #-}