-- | This module defines the default specialisation of flows that
--   appears in "Data.Repa.Flow". Each stream in the bundle is indexed
--   by a single integer, and stream state is stored using the IO monad.
--
module Data.Repa.Flow.Auto
        ( -- * Flow types
          Sources
        , Sinks
        , Flow
        , sourcesArity
        , sinksArity

        -- * Conversion
        -- ** List conversion
        , fromList,             fromLists
        , toList1,              toLists1

        -- ** Array conversion
        , fromArray,            fromArrays
        , toArray1,             toArrays1

        -- * Evaluation
        , drainS
        , drainP
        , consumeS

        -- * Flow Operators
        -- ** Replicating
        , replicates_i

        -- ** Mapping
        -- | If you want to work on a chunk at a time then use 
        --   `Data.Repa.Flow.Generic.map_i` and
        --   `Data.Repa.Flow.Generic.map_o` from "Data.Repa.Flow.Generic".
        , map_i,                map_o
        , zipWith_i

        -- ** Processing
        , process_i

        -- | Higher arity zipWith functions.
        , module Data.Repa.Flow.Auto.ZipWith

        -- ** Connecting
        , dup_oo
        , dup_io
        , dup_oi
        , connect_i

        -- ** Watching
        , watch_i,              watch_o
        , trigger_o

        -- ** Ignorance
        , ignore_o
        , abandon_o

        -- ** Splitting
        , head_i

        -- ** Concatenation
        , concat_i

        -- ** Selecting
        , select_i,             select_o
        , discard_i,            discard_o
        , mask_i,               mask_o

        -- ** Grouping
        , groups_i
        , groupsBy_i
        , GroupsDict

        -- ** Folding
        -- *** Complete
        , foldlS,               foldlAllS

        -- *** Segmented
        , folds_i,              FoldsDict
        , foldGroupsBy_i,       FoldGroupsDict

        -- * Finalizers
        , finalize_i,           finalize_o
        )
where
import Data.Repa.Flow.Auto.ZipWith
import Data.Repa.Flow.Auto.Select
import Data.Repa.Flow.Auto.Base
import Data.Repa.Array.Auto                    
        hiding (fromList, fromLists)

import Data.Repa.Array.Material.Auto                    (A(..), Name(..))
import qualified Data.Repa.Array.Meta.Window            as A
import qualified Data.Repa.Array.Material               as A
import qualified Data.Repa.Array.Generic                as A
import qualified Data.Repa.Flow.Chunked                 as C hiding (next)
import qualified Data.Repa.Flow.Generic                 as G hiding (next)
import Control.Monad
#include "repa-flow.h"


-- Evaluation -----------------------------------------------------------------
-- | Sequentially drain a bundle: every value available from the sources
--   is pulled and pushed into the sinks, working through the streams of
--   the bundle in order, from the first to the last.
--
--   * When the `Sources` and `Sinks` bundles have different arities,
--     only the streams common to both are evaluated.
--
drainS   :: Sources a -> Sinks a -> IO ()
drainS srcs snks
        = G.drainS srcs snks
{-# INLINE drainS #-}


-- | Drain a bundle in parallel: every value available from the sources
--   is pulled and pushed into the sinks. A thread is forked for each
--   stream, and all streams are evaluated concurrently.
--
--   * When the `Sources` and `Sinks` bundles have different arities,
--     only the streams common to both are evaluated.
--
drainP   :: Sources a -> Sinks a -> IO ()
drainP srcs snks
        = G.drainP srcs snks
{-# INLINE drainP #-}


-- | Drain the sources, handing every value that can be pulled to the
--   given action.
--
--   * The action also receives an `Int`; presumably this is the index of
--     the stream the value was pulled from (TODO confirm against
--     `Data.Repa.Flow.Chunked.consumeS`).
--
consumeS :: A.Bulk A a
         => Sources a -> (Int -> a -> IO ()) -> IO ()
consumeS srcs action
        = C.consumeS srcs action
{-# INLINE consumeS #-}


-- Finalizers -----------------------------------------------------------------
-- | Attach a finalizer to a bundle of sources.
--
--   * The finalizer for a source is intended to fire the first time a
--     consumer tries to pull an element from it and none are left
--     (but see the TODO below).
--
--   * The index of the source that ended is passed to the finalizer.
--
--   * Finalizers that were already attached to the source run before
--     the new one.
--
---
--     TODO: make the finalizer run just the first time.
--
finalize_i
        :: (Int -> IO ())
        -> Sources a -> IO (Sources a)
finalize_i = G.finalize_i
{-# INLINE finalize_i #-}


-- | Attach a finalizer to a bundle of sinks.
--
--   * The finalizer for a sink is intended to fire the first time that
--     sink is ejected (but see the TODO below).
--
--   * The index of the sink that was ejected is passed to the finalizer.
--
--   * Finalizers that were already attached to the sink run before
--     the new one.
--
---
--     TODO: make the finalizer run just the first time.
--
finalize_o
        :: (Int -> IO ())
        -> Sinks a   -> IO (Sinks a)
finalize_o = G.finalize_o
{-# INLINE finalize_o #-}


-- Replicating ----------------------------------------------------------------
-- | Segmented replicate: the input carries pairs of a segment length
--   and a value, and the result is a flow of the replicated values.
--
--   The result chunks are built using the `A` layout.
replicates_i 
        :: (Flow (Int, a), Build a)
        => Sources (Int, a)     -- ^ Source of segment lengths and values.
        -> IO (Sources a)       
replicates_i segs
        = C.replicates_i A segs
{-# INLINE replicates_i #-}


-- Mapping --------------------------------------------------------------------
-- | Produce new sources whose elements are those of the argument sources
--   transformed by the given function.
--
--   Implemented with `C.smap_i`, ignoring the extra leading argument
--   that it passes to its worker.
map_i   :: (Flow a, Build b)
        => (a -> b) -> Sources a -> IO (Sources b)
map_i f srcs
        = C.smap_i worker srcs
 where  worker _ x = f x
{-# INLINE map_i #-}


-- | Produce new sinks that transform every element pushed to them with
--   the given function before passing it on to the argument sinks.
--
--   Implemented with `C.smap_o`, ignoring the extra leading argument
--   that it passes to its worker.
map_o   :: (Flow a, Build b)
        => (a -> b) -> Sinks b   -> IO (Sinks a)
map_o f snks
        = C.smap_o worker snks
 where  worker _ x = f x
{-# INLINE map_o #-}


-- | Zip two bundles of sources together, combining corresponding
--   elements with the given function.
--
--   Implemented with `C.szipWith_ii` at the `A` layout, ignoring the
--   extra leading argument that it passes to its worker.
zipWith_i 
        :: (Flow a, Flow b, Build c)
        => (a -> b -> c) 
        -> Sources a -> Sources b 
        -> IO (Sources c)
zipWith_i f srcsA srcsB
        = C.szipWith_ii A combine srcsA srcsB
 where  combine _ x y = f x y
{-# INLINE zipWith_i #-}


-- Processing -----------------------------------------------------------------
-- | Run a generic, stateful stream process over a bundle of sources.
--
--   The worker receives the current state and an input element, and
--   returns the next state together with an array of output elements.
process_i
        :: ( Flow a, Flow b, Build b)
        => (s -> a -> (s, Array b))     -- ^ Worker function.
        -> s                            -- ^ Initial state.
        ->     Sources a                -- ^ Input sources.
        -> IO (Sources b)
process_i worker start srcs
        = C.process_i worker start srcs
{-# INLINE process_i #-}


-- Concatenation --------------------------------------------------------------
-- | Flatten a flow of arrays into a flow of their elements.
--
--   Each chunk pulled from the argument sources is passed through
--   @A.concat A@, via the chunk-at-a-time `G.map_i`.
concat_i
        :: (Flow a, Build a)
        => Sources (Array a)
        -> IO (Sources a)
concat_i = G.map_i (A.concat A)
{-# INLINE concat_i #-}


-- Connecting -----------------------------------------------------------------
-- | Duplicate data to two consumers.
--
--   Takes two argument sinks and yields a single result sink. Every
--   element pushed to the result sink is pushed to both of the
--   argument sinks.
dup_oo  :: Sinks a -> Sinks a -> IO (Sinks a)
dup_oo snks1 snks2
        = G.dup_oo snks1 snks2
{-# INLINE dup_oo #-}


-- | Duplicate data to two consumers.
--
--   Takes an argument source and an argument sink, and yields a result
--   source. Pulling from the result source pulls an element from the
--   argument source, pushes it to the sink, and also returns it to the
--   puller.
--
dup_io  :: Sources a -> Sinks a -> IO (Sources a)
dup_io srcs snks
        = G.dup_io srcs snks
{-# INLINE dup_io #-}


-- | Duplicate data to two consumers.
--
--   The same as `dup_io`, except that the sink argument comes first
--   and the source argument second.
--
dup_oi  :: Sinks a -> Sources a -> IO (Sources a)
dup_oi snks srcs
        = G.dup_oi snks srcs
{-# INLINE dup_oi #-}


-- | Split one argument source into two result sources.
--
--   A pull on either result source is served by pulling from the
--   argument source. Elements are not duplicated: each result source
--   only receives what it pulled itself, so if one side pulls
--   everything the other side gets nothing.
--
connect_i :: Sources a -> IO (Sources a, Sources a)
connect_i srcs
        = G.connect_i srcs
{-# INLINE connect_i #-}


-- Watching -------------------------------------------------------------------
-- | Attach an observer to some sources. The worker function is handed
--   every chunk that gets pulled from each source.
--
--   * Along with the chunk, the worker receives the index of the source
--     it was pulled from.
--
watch_i :: (Int -> A.Array A a -> IO ()) 
        -> Sources a  -> IO (Sources a)
watch_i = G.watch_i
{-# INLINE watch_i #-}


-- | Attach an observer to some sinks. The worker function is handed
--   every chunk that gets pushed to each sink.
--
--   * Along with the chunk, the worker receives the index of the sink
--     it was pushed to.
--
watch_o :: (Int -> A.Array A a -> IO ())
        -> Sinks a    -> IO (Sinks a)
watch_o = G.watch_o
{-# INLINE watch_o #-}


-- | Build a bundle of sinks, of the given arity, that hand every
--   incoming chunk to a worker function.
--
--   * Behaves like `watch_o`, except that there are no downstream
--     sinks: a chunk is discarded once the worker has been given it.
--
trigger_o :: Int -> (Int -> A.Array A a -> IO ()) 
          -> IO (Sinks a)
trigger_o = G.trigger_o
{-# INLINE trigger_o #-}


-- Ignorance ------------------------------------------------------------------
-- | Build a bundle of sinks, of the given arity, that throw away all
--   data pushed to them.
--
--   * Haskell debugging thunks attached to the chunks themselves will
--     be demanded. Thunks attached to individual elements may not be;
--     that depends on whether the chunk representation is strict in
--     its elements.
--
ignore_o :: Int -> IO (Sinks a)
ignore_o arity
        = G.ignore_o arity
{-# INLINE ignore_o #-}


-- | Build a bundle of sinks, of the given arity, that throw away all
--   data pushed to them.
--
--   * Unlike `ignore_o`, these sinks are non-strict in the chunks.
--   * Haskell debugging thunks attached to the chunks will *not* be
--     demanded.
--
abandon_o :: Int -> IO (Sinks a)
abandon_o arity
        = G.abandon_o arity
{-# INLINE abandon_o #-}



-- Splitting ------------------------------------------------------------------
-- | Given a source index and a length, split a list of that length
--   from the front of the source. Yields the list together with a new
--   bundle of sources for the remaining elements.
--
--   * Returns `Nothing` when the index does not name a stream of the
--     bundle, that is, when it is negative or not less than the arity
--     of the sources.
--
--   * We pull /whole chunks/ from the source stream until we have
--     at least the desired number of elements. The leftover elements
--     in the final chunk are visible in the result `Sources`.
--
head_i  :: Flow a
        => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a))
head_i ix len s
 -- Reject indices on either side of the valid range, so that a negative
 -- index is never handed to the underlying chunked implementation.
 | ix < 0 || ix >= G.sourcesArity s
 = return Nothing
 | otherwise
 = liftM Just $ C.head_i len s ix
{-# INLINE head_i #-}



-- Grouping -------------------------------------------------------------------
-- | Find runs of equal consecutive elements in some sources, yielding
--   the first element of each run along with the length of the run.
--
--   This is `groupsBy_i` specialised to the `Eq` instance of the
--   element type.
--
-- @  
-- > F.toList1 0 =<< F.groups_i =<< F.fromList 1 "waabbbblle"
-- [(\'w\',1),(\'a\',2),(\'b\',4),(\'l\',2),(\'e\',1)]
-- @
--
groups_i
        :: (GroupsDict a, Eq a)
        => Sources a       -- ^ Input elements.
        -> IO (Sources (a, Int)) 
                                -- ^ Starting element and length of groups.
groups_i = groupsBy_i (==)
{-# INLINE groups_i #-}


-- | Like `groups_i`, but take a function that decides whether two
--   consecutive values belong to the same group, instead of using
--   the `Eq` instance.
--
--   The groups produced by `C.groupsBy_i` are converted to the `A`
--   layout before being returned.
groupsBy_i
        :: GroupsDict a
        => (a -> a -> Bool)     -- ^ Fn to check if consecutive elements
                                --   are in the same group.
        -> Sources a       -- ^ Input elements.
        -> IO (Sources (a, Int)) 
                                -- ^ Starting element and length of groups.
groupsBy_i f s
 = do   groups <- C.groupsBy_i A A f s
        G.map_i (A.convert A) groups
{-# INLINE groupsBy_i #-}


-- | Dictionaries needed to perform a grouping with `groups_i` or
--   `groupsBy_i`.
--
--   This is `C.GroupsDict` with its leading parameters fixed to `Int`,
--   `IO` and the `A` layout (three times), leaving only the element
--   type free.
type GroupsDict a
        = C.GroupsDict Int IO A A A a


-- Folding --------------------------------------------------------------------
-- | Left fold over the elements of each stream in a bundle, processing
--   one stream after the other, and collect the fold results into an
--   array (built with the `A` layout).
foldlS
        :: (Flow b, Build a)
        => (a -> b -> a)                -- ^ Combining function.
        -> a                            -- ^ Starting value.
        -> Sources b                    -- ^ Input elements to fold.
        -> IO (A.Array A a)
foldlS = C.foldlS A
{-# INLINE foldlS #-}


-- | Left fold over the elements of all streams in a bundle, processing
--   one stream after the other, and return the single final value of
--   the accumulator.
--
--   Unlike `foldlS`, the result is one value rather than an array of
--   fold results.
foldlAllS
        :: Flow b
        => (a -> b -> a)                -- ^ Combining function.
        -> a                            -- ^ Starting value.
        -> Sources b                    -- ^ Input elements to fold.
        -> IO a
foldlAllS = C.foldlAllS
{-# INLINE foldlAllS #-}


-- | Segmented fold. Given a stream of named segment lengths and a
--   stream of values, consecutive runs of values of the given lengths
--   are folded together, producing one named result per segment.
--
-- @
-- > sSegs <- F.fromList 1 [(\'a\', 1), (\'b\', 2), (\'c\', 4), (\'d\', 0), (\'e\', 1), (\'f\', 5 :: Int)]
-- > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
--
-- > F.toList1 0 =<< F.folds_i (+) 0 sSegs sVals
-- [(\'a\',10),(\'b\',50),(\'c\',220),(\'d\',0),(\'e\',80)]
-- @
--
--   A segment only produces output if enough input elements are
--   available to fold it completely. Trailing zero length segments need
--   no input, so they still produce the initial value of the fold.
--
-- @
-- > sSegs <- F.fromList 1 [(\'a\', 1), (\'b\', 2), (\'c\', 0), (\'d\', 0), (\'e\', 0 :: Int)]
-- > sVals <- F.fromList 1 [10, 20, 30 :: Int]
--
-- > F.toList1 0 =<< F.folds_i (*) 1 sSegs sVals
-- [(\'a\',10),(\'b\',600),(\'c\',1),(\'d\',1),(\'e\',1)]
-- @
--
--   The results of `C.folds_i` are converted to the `A` layout before
--   being returned.
--
folds_i :: FoldsDict n a b
        => (a -> b -> b)          -- ^ Worker function.
        -> b                      -- ^ Initial state when folding each segment.
        -> Sources (n, Int)       -- ^ Segment lengths.
        -> Sources a              -- ^ Input elements to fold.
        -> IO (Sources  (n, b))   -- ^ Result elements.
folds_i f z sLen sVal
 = do   folded <- C.folds_i A A f z sLen sVal
        G.map_i (A.convert A) folded
{-# INLINE folds_i #-}

-- | Dictionaries needed to perform a segmented fold with `folds_i`.
--
--   This is `C.FoldsDict` with its leading parameters fixed to `Int`,
--   `IO` and the `A` layout (four times), leaving the segment name,
--   element and result types free.
type FoldsDict n a b
        = C.FoldsDict Int IO A A A A n a b


-- | Combination of `groupsBy_i` and `folds_i`. The segment lengths are
--   determined from the names with `groupsBy_i`, and then used by
--   `folds_i` to fold the values.
-- 
--   Note that SQL-like group-by aggregations can be performed using this
--   function, provided the data is pre-sorted on the group key. For
--   example, we can take the average of some groups of values:
--
-- @
-- > sKeys <- F.fromList 1 "waaaabllle"
-- > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
-- 
-- > sResult \<-  F.map_i (\\(key, (acc, n)) -\> (key, acc / n))
--           =\<\< F.foldGroupsBy_i (==) (\\x (acc, n) -\> (acc + x, n + 1)) (0, 0) sKeys sVals
--
-- > F.toList1 0 sResult
-- [(\'w\',10.0),(\'a\',35.0),(\'b\',60.0),(\'l\',80.0),(\'e\',100.0)]
-- @
--
foldGroupsBy_i
        :: (FoldGroupsDict n a b)
        => (n -> n -> Bool)     -- ^ Fn to check if consecutive elements
                                --   are in the same group.
        -> (a -> b -> b)        -- ^ Worker function for the fold.
        -> b                    -- ^ Initial when folding each segment.
        -> Sources n            -- ^ Names that determine groups.
        -> Sources a            -- ^ Values to fold.
        -> IO (Sources (n, b))
foldGroupsBy_i pGroup f z sNames sVals
        =   groupsBy_i pGroup sNames
        >>= \segLens -> folds_i f z segLens sVals
{-# INLINE foldGroupsBy_i #-}
 

-- | Dictionaries needed to perform a grouped fold with `foldGroupsBy_i`.
--
--   Here @n@ is the type of the group names, @a@ the type of the values
--   being folded, and @b@ the type of the fold results.
type FoldGroupsDict  n a b
      = ( A.BulkI    A n
        , A.Material A a
        , A.Material A n
        , A.Material A b)