| Copyright | (c) Adam Conner-Sax 2019 |
|---|---|
| License | BSD-3-Clause |
| Maintainer | adam_conner_sax@yahoo.com |
| Stability | experimental |
| Safe Haskell | None |
| Language | Haskell2010 |
Control.MapReduce.Engines.Streamly
Description
map-reduce engine (fold builder) using Streamly streams as its intermediate and return type.
Notes:
1. These engines are polymorphic in the return stream type, though the streams do have to be serial when groupBy is called. So you have to specify the stream type in the call, or it has to be inferable from the use of the result.
2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.
Synopsis
- streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
- streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
- concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
- resultToList :: (Monad m, IsStream t) => t m a -> m [a]
- concatStream :: (Monad m, Monoid a) => SerialT m a -> m a
- concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
- concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
- concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
- groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- data SerialT (m :: Type -> Type) a
- data WSerialT (m :: Type -> Type) a
- data AheadT (m :: Type -> Type) a
- data AsyncT (m :: Type -> Type) a
- data WAsyncT (m :: Type -> Type) a
- data ParallelT (m :: Type -> Type) a
- type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- class IsStream (t :: (Type -> Type) -> Type -> Type)
Engines
streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
map-reduce-fold builder returning a SerialT Identity d result
streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
effectful map-reduce-fold engine returning an (IsStream t => t m d) result. The MonadAsync constraint here more or less requires us to run in IO, or something IO-like.
concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
possibly concurrent (depending on the chosen stream types) map-reduce-fold builder returning an (IsStream tOut, MonadAsync m) => tOut m d result
Result Extraction
resultToList :: (Monad m, IsStream t) => t m a -> m [a]
make a stream into an (effectful) []
concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
mappend everything in a pure Streamly fold
concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
mappend everything in an effectful Streamly fold.
concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
mappend everything in a concurrent Streamly fold.
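As a rough illustration of what "mappend everything" means for these folds, the sketch below models a fold as a plain function over a list. The names ListFold, concatListFold and perElement are hypothetical and belong neither to this module nor to the foldl package. Because a fold is a Functor, collapsing a collection-valued result is just an fmap of mconcat; this is one plausible way such a helper could be written, not necessarily how concatStreamFold is implemented.

```haskell
import Data.Monoid (Sum (..))

-- Hypothetical pure model: a "fold" is a function from all inputs to a result.
newtype ListFold a b = ListFold { runListFold :: [a] -> b }

-- Like a real Fold, it is a Functor: fmap post-processes the final result.
instance Functor (ListFold a) where
  fmap g (ListFold f) = ListFold (g . f)

-- Analogue of concatStreamFold: turn a fold producing many monoidal values
-- into a fold producing their mappended total.
concatListFold :: Monoid b => ListFold a [b] -> ListFold a b
concatListFold = fmap mconcat

-- Example fold: wrap each input element in Sum.
perElement :: ListFold Int [Sum Int]
perElement = ListFold (map Sum)

main :: IO ()
main = print (getSum (runListFold (concatListFold perElement) [1, 2, 3]))  -- 6
```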
groupBy Functions
groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k, c) pairs by hashable key.
NB: this function uses the fact that SerialT m is a monad
groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k, c) pairs by ordered key.
NB: this function uses the fact that SerialT m is a monad
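The idea behind grouping by an ordered key can be sketched with plain lists, Data.Map and Data.Sequence from the containers package. This is a hypothetical, non-streaming analogue (groupByOrderedKeyL is an invented name); the library version operates on SerialT m streams and may be implemented differently.

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as M
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as Seq

-- Hypothetical list-based analogue of groupByOrderedKey: collect every value
-- for a key into a Seq (preserving arrival order); keys come out ascending.
groupByOrderedKeyL :: Ord k => [(k, c)] -> [(k, Seq c)]
groupByOrderedKeyL = M.toList . foldl' step M.empty
  where
    -- First value for a key starts a singleton Seq; later ones are appended.
    step m (k, c) = M.insertWith (\_ old -> old |> c) k (Seq.singleton c) m

main :: IO ()
main = print (groupByOrderedKeyL [(2, 'a'), (1, 'b'), (2, 'c')])
-- [(1,fromList "b"),(2,fromList "ac")]
```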
groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k, c) pairs by hashable key, using mutable hashtables running in the ST monad.
NB: this function uses the fact that SerialT m is a monad
groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k, c) pairs by a key with an instance of Grouping from http://hackage.haskell.org/package/discrimination.
NB: this function uses the fact that SerialT m is a monad
Re-Exports
data SerialT (m :: Type -> Type) a
Deep serial composition or serial composition with depth first traversal.
The Semigroup instance of SerialT appends two streams serially in a
depth first manner, yielding all elements from the first stream, and then
all elements from the second stream.
    import Streamly
    import qualified Streamly.Prelude as S

    main = (S.toList . serially $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,2,3,4]
The Monad instance runs the monadic continuation for each
element of the stream, serially.
    main = runStream . serially $ do
        x <- return 1 <> return 2
        S.yieldM $ print x

    1
    2
SerialT nests streams serially in a depth first manner.
    main = runStream . serially $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        S.yieldM $ print (x, y)

    (1,3)
    (1,4)
    (2,3)
    (2,4)
This behavior of SerialT is exactly like a list transformer. We call the
monadic code being run for each element of the stream a monadic
continuation. In imperative paradigm we can think of this composition as
nested for loops and the monadic continuation is the body of the loop. The
loop iterates for all elements of the stream.
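Since SerialT behaves like a list transformer, ordinary Haskell lists give a pure model of the orderings shown above. The sketch below uses lists, not Streamly, and the helper names serialAppend and serialNested are hypothetical.

```haskell
-- A pure model of SerialT ordering using ordinary lists (not Streamly itself).

-- Semigroup composition: all of the first stream, then all of the second.
serialAppend :: [a] -> [a] -> [a]
serialAppend = (<>)

-- Monadic composition: nested loops; the inner loop runs fully for each
-- element of the outer loop, just like the "monadic continuation" above.
serialNested :: [Int] -> [Int] -> [(Int, Int)]
serialNested xs ys = do
  x <- xs          -- outer loop
  y <- ys          -- inner loop
  return (x, y)    -- loop body

main :: IO ()
main = do
  print (serialAppend [1, 2] [3, 4 :: Int])  -- [1,2,3,4]
  print (serialNested [1, 2] [3, 4])         -- [(1,3),(1,4),(2,3),(2,4)]
```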
The serially combinator can be omitted as the default stream type is
SerialT.
Note that serial composition with depth first traversal can be used to
combine an infinite number of streams as it explores only one stream at a
time.
Since: streamly-0.2.0
Instances
data WSerialT (m :: Type -> Type) a
Wide serial composition or serial composition with a breadth first
traversal. The Semigroup instance of WSerialT traverses
the two streams in a breadth first manner. In other words, it interleaves
two streams, yielding one element from each stream alternately.
    import Streamly
    import qualified Streamly.Prelude as S

    main = (S.toList . wSerially $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,3,2,4]
Similarly, the Monad instance interleaves the iterations of the
inner and the outer loop, nesting loops in a breadth first manner.
    main = runStream . wSerially $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        S.yieldM $ print (x, y)

    (1,3)
    (2,3)
    (1,4)
    (2,4)
Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
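A similar pure list model can reproduce the breadth-first orderings of WSerialT: the Semigroup case alternates elements from the two streams, and the Monad case interleaves the inner streams produced for each outer element instead of concatenating them. This sketch assumes that a right-nested interleave is a fair stand-in for WSerialT on these small inputs; interleave and wBind are hypothetical names.

```haskell
-- A pure list model of WSerialT-style interleaving (not Streamly's code).

-- Breadth-first append: take one element from each side alternately.
interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs  -- swap sides after each element

-- Breadth-first bind: interleave the inner streams produced per element.
wBind :: [a] -> (a -> [b]) -> [b]
wBind xs f = foldr (\x acc -> interleave (f x) acc) [] xs

main :: IO ()
main = do
  print (interleave [1, 2] [3, 4 :: Int])  -- [1,3,2,4]
  print (wBind [1, 2 :: Int] (\x -> [(x, y) | y <- [3, 4 :: Int]]))
  -- [(1,3),(2,3),(1,4),(2,4)]
```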
Since: streamly-0.2.0
Instances
data AheadT (m :: Type -> Type) a
Deep ahead composition or ahead composition with depth first traversal.
The semigroup composition of AheadT appends streams in a depth first
manner just like SerialT except that it can produce elements concurrently
ahead of time. It is like AsyncT except that AsyncT produces the output
as it arrives whereas AheadT orders the output in the traversal order.
    main = (S.toList . aheadly $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream.
Similarly, the monad instance of AheadT may run each iteration
concurrently ahead of time but presents the results in the same order as
SerialT.
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . aheadly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
All iterations may run in the same thread if they do not block.
Note that ahead composition with depth first traversal can be used to combine an infinite number of streams as it explores only a bounded number of streams at a time.
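The "run ahead concurrently, present in order" behaviour can be approximated with plain GHC threads from base. In this hypothetical sketch (aheadOrdered and delayed are invented names, and there is no lookahead bound or worker pool as in Streamly), every action is forked immediately and results are collected in input order, so the output matches the serial traversal order even though shorter tasks finish first.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical sketch of "ahead" evaluation with plain threads (not Streamly).
-- Assumes the actions do not throw: a failed action would never fill its
-- MVar, and takeMVar would then block.
aheadOrdered :: [IO a] -> IO [a]
aheadOrdered actions = do
  vars <- mapM spawn actions  -- start every action concurrently
  mapM takeMVar vars          -- read results back in input (traversal) order
  where
    spawn act = do
      v <- newEmptyMVar
      _ <- forkIO (act >>= putMVar v)
      return v

-- Sleep for n tenths of a second, then return n.
delayed :: Int -> IO Int
delayed n = threadDelay (n * 100000) >> return n

main :: IO ()
main = aheadOrdered (map delayed [3, 2, 1]) >>= print  -- [3,2,1]
```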
Since: streamly-0.3.0
Instances
| Instance | Details |
|---|---|
| MonadTrans AheadT | Defined in Streamly.Streams.Ahead |
| IsStream AheadT | Defined in Streamly.Streams.Ahead. Methods: toStream :: AheadT m a -> Stream m a; fromStream :: Stream m a -> AheadT m a; consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a; (\|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) | Defined in Streamly.Streams.Ahead |
| (MonadState s m, MonadAsync m) => MonadState s (AheadT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) | |
| MonadAsync m => Monad (AheadT m) | |
| Monad m => Functor (AheadT m) | |
| (Monad m, MonadAsync m) => Applicative (AheadT m) | |
| (MonadIO m, MonadAsync m) => MonadIO (AheadT m) | Defined in Streamly.Streams.Ahead |
| (MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) | Defined in Streamly.Streams.Ahead |
| MonadAsync m => Semigroup (AheadT m a) | |
| MonadAsync m => Monoid (AheadT m a) | |
data AsyncT (m :: Type -> Type) a
Deep async composition or async composition with depth first traversal. In
a left to right Semigroup composition it tries to yield elements from the
left stream as long as it can, but it can run the right stream in parallel
if it needs to, based on demand. The right stream can be run if the left
stream blocks on IO or cannot produce elements fast enough for the consumer.
    main = (S.toList . asyncly $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the monad instance of AsyncT may run each iteration
concurrently based on demand. More concurrent iterations are started only
if the previous iterations are not able to produce enough output for the
consumer.
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . asyncly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
All iterations may run in the same thread if they do not block.
Note that async composition with depth first traversal can be used to combine an infinite number of streams as it explores only a bounded number of streams at a time.
Since: streamly-0.1.0
Instances
| Instance | Details |
|---|---|
| MonadTrans AsyncT | Defined in Streamly.Streams.Async |
| IsStream AsyncT | Defined in Streamly.Streams.Async. Methods: toStream :: AsyncT m a -> Stream m a; fromStream :: Stream m a -> AsyncT m a; consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a; (\|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) | Defined in Streamly.Streams.Async |
| (MonadState s m, MonadAsync m) => MonadState s (AsyncT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) | |
| MonadAsync m => Monad (AsyncT m) | |
| Monad m => Functor (AsyncT m) | |
| (Monad m, MonadAsync m) => Applicative (AsyncT m) | |
| (MonadIO m, MonadAsync m) => MonadIO (AsyncT m) | Defined in Streamly.Streams.Async |
| (MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) | Defined in Streamly.Streams.Async |
| MonadAsync m => Semigroup (AsyncT m a) | |
| MonadAsync m => Monoid (AsyncT m a) | |
data WAsyncT (m :: Type -> Type) a
Wide async composition or async composition with breadth first traversal.
The Semigroup instance of WAsyncT concurrently traverses the composed
streams using a breadth first traversal or in a round robin fashion, yielding
elements from both streams alternately.
    main = (S.toList . wAsyncly $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,3,2,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad instance of WAsyncT runs all iterations fairly
concurrently using a round robin scheduling.
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . wAsyncly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
Unlike AsyncT, all iterations are guaranteed to run fairly
concurrently, unconditionally.
Note that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: streamly-0.2.0
Instances
| Instance | Details |
|---|---|
| MonadTrans WAsyncT | Defined in Streamly.Streams.Async |
| IsStream WAsyncT | Defined in Streamly.Streams.Async. Methods: toStream :: WAsyncT m a -> Stream m a; fromStream :: Stream m a -> WAsyncT m a; consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a; (\|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) | Defined in Streamly.Streams.Async |
| (MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) | |
| MonadAsync m => Monad (WAsyncT m) | |
| Monad m => Functor (WAsyncT m) | |
| (Monad m, MonadAsync m) => Applicative (WAsyncT m) | |
| (MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) | Defined in Streamly.Streams.Async |
| (MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) | Defined in Streamly.Streams.Async |
| MonadAsync m => Semigroup (WAsyncT m a) | |
| MonadAsync m => Monoid (WAsyncT m a) | |
data ParallelT (m :: Type -> Type) a
Async composition with simultaneous traversal of all streams.
The Semigroup instance of ParallelT concurrently merges two streams,
running both strictly concurrently and yielding elements from both streams
as they arrive. When multiple streams are combined using ParallelT each
one is evaluated in its own thread and the results produced are presented in
the combined stream on a first come first serve basis.
AsyncT and WAsyncT are concurrent lookahead streams each with a
specific type of consumption pattern (depth first or breadth first). Since
they are lookahead, they may introduce certain default latency in starting
more concurrent tasks for efficiency reasons or may put a default limitation
on the resource consumption (e.g. number of concurrent threads for
lookahead). If we look at the implementation detail, they both can share a
pool of worker threads to evaluate the streams in the desired pattern and at
the desired rate. However, ParallelT uses a separate runtime thread to
evaluate each stream.
WAsyncT is similar to ParallelT, as both of them evaluate the
constituent streams fairly in a round robin fashion.
However, the key difference is that WAsyncT is lazy or pull driven
whereas ParallelT is strict or push driven. ParallelT immediately
starts concurrent evaluation of both the streams (in separate threads) and
later picks the results whereas WAsyncT may wait for a certain latency
threshold before initiating concurrent evaluation of the next stream. The
concurrent scheduling of the next stream or the degree of concurrency is
driven by the feedback from the consumer. In case of ParallelT each stream
is evaluated in a separate thread and results are pushed to a shared
output buffer, the evaluation rate is controlled by blocking when the buffer
is full.
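The push-driven model described for ParallelT, where each stream runs in its own thread and pushes results into a shared output buffer, can be sketched with base's Chan. This is a hypothetical illustration (parallelArrival and delayed are invented names), not Streamly's implementation; in particular Chan is unbounded, so the sketch omits the blocking-when-full rate control.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Hypothetical push-driven merge (not Streamly's code): every action starts
-- strictly in its own thread and pushes its result into a shared channel;
-- the consumer reads results in whatever order they arrive.
-- Assumes the actions do not throw, otherwise readChan would wait forever.
parallelArrival :: [IO a] -> IO [a]
parallelArrival actions = do
  out <- newChan
  forM_ actions $ \act -> forkIO (act >>= writeChan out)
  replicateM (length actions) (readChan out)

-- Sleep for n tenths of a second, then return n.
delayed :: Int -> IO Int
delayed n = threadDelay (n * 100000) >> return n

main :: IO ()
main = parallelArrival (map delayed [3, 2, 1]) >>= print
-- prints the three results in completion order
```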
Concurrent lookahead streams are generally more efficient than
ParallelT and can work pretty efficiently even for smaller tasks because
they do not necessarily use a separate thread for each task. So they should
be preferred over ParallelT especially when efficiency is a concern and
simultaneous strict evaluation is not a requirement. ParallelT is useful
for cases when the streams are required to be evaluated simultaneously
irrespective of how the consumer consumes them e.g. when we want to race
two tasks and want to start both strictly at the same time or if we have
timers in the parallel tasks and our results depend on the timers being
started at the same time. We can say that ParallelT is almost the same
(modulo some implementation differences) as WAsyncT when the latter is
used with unlimited lookahead and zero latency in initiating lookahead.
    main = (S.toList . parallely $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,3,2,4]
When streams with more than one element are merged, it yields whichever
stream yields first without any bias, unlike the Async style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad instance of ParallelT runs all iterations
of the loop concurrently.
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . parallely $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: streamly-0.1.0
Instances
| Instance | Details |
|---|---|
| MonadTrans ParallelT | Defined in Streamly.Streams.Parallel |
| IsStream ParallelT | Defined in Streamly.Streams.Parallel. Methods: toStream :: ParallelT m a -> Stream m a; fromStream :: Stream m a -> ParallelT m a; consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a; (\|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) | Defined in Streamly.Streams.Parallel |
| (MonadState s m, MonadAsync m) => MonadState s (ParallelT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) | |
| MonadAsync m => Monad (ParallelT m) | |
| Monad m => Functor (ParallelT m) | |
| (Monad m, MonadAsync m) => Applicative (ParallelT m) | Defined in Streamly.Streams.Parallel |
| (MonadIO m, MonadAsync m) => MonadIO (ParallelT m) | Defined in Streamly.Streams.Parallel |
| (MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) | Defined in Streamly.Streams.Parallel |
| MonadAsync m => Semigroup (ParallelT m a) | |
| MonadAsync m => Monoid (ParallelT m a) | |
type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync.
Since: streamly-0.1.0
class IsStream (t :: (Type -> Type) -> Type -> Type)
Class of types that can represent a stream of elements of some type a in
some monad m.
Since: streamly-0.2.0
Instances
| Instance | Details |
|---|---|
| IsStream Stream | Defined in Streamly.Streams.StreamK.Type. Methods: toStream :: Stream m a -> Stream m a; fromStream :: Stream m a -> Stream m a; consM :: MonadAsync m => m a -> Stream m a -> Stream m a; (\|:) :: MonadAsync m => m a -> Stream m a -> Stream m a |
| IsStream AheadT | Defined in Streamly.Streams.Ahead. Methods: toStream :: AheadT m a -> Stream m a; fromStream :: Stream m a -> AheadT m a; consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a; (\|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a |
| IsStream ZipSerialM | Defined in Streamly.Streams.Zip. Methods: toStream :: ZipSerialM m a -> Stream m a; fromStream :: Stream m a -> ZipSerialM m a; consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a; (\|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a |
| IsStream ZipAsyncM | Defined in Streamly.Streams.Zip. Methods: toStream :: ZipAsyncM m a -> Stream m a; fromStream :: Stream m a -> ZipAsyncM m a; consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a; (\|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a |
| IsStream AsyncT | Defined in Streamly.Streams.Async. Methods: toStream :: AsyncT m a -> Stream m a; fromStream :: Stream m a -> AsyncT m a; consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a; (\|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a |
| IsStream WAsyncT | Defined in Streamly.Streams.Async. Methods: toStream :: WAsyncT m a -> Stream m a; fromStream :: Stream m a -> WAsyncT m a; consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a; (\|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a |
| IsStream ParallelT | Defined in Streamly.Streams.Parallel. Methods: toStream :: ParallelT m a -> Stream m a; fromStream :: Stream m a -> ParallelT m a; consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a; (\|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a |
| IsStream SerialT | Defined in Streamly.Streams.Serial. Methods: toStream :: SerialT m a -> Stream m a; fromStream :: Stream m a -> SerialT m a; consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a; (\|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a |
| IsStream WSerialT | Defined in Streamly.Streams.Serial. Methods: toStream :: WSerialT m a -> Stream m a; fromStream :: Stream m a -> WSerialT m a; consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a; (\|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a |