Copyright | (c) Adam Conner-Sax 2019 |
---|---|
License | BSD-3-Clause |
Maintainer | adam_conner_sax@yahoo.com |
Stability | experimental |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
map-reduce engine (fold builder) using Streamly
streams as its intermediate and return type.
Notes:
1. These are polymorphic in the return stream type, though the streams do have to be serial
when groupBy
is called.
So you have to specify the stream type in the call, or it has to be inferable from the use of the result.
- There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.
Synopsis
- streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
- streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
- concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
- toStreamlyFold :: Monad m => Fold a b -> Fold m a b
- toStreamlyFoldM :: Functor m => FoldM m a b -> Fold m a b
- resultToList :: (Monad m, IsStream t) => t m a -> m [a]
- concatStream :: (Monad m, Monoid a) => SerialT m a -> m a
- concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
- concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
- concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
- groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- data SerialT (m :: Type -> TYPE LiftedRep) a
- data WSerialT (m :: Type -> TYPE LiftedRep) a
- data AheadT (m :: Type -> TYPE LiftedRep) a
- data AsyncT (m :: Type -> TYPE LiftedRep) a
- data WAsyncT (m :: Type -> TYPE LiftedRep) a
- data ParallelT (m :: Type -> TYPE LiftedRep) a
- type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> TYPE LiftedRep) -> Type -> TYPE LiftedRep)
Engines
streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d Source #
map-reduce-fold builder returning a SerialT Identity d
result
streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d Source #
effectful map-reduce-fold engine returning an (IsStream t => t m d) result. The MonadAsync constraint here more or less requires us to run in IO, or something IO-like.
concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d Source #
possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an (IsStream t, MonadAsync m) => t m d
result
Streamly Combinators
toStreamlyFold :: Monad m => Fold a b -> Fold m a b Source #
convert a Control.Foldl Fold into a Streamly.Data.Fold fold
toStreamlyFoldM :: Functor m => FoldM m a b -> Fold m a b Source #
convert a Control.Foldl FoldM into a Streamly.Data.Fold fold
Result Extraction
resultToList :: (Monad m, IsStream t) => t m a -> m [a] Source #
make a stream into an (effectful) []
concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b Source #
mappend everything in a pure Streamly fold
concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b Source #
mappend everything in an effectful Streamly fold.
concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b Source #
mappend everything in a concurrent Streamly fold.
groupBy
Functions
groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group a Streamly stream of (k,c)
by hashable
key.
NB: this function uses the fact that SerialT m
is a monad
groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group a Streamly stream of (k,c)
by ordered key.
NB: this function uses the fact that SerialT m
is a monad
groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group a Streamly stream of (k,c)
by hashable
key. Uses mutable hashtables running in the ST monad.
NB: this function uses the fact that SerialT m
is a monad
groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group a Streamly stream of (k,c)
by key with instance of Grouping from http://hackage.haskell.org/package/discrimination.
NB: this function uses the fact that SerialT m
is a monad
Re-Exports
data SerialT (m :: Type -> TYPE LiftedRep) a #
For SerialT
streams:
(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad
A single Monad
bind behaves like a for
loop:
>>> :{
Stream.toList $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]
Nested monad binds behave like nested for
loops:
>>> :{
Stream.toList $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     y <- Stream.fromList [3,4] -- foreach y in stream
     return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Instances
data WSerialT (m :: Type -> TYPE LiftedRep) a #
For WSerialT
streams:
(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad
Note that <>
is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad
bind behaves like a for
loop:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]
Nested monad binds behave like interleaved nested for
loops:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     y <- Stream.fromList [3,4] -- foreach y in stream
     return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1
in the first stream with all the nested iterations of element
2
:
>>>
import Streamly.Prelude (wSerial)
>>>
Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT
.
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Instances
data AheadT (m :: Type -> TYPE LiftedRep) a #
For AheadT
streams:
(<>) = ahead
(>>=) = flip . concatMapWith ahead
A single Monad
bind behaves like a for
loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, ahead of time:
>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     y <- Stream.fromList [2,4] -- foreach y in stream
     Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using ahead
.
Since: 0.3.0 (Streamly)
Since: streamly-0.8.0
Instances
IsStream AheadT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
MonadTrans AheadT | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) | |
(MonadState s m, MonadAsync m) => MonadState s (AheadT m) | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadIO m, MonadAsync m) => MonadIO (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(Monad m, MonadAsync m) => Applicative (AheadT m) | |
Monad m => Functor (AheadT m) | |
MonadAsync m => Monad (AheadT m) | |
(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
MonadAsync m => Monoid (AheadT m a) | |
MonadAsync m => Semigroup (AheadT m a) | |
data AsyncT (m :: Type -> TYPE LiftedRep) a #
For AsyncT
streams:
(<>) = async
(>>=) = flip . concatMapWith async
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the async
combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the async
combinator:
>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     y <- Stream.fromList [2,4] -- foreach y in stream
     Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using async
.
Since: 0.1.0 (Streamly)
Since: streamly-0.8.0
Instances
IsStream AsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
MonadTrans AsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) | |
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(Monad m, MonadAsync m) => Applicative (AsyncT m) | |
Monad m => Functor (AsyncT m) | |
MonadAsync m => Monad (AsyncT m) | |
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Monoid (AsyncT m a) | |
MonadAsync m => Semigroup (AsyncT m a) | |
data WAsyncT (m :: Type -> TYPE LiftedRep) a #
For WAsyncT
streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the wAsync
combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the wAsync
combinator:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     y <- Stream.fromList [2,4] -- foreach y in stream
     Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one WAsyncT
output
stream and all the iterations corresponding to 2
constitute another
WAsyncT
output stream and these two output streams are merged using
wAsync
.
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT
.
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Instances
IsStream WAsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
MonadTrans WAsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) | |
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(Monad m, MonadAsync m) => Applicative (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
Monad m => Functor (WAsyncT m) | |
MonadAsync m => Monad (WAsyncT m) | |
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Monoid (WAsyncT m a) | |
MonadAsync m => Semigroup (WAsyncT m a) | |
data ParallelT (m :: Type -> TYPE LiftedRep) a #
For ParallelT
streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT
, ParallelT
is similar except that all
iterations are strictly concurrent while in AsyncT
it depends on the
consumer demand and available threads. See parallel
for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: streamly-0.8.0
Instances
IsStream ParallelT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
MonadTrans ParallelT | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) | |
(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(Monad m, MonadAsync m) => Applicative (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
Monad m => Functor (ParallelT m) | |
MonadAsync m => Monad (ParallelT m) | |
(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
MonadAsync m => Monoid (ParallelT m a) | |
MonadAsync m => Semigroup (ParallelT m a) | |
type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m) #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync
.
Since: 0.1.0 (Streamly)
Since: streamly-0.8.0
class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> TYPE LiftedRep) -> Type -> TYPE LiftedRep) #
Class of types that can represent a stream of elements of some type a
in
some monad m
.
Since: 0.2.0 (Streamly)
Since: streamly-0.8.0
Instances
IsStream AheadT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
IsStream AsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
IsStream WAsyncT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
IsStream ParallelT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
IsStream SerialT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
IsStream WSerialT | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type | |
IsStream ZipSerialM | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipSerialM m a -> Stream m a # fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ZipSerialM m a # consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a # (|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a # | |
IsStream ZipAsyncM | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type |