-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Data-parallel data flows. -- -- Data-parallel data flows. @package repa-flow @version 4.2.2.1 module Data.Repa.Flow.States class (Ord i, Eq i) => Next i -- | Get the zero for this index type. first :: Next i => i -- | Given an index and an arity, get the next index after this one, or -- Nothing if there aren't any more. next :: Next i => i -> i -> Maybe i -- | Check if an index is valid for this arity. check :: Next i => i -> i -> Bool class (Ord i, Next i, Monad m) => States i m where data family Refs i m a -- | Get the extent of the collection. extentRefs :: States i m => Refs i m a -> i -- | Allocate a new state of the given arity, also returning an index to -- the first element of the collection. newRefs :: States i m => i -> a -> m (Refs i m a) -- | Read an element of the state. readRefs :: States i m => Refs i m a -> i -> m a -- | Write an element of the state. writeRefs :: States i m => Refs i m a -> i -> a -> m () -- | Fold all the elements in a collection of refs. foldRefsM :: States i m => (a -> b -> b) -> b -> Refs i m a -> m b toListM :: States i m => Refs i m a -> m [a] instance Data.Repa.Flow.States.Next () instance Data.Repa.Flow.States.Next GHC.Types.Int instance Data.Repa.Flow.States.Next (GHC.Types.Int, GHC.Types.Int) instance Data.Repa.Flow.States.States GHC.Types.Int GHC.Types.IO instance Data.Repa.Flow.States.States GHC.Types.Int m => Data.Repa.Flow.States.States () m module Data.Repa.Flow.IO.Bucket -- | A bucket represents a portion of a whole data-set on disk, and contains -- a file handle that points to the next piece of data to be read or -- written. -- -- The bucket could be created from a portion of a single flat file, or -- be one file of a pre-split data set. The main advantage over a plain -- Handle is that a Bucket can represent a small portion of -- a single large file. data Bucket Bucket :: Maybe FilePath -> Integer -> Maybe Integer -> Handle -> Bucket -- | Physical location of the file, if known. [bucketFilePath] :: Bucket -> Maybe FilePath -- | Starting position of the bucket in the file, in bytes. [bucketStartPos] :: Bucket -> Integer -- | Maximum length of the bucket, in bytes. -- -- If Nothing then the length is indeterminate, which is used when -- writing to files. [bucketLength] :: Bucket -> Maybe Integer -- | File handle for the bucket. -- -- If several buckets have been created from a single file, then all -- buckets will have handles bound to that file, but they will be at -- different positions. [bucketHandle] :: Bucket -> Handle -- | Open a file as a single bucket. openBucket :: FilePath -> IOMode -> IO Bucket -- | Wrap an existing file handle as a bucket. -- -- The starting position is set to 0. hBucket :: Handle -> IO Bucket -- | Open some files as buckets and use them as Sources. fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b -- | Like fromFiles, but take an array of file paths. fromFiles' :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b -- | Open all the files in a directory as separate buckets. -- -- This operation may fail with the same exceptions as -- getDirectoryContents. fromDir :: FilePath -> (Array B Bucket -> IO b) -> IO b -- | Open a file containing atomic records and split it into the given -- number of evenly sized buckets. -- -- The records are separated by a special terminating character, which the -- given predicate detects. 
The file is split cleanly on record -- boundaries, so we get a whole number of records in each bucket. As the -- records can be of varying size the buckets are not guaranteed to have -- exactly the same length, in either records or bytes, though we try -- to give them approximately the same number of bytes. fromSplitFile :: Int -> (Word8 -> Bool) -> FilePath -> (Array B Bucket -> IO b) -> IO b -- | Like fromSplitFile but start at the given offset. fromSplitFileAt :: Int -> (Word8 -> Bool) -> FilePath -> Integer -> (Array B Bucket -> IO b) -> IO b -- | Open some files for writing as individual buckets and pass them to the -- given consumer. toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b -- | Like toFiles, but take an array of FilePaths. toFiles' :: (Bulk l FilePath, Target l Bucket) => Array l FilePath -> (Array l Bucket -> IO b) -> IO b -- | Create a new directory of the given name, containing the given number -- of buckets. -- -- If the directory is named somedir then the files are named -- somedir/000000, somedir/000001, -- somedir/000002 and so on. toDir :: Int -> FilePath -> (Array B Bucket -> IO b) -> IO b -- | Given a list of directories, create those directories and open the -- given number of output files per directory. -- -- In the resulting array of buckets, the outer dimension indexes each -- directory, and the inner one indexes each file in its directory. -- -- For each directory somedir the files are named -- somedir/000000, somedir/000001, -- somedir/000002 and so on. toDirs :: Int -> [FilePath] -> (Array (E B DIM2) Bucket -> IO b) -> IO b -- | Close a bucket, releasing the contained file handle. bClose :: Bucket -> IO () -- | Check if the bucket is currently open. bIsOpen :: Bucket -> IO Bool -- | Check if the contained file handle is at the end of the bucket. bAtEnd :: Bucket -> IO Bool -- | Seek to a position within a bucket. bSeek :: Bucket -> SeekMode -> Integer -> IO () -- | Get some data from a bucket. bGetArray :: Bucket -> Integer -> IO (Array F Word8) -- | Put some data in a bucket. bPutArray :: Bucket -> Array F Word8 -> IO () module Data.Repa.Flow.Generic.IO -- | Read data from some files, using the given chunk length. -- -- sourceBytes :: Bulk l Bucket => Integer -> Array l Bucket -> IO (Sources (Index l) IO (Array F Word8)) -- | Read 8-bit ASCII characters from some files, using the given chunk -- length. -- -- sourceChars :: Bulk l Bucket => Integer -> Array l Bucket -> IO (Sources (Index l) IO (Array F Char)) -- | Like sourceRecords, but produce all records in a single vector. sourceChunks :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources (Index l) IO (Array F Word8)) -- | Read complete records of data from a bucket, into chunks of the given -- length. We read as many complete records as will fit into each chunk. -- -- The records are separated by a special terminating character, which -- the given predicate detects. After reading a chunk of data we seek the -- bucket to just after the last complete record that was read, so we can -- continue to read more complete records next time. -- -- If we cannot fit at least one complete record in the chunk then -- perform the given failure action. Limiting the chunk length guards -- against the case where a large input file is malformed, as we won't -- try to read the whole file into memory. 
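--
-- A minimal sketch of use (hypothetical session; the file name is made
-- up), reading newline-terminated records from a single file:
--
--   > import Data.Repa.Flow.IO.Bucket
--   > fromFiles ["somefile.txt"] $ \bs -> do
--   >     ss  <- sourceRecords 65536 (== 0x0a) (error "record too large") bs
--   >     out <- trigger_o 1 (\_ recs -> print recs)
--   >     drainS ss out
--   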
-- -- sourceRecords :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources Int IO (Array N (Array F Word8))) -- | Read lines from a named text file, in a chunk-wise manner, converting -- each line to values with the given format. sourceLinesFormat :: (Unpackable format, Target A (Value format)) => Integer -> IO () -> IO (Array A Word8 -> IO ()) -> format -> Array B Bucket -> IO (Sources Int IO (Array A (Value format))) -- | Read lines from a lazy byte string, in a chunk-wise manner, converting -- each line to values with the given format. sourceLinesFormatFromLazyByteString :: (Unpackable format, Target A (Value format)) => Int -> IO (Array A Word -> IO ()) -> format -> ByteString -> Int -> IO (Sources Int IO (Array A (Value format))) -- | Write chunks of bytes to the given file handles. -- -- sinkBytes :: Bulk l Bucket => Array l Bucket -> IO (Sinks (Index l) IO (Array F Word8)) -- | Write chunks of 8-bit ASCII characters to the given file handles. -- -- sinkChars :: (Bulk l Bucket, BulkI r Char) => Array l Bucket -> IO (Sinks (Index l) IO (Array r Char)) -- | Write vectors of text lines to the given file handles. -- -- sinkLines :: (Bulk l Bucket, BulkI l1 (Array l2 Char), BulkI l2 Char) => Name l1 -> Name l2 -> Array l Bucket -> IO (Sinks (Index l) IO (Array l1 (Array l2 Char))) -- | Create an output sieve that writes data to an indeterminate number of -- output files. Each new element is appended to its associated file. sieve_o :: Int -> Int -> (a -> Maybe (FilePath, Array F Word8)) -> IO (Sinks () IO a) -- | Read a file containing Comma-Separated-Values. -- -- TODO: handle escaped commas. TODO: check CSV file standard. sourceCSV :: BulkI l Bucket => Integer -> IO () -> Array l Bucket -> IO (Sources Int IO (Array N (Array N (Array F Char)))) -- | Read a file containing Tab-Separated-Values. sourceTSV :: BulkI l Bucket => Integer -> IO () -> Array l Bucket -> IO (Sources Int IO (Array N (Array N (Array F Char)))) -- | Everything flows. -- -- This module defines generic flows. The other flow types defined in -- Data.Repa.Flow.Chunked and Data.Repa.Flow.Simple are -- specialisations of this generic one. module Data.Repa.Flow.Generic -- | A bundle of stream sources, indexed by a value of type i, in -- some monad m, returning elements of type e. -- -- Elements can be pulled from each stream in the bundle individually. data Sources i m e Sources :: i -> (i -> (e -> m ()) -> m () -> m ()) -> Sources i m e -- | Number of sources in this bundle. [sourcesArity] :: Sources i m e -> i -- | Function to pull data from a bundle. Give it the index of the desired -- stream, a continuation that accepts an element, and a continuation to -- invoke when no more elements will ever be available. [sourcesPull] :: Sources i m e -> i -> (e -> m ()) -> m () -> m () -- | A bundle of stream sinks, indexed by a value of type i, in -- some monad m, accepting elements of type e. -- -- Elements can be pushed to each stream in the bundle individually. data Sinks i m e Sinks :: i -> (i -> e -> m ()) -> (i -> m ()) -> Sinks i m e -- | Number of sinks in the bundle. [sinksArity] :: Sinks i m e -> i -- | Push an element to one of the streams in the bundle. [sinksPush] :: Sinks i m e -> i -> e -> m () -- | Signal that no more elements will ever be available for this sink. It -- is ok to eject the same stream multiple times. [sinksEject] :: Sinks i m e -> i -> m () -- | Pull all available values from the sources and push them to the sinks. 
-- Streams in the bundle are processed sequentially, from first to last. -- -- drainS :: (Next i, Monad m) => Sources i m a -> Sinks i m a -> m () -- | Pull all available values from the sources and push them to the sinks, -- in parallel. We fork a thread for each of the streams and evaluate -- them all in parallel. -- -- drainP :: Sources Int IO a -> Sinks Int IO a -> IO () -- | Pull all available values from the sources and pass them to the given -- action. consumeS :: (Next i, Monad m) => (i -> a -> m ()) -> Sources i m a -> m () -- | Given an arity and a list of elements, yield sources that each produce -- all the elements. fromList :: States i m => i -> [a] -> m (Sources i m a) -- | Drain a single source into a list. toList1 :: States i m => i -> Sources i m a -> m [a] -- | Drain the given number of elements from a single source into a list. takeList1 :: States i m => Int -> i -> Sources i m a -> m [a] -- | Push elements into the associated streams of a bundle of sinks. pushList :: Monad m => [(i, a)] -> Sinks i m a -> m () -- | Push the elements of a list into the given stream of a bundle of -- sinks. pushList1 :: Monad m => i -> [a] -> Sinks i m a -> m () -- | Transform the stream indexes of a bundle of sources. -- -- The given transform functions should be inverses of each other, else -- you'll get a confusing result. mapIndex_i :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sources i1 m a -> m (Sources i2 m a) -- | Transform the stream indexes of a bundle of sinks. -- -- The given transform functions should be inverses of each other, else -- you'll get a confusing result. mapIndex_o :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sinks i1 m a -> m (Sinks i2 m a) -- | For a bundle of sources with a 2-d stream index, flip the components -- of the index. flipIndex2_i :: Monad m => Sources SH2 m a -> m (Sources SH2 m a) -- | For a bundle of sinks with a 2-d stream index, flip the components of -- the index. flipIndex2_o :: Monad m => Sinks SH2 m a -> m (Sinks SH2 m a) -- | Attach a finalizer to a bundle of sources. -- -- For each stream in the bundle, the finalizer will be called the first -- time a consumer of that stream tries to pull an element when no more -- are available. -- -- The provided finalizer will be run after any finalizers already -- attached to the source. finalize_i :: States i m => (i -> m ()) -> Sources i m a -> m (Sources i m a) -- | Attach a finalizer to a bundle of sinks. -- -- For each stream in the bundle, the finalizer will be called the first -- time that stream is ejected. -- -- The provided finalizer will be run after any finalizers already -- attached to the sink. finalize_o :: States i m => (i -> m ()) -> Sinks i m a -> m (Sinks i m a) -- | Project out a single stream source from a bundle. project_i :: Monad m => i -> Sources i m a -> m (Sources () m a) -- | Project out a single stream sink from a bundle. project_o :: Monad m => i -> Sinks i m a -> m (Sinks () m a) -- | Yield sources that always produce the same value. repeat_i :: Monad m => i -> (i -> a) -> m (Sources i m a) -- | Yield sources of the given length that always produce the same value. replicate_i :: States i m => i -> Int -> (i -> a) -> m (Sources i m a) -- | Prepend some more elements onto the front of some sources. prepend_i :: States i m => [a] -> Sources i m a -> m (Sources i m a) -- | Like prepend_i but only prepend the elements to the streams -- that match the given predicate. 
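--
-- A sketch (hypothetical session): prepend two extra elements to stream
-- 0 only:
--
--   > ss  <- fromList (2 :: Int) [10, 20 :: Int]
--   > ss' <- prependOn_i (== 0) [1, 2] ss
--   > toList1 (0 :: Int) ss'
--   [1,2,10,20]
--   > toList1 (1 :: Int) ss'
--   [10,20]
--   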
prependOn_i :: States i m => (i -> Bool) -> [a] -> Sources i m a -> m (Sources i m a) -- | Apply a function to every element pulled from some sources, producing -- some new sources. map_i :: Monad m => (a -> b) -> Sources i m a -> m (Sources i m b) -- | Apply a function to every element pushed to some sinks, producing -- some new sinks. map_o :: Monad m => (a -> b) -> Sinks i m b -> m (Sinks i m a) -- | Like map_i, but the worker function is also given the stream -- index. smap_i :: Monad m => (i -> a -> b) -> Sources i m a -> m (Sources i m b) -- | Like map_o, but the worker function is also given the stream -- index. smap_o :: Monad m => (i -> a -> b) -> Sinks i m b -> m (Sinks i m a) -- | Combine the elements of two flows with the given function. The worker -- function is also given the stream index. szipWith_ii :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sources i m a -> Sources i m b -> m (Sources i m c) -- | Like szipWith_ii, but take a bundle of Sinks for the -- result elements, and yield a bundle of Sinks to accept the -- b elements. szipWith_io :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m a -> m (Sinks i m b) -- | Like szipWith_ii, but take a bundle of Sinks for the -- result elements, and yield a bundle of Sinks to accept the -- a elements. szipWith_oi :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m b -> m (Sinks i m a) -- | Combination of fold and filter. -- -- We walk over the stream start-to-end, maintaining an accumulator. At -- each point we can choose to emit an element, or not. compact_i :: (Monad m, States i m) => (s -> a -> (s, Maybe b)) -> s -> Sources i m a -> m (Sources i m b) -- | Start-to-end scan over each stream in a bundle. scan_i :: (Monad m, States i m) => (s -> a -> s) -> s -> Sources i m a -> m (Sources i m s) -- | For each stream in a bundle of sources, associate each element with -- its corresponding position in the stream. indexed_i :: (Monad m, States i m) => Sources i m a -> m (Sources i m (Int, a)) -- | Send the same data to two consumers. -- -- Given two argument sinks, yield a result sink. Pushing to the result -- sink causes the same element to be pushed to both argument sinks. dup_oo :: (Ord i, States i m) => Sinks i m a -> Sinks i m a -> m (Sinks i m a) -- | Send the same data to two consumers. -- -- Given an argument source and argument sink, yield a result source. -- Pulling an element from the result source pulls from the argument -- source, and pushes that element to the sink, as well as returning it -- via the result source. dup_io :: (Ord i, Monad m) => Sources i m a -> Sinks i m a -> m (Sources i m a) -- | Send the same data to two consumers. -- -- Like dup_io but with the arguments flipped. dup_oi :: (Ord i, Monad m) => Sinks i m a -> Sources i m a -> m (Sources i m a) -- | Connect an argument source to two result sources. -- -- Pulling from either result source pulls from the argument source. Each -- result source only gets the elements pulled at the time, so if one -- side pulls all the elements the other side won't get any. connect_i :: States i m => Sources i m a -> m (Sources i m a, Sources i m a) -- | Given a bundle of sources containing several streams, produce a new -- bundle containing a single stream that gets data from the former. -- -- Streams from the source are consumed in their natural order, and a -- complete stream is consumed before moving on to the next one. -- --
--   > import Data.Repa.Flow.Generic
--   > toList1 () =<< funnel_i =<< fromList (3 :: Int) [11, 22, 33]
--   [11,22,33,11,22,33,11,22,33]
--   
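--
-- A further sketch (hypothetical session): draining the funnelled
-- streams into a single printing sink:
--
--   > ss  <- fromList (2 :: Int) "ab"
--   > one <- funnel_i ss
--   > out <- trigger_o () (\_ c -> print c)
--   > drainS one out
--   'a'
--   'b'
--   'a'
--   'b'
--   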
funnel_i :: (States i m, States () m) => Sources i m a -> m (Sources () m a) -- | Given a bundle of sinks consisting of a single stream, produce a new -- bundle of the given arity that sends all data to the former, ignoring -- the stream index. -- -- The argument stream is ejected only when all of the streams in the -- result bundle have been ejected. -- -- -- --
--   > import Data.Repa.Flow.Generic
--   > import Data.Repa.Array.Material
--   > import Data.Repa.Nice
--    
--   > let things = [(0 :: Int, "foo"), (1, "bar"), (2, "baz")]
--   > result <- capture_o B () (\k -> funnel_o 4 k >>= pushList things)
--   > nice result
--   [((),"foo"),((),"bar"),((),"baz")]
--   
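--
-- Another sketch (hypothetical session): sending two streams to a
-- single printing sink:
--
--   > k  <- trigger_o () (\_ s -> putStrLn s)
--   > ks <- funnel_o (2 :: Int) k
--   > pushList [(0 :: Int, "foo"), (1, "bar")] ks
--   foo
--   bar
--   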
funnel_o :: States i m => i -> Sinks () m a -> m (Sinks i m a) -- | Split the given number of elements from the head of a source, returning -- those elements in a list, and yielding a new source for the rest. head_i :: States i m => Int -> Sources i m a -> i -> m ([a], Sources i m a) -- | From a stream of values which has consecutive runs of identical values, -- produce a stream of the lengths of these runs. -- -- Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1] groups_i :: (Ord i, Monad m, Eq a) => Sources i m a -> m (Sources i m Int) -- | Given a stream of flags and a stream of values, produce a new stream -- of values where the corresponding flag was True. The length of the -- result is the length of the shorter of the two inputs. pack_ii :: (Ord i, Monad m) => Sources i m Bool -> Sources i m a -> m (Sources i m a) -- | Segmented fold. folds_ii :: (Ord i, Monad m) => (a -> a -> a) -> a -> Sources i m Int -> Sources i m a -> m (Sources i m a) -- | Apply a monadic function to every element pulled from some sources, -- producing some new sources. watch_i :: Monad m => (i -> a -> m ()) -> Sources i m a -> m (Sources i m a) -- | Pass elements to the provided action as they are pushed into the sink. watch_o :: Monad m => (i -> a -> m ()) -> Sinks i m a -> m (Sinks i m a) -- | Like watch_o but doesn't pass elements to another sink. trigger_o :: Monad m => i -> (i -> a -> m ()) -> m (Sinks i m a) -- | Create a bundle of sinks of the given arity and capture any data -- pushed to it. -- --
--   > import Data.Repa.Flow.Generic
--   > import Data.Repa.Array.Material
--   > import Data.Repa.Nice
--   > import Control.Monad
--   > liftM nice $ capture_o B 4 (\k -> pushList [(0 :: Int, "foo"), (1, "bar"), (0, "baz")] k)
--   [(0,"foo"),(1,"bar"),(0,"baz")]
--   
capture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO ()) -> IO (Array lDst (i, a)) -- | Like capture_o but also return the r-esult of the push -- function. rcapture_o :: (Target lDst (i, a), Index lDst ~ Int) => Name lDst -> i -> (Sinks i IO a -> IO b) -> IO (Array lDst (i, a), b) -- | A sink that ignores all incoming data. -- -- ignore_o :: Monad m => i -> m (Sinks i m a) -- | A sink that drops all data on the floor. -- -- abandon_o :: Monad m => i -> m (Sinks i m a) -- | Use the trace function from Debug.Trace to print each -- element that is pushed to a sink. -- -- trace_o :: (Show i, Show a, Monad m) => i -> m (Sinks i m a) -- | Given a bundle of sinks indexed by an Int, produce a result -- sink for arrays. -- -- Each time an array is pushed to the result sink, its elements are -- pushed to the corresponding streams of the argument sink. If there are -- more elements than sinks then give them to the spill action. -- --
--   |          ..             |
--   | [w0,  x0,  y0,  z0]     |   :: Sinks () IO (Array l a)
--   | [w1,  x1,  y1,  z1, u1] |     (sink for a single stream of arrays)
--   |          ..             |
--   
--      |    |    |    |    |
--      v    v    v    v    .------> spilled
--   
--    | .. | .. | .. | .. |
--    | w0 | x0 | y0 | z0 |        :: Sinks Int IO a
--    | w1 | x1 | y1 | z1 |          (sink for several streams of elements)
--    | .. | .. | .. | .. |
--   
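--
-- A concrete sketch (hypothetical session): each array element goes to
-- the stream with the same index, and the rest are spilled:
--
--   > import Data.Repa.Array          as A
--   > import Data.Repa.Array.Material as A
--   > ks <- trigger_o (2 :: Int) (\i x -> putStrLn (show i ++ ": " ++ [x]))
--   > k  <- distribute_o (\_ x -> putStrLn ("spilled: " ++ [x])) ks
--   > pushList1 () [A.fromList U "abc"] k
--   0: a
--   1: b
--   spilled: c
--   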
distribute_o :: BulkI l a => (Int -> a -> IO ()) -> Sinks Int IO a -> IO (Sinks () IO (Array l a)) -- | Like distribute_o, but drop spilled elements on the floor. ddistribute_o :: BulkI l a => Sinks Int IO a -> IO (Sinks () IO (Array l a)) -- | Like distribute_o, but with 2-d stream indexes. -- -- Given the argument and result sinks, when pushing to the result the -- stream index is used as the first component for the argument sink, and -- the index of the element in its array is used as the second component. -- -- If you want the components of the stream index the other way around, -- apply flipIndex2_o to the argument sinks. distribute2_o :: BulkI l a => (SH2 -> a -> IO ()) -> Sinks SH2 IO a -> IO (Sinks Int IO (Array l a)) -- | Like distribute2_o, but drop spilled elements on the floor. ddistribute2_o :: BulkI l a => Sinks SH2 IO a -> IO (Sinks Int IO (Array l a)) -- | Given a bundle of argument sinks, produce a result sink. Arrays of -- indices and elements are pushed to the result sink. On doing so, the -- elements are pushed into the corresponding streams of the argument -- sinks. -- -- If the index associated with an element does not have a corresponding -- stream in the argument sinks, then pass it to the provided spill -- function. -- --
--   |                      ..                         |
--   | [(0, v0), (1, v1), (0, v2), (0, v3), (2, v4)]   |  :: Sinks () IO (Array l (Int, a))
--   |                      ..                         |
--           \       \                          |
--            \       .------------.            |
--             v                   v            .---------> spilled
--   
--        |       ..       |       ..       |
--        |  [v0, v2, v3]  |      [v1]      |             :: Sinks Int IO (Array l a)
--        |       ..       |       ..       | 
--   
-- -- The following example uses capture_o to demonstrate how the -- shuffle_o operator can be used as one step of a bucket-sort. We -- start with two arrays of key-value pairs. In the result, the values -- from each block that had the same key are packed into the same tuple -- (bucket). -- --
--   > import Data.Repa.Flow.Generic    as G
--   > import Data.Repa.Array           as A
--   > import Data.Repa.Array.Material  as A
--   > import Data.Repa.Nice
--   
--   > let arr1 = A.fromList B [(0, 'a'), (1, 'b'), (2, 'c'), (0, 'd'), (0, 'c')]
--   > let arr2 = A.fromList B [(0, 'A'), (3, 'B'), (3, 'C')]
--   > result :: Array B (Int, Array U Char) 
--   >        <- capture_o B 4 (\k ->  shuffle_o B (error "spilled") k  
--   >                             >>= pushList1 () [arr1, arr2]) 
--   
--   > nice result
--   [(0,"adc"),(1,"b"),(2,"c"),(0,"A"),(3,"BC")]
--   
shuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (Int -> Array lDst a -> IO ()) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a))) -- | Like shuffle_o, but drop spilled elements on the floor. dshuffle_o :: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc (Int, a))) -- | Like dshuffle_o, but use the given function to decide which -- stream of the argument bundle each element should be pushed into. -- --
--   > import Data.Repa.Flow.Generic   as G
--   > import Data.Repa.Array          as A
--   > import Data.Repa.Array.Material as A
--   > import Data.Repa.Nice
--   > import Data.Char
--    
--   > let arr1 = A.fromList B "FooBAr"
--   > let arr2 = A.fromList B "BazLIKE"
--   > result :: Array B (Int, Array U Char) 
--   >        <- capture_o B 2 (\k ->  dshuffleBy_o B (\x -> if isUpper x then 0 else 1) k 
--   >                             >>= pushList1 () [arr1, arr2])
--   > nice result
--   [(0,"FBA"),(1,"oor"),(0,"BLIKE"),(1,"az")]
--   
dshuffleBy_o :: (BulkI lDst a, BulkI lSrc a, Windowable lDst a, Target lDst a, Elt a) => Name lSrc -> (a -> Int) -> Sinks Int IO (Array lDst a) -> IO (Sinks () IO (Array lSrc a)) -- | Take elements from a flow and pack them into chunks. The chunks are -- limited to the given maximum length. A predicate can also be supplied -- to detect the last element in a chunk. chunkOn_i :: (States i IO, TargetI lDst a) => Name lDst -> Int -> (a -> Bool) -> Sources i IO a -> IO (Sources i IO (Array lDst a)) -- | Take a flow of chunks and flatten it into a flow of the individual -- elements. unchunk_i :: (BulkI l a, States i IO) => Sources i IO (Array l a) -> IO (Sources i IO a) -- | Input and Output for Chunked Flows. -- -- Most functions in this module are re-exports of the ones from -- Data.Repa.Flow.Generic.IO, but using the Sources and -- Sinks type synonyms for chunked flows. module Data.Repa.Flow.Chunked.IO -- | Read complete records of data from some buckets, using the given -- chunk length. sourceRecords :: BulkI l Bucket => Integer -> (Word8 -> Bool) -> IO () -> Array l Bucket -> IO (Sources Int IO N (Array F Word8)) -- | Read 8-bit ASCII characters from some files, using the given chunk -- length. sourceChars :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources Int IO F Char) -- | Read data from some files, using the given chunk length. sourceBytes :: BulkI l Bucket => Integer -> Array l Bucket -> IO (Sources Int IO F Word8) -- | Write 8-bit ASCII characters to the given file handles. sinkChars :: BulkI l Bucket => Array l Bucket -> IO (Sinks Int IO F Char) -- | Write chunks of data to the given file handles. sinkBytes :: BulkI l Bucket => Array l Bucket -> IO (Sinks Int IO F Word8) module Data.Repa.Flow.Generic.Debug -- | Given a source index and a length, pull enough chunks from the source -- to build a list of the requested length, and discard the remaining -- elements in the final chunk. -- -- more :: (States i IO, Nicer a) => i -> Sources i IO a -> IO [Nice a] -- | Like more but also specify how many elements you want. more' :: (States i IO, Nicer a) => i -> Int -> Sources i IO a -> IO [Nice a] -- | Like more, but print results in a tabular form to the console. moret :: (States i IO, Nicer [a], Presentable (Nice [a])) => i -> Sources i IO a -> IO () -- | Like more', but print results in tabular form to the console. moret' :: (States i IO, Nicer [a], Presentable (Nice [a])) => i -> Int -> Sources i IO a -> IO () -- | Like more, but show elements in their raw format. morer :: States i IO => i -> Sources i IO a -> IO [a] -- | Like more', but show elements in their raw format. morer' :: States i IO => i -> Int -> Sources i IO a -> IO [a] -- | Convert some value to a nice form. -- -- In particular: -- -- -- -- As ghci automatically pretty prints lists, using nice is more -- fun than trying to show the raw Repa array representations. class Nicer a where type family Nice a :: * nice :: Nicer a => a -> Nice a -- | Convert some value to a form presentable to the user. -- -- Like show but we allow the nesting structure to be preserved so -- it can be displayed in tabular format. class Presentable a present :: Presentable a => a -> Present module Data.Repa.Flow.Simple -- | Source consisting of a single stream. type Source m e = Sources () m e -- | Sink consisting of a single stream. type Sink m e = Sinks () m e -- | Pull all available values from the source and push them to the sink. 
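--
-- A minimal sketch (hypothetical session):
--
--   > ss  <- fromList "abc"
--   > out <- trigger_o print
--   > drainS ss out
--   'a'
--   'b'
--   'c'
--   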
drainS :: Monad m => Source m a -> Sink m a -> m () -- | Given a list of elements, yield a source that produces all the -- elements. fromList :: States () m => [a] -> m (Source m a) -- | Drain a source into a list. toList :: States () m => Source m a -> m [a] -- | Drain the given number of elements from a single source into a list. takeList :: States () m => Int -> Source m a -> m [a] -- | Attach a finalizer to a source. -- -- The finalizer will be called the first time a consumer of that stream -- tries to pull an element when no more are available. -- -- The provided finalizer will be run after any finalizers already -- attached to the source. finalize_i :: States () m => m () -> Source m a -> m (Source m a) -- | Attach a finalizer to a sink. -- -- The finalizer will be called the first time the stream is ejected. -- -- The provided finalizer will be run after any finalizers already -- attached to the sink. finalize_o :: States () m => m () -> Sink m a -> m (Sink m a) -- | Yield a source that always produces the same value. repeat_i :: States () m => a -> m (Source m a) -- | Yield a source of the given length that always produces the same -- value. replicate_i :: States () m => Int -> a -> m (Source m a) -- | Prepend some more elements to the front of a source. prepend_i :: States () m => [a] -> Source m a -> m (Source m a) -- | Apply a function to every element pulled from some source, producing a -- new source. map_i :: States () m => (a -> b) -> Source m a -> m (Source m b) -- | Apply a function to every element pushed to some sink, producing a new -- sink. map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a) -- | Send the same data to two consumers. -- -- Given two argument sinks, yield a result sink. Pushing to the result -- sink causes the same element to be pushed to both argument sinks. dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a) -- | Send the same data to two consumers. -- -- Given an argument source and argument sink, yield a result source. -- Pulling an element from the result source pulls from the argument -- source, and pushes that element to the sink, as well as returning it -- via the result source. dup_io :: States () m => Source m a -> Sink m a -> m (Source m a) -- | Send the same data to two consumers. -- -- Like dup_io but with the arguments flipped. dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a) -- | Connect an argument source to two result sources. -- -- Pulling from either result source pulls from the argument source. Each -- result source only gets the elements pulled at the time, so if one -- side pulls all the elements the other side won't get any. connect_i :: States () m => Source m a -> m (Source m a, Source m a) -- | Split the given number of elements from the head of a source, -- returning those elements in a list, and yielding a new source for the -- rest. head_i :: States () m => Int -> Source m a -> m ([a], Source m a) -- | Peek at the given number of elements in the stream, returning a result -- stream that still produces them all. peek_i :: States () m => Int -> Source m a -> m ([a], Source m a) -- | From a stream of values which has consecutive runs of identical values, -- produce a stream of the lengths of these runs. -- -- Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1] groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int) -- | Given a stream of flags and a stream of values, produce a new stream -- of values where the corresponding flag was True. 
The length of the -- result is the length of the shorter of the two inputs. pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a) -- | Segmented fold. folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a) -- | Apply a monadic function to every element pulled from a source -- producing a new source. watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a) -- | Pass elements to the provided action as they are pushed to the sink. watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a) -- | Like watch_o but doesn't pass elements to another sink. trigger_o :: Monad m => (a -> m ()) -> m (Sink m a) -- | A sink that ignores all incoming elements. -- -- This sink is strict in the elements, so they are demanded before being -- discarded. Haskell debugging thunks attached to the elements will be -- demanded. ignore_o :: Monad m => m (Sink m a) -- | A sink that drops all data on the floor. -- -- This sink is non-strict in the elements. Haskell tracing thunks -- attached to the elements will *not* be demanded. abandon_o :: Monad m => m (Sink m a) -- | Open some files as buckets and use them as Sources. fromFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b -- | Read data from a file, using the given chunk length. -- -- -- -- The file will be closed the first time the consumer tries to pull an -- element from the associated stream when no more are available. sourceBytes :: Integer -> Bucket -> IO (Source IO (Array F Word8)) -- | Read complete records of data from a file, using the given chunk -- length. -- -- The records are separated by a special terminating character, which -- the given predicate detects. After reading a chunk of data we seek to -- just after the last complete record that was read, so we can continue -- to read more complete records next time. -- -- If we cannot find an end-of-record terminator in the chunk then apply -- the given failure action. The records can be no longer than the chunk -- length. This fact guards against the case where a large input file is -- malformed and contains no end-of-record terminators, as we won't try -- to read the whole file into memory. -- -- -- -- The file will be closed the first time the consumer tries to pull an -- element from the associated stream when no more are available. sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Bucket -> IO (Source IO (Array N (Array F Word8))) -- | Open some files for writing as individual buckets and pass them to the -- given consumer. toFiles :: [FilePath] -> (Array B Bucket -> IO b) -> IO b -- | Write chunks of data to the given files. -- -- The file will be closed when the associated stream is ejected. sinkBytes :: Bucket -> IO (Sink IO (Array F Word8)) module Data.Repa.Flow.Chunked -- | A bundle of sources, where the elements are chunked into arrays. type Sources i m l e = Sources i m (Array l e) -- | A bundle of sinks, where the elements are chunked into arrays. type Sinks i m l e = Sinks i m (Array l e) -- | Shorthand for common type classes. type Flow i m l a = (Ord i, Monad m, BulkI l a, States i m) -- | Pull all available values from the sources and push them to the sinks. drainS :: (Next i, Monad m) => Sources i m r a -> Sinks i m r a -> m () -- | Pull all available values from the sources and pass them to the given -- action. consumeS :: (Next i, Monad m, Bulk r a) => Sources i m r a -> (i -> a -> m ()) -> m () -- | Given an arity and a list of elements, yield sources that each produce -- all the elements. 
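--
-- A sketch (hypothetical session), using the unboxed layout U from
-- Data.Repa.Array.Material:
--
--   > import Data.Repa.Array.Material
--   > ss <- fromList U (1 :: Int) [1, 2, 3 :: Int]
--   > toList1 0 ss
--   [1,2,3]
--   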
-- -- fromList :: (States i m, TargetI l a) => Name l -> i -> [a] -> m (Sources i m l a) -- | Like fromList but take a list of lists, where each of the -- inner lists is packed into a single chunk. fromLists :: (States i m, TargetI l a) => Name l -> i -> [[a]] -> m (Sources i m l a) -- | Drain a single source into a list of elements. toList1 :: (States i m, BulkI l a) => i -> Sources i m l a -> m [a] -- | Drain a single source into a list of chunks. toLists1 :: (States i m, BulkI l a) => i -> Sources i m l a -> m [[a]] -- | Attach a finalizer to a bundle of sources. -- -- For each stream in the bundle, the finalizer will be called the first -- time a consumer of that stream tries to pull an element when no more -- are available. -- -- The provided finalizer will be run after any finalizers already -- attached to the source. finalize_i :: States i m => (i -> m ()) -> Sources i m l a -> m (Sources i m l a) -- | Attach a finalizer to a bundle of sinks. -- -- The finalizer will be called the first time the stream is ejected. -- -- The provided finalizer will be run after any finalizers already -- attached to the sink. finalize_o :: States i m => (i -> m ()) -> Sinks i m l a -> m (Sinks i m l a) replicates_i :: (Flow i m lSrc (Int, a), TargetI lDst a) => Name lDst -> Sources i m lSrc (Int, a) -> m (Sources i m lDst a) -- | Map a function over elements pulled from a source. smap_i :: (Flow i m l1 a, TargetI l2 b) => (i -> a -> b) -> Sources i m l1 a -> m (Sources i m l2 b) -- | Map a function over elements pushed into a sink. smap_o :: (Flow i m l1 a, TargetI l2 b) => (i -> a -> b) -> Sinks i m l2 b -> m (Sinks i m l1 a) -- | Combine the elements of two flows with the given function. szipWith_ii :: (Ord i, States i m, BulkI lSrc1 a, BulkI lSrc2 b, TargetI lDst c, Windowable lSrc1 a, Windowable lSrc2 b) => Name lDst -> (i -> a -> b -> c) -> Sources i m lSrc1 a -> Sources i m lSrc2 b -> m (Sources i m lDst c) -- | Apply a generic stream process to all the streams in a bundle of -- sources. process_i :: (States i m, BulkI lSrc a, Bulk lDst b, Bulk lDst (Array lDst b), TargetI lDst b, TargetI lDst (Array lDst b)) => (s -> a -> (s, Array lDst b)) -> s -> Sources i m lSrc a -> m (Sources i m lDst b) -- | Apply a generic stream process to all the streams in a bundle of -- sources. unfolds_i :: (States i m, BulkI lSrc a, Bulk lDst b, TargetI lDst b) => (a -> s -> StepUnfold s b) -> s -> Sources i m lSrc a -> m (Sources i m lDst b) data StepUnfold s a :: * -> * -> * StepUnfoldGive :: a -> s -> StepUnfold s a StepUnfoldNext :: a -> s -> StepUnfold s a StepUnfoldBump :: s -> StepUnfold s a StepUnfoldFinish :: s -> StepUnfold s a -- | Split the given number of elements from the head of a source, -- returning those elements in a list, and yielding a new source for the -- rest. -- -- head_i :: (States i m, Windowable l a, Index l ~ Int) => Int -> Sources i m l a -> i -> m ([a], Sources i m l a) -- | From a stream of values which has consecutive runs of identical values, -- produce a stream of the lengths of these runs. -- --
--   groupsBy (==) [4, 4, 4, 3, 3, 1, 1, 1, 4] 
--    => [3, 2, 3, 1]
--   
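--
-- A sketch of use (hypothetical session):
--
--   > import Data.Repa.Array.Material
--   > ss <- fromList U (1 :: Int) [4, 4, 4, 3, 3, 1, 1, 1, 4 :: Int]
--   > toList1 0 =<< groupsBy_i U U (==) ss
--   [(4,3),(3,2),(1,3),(4,1)]
--   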
groupsBy_i :: GroupsDict i m lVal lGrp lLen a => Name lGrp -> Name lLen -> (a -> a -> Bool) -> Sources i m lVal a -> m (Sources i m (T2 lGrp lLen) (a, Int)) -- | Dictionaries needed to perform a grouping. type GroupsDict i m lVal lGrp lLen a = (Flow i m lVal a, Index lVal ~ Int, TargetI lGrp a, TargetI lLen Int) -- | Fold all elements of all streams in a bundle individually, returning -- an array of per-stream results. foldlS :: (States Int m, Target lDst a, Index lDst ~ Int, BulkI lSrc b) => Name lDst -> (a -> b -> a) -> a -> Sources Int m lSrc b -> m (Array lDst a) -- | Fold all elements of all streams in a bundle together, one stream -- after the other, returning the single final value. foldlAllS :: (States () m, BulkI lSrc b) => (a -> b -> a) -> a -> Sources Int m lSrc b -> m a -- | Segmented fold over vectors of segment lengths and input values. folds_i :: FoldsDict i m lSeg lElt lGrp lRes n a b => Name lGrp -> Name lRes -> (a -> b -> b) -> b -> Sources i m lSeg (n, Int) -> Sources i m lElt a -> m (Sources i m (T2 lGrp lRes) (n, b)) -- | Dictionaries needed to perform a segmented fold. type FoldsDict i m lSeg lElt lGrp lRes n a b = (States i m, Windowable lSeg (n, Int), Windowable lElt a, BulkI lSeg (n, Int), BulkI lElt a, BulkI lGrp n, BulkI lRes b, TargetI lElt a, TargetI lGrp n, TargetI lRes b) -- | Hook a monadic function to some sources, which will be passed every -- chunk that is pulled from the result. watch_i :: Monad m => (i -> Array l a -> m ()) -> Sources i m l a -> m (Sources i m l a) -- | Hook a monadic function to some sinks, which will be passed every -- chunk that is pushed to the result. watch_o :: Monad m => (i -> Array l a -> m ()) -> Sinks i m l a -> m (Sinks i m l a) -- | Like watch_o but discard the incoming chunks after they are -- passed to the function. trigger_o :: Monad m => i -> (i -> Array l a -> m ()) -> m (Sinks i m l a) -- | A sink that ignores all incoming data. -- -- ignore_o :: Monad m => i -> m (Sinks i m l a) -- | Yield a bundle of sinks of the given arity that drops all data on the -- floor. -- -- This sink is non-strict in the chunks. Haskell tracing thunks attached -- to the chunks will *not* be demanded. abandon_o :: Monad m => i -> m (Sinks i m l a) module Data.Repa.Flow.Auto.ZipWith -- | Combine corresponding elements of three sources with the given -- function. zipWith3_i :: (Flow a, Flow b, Flow c, Build b, Build c, Build d) => (a -> b -> c -> d) -> Sources a -> Sources b -> Sources c -> IO (Sources d) -- | Combine corresponding elements of four sources with the given -- function. zipWith4_i :: (Flow a, Flow b, Flow c, Flow d, Build a, Build b, Build c, Build d, Build e) => (a -> b -> c -> d -> e) -> Sources a -> Sources b -> Sources c -> Sources d -> IO (Sources e) -- | Combine corresponding elements of five sources with the given -- function. zipWith5_i :: (Flow a, Flow b, Flow c, Flow d, Flow e, Build a, Build b, Build c, Build d, Build e, Build f) => (a -> b -> c -> d -> e -> f) -> Sources a -> Sources b -> Sources c -> Sources d -> Sources e -> IO (Sources f) -- | Combine corresponding elements of six sources with the given function. zipWith6_i :: (Flow a, Flow b, Flow c, Flow d, Flow e, Flow f, Build a, Build b, Build c, Build d, Build e, Build f, Build g) => (a -> b -> c -> d -> e -> f -> g) -> Sources a -> Sources b -> Sources c -> Sources d -> Sources e -> Sources f -> IO (Sources g) -- | Combine corresponding elements of seven sources with the given -- function. 
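--
-- These all follow the same pattern; a sketch with three sources
-- (hypothetical session, with fromList and toList1 from
-- Data.Repa.Flow.Auto):
--
--   > s1 <- fromList 1 [1, 2, 3 :: Int]
--   > s2 <- fromList 1 [10, 20, 30 :: Int]
--   > s3 <- fromList 1 [100, 200, 300 :: Int]
--   > toList1 0 =<< zipWith3_i (\a b c -> a + b + c) s1 s2 s3
--   [111,222,333]
--   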
zipWith7_i :: (Flow a, Flow b, Flow c, Flow d, Flow e, Flow f, Flow g, Build a, Build b, Build c, Build d, Build e, Build f, Build g, Build h) => (a -> b -> c -> d -> e -> f -> g -> h) -> Sources a -> Sources b -> Sources c -> Sources d -> Sources e -> Sources f -> Sources g -> IO (Sources h) -- | This module defines the default specialisation of flows that appears -- in Data.Repa.Flow. Each stream in the bundle is indexed by a -- single integer, and stream state is stored using the IO monad. module Data.Repa.Flow.Auto -- | A bundle of stream sources, where the elements of the stream are -- chunked into arrays. type Sources a = Sources Int IO A a -- | A bundle of stream sinks, where the elements of the stream are chunked -- into arrays. type Sinks a = Sinks Int IO A a -- | Shorthand for common type classes. type Flow a = (Flow Int IO A a, Windowable A a) -- | Yield the number of streams in the bundle. sourcesArity :: Sources a -> Int -- | Yield the number of streams in the bundle. sinksArity :: Sinks a -> Int -- | Given an arity and a list of elements, yield sources that each produce -- all the elements. -- -- fromList :: Build a => Int -> [a] -> IO (Sources a) -- | Like fromList but take a list of lists. Each of the inner -- lists is packed into a single chunk. fromLists :: Build a => Int -> [[a]] -> IO (Sources a) -- | Drain a single source from a bundle into a list of elements. -- -- toList1 :: Build a => Int -> Sources a -> IO [a] -- | Drain a single source from a bundle into a list of chunks. -- -- toLists1 :: Build a => Int -> Sources a -> IO [[a]] -- | Given an arity and an array of elements, yield sources that each -- produce all the elements. -- -- fromArray :: Build a => Int -> Array a -> IO (Sources a) -- | Like fromArray but take an array of arrays. Each of the inner -- arrays is packed into a single chunk. fromArrays :: (Elem a, Build a) => Int -> Array (Array a) -> IO (Sources a) -- | Drain a single source from a bundle into an array of elements. -- -- toArray1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array a) -- | Drain a single source from a bundle into an array of chunks. -- -- toArrays1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array (Array a)) -- | Pull all available values from the sources and push them to the sinks. -- Streams in the bundle are processed sequentially, from first to last. -- -- drainS :: Sources a -> Sinks a -> IO () -- | Pull all available values from the sources and push them to the sinks, -- in parallel. We fork a thread for each of the streams and evaluate -- them all in parallel. -- -- drainP :: Sources a -> Sinks a -> IO () -- | Pull all available values from the sources and pass them to the given -- action. consumeS :: Bulk A a => Sources a -> (Int -> a -> IO ()) -> IO () -- | Segmented replicate. replicates_i :: (Flow (Int, a), Build a) => Sources (Int, a) -> IO (Sources a) -- | Apply a function to all elements pulled from some sources. map_i :: (Flow a, Build b) => (a -> b) -> Sources a -> IO (Sources b) -- | Apply a function to all elements pushed to some sinks. map_o :: (Flow a, Build b) => (a -> b) -> Sinks b -> IO (Sinks a) -- | Combine corresponding elements of two sources with the given function. zipWith_i :: (Flow a, Flow b, Build c) => (a -> b -> c) -> Sources a -> Sources b -> IO (Sources c) -- | Apply a generic stream process to a bundle of sources. process_i :: (Flow a, Flow b, Build b) => (s -> a -> (s, Array b)) -> s -> Sources a -> IO (Sources b) -- | Send the same data to two consumers. 
-- -- Given two argument sinks, yield a result sink. Pushing to the result -- sink causes the same element to be pushed to both argument sinks. dup_oo :: Sinks a -> Sinks a -> IO (Sinks a) -- | Send the same data to two consumers. -- -- Given an argument source and argument sink, yield a result source. -- Pulling an element from the result source pulls from the argument -- source, and pushes that element to the sink, as well as returning it -- via the result source. dup_io :: Sources a -> Sinks a -> IO (Sources a) -- | Send the same data to two consumers. -- -- Like dup_io but with the arguments flipped. dup_oi :: Sinks a -> Sources a -> IO (Sources a) -- | Connect an argument source to two result sources. -- -- Pulling from either result source pulls from the argument source. Each -- result source only gets the elements pulled at the time, so if one -- side pulls all the elements the other side won't get any. connect_i :: Sources a -> IO (Sources a, Sources a) -- | Hook a worker function to some sources, which will be passed every -- chunk that is pulled from each source. -- -- watch_i :: (Int -> Array A a -> IO ()) -> Sources a -> IO (Sources a) -- | Hook a worker function to some sinks, which will be passed every chunk -- that is pushed to each sink. -- -- watch_o :: (Int -> Array A a -> IO ()) -> Sinks a -> IO (Sinks a) -- | Create a bundle of sinks of the given arity that pass incoming chunks -- to a worker function. -- -- trigger_o :: Int -> (Int -> Array A a -> IO ()) -> IO (Sinks a) -- | Create a bundle of sinks of the given arity that drop all data on the -- floor. -- -- ignore_o :: Int -> IO (Sinks a) -- | Create a bundle of sinks of the given arity that drop all data on the -- floor. -- -- abandon_o :: Int -> IO (Sinks a) -- | Given a source index and a length, split a list of that length -- from the front of the source, yielding a new source for the remaining -- elements. -- -- head_i :: Flow a => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a)) -- | Concatenate a flow of arrays into a flow of the elements. concat_i :: (Flow a, Build a) => Sources (Array a) -> IO (Sources a) -- | Select a single column from a flow of rows of fields. select_i :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sources fs -> IO (Sources (Select' n fs)) -- | Select a single column from a flow of fields. select_o :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sinks (Select' n fs) -> IO (Sinks fs) -- | Discard a single column from a flow of fields. discard_i :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sources fs -> IO (Sources (Discard' n fs)) -- | Discard a single column from a flow of fields. discard_o :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sinks (Discard' n fs) -> IO (Sinks fs) -- | Mask columns from a flow of fields. mask_i :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sources fs -> IO (Sources (Mask' ms fs)) -- | Mask columns from a flow of fields. mask_o :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sinks (Mask' ms fs) -> IO (Sinks fs) -- | Scan through some sources to find runs of matching elements, and count -- the lengths of those runs. -- --
--   > F.toList1 0 =<< F.groups_i =<< F.fromList 1 "waabbbblle"
--   [('w',1),('a',2),('b',4),('l',2),('e',1)]
--   
groups_i :: (GroupsDict a, Eq a) => Sources a -> IO (Sources (a, Int)) -- | Like groups_i, but take a function to determine whether two -- consecutive values should be in the same group. groupsBy_i :: GroupsDict a => (a -> a -> Bool) -> Sources a -> IO (Sources (a, Int)) -- | Dictionaries needed to perform a grouping. type GroupsDict a = GroupsDict Int IO A A A a -- | Fold all the elements of each stream in a bundle, one stream after the -- other, returning an array of per-stream fold results. foldlS :: (Flow b, Build a) => (a -> b -> a) -> a -> Sources b -> IO (Array A a) -- | Fold all the elements of all streams in a bundle together, one stream -- after the other, returning the single final value. foldlAllS :: Flow b => (a -> b -> a) -> a -> Sources b -> IO a -- | Given streams of lengths and values, perform a segmented fold where -- segments of values of the corresponding lengths are folded -- together. -- --
--   > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
--   > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
--   
--   > F.toList1 0 =<< F.folds_i (+) 0 sSegs sVals
--   [('a',10),('b',50),('c',220),('d',0),('e',80)]
--   
-- -- If not enough input elements are available to fold a complete segment -- then no output is produced for that segment. However, trailing zero -- length segments still produce the initial value for the fold. -- --
--   > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
--   > sVals <- F.fromList 1 [10, 20, 30 :: Int]
--   
--   > F.toList1 0 =<< F.folds_i (*) 1 sSegs sVals
--   [('a',10),('b',600),('c',1),('d',1),('e',1)]
--   
folds_i :: FoldsDict n a b => (a -> b -> b) -> b -> Sources (n, Int) -> Sources a -> IO (Sources (n, b)) -- | Dictionaries needed to perform a segmented fold. type FoldsDict n a b = FoldsDict Int IO A A A A n a b -- | Combination of groupsBy_i and folds_i. We determine the -- segment lengths while performing the folds. -- -- Note that SQL-like groupby aggregations can be performed using this -- function, provided the data is pre-sorted on the group key. For -- example, we can take the average of some groups of values: -- --
--   > sKeys <- F.fromList 1 "waaaabllle"
--   > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
--   
--   > sResult <-  F.map_i (\(key, (acc, n)) -> (key, acc / n))
--             =<< F.foldGroupsBy_i (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
--   
--   > F.toList1 0 sResult
--   [10.0,35.0,60.0,80.0,100.0]
--   
foldGroupsBy_i :: (FoldGroupsDict n a b) => (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources n -> Sources a -> IO (Sources (n, b)) type FoldGroupsDict n a b = (BulkI A n, Material A a, Material A n, Material A b) -- | Attach a finalizer to some sources. -- -- finalize_i :: (Int -> IO ()) -> Sources a -> IO (Sources a) -- | Attach a finalizer to some sinks. -- -- finalize_o :: (Int -> IO ()) -> Sinks a -> IO (Sinks a) module Data.Repa.Flow.Auto.Debug -- | Given a source index and a length, pull enough chunks from the source -- to build a list of the requested length, and discard the remaining -- elements in the final chunk. -- -- more :: (Flow a, Nicer a) => Int -> Sources a -> IO (Maybe [Nice a]) -- | Like more but also specify how many elements you want. more' :: (Flow a, Nicer a) => Int -> Int -> Sources a -> IO (Maybe [Nice a]) -- | Like more, but print results in a tabular form to the console. moret :: (Flow a, Nicer [a], Presentable (Nice [a])) => Int -> Sources a -> IO () -- | Like more', but print results in tabular form to the console. moret' :: (Flow a, Nicer [a], Presentable (Nice [a])) => Int -> Int -> Sources a -> IO () -- | Like more, but show elements in their raw format. morer :: Flow a => Int -> Sources a -> IO (Maybe [a]) -- | Like more', but show elements in their raw format. morer' :: Flow a => Int -> Int -> Sources a -> IO (Maybe [a]) -- | Convert some value to a nice form. -- -- In particular: -- -- -- -- As ghci automatically pretty prints lists, using nice is more -- fun than trying to show the raw Repa array representations. class Nicer a where type family Nice a :: * nice :: Nicer a => a -> Nice a -- | Convert some value to a form presentable to the user. -- -- Like show but we allow the nesting structure to be preserved so -- it can be displayed in tabular format. class Presentable a present :: Presentable a => a -> Present -- | Read and write files. module Data.Repa.Flow.Auto.SizedIO -- | Like sourceBytes, but taking an explicit chunk size. sourceBytes :: Integer -> Array B Bucket -> IO (Sources Word8) -- | Like sourceChars, but taking an explicit chunk size. sourceChars :: Integer -> Array B Bucket -> IO (Sources Char) -- | Like sourceLines, but taking an explicit chunk size and error -- action. sourceLines :: Integer -> IO () -> Array B Bucket -> IO (Sources (Array A Char)) -- | Like sourceRecords, but taking an explicit chunk size and error -- action. sourceRecords :: Integer -> (Word8 -> Bool) -> IO () -> Array B Bucket -> IO (Sources (Array A Word8)) -- | Read a file containing Tab-Separated-Values. sourceTSV :: Integer -> IO () -> Array B Bucket -> IO (Sources (Array A (Array A Char))) -- | Read a file containing Comma-Separated-Values. sourceCSV :: Integer -> IO () -> Array B Bucket -> IO (Sources (Array A (Array A Char))) -- | Read and write files. -- -- The functions in this module are wrappers for the ones in -- Data.Repa.Flow.Auto.SizedIO that use a default chunk size of -- 64kBytes and just call error if the source file appears -- corrupted. module Data.Repa.Flow.Auto.IO -- | The default chunk size of 64kBytes. defaultChunkSize :: Integer -- | Read data from some files, using the default chunk length. sourceBytes :: Array B Bucket -> IO (Sources Word8) -- | Read 8-bit ASCII characters from some files, using the default chunk -- length. sourceChars :: Array B Bucket -> IO (Sources Char) -- | Read complete lines of data from a text file, using the default chunk -- length. We read as many complete lines as will fit into each chunk. 
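--
-- A minimal sketch (hypothetical file names): copy a text file
-- line-by-line:
--
--   > import Data.Repa.Flow.IO.Bucket
--   > lns <- fromFiles ["input.txt"] sourceLines
--   > out <- toFiles  ["output.txt"] sinkLines
--   > drainS lns out
--   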
-- -- sourceLines :: Array B Bucket -> IO (Sources (Array A Char)) -- | Read complete records of data from a file, into chunks of the default -- length. We read as many complete records as will fit into each chunk. -- -- The records are separated by a special terminating character, which -- the given predicate detects. After reading a chunk of data we seek the -- file to just after the last complete record that was read, so we can -- continue to read more complete records next time. -- -- If we cannot fit at least one complete record in the chunk then -- perform the given failure action. Limiting the chunk length guards -- against the case where a large input file is malformed, as we won't -- try to read the whole file into memory. -- -- sourceRecords :: (Word8 -> Bool) -> Array B Bucket -> IO (Sources (Array A Word8)) -- | Read a file containing Tab-Separated-Values. sourceTSV :: Array B Bucket -> IO (Sources (Array A (Array A Char))) -- | Read a file containing Comma-Separated-Values. sourceCSV :: Array B Bucket -> IO (Sources (Array A (Array A Char))) -- | Read the lines of a text file, converting each line to values with the -- given format. sourceFormatLn :: (Unpackable format, Target A (Value format)) => Integer -> IO () -> IO (Array A Word8 -> IO ()) -> format -> Array B Bucket -> IO (Sources (Value format)) -- | Write 8-bit bytes to some files. sinkBytes :: Array B Bucket -> IO (Sinks Word8) -- | Write vectors of text lines to the given file handles. sinkLines :: Array B Bucket -> IO (Sinks (Array A Char)) -- | Write 8-bit ASCII characters to some files. sinkChars :: Array B Bucket -> IO (Sinks Char) -- | Create sinks that convert values to some format and write them to -- buckets. -- --
-- | Read a file containing Tab-Separated-Values.
sourceTSV :: Array B Bucket -> IO (Sources (Array A (Array A Char)))

-- | Read a file containing Comma-Separated-Values.
sourceCSV :: Array B Bucket -> IO (Sources (Array A (Array A Char)))

-- | Read the lines of a text file, converting each line to values with the given format.
sourceFormatLn :: (Unpackable format, Target A (Value format)) => Integer -> IO () -> IO (Array A Word8 -> IO ()) -> format -> Array B Bucket -> IO (Sources (Value format))

-- | Write 8-bit bytes to some files.
sinkBytes :: Array B Bucket -> IO (Sinks Word8)

-- | Write vectors of text lines to the given file handles.
sinkLines :: Array B Bucket -> IO (Sinks (Array A Char))

-- | Write 8-bit ASCII characters to some files.
sinkChars :: Array B Bucket -> IO (Sinks Char)

-- | Create sinks that convert values to some format and write them to buckets.
--
--   > import Data.Repa.Flow           as F
--   > import Data.Repa.Convert.Format as F
--   > :{
--     do let format = FixString ASCII 10 :*: Float64be :*: Int16be
--        let vals   = listFormat format
--                      [ "red"   :*: 5.3    :*: 100
--                      , "green" :*: 2.8    :*: 93 
--                      , "blue"  :*: 0.99   :*: 42 ]
--   
--        ss  <- F.fromList 1 vals
--        out <- toFiles ["colors.bin"]
--            $  sinkFormatLn format (error "convert failed")
--        drainS ss out
--     :}
--   
sinkFormatLn :: (Packable format, Bulk A (Value format), Show format) => format -> IO () -> Array B Bucket -> IO (Sinks (Value format))

module Data.Repa.Flow.Auto.Format

-- | Pack elements into the given storage formats.
packFormat_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources (Array Word8))

-- | Like packFormat_i, but return sources of flat bytes.
flatPackFormat_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources Word8)

-- | Like packFormat_i, but also append a newline character after every packed element.
packFormatLn_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources (Array Word8))

-- | Like packFormatLn_i, but return sources of flat bytes.
flatPackFormatLn_i :: (Packable format, Elem (Value format)) => format -> Sources (Value format) -> IO (Sources Word8)

-- | Like packFormatLn_i, but use a default, human-readable format to encode the values.
packAsciiLn_i :: (FormatAscii a, a ~ Value (FormatAscii' a), Elem a, Packable (FormatAscii' a)) => Sources a -> IO (Sources (Array Word8))

-- | Like packAsciiLn_i, but return sources of flat bytes.
flatPackAsciiLn_i :: (FormatAscii a, a ~ Value (FormatAscii' a), Elem a, Packable (FormatAscii' a)) => Sources a -> IO (Sources Word8)

-- | Like packAsciiLn_i, but keep the stream of keys paired with the packed values.
keyPackAsciiLn_i :: (FormatAscii a, a ~ Value (FormatAscii' a), Elem a, Packable (FormatAscii' a), Elem k, Build k) => Sources (k, a) -> IO (Sources (k, Array Word8))
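-- As a rough usage sketch for this module (a hypothetical ghci session, reusing the FixString and Int16be formats and the listFormat helper from the sinkFormatLn example above):
--
--   > import Data.Repa.Flow           as F
--   > import Data.Repa.Convert.Format as F
--   > let format = FixString ASCII 10 :*: Int16be
--   > ss  <- F.fromList 1 (listFormat format [ "red" :*: 100, "blue" :*: 42 ])
--   > out <- packFormatLn_i format ss
--   
-- Each element of out is now an array of packed bytes, one per input value, with a newline appended.
-- |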

Getting Started

--
--   A flow consists of a bundle of individual streams. Here we create a bundle of two streams, using different files for each. Data will be read in chunks, using the default chunk size of 64kBytes.
--
--   > import Data.Repa.Array           as A
--   > import Data.Repa.Flow            as F
--   > import Data.Repa.Flow.IO         as F
--   > import Data.Repa.Flow.Auto.Debug as F
--   
--   > ws <- fromFiles ["/usr/share/dict/words", "/usr/share/dict/cracklib-small"] sourceLines
--   
--
--   Show the first few elements of the first chunk of the first file.
--
--   > more 0 ws
--   Just ["A","A's","AA's","AB's","ABM's","AC's","ACTH's","AI's" ...]
--   
--
--   The more function is helpful for debugging. It pulls a whole chunk from a source, displays the requested number of elements from the front of it, then discards the rest. In production code you could use head_i to split a few elements from a stream while retaining the rest.
--
--   Use more' to show more elements at a time. We've already pulled the first chunk, so here are the first 100 elements from the second chunk:
--
--   > more' 0 100 ws
--   Just ["Jubal","Judah","Judaic","Judaism","Judaism's","Judaisms","Judas" ...]
--   
--
--   Use moret to display elements in tabular form. Here are the first few elements of the second stream in the bundle:
--
--   > moret 1 ws
--   "10th"   
--   "1st"    
--   "2nd"    
--   "3rd"    
--   "4th"    
--   "5th"    
--   ...
--   
--
--   Let's convert the characters to upper-case.
--
--   > import Data.Char
--   > up <- F.map_i (A.map toUpper) ws
--   > more 0 up
--   Just ["UTOPIAN","UTOPIAN'S","UTOPIANS","UTOPIAS","UTRECHT" ...]
--   
--
--   Flows are data-parallel, which means operators like map_i apply to all streams in the bundle. The second stream has been converted to upper-case as well:
--
--   > more 1 up
--   Just ["BROWNER","BROWNEST","BROWNIAN","BROWNIE","BROWNIE'S" ...]
--   
--
--   Let's write out the data to some files. There are two streams in the bundle, so open a file for each stream:
--
--   > out <- toFiles ["out1.txt", "out2.txt"] sinkLines
--   
--
--   Note that the ws and up we used before were bundles of stream Sources, whereas out is a bundle of stream Sinks. In the map_i operator we used earlier, the _i (input) suffix indicates that it transforms Sources; there is a related map_o (output) operator for Sinks.
--
--   Now that we have a bundle of Sources and some matching Sinks, we can drainS all of the data from the former into the latter.
--
--   > F.drainS up out
--   
--
--   At this point we can run an external shell command to check the output.
--
--   > :! head out1.txt
--   BEARSKIN'S
--   BEARSKINS
--   BEAST
--   BEAST'S
--   BEASTLIER
--   BEASTLIEST
--   BEASTLINESS
--   BEASTLINESS'S
--   BEASTLY
--   BEASTLY'S
--   
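--
--   To process the streams in parallel rather than sequentially, we can use drainP in place of drainS; it forks a thread for each stream in the bundle. A sketch (re-opening fresh sources, since the ones above are now exhausted; "out3.txt" and "out4.txt" are hypothetical output files):
--
--   > ws2  <- fromFiles ["out1.txt", "out2.txt"] sourceLines
--   > out2 <- toFiles ["out3.txt", "out4.txt"] sinkLines
--   > F.drainP ws2 out2
--   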
--

Performance

--
--   Although repa-flow can be used productively in the ghci REPL, performance won't be great because you will be running unspecialised, polymorphic code. For best results you should write a complete program and compile it with ghc -fllvm -O2 Main.hs.

module Data.Repa.Flow

-- | A bundle of stream sources, where the elements of the stream are chunked into arrays.
type Sources a = Sources Int IO A a

-- | A bundle of stream sinks, where the elements of the stream are chunked into arrays.
type Sinks a = Sinks Int IO A a

-- | Shorthand for common type classes.
type Flow a = (Flow Int IO A a, Windowable A a)

-- | Yield the number of streams in the bundle.
sourcesArity :: Sources a -> Int

-- | Yield the number of streams in the bundle.
sinksArity :: Sinks a -> Int

-- | Pull all available values from the sources and push them to the sinks. Streams in the bundle are processed sequentially, from first to last.
--
drainS :: Sources a -> Sinks a -> IO ()

-- | Pull all available values from the sources and push them to the sinks, in parallel. We fork a thread for each of the streams and evaluate them all in parallel.
--
drainP :: Sources a -> Sinks a -> IO ()

-- | Given an arity and a list of elements, yield sources that each produce all the elements.
--
fromList :: Build a => Int -> [a] -> IO (Sources a)

-- | Like fromList but take a list of lists. Each of the inner lists is packed into a single chunk.
fromLists :: Build a => Int -> [[a]] -> IO (Sources a)

-- | Drain a single source from a bundle into a list of elements.
--
toList1 :: Build a => Int -> Sources a -> IO [a]

-- | Drain a single source from a bundle into a list of chunks.
--
toLists1 :: Build a => Int -> Sources a -> IO [[a]]

-- | Given an arity and an array of elements, yield sources that each produce all the elements.
--
fromArray :: Build a => Int -> Array a -> IO (Sources a)

-- | Like fromArray but take an array of arrays. Each of the inner arrays is packed into a single chunk.
fromArrays :: (Elem a, Build a) => Int -> Array (Array a) -> IO (Sources a)

-- | Drain a single source from a bundle into an array of elements.
--
toArray1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array a)

-- | Drain a single source from a bundle into an array of chunks.
--
toArrays1 :: (Elem a, Build a) => Int -> Sources a -> IO (Array (Array a))

-- | Attach a finalizer to some sources.
--
finalize_i :: (Int -> IO ()) -> Sources a -> IO (Sources a)

-- | Attach a finalizer to some sinks.
--
finalize_o :: (Int -> IO ()) -> Sinks a -> IO (Sinks a)

-- | Segmented replicate.
replicates_i :: (Flow (Int, a), Build a) => Sources (Int, a) -> IO (Sources a)

-- | Apply a function to all elements pulled from some sources.
map_i :: (Flow a, Build b) => (a -> b) -> Sources a -> IO (Sources b)

-- | Apply a function to all elements pushed to some sinks.
map_o :: (Flow a, Build b) => (a -> b) -> Sinks b -> IO (Sinks a)

-- | Combine corresponding elements of two sources with the given function.
zipWith_i :: (Flow a, Flow b, Build c) => (a -> b -> c) -> Sources a -> Sources b -> IO (Sources c)
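-- For example, a small sketch combining fromList, zipWith_i and toList1:
--
--   > xs <- F.fromList 1 [1, 2, 3 :: Int]
--   > ys <- F.fromList 1 [40, 50, 60 :: Int]
--   > F.toList1 0 =<< F.zipWith_i (+) xs ys
--   [41,52,63]
--   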
-- | Send the same data to two consumers.
--
--   Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.
dup_oo :: Sinks a -> Sinks a -> IO (Sinks a)

-- | Send the same data to two consumers.
--
--   Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
dup_io :: Sources a -> Sinks a -> IO (Sources a)

-- | Send the same data to two consumers.
--
--   Like dup_io but with the arguments flipped.
dup_oi :: Sinks a -> Sources a -> IO (Sources a)

-- | Connect an argument source to two result sources.
--
--   Pulling from either result source pulls from the argument source. Each result source only gets the elements pulled at that time, so if one side pulls all the elements the other side won't get any.
connect_i :: Sources a -> IO (Sources a, Sources a)

-- | Hook a worker function to some sources, which will be passed every chunk that is pulled from each source.
--
watch_i :: (Int -> Array A a -> IO ()) -> Sources a -> IO (Sources a)

-- | Hook a worker function to some sinks, which will be passed every chunk that is pushed to each sink.
--
watch_o :: (Int -> Array A a -> IO ()) -> Sinks a -> IO (Sinks a)

-- | Create a bundle of sinks of the given arity that pass incoming chunks to a worker function.
--
trigger_o :: Int -> (Int -> Array A a -> IO ()) -> IO (Sinks a)

-- | Create a bundle of sinks of the given arity that drop all data on the floor.
--
ignore_o :: Int -> IO (Sinks a)

-- | Create a bundle of sinks of the given arity that drop all data on the floor.
--
abandon_o :: Int -> IO (Sinks a)

-- | Given a source index and a length, split a list of that length from the front of the source. Yields a new source for the remaining elements.
--
head_i :: Flow a => Int -> Int -> Sources a -> IO (Maybe ([a], Sources a))

-- | Concatenate a flow of arrays into a flow of the elements.
concat_i :: (Flow a, Build a) => Sources (Array a) -> IO (Sources a)

-- | Select a single column from a flow of rows of fields.
select_i :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sources fs -> IO (Sources (Select' n fs))

-- | Select a single column from a flow of rows of fields.
select_o :: (Select n (Array fs), Select' n (Array fs) ~ Array (Select' n fs)) => Nat n -> Sinks (Select' n fs) -> IO (Sinks fs)

-- | Discard a single column from a flow of rows of fields.
discard_i :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sources fs -> IO (Sources (Discard' n fs))

-- | Discard a single column from a flow of rows of fields.
discard_o :: (Discard n (Array fs), Discard' n (Array fs) ~ Array (Discard' n fs)) => Nat n -> Sinks (Discard' n fs) -> IO (Sinks fs)

-- | Mask columns from a flow of rows of fields.
mask_i :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sources fs -> IO (Sources (Mask' ms fs))

-- | Mask columns from a flow of rows of fields.
mask_o :: (Mask ms (Array fs), Mask' ms (Array fs) ~ Array (Mask' ms fs)) => ms -> Sinks (Mask' ms fs) -> IO (Sinks fs)
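-- A small sketch of head_i (hypothetical session), splitting the first two elements from stream 0 of a bundle:
--
--   > Just (xs, rest) <- F.head_i 0 2 =<< F.fromList 1 "hello"
--   > xs
--   "he"
--   > F.toList1 0 rest
--   "llo"
--   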
-- | Scan through some sources to find runs of matching elements, and count the lengths of those runs.
--
--   > F.toList1 0 =<< F.groups_i =<< F.fromList 1 "waabbbblle"
--   [('w',1),('a',2),('b',4),('l',2),('e',1)]
--   
groups_i :: (GroupsDict a, Eq a) => Sources a -> IO (Sources (a, Int))

-- | Like groups_i, but take a function to determine whether two consecutive values should be in the same group.
groupsBy_i :: GroupsDict a => (a -> a -> Bool) -> Sources a -> IO (Sources (a, Int))

-- | Dictionaries needed to perform a grouping.
type GroupsDict a = GroupsDict Int IO A A A a

-- | Fold all the elements of each stream in a bundle, one stream after the other, returning an array of fold results.
foldlS :: (Flow b, Build a) => (a -> b -> a) -> a -> Sources b -> IO (Array A a)

-- | Fold all the elements of all streams in a bundle together, one stream after the other, returning the final result.
foldlAllS :: Flow b => (a -> b -> a) -> a -> Sources b -> IO a

-- | Given streams of segment lengths and values, perform a segmented fold, where segments of values of the corresponding lengths are folded together.
--
--   > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 4), ('d', 0), ('e', 1), ('f', 5 :: Int)]
--   > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90 :: Int]
--   
--   > F.toList1 0 =<< F.folds_i (+) 0 sSegs sVals
--   [('a',10),('b',50),('c',220),('d',0),('e',80)]
--   
--
--   If not enough input elements are available to fold a complete segment then no output is produced for that segment. However, trailing zero-length segments still produce the initial value for the fold.
--
--   > sSegs <- F.fromList 1 [('a', 1), ('b', 2), ('c', 0), ('d', 0), ('e', 0 :: Int)]
--   > sVals <- F.fromList 1 [10, 20, 30 :: Int]
--   
--   > F.toList1 0 =<< F.folds_i (*) 1 sSegs sVals
--   [('a',10),('b',600),('c',1),('d',1),('e',1)]
--   
folds_i :: FoldsDict n a b => (a -> b -> b) -> b -> Sources (n, Int) -> Sources a -> IO (Sources (n, b))

-- | Dictionaries needed to perform a segmented fold.
type FoldsDict n a b = FoldsDict Int IO A A A A n a b

-- | Combination of groupsBy_i and folds_i. We determine the segment lengths while performing the folds.
--
--   Note that SQL-like group-by aggregations can be performed using this function, provided the data is pre-sorted on the group key. For example, we can take the average of some groups of values:
--
--   > sKeys <- F.fromList 1 "waaaabllle"
--   > sVals <- F.fromList 1 [10, 20, 30, 40, 50, 60, 70, 80, 90, 100 :: Double]
--   
--   > sResult <-  F.map_i (\(key, (acc, n)) -> (key, acc / n))
--             =<< F.foldGroupsBy_i (==) (\x (acc, n) -> (acc + x, n + 1)) (0, 0) sKeys sVals
--   
--   > F.toList1 0 sResult
--   [('w',10.0),('a',35.0),('b',60.0),('l',80.0),('e',100.0)]
--   
foldGroupsBy_i :: (FoldGroupsDict n a b) => (n -> n -> Bool) -> (a -> b -> b) -> b -> Sources n -> Sources a -> IO (Sources (n, b))
type FoldGroupsDict n a b = (BulkI A n, Material A a, Material A n, Material A b)