Safe Haskell | None |
---|---|
Language | Haskell98 |
This module defines the default specialisation of flows that appears in Data.Repa.Flow. Each stream in the bundle is indexed by a single integer, and stream state is stored using the IO monad.
- type Sources l a = Sources Int IO l a
- type Sinks l a = Sinks Int IO l a
- type Flow l a = Flow Int IO l a
- sourcesArity :: Sources l a -> Int
- sinksArity :: Sinks l a -> Int
- module Data.Repa.Flow.States
- module Data.Repa.Eval.Array
- module Data.Repa.Array
- module Data.Repa.Array.Material
- drainS :: Sources l a -> Sinks l a -> IO ()
- drainP :: Sources l a -> Sinks l a -> IO ()
- fromList :: TargetI l a => Name l -> Int -> [a] -> IO (Sources l a)
- fromLists :: TargetI l a => Name l -> Int -> [[a]] -> IO (Sources l a)
- toList1 :: BulkI l a => Int -> Sources l a -> IO [a]
- toLists1 :: BulkI l a => Int -> Sources l a -> IO [[a]]
- finalize_i :: (Int -> IO ()) -> Sources l a -> IO (Sources l a)
- finalize_o :: (Int -> IO ()) -> Sinks l a -> IO (Sinks l a)
- map_i :: (Flow l1 a, TargetI l2 b) => Name l2 -> (a -> b) -> Sources l1 a -> IO (Sources l2 b)
- map_o :: (Flow l1 a, TargetI l2 b) => Name l1 -> (a -> b) -> Sinks l2 b -> IO (Sinks l1 a)
- dup_oo :: Sinks l a -> Sinks l a -> IO (Sinks l a)
- dup_io :: Sources l a -> Sinks l a -> IO (Sources l a)
- dup_oi :: Sinks l a -> Sources l a -> IO (Sources l a)
- connect_i :: Sources l a -> IO (Sources l a, Sources l a)
- watch_i :: (Int -> Array l a -> IO ()) -> Sources l a -> IO (Sources l a)
- watch_o :: (Int -> Array l a -> IO ()) -> Sinks l a -> IO (Sinks l a)
- trigger_o :: Int -> (Int -> Array l a -> IO ()) -> IO (Sinks l a)
- discard_o :: Int -> IO (Sinks l a)
- ignore_o :: Int -> IO (Sinks l a)
- head_i :: (Windowable l a, Index l ~ Int) => Int -> Int -> Sources l a -> IO (Maybe ([a], Sources l a))
- groups_i :: (GroupsDict lVal lGrp tGrp lLen tLen a, Eq a) => Name lGrp -> Name lLen -> Sources lVal a -> IO (Sources (T2 lGrp lLen) (a, Int))
- groupsBy_i :: GroupsDict lVal lGrp tGrp lLen tLen a => Name lGrp -> Name lLen -> (a -> a -> Bool) -> Sources lVal a -> IO (Sources (T2 lGrp lLen) (a, Int))
- type GroupsDict lVal lGrp tGrp lLen tLen a = GroupsDict Int IO lVal lGrp tGrp lLen tLen a
- foldlS :: (Target lDst a, Index lDst ~ Int, BulkI lSrc b) => Name lDst -> (a -> b -> a) -> a -> Sources lSrc b -> IO (Array lDst a)
- foldlAllS :: BulkI lSrc b => (a -> b -> a) -> a -> Sources lSrc b -> IO a
- folds_i :: FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b => Name lGrp -> Name lRes -> (a -> b -> b) -> b -> Sources lSeg (n, Int) -> Sources lElt a -> IO (Sources (T2 lGrp lRes) (n, b))
- type FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = FoldsDict Int IO lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b
- foldGroupsBy_i :: FoldGroupsDict lSeg tSeg lVal tVal lGrp tGrp lRes tRes n a b => Name lGrp -> Name lRes -> (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources lSeg n -> Sources lVal a -> IO (Sources (T2 lGrp lRes) (n, b))
- type FoldGroupsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = (BulkI lSeg n, Material lElt a, Index lElt ~ Int, Material lGrp n, Index lGrp ~ Int, Material lRes b, Index lRes ~ Int, Unpack (IOBuffer lGrp n) tGrp, Unpack (IOBuffer lRes b) tRes)
Flow types
type Sources l a = Sources Int IO l a
A bundle of stream sources, where the elements of the stream are chunked into arrays.
The chunks have some Layout l and contain elements of type a.
See Data.Repa.Array for the available layouts.
type Sinks l a = Sinks Int IO l a
A bundle of stream sinks, where the elements of the stream are chunked into arrays.
sourcesArity :: Sources l a -> Int
Yield the number of streams in the bundle.
sinksArity :: Sinks l a -> Int
Yield the number of streams in the bundle.
States and Arrays
module Data.Repa.Flow.States
module Data.Repa.Eval.Array
module Data.Repa.Array
module Data.Repa.Array.Material
Evaluation
Conversion
fromList :: TargetI l a => Name l -> Int -> [a] -> IO (Sources l a)
Given an arity and a list of elements, yield sources that each produce all the elements.
- All elements are stuffed into a single chunk, and each stream is given the same chunk.
fromLists :: TargetI l a => Name l -> Int -> [[a]] -> IO (Sources l a)
Like fromList, but take a list of lists. Each of the inner lists is packed into a single chunk.
toList1 :: BulkI l a => Int -> Sources l a -> IO [a]
Drain a single source from a bundle into a list of elements.
toLists1 :: BulkI l a => Int -> Sources l a -> IO [[a]]
Drain a single source from a bundle into a list of chunks.
Finalizers
finalize_i :: (Int -> IO ()) -> Sources l a -> IO (Sources l a)
Attach a finalizer to some sources.
- For a given source, the finalizer will be called the first time a consumer of that source tries to pull an element when no more are available.
- The finalizer is given the index of the source that ended.
- The finalizer will be run after any finalizers already attached to the source.
finalize_o :: (Int -> IO ()) -> Sinks l a -> IO (Sinks l a)
Attach a finalizer to some sinks.
- For a given sink, the finalizer will be called the first time that sink is ejected.
- The finalizer is given the index of the sink that was ejected.
- The finalizer will be run after any finalizers already attached to the sink.
Flow Operators
Mapping
If you want to work on a chunk at a time then use map_i and map_o from Data.Repa.Flow.Generic.
map_i :: (Flow l1 a, TargetI l2 b) => Name l2 -> (a -> b) -> Sources l1 a -> IO (Sources l2 b)
Apply a function to all elements pulled from some sources.
map_o :: (Flow l1 a, TargetI l2 b) => Name l1 -> (a -> b) -> Sinks l2 b -> IO (Sinks l1 a)
Apply a function to all elements pushed to some sinks.
Connecting
dup_oo :: Sinks l a -> Sinks l a -> IO (Sinks l a)
Send the same data to two consumers.
Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.
dup_io :: Sources l a -> Sinks l a -> IO (Sources l a)
Send the same data to two consumers.
Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
dup_oi :: Sinks l a -> Sources l a -> IO (Sources l a)
Send the same data to two consumers.
Like dup_io, but with the arguments flipped.
connect_i :: Sources l a -> IO (Sources l a, Sources l a)
Connect an argument source to two result sources.
Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at the time, so if one side pulls all the elements the other side won't get any.
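The sharing behaviour described above can be modelled without the library. The following is a minimal sketch, assuming a stream is just a list held in an IORef. The names `Pull` and `connectModel` are hypothetical and exist only for illustration; they are not part of Data.Repa.Flow, and the real implementation works on chunked arrays rather than single elements.

```haskell
import Data.IORef (newIORef, atomicModifyIORef')

-- Hypothetical model: a "source" is an action that pulls the next
-- element, or Nothing once the shared input is exhausted.
type Pull a = IO (Maybe a)

-- Build two pull actions that share one underlying list.
-- Whichever side pulls first consumes the element; the other
-- side never sees it.
connectModel :: [a] -> IO (Pull a, Pull a)
connectModel xs = do
  ref <- newIORef xs
  let pull = atomicModifyIORef' ref step
      step []       = ([], Nothing)
      step (y : ys) = (ys, Just y)
  pure (pull, pull)
```

In this model, if the first pull action is run until it returns Nothing, the second one returns Nothing on its first call, which mirrors the note that one side can starve the other.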
Watching
watch_i :: (Int -> Array l a -> IO ()) -> Sources l a -> IO (Sources l a)
Hook a worker function to some sources, which will be passed every chunk that is pulled from each source.
- The worker is also passed the source index of the chunk that was pulled.
watch_o :: (Int -> Array l a -> IO ()) -> Sinks l a -> IO (Sinks l a)
Hook a worker function to some sinks, which will be passed every chunk that is pushed to each sink.
- The worker is also passed the index of the sink that the chunk was pushed to.
trigger_o :: Int -> (Int -> Array l a -> IO ()) -> IO (Sinks l a)
Create a bundle of sinks of the given arity that pass incoming chunks to a worker function.
- This is like watch_o, except that the incoming chunks are discarded after they are passed to the worker function.
Ignorance
discard_o :: Int -> IO (Sinks l a)
Create a bundle of sinks of the given arity that drop all data on the floor.
- The sinks are strict in the *chunks*, so they are demanded before being discarded.
- Haskell debugging thunks attached to the chunks will be demanded, but thunks attached to elements may not be, depending on whether the chunk representation is strict in the elements.
ignore_o :: Int -> IO (Sinks l a)
Create a bundle of sinks of the given arity that drop all data on the floor.
- As opposed to discard_o, the sinks are non-strict in the chunks.
- Haskell debugging thunks attached to the chunks will *not* be demanded.
Splitting
head_i :: (Windowable l a, Index l ~ Int) => Int -> Int -> Sources l a -> IO (Maybe ([a], Sources l a))
Given a source index and a length, split a list of that length from the front of the source. Yields a new source for the remaining elements.
- We pull whole chunks from the source stream until we have at least the desired number of elements. The leftover elements in the final chunk are visible in the result Sources.
Grouping
groups_i
:: (GroupsDict lVal lGrp tGrp lLen tLen a, Eq a) | |
=> Name lGrp | Layout of result groups. |
-> Name lLen | Layout of result lengths. |
-> Sources lVal a | Input elements. |
-> IO (Sources (T2 lGrp lLen) (a, Int)) | Starting element and length of groups. |
Scan through some sources to find runs of matching elements, and count the lengths of those runs.
> import Data.Repa.Flow
> toList1 0 =<< groups_i U U =<< fromList U 1 "waabbbblle"
[('w',1),('a',2),('b',4),('l',2),('e',1)]
groupsBy_i
:: GroupsDict lVal lGrp tGrp lLen tLen a | |
=> Name lGrp | Layout of result groups. |
-> Name lLen | Layout of result lengths. |
-> (a -> a -> Bool) | Function to check if consecutive elements are in the same group. |
-> Sources lVal a | Input elements. |
-> IO (Sources (T2 lGrp lLen) (a, Int)) | Starting element and length of groups. |
Like groups_i, but take a function to determine whether two consecutive values should be in the same group.
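The grouping behaviour can be sketched on plain lists. This is a hypothetical model, not the library code: `groupsByModel` and `groupsModel` are illustrative names, and the sketch assumes the predicate is applied to each pair of consecutive elements, as the argument description states. Note that this differs from `Data.List.groupBy`, which compares against the first element of each group.

```haskell
-- Hypothetical list model of run-length grouping; not the library code.
-- Consecutive elements x, y stay in the same run while `same x y` holds.
groupsByModel :: (a -> a -> Bool) -> [a] -> [(a, Int)]
groupsByModel same = go
  where
    go []       = []
    go (x : xs) = run x x 1 xs
    -- first: starting element of the run, prev: last element seen,
    -- n: length of the run so far.
    run first _    n []       = [(first, n)]
    run first prev n (y : ys)
      | same prev y = run first y (n + 1) ys
      | otherwise   = (first, n) : run y y 1 ys

-- Grouping by equality models the behaviour described for groups_i.
groupsModel :: Eq a => [a] -> [(a, Int)]
groupsModel = groupsByModel (==)
```

Applied to "waabbbblle", `groupsModel` yields the same pairs of starting element and run length as the groups_i example in this section.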
type GroupsDict lVal lGrp tGrp lLen tLen a = GroupsDict Int IO lVal lGrp tGrp lLen tLen a
Dictionaries needed to perform a grouping.
Folding
foldlS
:: (Target lDst a, Index lDst ~ Int, BulkI lSrc b) | |
=> Name lDst | Layout for result. |
-> (a -> b -> a) | Combining function. |
-> a | Starting value. |
-> Sources lSrc b | Input elements to fold. |
-> IO (Array lDst a) |
Fold all the elements of each stream in a bundle, one stream after the other, returning an array of fold results.
foldlAllS
:: BulkI lSrc b | |
=> (a -> b -> a) | Combining function. |
-> a | Starting value. |
-> Sources lSrc b | Input elements to fold. |
-> IO a |
Fold all the elements of each stream in a bundle, one stream after the other, returning a single result.
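The result types of the two folds, an array for foldlS and a single value for foldlAllS, suggest the following list model. It is a sketch under the assumption that a bundle can be treated as a list of streams; `Bundle`, `foldlSModel` and `foldlAllSModel` are hypothetical names, not library functions.

```haskell
-- Hypothetical list model: a bundle is a list of streams.
type Bundle a = [[a]]

-- One fold result per stream, each starting from the same initial value.
foldlSModel :: (a -> b -> a) -> a -> Bundle b -> [a]
foldlSModel f z = map (foldl f z)

-- A single result: the accumulator is threaded through the streams
-- in order, one stream after the other.
foldlAllSModel :: (a -> b -> a) -> a -> Bundle b -> a
foldlAllSModel f z = foldl (foldl f) z
```

For a bundle modelled as `[[1, 2, 3], [10, 20]]` with `(+)` and `0`, the first model gives one sum per stream and the second gives the sum over all streams.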
folds_i
:: FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b | |
=> Name lGrp | Layout for group names. |
-> Name lRes | Layout for fold results. |
-> (a -> b -> b) | Worker function. |
-> b | Initial state when folding each segment. |
-> Sources lSeg (n, Int) | Segment lengths. |
-> Sources lElt a | Input elements to fold. |
-> IO (Sources (T2 lGrp lRes) (n, b)) | Result elements. |
Given streams of lengths and values, perform a segmented fold, where segments of values of the corresponding lengths are folded together.
> import Data.Repa.Flow
> sSegs <- fromList U 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
> sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
> toList1 0 =<< folds_i U U (+) 0 sSegs sVals
[('a',10),('b',50),('c',220),('d',0),('e',80)]
If not enough input elements are available to fold a complete segment then no output is produced for that segment. However, trailing zero length segments still produce the initial value for the fold.
> import Data.Repa.Flow
> sSegs <- fromList U 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
> sVals <- fromList U 1 [10, 20, 30 :: Int]
> toList1 0 =<< folds_i U U (*) 1 sSegs sVals
[('a',10),('b',600),('c',1),('d',1),('e',1)]
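The segmented fold and its two edge cases can be sketched on plain lists. This is a hypothetical model, not the library code: `foldsModel` is an illustrative name, and stopping at the first incomplete segment is an assumption that is consistent with the examples here but not confirmed beyond them.

```haskell
-- Hypothetical list model of a segmented fold; not the library code.
-- Each segment is a (name, length) pair. A segment yields a result only
-- if enough values remain to fill it; zero-length segments always yield
-- the initial value.
foldsModel :: (a -> b -> b) -> b -> [(n, Int)] -> [a] -> [(n, b)]
foldsModel _ _ [] _ = []
foldsModel f z ((name, len) : segs) vals
  -- Assumption: the model stops at the first incomplete segment.
  | length seg < len = []
  | otherwise        = (name, foldl (flip f) z seg) : foldsModel f z segs rest
  where
    (seg, rest) = splitAt len vals
```

With the segment and value lists from the two examples in this section, the model produces the same name and result pairs, including the dropped segment 'f' and the trailing zero-length segments.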
type FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = FoldsDict Int IO lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b
Dictionaries needed to perform a segmented fold.
foldGroupsBy_i
:: FoldGroupsDict lSeg tSeg lVal tVal lGrp tGrp lRes tRes n a b | |
=> Name lGrp | Layout for group names. |
-> Name lRes | Layout for fold results. |
-> (n -> n -> Bool) | Function to check if consecutive elements are in the same group. |
-> (a -> b -> b) | Worker function for the fold. |
-> b | Initial state when folding each segment. |
-> Sources lSeg n | Names that determine groups. |
-> Sources lVal a | Values to fold. |
-> IO (Sources (T2 lGrp lRes) (n, b)) |
Combination of groupsBy_i and folds_i. We determine the segment lengths while performing the folds.
Note that SQL-like group-by aggregations can be performed using this function, provided the data is pre-sorted on the group key. For example, we can take the average of some groups of values:
> import Data.Repa.Flow
> sKeys <- fromList U 1 "waaaabllle"
> sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
> sResult <- map_i U (\(key, (acc, n)) -> (key, acc / n)) =<< foldGroupsBy_i U U (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
> toList1 0 sResult
[('w',10.0),('a',35.0),('b',60.0),('l',80.0),('e',100.0)]
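The group-by average can also be traced on plain lists. The sketch below is a hypothetical model, not the library code: `foldGroupsByModel` and `averages` are illustrative names, keys and values are paired with `zip` (so any unmatched tail is silently dropped, which is an assumption of the model), and the keys are assumed to be pre-sorted as the note above requires.

```haskell
-- Hypothetical list model of a grouped fold over pre-sorted keys.
-- Consecutive keys k1, k2 belong to the same group while `same k1 k2`
-- holds; the values of each group are folded from the initial state z.
foldGroupsByModel
  :: (n -> n -> Bool) -> (a -> b -> b) -> b -> [n] -> [a] -> [(n, b)]
foldGroupsByModel same f z keys vals = go (zip keys vals)
  where
    go []             = []
    go ((k, v) : kvs) = run k k (f v z) kvs
    -- first: key that started the group, prev: last key seen,
    -- acc: fold state for the current group.
    run first _    acc []             = [(first, acc)]
    run first prev acc ((k, v) : kvs)
      | same prev k = run first k (f v acc) kvs
      | otherwise   = (first, acc) : run k k (f v z) kvs

-- Average per key: accumulate (sum, count), then divide.
averages :: [(Char, Double)]
averages =
  [ (key, total / count)
  | (key, (total, count)) <-
      foldGroupsByModel (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0)
        "waaaabllle" [10, 20 .. 100]
  ]
```

Accumulating a (sum, count) pair and dividing afterwards keeps the fold a single pass over the data, which is the same design the example above uses with map_i.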