Safe Haskell | None |
---|---|
Language | Haskell98 |
- module Data.Repa.Flow.States
- type Source m e = Sources () m e
- type Sink m e = Sinks () m e
- drainS :: Monad m => Source m a -> Sink m a -> m ()
- fromList :: States () m => [a] -> m (Source m a)
- toList :: States () m => Source m a -> m [a]
- takeList :: States () m => Int -> Source m a -> m [a]
- finalize_i :: States () m => m () -> Source m a -> m (Source m a)
- finalize_o :: States () m => m () -> Sink m a -> m (Sink m a)
- repeat_i :: States () m => a -> m (Source m a)
- replicate_i :: States () m => Int -> a -> m (Source m a)
- prepend_i :: States () m => [a] -> Source m a -> m (Source m a)
- map_i :: States () m => (a -> b) -> Source m a -> m (Source m b)
- map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a)
- dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a)
- dup_io :: States () m => Source m a -> Sink m a -> m (Source m a)
- dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a)
- connect_i :: States () m => Source m a -> m (Source m a, Source m a)
- head_i :: States () m => Int -> Source m a -> m ([a], Source m a)
- peek_i :: States () m => Int -> Source m a -> m ([a], Source m a)
- groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int)
- pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a)
- folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a)
- watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a)
- watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a)
- trigger_o :: Monad m => (a -> m ()) -> m (Sink m a)
- ignore_o :: Monad m => m (Sink m a)
- abandon_o :: Monad m => m (Sink m a)
- fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
- sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8))
- sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Bucket -> IO (Source IO (Array N (Array F Word8)))
- toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
- sinkBytes :: Bucket -> IO (Sink IO (Array F Word8))
Documentation
module Data.Repa.Flow.States
Evaluation
drainS :: Monad m => Source m a -> Sink m a -> m () Source
Pull all available values from the source and push them to the sink.
Conversions
fromList :: States () m => [a] -> m (Source m a) Source
Given an arity and a list of elements, yield a source that produces all the elements.
takeList :: States () m => Int -> Source m a -> m [a] Source
Drain the given number of elements from a single source into a list.
Finalizers
finalize_i :: States () m => m () -> Source m a -> m (Source m a) Source
Attach a finalizer to a source.
The finalizer will be called the first time a consumer of that stream tries to pull an element when no more are available.
The provided finalizer will be run after any finalizers already attached to the source.
finalize_o :: States () m => m () -> Sink m a -> m (Sink m a) Source
Attach a finalizer to a sink.
The finalizer will be called the first time the stream is ejected.
The provided finalizer will be run after any finalizers already attached to the sink.
Flow Operators
Constructors
repeat_i :: States () m => a -> m (Source m a) Source
Yield a source that always produces the same value.
replicate_i :: States () m => Int -> a -> m (Source m a) Source
Yield a source of the given length that always produces the same value.
prepend_i :: States () m => [a] -> Source m a -> m (Source m a) Source
Prepend some more elements to the front of a source.
Mapping
map_i :: States () m => (a -> b) -> Source m a -> m (Source m b) Source
Apply a function to every element pulled from some source, producing a new source.
map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a) Source
Apply a function to every element pushed to some sink, producing a new sink.
Connecting
dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a) Source
Send the same data to two consumers.
Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.
dup_io :: States () m => Source m a -> Sink m a -> m (Source m a) Source
Send the same data to two consumers.
Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a) Source
Send the same data to two consumers.
Like dup_io
but with the arguments flipped.
connect_i :: States () m => Source m a -> m (Source m a, Source m a) Source
Connect an argument source to two result sources.
Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at the time, so if one side pulls all the elements the other side won't get any.
Splitting
head_i :: States () m => Int -> Source m a -> m ([a], Source m a) Source
Split the given number of elements from the head of a source, returning those elements in a list, and yielding a new source for the rest.
peek_i :: States () m => Int -> Source m a -> m ([a], Source m a) Source
Peek at the given number of elements in the stream, returning a result stream that still produces them all.
Grouping
groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int) Source
From a stream of values which has consecutive runs of idential values, produce a stream of the lengths of these runs.
Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
Packing
pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a) Source
Given a stream of flags and a stream of values, produce a new stream of values where the corresponding flag was True. The length of the result is the length of the shorter of the two inputs.
Folding
folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a) Source
Segmented fold.
Watching
watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a) Source
Apply a monadic function to every element pulled from a source producing a new source.
watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a) Source
Pass elements to the provided action as they are pushed to the sink.
trigger_o :: Monad m => (a -> m ()) -> m (Sink m a) Source
Like watch
but doesn't pass elements to another sink.
Ignorance
ignore_o :: Monad m => m (Sink m a) Source
A sink that ignores all incoming elements.
This sink is strict in the elements, so they are demanded before being discarded. Haskell debugging thunks attached to the elements will be demanded.
abandon_o :: Monad m => m (Sink m a) Source
A sink that drops all data on the floor.
This sink is non-strict in the elements. Haskell tracing thinks attached to the elements will *not* be demanded.
Flow IO
Sourcing
fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b Source
Open some files as buckets and use them as Sources
.
sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8)) Source
Read data from a file, using the given chunk length.
- Data is read into foreign memory without copying it through the GHC heap.
- All chunks have the same size, except possibly the last one.
The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.
:: Integer | Size of chunk to read in bytes. |
-> (Word8 -> Bool) | Detect the end of a record. |
-> IO () | Action to perform if we can't get a whole record. |
-> Bucket | File handle. |
-> IO (Source IO (Array N (Array F Word8))) |
Read complete records of data from a file, using the given chunk length
The records are separated by a special terminating character, which the given predicate detects. After reading a chunk of data we seek to just after the last complete record that was read, so we can continue to read more complete records next time.
If we cannot find an end-of-record terminator in the chunk then apply the given failure action. The records can be no longer than the chunk length. This fact guards against the case where a large input file is malformed and contains no end-of-record terminators, as we won't try to read the whole file into memory.
- Data is read into foreign memory without copying it through the GHC heap.
- All chunks have the same size, except possibly the last one.
- The provided file handle must support seeking, else you'll get an exception.
The file will be closed the first time the consumer tries to pull an element from the associated stream when no more are available.
Sinking
Open some files for writing as individual buckets and pass them to the given consumer.