| Safe Haskell | None      |
|--------------|-----------|
| Language     | Haskell98 |
Getting Started
A flow consists of a bundle of individual streams. Here we create a bundle of two streams, using different files for each. Data will be read in chunks, using the default chunk size of 64kBytes.
> import Data.Repa.Array           as A
> import Data.Repa.Flow            as F
> import Data.Repa.Flow.Auto.Debug as F
> ws <- fromFiles' ["/usr/share/dict/words", "/usr/share/dict/cracklib-small"] sourceLines
Show the first few elements of the first chunk of the first file.
> more 0 ws
Just ["A","A's","AA's","AB's","ABM's","AC's","ACTH's","AI's" ...]
The more function is helpful for debugging. It pulls a whole chunk from a source, displays the requested number of elements from the front of it, then discards the rest. In production code you could use head_i to split a few elements from a stream while retaining the rest.
Use more' to show more elements at a time. We've already pulled the first chunk, so here are the first 100 elements from the second chunk:
> more' 0 100 ws
Just ["Jubal","Judah","Judaic","Judaism","Judaism's","Judaisms","Judas" ...]
Use moret to display elements in tabular form. Here are the first few elements of the second stream in the bundle:
> moret 1 ws
"10th"
"1st"
"2nd"
"3rd"
"4th"
"5th"
...
Let's convert the characters to upper-case.
> import Data.Char
> up <- F.map_i (A.map toUpper) ws
> more 0 up
Just ["UTOPIAN","UTOPIAN'S","UTOPIANS","UTOPIAS","UTRECHT" ...]
Flows are data-parallel, which means operators like map_i apply to all streams in the bundle. The second stream has been converted to upper-case as well:
> more 1 up
Just ["BROWNER","BROWNEST","BROWNIAN","BROWNIE","BROWNIE'S" ...]
Let's write out the data to some files. There are two streams in the bundle, so open a file for each stream:
> out <- toFiles' ["out1.txt", "out2.txt"] sinkLines
Note that the ws and up we used before were bundles of stream Sources, whereas out is a bundle of stream Sinks. In the map_i operator we used before, the _i (input) suffix indicates that it is a transformer of Sources. There is a related map_o (output) operator for Sinks.
Now that we have a bundle of Sources and some matching Sinks, we can drainS all of the data from the former into the latter.
> F.drainS up out
At this point we can run an external shell command to check the output.
> :! head out1.txt
BEARSKIN'S
BEARSKINS
BEAST
BEAST'S
BEASTLIER
BEASTLIEST
BEASTLINESS
BEASTLINESS'S
BEASTLY
BEASTLY'S
Performance
Although repa-flow can be used productively in the ghci REPL, performance won't be great because you will be running unspecialised, polymorphic code. For best results you should write a complete program and compile it with ghc -fllvm -O2 Main.hs.
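For example, the Getting Started session above can be packaged as a complete program roughly like this (a minimal sketch; the input and output file names are placeholders):
-- Main.hs: a sketch of the Getting Started session as a compiled program.
import Data.Repa.Array as A
import Data.Repa.Flow  as F
import Data.Char       (toUpper)

main :: IO ()
main
 = do   -- Read lines from two input files (placeholder names).
        ws  <- fromFiles' ["in1.txt", "in2.txt"] sourceLines
        -- Convert every line in every stream to upper-case.
        up  <- F.map_i (A.map toUpper) ws
        -- Open one output file per stream and drain the data through.
        out <- toFiles' ["out1.txt", "out2.txt"] sinkLines
        F.drainS up out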
- type Sources a = Sources Int IO A a
- type Sinks a = Sinks Int IO A a
- type Flow a = (Flow Int IO A a, Windowable A a)
- sourcesArity :: Sources a -> Int
- sinksArity :: Sinks a -> Int
- drainS :: Sources a -> Sinks a -> IO ()
- drainP :: Sources a -> Sinks a -> IO ()
- fromList :: Build a t => Int -> [a] -> IO (Sources a)
- fromLists :: Build a t => Int -> [[a]] -> IO (Sources a)
- toList1 :: Build a t => Int -> Sources a -> IO [a]
- toLists1 :: Build a t => Int -> Sources a -> IO [[a]]
- finalize_i :: (Int -> IO ()) -> Sources a -> IO (Sources a)
- finalize_o :: (Int -> IO ()) -> Sinks a -> IO (Sinks a)
- map_i :: (Flow a, Build b bt) => (a -> b) -> Sources a -> IO (Sources b)
- map_o :: (Flow a, Build b bt) => (a -> b) -> Sinks b -> IO (Sinks a)
- dup_oo :: Sinks a -> Sinks a -> IO (Sinks a)
- dup_io :: Sources a -> Sinks a -> IO (Sources a)
- dup_oi :: Sinks a -> Sources a -> IO (Sources a)
- connect_i :: Sources a -> IO (Sources a, Sources a)
- watch_i :: (Int -> Array A a -> IO ()) -> Sources a -> IO (Sources a)
- watch_o :: (Int -> Array A a -> IO ()) -> Sinks a -> IO (Sinks a)
- trigger_o :: Int -> (Int -> Array A a -> IO ()) -> IO (Sinks a)
- discard_o :: Int -> IO (Sinks a)
- ignore_o :: Int -> IO (Sinks a)
- head_i :: Flow a => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a))
- groups_i :: (GroupsDict a u1 u2, Eq a) => Sources a -> IO (Sources (a, Int))
- groupsBy_i :: GroupsDict a u1 u2 => (a -> a -> Bool) -> Sources a -> IO (Sources (a, Int))
- type GroupsDict a u1 u2 = GroupsDict Int IO A A u1 A u2 a
- foldlS :: (Flow b, Build a at) => (a -> b -> a) -> a -> Sources b -> IO (Array A a)
- foldlAllS :: Flow b => (a -> b -> a) -> a -> Sources b -> IO a
- folds_i :: FoldsDict n a b u1 u2 u3 u4 => (a -> b -> b) -> b -> Sources (n, Int) -> Sources a -> IO (Sources (n, b))
- type FoldsDict n a b u1 u2 u3 u4 = FoldsDict Int IO A u1 A u2 A u3 A u4 n a b
- foldGroupsBy_i :: FoldGroupsDict n a b u1 u2 => (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources n -> Sources a -> IO (Sources (n, b))
- type FoldGroupsDict n a b u1 u2 = (BulkI A n, Material A a, Material A n, Material A b, Unpack (Buffer A n) u1, Unpack (Buffer A b) u2)
- defaultChunkSize :: Integer
- module Data.Repa.Flow.IO.Bucket
- sourceCSV :: Array B Bucket -> IO (Sources (Array A (Array A Char)))
- sourceTSV :: Array B Bucket -> IO (Sources (Array A (Array A Char)))
- sourceRecords :: (Word8 -> Bool) -> Array B Bucket -> IO (Sources (Array A Word8))
- sourceLines :: Array B Bucket -> IO (Sources (Array A Char))
- sourceChars :: Array B Bucket -> IO (Sources Char)
- sourceBytes :: Array B Bucket -> IO (Sources Word8)
- sourcePacked :: (Packable format, Target A (Value format)) => format -> IO () -> Array B Bucket -> IO (Sources (Value format))
- sinkChars :: Array B Bucket -> IO (Sinks Char)
- sinkLines :: Array B Bucket -> IO (Sinks (Array A Char))
- sinkBytes :: Array B Bucket -> IO (Sinks Word8)
- sinkPacked :: (Packable format, Bulk A (Value format)) => format -> IO () -> Array B Bucket -> IO (Sinks (Value format))
Flow types
type Sources a = Sources Int IO A a Source
A bundle of stream sources, where the elements of the stream are chunked into arrays.
type Sinks a = Sinks Int IO A a Source
A bundle of stream sinks, where the elements of the stream are chunked into arrays.
sourcesArity :: Sources a -> Int Source
Yield the number of streams in the bundle.
sinksArity :: Sinks a -> Int Source
Yield the number of streams in the bundle.
Evaluation
Conversion
fromList :: Build a t => Int -> [a] -> IO (Sources a) Source
Given an arity and a list of elements, yield sources that each produce all the elements.
- All elements are stuffed into a single chunk, and each stream is given the same chunk.
fromLists :: Build a t => Int -> [[a]] -> IO (Sources a) Source
Like fromLists_i but takes a list of lists. Each of the inner lists is packed into a single chunk.
toList1 :: Build a t => Int -> Sources a -> IO [a] Source
Drain a single source from a bundle into a list of elements.
toLists1 :: Build a t => Int -> Sources a -> IO [[a]] Source
Drain a single source from a bundle into a list of chunks.
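For example, a quick round trip through fromList and toList1 (a sketch; each of the two streams receives all three elements, and we drain stream 0):
> ss <- F.fromList 2 "abc"
> F.toList1 0 ss
"abc"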
Finalizers
finalize_i :: (Int -> IO ()) -> Sources a -> IO (Sources a) Source
Attach a finalizer to some sources.
- For a given source, the finalizer will be called the first time a consumer of that source tries to pull an element when no more are available.
- The finalizer is given the index of the source that ended.
- The finalizer will be run after any finalizers already attached to the source.
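For example, a sketch that reports when a stream runs dry (the message text is just an illustration):
> ws  <- fromFiles' ["/usr/share/dict/words"] sourceLines
> ws' <- F.finalize_i (\i -> putStrLn ("source " ++ show i ++ " finished")) ws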
finalize_o :: (Int -> IO ()) -> Sinks a -> IO (Sinks a) Source
Attach a finalizer to some sinks.
- For a given sink, the finalizer will be called the first time that sink is ejected.
- The finalizer is given the index of the sink that was ejected.
- The finalizer will be run after any finalizers already attached to the sink.
Flow Operators
Mapping
If you want to work on a chunk at a time, use map_i and map_o from Data.Repa.Flow.Generic.
map_i :: (Flow a, Build b bt) => (a -> b) -> Sources a -> IO (Sources b) Source
Apply a function to all elements pulled from some sources.
map_o :: (Flow a, Build b bt) => (a -> b) -> Sinks b -> IO (Sinks a) Source
Apply a function to all elements pushed to some sinks.
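For example, a sketch of the upper-casing example from Getting Started done on the output side, so the function is applied to lines as they are pushed to the sinks:
> import Data.Char
> out  <- toFiles' ["out1.txt", "out2.txt"] sinkLines
> out' <- F.map_o (A.map toUpper) out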
Connecting
dup_oo :: Sinks a -> Sinks a -> IO (Sinks a) Source
Send the same data to two consumers.
Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.
dup_io :: Sources a -> Sinks a -> IO (Sources a) Source
Send the same data to two consumers.
Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
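For example, a sketch that copies everything pulled from a dictionary source into a file as a side effect (the output file name is a placeholder):
> ws   <- fromFiles' ["/usr/share/dict/words"] sourceLines
> copy <- toFiles' ["words-copy.txt"] sinkLines
> ws'  <- F.dup_io ws copy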
dup_oi :: Sinks a -> Sources a -> IO (Sources a) Source
Send the same data to two consumers.
Like dup_io but with the arguments flipped.
connect_i :: Sources a -> IO (Sources a, Sources a) Source
Connect an argument source to two result sources.
Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at the time, so if one side pulls all the elements the other side won't get any.
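For example, a sketch continuing the Getting Started session; the chunk pulled via ws1 will not be seen by ws2:
> ws         <- fromFiles' ["/usr/share/dict/words"] sourceLines
> (ws1, ws2) <- F.connect_i ws
> more 0 ws1
Just ["A","A's","AA's","AB's","ABM's","AC's","ACTH's","AI's" ...]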
Watching
watch_i :: (Int -> Array A a -> IO ()) -> Sources a -> IO (Sources a) Source
Hook a worker function to some sources, which will be passed every chunk that is pulled from each source.
- The worker is also passed the source index of the chunk that was pulled.
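For example, a sketch that logs the size of every chunk as it is pulled (the message format is just an illustration):
> ws  <- fromFiles' ["/usr/share/dict/words"] sourceLines
> ws' <- F.watch_i (\i c -> putStrLn ("pulled " ++ show (A.length c) ++ " lines from source " ++ show i)) ws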
watch_o :: (Int -> Array A a -> IO ()) -> Sinks a -> IO (Sinks a) Source
Hook a worker function to some sinks, which will be passed every chunk that is pushed to each sink.
- The worker is also passed the index of the sink that the chunk was pushed to.
trigger_o :: Int -> (Int -> Array A a -> IO ()) -> IO (Sinks a) Source
Create a bundle of sinks of the given arity that pass incoming chunks to a worker function.
- This is like watch_o, except that the incoming chunks are discarded after they are passed to the worker function.
Ignorance
discard_o :: Int -> IO (Sinks a) Source
Create a bundle of sinks of the given arity that drop all data on the floor.
- The sinks are strict in the *chunks*, so they are demanded before being discarded.
- Haskell debugging thunks attached to the chunks will be demanded, but thunks attached to elements may not be -- depending on whether the chunk representation is strict in the elements.
ignore_o :: Int -> IO (Sinks a) Source
Create a bundle of sinks of the given arity that drop all data on the floor.
- As opposed to discard_o, the sinks are non-strict in the chunks.
- Haskell debugging thunks attached to the chunks will *not* be demanded.
Splitting
head_i :: Flow a => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a)) Source
Given a source index and a length, split a list of that length from the front of the indicated source. Yields a new source for the remaining elements.
- We pull whole chunks from the source stream until we have at least the desired number of elements. The leftover elements in the final chunk are visible in the result Sources.
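For example, a sketch following the argument order described above (source index, then length); the first five words match the first chunk shown in Getting Started:
> ws <- fromFiles' ["/usr/share/dict/words"] sourceLines
> Just (xs, rest) <- F.head_i 0 5 ws
> xs
["A","A's","AA's","AB's","ABM's"]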
Grouping
groups_i Source
:: (GroupsDict a u1 u2, Eq a) | |
=> Sources a | Input elements. |
-> IO (Sources (a, Int)) | Starting element and length of groups. |
Scan through some sources to find runs of matching elements, and count the lengths of those runs.
> F.toList1 0 =<< F.groups_i =<< F.fromList 1 "waabbbblle"
[('w',1),('a',2),('b',4),('l',2),('e',1)]
groupsBy_i Source
:: GroupsDict a u1 u2 | |
=> (a -> a -> Bool) | Fn to check if consecutive elements are in the same group. |
-> Sources a | Input elements. |
-> IO (Sources (a, Int)) | Starting element and length of groups. |
Like groups_i, but takes a function to determine whether two consecutive values should be in the same group.
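For example, a sketch that groups case-insensitively (compare with the groups_i example above):
> import Data.Char
> F.toList1 0 =<< F.groupsBy_i (\x y -> toLower x == toLower y) =<< F.fromList 1 "wAabBBblle"
[('w',1),('A',2),('b',4),('l',2),('e',1)]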
type GroupsDict a u1 u2 = GroupsDict Int IO A A u1 A u2 a Source
Dictionaries needed to perform a grouping.
Folding
Complete
foldlS Source
:: (Flow b, Build a at) | |
=> (a -> b -> a) | Combining function. |
-> a | Starting value. |
-> Sources b | Input elements to fold. |
-> IO (Array A a) |
Fold all the elements of each stream in a bundle, one stream after the other, returning an array of fold results.
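For example, a sketch: fromList gives each of the two streams the same ten elements, so we get one sum per stream:
> arr <- F.foldlS (+) 0 =<< F.fromList 2 [1..10 :: Int]
> A.toList arr
[55,55]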
foldlAllS Source
:: Flow b | |
=> (a -> b -> a) | Combining function. |
-> a | Starting value. |
-> Sources b | Input elements to fold. |
-> IO a |
Fold all the elements of all streams in the bundle, one stream after the other, returning the final result.
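For example, a sketch with a single stream of ten elements:
> F.foldlAllS (+) 0 =<< F.fromList 1 [1..10 :: Int]
55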
Segmented
folds_i Source
:: FoldsDict n a b u1 u2 u3 u4 | |
=> (a -> b -> b) | Worker function. |
-> b | Initial state when folding each segment. |
-> Sources (n, Int) | Segment lengths. |
-> Sources a | Input elements to fold. |
-> IO (Sources (n, b)) | Result elements. |
Given streams of segment lengths and values, perform a segmented fold where segments of values of the corresponding lengths are folded together.
> sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
> sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
> F.toList1 0 =<< F.folds_i (+) 0 sSegs sVals
[('a',10),('b',50),('c',220),('d',0),('e',80)]
If not enough input elements are available to fold a complete segment then no output is produced for that segment. However, trailing zero length segments still produce the initial value for the fold.
> sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
> sVals <- F.fromList 1 [10, 20, 30 :: Int]
> F.toList1 0 =<< F.folds_i (*) 1 sSegs sVals
[('a',10),('b',600),('c',1),('d',1),('e',1)]
type FoldsDict n a b u1 u2 u3 u4 = FoldsDict Int IO A u1 A u2 A u3 A u4 n a b Source
Dictionaries needed to perform a segmented fold.
foldGroupsBy_i Source
:: FoldGroupsDict n a b u1 u2 | |
=> (n -> n -> Bool) | Fn to check if consecutive elements are in the same group. |
-> (a -> b -> b) | Worker function for the fold. |
-> b | Initial state when folding each segment. |
-> Sources n | Names that determine groups. |
-> Sources a | Values to fold. |
-> IO (Sources (n, b))
Combination of groupsBy_i and folds_i. We determine the segment lengths while performing the folds.
Note that SQL-like group-by aggregations can be performed using this function, provided the data is pre-sorted on the group key. For example, we can take the average of some groups of values:
> sKeys   <- F.fromList 1 "waaaabllle"
> sVals   <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
> sResult <- F.map_i (\(key, (acc, n)) -> (key, acc / n))
         =<< F.foldGroupsBy_i (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
> F.toList1 0 sResult
[('w',10.0),('a',35.0),('b',60.0),('l',80.0),('e',100.0)]
type FoldGroupsDict n a b u1 u2 = (BulkI A n, Material A a, Material A n, Material A b, Unpack (Buffer A n) u1, Unpack (Buffer A b) u2) Source
Flow I/O
defaultChunkSize :: Integer Source
The default chunk size of 64kBytes.
Buckets
module Data.Repa.Flow.IO.Bucket
Sourcing
sourceCSV :: Array B Bucket -> IO (Sources (Array A (Array A Char))) Source
Read a file containing Comma-Separated-Values.
sourceTSV :: Array B Bucket -> IO (Sources (Array A (Array A Char))) Source
Read a file containing Tab-Separated-Values.
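For example, a sketch of attaching the CSV source to a file (the file name is a placeholder); each element is one row, itself an array of fields:
> rows <- fromFiles' ["table.csv"] sourceCSV
> more 0 rows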
sourceRecords Source
:: (Word8 -> Bool) | Detect the end of a record. |
-> Array B Bucket | Input buckets. |
-> IO (Sources (Array A Word8))
Read complete records of data from a file, into chunks of the given length. We read as many complete records as will fit into each chunk.
The records are separated by a special terminating character, which the given predicate detects. After reading a chunk of data we seek the file to just after the last complete record that was read, so we can continue to read more complete records next time.
If we cannot fit at least one complete record in the chunk then perform the given failure action. Limiting the chunk length guards against the case where a large input file is malformed, as we won't try to read the whole file into memory.
- Data is read into foreign memory without copying it through the GHC heap.
- The provided file handle must support seeking, else you'll get an exception.
- Each file is closed the first time the consumer tries to pull a record from the associated stream when no more are available.
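For example, a sketch that reads newline-terminated records as raw bytes and shows the length of each record in the first chunk (the file name is a placeholder):
> rs <- fromFiles' ["data.log"] $ sourceRecords (== 0x0a)
> more 0 =<< F.map_i A.length rs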
sourceLines :: Array B Bucket -> IO (Sources (Array A Char)) Source
Read complete lines of data from a text file, using the given chunk length. We read as many complete lines as will fit into each chunk.
- The trailing new-line characters are discarded.
- Data is read into foreign memory without copying it through the GHC heap.
- The provided file handle must support seeking, else you'll get an exception.
- Each file is closed the first time the consumer tries to pull a line from the associated stream when no more are available.
sourceChars :: Array B Bucket -> IO (Sources Char) Source
Read 8-bit ASCII characters from some files, using the given chunk length.
sourceBytes :: Array B Bucket -> IO (Sources Word8) Source
Read data from some files, using the given chunk length.
sourcePacked Source
:: (Packable format, Target A (Value format)) | |
=> format | Binary format for each value. |
-> IO () | Action when a value cannot be converted. |
-> Array B Bucket | Input buckets. |
-> IO (Sources (Value format))
Read packed binary data from some buckets and unpack the values
to some Sources
.
The following uses the colors.bin
file produced by the sinkPacked
example:
> import Data.Repa.Flow            as F
> import Data.Repa.Convert.Format  as F
> :{
  do   let format = FixString ASCII 10 :*: Float64be :*: Int16be
       ss <- fromFiles' ["colors.bin"] $ sourcePacked format (error "convert failed")
       toList1 0 ss
  :}
["red" :*: (5.3 :*: 100), "green" :*: (2.8 :*: 93), "blue" :*: (0.99 :*: 42)]
Sinking
sinkLines :: Array B Bucket -> IO (Sinks (Array A Char)) Source
Write vectors of text lines to the given file handles.
sinkPacked Source
:: (Packable format, Bulk A (Value format)) | |
=> format | Binary format for each value. |
-> IO () | Action when a value cannot be serialized. |
-> Array B Bucket | Output buckets. |
-> IO (Sinks (Value format))
Create sinks that convert values to a packed binary format and write them to some buckets.
> import Data.Repa.Flow            as F
> import Data.Repa.Convert.Format  as F
> :{
  do   let format = FixString ASCII 10 :*: Float64be :*: Int16be
       let vals   = listFormat format
                    [ "red"   :*: 5.3  :*: 100
                    , "green" :*: 2.8  :*: 93
                    , "blue"  :*: 0.99 :*: 42 ]
       ss  <- F.fromList 1 vals
       out <- toFiles' ["colors.bin"] $ sinkPacked format (error "convert failed")
       drainS ss out
  :}