Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | pre-release |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Deprecated: Please use "Streamly.Internal.Data.Stream" from the streamly-core package, and "Streamly.Internal.Data.Stream.Concurrent", "Streamly.Internal.Data.Stream.Exception.Lifted", and "Streamly.Internal.Data.Stream.Time" from the streamly package instead.
This is an internal module which is a superset of the corresponding released module Streamly.Prelude. It contains some additional unreleased or experimental APIs.
Synopsis
- newtype StreamK (m :: Type -> TYPE LiftedRep) a = MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
- type WSerial = WSerialT IO
- data WSerialT m a
- type Serial = SerialT IO
- data SerialT m a
- type Parallel = ParallelT IO
- data ParallelT m a
- type WAsync = WAsyncT IO
- data WAsyncT m a
- type Async = AsyncT IO
- data AsyncT m a
- type Ahead = AheadT IO
- data AheadT m a
- type ZipSerial = ZipSerialM IO
- data ZipSerialM m a
- type ZipAsync = ZipAsyncM IO
- data ZipAsyncM m a
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where
- consM :: MonadAsync m => m a -> t m a -> t m a
- (|:) :: MonadAsync m => m a -> t m a -> t m a
- toStreamK :: IsStream t => t m a -> StreamK m a
- fromStreamK :: IsStream t => StreamK m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- fromStreamD :: (IsStream t, Monad m) => Stream m a -> t m a
- toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> StreamK m a -> StreamK m a
- toStreamD :: (IsStream t, Monad m) => t m a -> Stream m a
- fromList :: (Monad m, IsStream t) => [a] -> t m a
- mkStream :: IsStream t => (forall r. State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b
- foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
- foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
- foldStreamShared :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- foldStream :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- fromSerial :: IsStream t => SerialT m a -> t m a
- fromWSerial :: IsStream t => WSerialT m a -> t m a
- fromAsync :: IsStream t => AsyncT m a -> t m a
- fromWAsync :: IsStream t => WAsyncT m a -> t m a
- fromAhead :: IsStream t => AheadT m a -> t m a
- fromParallel :: IsStream t => ParallelT m a -> t m a
- fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
- fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
- concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- module Streamly.Internal.Data.Stream.IsStream.Generate
- fold :: Monad m => Fold m a b -> SerialT m a -> m b
- uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
- foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b
- foldlT :: (Monad m, IsStream t, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b
- foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
- foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
- foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
- foldlM' :: Monad m => (b -> a -> m b) -> m b -> SerialT m a -> m b
- parseD :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b)
- parse :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b)
- mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
- drain :: Monad m => SerialT m a -> m ()
- drainN :: Monad m => Int -> SerialT m a -> m ()
- runN :: Monad m => Int -> SerialT m a -> m ()
- drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- runStream :: Monad m => SerialT m a -> m ()
- null :: Monad m => SerialT m a -> m Bool
- head :: Monad m => SerialT m a -> m (Maybe a)
- headElse :: Monad m => a -> SerialT m a -> m a
- tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- last :: Monad m => SerialT m a -> m (Maybe a)
- elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- length :: Monad m => SerialT m a -> m Int
- all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- and :: Monad m => SerialT m Bool -> m Bool
- or :: Monad m => SerialT m Bool -> m Bool
- sum :: (Monad m, Num a) => SerialT m a -> m a
- product :: (Monad m, Num a) => SerialT m a -> m a
- mconcat :: (Monad m, Monoid a) => SerialT m a -> m a
- minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a)
- (!!) :: Monad m => SerialT m a -> Int -> m (Maybe a)
- lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)
- find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)
- findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)
- findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
- elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
- toList :: Monad m => SerialT m a -> m [a]
- toListRev :: Monad m => SerialT m a -> m [a]
- toHandle :: MonadIO m => Handle -> SerialT m String -> m ()
- toStreamRev :: Monad m => SerialT m a -> m (SerialT n a)
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a, Unbox a) => SerialT m a -> SerialT m a -> m Bool
- isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool
- isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a))
- stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
- eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool
- cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
- module Streamly.Internal.Data.Stream.IsStream.Transform
- module Streamly.Internal.Data.Stream.IsStream.Expand
- module Streamly.Internal.Data.Stream.IsStream.Reduce
- module Streamly.Internal.Data.Stream.IsStream.Exception
- module Streamly.Internal.Data.Stream.IsStream.Lift
- module Streamly.Internal.Data.Stream.IsStream.Top
- fromStream :: (IsStream t, Monad m) => Stream m a -> t m a
- toStream :: (IsStream t, Monad m) => t m a -> Stream m a
Documentation
newtype StreamK (m :: Type -> TYPE LiftedRep) a #
Instances
For WSerialT
streams:
(<>) = wSerial -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad
Note that <>
is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad
bind behaves like a for
loop:
>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like interleaved nested for
loops:
>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1
in the first stream with all the nested iterations of element
2
:
>>>
import Streamly.Prelude (wSerial)
>>>
IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For SerialT
streams:
(<>) = serial -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad
A single Monad
bind behaves like a for
loop:
>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like nested for
loops:
>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For ParallelT
streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT
, ParallelT
is similar except that all
iterations are strictly concurrent while in AsyncT
it depends on the
consumer demand and available threads. See parallel
for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
Instances
For WAsyncT
streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the wAsync
combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the wAsync
combinator:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one WAsyncT
output
stream and all the iterations corresponding to 2
constitute another
WAsyncT
output stream and these two output streams are merged using
wAsync
.
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For AsyncT
streams:
(<>) = async
(>>=) = flip . concatMapWith async
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the async
combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the async
combinator:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using async
.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Instances
For AheadT
streams:
(<>) = ahead
(>>=) = flip . concatMapWith ahead
A single Monad
bind behaves like a for
loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, ahead of time:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using ahead
.
Since: 0.3.0 (Streamly)
Since: 0.8.0
Instances
type ZipSerial = ZipSerialM IO Source #
data ZipSerialM m a Source #
For ZipSerialM
streams:
(<>) = serial
(<*>) = 'Streamly.Prelude.serial.zipWith' id
Applicative evaluates the streams being zipped serially:
>>>
s1 = Stream.fromFoldable [1, 2]
>>>
s2 = Stream.fromFoldable [3, 4]
>>>
s3 = Stream.fromFoldable [5, 6]
>>>
Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For ZipAsyncM
streams:
(<>) = serial
(<*>) = 'Streamly.Prelude.serial.zipAsyncWith' id
Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:
>>>
s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>>
Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
... [(1,1),(1,1),(1,1)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
IsStream ZipAsyncM Source # | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipAsyncM m a -> StreamK m a Source # fromStream :: forall (m :: Type -> TYPE LiftedRep) a. StreamK m a -> ZipAsyncM m a Source # consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # (|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # | |
MonadAsync m => Applicative (ZipAsyncM m) Source # | |
Defined in Streamly.Internal.Data.Stream.ZipAsync | |
Monad m => Functor (ZipAsyncM m) Source # | |
Monoid (ZipAsyncM m a) Source # | |
Semigroup (ZipAsyncM m a) Source # | |
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #
Class of types that can represent a stream of elements of some type a
in
some monad m
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.
> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
Instances
fromStreamK :: IsStream t => StreamK m a -> t m a Source #
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> StreamK m a -> StreamK m a Source #
Adapt a polymorphic consM operation to a StreamK cons operation
fromList :: (Monad m, IsStream t) => [a] -> t m a Source #
fromList = foldr cons nil
Construct a stream from a list of pure values. This is more efficient than
fromFoldable
for serial streams.
Since: 0.4.0
mkStream :: IsStream t => (forall r. State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #
Build a stream from an SVar
, a stop continuation, a singleton stream
continuation and a yield continuation.
foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b Source #
foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #
Like foldlx'
, but with a monadic step function.
Since: 0.7.0
foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl
library. The suffix x
is a mnemonic for extraction.
Since: 0.7.0
foldStreamShared :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.
foldStream :: IsStream t => State StreamK m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.
fromSerial :: IsStream t => SerialT m a -> t m a Source #
fromWSerial :: IsStream t => WSerialT m a -> t m a Source #
fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #
fromParallel :: IsStream t => ParallelT m a -> t m a Source #
fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #
fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as (return a) `consM` r
but
more efficient. For concurrent streams this is not concurrent whereas
consM
is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Since: 0.1.0
concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #
concatMapWith mixer generator stream
is a two dimensional looping
combinator. The generator
function is used to generate streams from the
elements in the input stream
and the mixer
function is used to merge
those streams.
Note we can merge streams concurrently by using a concurrent merge function.
Since: 0.7.0
Since: 0.8.0 (signature change)
concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
A variant of foldMap
that allows you to map a monadic streaming action
on a Foldable
container and then fold it using the specified stream merge
operation.
concatMapFoldableWith async return [1..3]
Equivalent to:
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)
Since: 0.1.0 (Streamly)
concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like concatMapFoldableWith
but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.
Equivalent to:
concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs
concatForFoldableWith f = flip (D.concatMapFoldableWith f)
Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)
Since: 0.1.0 (Streamly)
concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #
A variant of fold
that allows you to fold a Foldable
container of streams using the specified stream sum operation.
concatFoldableWith async $ map return [1..3]
Equivalent to:
concatFoldableWith f = Prelude.foldr f D.nil
concatFoldableWith f = D.concatMapFoldableWith f id
Since: 0.8.0 (Renamed foldWith to concatFoldableWith)
Since: 0.1.0 (Streamly)
fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
>>>
Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:
>>>
Stream.fold Fold.sum Stream.nil
0
However, foldMany
on an empty stream results in an empty stream.
Therefore, Stream.fold f
is not the same as Stream.head . Stream.foldMany
f
.
fold f = Stream.parse (Parser.fromFold f)
Since: 0.7.0
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
Nothing
. If the stream is non-empty, returns Just (a, ma)
, where a
is
the head of the stream and ma
its tail.
This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
All the folds in this module can be expressed in terms of uncons
, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take advantage of
stream fusion.
Since: 0.1.0
foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream
constructs
an output structure using the step function build
. build
is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls build
again thus
lazily consuming the input stream
until either the output expression built
by build
is free of the "tail" or the input is exhausted in which case
final
is used as the terminating case for the output structure. For more
details see the description in the previous section.
Example, determine if any element is odd
in a stream:
>>>
Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)
True
Since: 0.7.0 (signature changed)
Since: 0.2.0 (signature changed)
Since: 0.1.0
foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad m
is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.
Since: 0.1.0
foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Lazy right fold for non-empty streams, using first element as the starting
value. Returns Nothing
if the stream is empty.
Since: 0.5.0
foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #
Lazy left fold to a stream.
foldlT :: (Monad m, IsStream t, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #
Lazy left fold to a transformer monad.
For example, to reverse a stream:
D.toList $ D.foldlT (flip D.cons) D.nil $ (D.fromList [1..5] :: SerialT IO Int)
foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl
library. The suffix x
is a mnemonic for extraction.
Since: 0.2.0
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #
Left associative/strict push fold. foldl' reduce initial stream
invokes
reduce
with the accumulator and the next input in the input stream, using
initial
as the initial value of the accumulator. When
the input is exhausted the current value of the accumulator is returned.
Make sure to use a strict data structure for accumulator to not build
unnecessary lazy expressions unless that's what you want. See the previous
section for more details.
Since: 0.2.0
foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Strict left fold, for non-empty streams, using first element as the
starting value. Returns Nothing
if the stream is empty.
Since: 0.5.0
foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #
Like foldx
, but with a monadic step function.
Since: 0.2.0
parseD :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #
Parse a stream using the supplied ParserD Parser
.
Internal
parse :: Monad m => Parser a m b -> SerialT m a -> m (Either ParseError b) Source #
Parse a stream using the supplied Parser
.
Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:
>>>
Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
Left (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")
Note:
fold f = Stream.parse (Parser.fromFold f)
parse p
is not the same as head . parseMany p
on an empty stream.
Pre-release
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #
mapM_ = Stream.drain . Stream.mapM
Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.
Since: 0.1.0
drain :: Monad m => SerialT m a -> m () Source #
drain = mapM_ (\_ -> return ())
drain = Stream.fold Fold.drain
Run a stream, discarding the results. By default it interprets the stream
as SerialT
, to run other types of streams use the type adapting
combinators, for example Stream.drain . fromAsync.
Since: 0.7.0
drainN :: Monad m => Int -> SerialT m a -> m () Source #
drainN n = Stream.drain . Stream.take n
drainN n = Stream.fold (Fold.take n Fold.drain)
Run at most n
iterations of a stream.
Since: 0.7.0
runN :: Monad m => Int -> SerialT m a -> m () Source #
runN n = runStream . take n
Run at most n
iterations of a stream.
Since: 0.6.0
drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #
drainWhile p = Stream.drain . Stream.takeWhile p
Run a stream as long as the predicate holds true.
Since: 0.7.0
runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #
runWhile p = runStream . takeWhile p
Run a stream as long as the predicate holds true.
Since: 0.6.0
runStream :: Monad m => SerialT m a -> m () Source #
Run a stream, discarding the results. By default it interprets the stream
as SerialT
, to run other types of streams use the type adapting
combinators, for example runStream . fromAsync.
Since: 0.2.0
null :: Monad m => SerialT m a -> m Bool Source #
Determine whether the stream is empty.
null = Stream.fold Fold.null
Since: 0.1.1
head :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the first element of the stream, if any.
head = (!! 0)
head = Stream.fold Fold.one
Since: 0.1.0
headElse :: Monad m => a -> SerialT m a -> m a Source #
Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.
Pre-release
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
tail = fmap (fmap snd) . Stream.uncons
Extract all but the first element of the stream, if any.
Since: 0.1.1
init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
Extract all but the last element of the stream, if any.
Since: 0.5.0
last :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the last element of the stream, if any.
last xs = xs !! (Stream.length xs - 1)
last = Stream.fold Fold.last
Since: 0.1.1
elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is present in the stream.
elem = Stream.fold Fold.elem
Since: 0.1.0
notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is not present in the stream.
notElem = Stream.fold Fold.notElem
Since: 0.1.0
all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether all elements of a stream satisfy a predicate.
all = Stream.fold Fold.all
Since: 0.1.0
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether any of the elements of a stream satisfy a predicate.
any = Stream.fold Fold.any
Since: 0.1.0
and :: Monad m => SerialT m Bool -> m Bool Source #
Determines if all elements of a boolean stream are True.
and = Stream.fold Fold.and
Since: 0.5.0
or :: Monad m => SerialT m Bool -> m Bool Source #
Determines whether at least one element of a boolean stream is True.
or = Stream.fold Fold.or
Since: 0.5.0
sum :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the sum of all elements of a stream of numbers. Returns 0
when
the stream is empty. Note that this is not numerically stable for floating
point numbers.
sum = Stream.fold Fold.sum
Since: 0.1.0
product :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the product of all elements of a stream of numbers. Returns 1
when the stream is empty.
product = Stream.fold Fold.product
Since: 0.1.1
mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #
Fold a stream of monoid elements by appending them.
mconcat = Stream.fold Fold.mconcat
Pre-release
minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #
minimum = minimumBy compare
minimum = Stream.fold Fold.minimum
Determine the minimum element in a stream.
Since: 0.1.0
minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the minimum element in a stream using the supplied comparison function.
minimumBy = Stream.fold Fold.minimumBy
Since: 0.6.0
maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #
maximum = maximumBy compare
maximum = Stream.fold Fold.maximum
Determine the maximum element in a stream.
Since: 0.1.0
maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the maximum element in a stream using the supplied comparison function.
maximumBy = Stream.fold Fold.maximumBy
Since: 0.6.0
the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #
Ensures that all the elements of the stream are identical and then returns that unique element.
Since: 0.6.0
(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #
Lookup the element at the given index.
Since: 0.6.0
lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #
In a stream of (key-value) pairs (a, b)
, return the value b
of the
first pair where the key equals the given value a
.
lookup = snd <$> Stream.find ((==) . fst)
lookup = Stream.fold Fold.lookup
Since: 0.5.0
find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #
Like findM
but with a non-monadic predicate.
find p = findM (return . p)
find = Stream.fold Fold.find
Since: 0.5.0
findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #
Returns the first element that satisfies the given predicate.
findM = Stream.fold Fold.findM
Since: 0.6.0
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #
Returns the first index that satisfies the given predicate.
findIndex = Stream.fold Fold.findIndex
Since: 0.5.0
elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #
Returns the first index where a given value is found in the stream.
elemIndex a = Stream.findIndex (== a)
Since: 0.5.0
toList :: Monad m => SerialT m a -> m [a] Source #
toList = Stream.foldr (:) []
Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. Identity
). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.
Warning! Working on large lists accumulated as buffers in memory could be very inefficient; consider using Streamly.Array instead.
Since: 0.1.0
toListRev :: Monad m => SerialT m a -> m [a] Source #
toListRev = Stream.foldl' (flip (:)) []
Convert a stream into a list in reverse order in the underlying monad.
Warning! Working on large lists accumulated as buffers in memory could be very inefficient; consider using Streamly.Array instead.
Pre-release
toHandle :: MonadIO m => Handle -> SerialT m String -> m () Source #
toHandle h = D.mapM_ $ hPutStrLn h
Write a stream of Strings to an IO Handle.
Since: 0.1.0
toStreamRev :: Monad m => SerialT m a -> m (SerialT n a) Source #
Convert a stream to a pure stream in reverse order.
toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil
Pre-release
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #
Parallel fold application operator; applies a fold function t m a -> m b
to a stream t m a
concurrently. The input stream is evaluated
asynchronously in an independent thread yielding elements to a buffer and
the folding action runs in another thread consuming the input from the
buffer.
If you read the signature as (t m a -> m b) -> (t m a -> m b)
you can look
at it as a transformation that converts a fold function to a buffered
concurrent fold function.
The .
at the end of the operator is a mnemonic for termination of the
stream.
In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.
>>>
import Control.Concurrent (threadDelay)
>>>
import Streamly.Prelude ((|$.))
>>>
:{
Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ()) |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #
Same as |$.
.
Internal
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True
if the first stream is the same as or a prefix of the
second. A stream is a prefix of itself.
>>>
Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True
Since: 0.6.0
isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a, Unbox a) => SerialT m a -> SerialT m a -> m Bool Source #
isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #
Returns True
if the first stream is a suffix of the second. A stream is
considered a suffix of itself.
>>>
Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True
Space: O(n)
, buffers entire input stream and the suffix.
Pre-release
Suboptimal - Help wanted.
isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True
if all the elements of the first stream occur, in order, in
the second stream. The elements do not have to occur consecutively. A stream
is a subsequence of itself.
>>>
Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)
True
Since: 0.6.0
stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #
stripPrefix prefix stream
strips prefix
from stream
if it is a
prefix of stream. Returns Nothing
if the stream does not start with the
given prefix, stripped stream otherwise. Returns Just nil
when the prefix
is the same as the stream.
See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".
Space: O(1)
Since: 0.6.0
stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #
Drops the given suffix from a stream. Returns Nothing
if the stream does
not end with the given suffix. Returns Just nil
when the suffix is the
same as the stream.
It may be more efficient to convert the stream to an Array and use stripSuffix on that, especially if the elements have a Storable or Prim instance.
See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".
Space: O(n)
, buffers the entire input stream as well as the suffix
Pre-release
eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #
Compare two streams for equality using an equality function.
Since: 0.6.0