repa-flow-4.2.3.1: Data-parallel data flows.

Safe Haskell: None
Language: Haskell98

Data.Repa.Flow.Auto

Description

This module defines the default specialisation of flows that appears in Data.Repa.Flow. Each stream in the bundle is indexed by a single integer, and stream state is stored using the IO monad.

Flow types

type Sources a = Sources Int IO A a Source #

A bundle of stream sources, where the elements of the stream are chunked into arrays.

type Sinks a = Sinks Int IO A a Source #

A bundle of stream sinks, where the elements of the stream are chunked into arrays.

type Flow a = (Flow Int IO A a, Windowable A a) Source #

Shorthand for common type classes.

sourcesArity :: Sources a -> Int Source #

Yield the number of streams in the bundle.

sinksArity :: Sinks a -> Int Source #

Yield the number of streams in the bundle.

Conversion

List conversion

fromList :: Build a => Int -> [a] -> IO (Sources a) Source #

Given an arity and a list of elements, yield sources that each produce all the elements.

  • All elements are stuffed into a single chunk, and each stream is given the same chunk.

fromLists :: Build a => Int -> [[a]] -> IO (Sources a) Source #

Like fromList but take a list of lists. Each of the inner lists is packed into a single chunk.

toList1 :: Build a => Int -> Sources a -> IO [a] Source #

Drain a single source from a bundle into a list of elements.

  • If the index does not specify a valid stream then the result will be empty.

toLists1 :: Build a => Int -> Sources a -> IO [[a]] Source #

Drain a single source from a bundle into a list of chunks.

  • If the index does not specify a valid stream then the result will be empty.
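The list conversions above can be read against a small pure model. This is a minimal sketch, not the library's implementation: it assumes a bundle can be modelled as a list of streams, each stream being a list of chunks. The names Bundle, fromListModel, fromListsModel, toLists1Model and toList1Model are hypothetical. It also assumes that fromLists hands every stream the same sequence of chunks, which the text does not state explicitly.

```haskell
-- | Hypothetical pure model: a bundle is a list of streams,
--   and each stream is a list of chunks.
type Bundle a = [[[a]]]

-- | Every stream receives the same single chunk holding all elements.
fromListModel :: Int -> [a] -> Bundle a
fromListModel arity xs = replicate arity [xs]

-- | Every inner list becomes one chunk (assumed: the same chunks per stream).
fromListsModel :: Int -> [[a]] -> Bundle a
fromListsModel arity xss = replicate arity xss

-- | Drain one stream into its chunks; an invalid index gives an empty result.
toLists1Model :: Int -> Bundle a -> [[a]]
toLists1Model ix bundle
  | ix >= 0 && ix < length bundle = bundle !! ix
  | otherwise = []

-- | Drain one stream into a flat list of elements.
toList1Model :: Int -> Bundle a -> [a]
toList1Model ix = concat . toLists1Model ix
```

In this model, toList1Model 5 (fromListModel 2 "abc") is the empty list, mirroring the invalid-index rule above.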

Array conversion

fromArray :: Int -> Array a -> IO (Sources a) Source #

Given an arity and an array of elements, yield sources that each produce all the elements.

  • All elements are stuffed into a single chunk, and each stream is given the same chunk.

fromArrays :: Elem a => Int -> Array (Array a) -> IO (Sources a) Source #

Like fromArray but take an array of arrays. Each of the inner arrays is packed into a single chunk.

toArray1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array a) Source #

Drain a single source from a bundle into an array of elements.

  • If the index does not specify a valid stream then the result will be empty.

toArrays1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array (Array a)) Source #

Drain a single source from a bundle into an array of chunks.

  • If the index does not specify a valid stream then the result will be empty.

Evaluation

drainS :: Sources a -> Sinks a -> IO () Source #

Pull all available values from the sources and push them to the sinks. Streams in the bundle are processed sequentially, from first to last.

  • If the Sources and Sinks have different numbers of streams then we only evaluate the common subset.
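The "common subset" rule can be illustrated with a small model. This sketch assumes a source stream is just a list of elements and a sink is an action that accepts one element; Push, mkSinks and drainModel are hypothetical names and none of this is the library's code.

```haskell
import Data.IORef

-- | A sink is modelled as an action that accepts one pushed element.
type Push a = a -> IO ()

-- | Make n sinks that each record what was pushed to them, plus an action
--   that reads back the recorded elements in push order.
mkSinks :: Int -> IO ([Push a], IO [[a]])
mkSinks n = do
  refs <- mapM (const (newIORef [])) [1 .. n]
  let push ref x = modifyIORef' ref (x :)
  pure (map push refs, mapM (fmap reverse . readIORef) refs)

-- | Drain streams into sinks one stream after the other. zipWith stops at
--   the shorter list, so only the common subset of streams is evaluated.
drainModel :: [[a]] -> [Push a] -> IO ()
drainModel sources sinks = sequence_ (zipWith mapM_ sinks sources)
```

Draining three model streams into two model sinks leaves the third stream untouched, which is the behaviour the bullet describes.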

drainP :: Sources a -> Sinks a -> IO () Source #

Pull all available values from the sources and push them to the sinks, in parallel. We fork a thread for each of the streams and evaluate them all in parallel.

  • If the Sources and Sinks have different numbers of streams then we only evaluate the common subset.

consumeS :: Bulk A a => Sources a -> (Int -> a -> IO ()) -> IO () Source #

Pull all available values from the sources and pass them to the given action.

Flow Operators

Replicating

replicates_i Source #

Arguments

:: (Flow (Int, a), Build a) 
=> Sources (Int, a)

Source of segment lengths and values.

-> IO (Sources a) 

Segmented replicate.
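As a reading aid, segmented replicate on a single stream can be modelled on plain lists. The function replicatesModel is a hypothetical name for this sketch and ignores chunking entirely.

```haskell
-- | Pure list model of segmented replicate: each (length, value) pair
--   expands to that many copies of the value, in order.
replicatesModel :: [(Int, a)] -> [a]
replicatesModel = concatMap (\(len, x) -> replicate len x)
```

For example, replicatesModel [(2,'a'),(0,'b'),(3,'c')] gives "aaccc"; a zero-length segment contributes nothing.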

Mapping

If you want to work on a chunk at a time then use map_i and map_o from Data.Repa.Flow.Generic.

map_i :: (Flow a, Build b) => (a -> b) -> Sources a -> IO (Sources b) Source #

Apply a function to all elements pulled from some sources.

map_o :: (Flow a, Build b) => (a -> b) -> Sinks b -> IO (Sinks a) Source #

Apply a function to all elements pushed to some sinks.

zipWith_i :: (Flow a, Flow b, Build c) => (a -> b -> c) -> Sources a -> Sources b -> IO (Sources c) Source #

Combine corresponding elements of two sources with the given function.

Processing

process_i Source #

Arguments

:: (Flow a, Flow b, Build b) 
=> (s -> a -> (s, Array b))

Worker function.

-> s

Initial state.

-> Sources a

Input sources.

-> IO (Sources b) 

Apply a generic stream process to a bundle of sources.
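The shape of the worker function may be easier to see in a pure model of one stream. The sketch below assumes the state is threaded through the elements of a single stream and that each step may emit zero or more outputs; processModel and evenRunningSums are hypothetical names, and plain lists stand in for Array.

```haskell
import Data.List (mapAccumL)

-- | Pure model of a stream process on one stream: thread the state through
--   the elements and concatenate the outputs emitted at each step.
processModel :: (s -> a -> (s, [b])) -> s -> [a] -> [b]
processModel worker s0 = concat . snd . mapAccumL worker s0

-- | Example worker: keep a running sum, but only emit it when it is even.
evenRunningSums :: Int -> Int -> (Int, [Int])
evenRunningSums total x =
  let total' = total + x
  in (total', [total' | even total'])
```

With these definitions, processModel evenRunningSums 0 [1,3,2,5,1] gives [4,6,12], showing that a worker can drop elements by returning an empty output.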

Higher arity zipWith functions.

Connecting

dup_oo :: Sinks a -> Sinks a -> IO (Sinks a) Source #

Send the same data to two consumers.

Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.

dup_io :: Sources a -> Sinks a -> IO (Sources a) Source #

Send the same data to two consumers.

Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
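The pull-then-push behaviour of dup_io can be sketched with a simple element-level model. This assumes a source is an action that yields the next element or Nothing, and a sink is an action that accepts one element; Pull, Push, sourceFromList and dupIoModel are hypothetical names, and the real library works on chunks rather than single elements.

```haskell
import Data.IORef

-- | A source is modelled as an action yielding the next element, if any.
type Pull a = IO (Maybe a)

-- | A sink is modelled as an action that accepts one pushed element.
type Push a = a -> IO ()

-- | Build a model source that hands out the list elements one at a time.
sourceFromList :: [a] -> IO (Pull a)
sourceFromList xs = do
  ref <- newIORef xs
  pure $ do
    ys <- readIORef ref
    case ys of
      [] -> pure Nothing
      (z : zs) -> writeIORef ref zs >> pure (Just z)

-- | Pulling from the result pulls from the argument source, pushes the
--   element to the sink, and also returns it to the caller.
dupIoModel :: Pull a -> Push a -> Pull a
dupIoModel pull push = do
  m <- pull
  case m of
    Nothing -> pure Nothing
    Just x -> push x >> pure (Just x)
```

Note that in this model the sink only sees elements that the consumer of the result source actually pulls.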

dup_oi :: Sinks a -> Sources a -> IO (Sources a) Source #

Send the same data to two consumers.

Like dup_io but with the arguments flipped.

connect_i :: Sources a -> IO (Sources a, Sources a) Source #

Connect an argument source to two result sources.

Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at the time, so if one side pulls all the elements the other side won't get any.
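The sharing behaviour described above can be sketched with an element-level model. It assumes a source is an action that yields the next element or Nothing; Pull, sourceFromList, connectModel and pullAll are hypothetical names, and the library itself hands out chunks rather than single elements.

```haskell
import Data.IORef

-- | A source is modelled as an action yielding the next element, if any.
type Pull a = IO (Maybe a)

-- | Build a model source that hands out the list elements one at a time.
sourceFromList :: [a] -> IO (Pull a)
sourceFromList xs = do
  ref <- newIORef xs
  pure $ do
    ys <- readIORef ref
    case ys of
      [] -> pure Nothing
      (z : zs) -> writeIORef ref zs >> pure (Just z)

-- | Both results share the one underlying source, so an element pulled
--   through one side is never seen by the other.
connectModel :: Pull a -> (Pull a, Pull a)
connectModel pull = (pull, pull)

-- | Pull until the source is exhausted.
pullAll :: Pull a -> IO [a]
pullAll pull = do
  m <- pull
  case m of
    Nothing -> pure []
    Just x -> (x :) <$> pullAll pull
```

If one side of connectModel is drained with pullAll, a later pull on the other side returns Nothing.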

Watching

watch_i :: (Int -> Array A a -> IO ()) -> Sources a -> IO (Sources a) Source #

Hook a worker function to some sources, which will be passed every chunk that is pulled from each source.

  • The worker is also passed the source index of the chunk that was pulled.

watch_o :: (Int -> Array A a -> IO ()) -> Sinks a -> IO (Sinks a) Source #

Hook a worker function to some sinks, which will be passed every chunk that is pushed to each sink.

  • The worker is also passed the index of the sink that the chunk was pushed to.

trigger_o :: Int -> (Int -> Array A a -> IO ()) -> IO (Sinks a) Source #

Create a bundle of sinks of the given arity that pass incoming chunks to a worker function.

  • This is like watch_o, except that the incoming chunks are discarded after they are passed to the worker function.

Ignorance

ignore_o :: Int -> IO (Sinks a) Source #

Create a bundle of sinks of the given arity that drop all data on the floor.

  • Haskell debugging thunks attached to the chunks will be demanded, but thunks attached to elements may not be, depending on whether the chunk representation is strict in the elements.

abandon_o :: Int -> IO (Sinks a) Source #

Create a bundle of sinks of the given arity that drop all data on the floor.

  • As opposed to ignore_o the sinks are non-strict in the chunks.
  • Haskell debugging thunks attached to the chunks will *not* be demanded.

Splitting

head_i :: Flow a => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a)) Source #

Given a source index and a length, split a list of that length from the front of the source. Yields a new source for the remaining elements.

  • We pull whole chunks from the source stream until we have at least the desired number of elements. The leftover elements in the final chunk are visible in the result Sources.
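The chunk-wise pulling described above can be modelled on a single stream represented as a list of chunks. This is only a sketch under assumptions: headModel is a hypothetical name, the Maybe wrapper and the source index are not modelled, and when the stream holds fewer elements than requested the sketch simply returns what was available, which the text does not specify.

```haskell
-- | Pure model of splitting n elements off the front of a chunked stream.
--   Whole chunks are consumed until at least n elements are in hand; any
--   leftover from the final chunk is put back in front of the remainder.
headModel :: Int -> [[a]] -> ([a], [[a]])
headModel n = go []
  where
    go acc rest
      | length acc >= n = (take n acc, [drop n acc | length acc > n] ++ rest)
      | otherwise = case rest of
          [] -> (acc, [])
          (chunk : chunks) -> go (acc ++ chunk) chunks
```

For example, headModel 3 [[1,2],[3,4,5],[6]] gives ([1,2,3],[[4,5],[6]]): the leftover elements 4 and 5 stay visible as the first chunk of the remaining stream.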

Concatenation

concat_i :: (Flow a, Build a) => Sources (Array a) -> IO (Sources a) Source #

Concatenate a flow of arrays into a flow of the elements.

Selecting

select_i Source #

Arguments

:: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) 
=> Nat n

Index of column to keep.

-> Sources fs

Sources of complete rows.

-> IO (Sources (Select' n fs))

Sources of selected column.

Select a single column from a flow of rows of fields.

select_o Source #

Arguments

:: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) 
=> Nat n

Index of column to keep.

-> Sinks (Select' n fs)

Sinks for selected column.

-> IO (Sinks fs)

Sinks for complete rows.

Select a single column from a flow of fields.

discard_i Source #

Arguments

:: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) 
=> Nat n

Index of column to discard.

-> Sources fs

Sources of complete rows.

-> IO (Sources (Discard' n fs))

Sources of partial rows.

Discard a single column from a flow of fields.

discard_o Source #

Arguments

:: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) 
=> Nat n

Index of column to discard.

-> Sinks (Discard' n fs)

Sinks for partial rows.

-> IO (Sinks fs)

Sinks for complete rows.

Discard a single column from a flow of fields.

mask_i Source #

Arguments

:: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) 
=> ms

Column mask.

-> Sources fs

Sources of complete rows.

-> IO (Sources (Mask' ms fs))

Sources of masked rows.

Mask columns from a flow of fields.

mask_o Source #

Arguments

:: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) 
=> ms

Column mask.

-> Sinks (Mask' ms fs)

Sinks for masked rows.

-> IO (Sinks fs)

Sinks for complete rows.

Mask columns from a flow of fields.

Grouping

groups_i Source #

Arguments

:: (GroupsDict a, Eq a) 
=> Sources a

Input elements.

-> IO (Sources (a, Int))

Starting element and length of groups.

Scan through some sources to find runs of matching elements, and count the lengths of those runs.

> F.toList1 0 =<< F.groups_i =<< F.fromList 1 "waabbbblle"
[('w',1),('a',2),('b',4),('l',2),('e',1)]

groupsBy_i Source #

Arguments

:: GroupsDict a 
=> (a -> a -> Bool)

Fn to check if consecutive elements are in the same group.

-> Sources a

Input elements.

-> IO (Sources (a, Int))

Starting element and length of groups.

Like groups_i, but take a function to determine whether two consecutive values should be in the same group.
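The grouping behaviour can be checked against a pure list model of a single stream. This sketch takes the description literally and compares each element with its immediate predecessor, which is an assumption about the library (Data.List.groupBy, by contrast, compares against the first element of the group). The names groupsByModel and groupsModel are hypothetical.

```haskell
-- | Pure model of run-length grouping on one stream. Consecutive elements
--   are compared pairwise; each run yields its starting element and length.
groupsByModel :: (a -> a -> Bool) -> [a] -> [(a, Int)]
groupsByModel _ [] = []
groupsByModel same (x : xs) = go x x 1 xs
  where
    go start _ n [] = [(start, n)]
    go start prev n (y : ys)
      | same prev y = go start y (n + 1) ys
      | otherwise = (start, n) : go y y 1 ys

-- | Grouping by equality, as in the groups_i example.
groupsModel :: Eq a => [a] -> [(a, Int)]
groupsModel = groupsByModel (==)
```

Here groupsModel "waabbbblle" gives [('w',1),('a',2),('b',4),('l',2),('e',1)], matching the groups_i example, and groupsByModel (\a b -> b == a + 1) [1,2,3,7,8,10] gives [(1,3),(7,2),(10,1)].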

type GroupsDict a = GroupsDict Int IO A A A a Source #

Dictionaries needed to perform a grouping.

Folding

Complete

foldlS Source #

Arguments

:: (Flow b, Build a) 
=> (a -> b -> a)

Combining function.

-> a

Starting value.

-> Sources b

Input elements to fold.

-> IO (Array A a) 

Fold all the elements of each stream in a bundle, one stream after the other, returning an array of fold results.

foldlAllS Source #

Arguments

:: Flow b 
=> (a -> b -> a)

Combining function.

-> a

Starting value.

-> Sources b

Input elements to fold.

-> IO a 

Fold all the elements of each stream in a bundle, one stream after the other, returning the single final result.
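Going by the result types, foldlS produces one value per stream while foldlAllS produces a single value. A pure model makes the difference concrete. The sketch assumes a bundle is a list of streams, each a list of chunks, and that foldlAllS threads one accumulator through the streams in order; foldlSModel and foldlAllSModel are hypothetical names.

```haskell
import Data.List (foldl')

-- | Model of foldlS: fold each stream separately, one result per stream.
foldlSModel :: (a -> b -> a) -> a -> [[[b]]] -> [a]
foldlSModel f z = map (foldl' f z . concat)

-- | Model of foldlAllS: thread a single accumulator through every stream,
--   first stream first, and return the final value.
foldlAllSModel :: (a -> b -> a) -> a -> [[[b]]] -> a
foldlAllSModel f z = foldl' f z . concat . concat
```

For the bundle [[[1,2],[3]],[[10],[]]], foldlSModel (+) 0 gives [6,10] and foldlAllSModel (+) 0 gives 16.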

Segmented

folds_i Source #

Arguments

:: FoldsDict n a b 
=> (a -> b -> b)

Worker function.

-> b

Initial state when folding each segment.

-> Sources (n, Int)

Segment lengths.

-> Sources a

Input elements to fold.

-> IO (Sources (n, b))

Result elements.

Given streams of lengths and values, perform a segmented fold, where segments of values of the corresponding lengths are folded together.

> sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
> sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]

> F.toList1 0 =<< F.folds_i (+) 0 sSegs sVals
[('a',10),('b',50),('c',220),('d',0),('e',80)]

If not enough input elements are available to fold a complete segment then no output is produced for that segment. However, trailing zero length segments still produce the initial value for the fold.

> sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
> sVals <- F.fromList 1 [10, 20, 30 :: Int]

> F.toList1 0 =<< F.folds_i (*) 1 sSegs sVals
[('a',10),('b',600),('c',1),('d',1),('e',1)]
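The two examples above can be reproduced with a pure list model of a single stream. This is a sketch rather than the library's algorithm: foldsModel is a hypothetical name, chunking is ignored, and the model assumes that output stops at the first segment that cannot be completed.

```haskell
import Data.List (foldl')

-- | Pure model of a segmented fold. Each (name, length) pair consumes that
--   many values and yields the folded result. A segment that cannot be
--   completed produces no output (assumed: neither do later segments).
foldsModel :: (a -> b -> b) -> b -> [(n, Int)] -> [a] -> [(n, b)]
foldsModel f z = go
  where
    go [] _ = []
    go ((name, len) : segs) vals
      | length seg < len = []
      | otherwise = (name, foldl' (flip f) z seg) : go segs rest
      where
        (seg, rest) = splitAt len vals
```

A zero-length segment takes no values and yields the initial value z, which is why the trailing segments in the second example all produce 1.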

type FoldsDict n a b = FoldsDict Int IO A A A A n a b Source #

Dictionaries needed to perform a segmented fold.

foldGroupsBy_i Source #

Arguments

:: FoldGroupsDict n a b 
=> (n -> n -> Bool)

Fn to check if consecutive elements are in the same group.

-> (a -> b -> b)

Worker function for the fold.

-> b

Initial state when folding each segment.

-> Sources n

Names that determine groups.

-> Sources a

Values to fold.

-> IO (Sources (n, b)) 

Combination of groupsBy_i and folds_i. We determine the segment lengths while performing the folds.

Note that SQL-like group-by aggregations can be performed using this function, provided the data is pre-sorted on the group key. For example, we can take the average of some groups of values:

> sKeys <- F.fromList 1 "waaaabllle"
> sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]

> sResult <-  F.map_i (\(key, (acc, n)) -> (key, acc / n))
          =<< F.foldGroupsBy_i (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals

> F.toList1 0 sResult
[('w',10.0),('a',35.0),('b',60.0),('l',80.0),('e',100.0)]
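The group-by average can also be worked through on plain lists. The sketch below is a pure model of one stream, not the library's code: foldGroupsByModel and averagesModel are hypothetical names, keys are compared with their immediate predecessor, and keys and values are paired with zip, so any surplus on either side is silently dropped.

```haskell
-- | Pure model of a grouped fold: consecutive keys that belong together
--   form a group, and the values paired with them are folded from the
--   initial state. Each group yields its first key and the fold result.
foldGroupsByModel
  :: (n -> n -> Bool) -> (a -> b -> b) -> b -> [n] -> [a] -> [(n, b)]
foldGroupsByModel same f z keys vals = go (zip keys vals)
  where
    go [] = []
    go ((k, v) : rest) = run k k (f v z) rest
    run start _ acc [] = [(start, acc)]
    run start prev acc ((k, v) : rest)
      | same prev k = run start k (f v acc) rest
      | otherwise = (start, acc) : run k k (f v z) rest

-- | Average per group, keeping the group key alongside each average.
averagesModel :: Eq k => [k] -> [Double] -> [(k, Double)]
averagesModel keys vals =
  [ (k, total / count)
  | (k, (total, count)) <- foldGroupsByModel (==) step (0, 0) keys vals ]
  where
    step x (total, count) = (total + x, count + 1)
```

With the keys "waaaabllle" and the values 10, 20, ..., 100 from the example, the group averages are 10, 35, 60, 80 and 100 for 'w', 'a', 'b', 'l' and 'e' respectively.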

type FoldGroupsDict n a b = (BulkI A n, Material A a, Material A n, Material A b) Source #

Finalizers

finalize_i :: (Int -> IO ()) -> Sources a -> IO (Sources a) Source #

Attach a finalizer to some sources.

  • For a given source, the finalizer will be called the first time a consumer of that source tries to pull an element when no more are available.
  • The finalizer is given the index of the source that ended.
  • The finalizer will be run after any finalizers already attached to the source.
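The "first time a pull finds nothing" rule can be sketched with an element-level model of one source. This assumes a source is an action yielding the next element or Nothing; Pull, sourceFromList and finalizeModel are hypothetical names, and the stream index passed to real finalizers is omitted because only one stream is modelled.

```haskell
import Data.IORef

-- | A source is modelled as an action yielding the next element, if any.
type Pull a = IO (Maybe a)

-- | Build a model source that hands out the list elements one at a time.
sourceFromList :: [a] -> IO (Pull a)
sourceFromList xs = do
  ref <- newIORef xs
  pure $ do
    ys <- readIORef ref
    case ys of
      [] -> pure Nothing
      (z : zs) -> writeIORef ref zs >> pure (Just z)

-- | Wrap a source so that the finalizer runs exactly once: the first time
--   a pull finds no more elements. The inner pull runs first, so anything
--   it does on exhaustion happens before the new finalizer.
finalizeModel :: IO () -> Pull a -> IO (Pull a)
finalizeModel fin pull = do
  done <- newIORef False
  pure $ do
    m <- pull
    case m of
      Just x -> pure (Just x)
      Nothing -> do
        already <- readIORef done
        if already then pure () else writeIORef done True >> fin
        pure Nothing
```

Wrapping a source twice with finalizeModel runs the inner finalizer before the outer one, in line with the ordering stated in the last bullet.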

finalize_o :: (Int -> IO ()) -> Sinks a -> IO (Sinks a) Source #

Attach a finalizer to some sinks.

  • For a given sink, the finalizer will be called the first time that sink is ejected.
  • The finalizer is given the index of the sink that was ejected.
  • The finalizer will be run after any finalizers already attached to the sink.