repa-flow-4.2.2.1: Data-parallel data flows.

Safe Haskell: None
Language: Haskell98

Data.Repa.Flow.Generic


Description

Everything flows.

This module defines generic flows. The other flow types defined in Data.Repa.Flow.Chunked and Data.Repa.Flow.Simple are specialisations of this generic one.


Documentation

data Sources i m e Source

A bundle of stream sources, indexed by a value of type i, in some monad m, returning elements of type e.

Elements can be pulled from each stream in the bundle individually.

Constructors

Sources 

Fields

sourcesArity :: i

Number of sources in this bundle.

sourcesPull :: i -> (e -> m ()) -> m () -> m ()

Function to pull data from a bundle. Give it the index of the desired stream, a continuation that accepts an element, and a continuation to invoke when no more elements will ever be available.
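
For illustration, a single-stream source can be built directly from the constructor. The sketch below is a hypothetical helper, not part of the library: it fixes i to () and m to IO, and keeps the per-stream state in an IORef.

import Data.Repa.Flow.Generic
import Data.IORef

-- Hypothetical helper, not part of the library: a one-stream bundle
-- that produces the elements of the given list exactly once.
sourceOfList :: [a] -> IO (Sources () IO a)
sourceOfList xs0
 = do   ref <- newIORef xs0
        let pull _ eat eject = do
              xs <- readIORef ref
              case xs of
                []       -> eject
                x : rest -> do writeIORef ref rest; eat x
        return (Sources () pull)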

data Sinks i m e Source

A bundle of stream sinks, indexed by a value of type i, in some monad m, accepting elements of type e.

Elements can be pushed to each stream in the bundle individually.

Constructors

Sinks 

Fields

sinksArity :: i

Number of sinks in this bundle.

sinksPush :: i -> e -> m ()

Push an element to one of the streams in the bundle.

sinksEject :: i -> m ()

Signal that no more elements will ever be available for this sink. It is ok to eject the same stream multiple times.
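
For illustration, a sink can likewise be built directly from its constructor. The sketch below is a hypothetical helper, not part of the library: a one-stream sink that prints every element pushed to it and reports when the stream is ejected.

import Data.Repa.Flow.Generic

-- Hypothetical helper, not part of the library: a one-stream sink that
-- prints each pushed element and reports when the stream is ejected.
sinkPrint :: Show a => IO (Sinks () IO a)
sinkPrint
 = return $ Sinks ()
        (\_ x -> print x)
        (\_   -> putStrLn "ejected")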

Stream State and Thread Safety

As most functions in this library produce IO actions, thread safety is not guaranteed by their types.

It is not safe to concurrently pull from the same stream of a Sources bundle, or concurrently push to the same stream of a Sinks bundle. Both Sources and Sinks may hold per-stream state information, and accessing the same stream concurrently may cause a race condition.

It is safe to concurrently push or pull from different streams of a bundle, as the state information for each stream is guaranteed to be separate. Any inter-stream communication is protected by appropriate locks.

Unless stated otherwise, any worker function passed to a flow operator may be invoked concurrently. For example, if you pass an IO action to trigger_o then that action may be invoked concurrently.

In practice, if you use just the bulk operators provided by this library then you won't have a problem. However, if you construct your own Sources or Sinks by providing raw push, pull and eject functions then you must obey the above rules.
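
As a sketch of these rules: in the session below (illustrative bindings and values), the worker passed to trigger_o updates a shared counter with the atomic modifyMVar_, so it remains safe even when drainP invokes it from several threads at once. The result shown assumes each of the four streams yields the full list, as described for fromList.

> import Data.Repa.Flow.Generic
> import Control.Concurrent.MVar
> ss    <- fromList (4 :: Int) [1 .. 10 :: Int]
> total <- newMVar (0 :: Int)
> ok    <- trigger_o (4 :: Int) (\_ x -> modifyMVar_ total (\n -> return (n + x)))
> drainP ss ok
> readMVar total
220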

Evaluation

drainS :: (Next i, Monad m) => Sources i m a -> Sinks i m a -> m () Source

Pull all available values from the sources and push them to the sinks. Streams in the bundle are processed sequentially, from first to last.

  • If the Sources and Sinks have different numbers of streams then we only evaluate the common subset.
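
For example, in this illustrative session we drain a single-stream source into a trigger_o sink that prints each element together with its stream index (the output shown is indicative):

> import Data.Repa.Flow.Generic
> ss <- fromList (1 :: Int) [10, 20, 30 :: Int]
> ok <- trigger_o (1 :: Int) (\i x -> print (i, x :: Int))
> drainS ss ok
(0,10)
(0,20)
(0,30)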

drainP :: Sources Int IO a -> Sinks Int IO a -> IO () Source

Pull all available values from the sources and push them to the sinks, in parallel. We fork a thread for each of the streams and evaluate them all in parallel.

  • If the Sources and Sinks have different numbers of streams then we only evaluate the common subset.

consumeS :: (Next i, Monad m) => (i -> a -> m ()) -> Sources i m a -> m () Source

Pull all available values from the sources and pass them to the given action.

Conversion

fromList :: States i m => i -> [a] -> m (Sources i m a) Source

Given an arity and a list of elements, yield sources that each produce all the elements.

toList1 :: States i m => i -> Sources i m a -> m [a] Source

Drain a single source into a list.

takeList1 :: States i m => Int -> i -> Sources i m a -> m [a] Source

Drain the given number of elements from a single source into a list.
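
An illustrative session combining these conversions (bindings and values are made up). Each stream of the bundle produced by fromList yields all of the elements, and each stream keeps its own state:

> import Data.Repa.Flow.Generic
> ss <- fromList (2 :: Int) "flows"
> takeList1 3 0 ss
"flo"
> toList1 1 ss
"flows"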

pushList :: Monad m => [(i, a)] -> Sinks i m a -> m () Source

Push elements into the associated streams of a bundle of sinks.

pushList1 :: Monad m => i -> [a] -> Sinks i m a -> m () Source

Push the elements of a list into the given stream of a bundle of sinks.

Stream Indices

mapIndex_i :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sources i1 m a -> m (Sources i2 m a) Source

Transform the stream indexes of a bundle of sources.

The given transform functions should be inverses of each other, else you'll get a confusing result.

mapIndex_o :: Monad m => (i1 -> i2) -> (i2 -> i1) -> Sinks i1 m a -> m (Sinks i2 m a) Source

Transform the stream indexes of a bundle of sinks.

The given transform functions should be inverses of each other, else you'll get a confusing result.

flipIndex2_i :: Monad m => Sources SH2 m a -> m (Sources SH2 m a) Source

For a bundle of sources with a 2-d stream index, flip the components of the index.

flipIndex2_o :: Monad m => Sinks SH2 m a -> m (Sinks SH2 m a) Source

For a bundle of sinks with a 2-d stream index, flip the components of the index.

Finalizers

finalize_i :: States i m => (i -> m ()) -> Sources i m a -> m (Sources i m a) Source

Attach a finalizer to a bundle of sources.

For each stream in the bundle, the finalizer will be called the first time a consumer of that stream tries to pull an element when no more are available.

The provided finalizer will be run after any finalizers already attached to the source.
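
An illustrative session (values made up): the finalizer runs when the consumer first pulls past the end of the stream, so the message appears while toList1 drains the source:

> import Data.Repa.Flow.Generic
> ss  <- fromList (1 :: Int) [1, 2, 3 :: Int]
> ss' <- finalize_i (\i -> putStrLn ("stream " ++ show i ++ " done")) ss
> toList1 0 ss'
stream 0 done
[1,2,3]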

finalize_o :: States i m => (i -> m ()) -> Sinks i m a -> m (Sinks i m a) Source

Attach a finalizer to a bundle of sinks.

For each stream in the bundle, the finalizer will be called the first time that stream is ejected.

The provided finalizer will be run after any finalizers already attached to the sink.

Flow Operators

Projection

project_i :: Monad m => i -> Sources i m a -> m (Sources () m a) Source

Project out a single stream source from a bundle.

project_o :: Monad m => i -> Sinks i m a -> m (Sinks () m a) Source

Project out a single stream sink from a bundle.

Constructors

repeat_i :: Monad m => i -> (i -> a) -> m (Sources i m a) Source

Yield sources that always produce the same value.

replicate_i :: States i m => i -> Int -> (i -> a) -> m (Sources i m a) Source

Yield sources of the given length that always produce the same value.

prepend_i :: States i m => [a] -> Sources i m a -> m (Sources i m a) Source

Prepend some elements to the front of some sources.
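
For example (an illustrative session):

> import Data.Repa.Flow.Generic
> ss  <- fromList (1 :: Int) [3, 4, 5 :: Int]
> ss' <- prepend_i [1, 2] ss
> toList1 0 ss'
[1,2,3,4,5]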

prependOn_i :: States i m => (i -> Bool) -> [a] -> Sources i m a -> m (Sources i m a) Source

Like prepend_i but only prepend the elements to the streams that match the given predicate.

Mapping

map_i :: Monad m => (a -> b) -> Sources i m a -> m (Sources i m b) Source

Apply a function to every element pulled from some sources, producing some new sources.
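
For example (an illustrative session):

> import Data.Repa.Flow.Generic
> ss  <- fromList (1 :: Int) [1, 2, 3 :: Int]
> ss' <- map_i (* 10) ss
> toList1 0 ss'
[10,20,30]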

map_o :: Monad m => (a -> b) -> Sinks i m b -> m (Sinks i m a) Source

Apply a function to every element pushed to some sinks, producing some new sinks.

smap_i :: Monad m => (i -> a -> b) -> Sources i m a -> m (Sources i m b) Source

Like map_i, but the worker function is also given the stream index.

smap_o :: Monad m => (i -> a -> b) -> Sinks i m b -> m (Sinks i m a) Source

Like map_o, but the worker function is also given the stream index.

szipWith_ii :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sources i m a -> Sources i m b -> m (Sources i m c) Source

Combine the elements of two flows with the given function. The worker function is also given the stream index.
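
For example (an illustrative session), adding the corresponding elements of two single-stream bundles and ignoring the stream index:

> import Data.Repa.Flow.Generic
> sa <- fromList (1 :: Int) [1, 2, 3 :: Int]
> sb <- fromList (1 :: Int) [10, 20, 30 :: Int]
> sc <- szipWith_ii (\_ x y -> x + y) sa sb
> toList1 0 sc
[11,22,33]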

szipWith_io :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m a -> m (Sinks i m b) Source

Like szipWith_ii, but take a bundle of Sinks for the result elements, and yield a bundle of Sinks to accept the b elements.

szipWith_oi :: (Ord i, Monad m) => (i -> a -> b -> c) -> Sinks i m c -> Sources i m b -> m (Sinks i m a) Source

Like szipWith_ii, but take a bundle of Sinks for the result elements, and yield a bundle of Sinks to accept the a elements.

Processing

compact_i :: (Monad m, States i m) => (s -> a -> (s, Maybe b)) -> s -> Sources i m a -> m (Sources i m b) Source

Combination of fold and filter.

We walk over the stream start-to-end, maintaining an accumulator. At each point we can choose whether or not to emit an element.
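
For example (an illustrative session), dropping consecutive duplicates by carrying the previous element as the accumulator:

> import Data.Repa.Flow.Generic
> ss  <- fromList (1 :: Int) [1, 1, 2, 2, 2, 3 :: Int]
> ss' <- compact_i (\s x -> (Just x, if s == Just x then Nothing else Just x)) Nothing ss
> toList1 0 ss'
[1,2,3]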

scan_i :: (Monad m, States i m) => (s -> a -> s) -> s -> Sources i m a -> m (Sources i m s) Source

Start-to-end scan over each stream in a bundle.

indexed_i :: (Monad m, States i m) => Sources i m a -> m (Sources i m (Int, a)) Source

For each stream in a bundle of sources, associate each element with its corresponding position in the stream.

Connecting

dup_oo :: (Ord i, States i m) => Sinks i m a -> Sinks i m a -> m (Sinks i m a) Source

Send the same data to two consumers.

Given two argument sinks, yield a result sink. Pushing to the result sink causes the same element to be pushed to both argument sinks.

dup_io :: (Ord i, Monad m) => Sources i m a -> Sinks i m a -> m (Sources i m a) Source

Send the same data to two consumers.

Given an argument source and argument sink, yield a result source. Pulling an element from the result source pulls from the argument source, and pushes that element to the sink, as well as returning it via the result source.
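
For example (an illustrative session), tapping a source so every element pulled from it is also printed:

> import Data.Repa.Flow.Generic
> ss  <- fromList (1 :: Int) [1, 2, 3 :: Int]
> ok  <- trigger_o (1 :: Int) (\_ x -> putStrLn ("pushed " ++ show (x :: Int)))
> ss' <- dup_io ss ok
> toList1 0 ss'
pushed 1
pushed 2
pushed 3
[1,2,3]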

dup_oi :: (Ord i, Monad m) => Sinks i m a -> Sources i m a -> m (Sources i m a) Source

Send the same data to two consumers.

Like dup_io but with the arguments flipped.

connect_i :: States i m => Sources i m a -> m (Sources i m a, Sources i m a) Source

Connect an argument source to two result sources.

Pulling from either result source pulls from the argument source. Each result source only receives the elements that are pulled from it at the time, so if one side pulls all the elements the other side won't get any.

funnel_i :: (States i m, States () m) => Sources i m a -> m (Sources () m a) Source

Given a bundle of sources containing several streams, produce a new bundle containing a single stream that gets data from the former.

Streams from the source are consumed in their natural order, and a complete stream is consumed before moving onto the next one.

> import Data.Repa.Flow.Generic
> toList1 () =<< funnel_i =<< fromList (3 :: Int) [11, 22, 33]
[11,22,33,11,22,33,11,22,33]

funnel_o :: States i m => i -> Sinks () m a -> m (Sinks i m a) Source

Given a bundle of sinks consisting of a single stream, produce a new bundle of the given arity that sends all data to the former, ignoring the stream index.

The argument stream is ejected only when all of the streams in the result bundle have been ejected.

  • Using this function in conjunction with parallel operators like drainP introduces non-determinism. Elements pushed to different streams in the result bundle could enter the single stream in the argument bundle in any order.
> import Data.Repa.Flow.Generic
> import Data.Repa.Array.Material
> import Data.Repa.Nice
 
> let things = [(0 :: Int, "foo"), (1, "bar"), (2, "baz")]
> result <- capture_o B () (\k -> funnel_o 4 k >>= pushList things)
> nice result
[((),"foo"),((),"bar"),((),"baz")]

Splitting

head_i :: States i m => Int -> Sources i m a -> i -> m ([a], Sources i m a) Source

Split the given number of elements from the head of a source, returning those elements in a list along with a new source for the rest.
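
For example (an illustrative session):

> import Data.Repa.Flow.Generic
> ss <- fromList (1 :: Int) [1 .. 5 :: Int]
> (xs, rest) <- head_i 2 ss 0
> xs
[1,2]
> toList1 0 rest
[3,4,5]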

Grouping

groups_i :: (Ord i, Monad m, Eq a) => Sources i m a -> m (Sources i m Int) Source

From a stream of values containing consecutive runs of identical values, produce a stream of the lengths of these runs.

Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1]
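
The same example as an illustrative session:

> import Data.Repa.Flow.Generic
> ss  <- fromList (1 :: Int) [4, 4, 4, 3, 3, 1, 1, 1, 4 :: Int]
> ss' <- groups_i ss
> toList1 0 ss'
[3,2,3,1]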

Packing

pack_ii :: (Ord i, Monad m) => Sources i m Bool -> Sources i m a -> m (Sources i m a) Source

Given a stream of flags and a stream of values, produce a new stream containing only the values whose corresponding flag is True. The length of the result is the length of the shorter of the two inputs.
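
For example (an illustrative session):

> import Data.Repa.Flow.Generic
> sf <- fromList (1 :: Int) [True, False, True, True]
> sv <- fromList (1 :: Int) "abcd"
> sr <- pack_ii sf sv
> toList1 0 sr
"acd"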

Folding

folds_ii :: (Ord i, Monad m) => (a -> a -> a) -> a -> Sources i m Int -> Sources i m a -> m (Sources i m a) Source

Segmented fold. Given a stream of segment lengths and a stream of elements, fold the elements of each segment individually.
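
Assuming the first argument flow carries the segment lengths and the second carries the elements, an illustrative session summing segments of lengths 2, 3 and 1 would look like:

> import Data.Repa.Flow.Generic
> slens <- fromList (1 :: Int) [2, 3, 1 :: Int]
> svals <- fromList (1 :: Int) [10, 20, 30, 40, 50, 60 :: Int]
> sres  <- folds_ii (+) 0 slens svals
> toList1 0 sres
[30,120,60]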

Watching

watch_i :: Monad m => (i -> a -> m ()) -> Sources i m a -> m (Sources i m a) Source

Apply a monadic function to every element pulled from some sources, producing some new sources.

watch_o :: Monad m => (i -> a -> m ()) -> Sinks i m a -> m (Sinks i m a) Source

Pass elements to the provided action as they are pushed into the sink.

trigger_o :: Monad m => i -> (i -> a -> m ()) -> m (Sinks i m a) Source

Like watch_o but doesn't pass elements to another sink.

Capturing

capture_o Source

Arguments

:: (Target lDst (i, a), Index lDst ~ Int) 
=> Name lDst

Name of destination layout.

-> i

Arity of result bundle.

-> (Sinks i IO a -> IO ())

Function to push data into the sinks.

-> IO (Array lDst (i, a)) 

Create a bundle of sinks of the given arity and capture any data pushed to it.

> import Data.Repa.Flow.Generic
> import Data.Repa.Array.Material
> import Data.Repa.Nice
> import Control.Monad
> liftM nice $ capture_o B 4 (\k -> pushList [(0 :: Int, "foo"), (1, "bar"), (0, "baz")] k)
[(0,"foo"),(1,"bar"),(0,"baz")]

rcapture_o Source

Arguments

:: (Target lDst (i, a), Index lDst ~ Int) 
=> Name lDst

Name of destination layout.

-> i

Arity of result bundle.

-> (Sinks i IO a -> IO b)

Function to push data into the sinks.

-> IO (Array lDst (i, a), b) 

Like capture_o but also return the result of the push function.

Ignorance

ignore_o :: Monad m => i -> m (Sinks i m a) Source

A sink that ignores all incoming data.

  • This sink is strict in the elements, so they are demanded before being discarded.
  • Haskell debugging thunks attached to the elements will be demanded.

abandon_o :: Monad m => i -> m (Sinks i m a) Source

A sink that drops all data on the floor.

  • This sink is non-strict in the elements.
  • Haskell tracing thunks attached to the elements will *not* be demanded.

Tracing

trace_o :: (Show i, Show a, Monad m) => i -> m (Sinks i m a) Source

Use the trace function from Debug.Trace to print each element that is pushed to a sink.

  • This function is intended for debugging only, not for production code.

Vector Flow Operators

1-dimensional distribution

distribute_o Source

Arguments

:: BulkI l a 
=> (Int -> a -> IO ())

Spill action, given the spilled element along with its index in the array.

-> Sinks Int IO a

Sinks to push elements into.

-> IO (Sinks () IO (Array l a)) 

Given a bundle of sinks indexed by an Int, produce a result sink for arrays.

Each time an array is pushed to the result sink, its elements are pushed to the corresponding streams of the argument sink. If there are more elements than sinks then the extra elements are given to the spill action.

  |          ..             |
  | [w0,  x0,  y0,  z0]     |   :: Sinks () IO (Array l a)
  | [w1,  x1,  y1,  z1, u1] |     (sink for a single stream of arrays)
  |          ..             |

     |    |    |    |    |
     v    v    v    v    .------> spilled

   | .. | .. | .. | .. |
   | w0 | x0 | y0 | z0 |        :: Sinks Int IO a
   | w1 | x1 | y1 | z1 |          (sink for several streams of elements)
   | .. | .. | .. | .. |

ddistribute_o :: BulkI l a => Sinks Int IO a -> IO (Sinks () IO (Array l a)) Source

Like distribute_o, but drop spilled elements on the floor.

2-dimensional distribution

distribute2_o Source

Arguments

:: BulkI l a 
=> (SH2 -> a -> IO ())

Spill action, given the spilled element along with its index in the array.

-> Sinks SH2 IO a

Sinks to push elements into.

-> IO (Sinks Int IO (Array l a)) 

Like distribute_o, but with 2-d stream indexes.

Given the argument and result sinks, when pushing to the result the stream index is used as the first component for the argument sink, and the index of the element in its array is used as the second component.

If you want the components of the stream index the other way around, apply flipIndex2_o to the argument sinks.

ddistribute2_o :: BulkI l a => Sinks SH2 IO a -> IO (Sinks Int IO (Array l a)) Source

Like distribute2_o, but drop spilled elements on the floor.

Shuffling

shuffle_o Source

Arguments

:: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) 
=> Name lSrc

Name of source layout.

-> (Int -> Array lDst a -> IO ())

Handle spilled elements.

-> Sinks Int IO (Array lDst a)

Sinks to push results to.

-> IO (Sinks () IO (Array lSrc (Int, a))) 

Given a bundle of argument sinks, produce a result sink. Arrays of indices and elements are pushed to the result sink. On doing so, the elements are pushed into the corresponding streams of the argument sinks.

If the index associated with an element does not have a corresponding stream in the argument sinks, then pass it to the provided spill function.

 |                      ..                         |
 | [(0, v0), (1, v1), (0, v2), (0, v3), (2, v4)]   |  :: Sources Int IO (Array l (Int, a))
 |                      ..                         |
         \       \                          |
          \       .------------.            |
           v                   v            .---------> spilled

      |       ..       |       ..       |
      |  [v0, v2, v3]  |      [v1]      |             :: Sinks Int IO (Array l a)
      |       ..       |       ..       | 

The following example uses capture_o to demonstrate how the shuffle_o operator can be used as one step of a bucket-sort. We start with two arrays of key-value pairs. In the result, the values from each block that had the same key are packed into the same tuple (bucket).

> import Data.Repa.Flow.Generic    as G
> import Data.Repa.Array           as A
> import Data.Repa.Array.Material  as A
> import Data.Repa.Nice

> let arr1 = A.fromList B [(0, 'a'), (1, 'b'), (2, 'c'), (0, 'd'), (0, 'c')]
> let arr2 = A.fromList B [(0, 'A'), (3, 'B'), (3, 'C')]
> result :: Array B (Int, Array U Char) 
>        <- capture_o B 4 (\k ->  shuffle_o B (error "spilled") k  
>                             >>= pushList1 () [arr1, arr2]) 

> nice result
[(0,"adc"),(1,"b"),(2,"c"),(0,"A"),(3,"BC")]

dshuffle_o Source

Arguments

:: (BulkI lDst a, BulkI lSrc (Int, a), Windowable lDst a, Target lDst a, Elt a) 
=> Name lSrc

Name of source layout.

-> Sinks Int IO (Array lDst a)

Sinks to push results to.

-> IO (Sinks () IO (Array lSrc (Int, a))) 

Like shuffle_o, but drop spilled elements on the floor.

dshuffleBy_o Source

Arguments

:: (BulkI lDst a, BulkI lSrc a, Windowable lDst a, Target lDst a, Elt a) 
=> Name lSrc

Name of source layout.

-> (a -> Int)

Get the stream number for an element.

-> Sinks Int IO (Array lDst a)

Sinks to push results to.

-> IO (Sinks () IO (Array lSrc a)) 

Like dshuffle_o, but use the given function to decide which stream of the argument bundle each element should be pushed into.

> import Data.Repa.Flow.Generic   as G
> import Data.Repa.Array          as A
> import Data.Repa.Array.Material as A
> import Data.Repa.Nice
> import Data.Char
 
> let arr1 = A.fromList B "FooBAr"
> let arr2 = A.fromList B "BazLIKE"
> result :: Array B (Int, Array U Char) 
>          <- capture_o B 2 (\k ->  dshuffleBy_o B (\x -> if isUpper x then 0 else 1) k
>                               >>= pushList1 () [arr1, arr2])
> nice result
[(0,"FBA"),(1,"oor"),(0,"BLIKE"),(1,"az")]

Chunking

chunkOn_i Source

Arguments

:: (States i IO, TargetI lDst a) 
=> Name lDst

Layout for result chunks.

-> Int

Maximum chunk length.

-> (a -> Bool)

Detect the last element in a chunk.

-> Sources i IO a

Element sources.

-> IO (Sources i IO (Array lDst a))

Chunk sources.

Take elements from a flow and pack them into chunks. The chunks are limited to the given maximum length. A predicate can also be supplied to detect the last element in a chunk.

unchunk_i Source

Arguments

:: (BulkI l a, States i IO) 
=> Sources i IO (Array l a)

Chunk sources.

-> IO (Sources i IO a)

Element sources.

Take a flow of chunks and flatten it into a flow of the individual elements.