module Data.Repa.Flow.Generic.Eval
( drainS
, drainP
, consumeS)
where
import Data.Repa.Flow.Generic.Base
import Data.Repa.Eval.Gang as Eval
import GHC.Exts
#include "repa-flow.h"
-- | Sequentially move every value from a bundle of sources into a bundle
--   of sinks.
--
--   Streams are visited one after another, starting at 'first' and
--   advancing with 'next'. Every value a source hands to us is pushed to
--   the sink with the same index, and the same source is pulled again.
--   When the source instead invokes its no-argument continuation
--   (presumably because the stream is exhausted) the matching sink is
--   ejected and we step to the following stream.
--
--   * The smaller of the two arities is the bound given to 'next', so if
--     the bundles differ in size only that many streams are walked.
--
drainS :: (Next i, Monad m)
       => Sources i m a -> Sinks i m a -> m ()
drainS (Sources nSources ipull) (Sinks nSinks opush oeject)
  = go first
  where
    -- Bound handed to 'next' when stepping from one stream to the next.
    bound = min nSources nSinks

    -- Drain the stream at the given index, then continue with the rest.
    go !ix = ipull ix onValue onDone
      where
        -- Forward one value to the sink, then pull the same stream again.
        onValue x = opush ix x >> go ix

        -- Close the sink for this stream; stop when 'next' has no
        -- further index to offer.
        onDone = oeject ix >> maybe (return ()) go (next ix bound)
-- | Move every value from a bundle of sources into a bundle of sinks,
--   using a gang of worker threads.
--
--   A gang is forked with @min nSources nSinks@ as its size argument and
--   each worker runs the per-stream drain loop with the unboxed index it
--   is given by 'Eval.gangIO' (presumably one distinct stream index per
--   worker -- confirm against the gang implementation).
--
--   * Because the gang size is the smaller of the two arities, streams
--     beyond that count are not touched.
--
drainP :: Sources Int IO a -> Sinks Int IO a -> IO ()
drainP (Sources nSources ipull) (Sinks nSinks opush oeject)
  = Eval.forkGang width >>= \gang -> Eval.gangIO gang worker
  where
    -- Size argument for the gang: the smaller of the two arities.
    !width = min nSources nSinks

    -- Loop run by a single gang member for the stream index it was given.
    worker !tid = ipull here onValue onDone
      where
        -- The worker receives an unboxed index; the pull, push and eject
        -- functions all want a boxed 'Int'.
        here = I# tid

        -- Forward one value to the sink, then pull the same stream again.
        onValue x = opush here x >> worker tid

        -- Nothing more to pull: close this stream's sink and finish.
        onDone = oeject here
-- | Sequentially feed every value from a bundle of sources to a consumer
--   action.
--
--   Streams are visited one after another, starting at 'first' and
--   advancing with 'next' (bounded by the arity of the sources). The
--   consumer receives the index of the stream together with each value
--   pulled from it.
--
consumeS :: (Next i, Monad m)
         => (i -> a -> m ())   -- ^ Action applied to each stream index and value.
         -> Sources i m a      -- ^ Bundle of sources to consume.
         -> m ()
consumeS eat (Sources nSources ipull)
  = go first
  where
    -- Consume the stream at the given index, then continue with the rest.
    go !ix = ipull ix onValue onDone
      where
        -- Hand the value to the consumer, then pull the same stream again.
        onValue x = eat ix x >> go ix

        -- This stream has nothing more for us; move on, or stop when
        -- 'next' has no further index to offer.
        onDone = maybe (return ()) go (next ix nSources)