| Copyright | (c) Adam Conner-Sax 2019 |
|---|---|
| License | BSD-3-Clause |
| Maintainer | adam_conner_sax@yahoo.com |
| Stability | experimental |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Control.MapReduce.Engines.Streamly
Description
map-reduce engine (fold builder) using Streamly streams as its intermediate and return type.
Notes:
1. These are polymorphic in the return stream type, though the streams do have to be serial when groupBy is called.
So you have to specify the stream type in the call or it has to be inferrable from the use of the result.
2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.
Synopsis
- streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
- streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
- concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
- toStreamlyFold :: Monad m => Fold a b -> Fold m a b
- toStreamlyFoldM :: Functor m => FoldM m a b -> Fold m a b
- resultToList :: (Monad m, IsStream t) => t m a -> m [a]
- concatStream :: (Monad m, Monoid a) => SerialT m a -> m a
- concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
- concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
- concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
- groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- data SerialT (m :: Type -> TYPE LiftedRep) a
- data WSerialT (m :: Type -> TYPE LiftedRep) a
- data AheadT (m :: Type -> TYPE LiftedRep) a
- data AsyncT (m :: Type -> TYPE LiftedRep) a
- data WAsyncT (m :: Type -> TYPE LiftedRep) a
- data ParallelT (m :: Type -> TYPE LiftedRep) a
- type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> TYPE LiftedRep) -> Type -> TYPE LiftedRep)
Engines
streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d Source #
map-reduce-fold builder returning a SerialT Identity d result
streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d Source #
effectful map-reduce-fold engine returning a (IsStream t => t m d) result. The MonadAsync constraint here more or less requires us to run in IO, or something IO-like.
concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d Source #
possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an (IsStream t, MonadAsync m) => t m d result
Streamly Combinators
toStreamlyFold :: Monad m => Fold a b -> Fold m a b Source #
convert a Control.Foldl Fold into a Streamly.Data.Fold fold
toStreamlyFoldM :: Functor m => FoldM m a b -> Fold m a b Source #
convert a Control.Foldl FoldM into a Streamly.Data.Fold fold
Result Extraction
resultToList :: (Monad m, IsStream t) => t m a -> m [a] Source #
make a stream into an (effectful) []
concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b Source #
mappend everything in a pure Streamly fold
concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b Source #
mappend everything in an effectful Streamly fold.
concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b Source #
mappend everything in a concurrent Streamly fold.
groupBy Functions
groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c) by hashable key.
NB: this function uses the fact that SerialT m is a monad
groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c) by ordered key.
NB: this function uses the fact that SerialT m is a monad
groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c) by hashable key. Uses mutable hashtables running in the ST monad.
NB: this function uses the fact that SerialT m is a monad
groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c) by key with instance of Grouping from http://hackage.haskell.org/package/discrimination.
NB: this function uses the fact that SerialT m is a monad
Re-Exports
data SerialT (m :: Type -> TYPE LiftedRep) a #
For SerialT streams:
(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like nested for loops:
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Instances
data WSerialT (m :: Type -> TYPE LiftedRep) a #
For WSerialT streams:
(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad
Note that <> is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like interleaved nested for loops:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1 in the first stream with all the nested iterations of element
2:
>>> import Streamly.Prelude (wSerial)
>>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT.
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Instances
data AheadT (m :: Type -> TYPE LiftedRep) a #
For AheadT streams:
(<>) = ahead
(>>=) = flip . concatMapWith ahead
A single Monad bind behaves like a for loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, ahead of time:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using ahead.
Since: 0.3.0 (Streamly)
Since: streamly-0.8.0
Instances
| IsStream AheadT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| MonadTrans AheadT | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
| (MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) | |
| (MonadState s m, MonadAsync m) => MonadState s (AheadT m) | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
| (MonadIO m, MonadAsync m) => MonadIO (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
| (Monad m, MonadAsync m) => Applicative (AheadT m) | |
| Monad m => Functor (AheadT m) | |
| MonadAsync m => Monad (AheadT m) | |
| (MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
| MonadAsync m => Monoid (AheadT m a) | |
| MonadAsync m => Semigroup (AheadT m a) | |
data AsyncT (m :: Type -> TYPE LiftedRep) a #
For AsyncT streams:
(<>) = async
(>>=) = flip . concatMapWith async
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the async combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the async combinator:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using async.
Since: 0.1.0 (Streamly)
Since: streamly-0.8.0
Instances
| IsStream AsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| MonadTrans AsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
| (MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) | |
| (MonadState s m, MonadAsync m) => MonadState s (AsyncT m) | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
| (MonadIO m, MonadAsync m) => MonadIO (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
| (Monad m, MonadAsync m) => Applicative (AsyncT m) | |
| Monad m => Functor (AsyncT m) | |
| MonadAsync m => Monad (AsyncT m) | |
| (MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
| MonadAsync m => Monoid (AsyncT m a) | |
| MonadAsync m => Semigroup (AsyncT m a) | |
data WAsyncT (m :: Type -> TYPE LiftedRep) a #
For WAsyncT streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the wAsync combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the wAsync combinator:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one WAsyncT output
stream and all the iterations corresponding to 2 constitute another
WAsyncT output stream and these two output streams are merged using
wAsync.
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT.
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Instances
| IsStream WAsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| MonadTrans WAsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
| (MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) | |
| (MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
| (MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
| (Monad m, MonadAsync m) => Applicative (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
| Monad m => Functor (WAsyncT m) | |
| MonadAsync m => Monad (WAsyncT m) | |
| (MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
| MonadAsync m => Monoid (WAsyncT m a) | |
| MonadAsync m => Semigroup (WAsyncT m a) | |
data ParallelT (m :: Type -> TYPE LiftedRep) a #
For ParallelT streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT, ParallelT is similar except that all
iterations are strictly concurrent while in AsyncT it depends on the
consumer demand and available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: streamly-0.8.0
Instances
| IsStream ParallelT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| MonadTrans ParallelT | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
| (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) | |
| (MonadState s m, MonadAsync m) => MonadState s (ParallelT m) | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
| (MonadIO m, MonadAsync m) => MonadIO (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
| (Monad m, MonadAsync m) => Applicative (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
| Monad m => Functor (ParallelT m) | |
| MonadAsync m => Monad (ParallelT m) | |
| (MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
| MonadAsync m => Monoid (ParallelT m a) | |
| MonadAsync m => Semigroup (ParallelT m a) | |
type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m) #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync.
Since: 0.1.0 (Streamly)
Since: streamly-0.8.0
class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> TYPE LiftedRep) -> Type -> TYPE LiftedRep) #
Class of types that can represent a stream of elements of some type a in
some monad m.
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Minimal complete definition
Instances
| IsStream AheadT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| IsStream AsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| IsStream WAsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| IsStream ParallelT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| IsStream SerialT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| IsStream WSerialT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
| IsStream ZipSerialM | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type Methods toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipSerialM m a -> Stream m a # fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ZipSerialM m a # consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a # (|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a # | |
| IsStream ZipAsyncM | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |