module Data.Repa.Flow.Generic.Eval
        ( drainS
        , drainP
        , consumeS)
where
import Data.Repa.Flow.Generic.Base
import Data.Repa.Eval.Gang                      as Eval
import GHC.Exts
#include "repa-flow.h"


-- | Pull all available values from the sources and push them to the sinks.
--   Streams in the bundle are processed sequentially, from first to last.
--
--   * If the `Sources` and `Sinks` have different numbers of streams then
--     we only evaluate the common subset.
--
drainS  :: (Next i, Monad m)
        => Sources i m a -> Sinks i m a -> m ()

drainS (Sources nSources ipull) (Sinks nSinks opush oeject)
 = loop_drain first
 where 
        n = min nSources nSinks

        loop_drain !ix
         = ipull ix eat_drain eject_drain
         where  eat_drain v
                 = do   opush ix v
                        loop_drain ix
                {-# INLINE eat_drain #-}

                eject_drain
                 = do   oeject ix  
                        case next ix n of
                         Nothing        -> return ()
                         Just ix'       -> loop_drain ix'
                {-# INLINE eject_drain #-}
        {-# INLINE loop_drain #-}
{-# INLINE_FLOW drainS #-}


-- | Pull all available values from the sources and push them to the sinks,
--   in parallel. We fork a thread for each of the streams and evaluate
--   them all in parallel.
--
--   * If the `Sources` and `Sinks` have different numbers of streams then
--     we only evaluate the common subset.
--
drainP  :: Sources Int IO a -> Sinks Int IO a -> IO ()
drainP (Sources nSources ipull) (Sinks nSinks opush oeject)
 = do   
        -- Create a new gang.
        gang    <- Eval.forkGang n

        -- Evalaute all the streams in different threads.
        Eval.gangIO gang drainMe

 where  
        !n      = min nSources nSinks

        drainMe !ix
         = ipull (I# ix) eat_drain eject_drain
         where  eat_drain v 
                 = do   opush  (I# ix) v
                        drainMe ix
                {-# INLINE eat_drain #-}

                eject_drain = oeject (I# ix)
                {-# INLINE eject_drain #-}
        {-# INLINE drainMe #-}
{-# INLINE_FLOW drainP #-}


-- | Pull all available values from the sources and pass them to the
--   given action.
consumeS :: (Next i, Monad m)
         => (i -> a -> m ())
         -> Sources i m a 
         -> m ()
consumeS eat (Sources nSources ipull)
 = loop_consume first
 where  
        loop_consume !ix
         = ipull ix eat_consume eject_consume
         where  
                eat_consume v
                 = do eat ix v
                      loop_consume ix
                {-# INLINE eat_consume #-}

                eject_consume 
                 = do case next ix nSources of
                        Nothing  -> return ()
                        Just ix' -> loop_consume ix'
                {-# INLINE eject_consume #-}
        {-# INLINE loop_consume #-}
{-# INLINE_FLOW consumeS #-}