repa-flow-4.2.3.1: Data-parallel data flows.

Data.Repa.Flow.Simple

Documentation

type Source m e = Sources () m e

Source consisting of a single stream.

type Sink m e = Sinks () m e

Sink consisting of a single stream.

Evaluation

drainS :: Monad m => Source m a -> Sink m a -> m ()

Pull all available values from the source and push them to the sink.

Conversions

fromList :: States () m => [a] -> m (Source m a)

Given a list of elements, yield a source that produces all the elements.

toList :: States () m => Source m a -> m [a]

Drain a source into a list.

takeList :: States () m => Int -> Source m a -> m [a]

Drain the given number of elements from a single source into a list.

Finalizers

finalize_i :: States () m => m () -> Source m a -> m (Source m a)

Attach a finalizer to a source.

The finalizer will be called the first time a consumer of that stream tries to pull an element when no more are available.

The provided finalizer will be run after any finalizers already attached to the source.
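
The two rules above (run once on the first empty pull, and run after earlier finalizers) can be illustrated with a toy model. This is only a sketch under stated assumptions: `Pull` and `finalizeModel` are hypothetical names, a source is modelled as a plain `IO (Maybe a)` action, and none of this is repa-flow's actual implementation.

```haskell
import Control.Monad (unless)
import Data.IORef (newIORef, readIORef, writeIORef)

-- | Toy model of a single-stream source (NOT repa-flow's Source type):
-- an action that yields the next element, or Nothing when exhausted.
type Pull a = IO (Maybe a)

-- | Wrap a pull action so that the finalizer runs the first time the
-- wrapped action reports that no more elements are available.
finalizeModel :: IO () -> Pull a -> IO (Pull a)
finalizeModel fin pull = do
  done <- newIORef False
  pure $ do
    mx <- pull                      -- inner finalizers fire in here first
    case mx of
      Just _  -> pure mx
      Nothing -> do
        already <- readIORef done
        unless already $ do
          writeIORef done True      -- guard: run at most once
          fin
        pure Nothing
```

Because the wrapper pulls from the inner action before running its own finalizer, a finalizer attached later runs after the ones attached earlier, which matches the ordering described above.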

finalize_o :: States () m => m () -> Sink m a -> m (Sink m a)

Attach a finalizer to a sink.

The finalizer will be called the first time the stream is ejected.

The provided finalizer will be run after any finalizers already attached to the sink.

Flow Operators

Constructors

repeat_i :: States () m => a -> m (Source m a)

Yield a source that always produces the same value.

replicate_i :: States () m => Int -> a -> m (Source m a)

Yield a source of the given length that always produces the same value.

prepend_i :: States () m => [a] -> Source m a -> m (Source m a)

Prepend some more elements to the front of a source.

Mapping

map_i :: States () m => (a -> b) -> Source m a -> m (Source m b)

Apply a function to every element pulled from some source, producing a new source.

map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a)

Apply a function to every element pushed to some sink, producing a new sink.
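
The direction of the types in map_o (a `Sink m b` becomes a `Sink m a`) can look backwards at first. A minimal sketch, assuming a sink is modelled as a plain push function: `Push`, `mapPushModel` and `demoMapPush` are hypothetical names used only for illustration, not library API.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Toy model of a single-stream sink (NOT repa-flow's Sink type).
type Push a = a -> IO ()

-- | Mapping over a sink pre-composes the function: each pushed 'a' is
-- converted to a 'b' and then handed to the underlying sink.
mapPushModel :: (a -> b) -> Push b -> Push a
mapPushModel f sink = sink . f

-- | Push three Ints through 'show' into a sink that collects Strings.
demoMapPush :: IO [String]
demoMapPush = do
  ref <- newIORef []
  let strSink = \s -> modifyIORef' ref (++ [s])
      intSink = mapPushModel show strSink :: Push Int
  mapM_ intSink [1, 2, 3]
  readIORef ref
```

In GHCi, `demoMapPush` yields `["1","2","3"]`: the sink that is exposed accepts the function's input type, while the wrapped sink receives its output type.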

Connecting

dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a)

Send the same data to two consumers.

Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.
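
As a rough illustration of this behaviour, the sketch below models sinks as push functions and duplicates each pushed element. `Push`, `dupModel`, `collectInto` and `demoDup` are hypothetical names; ejection and the library's real sink representation are not modelled.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Toy model of a single-stream sink (NOT repa-flow's Sink type).
type Push a = a -> IO ()

-- | Result sink: every pushed element goes to both argument sinks.
dupModel :: Push a -> Push a -> Push a
dupModel s1 s2 x = s1 x >> s2 x

-- | A sink that appends each pushed element to a mutable list.
collectInto :: IORef [a] -> Push a
collectInto ref x = modifyIORef' ref (++ [x])

-- | Push 1, 2, 3 to the duplicated sink and report what each side saw.
demoDup :: IO ([Int], [Int])
demoDup = do
  r1 <- newIORef []
  r2 <- newIORef []
  let out = dupModel (collectInto r1) (collectInto r2)
  mapM_ out [1, 2, 3]
  (,) <$> readIORef r1 <*> readIORef r2
```

Evaluating `demoDup` gives `([1,2,3],[1,2,3])`, since both collecting sinks receive every element.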

dup_io :: States () m => Source m a -> Sink m a -> m (Source m a)

Send the same data to two consumers.

Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.

dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a)

Send the same data to two consumers.

Like dup_io but with the arguments flipped.

connect_i :: States () m => Source m a -> m (Source m a, Source m a)

Connect an argument source to two result sources.

Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at the time, so if one side pulls all the elements the other side won't get any.
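
The sharing described here differs from duplication: the two result sources compete for the same elements. A small model, assuming a source is just a stateful pull action (`Pull`, `fromListModel`, `connectModel` and `drainModel` are hypothetical names, not library functions):

```haskell
import Data.IORef (atomicModifyIORef', newIORef)

-- | Toy model of a single-stream source (NOT repa-flow's Source type).
type Pull a = IO (Maybe a)

-- | A pull action backed by a list held in a mutable reference.
fromListModel :: [a] -> IO (Pull a)
fromListModel xs = do
  ref <- newIORef xs
  pure $ atomicModifyIORef' ref $ \ys -> case ys of
    []       -> ([], Nothing)
    (y : yt) -> (yt, Just y)

-- | Both results are the same underlying action, so they share its state.
connectModel :: Pull a -> IO (Pull a, Pull a)
connectModel src = pure (src, src)

-- | Pull until the source reports that it is exhausted.
drainModel :: Pull a -> IO [a]
drainModel p = do
  mx <- p
  case mx of
    Nothing -> pure []
    Just x  -> (x :) <$> drainModel p
```

With a model source over `[1,2,3]`, pulling once from the first result yields `Just 1`; draining the second result then yields `[2,3]`, and the first result has nothing left.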

Splitting

head_i :: States () m => Int -> Source m a -> m ([a], Source m a)

Split the given number of elements from the head of a source, returning those elements in a list, and yielding a new source for the rest.

peek_i :: States () m => Int -> Source m a -> m ([a], Source m a)

Peek at the given number of elements in the stream, returning a result stream that still produces them all.
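
head_i and peek_i share a type, so the difference lies only in what the returned source still contains. A list-level sketch of that difference, using the hypothetical helpers `headModel` and `peekModel` with plain lists standing in for sources:

```haskell
-- | Model of head_i: the remaining stream no longer has the first n elements.
headModel :: Int -> [a] -> ([a], [a])
headModel = splitAt

-- | Model of peek_i: the returned stream still produces every element,
-- including the ones that were peeked at.
peekModel :: Int -> [a] -> ([a], [a])
peekModel n xs = (take n xs, xs)
```

For example, `headModel 2 "abcd"` is `("ab","cd")`, while `peekModel 2 "abcd"` is `("ab","abcd")`.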

Grouping

groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int)

From a stream of values that contains consecutive runs of identical values, produce a stream of the lengths of those runs.

Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
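
The same computation can be written over ordinary lists, which makes the example easy to check. `runLengths` is a hypothetical helper for illustration; it models the result of groups_i, not how the library computes it incrementally.

```haskell
import Data.List (group)

-- | Collapse each run of consecutive equal values to its length.
runLengths :: Eq a => [a] -> [Int]
runLengths = map length . group
```

Here `runLengths [4, 4, 4, 3, 3, 1, 1, 1, 4]` evaluates to `[3,2,3,1]`. Note that the final 4 forms its own run because it is not adjacent to the earlier 4s.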

Packing

pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a)

Given a stream of flags and a stream of values, produce a new stream containing only the values whose corresponding flag was True. Flags and values are paired positionally, so the result is never longer than the shorter of the two inputs.
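
A list-level sketch of packing, assuming flags and values are paired by position. `packModel` is a hypothetical helper, not a library function.

```haskell
-- | Keep the values whose flag is True. 'zip' stops at the shorter
-- input, so any surplus flags or values are never considered.
packModel :: [Bool] -> [a] -> [a]
packModel flags xs = [x | (True, x) <- zip flags xs]
```

For instance, `packModel [True, False, True, True] "abc"` is `"ac"`: the fourth flag has no matching value and is ignored.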

Folding

folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a)

Segmented fold.
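
One plausible reading of the signature is that the `Int` source supplies segment lengths and the value source supplies the elements to fold within each segment. The sketch below models that reading on lists; `foldSegs` is a hypothetical helper, and both the argument interpretation and the handling of short input are assumptions rather than documented behaviour.

```haskell
-- | For each segment length n, take the next n values and fold them
-- with f, starting from z. Assumption: if the values run out early,
-- this model simply folds whatever is left.
foldSegs :: (a -> a -> a) -> a -> [Int] -> [a] -> [a]
foldSegs _ _ []       _  = []
foldSegs f z (n : ns) xs =
  let (seg, rest) = splitAt n xs
  in  foldl f z seg : foldSegs f z ns rest
```

Under these assumptions, `foldSegs (+) 0 [3, 2, 1] [1, 2, 3, 4, 5, 6]` is `[6,9,6]`, one result per segment.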

Watching

watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a)

Apply a monadic function to every element pulled from a source, producing a new source.

watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a)

Pass elements to the provided action as they are pushed to the sink.

trigger_o :: Monad m => (a -> m ()) -> m (Sink m a)

Like watch_o but doesn't pass elements to another sink.

Ignorance

ignore_o :: Monad m => m (Sink m a)

A sink that ignores all incoming elements.

This sink is strict in the elements, so they are demanded before being discarded. Haskell debugging thunks attached to the elements will be demanded.

abandon_o :: Monad m => m (Sink m a)

A sink that drops all data on the floor.

This sink is non-strict in the elements. Haskell tracing thunks attached to the elements will *not* be demanded.
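
The difference between the strict and non-strict sinks only shows up when an element is an unevaluated thunk. A minimal sketch, assuming "demanded" means evaluation to weak head normal form: `Push`, `ignoreModel`, `abandonModel` and `pushThrows` are hypothetical names modelling the two behaviours, not the library's sinks.

```haskell
import Control.Exception (SomeException, evaluate, try)

-- | Toy model of a single-stream sink (NOT repa-flow's Sink type).
type Push a = a -> IO ()

-- | Strict model: force the element (to WHNF) before discarding it.
ignoreModel :: Push a
ignoreModel x = evaluate x >> pure ()

-- | Non-strict model: discard the element without looking at it.
abandonModel :: Push a
abandonModel _ = pure ()

-- | True if pushing the value to the sink raised an exception.
pushThrows :: Push a -> a -> IO Bool
pushThrows sink x = do
  r <- try (sink x) :: IO (Either SomeException ())
  pure (either (const True) (const False) r)
```

Pushing `(error "thunk" :: Int)` makes `pushThrows ignoreModel` return `True`, because the thunk is forced, whereas `pushThrows abandonModel` returns `False`, because the thunk is never touched.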

Flow IO

Sourcing

fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b

Open some files as buckets and use them as Sources.

Read data from a file, using the given chunk length.

• Data is read into foreign memory without copying it through the GHC heap.
• All chunks have the same size, except possibly the last one.

The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.

Arguments

 :: Integer            -- Size of chunk to read in bytes.
 -> (Word8 -> Bool)    -- Detect the end of a record.
 -> IO ()              -- Action to perform if we can't get a whole record.
 -> Bucket             -- File handle.
 -> IO (Source IO (Array N (Array F Word8)))

Read complete records of data from a file, using the given chunk length.

The records are separated by a special terminating character, which the given predicate detects. After reading a chunk of data we seek to just after the last complete record that was read, so we can continue to read more complete records next time.

If we cannot find an end-of-record terminator in the chunk then apply the given failure action. The records can be no longer than the chunk length. This fact guards against the case where a large input file is malformed and contains no end-of-record terminators, as we won't try to read the whole file into memory.

• Data is read into foreign memory without copying it through the GHC heap.
• All chunks have the same size, except possibly the last one.
• The provided file handle must support seeking, else you'll get an exception.

The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.
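
The record-splitting step described above (keep the complete records in a chunk, note where the last one ends, fail if there is none) can be sketched on plain byte lists. This is a model only: `splitRecords` is a hypothetical helper, whether the library keeps the terminator inside each record is an assumption, and no file I/O or seeking is performed.

```haskell
import Data.Word (Word8)

-- | Split a chunk into its complete records, returning them together
-- with the number of bytes they occupy (the offset to seek to before
-- the next read). Nothing models the failure case where the chunk
-- holds no end-of-record terminator.
-- Assumption: each record keeps its terminating byte.
splitRecords :: (Word8 -> Bool) -> [Word8] -> Maybe ([[Word8]], Int)
splitRecords isEnd chunk =
  case go chunk of
    [] -> Nothing
    rs -> Just (rs, sum (map length rs))
  where
    go bs = case break isEnd bs of
      (_, [])          -> []                        -- trailing partial record
      (body, t : rest) -> (body ++ [t]) : go rest   -- one complete record
```

With newline (byte 10) as the terminator, `splitRecords (== 10) [97, 10, 98, 99, 10, 100]` gives `Just ([[97,10],[98,99,10]], 5)`: the trailing byte 100 belongs to an incomplete record and would be re-read after seeking to offset 5.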

Sinking

Arguments

 :: [FilePath]                  -- File paths.
 -> (Array B Bucket -> IO b)    -- Worker writes data to buckets.
 -> IO b

Open some files for writing as individual buckets and pass them to the given consumer.

Write chunks of data to the given files.

The file will be closed when the associated stream is ejected.