Safe Haskell | None |
---|---|
Language | Haskell98 |
- module Data.Repa.Flow.States
- type Source m e = Sources () m e
- type Sink m e = Sinks () m e
- drainS :: Monad m => Source m a -> Sink m a -> m ()
- fromList :: States () m => [a] -> m (Source m a)
- toList :: States () m => Source m a -> m [a]
- takeList :: States () m => Int -> Source m a -> m [a]
- finalize_i :: States () m => m () -> Source m a -> m (Source m a)
- finalize_o :: States () m => m () -> Sink m a -> m (Sink m a)
- repeat_i :: States () m => a -> m (Source m a)
- replicate_i :: States () m => Int -> a -> m (Source m a)
- prepend_i :: States () m => [a] -> Source m a -> m (Source m a)
- map_i :: States () m => (a -> b) -> Source m a -> m (Source m b)
- map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a)
- dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a)
- dup_io :: States () m => Source m a -> Sink m a -> m (Source m a)
- dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a)
- connect_i :: States () m => Source m a -> m (Source m a, Source m a)
- head_i :: States () m => Int -> Source m a -> m ([a], Source m a)
- peek_i :: States () m => Int -> Source m a -> m ([a], Source m a)
- groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int)
- pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a)
- folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a)
- watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a)
- watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a)
- trigger_o :: Monad m => (a -> m ()) -> m (Sink m a)
- ignore_o :: Monad m => m (Sink m a)
- abandon_o :: Monad m => m (Sink m a)
- fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
- sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8))
- sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Bucket -> IO (Source IO (Array N (Array F Word8)))
- toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
- sinkBytes :: Bucket -> IO (Sink IO (Array F Word8))
Documentation
module Data.Repa.Flow.States
Evaluation
drainS :: Monad m => Source m a -> Sink m a -> m ()
Pull all available values from the source and push them to the sink.
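The pull-then-push loop described above can be sketched with a small self-contained model built on `IORef`. The names `Src`, `Snk`, `listSrc`, `listSnk` and `drain` are hypothetical and belong only to this sketch, not to the library. Ejecting the sink once the source is exhausted is an assumption of the model.

```haskell
import Data.IORef

-- Hypothetical model: a source yields an element, or Nothing when exhausted.
newtype Src a = Src { pull :: IO (Maybe a) }

-- Hypothetical model: a sink accepts elements and can be ejected at the end.
data Snk a = Snk { push :: a -> IO (), eject :: IO () }

-- Build a source that hands out the elements of a list one at a time.
listSrc :: [a] -> IO (Src a)
listSrc xs0 = do
  ref <- newIORef xs0
  pure $ Src $ do
    xs <- readIORef ref
    case xs of
      []     -> pure Nothing
      y : ys -> writeIORef ref ys >> pure (Just y)

-- Build a sink that records pushed elements, plus an action that reports
-- the elements seen so far and whether the sink has been ejected.
listSnk :: IO (Snk a, IO ([a], Bool))
listSnk = do
  acc  <- newIORef []
  done <- newIORef False
  let snk    = Snk (\x -> modifyIORef' acc (x :)) (writeIORef done True)
      result = (,) <$> (reverse <$> readIORef acc) <*> readIORef done
  pure (snk, result)

-- Pull until the source is exhausted, pushing each element to the sink,
-- then eject the sink (an assumption of this model).
drain :: Src a -> Snk a -> IO ()
drain src snk = loop
  where
    loop = pull src >>= maybe (eject snk) (\x -> push snk x >> loop)
```

Draining `listSrc [1, 2, 3]` into a `listSnk` leaves the recorded elements in their original order and the ejected flag set.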
Conversions
fromList :: States () m => [a] -> m (Source m a)
Given a list of elements, yield a source that produces all the elements.
takeList :: States () m => Int -> Source m a -> m [a]
Drain the given number of elements from a single source into a list.
Finalizers
finalize_i :: States () m => m () -> Source m a -> m (Source m a)
Attach a finalizer to a source.
The finalizer will be called the first time a consumer of that stream tries to pull an element when no more are available.
The provided finalizer will be run after any finalizers already attached to the source.
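The two rules above (run once on the first empty pull, and run after finalizers attached earlier) can be illustrated with a minimal model. `Src`, `listSrc` and `finalizeSrc` are hypothetical names for this sketch only; the library's own implementation may differ.

```haskell
import Control.Monad (unless)
import Data.IORef

-- Hypothetical model: a source yields an element, or Nothing when exhausted.
newtype Src a = Src { pull :: IO (Maybe a) }

-- Build a source that hands out the elements of a list one at a time.
listSrc :: [a] -> IO (Src a)
listSrc xs0 = do
  ref <- newIORef xs0
  pure $ Src $ do
    xs <- readIORef ref
    case xs of
      []     -> pure Nothing
      y : ys -> writeIORef ref ys >> pure (Just y)

-- Wrap a source so that the finalizer runs exactly once, the first time a
-- pull finds the source empty. The wrapped pull runs first, so finalizers
-- attached earlier fire before this one.
finalizeSrc :: IO () -> Src a -> IO (Src a)
finalizeSrc fin (Src p) = do
  done <- newIORef False
  pure $ Src $ do
    r <- p
    case r of
      Just _  -> pure r
      Nothing -> do
        already <- readIORef done
        unless already (writeIORef done True >> fin)
        pure Nothing
```

Attaching a second finalizer to the result of the first wraps it one level further out, which is what gives the "already attached finalizers run first" ordering in this model.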
finalize_o :: States () m => m () -> Sink m a -> m (Sink m a)
Attach a finalizer to a sink.
The finalizer will be called the first time the stream is ejected.
The provided finalizer will be run after any finalizers already attached to the sink.
Flow Operators
Constructors
repeat_i :: States () m => a -> m (Source m a)
Yield a source that always produces the same value.
replicate_i :: States () m => Int -> a -> m (Source m a)
Yield a source of the given length that always produces the same value.
prepend_i :: States () m => [a] -> Source m a -> m (Source m a)
Prepend some more elements to the front of a source.
Mapping
map_i :: States () m => (a -> b) -> Source m a -> m (Source m b)
Apply a function to every element pulled from some source, producing a new source.
map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a)
Apply a function to every element pushed to some sink, producing a new sink.
Connecting
dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a)
Send the same data to two consumers.
Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.
dup_io :: States () m => Source m a -> Sink m a -> m (Source m a)
Send the same data to two consumers.
Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
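A minimal model of this "tee" behaviour is sketched below. `Src`, `Snk`, `listSrc` and `teeSrc` are hypothetical names used only for illustration, and the sink is reduced to a bare push action.

```haskell
import Data.IORef

-- Hypothetical model: a source yields an element, or Nothing when exhausted.
newtype Src a = Src { pull :: IO (Maybe a) }

-- Hypothetical model: a sink is just an action that accepts elements.
newtype Snk a = Snk { push :: a -> IO () }

-- Build a source that hands out the elements of a list one at a time.
listSrc :: [a] -> IO (Src a)
listSrc xs0 = do
  ref <- newIORef xs0
  pure $ Src $ do
    xs <- readIORef ref
    case xs of
      []     -> pure Nothing
      y : ys -> writeIORef ref ys >> pure (Just y)

-- Pulling from the result pulls from the argument source, pushes the
-- element (if there is one) to the sink, and returns it to the caller.
teeSrc :: Src a -> Snk a -> Src a
teeSrc (Src p) (Snk q) = Src $ do
  r <- p
  mapM_ q r
  pure r
```

In this model the sink only ever sees elements that some consumer actually pulled from the result source.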
dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a)
Send the same data to two consumers.
Like dup_io, but with the arguments flipped.
connect_i :: States () m => Source m a -> m (Source m a, Source m a)
Connect an argument source to two result sources.
Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at the time, so if one side pulls all the elements the other side won't get any.
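The sharing behaviour can be seen in a small model where both result sources wrap the same underlying pull action. `Src`, `listSrc`, `connect` and `demo` are hypothetical names for this sketch, not the library's definitions.

```haskell
import Data.IORef

-- Hypothetical model: a source yields an element, or Nothing when exhausted.
newtype Src a = Src { pull :: IO (Maybe a) }

-- Build a source that hands out the elements of a list one at a time.
listSrc :: [a] -> IO (Src a)
listSrc xs0 = do
  ref <- newIORef xs0
  pure $ Src $ do
    xs <- readIORef ref
    case xs of
      []     -> pure Nothing
      y : ys -> writeIORef ref ys >> pure (Just y)

-- Both results pull from the same argument source, so an element taken
-- through one side is never seen by the other.
connect :: Src a -> (Src a, Src a)
connect s = (Src (pull s), Src (pull s))

-- Alternate pulls between the two sides of a three-element source.
demo :: IO [Maybe Int]
demo = do
  s <- listSrc [1, 2, 3]
  let (a, b) = connect s
  sequence [pull a, pull b, pull a, pull b]
```

Here `demo` yields `[Just 1, Just 2, Just 3, Nothing]`: the two sides interleave over a single stream rather than each receiving a full copy.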
Splitting
head_i :: States () m => Int -> Source m a -> m ([a], Source m a)
Split the given number of elements from the head of a source, returning those elements in a list, and yielding a new source for the rest.
peek_i :: States () m => Int -> Source m a -> m ([a], Source m a)
Peek at the given number of elements in the stream, returning a result stream that still produces them all.
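The difference between splitting and peeking can be sketched in a small model, where peeking is expressed as "take the head, then prepend it back". All names here (`Src`, `listSrc`, `takeN`, `prependSrc`, `headSrc`, `peekSrc`) are hypothetical, and building peek from head and prepend is an assumption of the sketch.

```haskell
import Data.IORef

-- Hypothetical model: a source yields an element, or Nothing when exhausted.
newtype Src a = Src { pull :: IO (Maybe a) }

-- Build a source that hands out the elements of a list one at a time.
listSrc :: [a] -> IO (Src a)
listSrc xs0 = do
  ref <- newIORef xs0
  pure $ Src $ do
    xs <- readIORef ref
    case xs of
      []     -> pure Nothing
      y : ys -> writeIORef ref ys >> pure (Just y)

-- Pull up to n elements, stopping early if the source runs out.
takeN :: Int -> Src a -> IO [a]
takeN n s
  | n <= 0    = pure []
  | otherwise = do
      r <- pull s
      case r of
        Nothing -> pure []
        Just x  -> (x :) <$> takeN (n - 1) s

-- Yield the buffered elements first, then fall through to the source.
prependSrc :: [a] -> Src a -> IO (Src a)
prependSrc xs s = do
  ref <- newIORef xs
  pure $ Src $ do
    buf <- readIORef ref
    case buf of
      y : ys -> writeIORef ref ys >> pure (Just y)
      []     -> pull s

-- Split off the first n elements; the returned source produces the rest.
headSrc :: Int -> Src a -> IO ([a], Src a)
headSrc n s = do
  xs <- takeN n s
  pure (xs, s)

-- Look at the first n elements; the returned source still produces them.
peekSrc :: Int -> Src a -> IO ([a], Src a)
peekSrc n s = do
  xs <- takeN n s
  s' <- prependSrc xs s
  pure (xs, s')
```

On a source over `[1, 2, 3]`, `headSrc 2` returns `[1, 2]` and leaves `[3]`, while `peekSrc 2` returns `[1, 2]` and leaves all three elements available.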
Grouping
groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int)
From a stream of values that contains consecutive runs of identical values, produce a stream of the lengths of these runs.
Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
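The same computation on plain lists can be written with `Data.List.group`, which makes the example easy to check by hand. `runLengths` is a hypothetical list-based analogue, not the streaming operator itself.

```haskell
import Data.List (group)

-- Lengths of the consecutive runs of equal elements in a list.
runLengths :: Eq a => [a] -> [Int]
runLengths = map length . group
```

Applied to `[4, 4, 4, 3, 3, 1, 1, 1, 4]` this gives `[3, 2, 3, 1]`, matching the example above.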
Packing
pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a)
Given a stream of flags and a stream of values, produce a new stream containing the values whose corresponding flag was True. The result is at most as long as the shorter of the two inputs.
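A list-based analogue makes the pairing of flags and values concrete: values are kept where the flag is True, and pairing stops when either input runs out. `packList` is a hypothetical name for this sketch.

```haskell
-- Keep each value whose paired flag is True; zip stops at the shorter input.
packList :: [Bool] -> [a] -> [a]
packList flags xs = [x | (True, x) <- zip flags xs]
```

For instance, `packList [True, False, True] "abcd"` keeps `'a'` and `'c'`, and the unpaired `'d'` is never considered.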
Folding
folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a)
Segmented fold.
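As a rough illustration of what a segmented fold computes, the list-based sketch below treats the first list as segment lengths and folds each segment of the second list separately. `segFold` is a hypothetical analogue; how the streaming operator behaves when the values run out early is not covered here.

```haskell
import Data.List (foldl')

-- Fold each segment of the values separately, where the segment lengths
-- are given by the first list. Each segment starts from the same seed.
segFold :: (a -> a -> a) -> a -> [Int] -> [a] -> [a]
segFold _ _ []       _  = []
segFold f z (n : ns) xs =
  let (seg, rest) = splitAt n xs
  in  foldl' f z seg : segFold f z ns rest
```

With segment lengths `[2, 3, 1]` and values `[1 .. 6]`, `segFold (+) 0` produces `[3, 12, 6]`, one sum per segment.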
Watching
watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a)
Apply a monadic function to every element pulled from a source, producing a new source.
watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a)
Pass elements to the provided action as they are pushed to the sink.
trigger_o :: Monad m => (a -> m ()) -> m (Sink m a)
Like watch_o, but doesn't pass elements to another sink.
Ignorance
ignore_o :: Monad m => m (Sink m a)
A sink that ignores all incoming elements.
This sink is strict in the elements, so they are demanded before being discarded. Haskell debugging thunks attached to the elements will be demanded.
abandon_o :: Monad m => m (Sink m a)
A sink that drops all data on the floor.
This sink is non-strict in the elements. Haskell tracing thunks attached to the elements will *not* be demanded.
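The strictness difference between the two sinks can be observed by pushing an element that throws when evaluated. The sketch below is a model only: `Snk`, `ignoreSnk`, `abandonSnk` and `demands` are hypothetical names, and forcing to weak head normal form with `evaluate` is an assumption about what "demanded" means here.

```haskell
import Control.Exception (ErrorCall, evaluate, try)
import Control.Monad (void)

-- Hypothetical model: a sink is just an action that accepts elements.
newtype Snk a = Snk { push :: a -> IO () }

-- Strict in the element: evaluate it (to weak head normal form), then drop it.
ignoreSnk :: Snk a
ignoreSnk = Snk (\x -> void (evaluate x))

-- Non-strict in the element: drop it without ever looking at it.
abandonSnk :: Snk a
abandonSnk = Snk (\_ -> pure ())

-- Report whether pushing to the sink forced the element, by pushing a
-- thunk that raises an error when it is evaluated.
demands :: Snk Int -> IO Bool
demands snk = do
  r <- try (push snk (error "demanded")) :: IO (Either ErrorCall ())
  pure (either (const True) (const False) r)
```

In this model `demands ignoreSnk` is True and `demands abandonSnk` is False, which mirrors why tracing thunks fire with one sink and not the other.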
Flow IO
Sourcing
fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
Open some files as buckets and use them as Sources.
sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8))
Read data from a file, using the given chunk length.
- Data is read into foreign memory without copying it through the GHC heap.
- All chunks have the same size, except possibly the last one.
The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.
sourceRecords
:: Integer | Size of chunk to read in bytes. |
-> (Word8 -> Bool) | Detect the end of a record. |
-> IO () | Action to perform if we can't get a whole record. |
-> Bucket | File handle. |
-> IO (Source IO (Array N (Array F Word8))) |
Read complete records of data from a file, using the given chunk length.
The records are separated by a special terminating character, which the given predicate detects. After reading a chunk of data we seek to just after the last complete record that was read, so we can continue to read more complete records next time.
If we cannot find an end-of-record terminator in the chunk then apply the given failure action. The records can be no longer than the chunk length. This fact guards against the case where a large input file is malformed and contains no end-of-record terminators, as we won't try to read the whole file into memory.
- Data is read into foreign memory without copying it through the GHC heap.
- All chunks have the same size, except possibly the last one.
- The provided file handle must support seeking, else you'll get an exception.
The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.
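The chunk-and-seek-back strategy described above can be modelled on an in-memory list of bytes. This is a sketch under stated assumptions, not the library's code: `chunkRecords` is a hypothetical name, a `Left` result stands in for the failure action, and each record is assumed to keep its terminator as the last byte.

```haskell
import Data.Word (Word8)

-- Read the input in chunks of at most `len` bytes. Each chunk yields the
-- complete records it contains; the next read resumes just after the last
-- complete record, which models the seek. A chunk without any terminator
-- produces a failure instead of reading further.
chunkRecords :: Int -> (Word8 -> Bool) -> [Word8] -> Either String [[[Word8]]]
chunkRecords len isEnd = go
  where
    go []    = Right []
    go bytes =
      let chunk = take len bytes
          recs  = completeRecords chunk
          used  = sum (map length recs)
      in  if null recs
            then Left "no end-of-record terminator in chunk"
            else (recs :) <$> go (drop used bytes)

    -- Split a chunk into records that end with a terminator, discarding
    -- any trailing partial record.
    completeRecords bs = case break isEnd bs of
      (_, [])       -> []
      (r, t : rest) -> (r ++ [t]) : completeRecords rest
```

With a chunk length of 8 and newline (byte 10) as the terminator, the ten bytes of `"ab\ncd\nefg\n"` come back as two chunks: the first holds the records `"ab\n"` and `"cd\n"`, and the second, read from just after them, holds `"efg\n"`. With a chunk length of 2 the first chunk contains no terminator, so the model reports the failure rather than reading on.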
Sinking
toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
Open some files for writing as individual buckets and pass them to the given consumer.