module Data.Repa.Flow.Chunked.Process
        ( process_i 
        , unfolds_i,    A.StepUnfold(..))
where
import Data.Repa.Flow.Chunked.Base
import qualified Data.Repa.Array.Generic        as A
import qualified Data.Repa.Array.Generic.Index  as A
import qualified Data.Repa.Flow.Generic.Base    as G
#include "repa-flow.h"


-- | Apply a generic stream process to all the streams in a bundle of sources.
process_i
        :: ( G.States i m
           , A.BulkI   lSrc a
           , A.Bulk    lDst b, A.Bulk    lDst (A.Array lDst b)
           , A.TargetI lDst b, A.TargetI lDst (A.Array lDst b))
        => (s -> a -> (s, A.Array lDst b))      -- ^ Worker function.
        -> s                                    -- ^ Initial state.
        ->    Sources i m lSrc a                -- ^ Input sources
        -> m (Sources i m lDst b)               -- ^ Result sources.

process_i f z (G.Sources n pullA)
 = do
        refs    <- G.newRefs n z

        let pull_process i eatB ejectB
             = do s1 <- G.readRefs refs i
                  pullA i (eatA_process s1) ejectA_process

             where eatA_process s1 xA
                    = case A.process A.name f s1 xA of
                        (s1', arrB) 
                         -> do  G.writeRefs refs i s1'
                                eatB arrB
                   {-# INLINE eatA_process #-}

                   ejectA_process 
                    = ejectB
                   {-# INLINE ejectA_process #-}
            {-# INLINE pull_process #-}

        return $ G.Sources n pull_process
{-# INLINE_FLOW process_i #-}


-- | Apply a generic stream process to all the streams in a bundle of sources.
unfolds_i
        :: ( G.States i m
           , A.BulkI   lSrc a
           , A.Bulk    lDst b
           , A.TargetI lDst b)
        => (a -> s -> A.StepUnfold s b)         -- ^ Worker function.
        -> s                                    -- ^ Initial state.
        ->    Sources i m lSrc a                -- ^ Input sources
        -> m (Sources i m lDst b)               -- ^ Result sources.

unfolds_i f z (G.Sources n pullA)
 = do
        refs    <- G.newRefs n z

        let pull_unfolds i eatB ejectB
             = do s1 <- G.readRefs refs i
                  pullA i (eatA_unfolds s1) ejectA_unfolds

             where eatA_unfolds s1 xA
                    = case A.unfolds A.name f s1 xA of
                        (s1', arrB) 
                         -> do  G.writeRefs refs i s1'
                                eatB arrB
                   {-# INLINE eatA_unfolds #-}

                   ejectA_unfolds
                    = ejectB
                   {-# INLINE ejectA_unfolds #-}
            {-# INLINE pull_unfolds #-}

        return $ G.Sources n pull_unfolds
{-# INLINE_FLOW unfolds_i #-}