-- | This module defines the default specialisation of flows that
--   appears in "Data.Repa.Flow". Each stream in the bundle is indexed
--   by a single integer, and stream state is stored using the IO monad.
--
module Data.Repa.Flow.Default
        ( -- * Flow types
          Sources
        , Sinks
        , Flow
        , sourcesArity
        , sinksArity

          -- * States and Arrays
        , module Data.Repa.Flow.States
        , module Data.Repa.Eval.Array
        , module Data.Repa.Array
        , module Data.Repa.Array.Material

          -- * Evaluation
        , drainS
        , drainP

          -- * Conversion
        , fromList,     fromLists
        , toList1,      toLists1

          -- * Finalizers
        , finalize_i,   finalize_o

          -- * Flow Operators
          -- ** Mapping
          -- | If you want to work on a chunk at a time then use
          --   `Data.Repa.Flow.Generic.map_i` and
          --   `Data.Repa.Flow.Generic.map_o` from "Data.Repa.Flow.Generic".
        , map_i,        map_o

          -- ** Connecting
        , dup_oo
        , dup_io
        , dup_oi
        , connect_i

          -- ** Watching
        , watch_i,      watch_o
        , trigger_o

          -- ** Ignorance
        , discard_o
        , ignore_o

          -- ** Splitting
        , head_i

          -- ** Grouping
        , groups_i
        , groupsBy_i
        , GroupsDict

          -- ** Folding
        , foldlS,       foldlAllS
        , folds_i,      FoldsDict
        , foldGroupsBy_i, FoldGroupsDict)
where
import Data.Repa.Flow.States
import Data.Repa.Eval.Array
import Data.Repa.Eval.Array                     as A
import Data.Repa.Array
        hiding (FoldsDict, GroupsDict, Index, fromList)
import Data.Repa.Array                          as A
        hiding (FoldsDict, GroupsDict, fromList)
import Data.Repa.Array.Material
        hiding (fromLists)
import Data.Repa.Fusion.Unpack                  as A
import qualified Data.Repa.Flow.Chunked         as C hiding (next)
import qualified Data.Repa.Flow.Generic         as G hiding (next)
import Control.Monad
#include "repa-flow.h"


-- | A bundle of stream sources, where the elements of the stream
--   are chunked into arrays.
--
--   The chunks have some `Layout` @l@ and contain elements of type @a@.
--   See "Data.Repa.Array" for the available layouts.
type Sources l a = C.Sources Int IO l a


-- | A bundle of stream sinks, where the elements of the stream
--   are chunked into arrays.
--
type Sinks l a = C.Sinks Int IO l a


-- | Yield the number of streams in the bundle.
sourcesArity :: Sources l a -> Int
sourcesArity = G.sourcesArity


-- | Yield the number of streams in the bundle.
sinksArity :: Sinks l a -> Int
sinksArity = G.sinksArity


-- | Shorthand for common type classes.
type Flow l a = C.Flow Int IO l a


-- Evaluation -----------------------------------------------------------------
-- | Pull all available values from the sources and push them to the sinks.
--   Streams in the bundle are processed sequentially, from first to last.
--
--   * If the `Sources` and `Sinks` have different numbers of streams then
--     we only evaluate the common subset.
--
drainS :: Sources l a -> Sinks l a -> IO ()
drainS = G.drainS
{-# INLINE drainS #-}


-- | Pull all available values from the sources and push them to the sinks,
--   in parallel. We fork a thread for each of the streams and evaluate
--   them all in parallel.
--
--   * If the `Sources` and `Sinks` have different numbers of streams then
--     we only evaluate the common subset.
--
drainP :: Sources l a -> Sinks l a -> IO ()
drainP = G.drainP
{-# INLINE drainP #-}


-- Conversion -----------------------------------------------------------------
-- | Given an arity and a list of elements, yield sources that each produce
--   all the elements.
--
--   * All elements are stuffed into a single chunk, and each stream is
--     given the same chunk.
--
fromList :: A.TargetI l a
         => Name l -> Int -> [a] -> IO (Sources l a)
fromList lDst arity = C.fromList lDst arity
{-# INLINE fromList #-}


-- | Like `fromList`, but take a list of lists. Each of the inner lists
--   is packed into a single chunk.
fromLists :: A.TargetI l a
          => Name l -> Int -> [[a]] -> IO (Sources l a)
fromLists lDst arity = C.fromLists lDst arity
{-# INLINE fromLists #-}


-- | Drain a single source from a bundle into a list of elements.
toList1 :: A.BulkI l a
        => Int -> Sources l a -> IO [a]
toList1 ix s
        -- An out-of-range stream index -- including a negative one --
        -- yields an empty result instead of being passed downstream.
 | ix < 0 || ix >= G.sourcesArity s = return []
 | otherwise                        = C.toList1 ix s
{-# INLINE toList1 #-}


-- | Drain a single source from a bundle into a list of chunks.
toLists1 :: A.BulkI l a
         => Int -> Sources l a -> IO [[a]]
toLists1 ix s
        -- Same bounds policy as `toList1`: reject negative indices too.
 | ix < 0 || ix >= G.sourcesArity s = return []
 | otherwise                        = C.toLists1 ix s
{-# INLINE toLists1 #-}


-- Finalizers -----------------------------------------------------------------
-- | Attach a finalizer to some sources.
--
--   * For a given source, the finalizer will be called the first time a
--     consumer of that source tries to pull an element when no more
--     are available.
--
--   * The finalizer is given the index of the source that ended.
--
--   * The finalizer will be run after any finalizers already attached
--     to the source.
--
---
--   TODO: make the finalizer run just the first time.
--
finalize_i
        :: (Int -> IO ())
        -> Sources l a -> IO (Sources l a)
finalize_i f s = G.finalize_i f s
{-# INLINE finalize_i #-}


-- | Attach a finalizer to some sinks.
--
--   * For a given sink, the finalizer will be called the first time
--     that sink is ejected.
--
--   * The finalizer is given the index of the sink that was ejected.
--
--   * The finalizer will be run after any finalizers already attached
--     to the sink.
--
---
--   TODO: make the finalizer run just the first time.
--
finalize_o
        :: (Int -> IO ())
        -> Sinks l a -> IO (Sinks l a)
finalize_o f k = G.finalize_o f k
{-# INLINE finalize_o #-}


-- Mapping --------------------------------------------------------------------
-- | Apply a function to all elements pulled from some sources.
map_i   :: (Flow l1 a, A.TargetI l2 b)
        => Name l2 -> (a -> b) -> Sources l1 a -> IO (Sources l2 b)
map_i _ f s = C.smap_i (\_ x -> f x) s
{-# INLINE map_i #-}


-- | Apply a function to all elements pushed to some sinks.
map_o   :: (Flow l1 a, A.TargetI l2 b)
        => Name l1 -> (a -> b) -> Sinks l2 b -> IO (Sinks l1 a)
map_o _ f = C.smap_o (const f)
{-# INLINE map_o #-}


-- Connecting -----------------------------------------------------------------
-- | Send the same data to two consumers.
--
--   Given two argument sinks, yield a result sink.
--   Pushing to the result sink causes the same element to be pushed to
--   both argument sinks.
dup_oo :: Sinks l a -> Sinks l a -> IO (Sinks l a)
dup_oo = G.dup_oo
{-# INLINE dup_oo #-}


-- | Send the same data to two consumers.
--
--   Given an argument source and argument sink, yield a result source.
--   Pulling an element from the result source pulls from the argument
--   source, and pushes that element to the sink, as well as returning it
--   via the result source.
--
dup_io :: Sources l a -> Sinks l a -> IO (Sources l a)
dup_io = G.dup_io
{-# INLINE dup_io #-}


-- | Send the same data to two consumers.
--
--   Like `dup_io` but with the arguments flipped.
--
dup_oi :: Sinks l a -> Sources l a -IO (Sources l a)
dup_oi = G.dup_oi
{-# INLINE dup_oi #-}


-- | Connect an argument source to two result sources.
--
--   Pulling from either result source pulls from the argument source.
--   Each result source only gets the elements pulled at the time,
--   so if one side pulls all the elements the other side won't get any.
--
connect_i :: Sources l a -> IO (Sources l a, Sources l a)
connect_i = G.connect_i
{-# INLINE connect_i #-}


-- Watching -------------------------------------------------------------------
-- | Hook a worker function to some sources, which will be passed every
--   chunk that is pulled from each source.
--
--   * The worker is also passed the source index of the chunk that was
--     pulled.
--
watch_i :: (Int -> Array l a -> IO ())
        -> Sources l a -> IO (Sources l a)
watch_i = G.watch_i
{-# INLINE watch_i #-}


-- | Hook a worker function to some sinks, which will be passed every
--   chunk that is pushed to each sink.
--
--   * The worker is also passed the index of the sink that the chunk
--     was pushed to.
--
watch_o :: (Int -> Array l a -> IO ())
        -> Sinks l a -> IO (Sinks l a)
watch_o = G.watch_o
{-# INLINE watch_o #-}


-- | Create a bundle of sinks of the given arity that pass incoming chunks
--   to a worker function.
--
--   * This is like `watch_o`, except that the incoming chunks are
--     discarded after they are passed to the worker function.
--
trigger_o :: Int -> (Int -> Array l a -> IO ()) -> IO (Sinks l a)
trigger_o = G.trigger_o
{-# INLINE trigger_o #-}


-- Ignorance ------------------------------------------------------------------
-- | Create a bundle of sinks of the given arity that drop all data on
--   the floor.
--
--   * The sinks are strict in the *chunks*, so they are demanded before
--     being discarded.
--   * Haskell debugging thunks attached to the chunks will be demanded,
--     but thunks attached to elements may not be -- depending on whether
--     the chunk representation is strict in the elements.
--
discard_o :: Int -> IO (Sinks l a)
discard_o = G.discard_o
{-# INLINE discard_o #-}


-- | Create a bundle of sinks of the given arity that drop all data on
--   the floor.
--
--   * As opposed to `discard_o` the sinks are non-strict in the chunks.
--   * Haskell debugging thunks attached to the chunks will *not* be
--     demanded.
--
ignore_o :: Int -> IO (Sinks l a)
ignore_o = G.ignore_o
{-# INLINE ignore_o #-}


-- Splitting ------------------------------------------------------------------
-- | Given a source index and a length, split a list of that length from
--   the front of the selected source. Yields a new source bundle for the
--   remaining elements.
--
--   * We pull /whole chunks/ from the source stream until we have
--     at least the desired number of elements. The leftover elements
--     in the final chunk are visible in the result `Sources`.
--
head_i  :: (A.Windowable l a, A.Index l ~ Int)
        => Int -> Int -> Sources l a -> IO (Maybe ([a], Sources l a))
head_i ix len s
        -- An out-of-range stream index -- including a negative one --
        -- yields Nothing instead of being passed downstream.
 | ix < 0 || ix >= G.sourcesArity s = return Nothing
 | otherwise    = liftM Just $ C.head_i len s ix
{-# INLINE head_i #-}


-- Grouping -------------------------------------------------------------------
-- | Scan through some sources to find runs of matching elements,
--   and count the lengths of those runs.
--
-- @
-- > import Data.Repa.Flow
-- > toList1 0 =<< groups_i U U =<< fromList U 1 "waabbbblle"
-- Just [(\'w\',1),(\'a\',2),(\'b\',4),(\'l\',2),(\'e\',1)]
-- @
--
groups_i
        :: (GroupsDict lVal lGrp tGrp lLen tLen a, Eq a)
        => Name lGrp            -- ^ Layout of result groups.
        -> Name lLen            -- ^ Layout of result lengths.
        -> Sources lVal a       -- ^ Input elements.
        -> IO (Sources (T2 lGrp lLen) (a, Int))
                                -- ^ Starting element and length of groups.
groups_i nGrp nLen s
        = groupsBy_i nGrp nLen (==) s
{-# INLINE groups_i #-}


-- | Like `groupsBy`, but take a function to determine whether two
--   consecutive values should be in the same group.
groupsBy_i
        :: GroupsDict lVal lGrp tGrp lLen tLen a
        => Name lGrp            -- ^ Layout of result groups.
        -> Name lLen            -- ^ Layout of result lengths.
        -> (a -> a -> Bool)     -- ^ Fn to check if consecutive elements
                                --   are in the same group.
        -> Sources lVal a       -- ^ Input elements.
        -> IO (Sources (T2 lGrp lLen) (a, Int))
                                -- ^ Starting element and length of groups.
groupsBy_i nGrp nLen f s
        = C.groupsBy_i nGrp nLen f s
{-# INLINE groupsBy_i #-}


-- | Dictionaries needed to perform a grouping.
type GroupsDict lVal lGrp tGrp lLen tLen a
        = C.GroupsDict Int IO lVal lGrp tGrp lLen tLen a


-- Folding --------------------------------------------------------------------
-- | Fold all the elements of each stream in a bundle, one stream after
--   the other, returning an array of fold results.
foldlS  :: ( A.Target lDst a, A.Index lDst ~ Int
           , A.BulkI lSrc b)
        => A.Name lDst          -- ^ Layout for result.
        -> (a -> b -> a)        -- ^ Combining function.
        -> a                    -- ^ Starting value.
        -> Sources lSrc b       -- ^ Input elements to fold.
        -> IO (A.Array lDst a)
foldlS n f z ss = C.foldlS n f z ss
{-# INLINE foldlS #-}


-- | Fold all the elements of each stream in a bundle, one stream after
--   the other, returning an array of fold results.
foldlAllS
        :: A.BulkI lSrc b
        => (a -> b -> a)        -- ^ Combining function.
        -> a                    -- ^ Starting value.
        -> Sources lSrc b       -- ^ Input elements to fold.
        -> IO a
foldlAllS f z ss = C.foldlAllS f z ss
{-# INLINE foldlAllS #-}


-- | Given streams of lengths and values, perform a segmented fold where
--   fold segments of values of the corresponding lengths are folded
--   together.
--
-- @
-- > import Data.Repa.Flow
-- > sSegs <- fromList U 1 [(\'a\', 1), (\'b\', 2), (\'c\', 4), (\'d\', 0), (\'e\', 1), (\'f\', 5 :: Int)]
-- > sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
-- > toList1 0 =<< folds_i U U (+) 0 sSegs sVals
-- Just [(\'a\',10),(\'b\',50),(\'c\',220),(\'d\',0),(\'e\',80)]
-- @
--
--   If not enough input elements are available to fold a complete segment
--   then no output is produced for that segment. However, trailing zero
--   length segments still produce the initial value for the fold.
--
-- @
-- > import Data.Repa.Flow
-- > sSegs <- fromList U 1 [(\'a\', 1), (\'b\', 2), (\'c\', 0), (\'d\', 0), (\'e\', 0 :: Int)]
-- > sVals <- fromList U 1 [10, 20, 30 :: Int]
-- > toList1 0 =<< folds_i U U (*) 1 sSegs sVals
-- Just [(\'a\',10),(\'b\',600),(\'c\',1),(\'d\',1),(\'e\',1)]
-- @
--
folds_i :: (FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b)
        => Name lGrp            -- ^ Layout for group names.
        -> Name lRes            -- ^ Layout for fold results.
        -> (a -> b -> b)        -- ^ Worker function.
        -> b                    -- ^ Initial state when folding each segment.
        -> Sources lSeg (n, Int) -- ^ Segment lengths.
        -> Sources lElt a       -- ^ Input elements to fold.
        -> IO (Sources (T2 lGrp lRes) (n, b)) -- ^ Result elements.
folds_i nGrp nRes f z sLen sVal
        = C.folds_i nGrp nRes f z sLen sVal
{-# INLINE folds_i #-}


-- | Dictionaries needed to perform a segmented fold.
type FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b
        = C.FoldsDict Int IO lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b


-- | Combination of `groupsBy_i` and `folds_i`. We determine the segment
--   lengths while performing the folds.
--
--   Note that SQL-like groupby aggregations can be performed using this
--   function, provided the data is pre-sorted on the group key. For
--   example, we can take the average of some groups of values:
--
-- @
-- > import Data.Repa.Flow
-- > sKeys <- fromList U 1 "waaaabllle"
-- > sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
--
-- > sResult \<- map_i U (\\(key, (acc, n)) -\> (key, acc / n))
--            =\<\< foldGroupsBy_i U U (==) (\\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
--
-- > toList1 0 sResult
-- Just [10.0,35.0,60.0,80.0,100.0]
-- @
--
foldGroupsBy_i
        :: ( FoldGroupsDict lSeg tSeg lVal tVal lGrp tGrp lRes tRes n a b)
        => Name lGrp            -- ^ Layout for group names.
        -> Name lRes            -- ^ Layout for fold results.
        -> (n -> n -> Bool)     -- ^ Fn to check if consecutive elements
                                --   are in the same group.
        -> (a -> b -> b)        -- ^ Worker function for the fold.
        -> b                    -- ^ Initial when folding each segment.
        -> Sources lSeg n       -- ^ Names that determine groups.
        -> Sources lVal a       -- ^ Values to fold.
        -> IO (Sources (T2 lGrp lRes) (n, b))
foldGroupsBy_i nGrp nRes pGroup f z sNames sVals
        -- Compute the group lengths first, then feed them to the
        -- segmented fold over the values.
 =      groupsBy_i nGrp U pGroup sNames
        >>= \lens -> folds_i nGrp nRes f z lens sVals
{-# INLINE foldGroupsBy_i #-}


type FoldGroupsDict  lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b
 =      ( A.BulkI lSeg n
        , A.Material lElt a, A.Index lElt ~ Int
        , A.Material lGrp n, A.Index lGrp ~ Int
        , A.Material lRes b, A.Index lRes ~ Int
        , Unpack (IOBuffer lGrp n) tGrp
        , Unpack (IOBuffer lRes b) tRes)