| Copyright | (c) 2017 Composewell Technologies | 
|---|---|
| License | BSD-3-Clause | 
| Maintainer | streamly@composewell.com | 
| Stability | experimental | 
| Portability | GHC | 
| Safe Haskell | Safe-Inferred | 
| Language | Haskell2010 | 
Streamly.Internal.Data.Stream.IsStream.Transform
Contents
Description
Synopsis
- transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
- foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b
- map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
- sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
- mapM :: forall t m a b. (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
- smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b
- trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
- trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
- tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a
- tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a
- tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a
- tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
- distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a
- tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a
- pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a
- scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
- postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
- scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
- with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a
- deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
- filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
- uniqBy :: (IsStream t, Monad m, Functor (t m)) => (a -> a -> Bool) -> t m a -> t m a
- nubBy :: (a -> a -> Bool) -> t m a -> t m a
- nubWindowBy :: Int -> (a -> a -> Bool) -> t m a -> t m a
- prune :: (a -> Bool) -> t m a -> t m a
- repeated :: t m a -> t m a
- take :: (IsStream t, Monad m) => Int -> t m a -> t m a
- takeInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- takeLast :: Int -> t m a -> t m a
- takeLastInterval :: Double -> t m a -> t m a
- takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- takeWhileLast :: (a -> Bool) -> t m a -> t m a
- takeWhileAround :: (a -> Bool) -> t m a -> t m a
- drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
- dropInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- dropLast :: Int -> t m a -> t m a
- dropLastInterval :: Int -> t m a -> t m a
- dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- dropWhileLast :: (a -> Bool) -> t m a -> t m a
- dropWhileAround :: (a -> Bool) -> t m a -> t m a
- intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- intersperseBySpan :: Int -> m a -> t m a -> t m a
- intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a
- intersperseSuffixBySpan :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a
- interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a
- intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
- delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
- delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a
- delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
- reverse :: (IsStream t, Monad m) => t m a -> t m a
- reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
- reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b
- indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
- indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
- timestamped :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a)
- timestampWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a)
- timeIndexed :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a)
- timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a)
- findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
- elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
- rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
- rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
- catMaybes :: (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a
- mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
- mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b
- lefts :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a
- rights :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b
- both :: Functor (t m) => t m (Either a a) -> t m a
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- maxThreads :: IsStream t => Int -> t m a -> t m a
- maxBuffer :: IsStream t => Int -> t m a -> t m a
- sampleOld :: Int -> t m a -> t m a
- sampleNew :: Int -> t m a -> t m a
- sampleRate :: Double -> t m a -> t m a
- data Rate = Rate {}
- rate :: IsStream t => Maybe Rate -> t m a -> t m a
- avgRate :: IsStream t => Double -> t m a -> t m a
- minRate :: IsStream t => Double -> t m a -> t m a
- maxRate :: IsStream t => Double -> t m a -> t m a
- constRate :: IsStream t => Double -> t m a -> t m a
- inspectMode :: IsStream t => t m a -> t m a
- scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
Piping
Pass through a Pipe.
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #
Use a Pipe to transform a stream.
Pre-release
Folding
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #
Right fold to a streaming monad.
foldrS Stream.cons Stream.nil === id
foldrS can be used to perform stateless stream to stream transformations
 like map and filter in general. It can be coupled with a scan to perform
 stateful transformations. However, note that the custom map and filter
 routines can be much more efficient than this due to better stream fusion.
>>>Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5][1,2,3,4,5]
Find if any element in the stream is True:
>>>Stream.toList $ Stream.foldrS (\x xs -> if odd x then return True else xs) (return False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int)[True]
Map (+2) on odd elements and filter out the even elements:
>>>Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int)[3,5,7]
foldrM can also be represented in terms of foldrS, however, the former
 is much more efficient:
foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
Pre-release
foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #
Right fold to a transformer monad.  This is the most general right fold
 function. foldrS is a special case of foldrT, however foldrS
 implementation can be more efficient:
foldrS = foldrT foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
foldrT can be used to translate streamly streams to other transformer
 monads e.g.  to a different streaming type.
Pre-release
Mapping
Stateless one-to-one maps.
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #
sequence = mapM id
Replace the elements of a stream of monadic actions with the outputs of those actions.
>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
abc
>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
 & (fromSerial . Stream.sequence)
:}
1
1
1
>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
 & (fromAsync . Stream.sequence)
:}
1
1
1
Concurrent (do not use with fromParallel on infinite streams)
Since: 0.1.0
mapM :: forall t m a b. (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #
mapM f = sequence . map f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
>>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"]
abc
>>> :{
   drain $ Stream.replicateM 10 (return 1)
     & (fromSerial . Stream.mapM (x -> threadDelay 1000000 >> print x))
:}
1
...
1
> drain $ Stream.replicateM 10 (return 1)
 & (fromAsync . Stream.mapM (x -> threadDelay 1000000 >> print x))
Concurrent (do not use with fromParallel on infinite streams)
Since: 0.1.0
smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #
A stateful mapM, equivalent to a left scan, more like mapAccumL.
 Hopefully, this is a better alternative to scan. Separation of state from
 the output makes it easier to think in terms of a shared state, and also
 makes it easier to keep the state fully strict and the output lazy.
See also: scanlM'
Pre-release
Mapping Side Effects (Observation)
See also the intersperse*_ combinators.
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #
Apply a monadic function to each element flowing through the stream and discard the results.
>>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2) 1 2
Compare with tap.
Since: 0.7.0
trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Perform a side effect before yielding each element of the stream and discard the results.
>>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2) "got here" "got here"
Same as interspersePrefix_ but always serial.
See also: trace
Pre-release
tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #
Tap the data flowing through a stream into a Fold. For example, you may
 add a tap to log the contents flowing through the stream. The fold is used
 only for effects, its result is discarded.
                  Fold m a b
                      |
-----stream m a ---------------stream m a-----
>>>Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)1 2
Compare with trace.
Since: 0.7.0
tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #
tapOffsetEvery offset n taps every nth element in the stream
 starting at offset. offset can be between 0 and n - 1. Offset 0
 means start at the first element in the stream. If the offset is outside
 this range then offset  is used as offset.mod n
>>>Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10[0,2,4,6,8,10]
tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #
Redirect a copy of the stream to a supplied fold and run it concurrently
 in an independent thread. The fold may buffer some elements. The buffer size
 is determined by the prevailing maxBuffer setting.
              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----
>>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2) 1 2
Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.
Compare with tap.
Pre-release
tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a Source #
Redirect a copy of the stream to a supplied fold and run it concurrently
 in an independent thread. The fold may buffer some elements. The buffer size
 is determined by the prevailing maxBuffer setting.
              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----
> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2) 1 2
Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.
Compare with tap.
Pre-release
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a Source #
Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.
> Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2) 1 2 1 2
distributeAsync_ = flip (foldr tapAsync)
Pre-release
tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a Source #
Calls the supplied function with the number of elements consumed
 every n seconds. The given function is run in a separate thread
 until the end of the stream. In case there is an exception in the
 stream the thread is killed during the next major GC.
Note: The action is not guaranteed to run if the main thread exits.
> delay n = threadDelay (round $ n * 1000000) >> return n > Stream.toList $ Stream.tapRate 2 (n -> print $ show n ++ " elements processed") (delay 1 Stream.|: delay 0.5 Stream.|: delay 0.5 Stream.|: Stream.nil) "2 elements processed" [1.0,0.5,0.5] "1 elements processed"
Note: This may not work correctly on 32-bit machines.
Pre-release
pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a Source #
pollCounts predicate transform fold stream counts those elements in the
 stream that pass the predicate. The resulting count stream is sent to
 another thread which transforms it using transform and then folds it using
 fold.  The thread is automatically cleaned up if the stream stops or
 aborts due to exception.
For example, to print the count of elements processed every second:
> Stream.drain $ Stream.pollCounts (const True) (Stream.rollingMap (-) . Stream.delayPost 1) (FLold.drainBy print)
          $ Stream.enumerateFrom 0
Note: This may not work correctly on 32-bit machines.
Pre-release
Scanning By Fold
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Scan a stream using the given monadic fold.
>>>Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])[0,1,3,6]
Since: 0.7.0
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Postscan a stream using the given monadic fold.
The following example extracts the input stream up to a point where the running average of elements is no more than 10:
>>>import Data.Maybe (fromJust)>>>let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)>>>:{Stream.toList $ Stream.map (fromJust . fst) $ Stream.takeWhile (\(_,x) -> x <= 10) $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0) :} [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
Since: 0.7.0
Scanning
Left scans. Stateful, mostly one-to-one maps.
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Strict left scan. Like map, scanl' too is a one to one transformation,
 however it adds an extra element.
>>> Stream.toList $ Stream.scanl' (+) 0 $ fromList [1,2,3,4] [0,1,3,6,10]
>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4] [[],[1],[2,1],[3,2,1],[4,3,2,1]]
The output of scanl' is the initial value of the accumulator followed by
 all the intermediate steps and the final result of foldl'.
By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.
Consider the following monolithic example, computing the sum and the product
 of the elements in a stream in one go using a foldl':
>>> Stream.foldl' ((s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList 1,2,3,4
Using scanl' we can make it modular by computing the sum in the first
 stage and passing it down to the next stage for computing the product:
>>> :{
  Stream.foldl' ((_, p) (s, x) -> (s, p * x)) (0,1)
  $ Stream.scanl' ((s, _) x -> (s + x, x)) (0,1)
  $ Stream.fromList [1,2,3,4]
:}
(10,24)
IMPORTANT: scanl' evaluates the accumulator to WHNF.  To avoid building
 lazy expressions inside the accumulator, it is recommended that a strict
 data structure is used for accumulator.
>>>scanl' f z xs = scanlM' (\a b -> return (f a b)) (return z) xs>>>scanl' f z xs = z `Stream.cons` postscanl' f z xs
See also: usingStateT
Since: 0.2.0
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like scanl' but with a monadic step function and a monadic seed.
Since: 0.4.0
Since: 0.8.0 (signature change)
scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #
scanlMAfter' accumulate initial done stream is like scanlM' except
 that it provides an additional done function to be applied on the
 accumulator when the stream stops. The result of done is also emitted in
 the stream.
This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.
Pre-release
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl' but does not stream the initial value of the accumulator.
>>>postscanl' f z = postscanlM' (\a b -> return (f a b)) (return z)>>>postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs
Since: 0.7.0
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like postscanl' but with a monadic step function and a monadic seed.
>>>postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs
Since: 0.7.0
Since: 0.8.0 (signature change)
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl' but does not stream the final value of the accumulator.
Pre-release
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like prescanl' but with a monadic step function and a monadic seed.
Pre-release
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #
Like scanl' but for a non-empty stream. The first element of the stream
 is used as the initial value of the accumulator. Does nothing if the stream
 is empty.
>>> Stream.toList $ Stream.scanl1' (+) $ fromList [1,2,3,4] [1,3,6,10]
Since: 0.6.0
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #
Like scanl1' but with a monadic step function.
Since: 0.6.0
Filtering
Produce a subset of the stream using criteria based on the values of the elements. We can use a concatMap and scan for filtering but these combinators are more efficient and convenient.
with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a Source #
Modify a t m a -> t m a stream transformation that accepts a predicate
 (a -> b) to accept ((s, a) -> b) instead, provided a transformation t m
 a -> t m (s, a). Convenient to filter with index or time.
filterWithIndex = with indexed filter filterWithAbsTime = with timestamped filter filterWithRelTime = with timeIndexed filter
Pre-release
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #
Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.
>>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5] [1,3,5]
Since: 0.6.0
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Include only those elements that pass a predicate.
Since: 0.1.0
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as filter but with a monadic predicate.
Since: 0.4.0
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #
Drop repeated elements that are adjacent to each other.
Since: 0.6.0
nubBy :: (a -> a -> Bool) -> t m a -> t m a Source #
Drop repeated elements anywhere in the stream.
Caution: not scalable for infinite streams
See also: nubWindowBy
Unimplemented
nubWindowBy :: Int -> (a -> a -> Bool) -> t m a -> t m a Source #
Drop repeated elements within the specified tumbling window in the stream.
nubBy = nubWindowBy maxBound
Unimplemented
prune :: (a -> Bool) -> t m a -> t m a Source #
Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.
prune p = dropWhileAround p $ uniqBy (x y -> p x && p y)
> Stream.prune isSpace (Stream.fromList " hello world! ") "hello world!"
Space: O(1)
Unimplemented
Trimming
Produce a subset of the stream trimmed at ends.
take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Take first n elements from the stream and discard the rest.
Since: 0.1.0
takeInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
takeInterval duration yields stream elements upto specified time
 duration. The duration starts when the stream is evaluated for the first
 time, before the first element is yielded. The time duration is checked
 before generating each element, if the duration has expired the stream
 stops.
The total time taken in executing the stream is guaranteed to be at least
 duration, however, because the duration is checked before generating an
 element, the upper bound is indeterminate and depends on the time taken in
 generating and processing the last element.
No element is yielded if the duration is zero. At least one element is yielded if the duration is non-zero.
Pre-release
takeLast :: Int -> t m a -> t m a Source #
Take n elements at the end of the stream.
O(n) space, where n is the number elements taken.
Unimplemented
takeLastInterval :: Double -> t m a -> t m a Source #
Take time interval i seconds at the end of the stream.
O(n) space, where n is the number elements taken.
Unimplemented
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
End the stream as soon as the predicate fails on an element.
Since: 0.1.0
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as takeWhile but with a monadic predicate.
Since: 0.4.0
takeWhileLast :: (a -> Bool) -> t m a -> t m a Source #
Take all consecutive elements at the end of the stream for which the predicate is true.
O(n) space, where n is the number elements taken.
Unimplemented
takeWhileAround :: (a -> Bool) -> t m a -> t m a Source #
Like takeWhile and takeWhileLast combined.
O(n) space, where n is the number elements taken from the end.
Unimplemented
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Discard first n elements from the stream and take the rest.
Since: 0.1.0
dropInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
dropInterval duration drops stream elements until specified duration has
 passed.  The duration begins when the stream is evaluated for the first
 time. The time duration is checked after generating a stream element, the
 element is yielded if the duration has expired otherwise it is dropped.
The time elapsed before starting to generate the first element is at most
 duration, however, because the duration expiry is checked after the
 element is generated, the lower bound is indeterminate and depends on the
 time taken in generating an element.
All elements are yielded if the duration is zero.
Pre-release
dropLast :: Int -> t m a -> t m a Source #
Drop n elements at the end of the stream.
O(n) space, where n is the number elements dropped.
Unimplemented
dropLastInterval :: Int -> t m a -> t m a Source #
Drop time interval i seconds at the end of the stream.
O(n) space, where n is the number elements dropped.
Unimplemented
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
Since: 0.1.0
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as dropWhile but with a monadic predicate.
Since: 0.4.0
dropWhileLast :: (a -> Bool) -> t m a -> t m a Source #
Drop all consecutive elements at the end of the stream for which the predicate is true.
O(n) space, where n is the number elements dropped.
Unimplemented
dropWhileAround :: (a -> Bool) -> t m a -> t m a Source #
Like dropWhile and dropWhileLast combined.
O(n) space, where n is the number elements dropped from the end.
Unimplemented
Inserting Elements
Produce a superset of the stream. This is the opposite of filtering/sampling. We can always use concatMap and scan for inserting but these combinators are more efficient and convenient.
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #
Insert a pure value between successive elements of a stream.
>>>Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello""h,e,l,l,o"
Since: 0.7.0
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Insert an effect and its output before consuming an element of a stream except the first one.
>>>Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"h.,e.,l.,l.,o"h,e,l,l,o"
Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".
>>>Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"he.l.l.o."h,e,l,l,o"
Since: 0.5.0
intersperseBySpan :: Int -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
 elements.
> Stream.toList $ Stream.intersperseBySpan 2 (return ',') $ Stream.fromList "hello" "he,ll,o"
Unimplemented
intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a Source #
Insert an effect and its output after consuming an element of a stream.
>>>Stream.toList $ Stream.trace putChar $ intersperseSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"h.,e.,l.,l.,o.,"h,e,l,l,o,"
Pre-release
intersperseSuffixBySpan :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a Source #
Like intersperseSuffix but intersperses an effectful action into the
 input stream after every n elements and after the last element.
>>>Stream.toList $ Stream.intersperseSuffixBySpan 2 (return ',') $ Stream.fromList "hello""he,ll,o,"
Pre-release
interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
 seconds.
> import Control.Concurrent (threadDelay) > Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello" h,e,l,l,o
Pre-release
Inserting Side Effects/Time
intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Insert a side effect before consuming an element of a stream except the first one.
>>>Stream.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') $ Stream.fromList "hello"h.e.l.l.o
Pre-release
delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduce a delay of specified seconds before consuming an element of the stream except the first one.
>>>Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Since: 0.8.0
intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Insert a side effect after consuming an element of a stream.
>>> Stream.mapM_ putChar $ Stream.intersperseSuffix_ (threadDelay 1000000) $ Stream.fromList "hello" hello
Pre-release
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduce a delay of specified seconds after consuming an element of a stream.
>>>Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Pre-release
interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a Source #
Insert a side effect before consuming an element of a stream.
>>>Stream.toList $ Stream.trace putChar $ Stream.interspersePrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello".h.e.l.l.o"hello"
Same as trace_ but may be concurrent.
Concurrent
Pre-release
delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduce a delay of specified seconds before consuming an element of a stream.
>>>Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Pre-release
Element Aware Insertion
Opposite of filtering
insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #
insertBy cmp elem stream inserts elem before the first element in
 stream that is less than elem when compared using cmp.
insertBy cmp x =mergeBycmp (fromPurex)
>>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5] [1,2,3,5]
Since: 0.6.0
Reordering
reverse :: (IsStream t, Monad m) => t m a -> t m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
>>>reverse = Stream.foldlT (flip Stream.cons) Stream.nil
Since 0.7.0 (Monad m constraint)
Since: 0.1.1
reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b Source #
Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.
Unimplemented
Position Indexing
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #
indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined) indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
Pair each element in a stream with its index, starting from index 0.
>>>Stream.toList $ Stream.indexed $ Stream.fromList "hello"[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
Since: 0.6.0
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #
indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined) indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))
Pair each element in a stream with its index, starting from the
 given index n and counting down.
>>>Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
Since: 0.6.0
Time Indexing
timestamped :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a) Source #
timestampWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a) Source #
Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.
>>>Stream.mapM_ print $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Pre-release
timeIndexed :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a) Source #
Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.
>>>Stream.mapM_ print $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3(RelTime64 (NanoSecond64 ...),1) (RelTime64 (NanoSecond64 ...),2) (RelTime64 (NanoSecond64 ...),3)
Pre-release
timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a) Source #
Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.
>>>Stream.mapM_ print $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3(RelTime64 (NanoSecond64 ...),1) (RelTime64 (NanoSecond64 ...),2) (RelTime64 (NanoSecond64 ...),3)
Pre-release
Searching
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #
Find all the indices where the element in the stream satisfies the given predicate.
findIndices = fold Fold.findIndices
Since: 0.5.0
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #
Find all the indices where the value of the element in the stream is equal to the given value.
elemIndices a = findIndices (== a)
Since: 0.5.0
Rolling map
Map using the previous element.
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b Source #
Like rollingMap but with an effectful map function.
Pre-release
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #
Apply a function on every two successive elements of a stream. If the stream consists of a single element the output is an empty stream.
This is the stream equivalent of the list idiom zipWith f xs (tail xs).
Pre-release
Maybe Streams
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #
Either Streams
both :: Functor (t m) => t m (Either a a) -> t m a Source #
Remove the either wrapper and flatten both lefts and as well as rights in the output stream.
Pre-release
Concurrent Evaluation
Concurrent Pipelines
Run streaming stages concurrently.
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it terminates if the buffer is full and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.
Since: 0.2.0 (Streamly)
Since: 0.8.0
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.
mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD
Pre-release
applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #
Same as |$.
Internal
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #
Parallel transform application operator; applies a stream transformation
 function t m a -> t m b to a stream t m a concurrently; the input stream
 is evaluated asynchronously in an independent thread yielding elements to a
 buffer and the transformation function runs in another thread consuming the
 input from the buffer.  |$ is just like regular function application
 operator $ except that it is concurrent.
If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can
 look at it as a transformation that converts a transform function to a
 buffered concurrent transform function.
The following code prints a value every second even though each stage adds a 1 second delay.
>>>:{Stream.drain $ Stream.mapM (\x -> threadDelay 1000000 >> print x) |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1) :} 1 1 1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
Concurrency Control
maxThreads :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum number of threads that can be spawned concurrently for
 any concurrent combinator in a stream.
 A value of 0 resets the thread limit to default, a negative value means
 there is no limit. The default value is 1500. maxThreads does not affect
 ParallelT streams as they can use unbounded number of threads.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
Since: 0.4.0 (Streamly)
Since: 0.8.0
Buffering and Sampling
Evaluate strictly using a buffer of results. When the buffer becomes full we can block, drop the new elements, drop the oldest element and insert the new at the end or keep dropping elements uniformly to match the rate of the consumer.
maxBuffer :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer value (i.e. a negative value)
 coupled with an unbounded maxThreads value is a recipe for disaster in
 presence of infinite streams, or very large streams.  Especially, it must
 not be used when pure is used in ZipAsyncM streams as pure in
 applicative zip streams generates an infinite stream causing unbounded
 concurrent generation with no limit on the buffer or threads.
Since: 0.4.0 (Streamly)
Since: 0.8.0
sampleOld :: Int -> t m a -> t m a Source #
Evaluate the input stream continuously and keep only the oldest n
 elements in the buffer, discard the new ones when the buffer is full.  When
 the output stream is evaluated it consumes the values from the buffer in a
 FIFO manner.
Unimplemented
sampleNew :: Int -> t m a -> t m a Source #
Evaluate the input stream continuously and keep only the latest n
 elements in a ring buffer, keep discarding the older ones to make space for
 the new ones.  When the output stream is evaluated it consumes the values
 from the buffer in a FIFO manner.
Unimplemented
sampleRate :: Double -> t m a -> t m a Source #
Rate Limiting
Evaluate the stream at uniform intervals to maintain a specified evaluation rate.
Specifies the stream yield rate in yields per second (Hertz).
 We keep accumulating yield credits at rateGoal. At any point of time we
 allow only as many yields as we have accumulated as per rateGoal since the
 start of time. If the consumer or the producer is slower or faster, the
 actual rate may fall behind or exceed rateGoal.  We try to recover the gap
 between the two by increasing or decreasing the pull rate from the producer.
 However, if the gap becomes more than rateBuffer we try to recover only as
 much as rateBuffer.
rateLow puts a bound on how low the instantaneous rate can go when
 recovering the rate gap.  In other words, it determines the maximum yield
 latency.  Similarly, rateHigh puts a bound on how high the instantaneous
 rate can go when recovering the rate gap.  In other words, it determines the
 minimum yield latency. We reduce the latency by increasing concurrency,
 therefore we can say that it puts an upper bound on concurrency.
If the rateGoal is 0 or negative the stream never yields a value.
 If the rateBuffer is 0 or negative we do not attempt to recover.
Since: 0.5.0 (Streamly)
Since: 0.8.0
rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #
Specify the pull rate of a stream.
 A Nothing value resets the rate to default which is unlimited.  When the
 rate is specified, concurrent production may be ramped up or down
 automatically to achieve the specified yield rate. The specific behavior for
 different styles of Rate specifications is documented under Rate.  The
 effective maximum production rate achieved by a stream is governed by:
- The maxThreadslimit
- The maxBufferlimit
- The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Since: 0.5.0 (Streamly)
Since: 0.8.0
avgRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
 per second (i.e.  Hertz).  Concurrent production is ramped up or down
 automatically to achieve the specified average yield rate. The rate can
 go down to half of the specified rate on the lower side and double of
 the specified rate on the higher side.
Since: 0.5.0 (Streamly)
Since: 0.8.0
minRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
maxRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
Since: 0.5.0 (Streamly)
Since: 0.8.0
constRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
Diagnostics
inspectMode :: IsStream t => t m a -> t m a Source #
Print debug information about an SVar when the stream ends
Pre-release
Deprecated
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #
Deprecated: Please use scanl followed by map instead.
Strict left scan with an extraction function. Like scanl', but applies a
 user supplied extraction function (the third argument) at each step. This is
 designed to work with the foldl library. The suffix x is a mnemonic for
 extraction.
Since 0.2.0
Since: 0.7.0 (Monad m constraint)