Copyright | (c) Adam Conner-Sax 2019 |
---|---|

License | BSD-3-Clause |

Maintainer | adam_conner_sax@yahoo.com |

Stability | experimental |

Safe Haskell | None |

Language | Haskell2010 |

Map-reduce engine (fold builder) using `Streamly` streams as its intermediate and return type.

Notes:
1. These are polymorphic in the return stream type, though the streams do have to be `serial` when `groupBy` is called.
So you have to specify the stream type in the call, or it has to be inferable from the use of the result.

2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I have not been able to verify that this is faster on an appropriate task with appropriate runtime settings.

## Synopsis

- streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
- streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
- concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
- toStreamlyFold :: Monad m => Fold a b -> Fold m a b
- toStreamlyFoldM :: FoldM m a b -> Fold m a b
- resultToList :: (Monad m, IsStream t) => t m a -> m [a]
- concatStream :: (Monad m, Monoid a) => SerialT m a -> m a
- concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
- concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
- concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
- groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- data SerialT (m :: Type -> Type) a
- data WSerialT (m :: Type -> Type) a
- data AheadT (m :: Type -> Type) a
- data AsyncT (m :: Type -> Type) a
- data WAsyncT (m :: Type -> Type) a
- data ParallelT (m :: Type -> Type) a
- type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> Type) -> Type -> Type)

# Engines

streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d Source #

Map-reduce-fold builder returning a `SerialT Identity d` result.

streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d Source #

Effectful map-reduce-fold engine returning an `(IsStream t) => t m d` result. The `MonadAsync` constraint here more or less requires us to run in `IO`, or something `IO`-like.

concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d Source #

Possibly (depending on the chosen stream types) concurrent map-reduce-fold builder returning an `(IsStream tOut, MonadAsync m) => tOut m d` result.

# Streamly Combinators

toStreamlyFold :: Monad m => Fold a b -> Fold m a b Source #

convert a Control.Foldl Fold into a Streamly.Data.Fold fold

toStreamlyFoldM :: FoldM m a b -> Fold m a b Source #

convert a Control.Foldl FoldM into a Streamly.Data.Fold fold

# Result Extraction

resultToList :: (Monad m, IsStream t) => t m a -> m [a] Source #

make a stream into an (effectful) `[]`

concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b Source #

mappend everything in a pure Streamly fold

concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b Source #

mappend everything in an effectful Streamly fold.

concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b Source #

mappend everything in a concurrent Streamly fold.

# `groupBy` Functions

groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group a streamly stream of `(k,c)` by `Hashable` key.
NB: this function uses the fact that `SerialT m` is a monad.

groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group a streamly stream of `(k,c)` by ordered key.
NB: this function uses the fact that `SerialT m` is a monad.

groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group a streamly stream of `(k,c)` by `Hashable` key. Uses mutable hashtables running in the `ST` monad.
NB: this function uses the fact that `SerialT m` is a monad.

groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group a streamly stream of `(k,c)` by a key with an instance of `Grouping` from http://hackage.haskell.org/package/discrimination.
NB: this function uses the fact that `SerialT m` is a monad.

# Re-Exports

data SerialT (m :: Type -> Type) a #

The `Semigroup` operation for `SerialT` behaves like a regular append
operation. Therefore, when `a <> b` is evaluated, stream `a` is evaluated
first until it exhausts and then stream `b` is evaluated. In other words,
the elements of stream `b` are appended to the elements of stream `a`. This
operation can be used to fold an infinite lazy container of streams.

```
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . serially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
```

```
[1,2,3,4]
```

The `Monad` instance runs the *monadic continuation* for each
element of the stream, serially.

```
main = S.drain . serially $ do
    x <- return 1 <> return 2
    S.yieldM $ print x
```

```
1
2
```

`SerialT` nests streams serially in a depth-first manner.

```
main = S.drain . serially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
```

```
(1,3)
(1,4)
(2,3)
(2,4)
```

We call the monadic code being run for each element of the stream a monadic
continuation. In the imperative paradigm we can think of this composition as
nested `for` loops, where the monadic continuation is the body of the loop. The
loop iterates over all elements of the stream.

Note that the behavior and semantics of `SerialT`, including its `Semigroup`
and `Monad` instances, are exactly like Haskell lists except that `SerialT`
can contain effectful actions while lists are pure.

In the code above, the `serially` combinator can be omitted as the default
stream type is `SerialT`.

*Since: streamly-0.2.0*

## Instances

data WSerialT (m :: Type -> Type) a #

The `Semigroup` operation for `WSerialT` interleaves the elements from the
two streams. Therefore, when `a <> b` is evaluated, stream `a` is evaluated
first to produce the first element of the combined stream and then stream
`b` is evaluated to produce the next element of the combined stream, and
then we go back to evaluating stream `a` and so on. In other words, the
elements of stream `a` are interleaved with the elements of stream `b`.

Note that when multiple actions are combined like `a <> b <> c ... <> z` we
interleave them in a binary fashion, i.e. `a` and `b` are interleaved with
each other and the result is interleaved with `c` and so on. This will not
act as a true round-robin scheduling across all the streams. Note that this
operation cannot be used to fold a container of infinite streams as the
state that it needs to maintain is proportional to the number of streams.

```
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . wSerially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
```

```
[1,3,2,4]
```

Similarly, the `Monad` instance interleaves the iterations of the
inner and the outer loop, nesting loops in a breadth-first manner.

```
main = S.drain . wSerially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
```

```
(1,3)
(2,3)
(1,4)
(2,4)
```

*Since: streamly-0.2.0*

## Instances

data AheadT (m :: Type -> Type) a #

The `Semigroup` operation for `AheadT` appends two streams. The combined
stream behaves like a single stream with the actions from the second stream
appended to the first stream. The combined stream is evaluated in the
speculative style. This operation can be used to fold an infinite lazy
container of streams.

```
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = do
    xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
    print xs
    where p n = threadDelay 1000000 >> return n
```

```
[1,2,3,4]
```

Any exceptions generated by a constituent stream are propagated to the output stream.

The monad instance of `AheadT` may run each monadic continuation (bind)
concurrently in a speculative manner, performing side effects in a partially
ordered manner but producing the outputs in an ordered manner like
`SerialT`.

```
main = S.drain . aheadly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
```

```
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
```

*Since: streamly-0.3.0*

## Instances

MonadTrans AheadT | |

Defined in Streamly.Streams.Ahead | |

IsStream AheadT | |

Defined in Streamly.Streams.Ahead toStream :: AheadT m a -> Stream m a fromStream :: Stream m a -> AheadT m a consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a # (|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a # | |

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) | |

Defined in Streamly.Streams.Ahead | |

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) | |

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) | |

MonadAsync m => Monad (AheadT m) | |

Monad m => Functor (AheadT m) | |

(Monad m, MonadAsync m) => Applicative (AheadT m) | |

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) | |

Defined in Streamly.Streams.Ahead | |

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) | |

Defined in Streamly.Streams.Ahead | |

MonadAsync m => Semigroup (AheadT m a) | |

MonadAsync m => Monoid (AheadT m a) | |

data AsyncT (m :: Type -> Type) a #

The `Semigroup` operation for `AsyncT` appends two streams. The combined
stream behaves like a single stream with the actions from the second stream
appended to the first stream. The combined stream is evaluated in the
asynchronous style. This operation can be used to fold an infinite lazy
container of streams.

```
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . asyncly $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
```

```
[1,2,3,4]
```

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the monad instance of `AsyncT` *may* run each iteration
concurrently based on demand. More concurrent iterations are started only
if the previous iterations are not able to produce enough output for the
consumer.

```
main = drain . asyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
```

```
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
```

*Since: streamly-0.1.0*

## Instances

MonadTrans AsyncT | |

Defined in Streamly.Streams.Async | |

IsStream AsyncT | |

Defined in Streamly.Streams.Async toStream :: AsyncT m a -> Stream m a fromStream :: Stream m a -> AsyncT m a consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a # (|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a # | |

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) | |

Defined in Streamly.Streams.Async | |

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) | |

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) | |

MonadAsync m => Monad (AsyncT m) | |

Monad m => Functor (AsyncT m) | |

(Monad m, MonadAsync m) => Applicative (AsyncT m) | |

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) | |

Defined in Streamly.Streams.Async | |

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) | |

Defined in Streamly.Streams.Async | |

MonadAsync m => Semigroup (AsyncT m a) | |

MonadAsync m => Monoid (AsyncT m a) | |

data WAsyncT (m :: Type -> Type) a #

The `Semigroup` operation for `WAsyncT` interleaves the elements from the
two streams. Therefore, when `a <> b` is evaluated, one action is picked
from stream `a` for evaluation and then the next action is picked from
stream `b` and then the next action is again picked from stream `a`, going
around in a round-robin fashion. Many such actions are executed concurrently
depending on `maxThreads` and `maxBuffer` limits. Results are served to the
consumer in the order of completion of the actions.

Note that when multiple actions are combined like `a <> b <> c ... <> z` we
go in a round-robin fashion across all of them, picking one action from each
up to `z` and then coming back to `a`. Note that this operation cannot be
used to fold a container of infinite streams as the state that it needs to
maintain is proportional to the number of streams.

```
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . wAsyncly $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
```

```
[1,3,2,4]
```

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the `Monad` instance of `WAsyncT` runs *all* iterations fairly
concurrently using round-robin scheduling.

```
main = drain . wAsyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
```

```
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
```

*Since: streamly-0.2.0*

## Instances

MonadTrans WAsyncT | |

Defined in Streamly.Streams.Async | |

IsStream WAsyncT | |

Defined in Streamly.Streams.Async toStream :: WAsyncT m a -> Stream m a fromStream :: Stream m a -> WAsyncT m a consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a # (|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a # | |

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) | |

Defined in Streamly.Streams.Async | |

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) | |

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) | |

MonadAsync m => Monad (WAsyncT m) | |

Monad m => Functor (WAsyncT m) | |

(Monad m, MonadAsync m) => Applicative (WAsyncT m) | |

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) | |

Defined in Streamly.Streams.Async | |

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) | |

Defined in Streamly.Streams.Async | |

MonadAsync m => Semigroup (WAsyncT m a) | |

MonadAsync m => Monoid (WAsyncT m a) | |

data ParallelT (m :: Type -> Type) a #

Async composition with strict concurrent execution of all streams.

The `Semigroup` instance of `ParallelT` executes both the streams
concurrently without any delay and without waiting for the consumer demand,
and *merges* the results as they arrive. If the consumer does not consume
the results, they are buffered up to a configured maximum, controlled by the
`maxBuffer` primitive. If the buffer becomes full the concurrent tasks will
block until there is space in the buffer.

Both `WAsyncT` and `ParallelT` evaluate the constituent streams fairly in a
round-robin fashion. The key difference is that `WAsyncT` might wait for the
consumer demand before it executes the tasks, whereas `ParallelT` starts
executing all the tasks immediately without waiting for the consumer demand.
For `WAsyncT` the `maxThreads` limit applies, whereas for `ParallelT` it does
not apply. In other words, `WAsyncT` can be lazy whereas `ParallelT` is
strict.

`ParallelT` is useful for cases when the streams are required to be
evaluated simultaneously irrespective of how the consumer consumes them, e.g.
when we want to race two tasks and want to start both strictly at the same
time, or if we have timers in the parallel tasks and our results depend on
the timers being started at the same time. If we do not have such
requirements then `AsyncT` or `AheadT` are recommended as they can be more
efficient than `ParallelT`.

```
main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
```

```
[1,3,2,4]
```

When streams with more than one element are merged, it yields whichever
stream yields first without any bias, unlike the `Async` style streams.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the `Monad` instance of `ParallelT` runs *all* iterations
of the loop concurrently.

```
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = drain . parallely $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
```

```
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
```

Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.

*Since: 0.7.0 (maxBuffer applies to ParallelT streams)*

*Since: 0.1.0*

## Instances

MonadTrans ParallelT | |

Defined in Streamly.Streams.Parallel | |

IsStream ParallelT | |

Defined in Streamly.Streams.Parallel toStream :: ParallelT m a -> Stream m a fromStream :: Stream m a -> ParallelT m a consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a # (|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a # | |

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) | |

Defined in Streamly.Streams.Parallel | |

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) | |

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) | |

MonadAsync m => Monad (ParallelT m) | |

Monad m => Functor (ParallelT m) | |

(Monad m, MonadAsync m) => Applicative (ParallelT m) | |

Defined in Streamly.Streams.Parallel | |

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) | |

Defined in Streamly.Streams.Parallel | |

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) | |

Defined in Streamly.Streams.Parallel | |

MonadAsync m => Semigroup (ParallelT m a) | |

MonadAsync m => Monoid (ParallelT m a) | |

type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m) #

A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
`MonadAsync`.

*Since: streamly-0.1.0*

class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> Type) -> Type -> Type) #

Class of types that can represent a stream of elements of some type `a`
in some monad `m`.

*Since: streamly-0.2.0*