-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Data-parallel data flows.
--
-- Data-parallel data flows.
@package repa-flow
@version 4.2.2.1
module Data.Repa.Flow.States
class (Ord i, Eq i) => Next i
-- | Get the zero for this index type.
first :: Next i => i
-- | Given an index and an arity, get the next index after this one, or
-- Nothing if there aren't any more.
next :: Next i => i -> i -> Maybe i
-- | Check if an index is valid for this arity.
check :: Next i => i -> i -> Bool
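-- A usage sketch for the Next class at type Int (not from the original
-- docs; assumes the Int instance counts up from zero):
--
-- > first :: Int
-- 0
-- > next 0 (3 :: Int)
-- Just 1
-- > next 2 (3 :: Int)
-- Nothing
-- > check 2 (3 :: Int)
-- True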
class (Ord i, Next i, Monad m) => States i m where data family Refs i m a
-- | Get the extent of the collection.
extentRefs :: States i m => Refs i m a -> i
-- | Allocate a new state of the given arity, with every element set to
-- the given initial value.
newRefs :: States i m => i -> a -> m (Refs i m a)
-- | Read an element of the state.
readRefs :: States i m => Refs i m a -> i -> m a
-- | Write an element of the state.
writeRefs :: States i m => Refs i m a -> i -> a -> m ()
-- | Fold all the elements in a collection of refs.
foldRefsM :: States i m => (a -> b -> b) -> b -> Refs i m a -> m b
toListM :: States i m => Refs i m a -> m [a]
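-- A usage sketch for the Int/IO instance (not from the original docs):
--
-- > refs <- newRefs (3 :: Int) (0 :: Int)   -- three cells, all set to 0
-- > writeRefs refs 1 42
-- > readRefs refs 1
-- 42
-- > toListM refs
-- [0,42,0]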
instance Data.Repa.Flow.States.Next ()
instance Data.Repa.Flow.States.Next GHC.Types.Int
instance Data.Repa.Flow.States.Next (GHC.Types.Int, GHC.Types.Int)
instance Data.Repa.Flow.States.States GHC.Types.Int GHC.Types.IO
instance Data.Repa.Flow.States.States GHC.Types.Int m => Data.Repa.Flow.States.States () m
module Data.Repa.Flow.IO.Bucket
-- | A bucket represents a portion of a whole data-set on disk, and contains
-- a file handle that points to the next piece of data to be read or
-- written.
--
-- The bucket could be created from a portion of a single flat file, or
-- be one file of a pre-split data set. The main advantage over a plain
-- Handle is that a Bucket can represent a small portion of
-- a single large file.
data Bucket
Bucket :: Maybe FilePath -> Integer -> Maybe Integer -> Handle -> Bucket
-- | Physical location of the file, if known.
[bucketFilePath] :: Bucket -> Maybe FilePath
-- | Starting position of the bucket in the file, in bytes.
[bucketStartPos] :: Bucket -> Integer
-- | Maximum length of the bucket, in bytes.
--
-- If Nothing then the length is indeterminate, which is used when
-- writing to files.
[bucketLength] :: Bucket -> Maybe Integer
-- | File handle for the bucket.
--
-- If several buckets have been created from a single file, then all
-- buckets will have handles bound to that file, but they will be at
-- different positions.
[bucketHandle] :: Bucket -> Handle
-- | Open a file as a single bucket.
openBucket :: FilePath -> IOMode -> IO Bucket
-- | Wrap an existing file handle as a bucket.
--
-- The starting position is set to 0.
hBucket :: Handle -> IO Bucket
-- | Open some files as buckets and use them as Sources.
fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
-- | Like fromFiles, but take an array of file paths.
fromFiles' :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b
-- | Open all the files in a directory as separate buckets.
--
-- This operation may fail with the same exceptions as
-- getDirectoryContents.
fromDir :: FilePath -> (Array B Bucket -> IO b) -> IO b
-- | Open a file containing atomic records and split it into the given
-- number of evenly sized buckets.
--
-- The records are separated by a special terminating character, which the
-- given predicate detects. The file is split cleanly on record
-- boundaries, so we get a whole number of records in each bucket. As the
-- records can be of varying size the buckets are not guaranteed to have
-- exactly the same length, in either records or bytes, though we try
-- to give them approximately the same number of bytes.
fromSplitFile :: Int -> (Word8 -> Bool) -> FilePath -> (Array B Bucket -> IO b) -> IO b
-- | Like fromSplitFile but start at the given offset.
fromSplitFileAt :: Int -> (Word8 -> Bool) -> FilePath -> Integer -> (Array B Bucket -> IO b) -> IO b
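-- A usage sketch for fromSplitFile that splits a newline-terminated file
-- into four buckets and prints the length of each (not from the original
-- docs; "data.txt" is a hypothetical input file):
--
-- > import qualified Data.Repa.Array as A
-- > fromSplitFile 4 (== 0x0a) "data.txt" $ \bs ->
-- >       mapM_ (print . bucketLength) (A.toList bs)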
-- | Open some files for writing as individual buckets and pass them to the
-- given consumer.
toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
-- | Like toFiles, but take an array of FilePaths.
toFiles' :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b
-- | Create a new directory of the given name, containing the given number
-- of buckets.
--
-- If the directory is named somedir then the files are named
-- somedir/000000, somedir/000001,
-- somedir/000002 and so on.
toDir :: Int -> FilePath -> (Array B Bucket -> IO b) -> IO b
-- | Given a list of directories, create those directories and open the
-- given number of output files per directory.
--
-- In the resulting array of buckets, the outer dimension indexes each
-- directory, and the inner one indexes each file in its directory.
--
-- For each directory somedir the files are named
-- somedir/000000, somedir/000001,
-- somedir/000002 and so on.
toDirs :: Int -> [FilePath] -> (Array (E B DIM2) Bucket -> IO b) -> IO b
-- | Close a bucket, releasing the contained file handle.
bClose :: Bucket -> IO ()
-- | Check if the bucket is currently open.
bIsOpen :: Bucket -> IO Bool
-- | Check if the contained file handle is at the end of the bucket.
bAtEnd :: Bucket -> IO Bool
-- | Seek to a position within a bucket.
bSeek :: Bucket -> SeekMode -> Integer -> IO ()
-- | Get some data from a bucket.
bGetArray :: Bucket -> Integer -> IO (Array F Word8)
-- | Put some data in a bucket.
bPutArray :: Bucket -> Array F Word8 -> IO ()
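-- A sketch of direct bucket access (not from the original docs;
-- "data.bin" is a hypothetical input file):
--
-- > import System.IO
-- > b    <- openBucket "data.bin" ReadMode
-- > arr  <- bGetArray b 1024      -- read up to 1024 bytes
-- > done <- bAtEnd b
-- > bClose b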
module Data.Repa.Flow.Generic.IO
-- | Read data from some files, using the given chunk length.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
--
sourceBytes :: Bulk l Bucket => Integer -> Array l Bucket -> IO (Sources (Index l) IO (Array F Word8))
-- | Read 8-bit ASCII characters from some files, using the given chunk
-- length.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
--
sourceChars :: Bulk l Bucket => Integer -> Array l Bucket -> IO (Sources (Index l) IO (Array F Char))
-- | Like sourceRecords, but produce all records in a single vector.
sourceChunks :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources (Index l) IO (Array F Word8))
-- | Read complete records of data from a bucket, into chunks of the given
-- length. We read as many complete records as will fit into each chunk.
--
-- The records are separated by a special terminating character, which
-- the given predicate detects. After reading a chunk of data we seek the
-- bucket to just after the last complete record that was read, so we can
-- continue to read more complete records next time.
--
-- If we cannot fit at least one complete record in the chunk then
-- perform the given failure action. Limiting the chunk length guards
-- against the case where a large input file is malformed, as we won't
-- try to read the whole file into memory.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
--
sourceRecords :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources Int IO (Array N (Array F Word8)))
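-- A usage sketch that prints the number of records in each chunk of a
-- newline-terminated file (not from the original docs; "data.txt" is a
-- hypothetical input file):
--
-- > import Data.Repa.Flow.Generic
-- > import Data.Repa.Flow.IO.Bucket
-- > import qualified Data.Repa.Array as A
-- > fromFiles ["data.txt"] $ \bs -> do
-- >       ss  <- sourceRecords (64 * 1024) (== 0x0a) (error "over-long record") bs
-- >       out <- trigger_o (sourcesArity ss) (\_ recs -> print (A.length recs))
-- >       drainS ss out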
-- | Read lines from a named text file, in a chunk-wise manner, converting
-- each line to values with the given format.
sourceLinesFormat :: (Unpackable format, Target A (Value format)) => Integer -> IO () -> IO (Array A Word8 -> IO ()) -> format -> Array B Bucket -> IO (Sources Int IO (Array A (Value format)))
-- | Read lines from a lazy byte string, in a chunk-wise manner, converting
-- each line to values with the given format.
sourceLinesFormatFromLazyByteString :: (Unpackable format, Target A (Value format)) => Int -> IO (Array A Word -> IO ()) -> format -> ByteString -> Int -> IO (Sources Int IO (Array A (Value format)))
-- | Write chunks of bytes to the given file handles.
--
--
-- - Data is written out directly from the provided buffer.
--
sinkBytes :: Bulk l Bucket => Array l Bucket -> IO (Sinks (Index l) IO (Array F Word8))
-- | Write chunks of 8-bit ASCII characters to the given file handles.
--
--
-- - Data is copied into a foreign buffer to truncate the characters to
-- 8-bits each before being written out.
--
sinkChars :: (Bulk l Bucket, BulkI r Char) => Array l Bucket -> IO (Sinks (Index l) IO (Array r Char))
-- | Write vectors of text lines to the given file handles.
--
--
-- - Data is copied into a new buffer to insert newlines before being
-- written out.
--
sinkLines :: (Bulk l Bucket, BulkI l1 (Array l2 Char), BulkI l2 Char) => Name l1 -> Name l2 -> Array l Bucket -> IO (Sinks (Index l) IO (Array l1 (Array l2 Char)))
-- | Create an output sieve that writes data to an indeterminate number of
-- output files. Each new element is appended to its associated file.
sieve_o :: Int -> Int -> (a -> Maybe (FilePath, Array F Word8)) -> IO (Sinks () IO a)
-- | Read a file containing Comma-Separated-Values.
--
-- TODO: handle escaped commas. TODO: check CSV file standard.
sourceCSV :: BulkI l Bucket => Integer -> IO () -> Array l Bucket -> IO (Sources Int IO (Array N (Array N (Array F Char))))
-- | Read a file containing Tab-Separated-Values.
sourceTSV :: BulkI l Bucket => Integer -> IO () -> Array l Bucket -> IO (Sources Int IO (Array N (Array N (Array F Char))))
-- | Everything flows.
--
-- This module defines generic flows. The other flow types defined in
-- Data.Repa.Flow.Chunked and Data.Repa.Flow.Simple are
-- specialisations of this generic one.
module Data.Repa.Flow.Generic
-- | A bundle of stream sources, indexed by a value of type i, in
-- some monad m, returning elements of type e.
--
-- Elements can be pulled from each stream in the bundle individually.
data Sources i m e
Sources :: i -> (i -> (e -> m ()) -> m () -> m ()) -> Sources i m e
-- | Number of sources in this bundle.
[sourcesArity] :: Sources i m e -> i
-- | Function to pull data from a bundle. Give it the index of the desired
-- stream, a continuation that accepts an element, and a continuation to
-- invoke when no more elements will ever be available.
[sourcesPull] :: Sources i m e -> i -> (e -> m ()) -> m () -> m ()
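-- For example, a single-stream source can be built directly from the
-- constructor, pulling its elements from an IORef (a sketch, not from
-- the original docs):
--
-- > import Data.IORef
-- > ref <- newIORef [1, 2, 3 :: Int]
-- > let pull _ eat eject = do
-- >         xs <- readIORef ref
-- >         case xs of
-- >             []      -> eject
-- >             x : xs' -> writeIORef ref xs' >> eat x
-- > toList1 () (Sources () pull)
-- [1,2,3]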
-- | A bundle of stream sinks, indexed by a value of type i, in
-- some monad m, returning elements of type e.
--
-- Elements can be pushed to each stream in the bundle individually.
data Sinks i m e
Sinks :: i -> (i -> e -> m ()) -> (i -> m ()) -> Sinks i m e
-- | Number of sinks in the bundle.
[sinksArity] :: Sinks i m e -> i
-- | Push an element to one of the streams in the bundle.
[sinksPush] :: Sinks i m e -> i -> e -> m ()
-- | Signal that no more elements will ever be available for this sink. It
-- is ok to eject the same stream multiple times.
[sinksEject] :: Sinks i m e -> i -> m ()
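-- For example, a single-stream sink that prints its elements can be
-- built directly from the constructor (a sketch, not from the original
-- docs):
--
-- > let snk = Sinks () (\_ x -> print x) (\_ -> putStrLn "ejected")
-- > pushList1 () [1, 2, 3 :: Int] snk
-- 1
-- 2
-- 3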
-- | Pull all available values from the sources and push them to the sinks.
-- Streams in the bundle are processed sequentially, from first to last.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainS :: (Next i, Monad m) => Sources i m a -> Sinks i m a -> m ()
-- | Pull all available values from the sources and push them to the sinks,
-- in parallel. We fork a thread for each of the streams and evaluate
-- them all in parallel.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainP :: Sources Int IO a -> Sinks Int IO a -> IO ()
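-- A usage sketch for drainS, with two streams that each produce 'a' then
-- 'b' (not from the original docs):
--
-- > src <- fromList (2 :: Int) "ab"
-- > out <- trigger_o (2 :: Int) (\i c -> print (i, c))
-- > drainS src out
-- (0,'a')
-- (0,'b')
-- (1,'a')
-- (1,'b')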
-- | Pull all available values from the sources and pass them to the given
-- action.
consumeS :: (Next i, Monad m) => (i -> a -> m ()) -> Sources i m a -> m ()
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
fromList :: States i m => i -> [a] -> m (Sources i m a)
-- | Drain a single source into a list.
toList1 :: States i m => i -> Sources i m a -> m [a]
-- | Drain the given number of elements from a single source into a list.
takeList1 :: States i m => Int -> i -> Sources i m a -> m [a]
-- | Push elements into the associated streams of a bundle of sinks.
pushList :: Monad m => [(i, a)] -> Sinks i m a -> m ()
-- | Push the elements of a list into the given stream of a bundle of
-- sinks.
pushList1 :: Monad m => i -> [a] -> Sinks i m a -> m ()
-- | Transform the stream indexes of a bundle of sources.
--
-- The given transform functions should be inverses of each other, else
-- you'll get a confusing result.
mapIndex_i :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sources i1 m a -> m (Sources i2 m a)
-- | Transform the stream indexes of a bundle of sinks.
--
-- The given transform functions should be inverses of each other, else
-- you'll get a confusing result.
mapIndex_o :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sinks i1 m a -> m (Sinks i2 m a)
-- | For a bundle of sources with a 2-d stream index, flip the components
-- of the index.
flipIndex2_i :: Monad m => Sources SH2 m a -> m (Sources SH2 m a)
-- | For a bundle of sinks with a 2-d stream index, flip the components of
-- the index.
flipIndex2_o :: Monad m => Sinks SH2 m a -> m (Sinks SH2 m a)
-- | Attach a finalizer to bundle of sources.
--
-- For each stream in the bundle, the finalizer will be called the first
-- time a consumer of that stream tries to pull an element when no more
-- are available.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the source.
finalize_i :: States i m => (i -> m ()) -> Sources i m a -> m (Sources i m a)
-- | Attach a finalizer to a bundle of sinks.
--
-- For each stream in the bundle, the finalizer will be called the first
-- time that stream is ejected.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the sink.
finalize_o :: States i m => (i -> m ()) -> Sinks i m a -> m (Sinks i m a)
-- | Project out a single stream source from a bundle.
project_i :: Monad m => i -> Sources i m a -> m (Sources () m a)
-- | Project out a single stream sink from a bundle.
project_o :: Monad m => i -> Sinks i m a -> m (Sinks () m a)
-- | Yield sources that always produce the same value.
repeat_i :: Monad m => i -> (i -> a) -> m (Sources i m a)
-- | Yield sources of the given length that always produce the same value.
replicate_i :: States i m => i -> Int -> (i -> a) -> m (Sources i m a)
-- | Prepend some more elements into the front of some sources.
prepend_i :: States i m => [a] -> Sources i m a -> m (Sources i m a)
-- | Like prepend_i but only prepend the elements to the streams
-- that match the given predicate.
prependOn_i :: States i m => (i -> Bool) -> [a] -> Sources i m a -> m (Sources i m a)
-- | Apply a function to every element pulled from some sources, producing
-- some new sources.
map_i :: Monad m => (a -> b) -> Sources i m a -> m (Sources i m b)
-- | Apply a function to every element pulled from some sources, producing
-- some new sources.
map_o :: Monad m => (a -> b) -> Sinks i m b -> m (Sinks i m a)
-- | Like map_i, but the worker function is also given the stream
-- index.
smap_i :: Monad m => (i -> a -> b) -> Sources i m a -> m (Sources i m b)
-- | Like map_o, but the worker function is also given the stream
-- index.
smap_o :: Monad m => (i -> a -> b) -> Sinks i m b -> m (Sinks i m a)
-- | Combine the elements of two flows with the given function. The worker
-- function is also given the stream index.
szipWith_ii :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sources i m a -> Sources i m b -> m (Sources i m c)
-- | Like szipWith_ii, but take a bundle of Sinks for the
-- result elements, and yield a bundle of Sinks to accept the
-- b elements.
szipWith_io :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m a -> m (Sinks i m b)
-- | Like szipWith_ii, but take a bundle of Sinks for the
-- result elements, and yield a bundle of Sinks to accept the
-- a elements.
szipWith_oi :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m b -> m (Sinks i m a)
-- | Combination of fold and filter.
--
-- We walk over the stream start-to-end, maintaining an accumulator. At
-- each point we can choose to emit an element, or not.
compact_i :: (Monad m, States i m) => (s -> a -> (s, Maybe b)) -> s -> Sources i m a -> m (Sources i m b)
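-- For example, compact_i can drop consecutive duplicate elements
-- (a sketch, not from the original docs):
--
-- > let step prev x = (Just x, if prev == Just x then Nothing else Just x)
-- > ss <- fromList (1 :: Int) [4, 4, 3, 3, 1 :: Int]
-- > toList1 0 =<< compact_i step Nothing ss
-- [4,3,1]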
-- | Start-to-end scan over each stream in a bundle.
scan_i :: (Monad m, States i m) => (s -> a -> s) -> s -> Sources i m a -> m (Sources i m s)
-- | For each stream in a bundle of sources, associate each element with
-- its corresponding position in the stream.
indexed_i :: (Monad m, States i m) => Sources i m a -> m (Sources i m (Int, a))
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: (Ord i, States i m) => Sinks i m a -> Sinks i m a -> m (Sinks i m a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
dup_io :: (Ord i, Monad m) => Sources i m a -> Sinks i m a -> m (Sources i m a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: (Ord i, Monad m) => Sinks i m a -> Sources i m a -> m (Sources i m a)
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source.
-- Each result source only receives the elements that it pulls itself, so
-- if one side pulls all the elements the other side won't get any.
connect_i :: States i m => Sources i m a -> m (Sources i m a, Sources i m a)
-- | Given a bundle of sources containing several streams, produce a new
-- bundle containing a single stream that gets data from the former.
--
-- Streams from the source are consumed in their natural order, and a
-- complete stream is consumed before moving on to the next one.
--
--
-- > import Data.Repa.Flow.Generic
-- > toList1 () =<< funnel_i =<< fromList (3 :: Int) [11, 22, 33]
-- [11,22,33,11,22,33,11,22,33]
--
funnel_i :: (States i m, States () m) => Sources i m a -> m (Sources () m a)
-- | Given a bundle of sinks consisting of a single stream, produce a new
-- bundle of the given arity that sends all data to the former, ignoring
-- the stream index.
--
-- The argument stream is ejected only when all of the streams in the
-- result bundle have been ejected.
--
--
-- - Using this function in conjunction with parallel operators like
-- drainP introduces non-determinism. Elements pushed to
-- different streams in the result bundle could enter the single stream
-- in the argument bundle in any order.
--
--
--
-- > import Data.Repa.Flow.Generic
-- > import Data.Repa.Array.Material
-- > import Data.Repa.Nice
--
-- > let things = [(0 :: Int, "foo"), (1, "bar"), (2, "baz")]
-- > result <- capture_o B () (\k -> funnel_o 4 k >>= pushList things)
-- > nice result
-- [((),"foo"),((),"bar"),((),"baz")]
--
funnel_o :: States i m => i -> Sinks () m a -> m (Sinks i m a)
-- | Split the given number of elements from the head of a source returning
-- those elements in a list, and yielding a new source for the rest.
head_i :: States i m => Int -> Sources i m a -> i -> m ([a], Sources i m a)
-- | From a stream of values which has consecutive runs of identical values,
-- produce a stream of the lengths of these runs.
--
-- Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
groups_i :: (Ord i, Monad m, Eq a) => Sources i m a -> m (Sources i m Int)
-- | Given a stream of flags and a stream of values, produce a new stream
-- of values where the corresponding flag was True. The length of the
-- result is the length of the shorter of the two inputs.
pack_ii :: (Ord i, Monad m) => Sources i m Bool -> Sources i m a -> m (Sources i m a)
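-- A usage sketch for pack_ii (not from the original docs):
--
-- > sFlags <- fromList (1 :: Int) [True, False, True, True]
-- > sVals  <- fromList (1 :: Int) "abcd"
-- > toList1 0 =<< pack_ii sFlags sVals
-- "acd"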
-- | Segmented fold.
folds_ii :: (Ord i, Monad m) => (a -> a -> a) -> a -> Sources i m Int -> Sources i m a -> m (Sources i m a)
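-- A usage sketch for folds_ii, summing a segment of length 2 followed by
-- one of length 3 (not from the original docs):
--
-- > sLens <- fromList (1 :: Int) [2, 3]
-- > sVals <- fromList (1 :: Int) [10, 20, 30, 40, 50 :: Int]
-- > toList1 0 =<< folds_ii (+) 0 sLens sVals
-- [30,120]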
-- | Apply a monadic function to every element pulled from some sources,
-- producing some new sources.
watch_i :: Monad m => (i -> a -> m ()) -> Sources i m a -> m (Sources i m a)
-- | Pass elements to the provided action as they are pushed into the sink.
watch_o :: Monad m => (i -> a -> m ()) -> Sinks i m a -> m (Sinks i m a)
-- | Like watch_o but doesn't pass elements to another sink.
trigger_o :: Monad m => i -> (i -> a -> m ()) -> m (Sinks i m a)
-- | Create a bundle of sinks of the given arity and capture any data
-- pushed to it.
--
--
-- > import Data.Repa.Flow.Generic
-- > import Data.Repa.Array.Material
-- > import Data.Repa.Nice
-- > import Control.Monad
-- > liftM nice $ capture_o B 4 (\k -> pushList [(0 :: Int, "foo"), (1, "bar"), (0, "baz")] k)
-- [(0,"foo"),(1,"bar"),(0,"baz")]
--
capture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO ()) -> IO (Array lDst (i, a))
-- | Like capture_o but also return the result of the push
-- function.
rcapture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO b) -> IO (Array lDst (i, a), b)
-- | A sink that ignores all incoming data.
--
--
-- - This sink is strict in the elements, so they are demanded before
-- being discarded.
-- - Haskell debugging thunks attached to the elements will be
-- demanded.
--
ignore_o :: Monad m => i -> m (Sinks i m a)
-- | A sink that drops all data on the floor.
--
--
-- - This sink is non-strict in the elements.
-- - Haskell tracing thunks attached to the elements will *not* be
-- demanded.
--
abandon_o :: Monad m => i -> m (Sinks i m a)
-- | Use the trace function from Debug.Trace to print each
-- element that is pushed to a sink.
--
--
-- - This function is intended for debugging only, and is not intended
-- for production code.
--
trace_o :: (Show i, Show a, Monad m) => i -> m (Sinks i m a)
-- | Given a bundle of sinks indexed by an Int, produce a result
-- sink for arrays.
--
-- Each time an array is pushed to the result sink, its elements are
-- pushed to the corresponding streams of the argument sink. If there are
-- more elements than sinks then give them to the spill action.
--
--
--   |           ..           |
--   | [w0, x0, y0, z0]       |     :: Sinks () IO (Array l a)
--   | [w1, x1, y1, z1, u1]   |     (sink for a single stream of arrays)
--   |           ..           |
--
--      |    |    |    |    |
--      v    v    v    v    .-----> spilled
--
--   |  .. |  .. |  .. |  .. |
--   |  w0 |  x0 |  y0 |  z0 |     :: Sinks Int IO a
--   |  w1 |  x1 |  y1 |  z1 |     (sink for several streams of elements)
--   |  .. |  .. |  .. |  .. |
--
distribute_o :: BulkI l a => (Int -> a -> IO ()) -> Sinks Int IO a -> IO (Sinks () IO (Array l a))
-- | Like distribute_o, but drop spilled elements on the floor.
ddistribute_o :: BulkI l a => Sinks Int IO a -> IO (Sinks () IO (Array l a))
-- | Like distribute_o, but with 2-d stream indexes.
--
-- Given the argument and result sinks, when pushing to the result the
-- stream index is used as the first component for the argument sink, and
-- the index of the element in its array is used as the second component.
--
-- If you want the components of the stream index the other way around
-- then apply flipIndex2_o to the argument sinks.
distribute2_o :: BulkI l a => (SH2 -> a -> IO ()) -> Sinks SH2 IO a -> IO (Sinks Int IO (Array l a))
-- | Like distribute2_o, but drop spilled elements on the floor.
ddistribute2_o :: BulkI l a => Sinks SH2 IO a -> IO (Sinks Int IO (Array l a))
-- | Given a bundle of argument sinks, produce a result sink. Arrays of
-- indices and elements are pushed to the result sink. On doing so, the
-- elements are pushed into the corresponding streams of the argument
-- sinks.
--
-- If the index associated with an element does not have a corresponding
-- stream in the argument sinks, then pass it to the provided spill
-- function.
--
--
--   |                      ..                       |
--   | [(0, v0), (1, v1), (0, v2), (0, v3), (2, v4)] |  :: Sinks () IO (Array l (Int, a))
--   |                      ..                       |
--
--          \       \        .---------> spilled
--           v       v
--
--   |      ..      |  ..  |
--   | [v0, v2, v3] | [v1] |     :: Sinks Int IO (Array l a)
--   |      ..      |  ..  |
--
--
-- The following example uses capture_o to demonstrate how the
-- shuffle_o operator can be used as one step of a bucket-sort. We
-- start with two arrays of key-value pairs. In the result, the values
-- from each block that had the same key are packed into the same tuple
-- (bucket).
--
--
-- > import Data.Repa.Flow.Generic as G
-- > import Data.Repa.Array as A
-- > import Data.Repa.Array.Material as A
-- > import Data.Repa.Nice
--
-- > let arr1 = A.fromList B [(0, 'a'), (1, 'b'), (2, 'c'), (0, 'd'), (0, 'c')]
-- > let arr2 = A.fromList B [(0, 'A'), (3, 'B'), (3, 'C')]
-- > result :: Array B (Int, Array U Char)
-- >        <- capture_o B 4 (\k -> shuffle_o B (error "spilled") k
-- >                                >>= pushList1 () [arr1, arr2])
--
-- > nice result
-- [(0,"adc"),(1,"b"),(2,"c"),(0,"A"),(3,"BC")]
--
shuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (Int -> Array lDst a -> IO ()) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a)))
-- | Like shuffle_o, but drop spilled elements on the floor.
dshuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a)))
-- | Like dshuffle_o, but use the given function to decide which
-- stream of the argument bundle each element should be pushed into.
--
--
-- > import Data.Repa.Flow.Generic as G
-- > import Data.Repa.Array as A
-- > import Data.Repa.Array.Material as A
-- > import Data.Repa.Nice
-- > import Data.Char
--
-- > let arr1 = A.fromList B "FooBAr"
-- > let arr2 = A.fromList B "BazLIKE"
-- > result :: Array B (Int, Array U Char)
-- >        <- capture_o B 2 (\k -> dshuffleBy_o B (\x -> if isUpper x then 0 else 1) k
-- >                                >>= pushList1 () [arr1, arr2])
-- > nice result
-- [(0,"FBA"),(1,"oor"),(0,"BLIKE"),(1,"az")]
--
dshuffleBy_o :: (BulkI lDst a, BulkI lSrc a, Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (a -> Int) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc a))
-- | Take elements from a flow and pack them into chunks. The chunks are
-- limited to the given maximum length. A predicate can also be supplied
-- to detect the last element in a chunk.
chunkOn_i :: (States i IO, TargetI lDst a) => Name lDst -> Int -> (a -> Bool) -> Sources i IO a -> IO (Sources i IO (Array lDst a))
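-- A usage sketch for chunkOn_i, packing characters into chunks that end
-- at newlines (not from the original docs; B is the boxed layout name
-- from Data.Repa.Array.Material):
--
-- > import Data.Repa.Array.Material
-- > ss  <- fromList (1 :: Int) "one\ntwo\n"
-- > ss' <- chunkOn_i B 16 (== '\n') ss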
-- | Take a flow of chunks and flatten it into a flow of the individual
-- elements.
unchunk_i :: (BulkI l a, States i IO) => Sources i IO (Array l a) -> IO (Sources i IO a)
-- | Input and Output for Chunked Flows.
--
-- Most functions in this module are re-exports of the ones from
-- Data.Repa.Flow.Generic.IO, but using the Sources and
-- Sinks type synonyms for chunked flows.
module Data.Repa.Flow.Chunked.IO
-- | Like sourceRecords from Data.Repa.Flow.Generic.IO, but using the
-- chunked Sources type.
sourceRecords :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources Int IO N (Array F Word8))
-- | Read 8-bit ASCII characters from some files, using the given chunk
-- length.
sourceChars :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources Int IO F Char)
-- | Read data from some files, using the given chunk length.
sourceBytes :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources Int IO F Word8)
-- | Write 8-bit ASCII characters to the given file handles.
sinkChars :: BulkI l Bucket => Array l Bucket -> IO (Sinks Int IO F Char)
-- | Write chunks of data to the given file handles.
sinkBytes :: BulkI l Bucket => Array l Bucket -> IO (Sinks Int IO F Word8)
module Data.Repa.Flow.Generic.Debug
-- | Given a source index and a length, pull enough chunks from the source
-- to build a list of the requested length, and discard the remaining
-- elements in the final chunk.
--
--
-- - This function is intended for interactive debugging. If you want
-- to retain the rest of the final chunk then use head_i.
--
more :: (States i IO, Nicer a) => i -> Sources i IO a -> IO [Nice a]
-- | Like more but also specify how many elements you want.
more' :: (States i IO, Nicer a) => i -> Int -> Sources i IO a -> IO [Nice a]
-- | Like more, but print results in a tabular form to the console.
moret :: (States i IO, Nicer [a], Presentable (Nice [a])) => i -> Sources i IO a -> IO ()
-- | Like more', but print results in tabular form to the console.
moret' :: (States i IO, Nicer [a], Presentable (Nice [a])) => i -> Int -> Sources i IO a -> IO ()
-- | Like more, but show elements in their raw format.
morer :: States i IO => i -> Sources i IO a -> IO [a]
-- | Like more', but show elements in their raw format.
morer' :: States i IO => i -> Int -> Sources i IO a -> IO [a]
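-- A usage sketch for more' (not from the original docs; assumes the Nice
-- form of an Int is the Int itself):
--
-- > import Data.Repa.Flow.Generic
-- > ss <- fromList (1 :: Int) [1 .. 100 :: Int]
-- > more' 0 5 ss
-- [1,2,3,4,5]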
-- | Convert some value to a nice form.
--
-- In particular:
--
--
-- - Nested Arrays are converted to nested lists, so that they are
-- easier to work with on the ghci console.
-- - Lists of characters are wrapped into the Str data type, so
-- that they can be pretty printed differently by follow-on
-- processing.
--
--
-- As ghci automatically pretty prints lists, using nice is more
-- fun than trying to show the raw Repa array representations.
class Nicer a where type family Nice a :: *
nice :: Nicer a => a -> Nice a
-- | Convert some value to a form presentable to the user.
--
-- Like show but we allow the nesting structure to be preserved so
-- it can be displayed in tabular format.
class Presentable a
present :: Presentable a => a -> Present
module Data.Repa.Flow.Simple
-- | Source consisting of a single stream.
type Source m e = Sources () m e
-- | Sink consisting of a single stream.
type Sink m e = Sinks () m e
-- | Pull all available values from the source and push them to the sink.
drainS :: Monad m => Source m a -> Sink m a -> m ()
-- | Given an arity and a list of elements, yield a source that produces
-- all the elements.
fromList :: States () m => [a] -> m (Source m a)
-- | Drain a source into a list.
toList :: States () m => Source m a -> m [a]
-- | Drain the given number of elements from a single source into a list.
takeList :: States () m => Int -> Source m a -> m [a]
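-- A usage sketch for takeList (not from the original docs):
--
-- > s <- fromList "abcde"
-- > takeList 2 s
-- "ab"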
-- | Attach a finalizer to a source.
--
-- The finalizer will be called the first time a consumer of that stream
-- tries to pull an element when no more are available.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the source.
finalize_i :: States () m => m () -> Source m a -> m (Source m a)
-- | Attach a finalizer to a sink.
--
-- The finalizer will be called the first time the stream is ejected.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the sink.
finalize_o :: States () m => m () -> Sink m a -> m (Sink m a)
-- | Yield a source that always produces the same value.
repeat_i :: States () m => a -> m (Source m a)
-- | Yield a source of the given length that always produces the same
-- value.
replicate_i :: States () m => Int -> a -> m (Source m a)
-- | Prepend some more elements to the front of a source.
prepend_i :: States () m => [a] -> Source m a -> m (Source m a)
-- | Apply a function to every element pulled from some source, producing a
-- new source.
map_i :: States () m => (a -> b) -> Source m a -> m (Source m b)
-- | Apply a function to every element pushed to some sink, producing a new
-- sink.
map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a)
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
dup_io :: States () m => Source m a -> Sink m a -> m (Source m a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a)
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source.
-- Each result source only receives the elements that it pulls itself, so
-- if one side pulls all the elements the other side won't get any.
connect_i :: States () m => Source m a -> m (Source m a, Source m a)
-- | Split the given number of elements from the head of a source,
-- returning those elements in a list, and yielding a new source for the
-- rest.
head_i :: States () m => Int -> Source m a -> m ([a], Source m a)
-- | Peek at the given number of elements in the stream, returning a result
-- stream that still produces them all.
peek_i :: States () m => Int -> Source m a -> m ([a], Source m a)
-- | From a stream of values which has consecutive runs of identical values,
-- produce a stream of the lengths of these runs.
--
-- Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int)
-- | Given a stream of flags and a stream of values, produce a new stream
-- of values where the corresponding flag was True. The length of the
-- result is the length of the shorter of the two inputs.
pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a)
-- | Segmented fold.
folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a)
-- | Apply a monadic function to every element pulled from a source
-- producing a new source.
watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a)
-- | Pass elements to the provided action as they are pushed to the sink.
watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a)
-- | Like watch_o but doesn't pass elements to another sink.
trigger_o :: Monad m => (a -> m ()) -> m (Sink m a)
-- | A sink that ignores all incoming elements.
--
-- This sink is strict in the elements, so they are demanded before being
-- discarded. Haskell debugging thunks attached to the elements will be
-- demanded.
ignore_o :: Monad m => m (Sink m a)
-- | A sink that drops all data on the floor.
--
-- This sink is non-strict in the elements. Haskell tracing thunks
-- attached to the elements will *not* be demanded.
abandon_o :: Monad m => m (Sink m a)
-- | Open some files as buckets and use them as Sources.
fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
-- | Read data from a file, using the given chunk length.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
--
--
-- The file will be closed the first time the consumer tries to pull an
-- element from the associated stream when no more are available.
sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8))
-- | Read complete records of data from a file, using the given chunk
-- length.
--
-- The records are separated by a special terminating character, which
-- the given predicate detects. After reading a chunk of data we seek to
-- just after the last complete record that was read, so we can continue
-- to read more complete records next time.
--
-- If we cannot find an end-of-record terminator in the chunk then apply
-- the given failure action. The records can be no longer than the chunk
-- length. This fact guards against the case where a large input file is
-- malformed and contains no end-of-record terminators, as we won't try
-- to read the whole file into memory.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
--
--
-- The file will be closed the first time the consumer tries to pull an
-- element from the associated stream when no more are available.
sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Bucket -> IO (Source IO (Array N (Array F Word8)))
-- | Open some files for writing as individual buckets and pass them to the
-- given consumer.
toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
-- | Write chunks of data to the given files.
--
-- The file will be closed when the associated stream is ejected.
sinkBytes :: Bucket -> IO (Sink IO (Array F Word8))
module Data.Repa.Flow.Chunked
-- | A bundle of sources, where the elements are chunked into arrays.
type Sources i m l e = Sources i m (Array l e)
-- | A bundle of sinks, where the elements are chunked into arrays.
type Sinks i m l e = Sinks i m (Array l e)
-- | Shorthand for common type classes.
type Flow i m l a = (Ord i, Monad m, BulkI l a, States i m)
-- | Pull all available values from the sources and push them to the sinks.
drainS :: (Next i, Monad m) => Sources i m r a -> Sinks i m r a -> m ()
-- | Pull all available values from the sources and pass them to the given
-- action.
consumeS :: (Next i, Monad m, Bulk r a) => Sources i m r a -> (i -> a -> m ()) -> m ()
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
fromList :: (States i m, TargetI l a) => Name l -> i -> [a] -> m (Sources i m l a)
-- | Like fromList but take a list of lists, where each of the
-- inner lists is packed into a single chunk.
fromLists :: (States i m, TargetI l a) => Name l -> i -> [[a]] -> m (Sources i m l a)
-- | Drain a single source into a list of elements.
toList1 :: (States i m, BulkI l a) => i -> Sources i m l a -> m [a]
-- | Drain a single source into a list of chunks.
toLists1 :: (States i m, BulkI l a) => i -> Sources i m l a -> m [[a]]
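-- A usage sketch for fromLists and toLists1 (not from the original docs;
-- B is the boxed layout name from Data.Repa.Array.Material):
--
-- > import Data.Repa.Array.Material
-- > ss <- fromLists B (1 :: Int) [[1, 2], [3, 4, 5 :: Int]]
-- > toLists1 0 ss
-- [[1,2],[3,4,5]]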
-- | Attach a finalizer to a bundle of sources.
--
-- For each stream in the bundle, the finalizer will be called the first
-- time a consumer of that stream tries to pull an element when no more
-- are available.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the source.
finalize_i :: States i m => (i -> m ()) -> Sources i m l a -> m (Sources i m l a)
-- | Attach a finalizer to a bundle of sinks.
--
-- The finalizer will be called the first time the stream is ejected.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the sink.
finalize_o :: States i m => (i -> m ()) -> Sinks i m l a -> m (Sinks i m l a)
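-- | Segmented replicate.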
replicates_i :: (Flow i m lSrc (Int, a), TargetI lDst a) => Name lDst -> Sources i m lSrc (Int, a) -> m (Sources i m lDst a)
-- | Map a function over elements pulled from a source.
smap_i :: (Flow i m l1 a, TargetI l2 b) => (i -> a -> b) -> Sources i m l1 a -> m (Sources i m l2 b)
-- | Map a function over elements pushed into a sink.
smap_o :: (Flow i m l1 a, TargetI l2 b) => (i -> a -> b) -> Sinks i m l2 b -> m (Sinks i m l1 a)
-- | Combine the elements of two flows with the given function.
szipWith_ii :: (Ord i, States i m, BulkI lSrc1 a, BulkI lSrc2 b, TargetI lDst c, Windowable lSrc1 a, Windowable lSrc2 b) => Name lDst -> (i -> a -> b -> c) -> Sources i m lSrc1 a -> Sources i m lSrc2 b -> m (Sources i m lDst c)
-- | Apply a generic stream process to all the streams in a bundle of
-- sources.
process_i :: (States i m, BulkI lSrc a, Bulk lDst b, Bulk lDst (Array lDst b), TargetI lDst b, TargetI lDst (Array lDst b)) => (s -> a -> (s, Array lDst b)) -> s -> Sources i m lSrc a -> m (Sources i m lDst b)
-- | Apply a generic stream process to all the streams in a bundle of
-- sources.
unfolds_i :: (States i m, BulkI lSrc a, Bulk lDst b, TargetI lDst b) => (a -> s -> StepUnfold s b) -> s -> Sources i m lSrc a -> m (Sources i m lDst b)
data StepUnfold s a :: * -> * -> *
StepUnfoldGive :: a -> s -> StepUnfold s a
StepUnfoldNext :: a -> s -> StepUnfold s a
StepUnfoldBump :: s -> StepUnfold s a
StepUnfoldFinish :: s -> StepUnfold s a
-- | Split the given number of elements from the head of a source,
-- returning those elements in a list, and yielding a new source for the
-- rest.
--
--
-- - We pull whole chunks from the source stream until we have
-- at least the desired number of elements. The leftover elements in the
-- final chunk are visible in the result Sources.
--
head_i :: (States i m, Windowable l a, Index l ~ Int) => Int -> Sources i m l a -> i -> m ([a], Sources i m l a)
-- | From a stream of values which has consecutive runs of identical values,
-- produce a stream of the lengths of these runs.
--
--
-- > groupsBy (==) [4, 4, 4, 3, 3, 1, 1, 1, 4]
-- [3, 2, 3, 1]
--
groupsBy_i :: GroupsDict i m lVal lGrp lLen a => Name lGrp -> Name lLen -> (a -> a -> Bool) -> Sources i m lVal a -> m (Sources i m (T2 lGrp lLen) (a, Int))
-- | Dictionaries needed to perform a grouping.
type GroupsDict i m lVal lGrp lLen a = (Flow i m lVal a, Index lVal ~ Int, TargetI lGrp a, TargetI lLen Int)
-- | Fold all elements of all streams in a bundle individually, returning
-- an array of per-stream results.
foldlS :: (States Int m, Target lDst a, Index lDst ~ Int, BulkI lSrc b) => Name lDst -> (a -> b -> a) -> a -> Sources Int m lSrc b -> m (Array lDst a)
-- | Fold all elements of all streams in a bundle together, one stream
-- after the other, returning the single final value.
foldlAllS :: (States () m, BulkI lSrc b) => (a -> b -> a) -> a -> Sources Int m lSrc b -> m a
-- | Segmented fold over vectors of segment lengths and input values.
folds_i :: FoldsDict i m lSeg lElt lGrp lRes n a b => Name lGrp -> Name lRes -> (a -> b -> b) -> b -> Sources i m lSeg (n, Int) -> Sources i m lElt a -> m (Sources i m (T2 lGrp lRes) (n, b))
-- | Dictionaries needed to perform a segmented fold.
type FoldsDict i m lSeg lElt lGrp lRes n a b = (States i m, Windowable lSeg (n, Int), Windowable lElt a, BulkI lSeg (n, Int), BulkI lElt a, BulkI lGrp n, BulkI lRes b, TargetI lElt a, TargetI lGrp n, TargetI lRes b)
-- | Hook a monadic function to some sources, which will be passed every
-- chunk that is pulled from the result.
watch_i :: Monad m => (i -> Array l a -> m ()) -> Sources i m l a -> m (Sources i m l a)
-- | Hook a monadic function to some sinks, which will be passed every
-- chunk that is pushed to the result.
watch_o :: Monad m => (i -> Array l a -> m ()) -> Sinks i m l a -> m (Sinks i m l a)
-- | Like watch_o but discard the incoming chunks after they are
-- passed to the function.
trigger_o :: Monad m => i -> (i -> Array l a -> m ()) -> m (Sinks i m l a)
-- | A sink that ignores all incoming data.
--
--
-- - The sink is strict in the *chunks*, so they are demanded before
-- being discarded. Haskell debugging thunks attached to the chunks will
-- be demanded, but thunks attached to elements may not be -- depending
-- on whether the chunk representation is strict in the elements.
--
ignore_o :: Monad m => i -> m (Sinks i m l a)
-- | Yield a bundle of sinks of the given arity that drops all data on the
-- floor.
--
-- This sink is non-strict in the chunks. Haskell tracing thunks attached
-- to the chunks will *not* be demanded.
abandon_o :: Monad m => i -> m (Sinks i m l a)
module Data.Repa.Flow.Auto.ZipWith
-- | Combine corresponding elements of three sources with the given
-- function.
zipWith3_i :: (Flow a, Flow b, Flow c, Build b, Build c, Build d) => (a -> b -> c -> d) -> Sources a -> Sources b -> Sources c -> IO (Sources d)
-- | Combine corresponding elements of four sources with the given
-- function.
zipWith4_i :: (Flow a, Flow b, Flow c, Flow d, Build a, Build b, Build c, Build d, Build e) => (a -> b -> c -> d -> e) -> Sources a -> Sources b -> Sources c -> Sources d -> IO (Sources e)
-- | Combine corresponding elements of five sources with the given
-- function.
zipWith5_i :: (Flow a, Flow b, Flow c, Flow d, Flow e, Build a, Build b, Build c, Build d, Build e, Build f) => (a -> b -> c -> d -> e -> f) -> Sources a -> Sources b -> Sources c -> Sources d -> Sources e -> IO (Sources f)
-- | Combine corresponding elements of six sources with the given function.
zipWith6_i :: (Flow a, Flow b, Flow c, Flow d, Flow e, Flow f, Build a, Build b, Build c, Build d, Build e, Build f, Build g) => (a -> b -> c -> d -> e -> f -> g) -> Sources a -> Sources b -> Sources c -> Sources d -> Sources e -> Sources f -> IO (Sources g)
-- | Combine corresponding elements of seven sources with the given
-- function.
zipWith7_i :: (Flow a, Flow b, Flow c, Flow d, Flow e, Flow f, Flow g, Build a, Build b, Build c, Build d, Build e, Build f, Build g, Build h) => (a -> b -> c -> d -> e -> f -> g -> h) -> Sources a -> Sources b -> Sources c -> Sources d -> Sources e -> Sources f -> Sources g -> IO (Sources h)
-- | This module defines the default specialisation of flows that appears
-- in Data.Repa.Flow. Each stream in the bundle is indexed by a
-- single integer, and stream state is stored using the IO monad.
module Data.Repa.Flow.Auto
-- | A bundle of stream sources, where the elements of the stream are
-- chunked into arrays.
type Sources a = Sources Int IO A a
-- | A bundle of stream sinks, where the elements of the stream are chunked
-- into arrays.
type Sinks a = Sinks Int IO A a
-- | Shorthand for common type classes.
type Flow a = (Flow Int IO A a, Windowable A a)
-- | Yield the number of streams in the bundle.
sourcesArity :: Sources a -> Int
-- | Yield the number of streams in the bundle.
sinksArity :: Sinks a -> Int
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
fromList :: Build a => Int -> [a] -> IO (Sources a)
-- | Like fromList but take a list of lists. Each of the inner
-- lists is packed into a single chunk.
fromLists :: Build a => Int -> [[a]] -> IO (Sources a)
-- | Drain a single source from a bundle into a list of elements.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toList1 :: Build a => Int -> Sources a -> IO [a]
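-- A usage sketch for fromList and toList1 (not from the original docs):
--
-- > ss <- fromList 2 [1, 2, 3 :: Int]
-- > toList1 1 ss
-- [1,2,3]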
-- | Drain a single source from a bundle into a list of chunks.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toLists1 :: Build a => Int -> Sources a -> IO [[a]]
-- | Given an arity and an array of elements, yield sources that each
-- produce all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
fromArray :: Build a => Int -> Array a -> IO (Sources a)
-- | Like fromArray but take an array of arrays. Each of the inner
-- arrays is packed into a single chunk.
fromArrays :: (Elem a, Build a) => Int -> Array (Array a) -> IO (Sources a)
-- | Drain a single source from a bundle into an array of elements.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toArray1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array a)
-- | Drain a single source from a bundle into an array of elements.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toArrays1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array (Array a))
-- | Pull all available values from the sources and push them to the sinks.
-- Streams in the bundle are processed sequentially, from first to last.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainS :: Sources a -> Sinks a -> IO ()
-- | Pull all available values from the sources and push them to the sinks,
-- in parallel. We fork a thread for each of the streams and evaluate
-- them all in parallel.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainP :: Sources a -> Sinks a -> IO ()
-- | Pull all available values from the sources and pass them to the given
-- action.
consumeS :: Bulk A a => Sources a -> (Int -> a -> IO ()) -> IO ()
-- | Segmented replicate.
replicates_i :: (Flow (Int, a), Build a) => Sources (Int, a) -> IO (Sources a)
-- | Apply a function to all elements pulled from some sources.
map_i :: (Flow a, Build b) => (a -> b) -> Sources a -> IO (Sources b)
-- | Apply a function to all elements pushed to some sinks.
map_o :: (Flow a, Build b) => (a -> b) -> Sinks b -> IO (Sinks a)
-- | Combine corresponding elements of two sources with the given function.
zipWith_i :: (Flow a, Flow b, Build c) => (a -> b -> c) -> Sources a -> Sources b -> IO (Sources c)
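-- A usage sketch for zipWith_i (not from the original docs):
--
-- > s1 <- fromList 1 [1, 2, 3 :: Int]
-- > s2 <- fromList 1 [10, 20, 30 :: Int]
-- > toList1 0 =<< zipWith_i (+) s1 s2
-- [11,22,33]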
-- | Apply a generic stream process to a bundle of sources.
process_i :: (Flow a, Flow b, Build b) => (s -> a -> (s, Array b)) -> s -> Sources a -> IO (Sources b)
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: Sinks a -> Sinks a -> IO (Sinks a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
dup_io :: Sources a -> Sinks a -> IO (Sources a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: Sinks a -> Sources a -> IO (Sources a)
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source.
-- Each result source only receives the elements that it pulls itself, so
-- if one side pulls all the elements the other side won't get any.
connect_i :: Sources a -> IO (Sources a, Sources a)
-- | Hook a worker function to some sources, which will be passed every
-- chunk that is pulled from each source.
--
--
-- - The worker is also passed the source index of the chunk that was
-- pulled.
--
watch_i :: (Int -> Array A a -> IO ()) -> Sources a -> IO (Sources a)
-- | Hook a worker function to some sinks, which will be passed every chunk
-- that is pushed to each sink.
--
--
-- - The worker is also passed the source index of the chunk that was
-- pushed.
--
watch_o :: (Int -> Array A a -> IO ()) -> Sinks a -> IO (Sinks a)
-- | Create a bundle of sinks of the given arity that pass incoming chunks
-- to a worker function.
--
--
-- - This is like watch_o, except that the incoming chunks are
-- discarded after they are passed to the worker function
--
trigger_o :: Int -> (Int -> Array A a -> IO ()) -> IO (Sinks a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - Haskell debugging thunks attached to the chunks will be demanded,
-- but thunks attached to elements may not be -- depending on whether the
-- chunk representation is strict in the elements.
--
ignore_o :: Int -> IO (Sinks a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - As opposed to ignore_o the sinks are non-strict in the
-- chunks.
-- - Haskell debugging thunks attached to the chunks will *not* be
-- demanded.
--
abandon_o :: Int -> IO (Sinks a)
-- | Given a source index and a length, split a list of that length from
-- the front of the source, yielding a new source for the remaining
-- elements.
--
--
-- - We pull whole chunks from the source stream until we have
-- at least the desired number of elements. The leftover elements in the
-- final chunk are visible in the result Sources.
--
head_i :: Flow a => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a))
-- | Concatenate a flow of arrays into a flow of the elements.
concat_i :: (Flow a, Build a) => Sources (Array a) -> IO (Sources a)
-- | Select a single column from a flow of rows of fields.
select_i :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sources fs -> IO (Sources (Select' n fs))
-- | Select a single column from a flow of fields.
select_o :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sinks (Select' n fs) -> IO (Sinks fs)
-- | Discard a single column from a flow of fields.
discard_i :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sources fs -> IO (Sources (Discard' n fs))
-- | Discard a single column from a flow of fields.
discard_o :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sinks (Discard' n fs) -> IO (Sinks fs)
-- | Mask columns from a flow of fields.
mask_i :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sources fs -> IO (Sources (Mask' ms fs))
-- | Mask columns from a flow of fields.
mask_o :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sinks (Mask' ms fs) -> IO (Sinks fs)
-- | Scan through some sources to find runs of matching elements, and count
-- the lengths of those runs.
--
--
-- > F.toList1 0 =<< F.groups_i =<< F.fromList 1 "waabbbblle"
-- [('w',1),('a',2),('b',4),('l',2),('e',1)]
--
groups_i :: (GroupsDict a, Eq a) => Sources a -> IO (Sources (a, Int))
-- | Like groupsBy, but take a function to determine whether two
-- consecutive values should be in the same group.
groupsBy_i :: GroupsDict a => (a -> a -> Bool) -> Sources a -> IO (Sources (a, Int))
-- | Dictionaries needed to perform a grouping.
type GroupsDict a = GroupsDict Int IO A A A a
-- | Fold all the elements of each stream in a bundle, one stream after the
-- other, returning an array of fold results.
foldlS :: (Flow b, Build a) => (a -> b -> a) -> a -> Sources b -> IO (Array A a)
-- | Fold all the elements of each stream in a bundle, one stream after the
-- other, returning an array of fold results.
foldlAllS :: Flow b => (a -> b -> a) -> a -> Sources b -> IO a
-- | Given streams of lengths and values, perform a segmented fold where
-- segments of values of the corresponding lengths are folded
-- together.
--
--
-- > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
-- > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
--
-- > F.toList1 0 =<< F.folds_i (+) 0 sSegs sVals
-- [('a',10),('b',50),('c',220),('d',0),('e',80)]
--
--
-- If not enough input elements are available to fold a complete segment
-- then no output is produced for that segment. However, trailing zero
-- length segments still produce the initial value for the fold.
--
--
-- > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
-- > sVals <- F.fromList 1 [10, 20, 30 :: Int]
--
-- > F.toList1 0 =<< F.folds_i (*) 1 sSegs sVals
-- [('a',10),('b',600),('c',1),('d',1),('e',1)]
--
folds_i :: FoldsDict n a b => (a -> b -> b) -> b -> Sources (n, Int) -> Sources a -> IO (Sources (n, b))
-- | Dictionaries needed to perform a segmented fold.
type FoldsDict n a b = FoldsDict Int IO A A A A n a b
-- | Combination of groupsBy_i and folds_i. We determine the
-- segment lengths while performing the folds.
--
-- Note that SQL-like group-by aggregations can be performed using this
-- function, provided the data is pre-sorted on the group key. For
-- example, we can take the average of some groups of values:
--
--
-- > sKeys <- F.fromList 1 "waaaabllle"
-- > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
--
-- > sResult <- F.map_i (\(key, (acc, n)) -> (key, acc / n))
-- =<< F.foldGroupsBy_i (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
--
-- > F.toList1 0 sResult
-- [('w',10.0),('a',35.0),('b',60.0),('l',80.0),('e',100.0)]
--
foldGroupsBy_i :: (FoldGroupsDict n a b) => (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources n -> Sources a -> IO (Sources (n, b))
type FoldGroupsDict n a b = (BulkI A n, Material A a, Material A n, Material A b)
-- | Attach a finalizer to some sources.
--
--
-- - For a given source, the finalizer will be called the first time a
-- consumer of that source tries to pull an element when no more are
-- available.
-- - The finalizer is given the index of the source that ended.
-- - The finalizer will be run after any finalizers already attached to
-- the source.
--
finalize_i :: (Int -> IO ()) -> Sources a -> IO (Sources a)
-- | Attach a finalizer to some sinks.
--
--
-- - For a given sink, the finalizer will be called the first time that
-- sink is ejected.
-- - The finalizer is given the index of the sink that was
-- ejected.
-- - The finalizer will be run after any finalizers already attached to
-- the sink.
--
finalize_o :: (Int -> IO ()) -> Sinks a -> IO (Sinks a)
module Data.Repa.Flow.Auto.Debug
-- | Given a source index and a length, pull enough chunks from the source
-- to build a list of the requested length, and discard the remaining
-- elements in the final chunk.
--
--
-- - This function is intended for interactive debugging. If you want
-- to retain the rest of the final chunk then use head_i.
--
more :: (Flow a, Nicer a) => Int -> Sources a -> IO (Maybe [Nice a])
-- | Like more, but also specify how many elements you want.
more' :: (Flow a, Nicer a) => Int -> Int -> Sources a -> IO (Maybe [Nice a])
-- | Like more, but print results in a tabular form to the console.
moret :: (Flow a, Nicer [a], Presentable (Nice [a])) => Int -> Sources a -> IO ()
-- | Like more', but print results in tabular form to the console.
moret' :: (Flow a, Nicer [a], Presentable (Nice [a])) => Int -> Int -> Sources a -> IO ()
-- | Like more, but show elements in their raw format.
morer :: Flow a => Int -> Sources a -> IO (Maybe [a])
-- | Like more', but show elements in their raw format.
morer' :: Flow a => Int -> Int -> Sources a -> IO (Maybe [a])
-- | Convert some value to a nice form.
--
-- In particular:
--
--
-- - Nested Arrays are converted to nested lists, so that they are
-- easier to work with on the ghci console.
-- - Lists of characters are wrapped into the Str data type, so
-- that they can be pretty printed differently by follow-on
-- processing.
--
--
-- As ghci automatically pretty prints lists, using nice is more
-- fun than trying to show the raw Repa array representations.
class Nicer a where type family Nice a :: *
nice :: Nicer a => a -> Nice a
-- | Convert some value to a form presentable to the user.
--
-- Like show but we allow the nesting structure to be preserved so
-- it can be displayed in tabular format.
class Presentable a
present :: Presentable a => a -> Present
-- | Read and write files.
module Data.Repa.Flow.Auto.SizedIO
-- | Read data from some files, using the given chunk length.
sourceBytes :: Integer -> Array B Bucket -> IO (Sources Word8)
-- | Read 8-bit ASCII characters from some files, using the given
-- chunk length.
sourceChars :: Integer -> Array B Bucket -> IO (Sources Char)
-- | Read complete lines of data from some files, using the given
-- chunk length and error action.
sourceLines :: Integer -> IO () -> Array B Bucket -> IO (Sources (Array A Char))
-- | Read complete records of data from some files, using the given
-- chunk length and error action.
sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Array B Bucket -> IO (Sources (Array A Word8))
-- | Read a file containing Tab-Separated-Values, using the given
-- chunk length and error action.
sourceTSV :: Integer -> IO () -> Array B Bucket -> IO (Sources (Array A (Array A Char)))
-- | Read a file containing Comma-Separated-Values, using the given
-- chunk length and error action.
sourceCSV :: Integer -> IO () -> Array B Bucket -> IO (Sources (Array A (Array A Char)))
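--
-- For example (a sketch, not from the original docs; the file name is
-- hypothetical), read a TSV file with an explicit 64kByte chunk size,
-- calling error on malformed input:
--
-- > ss <- fromFiles ["table.tsv"] (sourceTSV (64 * 1024) (error "malformed TSV"))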
-- | Read and write files.
--
-- The functions in this module are wrappers for the ones in
-- Data.Repa.Flow.Auto.SizedIO that use a default chunk size of
-- 64kBytes and just call error if the source file appears
-- corrupted.
module Data.Repa.Flow.Auto.IO
-- | The default chunk size of 64kBytes.
defaultChunkSize :: Integer
-- | Read data from some files, using the default chunk size.
sourceBytes :: Array B Bucket -> IO (Sources Word8)
-- | Read 8-bit ASCII characters from some files, using the default
-- chunk size.
sourceChars :: Array B Bucket -> IO (Sources Char)
-- | Read complete lines of data from a text file, using the default
-- chunk size. We read as many complete lines as will fit into each
-- chunk.
--
--
-- - The trailing new-line characters are discarded.
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
-- - Each file is closed the first time the consumer tries to pull a
-- line from the associated stream when no more are available.
--
sourceLines :: Array B Bucket -> IO (Sources (Array A Char))
-- | Read complete records of data from a file, into chunks of the
-- default size. We read as many complete records as will fit into each
-- chunk.
--
-- The records are separated by a special terminating character, which
-- the given predicate detects. After reading a chunk of data we seek the
-- file to just after the last complete record that was read, so we can
-- continue to read more complete records next time.
--
-- If we cannot fit at least one complete record in the chunk then
-- perform the given failure action. Limiting the chunk length guards
-- against the case where a large input file is malformed, as we won't
-- try to read the whole file into memory.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
-- - Each file is closed the first time the consumer tries to pull a
-- record from the associated stream when no more are available.
--
sourceRecords :: (Word8 -> Bool) -> Array B Bucket -> IO (Sources (Array A Word8))
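--
-- For example (a sketch, not from the original docs; the file name is
-- hypothetical), treat newline as the record terminator:
--
-- > import Data.Char
-- > ss <- fromFiles ["records.txt"] (sourceRecords (== fromIntegral (ord '\n')))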
-- | Read a file containing Tab-Separated-Values.
sourceTSV :: Array B Bucket -> IO (Sources (Array A (Array A Char)))
-- | Read a file containing Comma-Separated-Values.
sourceCSV :: Array B Bucket -> IO (Sources (Array A (Array A Char)))
-- | Read the lines of a text file, converting each line to values with the
-- given format.
sourceFormatLn :: (Unpackable format, Target A (Value format)) => Integer -> IO () -> IO (Array A Word8 -> IO ()) -> format -> Array B Bucket -> IO (Sources (Value format))
-- | Write 8-bit bytes to some files.
sinkBytes :: Array B Bucket -> IO (Sinks Word8)
-- | Write vectors of text lines to the given file handles.
sinkLines :: Array B Bucket -> IO (Sinks (Array A Char))
-- | Write 8-bit ASCII characters to some files.
sinkChars :: Array B Bucket -> IO (Sinks Char)
-- | Create sinks that convert values to some format and write them to
-- buckets.
--
--
-- > import Data.Repa.Flow as F
-- > import Data.Repa.Convert.Format as F
-- > :{
-- > do let format = FixString ASCII 10 :*: Float64be :*: Int16be
-- >    let vals = listFormat format
-- >            [ "red"   :*: 5.3  :*: 100
-- >            , "green" :*: 2.8  :*: 93
-- >            , "blue"  :*: 0.99 :*: 42 ]
-- >
-- >    ss  <- F.fromList 1 vals
-- >    out <- toFiles' ["colors.bin"]
-- >        $  sinkFormatLn format (error "convert failed")
-- >    drainS ss out
-- > :}
--
sinkFormatLn :: (Packable format, Bulk A (Value format), Show format) => format -> IO () -> Array B Bucket -> IO (Sinks (Value format))
module Data.Repa.Flow.Auto.Format
-- | Pack elements into the given storage formats.
packFormat_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources (Array Word8))
-- | Like packFormat_i, but return sources of flat bytes.
flatPackFormat_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources Word8)
-- | Like packFormat_i, but also append a newline character after
-- every packed element.
packFormatLn_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources (Array Word8))
-- | Like packFormatLn_i, but return sources of flat bytes.
flatPackFormatLn_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources Word8)
-- | Like packFormatLn_i, but use a default, human-readable format
-- to encode the values.
packAsciiLn_i :: (FormatAscii a, a ~ Value (FormatAscii' a), Elem a, Packable (FormatAscii' a)) => Sources a -> IO (Sources (Array Word8))
-- | Like packAsciiLn_i, but return sources of flat bytes.
flatPackAsciiLn_i :: (FormatAscii a, a ~ Value (FormatAscii' a), Elem a, Packable (FormatAscii' a)) => Sources a -> IO (Sources Word8)
-- | Like packAsciiLn_i, but pack only the value of each key-value
-- pair, passing the key through alongside the packed bytes.
keyPackAsciiLn_i :: (FormatAscii a, a ~ Value (FormatAscii' a), Elem a, Packable (FormatAscii' a), Elem k, Build k) => Sources (k, a) -> IO (Sources (k, Array Word8))
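--
-- A usage sketch (not from the original docs), assuming the
-- FormatAscii instance for Int from repa-convert: render a stream of
-- Ints in their default human-readable format, one per line, as a
-- stream of flat bytes.
--
-- > ss <- F.fromList 1 [10, 20, 30 :: Int]
-- > bs <- F.flatPackAsciiLn_i ss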
-- | Getting Started
--
-- A flow consists of a bundle of individual streams. Here we create a
-- bundle of two streams, using different files for each. Data will be
-- read in chunks, using the default chunk size of 64kBytes.
--
--
-- > import Data.Repa.Array as A
-- > import Data.Repa.Flow as F
-- > import Data.Repa.Flow.IO as F
-- > import Data.Repa.Flow.Auto.Debug as F
--
-- > ws <- fromFiles ["/usr/share/dict/words", "/usr/share/dict/cracklib-small"] sourceLines
--
--
-- Show the first few elements of the first chunk of the first file.
--
--
-- > more 0 ws
-- Just ["A","A's","AA's","AB's","ABM's","AC's","ACTH's","AI's" ...]
--
--
-- The more function is helpful for debugging. It pulls a whole
-- chunk from a source, displays the requested number of elements from
-- the front of it, then discards the rest. In production code you could
-- use head_i to split a few elements from a stream while
-- retaining the rest.
--
-- Use more' to show more elements at a time. We've already pulled
-- the first chunk, so here are the first 100 elements from the second
-- chunk:
--
--
-- > more' 0 100 ws
-- Just ["Jubal","Judah","Judaic","Judaism","Judaism's","Judaisms","Judas" ...]
--
--
-- Use moret to display elements in tabular form. Here are the
-- first few elements of the second stream in the bundle:
--
--
-- > moret 1 ws
-- "10th"
-- "1st"
-- "2nd"
-- "3rd"
-- "4th"
-- "5th"
-- ...
--
--
-- Let's convert the characters to upper-case.
--
--
-- > import Data.Char
-- > up <- F.map_i (A.map toUpper) ws
-- > more 0 up
-- Just ["UTOPIAN","UTOPIAN'S","UTOPIANS","UTOPIAS","UTRECHT" ...]
--
--
-- Flows are data-parallel, which means operators like map_i apply
-- to all streams in the bundle. The second stream has been converted to
-- upper-case as well:
--
--
-- > more 1 up
-- Just ["BROWNER","BROWNEST","BROWNIAN","BROWNIE","BROWNIE'S" ...]
--
--
-- Let's write out the data to some files. There are two streams in the
-- bundle, so open a file for each stream:
--
--
-- > out <- toFiles ["out1.txt", "out2.txt"] sinkLines
--
--
-- Note that the ws and up we used before were bundles
-- of stream Sources whereas out is a bundle of stream
-- Sinks. In the map_i operator we used earlier, the
-- _i (input) suffix indicates that it is a transformer of
-- Sources. There is a related map_o (output) operator for
-- Sinks.
--
-- Now that we have a bundle of Sources, and some matching
-- Sinks, we can drainS all of the data from the former
-- into the latter.
--
--
-- > F.drainS up out
--
--
-- At this point we can run an external shell command to check the
-- output.
--
--
-- > :! head out1.txt
-- BEARSKIN'S
-- BEARSKINS
-- BEAST
-- BEAST'S
-- BEASTLIER
-- BEASTLIEST
-- BEASTLINESS
-- BEASTLINESS'S
-- BEASTLY
-- BEASTLY'S
--
--
-- Performance
--
-- Although repa-flow can be used productively in the ghci REPL,
-- performance won't be great because you will be running unspecialised,
-- polymorphic code. For best results you should write a complete program
-- and compile it with ghc -fllvm -O2 Main.hs.
module Data.Repa.Flow
-- | A bundle of stream sources, where the elements of the stream are
-- chunked into arrays.
type Sources a = Sources Int IO A a
-- | A bundle of stream sinks, where the elements of the stream are chunked
-- into arrays.
type Sinks a = Sinks Int IO A a
-- | Shorthand for common type classes.
type Flow a = (Flow Int IO A a, Windowable A a)
-- | Yield the number of streams in the bundle.
sourcesArity :: Sources a -> Int
-- | Yield the number of streams in the bundle.
sinksArity :: Sinks a -> Int
-- | Pull all available values from the sources and push them to the sinks.
-- Streams in the bundle are processed sequentially, from first to last.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainS :: Sources a -> Sinks a -> IO ()
-- | Pull all available values from the sources and push them to the sinks,
-- in parallel. We fork a thread for each of the streams and evaluate
-- them all in parallel.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainP :: Sources a -> Sinks a -> IO ()
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
fromList :: Build a => Int -> [a] -> IO (Sources a)
-- | Like fromList, but take a list of lists. Each of the inner
-- lists is packed into a single chunk.
fromLists :: Build a => Int -> [[a]] -> IO (Sources a)
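--
-- A round-trip sketch (not from the original docs): with arity 2 both
-- streams carry the same single chunk, so draining stream 0 gives back
-- the original list:
--
-- > ss <- F.fromList 2 "abc"
-- > F.toList1 0 ss
--
-- should yield "abc".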
-- | Drain a single source from a bundle into a list of elements.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toList1 :: Build a => Int -> Sources a -> IO [a]
-- | Drain a single source from a bundle into a list of chunks.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toLists1 :: Build a => Int -> Sources a -> IO [[a]]
-- | Given an arity and an array of elements, yield sources that each
-- produce all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
fromArray :: Build a => Int -> Array a -> IO (Sources a)
-- | Like fromArray but take an array of arrays. Each of the inner
-- arrays is packed into a single chunk.
fromArrays :: (Elem a, Build a) => Int -> Array (Array a) -> IO (Sources a)
-- | Drain a single source from a bundle into an array of elements.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toArray1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array a)
-- | Drain a single source from a bundle into an array of chunks.
--
--
-- - If the index does not specify a valid stream then the result will
-- be empty.
--
toArrays1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array (Array a))
-- | Attach a finalizer to some sources.
--
--
-- - For a given source, the finalizer will be called the first time a
-- consumer of that source tries to pull an element when no more are
-- available.
-- - The finalizer is given the index of the source that ended.
-- - The finalizer will be run after any finalizers already attached to
-- the source.
--
finalize_i :: (Int -> IO ()) -> Sources a -> IO (Sources a)
-- | Attach a finalizer to some sinks.
--
--
-- - For a given sink, the finalizer will be called the first time that
-- sink is ejected.
-- - The finalizer is given the index of the sink that was
-- ejected.
-- - The finalizer will be run after any finalizers already attached to
-- the sink.
--
finalize_o :: (Int -> IO ()) -> Sinks a -> IO (Sinks a)
-- | Segmented replicate.
replicates_i :: (Flow (Int, a), Build a) => Sources (Int, a) -> IO (Sources a)
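--
-- A sketch (not from the original docs), assuming the Int in each
-- pair is the repeat count for the paired element:
--
-- > ss <- F.fromList 1 [(2, 'a'), (3, 'b')]
-- > F.toList1 0 =<< F.replicates_i ss
--
-- should yield "aabbb".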
-- | Apply a function to all elements pulled from some sources.
map_i :: (Flow a, Build b) => (a -> b) -> Sources a -> IO (Sources b)
-- | Apply a function to all elements pushed to some sinks.
map_o :: (Flow a, Build b) => (a -> b) -> Sinks b -> IO (Sinks a)
-- | Combine corresponding elements of two sources with the given function.
zipWith_i :: (Flow a, Flow b, Build c) => (a -> b -> c) -> Sources a -> Sources b -> IO (Sources c)
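--
-- A sketch (not from the original docs):
--
-- > sa <- F.fromList 1 [1, 2, 3 :: Int]
-- > sb <- F.fromList 1 [10, 20, 30 :: Int]
-- > F.toList1 0 =<< F.zipWith_i (+) sa sb
--
-- should yield [11,22,33].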
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: Sinks a -> Sinks a -> IO (Sinks a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
dup_io :: Sources a -> Sinks a -> IO (Sources a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: Sinks a -> Sources a -> IO (Sources a)
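--
-- A sketch of dup_io (not from the original docs), assuming a Show
-- instance for the chunk type: tap a stream, printing each chunk as a
-- side effect while the elements continue on to the main consumer:
--
-- > ok <- F.trigger_o 1 (\_ chunk -> print chunk)
-- > ss <- F.fromList 1 [1, 2, 3 :: Int]
-- > F.toList1 0 =<< F.dup_io ss ok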
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source. Each
-- result source only gets the elements it pulls itself, so if one
-- side pulls all the elements the other side won't get any.
connect_i :: Sources a -> IO (Sources a, Sources a)
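--
-- A sketch (not from the original docs): elements go to whichever
-- result source pulls them first:
--
-- > ss <- F.fromList 1 [1, 2, 3 :: Int]
-- > (s1, s2) <- F.connect_i ss
-- > F.toList1 0 s1
--
-- should yield [1,2,3], after which draining s2 should yield [].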
-- | Hook a worker function to some sources, which will be passed every
-- chunk that is pulled from each source.
--
--
-- - The worker is also passed the source index of the chunk that was
-- pulled.
--
watch_i :: (Int -> Array A a -> IO ()) -> Sources a -> IO (Sources a)
-- | Hook a worker function to some sinks, which will be passed every chunk
-- that is pushed to each sink.
--
--
-- - The worker is also passed the source index of the chunk that was
-- pushed.
--
watch_o :: (Int -> Array A a -> IO ()) -> Sinks a -> IO (Sinks a)
-- | Create a bundle of sinks of the given arity that pass incoming chunks
-- to a worker function.
--
--
-- - This is like watch_o, except that the incoming chunks are
-- discarded after they are passed to the worker function.
--
trigger_o :: Int -> (Int -> Array A a -> IO ()) -> IO (Sinks a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - Haskell debugging thunks attached to the chunks will be demanded,
-- but thunks attached to elements may not be -- depending on whether the
-- chunk representation is strict in the elements.
--
ignore_o :: Int -> IO (Sinks a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - As opposed to ignore_o the sinks are non-strict in the
-- chunks.
-- - Haskell debugging thunks attached to the chunks will *not* be
-- demanded.
--
abandon_o :: Int -> IO (Sinks a)
-- | Given a source index and a length, split a list of that length
-- from the front of the source. Yields a new source for the remaining
-- elements.
--
--
-- - We pull whole chunks from the source stream until we have
-- at least the desired number of elements. The leftover elements in the
-- final chunk are visible in the result Sources.
--
head_i :: Flow a => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a))
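--
-- A sketch (not from the original docs), reading the argument order
-- from the description above as source index first, then length (the
-- order is an assumption):
--
-- > ss <- F.fromList 1 [1 .. 10 :: Int]
-- > Just (xs, rest) <- F.head_i 0 3 ss
--
-- xs should be [1,2,3], and the remaining elements stay available
-- through rest.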
-- | Concatenate a flow of arrays into a flow of the elements.
concat_i :: (Flow a, Build a) => Sources (Array a) -> IO (Sources a)
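--
-- A sketch (not from the original docs), building the nested chunks
-- with repa-array's fromList (assuming nested arrays satisfy the
-- Build constraint):
--
-- > import Data.Repa.Array as A
-- > ss <- F.fromList 1 [A.fromList A "ab", A.fromList A "c"]
-- > F.toList1 0 =<< F.concat_i ss
--
-- should yield "abc".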
-- | Select a single column from a flow of rows of fields.
select_i :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sources fs -> IO (Sources (Select' n fs))
-- | Select a single column from a flow of fields.
select_o :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sinks (Select' n fs) -> IO (Sinks fs)
-- | Discard a single column from a flow of fields.
discard_i :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sources fs -> IO (Sources (Discard' n fs))
-- | Discard a single column from a flow of fields.
discard_o :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sinks (Discard' n fs) -> IO (Sinks fs)
-- | Mask columns from a flow of fields.
mask_i :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sources fs -> IO (Sources (Mask' ms fs))
-- | Mask columns from a flow of fields.
mask_o :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sinks (Mask' ms fs) -> IO (Sinks fs)
-- | Scan through some sources to find runs of matching elements, and count
-- the lengths of those runs.
--
--
-- > F.toList1 0 =<< F.groups_i =<< F.fromList 1 "waabbbblle"
-- [('w',1),('a',2),('b',4),('l',2),('e',1)]
--
groups_i :: (GroupsDict a, Eq a) => Sources a -> IO (Sources (a, Int))
-- | Like groups_i, but take a function to determine whether two
-- consecutive values should be in the same group.
groupsBy_i :: GroupsDict a => (a -> a -> Bool) -> Sources a -> IO (Sources (a, Int))
-- | Dictionaries needed to perform a grouping.
type GroupsDict a = GroupsDict Int IO A A A a
-- | Fold all the elements of each stream in a bundle, one stream after the
-- other, returning an array of fold results.
foldlS :: (Flow b, Build a) => (a -> b -> a) -> a -> Sources b -> IO (Array A a)
-- | Fold all the elements of every stream in a bundle, one stream after
-- the other, returning the final fold result.
foldlAllS :: Flow b => (a -> b -> a) -> a -> Sources b -> IO a
-- | Given streams of lengths and values, perform a segmented fold where
-- segments of values of the corresponding lengths are folded
-- together.
--
--
-- > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
-- > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
--
-- > F.toList1 0 =<< F.folds_i (+) 0 sSegs sVals
-- [('a',10),('b',50),('c',220),('d',0),('e',80)]
--
--
-- If not enough input elements are available to fold a complete segment
-- then no output is produced for that segment. However, trailing zero
-- length segments still produce the initial value for the fold.
--
--
-- > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
-- > sVals <- F.fromList 1 [10, 20, 30 :: Int]
--
-- > F.toList1 0 =<< F.folds_i (*) 1 sSegs sVals
-- [('a',10),('b',600),('c',1),('d',1),('e',1)]
--
folds_i :: FoldsDict n a b => (a -> b -> b) -> b -> Sources (n, Int) -> Sources a -> IO (Sources (n, b))
-- | Dictionaries needed to perform a segmented fold.
type FoldsDict n a b = FoldsDict Int IO A A A A n a b
-- | Combination of groupsBy_i and folds_i. We determine the
-- segment lengths while performing the folds.
--
-- Note that SQL-like groupby aggregations can be performed using this
-- function, provided the data is pre-sorted on the group key. For
-- example, we can take the average of some groups of values:
--
--
-- > sKeys <- F.fromList 1 "waaaabllle"
-- > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
--
-- > sResult <- F.map_i (\(key, (acc, n)) -> (key, acc / n))
-- =<< F.foldGroupsBy_i (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
--
-- > F.toList1 0 sResult
-- [10.0,35.0,60.0,80.0,100.0]
--
foldGroupsBy_i :: (FoldGroupsDict n a b) => (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources n -> Sources a -> IO (Sources (n, b))
type FoldGroupsDict n a b = (BulkI A n, Material A a, Material A n, Material A b)