| Copyright | (c) 2017 Composewell Technologies |
|---|---|
| License | BSD3 |
| Maintainer | streamly@composewell.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Streamly.Internal.Data.Stream.Parallel
Description
Deprecated: Please use Streamly.Internal.Data.Stream.Concurrent instead.
To run examples in this module:
>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Synopsis
- newtype ParallelT m a = ParallelT { getParallelT :: StreamK m a }
- type Parallel = ParallelT IO
- consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
- parallelK :: MonadAsync m => StreamK m a -> StreamK m a -> StreamK m a
- parallelFstK :: MonadAsync m => StreamK m a -> StreamK m a -> StreamK m a
- parallelMinK :: MonadAsync m => StreamK m a -> StreamK m a -> StreamK m a
- mkParallelD :: MonadAsync m => Stream m a -> Stream m a
- mkParallelK :: MonadAsync m => StreamK m a -> StreamK m a
- tapAsyncK :: MonadAsync m => (StreamK m a -> m b) -> StreamK m a -> StreamK m a
- tapAsyncF :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a
- newCallbackStream :: MonadAsync m => m (a -> m (), StreamK m a)
Parallel Stream Type
newtype ParallelT m a
For ParallelT streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT. ParallelT is similar, except that all iterations are strictly
concurrent, whereas in AsyncT concurrency depends on consumer demand and
available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
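The monadic behaviour described above can be sketched as a small program. This assumes a streamly release that still ships Streamly.Prelude with fromParallel, fromList, fromEffect and toList (for example the 0.8 series); delay is the helper from the module header, and parallelLoop is a name chosen only for this illustration.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

-- Helper from the module header: sleep for n seconds, report, return n.
delay :: Int -> IO Int
delay n = do
    threadDelay (n * 1000000)
    putStrLn (show n ++ " sec")
    return n

-- Hypothetical name. Each iteration of the monadic loop runs in its own
-- thread, so results arrive in completion order, not in source order.
parallelLoop :: IO [Int]
parallelLoop =
    Stream.toList $ Stream.fromParallel $ do
        x <- Stream.fromList [2, 1] -- for each x in the stream
        Stream.fromEffect (delay x)

main :: IO ()
main = parallelLoop >>= print
```

Because both iterations start immediately, the one-second delay finishes first: the program should print "1 sec", then "2 sec", then [1,2].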
Constructors
ParallelT
Fields
- getParallelT :: StreamK m a
Instances
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
XXX We can implement this more efficiently by implementing it directly instead of combining streams using parallel.
Merge Concurrently
parallelFstK :: MonadAsync m => StreamK m a -> StreamK m a -> StreamK m a
Like parallel but stops the output as soon as the first stream stops.
Pre-release
parallelMinK :: MonadAsync m => StreamK m a -> StreamK m a -> StreamK m a
Like parallel but stops the output as soon as either of the two streams
stops.
Pre-release
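The stopping rules of parallelFstK and parallelMinK can be modelled with base alone. The sketch below is not streamly's implementation: a list of IO actions stands in for a stream, exceptions and maxBuffer are ignored, and every name in it (Side, Event, mergeUntil, the three *Model functions, step) is hypothetical.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_)

data Side = First | Second deriving (Eq)

data Event a = Yield a | Done Side

-- | Run both action lists in their own threads, merge results in arrival
-- order, and stop early when @stopOn@ accepts the side that just finished.
mergeUntil :: (Side -> Bool) -> [IO a] -> [IO a] -> IO [a]
mergeUntil stopOn s1 s2 = do
    chan <- newChan
    let worker side acts = forkIO $ do
            forM_ acts $ \act -> act >>= writeChan chan . Yield
            writeChan chan (Done side)
    t1 <- worker First s1
    t2 <- worker Second s2
    let loop pending acc = do
            ev <- readChan chan
            case ev of
                Yield x -> loop pending (x : acc)
                Done side
                    | stopOn side || pending == (1 :: Int) ->
                        return (reverse acc)
                    | otherwise -> loop (pending - 1) acc
    out <- loop 2 []
    mapM_ killThread [t1, t2] -- discard whatever is still running
    return out

parallelModel, parallelFstModel, parallelMinModel
    :: [IO a] -> [IO a] -> IO [a]
parallelModel = mergeUntil (const False)  -- wait for both sides
parallelFstModel = mergeUntil (== First)  -- stop when the first side ends
parallelMinModel = mergeUntil (const True) -- stop when either side ends

-- | Yield @x@ after @ms@ milliseconds.
step :: Int -> a -> IO a
step ms x = threadDelay (ms * 1000) >> return x

main :: IO ()
main = do
    let fast = [step 10 'a']
        slow = [step 100 'b', step 100 'c']
    parallelModel slow fast >>= putStrLn
    parallelFstModel slow fast >>= putStrLn
    parallelMinModel slow fast >>= putStrLn
```

With slow as the first argument, the first two calls print "abc", since the first side is the last to finish, while the third prints "a" because the fast side ends first. Swapping the arguments makes parallelFstModel print "a" as well.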
Evaluate Concurrently
mkParallelD :: MonadAsync m => Stream m a -> Stream m a
Same as mkParallel but for StreamD streams.
mkParallelK :: MonadAsync m => StreamK m a -> StreamK m a
Like mkParallel but uses StreamK internally.
Pre-release
Tap Concurrently
tapAsyncK :: MonadAsync m => (StreamK m a -> m b) -> StreamK m a -> StreamK m a
Redirect a copy of the stream to a supplied fold and run it concurrently
in an independent thread. The fold may buffer some elements. The buffer size
is determined by the prevailing maxBuffer setting.
                StreamK m a -> m b
                        |
-----stream m a ---------------stream m a-----

> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2
Exceptions from the concurrently running fold are propagated to the current computation. Because of buffering in the fold, exceptions may be delayed and may not correspond to the element currently being processed in the parent stream. However, we guarantee that the tap finishes, and that all exceptions from it are drained, before the parent stream stops.
Compare with tap.
Pre-release
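The tap behaviour described above (copy each element to a consumer running in another thread, then wait for it and surface its exceptions before finishing) can be approximated with base alone. This is a list-based model, not streamly's implementation: it uses an unbounded Chan rather than a buffer limited by maxBuffer, and tapAsyncList is a hypothetical name.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (getChanContents, newChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, throwIO, try)
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Maybe (catMaybes, isJust)

-- | Copy every element to @tap@, which runs in its own thread, and pass
-- the input through unchanged.
tapAsyncList :: ([a] -> IO b) -> [a] -> IO [a]
tapAsyncList tap xs = do
    chan <- newChan
    done <- newEmptyMVar
    _ <- forkIO $ do
        copies <- getChanContents chan
        -- 'Nothing' marks the end of input for the tap.
        r <- try (tap (catMaybes (takeWhile isJust copies)))
        putMVar done r
    mapM_ (writeChan chan . Just) xs
    writeChan chan Nothing
    -- Wait for the tap to finish and rethrow anything it raised.
    r <- takeMVar done
    case r of
        Left e -> throwIO (e :: SomeException)
        Right _ -> return xs

main :: IO ()
main = do
    seen <- newIORef 0
    -- The tap sums its copy strictly; the main path gets the list back.
    out <- tapAsyncList (\ys -> writeIORef seen $! sum ys) [1 .. 10 :: Int]
    total <- readIORef seen
    print (out, total)
```

Since tapAsyncList returns only after the tap has finished, the total read in main already reflects every element, and a tap that throws makes the whole call throw.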
tapAsyncF :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a
Like tapAsync but uses a Fold instead of a fold function.
Callbacks
newCallbackStream :: MonadAsync m => m (a -> m (), StreamK m a)
Generates a callback and a stream pair. The returned callback is used to queue values to the stream. The stream is infinite; there is currently no way for the callback to indicate that it is done.
Pre-release
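The shape of the callback and stream pair can be illustrated with base alone. The sketch below is only an analogue under stated assumptions: a lazy list backed by a Chan stands in for the StreamK, and newCallbackList is a hypothetical name, not part of streamly.

```haskell
import Control.Concurrent.Chan (getChanContents, newChan, writeChan)

-- | Return a callback that queues values, plus a lazy, never-ending list
-- of everything queued so far and in the future.
newCallbackList :: IO (a -> IO (), [a])
newCallbackList = do
    chan <- newChan
    xs <- getChanContents chan -- read lazily, on demand
    return (writeChan chan, xs)

main :: IO ()
main = do
    (callback, xs) <- newCallbackList
    mapM_ callback [1 :: Int, 2, 3]
    -- Only take what has been queued: the list never ends, so asking for
    -- a fourth element here would block.
    print (take 3 xs)
```

As with the stream described above, the consumer cannot learn from the callback that no more values are coming, so it has to bound its own consumption.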