Safe Haskell | None |
---|---|
Language | Haskell98 |
Everything flows.
This module defines generic flows. The other flow types defined in Data.Repa.Flow.Chunked and Data.Repa.Flow.Simple are specialisations of this generic one.
- data Sources i m e = Sources {
- sourcesArity :: i
- sourcesPull :: i -> (e -> m ()) -> m () -> m ()
- data Sinks i m e = Sinks {
- sinksArity :: i
- sinksPush :: i -> e -> m ()
- sinksEject :: i -> m ()
- module Data.Repa.Flow.States
- drainS :: (Next i, Monad m) => Sources i m a -> Sinks i m a -> m ()
- drainP :: Sources Int IO a -> Sinks Int IO a -> IO ()
- consumeS :: (Next i, Monad m) => (i -> a -> m ()) -> Sources i m a -> m ()
- fromList :: States i m => i -> [a] -> m (Sources i m a)
- toList1 :: States i m => i -> Sources i m a -> m [a]
- takeList1 :: States i m => Int -> i -> Sources i m a -> m [a]
- pushList :: Monad m => [(i, a)] -> Sinks i m a -> m ()
- pushList1 :: Monad m => i -> [a] -> Sinks i m a -> m ()
- mapIndex_i :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sources i1 m a -> m (Sources i2 m a)
- mapIndex_o :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sinks i1 m a -> m (Sinks i2 m a)
- flipIndex2_i :: Monad m => Sources SH2 m a -> m (Sources SH2 m a)
- flipIndex2_o :: Monad m => Sinks SH2 m a -> m (Sinks SH2 m a)
- finalize_i :: States i m => (i -> m ()) -> Sources i m a -> m (Sources i m a)
- finalize_o :: States i m => (i -> m ()) -> Sinks i m a -> m (Sinks i m a)
- project_i :: Monad m => i -> Sources i m a -> m (Sources () m a)
- project_o :: Monad m => i -> Sinks i m a -> m (Sinks () m a)
- repeat_i :: Monad m => i -> (i -> a) -> m (Sources i m a)
- replicate_i :: States i m => i -> Int -> (i -> a) -> m (Sources i m a)
- prepend_i :: States i m => [a] -> Sources i m a -> m (Sources i m a)
- prependOn_i :: States i m => (i -> Bool) -> [a] -> Sources i m a -> m (Sources i m a)
- map_i :: Monad m => (a -> b) -> Sources i m a -> m (Sources i m b)
- map_o :: Monad m => (a -> b) -> Sinks i m b -> m (Sinks i m a)
- smap_i :: Monad m => (i -> a -> b) -> Sources i m a -> m (Sources i m b)
- smap_o :: Monad m => (i -> a -> b) -> Sinks i m b -> m (Sinks i m a)
- szipWith_ii :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sources i m a -> Sources i m b -> m (Sources i m c)
- szipWith_io :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m a -> m (Sinks i m b)
- szipWith_oi :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m b -> m (Sinks i m a)
- compact_i :: States i m => (s -> a -> (s, Maybe b)) -> s -> Sources i m a -> m (Sources i m b)
- scan_i :: States i m => (s -> a -> s) -> s -> Sources i m a -> m (Sources i m s)
- indexed_i :: States i m => Sources i m a -> m (Sources i m (Int, a))
- dup_oo :: States i m => Sinks i m a -> Sinks i m a -> m (Sinks i m a)
- dup_io :: (Ord i, Monad m) => Sources i m a -> Sinks i m a -> m (Sources i m a)
- dup_oi :: (Ord i, Monad m) => Sinks i m a -> Sources i m a -> m (Sources i m a)
- connect_i :: States i m => Sources i m a -> m (Sources i m a, Sources i m a)
- funnel_i :: (States i m, States () m) => Sources i m a -> m (Sources () m a)
- funnel_o :: States i m => i -> Sinks () m a -> m (Sinks i m a)
- head_i :: States i m => Int -> Sources i m a -> i -> m ([a], Sources i m a)
- groups_i :: (Monad m, Eq a) => Sources i m a -> m (Sources i m Int)
- pack_ii :: (Ord i, Monad m) => Sources i m Bool -> Sources i m a -> m (Sources i m a)
- folds_ii :: (Ord i, Monad m) => (a -> a -> a) -> a -> Sources i m Int -> Sources i m a -> m (Sources i m a)
- watch_i :: Monad m => (i -> a -> m ()) -> Sources i m a -> m (Sources i m a)
- watch_o :: Monad m => (i -> a -> m ()) -> Sinks i m a -> m (Sinks i m a)
- trigger_o :: Monad m => i -> (i -> a -> m ()) -> m (Sinks i m a)
- capture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO ()) -> IO (Array lDst (i, a))
- rcapture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO b) -> IO (Array lDst (i, a), b)
- ignore_o :: Monad m => i -> m (Sinks i m a)
- abandon_o :: Monad m => i -> m (Sinks i m a)
- trace_o :: (Show i, Show a, Monad m) => i -> m (Sinks i m a)
- distribute_o :: BulkI l a => (Int -> a -> IO ()) -> Sinks Int IO a -> IO (Sinks () IO (Array l a))
- ddistribute_o :: BulkI l a => Sinks Int IO a -> IO (Sinks () IO (Array l a))
- distribute2_o :: BulkI l a => (SH2 -> a -> IO ()) -> Sinks SH2 IO a -> IO (Sinks Int IO (Array l a))
- ddistribute2_o :: BulkI l a => Sinks SH2 IO a -> IO (Sinks Int IO (Array l a))
- shuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (Int -> Array lDst a -> IO ()) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a)))
- dshuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a)))
- dshuffleBy_o :: (BulkI lDst a, BulkI lSrc a, Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (a -> Int) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc a))
- chunkOn_i :: (States i IO, TargetI lDst a) => Name lDst -> Int -> (a -> Bool) -> Sources i IO a -> IO (Sources i IO (Array lDst a))
- unchunk_i :: (BulkI l a, States i IO) => Sources i IO (Array l a) -> IO (Sources i IO a)
Documentation
A bundle of stream sources, indexed by a value of type i, in some monad m, returning elements of type e.
Elements can be pulled from each stream in the bundle individually.
Sources | |
|
A bundle of stream sinks, indexed by a value of type i, in some monad m, accepting elements of type e.
Elements can be pushed to each stream in the bundle individually.
Sinks | |
|
Stream State and Thread Safety
As most functions in this library produce IO actions, thread safety is not guaranteed by their types.
It is not safe to concurrently pull from the same stream of a Sources bundle, or concurrently push to the same stream of a Sinks bundle.
Both Sources and Sinks may hold per-stream state information, and accessing the same stream concurrently may cause a race condition.
It is safe to concurrently push or pull from different streams of a bundle, as the state information for each stream is guaranteed to be separate. Any inter-stream communication is protected by appropriate locks.
Unless stated otherwise, any worker function passed to a flow operator may be invoked concurrently. For example, if you pass an IO action to trigger_o then that action may be invoked concurrently.
In practice, if you use just the bulk operators provided by this library then you won't have a problem. However, if you construct your own Sources or Sinks by providing raw push, pull and eject functions then you must obey the above rules.
module Data.Repa.Flow.States
Evaluation
consumeS :: (Next i, Monad m) => (i -> a -> m ()) -> Sources i m a -> m () Source #
Pull all available values from the sources and pass them to the given action.
Conversion
fromList :: States i m => i -> [a] -> m (Sources i m a) Source #
Given an arity and a list of elements, yield sources that each produce all the elements.
takeList1 :: States i m => Int -> i -> Sources i m a -> m [a] Source #
Drain the given number of elements from a single source into a list.
pushList :: Monad m => [(i, a)] -> Sinks i m a -> m () Source #
Push elements into the associated streams of a bundle of sinks.
pushList1 :: Monad m => i -> [a] -> Sinks i m a -> m () Source #
Push the elements of a list into the given stream of a bundle of sinks.
Stream Indices
mapIndex_i :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sources i1 m a -> m (Sources i2 m a) Source #
Transform the stream indexes of a bundle of sources.
The given transform functions should be inverses of each other, else you'll get a confusing result.
mapIndex_o :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sinks i1 m a -> m (Sinks i2 m a) Source #
Transform the stream indexes of a bundle of sinks.
The given transform functions should be inverses of each other, else you'll get a confusing result.
flipIndex2_i :: Monad m => Sources SH2 m a -> m (Sources SH2 m a) Source #
For a bundle of sources with a 2-d stream index, flip the components of the index.
flipIndex2_o :: Monad m => Sinks SH2 m a -> m (Sinks SH2 m a) Source #
For a bundle of sinks with a 2-d stream index, flip the components of the index.
Finalizers
finalize_i :: States i m => (i -> m ()) -> Sources i m a -> m (Sources i m a) Source #
Attach a finalizer to a bundle of sources.
For each stream in the bundle, the finalizer will be called the first time a consumer of that stream tries to pull an element when no more are available.
The provided finalizer will be run after any finalizers already attached to the source.
finalize_o :: States i m => (i -> m ()) -> Sinks i m a -> m (Sinks i m a) Source #
Attach a finalizer to a bundle of sinks.
For each stream in the bundle, the finalizer will be called the first time that stream is ejected.
The provided finalizer will be run after any finalizers already attached to the sink.
Flow Operators
Projection
project_i :: Monad m => i -> Sources i m a -> m (Sources () m a) Source #
Project out a single stream source from a bundle.
project_o :: Monad m => i -> Sinks i m a -> m (Sinks () m a) Source #
Project out a single stream sink from a bundle.
Constructors
repeat_i :: Monad m => i -> (i -> a) -> m (Sources i m a) Source #
Yield sources that always produce the same value.
replicate_i :: States i m => i -> Int -> (i -> a) -> m (Sources i m a) Source #
Yield sources of the given length that always produce the same value.
prepend_i :: States i m => [a] -> Sources i m a -> m (Sources i m a) Source #
Prepend some more elements onto the front of some sources.
prependOn_i :: States i m => (i -> Bool) -> [a] -> Sources i m a -> m (Sources i m a) Source #
Like prepend_i, but only prepend the elements to the streams that match the given predicate.
Mapping
map_i :: Monad m => (a -> b) -> Sources i m a -> m (Sources i m b) Source #
Apply a function to every element pulled from some sources, producing some new sources.
map_o :: Monad m => (a -> b) -> Sinks i m b -> m (Sinks i m a) Source #
Apply a function to every element pushed to the resulting sinks, passing the results on to the given sinks.
smap_i :: Monad m => (i -> a -> b) -> Sources i m a -> m (Sources i m b) Source #
Like map_i, but the worker function is also given the stream index.
smap_o :: Monad m => (i -> a -> b) -> Sinks i m b -> m (Sinks i m a) Source #
Like map_o, but the worker function is also given the stream index.
szipWith_ii :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sources i m a -> Sources i m b -> m (Sources i m c) Source #
Combine the elements of two flows with the given function. The worker function is also given the stream index.
szipWith_io :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m a -> m (Sinks i m b) Source #
Like szipWith_ii, but take a bundle of Sinks for the result elements, and yield a bundle of Sinks to accept the b elements.
szipWith_oi :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m b -> m (Sinks i m a) Source #
Like szipWith_ii, but take a bundle of Sinks for the result elements, and yield a bundle of Sinks to accept the a elements.
Processing
compact_i :: States i m => (s -> a -> (s, Maybe b)) -> s -> Sources i m a -> m (Sources i m b) Source #
Combination of fold and filter.
We walk over the stream start-to-end, maintaining an accumulator. At each point we can choose to emit an element, or not.
scan_i :: States i m => (s -> a -> s) -> s -> Sources i m a -> m (Sources i m s) Source #
Start-to-end scan over each stream in a bundle.
indexed_i :: States i m => Sources i m a -> m (Sources i m (Int, a)) Source #
For each stream in a bundle of sources, associate each element with its corresponding position in the stream.
Connecting
dup_oo :: States i m => Sinks i m a -> Sinks i m a -> m (Sinks i m a) Source #
Send the same data to two consumers.
Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.
dup_io :: (Ord i, Monad m) => Sources i m a -> Sinks i m a -> m (Sources i m a) Source #
Send the same data to two consumers.
Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
dup_oi :: (Ord i, Monad m) => Sinks i m a -> Sources i m a -> m (Sources i m a) Source #
Send the same data to two consumers.
Like dup_io but with the arguments flipped.
connect_i :: States i m => Sources i m a -> m (Sources i m a, Sources i m a) Source #
Connect an argument source to two result sources.
Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at the time, so if one side pulls all the elements the other side won't get any.
funnel_i :: (States i m, States () m) => Sources i m a -> m (Sources () m a) Source #
Given a bundle of sources containing several streams, produce a new bundle containing a single stream that gets data from the former.
Streams from the source are consumed in their natural order, and a complete stream is consumed before moving onto the next one.
> import Data.Repa.Flow.Generic
> toList1 () =<< funnel_i =<< fromList (3 :: Int) [11, 22, 33]
[11,22,33,11,22,33,11,22,33]
funnel_o :: States i m => i -> Sinks () m a -> m (Sinks i m a) Source #
Given a bundle of sinks consisting of a single stream, produce a new bundle of the given arity that sends all data to the former, ignoring the stream index.
The argument stream is ejected only when all of the streams in the result bundle have been ejected.
- Using this function in conjunction with parallel operators like drainP introduces non-determinism. Elements pushed to different streams in the result bundle could enter the single stream in the argument bundle in any order.
> import Data.Repa.Flow.Generic
> import Data.Repa.Array.Material
> import Data.Repa.Nice
> let things = [(0 :: Int, "foo"), (1, "bar"), (2, "baz")]
> result <- capture_o B () (\k -> funnel_o 4 k >>= pushList things)
> nice result
[((),"foo"),((),"bar"),((),"baz")]
Splitting
head_i :: States i m => Int -> Sources i m a -> i -> m ([a], Sources i m a) Source #
Split the given number of elements from the head of a source returning those elements in a list, and yielding a new source for the rest.
Grouping
groups_i :: (Monad m, Eq a) => Sources i m a -> m (Sources i m Int) Source #
From a stream of values which has consecutive runs of identical values, produce a stream of the lengths of these runs.
Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
Packing
pack_ii :: (Ord i, Monad m) => Sources i m Bool -> Sources i m a -> m (Sources i m a) Source #
Given a stream of flags and a stream of values, produce a new stream of values where the corresponding flag was True. Only as many flag/value pairs as the shorter input provides are considered, so the result is at most as long as the shorter of the two inputs.
Folding
folds_ii :: (Ord i, Monad m) => (a -> a -> a) -> a -> Sources i m Int -> Sources i m a -> m (Sources i m a) Source #
Segmented fold.
Watching
watch_i :: Monad m => (i -> a -> m ()) -> Sources i m a -> m (Sources i m a) Source #
Apply a monadic function to every element pulled from some sources, producing some new sources.
watch_o :: Monad m => (i -> a -> m ()) -> Sinks i m a -> m (Sinks i m a) Source #
Pass elements to the provided action as they are pushed into the sink.
trigger_o :: Monad m => i -> (i -> a -> m ()) -> m (Sinks i m a) Source #
Like watch_o but doesn't pass elements to another sink.
Capturing
capture_o
  :: (Target lDst (i, a), Index lDst ~ Int)
  => Name lDst                  -- ^ Name of destination layout.
  -> i                          -- ^ Arity of result bundle.
  -> (Sinks i IO a -> IO ())    -- ^ Function to push data into the sinks.
  -> IO (Array lDst (i, a))
Create a bundle of sinks of the given arity and capture any data pushed to it.
> import Data.Repa.Flow.Generic
> import Data.Repa.Array.Material
> import Data.Repa.Nice
> import Control.Monad
> liftM nice $ capture_o B 4 (\k -> pushList [(0 :: Int, "foo"), (1, "bar"), (0, "baz")] k)
[(0,"foo"),(1,"bar"),(0,"baz")]
rcapture_o
  :: (Target lDst (i, a), Index lDst ~ Int)
  => Name lDst                  -- ^ Name of destination layout.
  -> i                          -- ^ Arity of result bundle.
  -> (Sinks i IO a -> IO b)     -- ^ Function to push data into the sinks.
  -> IO (Array lDst (i, a), b)
Like capture_o but also return the result of the push function.
Ignorance
ignore_o :: Monad m => i -> m (Sinks i m a) Source #
A sink that ignores all incoming data.
- This sink is strict in the elements, so they are demanded before being discarded.
- Haskell debugging thunks attached to the elements will be demanded.
abandon_o :: Monad m => i -> m (Sinks i m a) Source #
A sink that drops all data on the floor.
- This sink is non-strict in the elements.
- Haskell tracing thunks attached to the elements will *not* be demanded.
Tracing
trace_o :: (Show i, Show a, Monad m) => i -> m (Sinks i m a) Source #
Use the trace function from Debug.Trace to print each element that is pushed to a sink.
- This function is intended for debugging only, and is not intended for production code.
Vector Flow Operators
1-dimensional distribution
distribute_o
  :: BulkI l a
  => (Int -> a -> IO ())        -- ^ Spill action, given the spilled element along with its index in the array.
  -> Sinks Int IO a             -- ^ Sinks to push elements into.
  -> IO (Sinks () IO (Array l a))
Given a bundle of sinks indexed by an Int, produce a result sink for arrays.
Each time an array is pushed to the result sink, its elements are pushed to the corresponding streams of the argument sink. If there are more elements than sinks then give them to the spill action.
  |          ..          |
  | [w0, x0, y0, z0]     |  :: Sinks () IO (Array l a)
  | [w1, x1, y1, z1, u1] |  (sink for a single stream of arrays)
  |          ..          |

      |    |    |    |    |
      v    v    v    v    .------> spilled

    | .. | .. | .. | .. |
    | w0 | x0 | y0 | z0 |   :: Sinks Int IO a
    | w1 | x1 | y1 | z1 |   (sink for several streams of elements)
    | .. | .. | .. | .. |
ddistribute_o :: BulkI l a => Sinks Int IO a -> IO (Sinks () IO (Array l a)) Source #
Like distribute_o, but drop spilled elements on the floor.
2-dimensional distribution
distribute2_o
  :: BulkI l a
  => (SH2 -> a -> IO ())        -- ^ Spill action, given the spilled element along with its index in the array.
  -> Sinks SH2 IO a             -- ^ Sinks to push elements into.
  -> IO (Sinks Int IO (Array l a))
Like distribute_o, but with 2-d stream indexes.
Given the argument and result sinks, when pushing to the result the stream index is used as the first component for the argument sink, and the index of the element in its array is used as the second component.
If you want the components of the stream index the other way around then apply flipIndex2_o to the argument sinks.
ddistribute2_o :: BulkI l a => Sinks SH2 IO a -> IO (Sinks Int IO (Array l a)) Source #
Like distribute2_o, but drop spilled elements on the floor.
Shuffling
shuffle_o
  :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a)
  => Name lSrc                          -- ^ Name of source layout.
  -> (Int -> Array lDst a -> IO ())     -- ^ Handle spilled elements.
  -> Sinks Int IO (Array lDst a)        -- ^ Sinks to push results to.
  -> IO (Sinks () IO (Array lSrc (Int, a)))
Given a bundle of argument sinks, produce a result sink. Arrays of indices and elements are pushed to the result sink. On doing so, the elements are pushed into the corresponding streams of the argument sinks.
If the index associated with an element does not have a corresponding stream in the argument sinks, then pass it to the provided spill function.
  |                      ..                       |
  | [(0, v0), (1, v1), (0, v2), (0, v3), (2, v4)] |  :: Sinks () IO (Array lSrc (Int, a))
  |                      ..                       |
         \        \                          |
          \        .------------.            |
           v                    v            .---------> spilled

     |      ..      |  ..  |
     | [v0, v2, v3] | [v1] |   :: Sinks Int IO (Array lDst a)
     |      ..      |  ..  |
The following example uses capture_o to demonstrate how the shuffle_o operator can be used as one step of a bucket-sort. We start with two arrays of key-value pairs. In the result, the values from each block that had the same key are packed into the same tuple (bucket).
> import Data.Repa.Flow.Generic   as G
> import Data.Repa.Array          as A
> import Data.Repa.Array.Material as A
> import Data.Repa.Nice
> let arr1 = A.fromList B [(0, 'a'), (1, 'b'), (2, 'c'), (0, 'd'), (0, 'c')]
> let arr2 = A.fromList B [(0, 'A'), (3, 'B'), (3, 'C')]
> result :: Array B (Int, Array U Char)
>        <- capture_o B 4 (\k -> shuffle_o B (error "spilled") k
>                           >>= pushList1 () [arr1, arr2])
> nice result
[(0,"adc"),(1,"b"),(2,"c"),(0,"A"),(3,"BC")]
dshuffle_o
  :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a)
  => Name lSrc                          -- ^ Name of source layout.
  -> Sinks Int IO (Array lDst a)        -- ^ Sinks to push results to.
  -> IO (Sinks () IO (Array lSrc (Int, a)))
Like shuffle_o, but drop spilled elements on the floor.
dshuffleBy_o
  :: (BulkI lDst a, BulkI lSrc a, Windowable lDst a, Target lDst a, Elt a)
  => Name lSrc                          -- ^ Name of source layout.
  -> (a -> Int)                         -- ^ Get the stream number for an element.
  -> Sinks Int IO (Array lDst a)        -- ^ Sinks to push results to.
  -> IO (Sinks () IO (Array lSrc a))
Like dshuffle_o, but use the given function to decide which stream of the argument bundle each element should be pushed into.
> import Data.Repa.Flow.Generic   as G
> import Data.Repa.Array          as A
> import Data.Repa.Array.Material as A
> import Data.Repa.Nice
> import Data.Char
> let arr1 = A.fromList B "FooBAr"
> let arr2 = A.fromList B "BazLIKE"
> result :: Array B (Int, Array U Char)
>        <- capture_o B 2 (\k -> dshuffleBy_o B (\x -> if isUpper x then 0 else 1) k
>                           >>= pushList1 () [arr1, arr2])
> nice result
[(0,"FBA"),(1,"oor"),(0,"BLIKE"),(1,"az")]
Chunking
chunkOn_i
  :: (States i IO, TargetI lDst a)
  => Name lDst                          -- ^ Layout for result chunks.
  -> Int                                -- ^ Maximum chunk length.
  -> (a -> Bool)                        -- ^ Detect the last element in a chunk.
  -> Sources i IO a                     -- ^ Element sources.
  -> IO (Sources i IO (Array lDst a))   -- ^ Chunk sources.
Take elements from a flow and pack them into chunks. The chunks are limited to the given maximum length. A predicate can also be supplied to detect the last element in a chunk.