repa-flow-4.2.2.1: Data-parallel data flows.

Safe Haskell: None
Language: Haskell98

Data.Repa.Flow.Simple

Documentation

type Source m e = Sources () m e

Source consisting of a single stream.

type Sink m e = Sinks () m e

Sink consisting of a single stream.

Evaluation

drainS :: Monad m => Source m a -> Sink m a -> m ()

Pull all available values from the source and push them to the sink.
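To make the pull/push protocol concrete, here is a minimal sketch in plain IO. It is a toy model, not the repa-flow API: the Source and Sink records, sourceList, sinkCollect and drain are hypothetical names, and the model assumes the sink is ejected once the source runs dry.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)

-- Hypothetical toy model, not the repa-flow API: a source is an action that
-- yields the next element, or Nothing once it is exhausted.
newtype Source a = Source { pull :: IO (Maybe a) }

-- A sink accepts pushed elements and can be ejected (closed).
data Sink a = Sink { push :: a -> IO (), eject :: IO () }

-- Hypothetical helper: a source that yields the list elements, one per pull.
sourceList :: [a] -> IO (Source a)
sourceList xs = do
  ref <- newIORef xs
  pure $ Source $ do
    rest <- readIORef ref
    case rest of
      []       -> pure Nothing
      (y : ys) -> writeIORef ref ys >> pure (Just y)

-- Hypothetical helper: a sink that appends each pushed element to an IORef.
sinkCollect :: IORef [a] -> Sink a
sinkCollect ref = Sink
  { push  = \x -> modifyIORef ref (++ [x])
  , eject = pure ()
  }

-- Analogue of drainS: pull until the source is empty, pushing each element
-- to the sink, then eject the sink (assumed end-of-stream behaviour).
drain :: Source a -> Sink a -> IO ()
drain src snk = loop
  where
    loop = do
      mx <- pull src
      case mx of
        Just x  -> push snk x >> loop
        Nothing -> eject snk
```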

Conversions

fromList :: States () m => [a] -> m (Source m a)

Given a list of elements, yield a source that produces all the elements.

toList :: States () m => Source m a -> m [a]

Drain a source into a list.

takeList :: States () m => Int -> Source m a -> m [a]

Drain the given number of elements from a single source into a list.

Finalizers

finalize_i :: States () m => m () -> Source m a -> m (Source m a)

Attach a finalizer to a source.

The finalizer will be called the first time a consumer of that stream tries to pull an element when no more are available.

The provided finalizer will be run after any finalizers already attached to the source.
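The two rules above (fire once, on the first pull that finds the source empty; newer finalizers run after older ones) fall out naturally if each finalizer wraps the source it is attached to. The sketch below is a hypothetical IO model, not the library's implementation; Source, sourceList and finalize are illustrative names.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)

-- Hypothetical toy model, not the repa-flow API.
newtype Source a = Source { pull :: IO (Maybe a) }

-- Hypothetical helper: a source that yields the list elements, one per pull.
sourceList :: [a] -> IO (Source a)
sourceList xs = do
  ref <- newIORef xs
  pure $ Source $ do
    rest <- readIORef ref
    case rest of
      []       -> pure Nothing
      (y : ys) -> writeIORef ref ys >> pure (Just y)

-- Analogue of finalize_i: run the finalizer the first time a pull finds the
-- wrapped source empty, and never again. Because the inner source is pulled
-- first, any finalizer already attached to it runs before this one.
finalize :: IO () -> Source a -> IO (Source a)
finalize fin src = do
  fired <- newIORef False
  pure $ Source $ do
    mx <- pull src
    case mx of
      Just x  -> pure (Just x)
      Nothing -> do
        already <- readIORef fired
        if already then pure () else writeIORef fired True >> fin
        pure Nothing
```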

finalize_o :: States () m => m () -> Sink m a -> m (Sink m a)

Attach a finalizer to a sink.

The finalizer will be called the first time the stream is ejected.

The provided finalizer will be run after any finalizers already attached to the sink.

Flow Operators

Constructors

repeat_i :: States () m => a -> m (Source m a)

Yield a source that always produces the same value.

replicate_i :: States () m => Int -> a -> m (Source m a)

Yield a source of the given length that always produces the same value.

prepend_i :: States () m => [a] -> Source m a -> m (Source m a)

Prepend some more elements to the front of a source.

Mapping

map_i :: States () m => (a -> b) -> Source m a -> m (Source m b)

Apply a function to every element pulled from some source, producing a new source.

map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a)

Apply a function to every element pushed to some sink, producing a new sink.
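Note the direction in map_o's type: a function a -> b turns a Sink m b into a Sink m a, because the function is applied before the element reaches the original sink. A minimal sketch of both directions, assuming a simplified model (hypothetical Source, Sink, mapI and mapO in plain IO, without the monadic result the real functions return):

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Hypothetical toy model, not repa-flow's Source and Sink types.
newtype Source a = Source { pull :: IO (Maybe a) }
newtype Sink a = Sink { push :: a -> IO () }

-- Analogue of map_i: apply f to each element as it is pulled out.
mapI :: (a -> b) -> Source a -> Source b
mapI f (Source p) = Source (fmap (fmap f) p)

-- Analogue of map_o: to get a sink for a from a sink for b,
-- apply f to each element before pushing it on.
mapO :: (a -> b) -> Sink b -> Sink a
mapO f (Sink k) = Sink (k . f)
```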

Connecting

dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a)

Send the same data to two consumers.

Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.

dup_io :: States () m => Source m a -> Sink m a -> m (Source m a)

Send the same data to two consumers.

Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
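The pull-then-push behaviour of dup_io can be sketched with a simplified IO model. Everything here (Source, Sink, sourceList, dupIO) is hypothetical and only mirrors the description above; the sink sees exactly the elements that are pulled through the result source, in the same order.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)

-- Hypothetical toy model, not the repa-flow API.
newtype Source a = Source { pull :: IO (Maybe a) }
newtype Sink a = Sink { push :: a -> IO () }

-- Hypothetical helper: a source that yields the list elements, one per pull.
sourceList :: [a] -> IO (Source a)
sourceList xs = do
  ref <- newIORef xs
  pure $ Source $ do
    rest <- readIORef ref
    case rest of
      []       -> pure Nothing
      (y : ys) -> writeIORef ref ys >> pure (Just y)

-- Analogue of dup_io: each element pulled through the result source is
-- also pushed to the sink before being returned.
dupIO :: Source a -> Sink a -> Source a
dupIO src snk = Source $ do
  mx <- pull src
  mapM_ (push snk) mx   -- pushes only when an element was actually pulled
  pure mx
```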

dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a)

Send the same data to two consumers.

Like dup_io but with the arguments flipped.

connect_i :: States () m => Source m a -> m (Source m a, Source m a)

Connect an argument source to two result sources.

Pulling from either result source pulls from the argument source. Each result source receives only the elements it pulls itself, so if one side pulls all the elements, the other side gets none.

Splitting

head_i :: States () m => Int -> Source m a -> m ([a], Source m a)

Split the given number of elements from the head of a source, returning those elements in a list, and yielding a new source for the rest.

peek_i :: States () m => Int -> Source m a -> m ([a], Source m a)

Peek at the given number of elements in the stream, returning a result stream that still produces them all.
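The difference between head_i and peek_i is what the returned source still contains. A minimal sketch, modelling a source as a plain list (headModel and peekModel are hypothetical names; the real functions work on stateful sources in a monad):

```haskell
-- Hypothetical list model of head_i: the first n elements are taken off,
-- and the returned "source" holds only the rest.
headModel :: Int -> [a] -> ([a], [a])
headModel = splitAt

-- Hypothetical list model of peek_i: the first n elements are returned,
-- but the returned "source" still produces all of them.
peekModel :: Int -> [a] -> ([a], [a])
peekModel n xs = (take n xs, xs)
```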

Grouping

groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int)

From a stream of values that has consecutive runs of identical values, produce a stream of the lengths of these runs.

Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
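On a finite list, the same run-length computation is just Data.List.group followed by length. This list model (runLengths is a hypothetical name) reproduces the example above; it is a specification aid, not how groups_i is implemented over a stream.

```haskell
import Data.List (group)

-- Hypothetical list model of groups_i: lengths of runs of consecutive equal values.
runLengths :: Eq a => [a] -> [Int]
runLengths = map length . group
```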

Packing

pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a)

Given a stream of flags and a stream of values, produce a new stream of values where the corresponding flag was True. The length of the result is the length of the shorter of the two inputs.
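A list model makes the "shorter of the two inputs" rule explicit: zip truncates to the shorter list, and the comprehension keeps only values paired with True. packModel is a hypothetical name used for illustration.

```haskell
-- Hypothetical list model of pack_ii: keep each value whose flag is True.
-- zip stops at the end of the shorter input.
packModel :: [Bool] -> [a] -> [a]
packModel flags xs = [x | (True, x) <- zip flags xs]
```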

Folding

folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a)

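Segmented fold. The Int source gives the segment lengths and the other source gives the values; each segment of values is folded with the given operator, starting from the given initial value, producing one result per segment.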
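A list model of the segmented fold, under stated assumptions: each segment is folded left to right (foldl), a zero-length segment yields the initial value, and the result ends when the lengths run out. These are modelling choices, not confirmed details of folds_ii; foldsModel is a hypothetical name, and the example input is well formed (the lengths sum to the number of values).

```haskell
-- Hypothetical list model of folds_ii: cut the values into segments using
-- the lengths, then fold each segment from the initial value.
foldsModel :: (a -> a -> a) -> a -> [Int] -> [a] -> [a]
foldsModel _ _ [] _ = []
foldsModel f z (n : ns) xs =
  let (seg, rest) = splitAt n xs
  in foldl f z seg : foldsModel f z ns rest
```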

Watching

watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a)

Apply a monadic action to every element pulled from a source, producing a new source that yields the same elements.

watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a)

Pass elements to the provided action as they are pushed to the sink.

trigger_o :: Monad m => (a -> m ()) -> m (Sink m a)

Like watch_o, but does not pass the elements on to another sink.

Ignorance

ignore_o :: Monad m => m (Sink m a)

A sink that ignores all incoming elements.

This sink is strict in the elements, so they are demanded before being discarded. Haskell debugging thunks attached to the elements will be demanded.

abandon_o :: Monad m => m (Sink m a)

A sink that drops all data on the floor.

This sink is non-strict in the elements. Haskell tracing thunks attached to the elements will *not* be demanded.
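The strictness difference between ignore_o and abandon_o can be observed by pushing an element whose value is an error. The push actions below are hypothetical models of the two sinks, not their implementations; demandsElement is an illustrative helper that reports whether the element was forced.

```haskell
import Control.Exception (ErrorCall, evaluate, try)

-- Analogue of ignore_o: force the element to weak head normal form, then drop it.
ignorePush :: a -> IO ()
ignorePush x = () <$ evaluate x

-- Analogue of abandon_o: drop the element without inspecting it.
abandonPush :: a -> IO ()
abandonPush _ = pure ()

-- Returns True if the push action demands an element whose value is an error.
demandsElement :: (Int -> IO ()) -> IO Bool
demandsElement pushAction = do
  r <- try (pushAction (error "element was demanded")) :: IO (Either ErrorCall ())
  pure (either (const True) (const False) r)
```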

Flow IO

Sourcing

fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b

Open some files as buckets and use them as Sources.

sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8))

Read data from a file, using the given chunk length.

  • Data is read into foreign memory without copying it through the GHC heap.
  • All chunks have the same size, except possibly the last one.

The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.
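The chunking rule (equal-sized chunks, except possibly the last) can be stated as a small list function. This is a hypothetical model on [Word8], not the real reader, which fills foreign-memory arrays from a Bucket; chunksOfBytes is an illustrative name.

```haskell
import Data.Word (Word8)

-- Hypothetical list model of the chunking: pieces of n bytes, where only
-- the final piece may be shorter.
chunksOfBytes :: Int -> [Word8] -> [[Word8]]
chunksOfBytes n bytes
  | n <= 0     = error "chunksOfBytes: chunk length must be positive"
  | null bytes = []
  | otherwise  = chunk : chunksOfBytes n rest
  where
    (chunk, rest) = splitAt n bytes
```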

sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Bucket -> IO (Source IO (Array N (Array F Word8)))

Arguments:

  • Integer: size of chunk to read, in bytes.
  • (Word8 -> Bool): detects the end of a record.
  • IO (): action to perform if we can't get a whole record.
  • Bucket: file handle.

Read complete records of data from a file, using the given chunk length.

The records are separated by a special terminating character, which the given predicate detects. After reading a chunk of data we seek to just after the last complete record that was read, so we can continue to read more complete records next time.

If we cannot find an end-of-record terminator in the chunk, then we apply the given failure action. The records can be no longer than the chunk length. This guards against the case where a large input file is malformed and contains no end-of-record terminators, as we won't try to read the whole file into memory.

  • Data is read into foreign memory without copying it through the GHC heap.
  • All chunks have the same size, except possibly the last one.
  • The provided file handle must support seeking, else you'll get an exception.

The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.
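The read-then-seek-back strategy described above can be modelled on a list of bytes. This is a hypothetical sketch, not the library's code: readRecords returns Left where sourceRecords would run its failure action, and it assumes each record keeps its terminator byte, which may differ from what the real function yields.

```haskell
import Data.List (findIndices)
import Data.Word (Word8)

-- Hypothetical model of sourceRecords over an in-memory byte list.
readRecords :: Int -> (Word8 -> Bool) -> [Word8] -> Either String [[Word8]]
readRecords chunkLen isEnd = go
  where
    go [] = Right []
    go input =
      case reverse (findIndices isEnd (take chunkLen input)) of
        [] -> Left "no end-of-record terminator within one chunk"
        (lastEnd : _) ->
          -- "Seek" to just after the last terminator: the partial record at
          -- the end of the chunk is left for the next read.
          let (complete, rest) = splitAt (lastEnd + 1) input
          in (splitRecords complete ++) <$> go rest

    -- Cut a run of complete records just after each terminator.
    splitRecords [] = []
    splitRecords bytes =
      let (body, after) = break isEnd bytes
      in (body ++ take 1 after) : splitRecords (drop 1 after)
```

With a chunk length of 5 and newline terminators, the input "ab\ncd\nef\n" is read as three chunks, each cut back to one complete record; with a chunk length of 2 the first chunk contains no terminator, so the failure path is taken.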

Sinking

toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b

Arguments:

  • [FilePath]: file paths.
  • (Array B Bucket -> IO b): worker that writes data to the buckets.

Open some files for writing as individual buckets and pass them to the given consumer.

sinkBytes :: Bucket -> IO (Sink IO (Array F Word8))

Write chunks of data to the given file.

The file will be closed when the associated stream is ejected.