-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Data-parallel data flows.
--
@package repa-flow
@version 4.0.0.2
module Data.Repa.Flow.IO.Bucket
-- | A bucket represents a portion of a whole data set on disk, and contains
-- a file handle that points to the next piece of data to be read or
-- written.
--
-- The bucket could be created from a portion of a single flat file, or
-- be one file of a pre-split data set. The main advantage over a plain
-- Handle is that a Bucket can represent a small portion of
-- a single large file.
data Bucket
-- | Maximum length of the bucket, in bytes.
--
-- If Nothing then the length is indeterminate, which is used when
-- writing to files.
bucketLength :: Bucket -> Maybe Integer
-- | Open a file as a single bucket.
openBucket :: FilePath -> IOMode -> IO Bucket
-- | Wrap an existing file handle as a bucket.
hBucket :: Handle -> IO Bucket
-- | Open some files as buckets and use them as Sources.
fromFiles :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b
-- | Like fromFiles, but take a list of file paths.
fromFiles' :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
-- | Open all the files in a directory as separate buckets.
--
-- This operation may fail with the same exceptions as
-- getDirectoryContents.
fromDir :: FilePath -> (Array B Bucket -> IO b) -> IO b
-- | Open a file containing atomic records and split it into the given
-- number of evenly sized buckets.
--
-- The records are separated by a special terminating character, which the
-- given predicate detects. The file is split cleanly on record
-- boundaries, so we get a whole number of records in each bucket. As the
-- records can be of varying size, the buckets are not guaranteed to have
-- exactly the same length, in either records or bytes, though we try
-- to give them approximately the same number of bytes.
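--
-- For example, to split a hypothetical file "data.txt" into 8 buckets on
-- newline boundaries and print the length of each bucket (a sketch only;
-- the file name is made up):
--
-- > import qualified Data.Repa.Array as A
-- > fromSplitFile 8 (== 0x0a) "data.txt" (\bs -> print (map bucketLength (A.toList bs)))
--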
fromSplitFile :: Int -> (Word8 -> Bool) -> FilePath -> (Array B Bucket -> IO b) -> IO b
-- | Like fromSplitFile but start at the given offset.
fromSplitFileAt :: Int -> (Word8 -> Bool) -> FilePath -> Integer -> (Array B Bucket -> IO b) -> IO b
-- | Open some files for writing as individual buckets and pass them to the
-- given consumer.
toFiles :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b
-- | Like toFiles, but take a list of file paths.
toFiles' :: [FilePath] -> (Array B Bucket -> IO b) -> IO b
-- | Create a new directory of the given name, containing the given number
-- of buckets.
--
-- If the directory is named somedir then the files are named
-- somedir/000000, somedir/000001,
-- somedir/000002 and so on.
toDir :: Int -> FilePath -> (Array B Bucket -> IO b) -> IO b
-- | Given a list of directories, create those directories and open the
-- given number of output files per directory.
--
-- In the resulting array of buckets, the outer dimension indexes each
-- directory, and the inner one indexes each file in its directory.
--
-- For each directory somedir the files are named
-- somedir/000000, somedir/000001,
-- somedir/000002 and so on.
toDirs' :: Int -> [FilePath] -> (Array (E B DIM2) Bucket -> IO b) -> IO b
-- | Close a bucket, releasing the contained file handle.
bClose :: Bucket -> IO ()
-- | Check if the bucket is currently open.
bIsOpen :: Bucket -> IO Bool
-- | Check if the contained file handle is at the end of the bucket.
bAtEnd :: Bucket -> IO Bool
-- | Seek to a position within a bucket.
bSeek :: Bucket -> SeekMode -> Integer -> IO ()
-- | Get some data from a bucket.
bGetArray :: Bucket -> Integer -> IO (Array F Word8)
-- | Put some data in a bucket.
bPutArray :: Bucket -> Array F Word8 -> IO ()
module Data.Repa.Flow.States
class (Ord i, Eq i) => Next i
-- | The first index in a sequence.
first :: Next i => i
-- | Given the current index and the maximum extent, yield the next index
-- in the sequence, or Nothing if there are no more.
next :: Next i => i -> i -> Maybe i
-- | Check whether an index lies within the given extent.
check :: Next i => i -> i -> Bool
class (Ord i, Next i, Monad m) => States i m where data family Refs i m a
-- | Yield the extent of a collection of refs.
extentRefs :: States i m => Refs i m a -> i
-- | Allocate a collection of refs of the given extent, with each ref set
-- to the given starting value.
newRefs :: States i m => i -> a -> m (Refs i m a)
-- | Read the ref at the given index.
readRefs :: States i m => Refs i m a -> i -> m a
-- | Write to the ref at the given index.
writeRefs :: States i m => Refs i m a -> i -> a -> m ()
-- | Fold all the elements in a collection of refs.
foldRefsM :: States i m => (a -> b -> b) -> b -> Refs i m a -> m b
-- | Yield the contents of a collection of refs as a list.
toListM :: States i m => Refs i m a -> m [a]
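--
-- A small sketch of the Refs interface in IO, assuming that
-- newRefs initialises every ref to the given starting value:
--
-- > refs <- newRefs (4 :: Int) (0 :: Int)
-- > writeRefs refs 2 100
-- > toListM refs
-- [0,0,100,0]
--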
instance States Int m => States () m
instance States Int IO
instance Next (Int, Int)
instance Next Int
instance Next ()
-- | Everything flows.
--
-- This module defines generic flows. The other flow types defined in
-- Data.Repa.Flow.Chunked and Data.Repa.Flow.Simple are
-- specialisations of this generic one.
module Data.Repa.Flow.Generic
-- | A bundle of stream sources, indexed by a value of type i, in
-- some monad m, returning elements of type e.
--
-- Elements can be pulled from each stream in the bundle individually.
data Sources i m e
Sources :: i -> (i -> (e -> m ()) -> m () -> m ()) -> Sources i m e
-- | Number of sources in this bundle.
sourcesArity :: Sources i m e -> i
-- | Function to pull data from a bundle. Give it the index of the desired
-- stream, a continuation that accepts an element, and a continuation to
-- invoke when no more elements will ever be available.
sourcesPull :: Sources i m e -> i -> (e -> m ()) -> m () -> m ()
-- | A bundle of stream sinks, indexed by a value of type i, in
-- some monad m, returning elements of type e.
--
-- Elements can be pushed to each stream in the bundle individually.
data Sinks i m e
Sinks :: i -> (i -> e -> m ()) -> (i -> m ()) -> Sinks i m e
-- | Number of sinks in this bundle.
sinksArity :: Sinks i m e -> i
-- | Push an element to one of the streams in the bundle.
sinksPush :: Sinks i m e -> i -> e -> m ()
-- | Signal that no more elements will ever be available for this sink. It
-- is ok to eject the same stream multiple times.
sinksEject :: Sinks i m e -> i -> m ()
-- | Pull all available values from the sources and push them to the sinks.
-- Streams in the bundle are processed sequentially, from first to last.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
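-- A minimal sketch, draining a two-stream bundle into a printing sink
-- built with trigger_o:
--
-- > import Data.Repa.Flow.Generic
-- > src <- fromList (2 :: Int) "ab"
-- > out <- trigger_o (2 :: Int) (\i x -> print (i, x))
-- > drainS src out
-- (0,'a')
-- (0,'b')
-- (1,'a')
-- (1,'b')
--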
drainS :: (Next i, Monad m) => Sources i m a -> Sinks i m a -> m ()
-- | Pull all available values from the sources and push them to the sinks,
-- in parallel. We fork a thread for each of the streams and evaluate
-- them all in parallel.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainP :: Sources Int IO a -> Sinks Int IO a -> IO ()
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
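--
-- For example, each of the two streams below produces the whole list;
-- here we drain stream 1:
--
-- > import Data.Repa.Flow.Generic
-- > toList1 1 =<< fromList (2 :: Int) [11, 22, 33 :: Int]
-- [11,22,33]
--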
fromList :: States i m => i -> [a] -> m (Sources i m a)
-- | Drain a single source into a list.
toList1 :: States i m => i -> Sources i m a -> m [a]
-- | Drain the given number of elements from a single source into a list.
takeList1 :: States i m => Int -> i -> Sources i m a -> m [a]
-- | Push elements into the associated streams of a bundle of sinks.
pushList :: Monad m => [(i, a)] -> Sinks i m a -> m ()
-- | Push the elements of a list into the given stream of a bundle of
-- sinks.
pushList1 :: Monad m => i -> [a] -> Sinks i m a -> m ()
-- | Transform the stream indexes of a bundle of sources.
--
-- The given transform functions should be inverses of each other, else
-- you'll get a confusing result.
mapIndex_i :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sources i1 m a -> m (Sources i2 m a)
-- | Transform the stream indexes of a bundle of sinks.
--
-- The given transform functions should be inverses of each other, else
-- you'll get a confusing result.
mapIndex_o :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sinks i1 m a -> m (Sinks i2 m a)
-- | For a bundle of sources with a 2-d stream index, flip the components
-- of the index.
flipIndex2_i :: Monad m => Sources SH2 m a -> m (Sources SH2 m a)
-- | For a bundle of sinks with a 2-d stream index, flip the components of
-- the index.
flipIndex2_o :: Monad m => Sinks SH2 m a -> m (Sinks SH2 m a)
-- | Attach a finalizer to bundle of sources.
--
-- For each stream in the bundle, the finalizer will be called the first
-- time a consumer of that stream tries to pull an element when no more
-- are available.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the source.
finalize_i :: States i m => (i -> m ()) -> Sources i m a -> m (Sources i m a)
-- | Attach a finalizer to a bundle of sinks.
--
-- For each stream in the bundle, the finalizer will be called the first
-- time that stream is ejected.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the sink.
finalize_o :: States i m => (i -> m ()) -> Sinks i m a -> m (Sinks i m a)
-- | Project out a single stream source from a bundle.
project_i :: Monad m => i -> Sources i m a -> m (Sources () m a)
-- | Project out a single stream sink from a bundle.
project_o :: Monad m => i -> Sinks i m a -> m (Sinks () m a)
-- | Yield sources that always produce the same value.
repeat_i :: Monad m => i -> (i -> a) -> m (Sources i m a)
-- | Yield sources of the given length that always produce the same value.
replicate_i :: States i m => i -> Int -> (i -> a) -> m (Sources i m a)
-- | Prepend some more elements into the front of some sources.
prepend_i :: States i m => [a] -> Sources i m a -> m (Sources i m a)
-- | Like prepend_i but only prepend the elements to the streams
-- that match the given predicate.
prependOn_i :: States i m => (i -> Bool) -> [a] -> Sources i m a -> m (Sources i m a)
-- | Apply a function to every element pulled from some sources, producing
-- some new sources.
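--
-- For example:
--
-- > import Data.Repa.Flow.Generic
-- > toList1 () =<< map_i (* 10) =<< fromList () [1, 2, 3 :: Int]
-- [10,20,30]
--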
map_i :: Monad m => (a -> b) -> Sources i m a -> m (Sources i m b)
-- | Apply a function to every element pulled from some sources, producing
-- some new sources.
map_o :: Monad m => (a -> b) -> Sinks i m b -> m (Sinks i m a)
-- | Like map_i, but the worker function is also given the stream
-- index.
smap_i :: Monad m => (i -> a -> b) -> Sources i m a -> m (Sources i m b)
-- | Like map_o, but the worker function is also given the stream
-- index.
smap_o :: Monad m => (i -> a -> b) -> Sinks i m b -> m (Sinks i m a)
-- | Combine the elements of two flows with the given function. The worker
-- function is also given the stream index.
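--
-- A small usage sketch:
--
-- > import Data.Repa.Flow.Generic
-- > s1 <- fromList () [1, 2, 3 :: Int]
-- > s2 <- fromList () [40, 50, 60 :: Int]
-- > toList1 () =<< szipWith_ii (\_ x y -> x + y) s1 s2
-- [41,52,63]
--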
szipWith_ii :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sources i m a -> Sources i m b -> m (Sources i m c)
-- | Like szipWith_ii, but take a bundle of Sinks for the
-- result elements, and yield a bundle of Sinks to accept the
-- b elements.
szipWith_io :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m a -> m (Sinks i m b)
-- | Like szipWith_ii, but take a bundle of Sinks for the
-- result elements, and yield a bundle of Sinks to accept the
-- a elements.
szipWith_oi :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m b -> m (Sinks i m a)
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: (Ord i, States i m) => Sinks i m a -> Sinks i m a -> m (Sinks i m a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
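--
-- A sketch, tapping a source with a printing sink built from
-- trigger_o:
--
-- > src <- fromList () "abc"
-- > tap <- trigger_o () (\_ x -> putStrLn ("pushed " ++ [x]))
-- > toList1 () =<< dup_io src tap
-- pushed a
-- pushed b
-- pushed c
-- "abc"
--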
dup_io :: (Ord i, Monad m) => Sources i m a -> Sinks i m a -> m (Sources i m a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: (Ord i, Monad m) => Sinks i m a -> Sources i m a -> m (Sources i m a)
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source. Each
-- result source only gets the elements pulled at the time, so if one
-- side pulls all the elements the other side won't get any.
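--
-- For example, if the first result source is drained completely then the
-- second yields nothing:
--
-- > src <- fromList () "abc"
-- > (s1, s2) <- connect_i src
-- > toList1 () s1
-- "abc"
-- > toList1 () s2
-- ""
--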
connect_i :: States i m => Sources i m a -> m (Sources i m a, Sources i m a)
-- | Given a bundle of sources containing several streams, produce a new
-- bundle containing a single stream that gets data from the former.
--
-- Streams from the source are consumed in their natural order, and a
-- complete stream is consumed before moving onto the next one.
--
--
-- > import Data.Repa.Flow.Generic
-- > toList1 () =<< funnel_i =<< fromList (3 :: Int) [11, 22, 33]
-- [11,22,33,11,22,33,11,22,33]
--
funnel_i :: (States i m, States () m) => Sources i m a -> m (Sources () m a)
-- | Given a bundle of sinks consisting of a single stream, produce a new
-- bundle of the given arity that sends all data to the former, ignoring
-- the stream index.
--
-- The argument stream is ejected only when all of the streams in the
-- result bundle have been ejected.
--
--
-- - Using this function in conjunction with parallel operators like
-- drainP introduces non-determinism. Elements pushed to
-- different streams in the result bundle could enter the single stream
-- in the argument bundle in any order.
--
--
--
-- > import Data.Repa.Flow.Generic
-- > import Data.Repa.Array.Material
-- > import Data.Repa.Nice
--
-- > let things = [(0 :: Int, "foo"), (1, "bar"), (2, "baz")]
-- > result <- capture_o B () (\k -> funnel_o 4 k >>= pushList things)
-- > nice result
-- [((),"foo"),((),"bar"),((),"baz")]
--
funnel_o :: States i m => i -> Sinks () m a -> m (Sinks i m a)
-- | Split the given number of elements from the head of a source,
-- returning those elements in a list, and yielding a new source for the
-- rest.
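--
-- For example:
--
-- > src <- fromList () "abcde"
-- > (xs, rest) <- head_i 2 src ()
-- > xs
-- "ab"
-- > toList1 () rest
-- "cde"
--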
head_i :: States i m => Int -> Sources i m a -> i -> m ([a], Sources i m a)
-- | From a stream of values which has consecutive runs of identical values,
-- produce a stream of the lengths of these runs.
--
-- Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
groups_i :: (Ord i, Monad m, Eq a) => Sources i m a -> m (Sources i m Int)
-- | Given a stream of flags and a stream of values, produce a new stream
-- of values where the corresponding flag was True. The length of the
-- result is the length of the shorter of the two inputs.
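--
-- For example:
--
-- > flags <- fromList () [True, False, True, True]
-- > vals  <- fromList () "abcd"
-- > toList1 () =<< pack_ii flags vals
-- "acd"
--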
pack_ii :: (Ord i, Monad m) => Sources i m Bool -> Sources i m a -> m (Sources i m a)
-- | Segmented fold. Takes a stream of segment lengths and a stream of
-- values, and folds the values of each segment individually.
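--
-- For example:
--
-- > lens <- fromList () [2, 3, 1 :: Int]
-- > vals <- fromList () [10, 20, 30, 40, 50, 60 :: Int]
-- > toList1 () =<< folds_ii (+) 0 lens vals
-- [30,120,60]
--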
folds_ii :: (Ord i, Monad m) => (a -> a -> a) -> a -> Sources i m Int -> Sources i m a -> m (Sources i m a)
-- | Apply a monadic function to every element pulled from some sources,
-- producing some new sources.
watch_i :: Monad m => (i -> a -> m ()) -> Sources i m a -> m (Sources i m a)
-- | Pass elements to the provided action as they are pushed into the sink.
watch_o :: Monad m => (i -> a -> m ()) -> Sinks i m a -> m (Sinks i m a)
-- | Like watch_o but doesn't pass elements to another sink.
trigger_o :: Monad m => i -> (i -> a -> m ()) -> m (Sinks i m a)
-- | Create a bundle of sinks of the given arity and capture any data
-- pushed to it.
--
--
-- > import Data.Repa.Flow.Generic
-- > import Data.Repa.Array.Material
-- > import Data.Repa.Nice
-- > import Control.Monad
-- > liftM nice $ capture_o B 4 (\k -> pushList [(0 :: Int, "foo"), (1, "bar"), (0, "baz")] k)
-- [(0,"foo"),(1,"bar"),(0,"baz")]
--
capture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO ()) -> IO (Array lDst (i, a))
-- | Like capture_o but also return the result of the push
-- function.
rcapture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO b) -> IO (Array lDst (i, a), b)
-- | A sink that drops all data on the floor.
--
-- This sink is strict in the elements, so they are demanded before being
-- discarded. Haskell debugging thunks attached to the elements will be
-- demanded.
discard_o :: Monad m => i -> m (Sinks i m a)
-- | A sink that ignores all incoming data.
--
-- This sink is non-strict in the elements. Haskell tracing thunks
-- attached to the elements will *not* be demanded.
ignore_o :: Monad m => i -> m (Sinks i m a)
-- | Use the trace function from Debug.Trace to print each
-- element that is pushed to a sink.
--
--
-- - This function is intended for debugging only, and is not intended
-- for production code.
--
trace_o :: (Show i, Show a, Monad m) => i -> m (Sinks i m a)
-- | Given a bundle of sinks indexed by an Int, produce a result
-- sink for arrays.
--
-- Each time an array is pushed to the result sink, its elements are
-- pushed to the corresponding streams of the argument sink. If there are
-- more elements than sinks then the extras are given to the spill action.
--
--
-- | .. |
-- | [w0, x0, y0, z0] | :: Sinks () IO (Array l a)
-- | [w1, x1, y1, z1, u1] | (sink for a single stream of arrays)
-- | .. |
--
-- | | | | |
-- v v v v .------> spilled
--
-- | .. | .. | .. | .. |
-- | w0 | x0 | y0 | z0 | :: Sinks Int IO a
-- | w1 | x1 | y1 | z1 | (sink for several streams of elements)
-- | .. | .. | .. | .. |
--
distribute_o :: BulkI l a => (Int -> a -> IO ()) -> Sinks Int IO a -> IO (Sinks () IO (Array l a))
-- | Like distribute_o, but drop spilled elements on the floor.
ddistribute_o :: BulkI l a => Sinks Int IO a -> IO (Sinks () IO (Array l a))
-- | Like distribute_o, but with 2-d stream indexes.
--
-- Given the argument and result sinks, when pushing to the result the
-- stream index is used as the first component for the argument sink, and
-- the index of the element in its array is used as the second component.
--
-- If you want the components of the stream index the other way around
-- then apply flipIndex2_o to the argument sinks.
distribute2_o :: BulkI l a => (SH2 -> a -> IO ()) -> Sinks SH2 IO a -> IO (Sinks Int IO (Array l a))
-- | Like distribute2_o, but drop spilled elements on the floor.
ddistribute2_o :: BulkI l a => Sinks SH2 IO a -> IO (Sinks Int IO (Array l a))
-- | Given a bundle of argument sinks, produce a result sink. Arrays of
-- indices and elements are pushed to the result sink. On doing so, the
-- elements are pushed into the corresponding streams of the argument
-- sinks.
--
-- If the index associated with an element does not have a corresponding
-- stream in the argument sinks, then pass it to the provided spill
-- function.
--
--
-- | .. |
-- | [(0, v0), (1, v1), (0, v2), (0, v3), (2, v4)] | :: Sources Int IO (Array l (Int, a))
-- | .. |
-- \ \ |
-- \ .------------. |
-- v v .---------> spilled
--
-- | .. | .. |
-- | [v0, v2, v3] | [v1] | :: Sinks Int IO (Array l a)
-- | .. | .. |
--
--
-- The following example uses capture_o to demonstrate how the
-- shuffle_o operator can be used as one step of a bucket-sort. We
-- start with two arrays of key-value pairs. In the result, the values
-- from each block that had the same key are packed into the same tuple
-- (bucket).
--
--
-- > import Data.Repa.Flow.Generic as G
-- > import Data.Repa.Array as A
-- > import Data.Repa.Array.Material as A
-- > import Data.Repa.Nice
--
-- > let arr1 = A.fromList B [(0, 'a'), (1, 'b'), (2, 'c'), (0, 'd'), (0, 'c')]
-- > let arr2 = A.fromList B [(0, 'A'), (3, 'B'), (3, 'C')]
-- > result :: Array B (Int, Array U Char)
-- > <- capture_o B 4 (\k -> shuffle_o B (error "spilled") k
-- > >>= pushList1 () [arr1, arr2])
--
-- > nice result
-- [(0,"adc"),(1,"b"),(2,"c"),(0,"A"),(3,"BC")]
--
shuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (Int -> Array lDst a -> IO ()) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a)))
-- | Like shuffle_o, but drop spilled elements on the floor.
dshuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a)))
-- | Like dshuffle_o, but use the given function to decide which
-- stream of the argument bundle each element should be pushed into.
--
--
-- > import Data.Repa.Flow.Generic as G
-- > import Data.Repa.Array as A
-- > import Data.Repa.Array.Material as A
-- > import Data.Repa.Nice
-- > import Data.Char
--
-- > let arr1 = A.fromList B "FooBAr"
-- > let arr2 = A.fromList B "BazLIKE"
-- > result :: Array B (Int, Array U Char)
-- > <- capture_o B 2 (\k -> dshuffleBy_o B (\x -> if isUpper x then 0 else 1) k
-- > >>= pushList1 () [arr1, arr2])
-- > nice result
-- [(0,"FBA"),(1,"oor"),(0,"BLIKE"),(1,"az")]
--
dshuffleBy_o :: (BulkI lDst a, BulkI lSrc a, Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (a -> Int) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc a))
-- | Take elements from a flow and pack them into chunks of the given
-- maximum length.
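--
-- A round-trip sketch with unchunk_i, packing five elements into
-- chunks of at most two:
--
-- > import Data.Repa.Flow.Generic
-- > import Data.Repa.Array.Material
-- > src    <- fromList () [1 .. 5 :: Int]
-- > chunks <- chunk_i U 2 src
-- > toList1 () =<< unchunk_i chunks
-- [1,2,3,4,5]
--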
chunk_i :: (Target lDst a, Index lDst ~ Int, States i IO) => Name lDst -> Int -> Sources i IO a -> IO (Sources i IO (Array lDst a))
-- | Take a flow of chunks and flatten it into a flow of the individual
-- elements.
unchunk_i :: (BulkI l a, States i IO) => Sources i IO (Array l a) -> IO (Sources i IO a)
module Data.Repa.Flow.Generic.Debug
-- | Given a source index and a length, pull enough chunks from the source
-- to build a list of the requested length, and discard the remaining
-- elements in the final chunk.
--
--
-- - This function is intended for interactive debugging. If you want
-- to retain the rest of the final chunk then use head_i.
--
more :: (States i IO, Nicer a) => i -> Sources i IO a -> IO [Nice a]
-- | Like more but also specify how many elements you want.
more' :: (States i IO, Nicer a) => i -> Int -> Sources i IO a -> IO [Nice a]
-- | Like more, but print results in a tabular form to the console.
moret :: (States i IO, Nicer [a], Presentable (Nice [a])) => i -> Sources i IO a -> IO ()
-- | Like more', but print results in tabular form to the console.
moret' :: (States i IO, Nicer [a], Presentable (Nice [a])) => i -> Int -> Sources i IO a -> IO ()
-- | Like more, but show elements in their raw format.
morer :: States i IO => i -> Sources i IO a -> IO [a]
-- | Like more', but show elements in their raw format.
morer' :: States i IO => i -> Int -> Sources i IO a -> IO [a]
-- | Convert some value to a nice form.
--
-- In particular:
--
--
-- - Nested Arrays are converted to nested lists, so that they are
-- easier to work with on the ghci console.
-- - Lists of characters are wrapped into the Str data type, so
-- that they can be pretty printed differently by follow-on
-- processing.
--
--
-- As ghci automatically pretty prints lists, using nice is more
-- fun than trying to show the raw Repa array representations.
class Nicer a where type family Nice a :: *
nice :: Nicer a => a -> Nice a
-- | Convert some value to a form presentable to the user.
--
-- Like show but we allow the nesting structure to be preserved so
-- it can be displayed in tabular format.
class Presentable a
present :: Presentable a => a -> Present
module Data.Repa.Flow.Generic.IO
-- | Read data from some files, using the given chunk length.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
--
sourceBytes :: Bulk l Bucket => Integer -> Array l Bucket -> IO (Sources (Index l) IO (Array F Word8))
-- | Read 8-bit ASCII characters from some files, using the given chunk
-- length.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
--
sourceChars :: Bulk l Bucket => Integer -> Array l Bucket -> IO (Sources (Index l) IO (Array F Char))
-- | Like sourceRecords, but produce all records in a single vector.
sourceChunks :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources (Index l) IO (Array F Word8))
-- | Read complete records of data from a bucket, into chunks of the given
-- length. We read as many complete records as will fit into each chunk.
--
-- The records are separated by a special terminating character, which
-- the given predicate detects. After reading a chunk of data we seek the
-- bucket to just after the last complete record that was read, so we can
-- continue to read more complete records next time.
--
-- If we cannot fit at least one complete record in the chunk then
-- perform the given failure action. Limiting the chunk length guards
-- against the case where a large input file is malformed, as we won't
-- try to read the whole file into memory.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
--
sourceRecords :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources Int IO (Array N (Array F Word8)))
-- | Write chunks of bytes to the given file handles.
--
--
-- - Data is written out directly from the provided buffer.
--
sinkBytes :: Bulk l Bucket => Array l Bucket -> IO (Sinks (Index l) IO (Array F Word8))
-- | Write chunks of 8-bit ASCII characters to the given file handles.
--
--
-- - Data is copied into a foreign buffer to truncate the characters to
-- 8-bits each before being written out.
--
sinkChars :: (Bulk l Bucket, BulkI r Char) => Array l Bucket -> IO (Sinks (Index l) IO (Array r Char))
-- | Write vectors of text lines to the given file handles.
--
--
-- - Data is copied into a new buffer to insert newlines before being
-- written out.
--
sinkLines :: (Bulk l Bucket, BulkI l1 (Array l2 Char), BulkI l2 Char, Unpack (Array l2 Char) t2) => Name l1 -> Name l2 -> Array l Bucket -> IO (Sinks (Index l) IO (Array l1 (Array l2 Char)))
-- | Create an output sieve that writes data to an indeterminate number of
-- output files. Each new element is appended to its associated file.
--
--
-- - TODO: This function keeps a maximum of 8 files open at once,
-- closing and re-opening them in a least-recently-used order. Due to
-- this behaviour it's fine to create thousands of separate output files
-- without risking overflowing the process limit on the maximum number of
-- usable file handles.
--
sieve_o :: (a -> Maybe (FilePath, Array F Word8)) -> IO (Sinks () IO a)
-- | Input and Output for Chunked Flows.
--
-- Most functions in this module are re-exports of the ones from
-- Data.Repa.Flow.Generic.IO, but using the Sources and
-- Sinks type synonyms for chunked flows.
module Data.Repa.Flow.Chunked.IO
-- | Read complete records of data from some buckets, using the given
-- chunk length.
sourceRecords :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources Int IO N (Array F Word8))
-- | Read 8-bit ASCII characters from some files, using the given chunk
-- length.
sourceChars :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources Int IO F Char)
-- | Read data from some files, using the given chunk length.
sourceBytes :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources Int IO F Word8)
-- | Write 8-bit ASCII characters to the given file handles.
sinkChars :: BulkI l Bucket => Array l Bucket -> IO (Sinks Int IO F Char)
-- | Write chunks of data to the given file handles.
sinkBytes :: BulkI l Bucket => Array l Bucket -> IO (Sinks Int IO F Word8)
module Data.Repa.Flow.Simple
-- | Source consisting of a single stream.
type Source m e = Sources () m e
-- | Sink consisting of a single stream.
type Sink m e = Sinks () m e
-- | Pull all available values from the source and push them to the sink.
drainS :: Monad m => Source m a -> Sink m a -> m ()
-- | Given a list of elements, yield a source that produces all the
-- elements.
fromList :: States () m => [a] -> m (Source m a)
-- | Drain a source into a list.
toList :: States () m => Source m a -> m [a]
-- | Drain the given number of elements from a single source into a list.
takeList :: States () m => Int -> Source m a -> m [a]
-- | Attach a finalizer to a source.
--
-- The finalizer will be called the first time a consumer of that stream
-- tries to pull an element when no more are available.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the source.
finalize_i :: States () m => m () -> Source m a -> m (Source m a)
-- | Attach a finalizer to a sink.
--
-- The finalizer will be called the first time the stream is ejected.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the sink.
finalize_o :: States () m => m () -> Sink m a -> m (Sink m a)
-- | Yield a source that always produces the same value.
repeat_i :: States () m => a -> m (Source m a)
-- | Yield a source of the given length that always produces the same
-- value.
replicate_i :: States () m => Int -> a -> m (Source m a)
-- | Prepend some more elements to the front of a source.
prepend_i :: States () m => [a] -> Source m a -> m (Source m a)
-- | Apply a function to every element pulled from some source, producing a
-- new source.
map_i :: States () m => (a -> b) -> Source m a -> m (Source m b)
-- | Apply a function to every element pushed to some sink, producing a new
-- sink.
map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a)
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
dup_io :: States () m => Source m a -> Sink m a -> m (Source m a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a)
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source. Each
-- result source only gets the elements pulled at the time, so if one
-- side pulls all the elements the other side won't get any.
connect_i :: States () m => Source m a -> m (Source m a, Source m a)
-- | Split the given number of elements from the head of a source,
-- returning those elements in a list, and yielding a new source for the
-- rest.
head_i :: States () m => Int -> Source m a -> m ([a], Source m a)
-- | Peek at the given number of elements in the stream, returning a result
-- stream that still produces them all.
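--
-- Unlike head_i, the peeked elements are still produced by the result
-- source:
--
-- > src <- fromList "abcde"
-- > (xs, src') <- peek_i 3 src
-- > xs
-- "abc"
-- > toList src'
-- "abcde"
--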
peek_i :: States () m => Int -> Source m a -> m ([a], Source m a)
-- | From a stream of values which has consecutive runs of identical values,
-- produce a stream of the lengths of these runs.
--
-- Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int)
-- | Given a stream of flags and a stream of values, produce a new stream
-- of values where the corresponding flag was True. The length of the
-- result is the length of the shorter of the two inputs.
pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a)
-- | Segmented fold.
folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a)
-- | Apply a monadic function to every element pulled from a source
-- producing a new source.
watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a)
-- | Pass elements to the provided action as they are pushed to the sink.
watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a)
-- | Like watch_o but doesn't pass elements to another sink.
trigger_o :: Monad m => (a -> m ()) -> m (Sink m a)
-- | A sink that drops all data on the floor.
--
-- This sink is strict in the elements, so they are demanded before being
-- discarded. Haskell debugging thunks attached to the elements will be
-- demanded.
discard_o :: Monad m => m (Sink m a)
-- | A sink that ignores all incoming elements.
--
-- This sink is non-strict in the elements. Haskell tracing thunks
-- attached to the elements will *not* be demanded.
ignore_o :: Monad m => m (Sink m a)
-- | Open some files as buckets and use them as Sources.
fromFiles :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b
-- | Read data from a file, using the given chunk length.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
--
--
-- The file will be closed the first time the consumer tries to pull an
-- element from the associated stream when no more are available.
sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8))
-- | Read complete records of data from a file, using the given chunk
-- length.
--
-- The records are separated by a special terminating character, which
-- the given predicate detects. After reading a chunk of data we seek to
-- just after the last complete record that was read, so we can continue
-- to read more complete records next time.
--
-- If we cannot find an end-of-record terminator in the chunk then apply
-- the given failure action. The records can be no longer than the chunk
-- length. This fact guards against the case where a large input file is
-- malformed and contains no end-of-record terminators, as we won't try
-- to read the whole file into memory.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - All chunks have the same size, except possibly the last one.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
--
--
-- The file will be closed the first time the consumer tries to pull an
-- element from the associated stream when no more are available.
sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Bucket -> IO (Source IO (Array N (Array F Word8)))
-- | Open some files for writing as individual buckets and pass them to the
-- given consumer.
toFiles :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b
-- | Write chunks of data to the given files.
--
-- The file will be closed when the associated stream is ejected.
sinkBytes :: Bucket -> IO (Sink IO (Array F Word8))
module Data.Repa.Flow.Chunked
-- | A bundle of sources, where the elements are chunked into arrays.
type Sources i m l e = Sources i m (Array l e)
-- | A bundle of sinks, where the elements are chunked into arrays.
type Sinks i m l e = Sinks i m (Array l e)
-- | Shorthand for common type classes.
type Flow i m l a = (Ord i, Monad m, BulkI l a, States i m)
-- | Pull all available values from the sources and push them to the sinks.
drainS :: (Next i, Monad m) => Sources i m r a -> Sinks i m r a -> m ()
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
fromList :: (States i m, TargetI l a) => Name l -> i -> [a] -> m (Sources i m l a)
-- | Like fromList, but take a list of lists, where each of the
-- inner lists is packed into a single chunk.
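--
-- A small usage sketch, using unboxed chunks:
--
-- > import Data.Repa.Array.Material
-- > toLists1 0 =<< fromLists U (1 :: Int) [[1, 2], [3, 4, 5 :: Int]]
-- [[1,2],[3,4,5]]
--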
fromLists :: (States i m, TargetI l a) => Name l -> i -> [[a]] -> m (Sources i m l a)
-- | Drain a single source into a list of elements.
toList1 :: (States i m, BulkI l a) => i -> Sources i m l a -> m [a]
-- | Drain a single source into a list of chunks.
toLists1 :: (States i m, BulkI l a) => i -> Sources i m l a -> m [[a]]
-- | Attach a finalizer to a bundle of sources.
--
-- For each stream in the bundle, the finalizer will be called the first
-- time a consumer of that stream tries to pull an element when no more
-- are available.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the source.
finalize_i :: States i m => (i -> m ()) -> Sources i m l a -> m (Sources i m l a)
-- | Attach a finalizer to a bundle of sinks.
--
-- The finalizer will be called the first time the stream is ejected.
--
-- The provided finalizer will be run after any finalizers already
-- attached to the sink.
finalize_o :: States i m => (i -> m ()) -> Sinks i m l a -> m (Sinks i m l a)
-- | Map a function over elements pulled from a source.
smap_i :: (Flow i m l1 a, TargetI l2 b) => (i -> a -> b) -> Sources i m l1 a -> m (Sources i m l2 b)
-- | Map a function over elements pushed into a sink.
smap_o :: (Flow i m l1 a, TargetI l2 b) => (i -> a -> b) -> Sinks i m l2 b -> m (Sinks i m l1 a)
-- | Combine the elements of two flows with the given function.
szipWith_ii :: (Ord i, States i m, BulkI lSrc1 a, BulkI lSrc2 b, TargetI lDst c, Windowable lSrc1 a, Windowable lSrc2 b) => Name lDst -> (i -> a -> b -> c) -> Sources i m lSrc1 a -> Sources i m lSrc2 b -> m (Sources i m lDst c)
-- | Split the given number of elements from the head of a source,
-- returning those elements in a list, and yielding a new source for the
-- rest.
--
--
-- - We pull whole chunks from the source stream until we have
-- at least the desired number of elements. The leftover elements in the
-- final chunk are visible in the result Sources.
--
head_i :: (States i m, Windowable l a, Index l ~ Int) => Int -> Sources i m l a -> i -> m ([a], Sources i m l a)
-- | From a stream of values which has consecutive runs of identical values,
-- produce a stream of the lengths of these runs.
--
--
-- groupsBy (==) [4, 4, 4, 3, 3, 1, 1, 1, 4]
-- => [3, 2, 3, 1]
--
groupsBy_i :: GroupsDict i m lVal lGrp tGrp lLen tLen a => Name lGrp -> Name lLen -> (a -> a -> Bool) -> Sources i m lVal a -> m (Sources i m (T2 lGrp lLen) (a, Int))
-- | Dictionaries needed to perform a grouping.
type GroupsDict i m lVal lGrp tGrp lLen tLen a = (Flow i m lVal a, Index lVal ~ Int, TargetI lGrp a, TargetI lLen Int, Unpack (IOBuffer lGrp a) tGrp, Unpack (IOBuffer lLen Int) tLen)
-- | Fold all elements of all streams in a bundle individually, returning
-- an array of per-stream results.
foldlS :: (States Int m, Target lDst a, Index lDst ~ Int, BulkI lSrc b) => Name lDst -> (a -> b -> a) -> a -> Sources Int m lSrc b -> m (Array lDst a)
-- | Fold all elements of all streams in a bundle together, one stream
-- after the other, returning the single final value.
foldlAllS :: (States () m, BulkI lSrc b) => (a -> b -> a) -> a -> Sources Int m lSrc b -> m a
-- | Segmented fold over vectors of segment lengths and input values.
folds_i :: FoldsDict i m lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b => Name lGrp -> Name lRes -> (a -> b -> b) -> b -> Sources i m lSeg (n, Int) -> Sources i m lElt a -> m (Sources i m (T2 lGrp lRes) (n, b))
-- | Dictionaries needed to perform a segmented fold.
type FoldsDict i m lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = (States i m, Windowable lSeg (n, Int), Windowable lElt a, BulkI lSeg (n, Int), BulkI lElt a, BulkI lGrp n, BulkI lRes b, TargetI lElt a, TargetI lGrp n, TargetI lRes b, Unpack (IOBuffer lGrp n) tGrp, Unpack (IOBuffer lRes b) tRes)
-- | Hook a monadic function to some sources, which will be passed every
-- chunk that is pulled from the result.
watch_i :: Monad m => (i -> Array l a -> m ()) -> Sources i m l a -> m (Sources i m l a)
-- | Hook a monadic function to some sinks, which will be passed every
-- chunk that is pushed to the result.
watch_o :: Monad m => (i -> Array l a -> m ()) -> Sinks i m l a -> m (Sinks i m l a)
-- | Like watch_o but discard the incoming chunks after they are
-- passed to the function.
trigger_o :: Monad m => i -> (i -> Array l a -> m ()) -> m (Sinks i m l a)
-- | Yield a bundle of sinks of the given arity that drops all data on the
-- floor.
--
--
-- - The sinks are strict in the *chunks*, so they are demanded before
-- being discarded. Haskell debugging thunks attached to the chunks will
-- be demanded, but thunks attached to elements may not be -- depending
-- on whether the chunk representation is strict in the elements.
--
discard_o :: Monad m => i -> m (Sinks i m l a)
-- | A sink that ignores all incoming data.
--
-- This sink is non-strict in the chunks. Haskell tracing thunks attached
-- to the chunks will *not* be demanded.
ignore_o :: Monad m => i -> m (Sinks i m l a)
-- | This module defines the default specialisation of flows that appears
-- in Data.Repa.Flow. Each stream in the bundle is indexed by a
-- single integer, and stream state is stored using the IO monad.
module Data.Repa.Flow.Default
-- | A bundle of stream sources, where the elements of the stream are
-- chunked into arrays.
--
-- The chunks have some Layout l and contain elements of
-- type a. See Data.Repa.Array for the available layouts.
type Sources l a = Sources Int IO l a
-- | A bundle of stream sinks, where the elements of the stream are chunked
-- into arrays.
type Sinks l a = Sinks Int IO l a
-- | Shorthand for common type classes.
type Flow l a = Flow Int IO l a
-- | Yield the number of streams in the bundle.
sourcesArity :: Sources l a -> Int
-- | Yield the number of streams in the bundle.
sinksArity :: Sinks l a -> Int
-- | Pull all available values from the sources and push them to the sinks.
-- Streams in the bundle are processed sequentially, from first to last.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainS :: Sources l a -> Sinks l a -> IO ()
-- | Pull all available values from the sources and push them to the sinks,
-- in parallel. We fork a thread for each of the streams and evaluate
-- them all in parallel.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainP :: Sources l a -> Sinks l a -> IO ()
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
fromList :: TargetI l a => Name l -> Int -> [a] -> IO (Sources l a)
-- | Like fromList, but take a list of lists. Each of the
-- inner lists is packed into a single chunk.
fromLists :: TargetI l a => Name l -> Int -> [[a]] -> IO (Sources l a)
-- | Drain a single source from a bundle into a list of elements.
toList1 :: BulkI l a => Int -> Sources l a -> IO [a]
-- | Drain a single source from a bundle into a list of chunks.
toLists1 :: BulkI l a => Int -> Sources l a -> IO [[a]]
-- | Attach a finalizer to some sources.
--
--
-- - For a given source, the finalizer will be called the first time a
-- consumer of that source tries to pull an element when no more are
-- available.
-- - The finalizer is given the index of the source that ended.
-- - The finalizer will be run after any finalizers already attached to
-- the source.
--
finalize_i :: (Int -> IO ()) -> Sources l a -> IO (Sources l a)
-- | Attach a finalizer to some sinks.
--
--
-- - For a given sink, the finalizer will be called the first time that
-- sink is ejected.
-- - The finalizer is given the index of the sink that was
-- ejected.
-- - The finalizer will be run after any finalizers already attached to
-- the sink.
--
finalize_o :: (Int -> IO ()) -> Sinks l a -> IO (Sinks l a)
-- | Apply a function to all elements pulled from some sources.
map_i :: (Flow l1 a, TargetI l2 b) => Name l2 -> (a -> b) -> Sources l1 a -> IO (Sources l2 b)
-- | Apply a function to all elements pushed to some sinks.
map_o :: (Flow l1 a, TargetI l2 b) => Name l1 -> (a -> b) -> Sinks l2 b -> IO (Sinks l1 a)
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: Sinks l a -> Sinks l a -> IO (Sinks l a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
dup_io :: Sources l a -> Sinks l a -> IO (Sources l a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: Sinks l a -> Sources l a -> IO (Sources l a)
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source. Each
-- result source only gets the elements pulled at the time, so if one
-- side pulls all the elements the other side won't get any.
connect_i :: Sources l a -> IO (Sources l a, Sources l a)
-- | Hook a worker function to some sources, which will be passed every
-- chunk that is pulled from each source.
--
--
-- - The worker is also passed the source index of the chunk that was
-- pulled.
--
watch_i :: (Int -> Array l a -> IO ()) -> Sources l a -> IO (Sources l a)
-- | Hook a worker function to some sinks, which will be passed every chunk
-- that is pushed to each sink.
--
--
-- - The worker is also passed the sink index of the chunk that was
-- pushed.
--
watch_o :: (Int -> Array l a -> IO ()) -> Sinks l a -> IO (Sinks l a)
-- | Create a bundle of sinks of the given arity that pass incoming chunks
-- to a worker function.
--
--
-- - This is like watch_o, except that the incoming chunks are
-- discarded after they are passed to the worker function.
--
trigger_o :: Int -> (Int -> Array l a -> IO ()) -> IO (Sinks l a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - The sinks are strict in the *chunks*, so they are demanded before
-- being discarded.
-- - Haskell debugging thunks attached to the chunks will be demanded,
-- but thunks attached to elements may not be -- depending on whether the
-- chunk representation is strict in the elements.
--
discard_o :: Int -> IO (Sinks l a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - As opposed to discard_o the sinks are non-strict in the
-- chunks.
-- - Haskell debugging thunks attached to the chunks will *not* be
-- demanded.
--
ignore_o :: Int -> IO (Sinks l a)
-- | Given a source index and a length, split a list of that length
-- from the front of the source, yielding a new source for the remaining
-- elements.
--
--
-- - We pull whole chunks from the source stream until we have
-- at least the desired number of elements. The leftover elements in the
-- final chunk are visible in the result Sources.
--
head_i :: (Windowable l a, Index l ~ Int) => Int -> Int -> Sources l a -> IO (Maybe ([a], Sources l a))
-- | Scan through some sources to find runs of matching elements, and count
-- the lengths of those runs.
--
--
-- > import Data.Repa.Flow
-- > toList1 0 =<< groups_i U U =<< fromList U 1 "waabbbblle"
-- Just [('w',1),('a',2),('b',4),('l',2),('e',1)]
--
groups_i :: (GroupsDict lVal lGrp tGrp lLen tLen a, Eq a) => Name lGrp -> Name lLen -> Sources lVal a -> IO (Sources (T2 lGrp lLen) (a, Int))
-- | Like groups_i, but take a function to determine whether two
-- consecutive values should be in the same group.
groupsBy_i :: GroupsDict lVal lGrp tGrp lLen tLen a => Name lGrp -> Name lLen -> (a -> a -> Bool) -> Sources lVal a -> IO (Sources (T2 lGrp lLen) (a, Int))
-- | Dictionaries needed to perform a grouping.
type GroupsDict lVal lGrp tGrp lLen tLen a = GroupsDict Int IO lVal lGrp tGrp lLen tLen a
-- | Fold all the elements of each stream in a bundle, one stream after the
-- other, returning an array of fold results.
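--
-- A sketch of summing the streams in a bundle. Each stream produced by
-- fromList yields the whole list:
--
-- > import Data.Repa.Flow
-- > import qualified Data.Repa.Array as A
-- > arr <- foldlS U (+) 0 =<< fromList U 2 [1 .. 4 :: Int]
-- > A.toList arr
-- [10,10]
--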
foldlS :: (Target lDst a, Index lDst ~ Int, BulkI lSrc b) => Name lDst -> (a -> b -> a) -> a -> Sources lSrc b -> IO (Array lDst a)
-- | Fold all the elements of all streams in a bundle together, one stream
-- after the other, returning the single final value.
foldlAllS :: BulkI lSrc b => (a -> b -> a) -> a -> Sources lSrc b -> IO a
-- | Given streams of lengths and values, perform a segmented fold where
-- segments of values of the corresponding lengths are folded
-- together.
--
--
-- > import Data.Repa.Flow
-- > sSegs <- fromList U 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
-- > sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
-- > toList1 0 =<< folds_i U U (+) 0 sSegs sVals
-- Just [('a',10),('b',50),('c',220),('d',0),('e',80)]
--
--
-- If not enough input elements are available to fold a complete segment
-- then no output is produced for that segment. However, trailing zero
-- length segments still produce the initial value for the fold.
--
--
-- > import Data.Repa.Flow
-- > sSegs <- fromList U 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
-- > sVals <- fromList U 1 [10, 20, 30 :: Int]
-- > toList1 0 =<< folds_i U U (*) 1 sSegs sVals
-- Just [('a',10),('b',600),('c',1),('d',1),('e',1)]
--
folds_i :: FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b => Name lGrp -> Name lRes -> (a -> b -> b) -> b -> Sources lSeg (n, Int) -> Sources lElt a -> IO (Sources (T2 lGrp lRes) (n, b))
-- | Dictionaries needed to perform a segmented fold.
type FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = FoldsDict Int IO lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b
-- | Combination of groupsBy_i and folds_i. We determine the
-- segment lengths while performing the folds.
--
-- Note that SQL-like group-by aggregations can be performed using this
-- function, provided the data is pre-sorted on the group key. For
-- example, we can take the average of some groups of values:
--
--
-- > import Data.Repa.Flow
-- > sKeys <- fromList U 1 "waaaabllle"
-- > sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
--
-- > sResult <- map_i U (\(key, (acc, n)) -> (key, acc / n))
-- > =<< foldGroupsBy_i U U (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
--
-- > toList1 0 sResult
-- Just [('w',10.0),('a',35.0),('b',60.0),('l',80.0),('e',100.0)]
--
foldGroupsBy_i :: FoldGroupsDict lSeg tSeg lVal tVal lGrp tGrp lRes tRes n a b => Name lGrp -> Name lRes -> (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources lSeg n -> Sources lVal a -> IO (Sources (T2 lGrp lRes) (n, b))
type FoldGroupsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = (BulkI lSeg n, Material lElt a, Index lElt ~ Int, Material lGrp n, Index lGrp ~ Int, Material lRes b, Index lRes ~ Int, Unpack (IOBuffer lGrp n) tGrp, Unpack (IOBuffer lRes b) tRes)
module Data.Repa.Flow.Default.Debug
-- | Given a source index and a length, pull enough chunks from the source
-- to build a list of the requested length, and discard the remaining
-- elements in the final chunk.
--
--
-- - This function is intended for interactive debugging. If you want
-- to retain the rest of the final chunk then use head_i.
--
more :: (Windowable l a, Index l ~ Int, Nicer a) => Int -> Sources l a -> IO (Maybe [Nice a])
-- | Like more but also specify how many elements you want.
more' :: (Windowable l a, Index l ~ Int, Nicer a) => Int -> Int -> Sources l a -> IO (Maybe [Nice a])
-- | Like more, but print results in a tabular form to the console.
moret :: (Windowable l a, Index l ~ Int, Nicer [a], Presentable (Nice [a])) => Int -> Sources l a -> IO ()
-- | Like more', but print results in tabular form to the console.
moret' :: (Windowable l a, Index l ~ Int, Nicer [a], Presentable (Nice [a])) => Int -> Int -> Sources l a -> IO ()
-- | Like more, but show elements in their raw format.
morer :: (Windowable l a, Index l ~ Int) => Int -> Sources l a -> IO (Maybe [a])
-- | Like more', but show elements in their raw format.
morer' :: (Windowable l a, Index l ~ Int) => Int -> Int -> Sources l a -> IO (Maybe [a])
-- | Convert some value to a nice form.
--
-- In particular:
--
--
-- - Nested Arrays are converted to nested lists, so that they are
-- easier to work with on the ghci console.
-- - Lists of characters are wrapped into the Str data type, so
-- that they can be pretty printed differently by follow-on
-- processing.
--
--
-- As ghci automatically pretty prints lists, using nice is more
-- fun than trying to show the raw Repa array representations.
class Nicer a where type family Nice a :: *
nice :: Nicer a => a -> Nice a
-- | Convert some value to a form presentable to the user.
--
-- Like show but we allow the nesting structure to be preserved so
-- it can be displayed in tabular format.
class Presentable a
present :: Presentable a => a -> Present
-- | Read and write files.
module Data.Repa.Flow.Default.SizedIO
-- | Like sourceBytes, but take an explicit chunk size.
sourceBytes :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources F Word8)
-- | Like sourceChars, but take an explicit chunk size.
sourceChars :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources F Char)
-- | Like sourceLines, but take an explicit chunk size and error
-- action.
sourceLines :: BulkI l Bucket => Integer -> IO () -> Array l Bucket -> IO (Sources N (Array F Char))
-- | Like sourceRecords, but take an explicit chunk size and error
-- action.
sourceRecords :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources N (Array F Word8))
-- | Read a file containing Tab-Separated-Values.
sourceTSV :: BulkI l Bucket => Integer -> IO () -> Array l Bucket -> IO (Sources N (Array N (Array F Char)))
-- | Read a file containing Comma-Separated-Values.
--
-- TODO: handle escaped commas. TODO: check CSV file standard.
sourceCSV :: BulkI l Bucket => Integer -> IO () -> Array l Bucket -> IO (Sources N (Array N (Array F Char)))
-- | An alias for sinkBytes.
sinkBytes :: BulkI l Bucket => Array l Bucket -> IO (Sinks F Word8)
-- | An alias for sinkChars.
sinkChars :: BulkI l Bucket => Array l Bucket -> IO (Sinks F Char)
-- | An alias for sinkLines.
sinkLines :: (BulkI l Bucket, BulkI l1 (Array l2 Char), BulkI l2 Char, Unpack (Array l2 Char) t2) => Name l1 -> Name l2 -> Array l Bucket -> IO (Sinks l1 (Array l2 Char))
-- | Read and write files.
--
-- The functions in this module are wrappers for the ones in
-- Data.Repa.Flow.Default.SizedIO that use a default chunk size of
-- 64kBytes and just call error if the source file appears
-- corrupted.
module Data.Repa.Flow.Default.IO
-- | The default chunk size of 64kBytes.
defaultChunkSize :: Integer
-- | Read a file containing Comma-Separated-Values.
sourceCSV :: BulkI l Bucket => Array l Bucket -> IO (Sources N (Array N (Array F Char)))
-- | Read a file containing Tab-Separated-Values.
sourceTSV :: BulkI l Bucket => Array l Bucket -> IO (Sources N (Array N (Array F Char)))
-- | Read complete records of data from a file, into chunks of the default
-- length. We read as many complete records as will fit into each chunk.
--
-- The records are separated by a special terminating character, which
-- the given predicate detects. After reading a chunk of data we seek the
-- file to just after the last complete record that was read, so we can
-- continue to read more complete records next time.
--
-- If we cannot fit at least one complete record in the chunk then
-- perform the given failure action. Limiting the chunk length guards
-- against the case where a large input file is malformed, as we won't
-- try to read the whole file into memory.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
-- - Each file is closed the first time the consumer tries to pull a
-- record from the associated stream when no more are available.
--
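-- A sketch of reading newline-terminated records from a hypothetical
-- file "data.txt" (the path is made up):
--
-- > import Data.Repa.Flow
-- > import Data.Repa.Flow.IO.Bucket
-- > import Data.Repa.Flow.Default.Debug
-- > ws <- fromFiles' ["data.txt"] (sourceRecords (== 0x0a))
-- > more 0 ws
--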
sourceRecords :: BulkI l Bucket => (Word8 -> Bool) -> Array l Bucket -> IO (Sources N (Array F Word8))
-- | Read complete lines of data from a text file, using the default chunk
-- length. We read as many complete lines as will fit into each chunk.
--
--
-- - The trailing new-line characters are discarded.
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
-- - Each file is closed the first time the consumer tries to pull a
-- line from the associated stream when no more are available.
--
sourceLines :: BulkI l Bucket => Array l Bucket -> IO (Sources N (Array F Char))
-- | Read 8-bit ASCII characters from some files, using the default chunk
-- length.
sourceChars :: BulkI l Bucket => Array l Bucket -> IO (Sources F Char)
-- | Read data from some files, using the default chunk length.
sourceBytes :: BulkI l Bucket => Array l Bucket -> IO (Sources F Word8)
-- | Write 8-bit ASCII characters to some files.
sinkChars :: BulkI l Bucket => Array l Bucket -> IO (Sinks F Char)
-- | Write vectors of text lines to the given file handles.
--
--
-- - Data is copied into a new buffer to insert newlines before being
-- written out.
--
sinkLines :: (BulkI l Bucket, BulkI l1 (Array l2 Char), BulkI l2 Char, Unpack (Array l2 Char) t2) => Name l1 -> Name l2 -> Array l Bucket -> IO (Sinks l1 (Array l2 Char))
-- | Write bytes to some file.
sinkBytes :: BulkI l Bucket => Array l Bucket -> IO (Sinks F Word8)
-- | Getting Started
--
-- A flow consists of a bundle of individual streams. Here we create a
-- bundle of two streams, using different files for each. Data will be
-- read in chunks, using the default chunk size of 64kBytes.
--
--
-- > import Data.Repa.Flow
-- > import Data.Repa.Flow.Default.Debug
-- > ws <- fromFiles ["/usr/share/dict/words", "/usr/share/dict/cracklib-small"] sourceLines
--
--
-- Show the first few elements of the first chunk of the first file.
--
--
-- > more 0 ws
-- Just ["A","A's","AA's","AB's","ABM's","AC's","ACTH's","AI's" ...]
--
--
-- The more function is helpful for debugging. It pulls a whole
-- chunk from a source, displays the requested number of elements from
-- the front of it, then discards the rest. In production code you could
-- use head_i to split a few elements from a stream while
-- retaining the rest.
--
-- Use more' to show more elements at a time. We've already pulled
-- the first chunk, so here are the first 100 elements from the second
-- chunk:
--
--
-- > more' 0 100 ws
-- Just ["Jubal","Judah","Judaic","Judaism","Judaism's","Judaisms","Judas" ...]
--
--
-- Use moret to display elements in tabular form. Here are the
-- first few elements of the second stream in the bundle:
--
--
-- > moret 1 ws
-- "10th"
-- "1st"
-- "2nd"
-- "3rd"
-- "4th"
-- "5th"
-- ...
--
--
-- Let's convert the characters to upper-case.
--
--
-- > import Data.Char
-- > up <- map_i B (mapS U toUpper) ws
-- > more 0 up
-- Just ["UTOPIAN","UTOPIAN'S","UTOPIANS","UTOPIAS","UTRECHT" ...]
--
--
-- The B and U are Layout names that indicate how
-- the chunks for the result streams should be arranged in memory. In
-- this case the chunks are B-oxed arrays of U-nboxed
-- arrays of characters. Other useful layouts are F which stores
-- data in foreign memory, and N for nested arrays.
--
-- Flows are data-parallel, which means operators like map_i apply
-- to all streams in the bundle. The second stream has been converted to
-- upper-case as well:
--
--
-- > more 1 up
-- Just ["BROWNER","BROWNEST","BROWNIAN","BROWNIE","BROWNIE'S" ...]
--
--
-- Let's write out the data to some files. There are two streams in the
-- bundle, so open a file for each stream:
--
--
-- > out <- toFiles' ["out1.txt", "out2.txt"] $ sinkLines B U
--
--
-- Note that the ws and up we used before were bundles
-- of stream Sources whereas out is a bundle of stream
-- Sinks. In the name of the map_i operator we used
-- earlier, the _i (input) suffix indicates that it transforms
-- Sources. There is a related map_o (output) operator for
-- Sinks.
--
-- Now that we have a bundle of Sources, and some matching
-- Sinks, we can drainS all of the data from the former
-- into the latter.
--
--
-- > drainS up out
--
--
-- At this point we can run an external shell command to check the
-- output.
--
--
-- > :! head out1.txt
-- BEARSKIN'S
-- BEARSKINS
-- BEAST
-- BEAST'S
-- BEASTLIER
-- BEASTLIEST
-- BEASTLINESS
-- BEASTLINESS'S
-- BEASTLY
-- BEASTLY'S
--
--
-- Performance
--
-- Although repa-flow can be used productively in the ghci REPL,
-- performance won't be great because you will be running unspecialised,
-- polymorphic code. For best results you should write a complete program
-- and compile it with ghc -fllvm -O2 Main.hs.
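--
-- As a sketch, a complete program along the lines of the session
-- above might look like this (the file names are hypothetical, and
-- we assume the names B, U and mapS are in scope as in the examples
-- above):
--
-- > import Data.Repa.Flow
-- > import Data.Char
-- >
-- > main :: IO ()
-- > main
-- >  = do ws  <- fromFiles' ["in1.txt", "in2.txt"] sourceLines
-- >       up  <- map_i B (mapS U toUpper) ws
-- >       out <- toFiles' ["out1.txt", "out2.txt"] (sinkLines B U)
-- >       drainS up out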
module Data.Repa.Flow
-- | A bundle of stream sources, where the elements of the stream are
-- chunked into arrays.
--
-- The chunks have some Layout l and contain elements of
-- type a. See Data.Repa.Array for the available layouts.
type Sources l a = Sources Int IO l a
-- | A bundle of stream sinks, where the elements of the stream are chunked
-- into arrays.
type Sinks l a = Sinks Int IO l a
-- | Shorthand for common type classes.
type Flow l a = Flow Int IO l a
-- | Yield the number of streams in the bundle.
sourcesArity :: Sources l a -> Int
-- | Yield the number of streams in the bundle.
sinksArity :: Sinks l a -> Int
-- | Pull all available values from the sources and push them to the sinks.
-- Streams in the bundle are processed sequentially, from first to last.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainS :: Sources l a -> Sinks l a -> IO ()
-- | Pull all available values from the sources and push them to the sinks,
-- in parallel. We fork a thread for each of the streams and evaluate
-- them all in parallel.
--
--
-- - If the Sources and Sinks have different numbers of
-- streams then we only evaluate the common subset.
--
drainP :: Sources l a -> Sinks l a -> IO ()
-- | Given an arity and a list of elements, yield sources that each produce
-- all the elements.
--
--
-- - All elements are stuffed into a single chunk, and each stream is
-- given the same chunk.
--
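-- For example, a sketch in GHCi (with an arity of 2, both streams
-- yield the same elements; toList1 drains just the first):
--
-- > toList1 0 =<< fromList U 2 "abc"
-- Just "abc"
--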
fromList :: TargetI l a => Name l -> Int -> [a] -> IO (Sources l a)
-- | Like fromList, but take a list of lists. Each of the
-- inner lists is packed into a single chunk.
fromLists :: TargetI l a => Name l -> Int -> [[a]] -> IO (Sources l a)
-- | Drain a single source from a bundle into a list of elements.
toList1 :: BulkI l a => Int -> Sources l a -> IO [a]
-- | Drain a single source from a bundle into a list of chunks.
toLists1 :: BulkI l a => Int -> Sources l a -> IO [[a]]
-- | Attach a finalizer to some sources.
--
--
-- - For a given source, the finalizer will be called the first time a
-- consumer of that source tries to pull an element when no more are
-- available.
-- - The finalizer is given the index of the source that ended.
-- - The finalizer will be run after any finalizers already attached to
-- the source.
--
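-- For example, a sketch that prints a message once a stream is
-- exhausted (the message text is arbitrary):
--
-- > ss  <- fromList U 1 "abc"
-- > ss' <- finalize_i (\i -> putStrLn ("stream " ++ show i ++ " done")) ss
-- > toList1 0 ss'
-- stream 0 done
-- Just "abc"
--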
finalize_i :: (Int -> IO ()) -> Sources l a -> IO (Sources l a)
-- | Attach a finalizer to some sinks.
--
--
-- - For a given sink, the finalizer will be called the first time that
-- sink is ejected.
-- - The finalizer is given the index of the sink that was
-- ejected.
-- - The finalizer will be run after any finalizers already attached to
-- the sink.
--
finalize_o :: (Int -> IO ()) -> Sinks l a -> IO (Sinks l a)
-- | Apply a function to all elements pulled from some sources.
map_i :: (Flow l1 a, TargetI l2 b) => Name l2 -> (a -> b) -> Sources l1 a -> IO (Sources l2 b)
-- | Apply a function to all elements pushed to some sinks.
map_o :: (Flow l1 a, TargetI l2 b) => Name l1 -> (a -> b) -> Sinks l2 b -> IO (Sinks l1 a)
-- | Send the same data to two consumers.
--
-- Given two argument sinks, yield a result sink. Pushing to the result
-- sink causes the same element to be pushed to both argument sinks.
dup_oo :: Sinks l a -> Sinks l a -> IO (Sinks l a)
-- | Send the same data to two consumers.
--
-- Given an argument source and argument sink, yield a result source.
-- Pulling an element from the result source pulls from the argument
-- source, and pushes that element to the sink, as well as returning it
-- via the result source.
dup_io :: Sources l a -> Sinks l a -> IO (Sources l a)
-- | Send the same data to two consumers.
--
-- Like dup_io but with the arguments flipped.
dup_oi :: Sinks l a -> Sources l a -> IO (Sources l a)
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source.
-- Elements are not duplicated: each element goes to whichever result
-- source pulled it, so if one side pulls all the elements the other
-- side won't get any.
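--
-- For example, a sketch: if the first result source pulls all the
-- elements then the second is left empty:
--
-- > (s1, s2) <- connect_i =<< fromList U 1 "abcde"
-- > toList1 0 s1
-- Just "abcde"
-- > toList1 0 s2
-- Just ""
--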
connect_i :: Sources l a -> IO (Sources l a, Sources l a)
-- | Hook a worker function to some sources, which will be passed every
-- chunk that is pulled from each source.
--
--
-- - The worker is also passed the source index of the chunk that was
-- pulled.
--
watch_i :: (Int -> Array l a -> IO ()) -> Sources l a -> IO (Sources l a)
-- | Hook a worker function to some sinks, which will be passed every chunk
-- that is pushed to each sink.
--
--
-- - The worker is also passed the index of the sink that the chunk
-- was pushed to.
--
watch_o :: (Int -> Array l a -> IO ()) -> Sinks l a -> IO (Sinks l a)
-- | Create a bundle of sinks of the given arity that pass incoming chunks
-- to a worker function.
--
--
-- - This is like watch_o, except that the incoming chunks are
-- discarded after they are passed to the worker function.
--
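-- For example, a sketch that prints each chunk as a list (assuming
-- toList from Data.Repa.Array):
--
-- > import qualified Data.Repa.Array as A
-- > src <- fromList U 1 [1..5 :: Int]
-- > out <- trigger_o 1 (\i chunk -> putStrLn (show i ++ ": " ++ show (A.toList chunk)))
-- > drainS src out
-- 0: [1,2,3,4,5]
--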
trigger_o :: Int -> (Int -> Array l a -> IO ()) -> IO (Sinks l a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - The sinks are strict in the *chunks*, so they are demanded before
-- being discarded.
-- - Haskell debugging thunks attached to the chunks will be demanded,
-- but thunks attached to elements may not be -- depending on whether the
-- chunk representation is strict in the elements.
--
discard_o :: Int -> IO (Sinks l a)
-- | Create a bundle of sinks of the given arity that drop all data on the
-- floor.
--
--
-- - As opposed to discard_o the sinks are non-strict in the
-- chunks.
-- - Haskell debugging thunks attached to the chunks will *not* be
-- demanded.
--
ignore_o :: Int -> IO (Sinks l a)
-- | Given a source index and a length, split a list of that length
-- from the front of the source. Yields a new source for the remaining
-- elements.
--
--
-- - We pull whole chunks from the source stream until we have
-- at least the desired number of elements. The leftover elements in the
-- final chunk are visible in the result Sources.
--
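-- For example, a sketch (assuming, per the description above, that
-- the first argument is the source index and the second is the
-- length):
--
-- > Just (xs, rest) <- head_i 0 3 =<< fromList U 1 "abcde"
-- > xs
-- "abc"
--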
head_i :: (Windowable l a, Index l ~ Int) => Int -> Int -> Sources l a -> IO (Maybe ([a], Sources l a))
-- | Scan through some sources to find runs of matching elements, and count
-- the lengths of those runs.
--
--
-- > import Data.Repa.Flow
-- > toList1 0 =<< groups_i U U =<< fromList U 1 "waabbbblle"
-- Just [('w',1),('a',2),('b',4),('l',2),('e',1)]
--
groups_i :: (GroupsDict lVal lGrp tGrp lLen tLen a, Eq a) => Name lGrp -> Name lLen -> Sources lVal a -> IO (Sources (T2 lGrp lLen) (a, Int))
-- | Like groups_i, but take a function to determine whether two
-- consecutive values should be in the same group.
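--
-- For example, passing (==) recovers the behaviour of groups_i:
--
-- > toList1 0 =<< groupsBy_i U U (==) =<< fromList U 1 "waabbbblle"
-- Just [('w',1),('a',2),('b',4),('l',2),('e',1)]
--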
groupsBy_i :: GroupsDict lVal lGrp tGrp lLen tLen a => Name lGrp -> Name lLen -> (a -> a -> Bool) -> Sources lVal a -> IO (Sources (T2 lGrp lLen) (a, Int))
-- | Dictionaries needed to perform a grouping.
type GroupsDict lVal lGrp tGrp lLen tLen a = GroupsDict Int IO lVal lGrp tGrp lLen tLen a
-- | Fold all the elements of each stream in a bundle, one stream after the
-- other, returning an array of fold results.
foldlS :: (Target lDst a, Index lDst ~ Int, BulkI lSrc b) => Name lDst -> (a -> b -> a) -> a -> Sources lSrc b -> IO (Array lDst a)
-- | Fold all the elements of all streams in a bundle together, one
-- stream after the other, returning a single result.
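--
-- For example, a sketch summing all elements of a single stream:
--
-- > foldlAllS (+) 0 =<< fromList U 1 [1..5 :: Int]
-- 15
--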
foldlAllS :: BulkI lSrc b => (a -> b -> a) -> a -> Sources lSrc b -> IO a
-- | Given streams of segment lengths and values, perform a segmented
-- fold where segments of values of the corresponding lengths are
-- folded together.
--
--
-- > import Data.Repa.Flow
-- > sSegs <- fromList U 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
-- > sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
-- > toList1 0 =<< folds_i U U (+) 0 sSegs sVals
-- Just [('a',10),('b',50),('c',220),('d',0),('e',80)]
--
--
-- If not enough input elements are available to fold a complete segment
-- then no output is produced for that segment. However, trailing zero
-- length segments still produce the initial value for the fold.
--
--
-- > import Data.Repa.Flow
-- > sSegs <- fromList U 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
-- > sVals <- fromList U 1 [10, 20, 30 :: Int]
-- > toList1 0 =<< folds_i U U (*) 1 sSegs sVals
-- Just [('a',10),('b',600),('c',1),('d',1),('e',1)]
--
folds_i :: FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b => Name lGrp -> Name lRes -> (a -> b -> b) -> b -> Sources lSeg (n, Int) -> Sources lElt a -> IO (Sources (T2 lGrp lRes) (n, b))
-- | Dictionaries needed to perform a segmented fold.
type FoldsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = FoldsDict Int IO lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b
-- | Combination of groupsBy_i and folds_i. We determine the
-- segment lengths while performing the folds.
--
-- Note that SQL-like groupby aggregations can be performed using this
-- function, provided the data is pre-sorted on the group key. For
-- example, we can take the average of some groups of values:
--
--
-- > import Data.Repa.Flow
-- > sKeys <- fromList U 1 "waaaabllle"
-- > sVals <- fromList U 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
--
-- > sResult <- map_i U (\(key, (acc, n)) -> (key, acc / n))
-- >      =<< foldGroupsBy_i U U (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
--
-- > toList1 0 sResult
-- Just [('w',10.0),('a',35.0),('b',60.0),('l',80.0),('e',100.0)]
--
foldGroupsBy_i :: FoldGroupsDict lSeg tSeg lVal tVal lGrp tGrp lRes tRes n a b => Name lGrp -> Name lRes -> (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources lSeg n -> Sources lVal a -> IO (Sources (T2 lGrp lRes) (n, b))
type FoldGroupsDict lSeg tSeg lElt tElt lGrp tGrp lRes tRes n a b = (BulkI lSeg n, Material lElt a, Index lElt ~ Int, Material lGrp n, Index lGrp ~ Int, Material lRes b, Index lRes ~ Int, Unpack (IOBuffer lGrp n) tGrp, Unpack (IOBuffer lRes b) tRes)
-- | The default chunk size of 64kBytes.
defaultChunkSize :: Integer
-- | Read a file containing Comma-Separated-Values.
sourceCSV :: BulkI l Bucket => Array l Bucket -> IO (Sources N (Array N (Array F Char)))
-- | Read a file containing Tab-Separated-Values.
sourceTSV :: BulkI l Bucket => Array l Bucket -> IO (Sources N (Array N (Array F Char)))
-- | Read complete records of data from a file, into chunks of the default
-- length. We read as many complete records as will fit into each chunk.
--
-- The records are separated by a special terminating character, which
-- the given predicate detects. After reading a chunk of data we seek the
-- file to just after the last complete record that was read, so we can
-- continue to read more complete records next time.
--
-- If we cannot fit at least one complete record in the chunk then
-- we call error, treating the file as corrupted. Limiting the chunk
-- length guards
-- against the case where a large input file is malformed, as we won't
-- try to read the whole file into memory.
--
--
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
-- - Each file is closed the first time the consumer tries to pull a
-- record from the associated stream when no more are available.
--
sourceRecords :: BulkI l Bucket => (Word8 -> Bool) -> Array l Bucket -> IO (Sources N (Array F Word8))
-- | Read complete lines of data from a text file, using the default chunk
-- length. We read as many complete lines as will fit into each chunk.
--
--
-- - The trailing new-line characters are discarded.
-- - Data is read into foreign memory without copying it through the
-- GHC heap.
-- - The provided file handle must support seeking, else you'll get an
-- exception.
-- - Each file is closed the first time the consumer tries to pull a
-- line from the associated stream when no more are available.
--
sourceLines :: BulkI l Bucket => Array l Bucket -> IO (Sources N (Array F Char))
-- | Read 8-bit ASCII characters from some files, using the default chunk
-- length.
sourceChars :: BulkI l Bucket => Array l Bucket -> IO (Sources F Char)
-- | Read data from some files, using the default chunk length.
sourceBytes :: BulkI l Bucket => Array l Bucket -> IO (Sources F Word8)
-- | Write 8-bit ASCII characters to some files.
sinkChars :: BulkI l Bucket => Array l Bucket -> IO (Sinks F Char)
-- | Write vectors of text lines to the given file handles.
--
--
-- - Data is copied into a new buffer to insert newlines before being
-- written out.
--
sinkLines :: (BulkI l Bucket, BulkI l1 (Array l2 Char), BulkI l2 Char, Unpack (Array l2 Char) t2) => Name l1 -> Name l2 -> Array l Bucket -> IO (Sinks l1 (Array l2 Char))
-- | Write bytes to some files.
sinkBytes :: BulkI l Bucket => Array l Bucket -> IO (Sinks F Word8)