streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright(c) 2019 Composewell Technologies
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Sink

Description

The Sink type is a just a special case of Fold and we can do without it. However, in some cases Sink is a simpler type and may provide better performance than Fold because it does not maintain any state. Folds can be used for both pure and monadic computations. Sinks are not applicable to pure computations.

Synopsis

Documentation

newtype Sink m a Source #

A Sink is a special type of Fold that does not accumulate any value, but runs only effects. A Sink has no state to maintain therefore can be a bit more efficient than a Fold with () as the state, especially when Sinks are composed with other operations. A Sink can be upgraded to a Fold, but a Fold cannot be converted into a Sink.

Constructors

Sink (a -> m ()) 

Upgrading

toFold :: Monad m => Sink m a -> Fold m a () Source #

Convert a Sink to a Fold. When you want to compose sinks and folds together, upgrade a sink to a fold before composing.

Composing Sinks

Distribute

tee :: Monad m => Sink m a -> Sink m a -> Sink m a Source #

Distribute one copy each of the input to both the sinks.

                |-------Sink m a
---stream m a---|
                |-------Sink m a
> let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
> sink (Sink.tee (pr "L") (pr "R")) (S.enumerateFromTo 1 2)
L 1
R 1
L 2
R 2

distribute :: Monad m => [Sink m a] -> Sink m a Source #

Distribute copies of the input to all the sinks in a container.

                |-------Sink m a
---stream m a---|
                |-------Sink m a
                |
                      ...
> let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
> sink (Sink.distribute [(pr "L"), (pr "R")]) (S.enumerateFromTo 1 2)
L 1
R 1
L 2
R 2

This is the consumer side dual of the producer side sequence_ operation.

Demultiplex

demux :: (Monad m, Ord k) => Map k (Sink m a) -> Sink m (a, k) Source #

Demultiplex to multiple consumers without collecting the results. Useful to run different effectful computations depending on the value of the stream elements, for example handling network packets of different types using different handlers.

                            |-------Sink m a
-----stream m a-----Map-----|
                            |-------Sink m a
                            |
                                      ...
> let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
> let table = Data.Map.fromList [(1, pr "One"), (2, pr "Two")]
  in Sink.sink (Sink.demux id table) (S.enumerateFromTo 1 100)
One 1
Two 2

Unzip

unzipM :: Monad m => (a -> m (b, c)) -> Sink m b -> Sink m c -> Sink m a Source #

Split elements in the input stream into two parts using a monadic unzip function, direct each part to a different sink.

                          |-------Sink m b
-----Stream m a----(b,c)--|
                          |-------Sink m c
> let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
  in Sink.sink (Sink.unzip return (pr "L") (pr "R")) (S.fromPure (1,2))
L 1
R 2

unzip :: Monad m => (a -> (b, c)) -> Sink m b -> Sink m c -> Sink m a Source #

Same as unzipM but with a pure unzip function.

Input Transformation

These are contravariant operations i.e. they apply on the input of the Sink, for this reason they are prefixed with l for left.

lmap :: (a -> b) -> Sink m b -> Sink m a Source #

Map a pure function on the input of a Sink.

lmapM :: Monad m => (a -> m b) -> Sink m b -> Sink m a Source #

Map a monadic function on the input of a Sink.

lfilter :: Monad m => (a -> Bool) -> Sink m a -> Sink m a Source #

Filter the input of a Sink using a pure predicate function.

lfilterM :: Monad m => (a -> m Bool) -> Sink m a -> Sink m a Source #

Filter the input of a Sink using a monadic predicate function.

Sinks

drain :: Monad m => Sink m a Source #

Drain all input, running the effects and discarding the results.

drainM :: Monad m => (a -> m b) -> Sink m a Source #

drainM f = lmapM f drain

Drain all input after passing it through a monadic function.