streamly-0.1.2: Beautiful Streaming, Concurrent and Reactive Composition

Streamly.Tutorial

Description

Streamly, short for stream concurrently, combines the essence of non-determinism, streaming and concurrency in functional programming. Concurrent and non-concurrent applications are almost indistinguisable, concurrency capability does not at all impact the performance of non-concurrent case. Streaming enables writing modular, composable and scalable applications with ease and concurrency allows you to make them scale and perform well. Streamly enables writing concurrent applications without being aware of threads or synchronization. No explicit thread control is needed, where applicable the concurrency rate is automatically controlled based on the demand by the consumer. However, combinators are provided to fine tune the concurrency control. Streaming and concurrency together enable expressing reactive applications conveniently. See Streamly.Examples for a simple SDL based FRP example.

Streamly streams are very much like the Haskell lists and most of the functions that work on lists have a counterpart that works on streams. However, streamly streams can be generated, consumed or combined concurrently. In this tutorial we will go over the basic concepts and how to use the library. The documentation of Streamly module has more details on core APIs. For more APIs for constructing, folding, filtering, mapping and zipping etc. see the documentation of Streamly.Prelude module. For examples and other ways to use the library see the module Streamly.Examples as well.

Synopsis

# Streams

Streamly provides many different stream types depending on the desired composition style. The simplest type is StreamT. StreamT is a monad transformer, the type StreamT m a represents a stream of values of type a in some underlying monad m. For example, StreamT IO Int is a stream of Int in IO monad.

## Generating Streams

Pure values can be placed into the stream type using return or pure. Effects in the IO monad can be lifted to the stream type using the liftIO combinator. In a transformer stack we can lift actions from the lower monad using the lift combinator. Some examples of streams with a single element:

 return 1 :: StreamT IO Int

 liftIO $putStrLn "Hello world!" :: StreamT IO ()  We can combine streams using <> to create streams of many elements:  return 1 <> return 2 <> return 3 :: StreamT IO Int  For more ways to construct or generate a stream see the module Streamly.Prelude. ## Eliminating Streams runStreamT runs a composed StreamT computation, lowering the type into the underlying monad and discarding the result stream: import Streamly main = runStreamT$ liftIO $putStrLn "Hello world!"  toList runs a stream computation and collects the result stream in a list in the underlying monad. toList is a polymorphic function that works on multiple stream types belonging to the class Streaming. Therefore, before you run a stream you need to tell how you want to interpret the stream by using one of the stream type combinators (serially, asyncly, parallely etc.). The combinator serially is equivalent to annotating the type as :: StreamT. import Streamly import Streamly.Prelude main = do xs <- toList$ serially $return 1 <> return 2 print xs  For other ways to eliminate or fold a stream see the module Streamly.Prelude. # Combining Streams ## Semigroup Style Streams of the same type can be combined into a composite stream in many different ways using one of the semigroup style binary composition operators i.e. <>, <=>, <|, <|>, mplus. These operators work on all stream types (StreamT, AsyncT etc.) uniformly. To illustrate the concurrent aspects, we will use the following delay function to introduce a delay specified in seconds. import Streamly import Control.Concurrent delay n = liftIO$ do
putStrLn (show tid ++ ": Delay " ++ show n)


### Serial composition (<>)

We have already seen, the <> operator. It composes two streams in series i.e. the first stream is completely exhausted and then the second stream is processed. The following example prints the sequence 3, 2, 1 and takes a total of 6 seconds because everything is serial:

main = runStreamT $delay 3 <> delay 2 <> delay 1  ThreadId 36: Delay 3 ThreadId 36: Delay 2 ThreadId 36: Delay 1  ### Async composition (<|) The <| operator can run both computations concurrently, when needed. In the following example since the first computation blocks we start the next one in a separate thread and so on: main = runStreamT$ delay 3 <| delay 2 <| delay 1

ThreadId 42: Delay 1


This is the concurrent version of the <> operator. The computations are triggered in the same order as <> except that they are concurrent. When we have a tree of computations composed using this operator, the tree is traversed in DFS style just like <>.

main = runStreamT $(p 1 <| p 2) <| (p 3 <| p 4) where p = liftIO . print  1 2 3 4  Concurrency provided by this operator is demand driven. The second computation is run concurrently with the first only if the first computation is not producing enough output to keep the stream consumer busy otherwise the second computation is run serially after the previous one. The number of concurrent threads is adapted dynamically based on the pull rate of the consumer of the stream. As you can see, in the following example the computations are run in a single thread one after another, because none of them blocks. However, if the thread consuming the stream were faster than the producer then it would have started parallel threads for each computation to keep up even if none of them blocks: main = runStreamT$ traced (sqrt 9) <| traced (sqrt 16) <| traced (sqrt 25)
where traced m = liftIO (myThreadId >>= print) >> m

ThreadId 40


Since the concurrency provided by this operator is demand driven it cannot be used when the composed computations have timers that are relative to each other because all computations may not be started at the same time and therefore timers in all of them may not start at the same time. When relative timing among all computations is important or when we need to start all computations at once for some reason <|> must be used instead. However, <| is useful in situations when we want to optimally utilize the resources and we know that the computations can run in parallel but we do not care if they actually run in parallel or not, that decision is left to the scheduler. Also, note that this operator can be used to fold infinite containers in contrast to <|>, because it does not require us to run all of them at the same time.

The left bias (or the DFS style) of the operator <| is suggested by its shape. You can also think of this as an unbalanced version of the fairly parallel operator <|>.

### Interleaved composition (<=>)

The <=> operator is serial like <> but it interleaves the two streams i.e. it yields one element from the first stream and then one element from the second stream, and so on. The following example prints the sequence 1, 3, 2, 4 and takes a total of 10 seconds because everything is serial:

main = runStreamT $(delay 1 <> delay 2) <=> (delay 3 <> delay 4)  ThreadId 36: Delay 1 ThreadId 36: Delay 3 ThreadId 36: Delay 2 ThreadId 36: Delay 4  Note that this operator cannot be used to fold infinite containers since it requires preserving the state until a stream is finished. To be clear, it can combine infinite streams but not infinite number of streams. ### Fair Concurrent composition (<|>) The Alternative composition operator <|>, like <|, runs the composed computations concurrently. However, unlike <| it runs all of the computations in fairly parallel manner using a round robin scheduling mechanism. This can be considered as the concurrent version of the fairly interleaved serial operation <=>. Note that this cannot be used on infinite containers, as it will lead to an infinite sized scheduling queue. The following example sends a query to three search engines in parallel and prints the name of the search engine as a response arrives: import Streamly import Network.HTTP.Simple main = runStreamT$ google <|> bing <|> duckduckgo
where
get s = liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s))


### Custom composition

The async API can be used to create references to asynchronously running stream computations. We can then use uncons to explore the streams arbitrarily and then recompose individual elements to create a new stream. This way we can dynamically decide which stream to explore at any given time. Take an example of a merge sort of two sorted streams. We need to keep consuming items from the stream which has the lowest item in the sort order. This can be achieved using async references to streams. See Streamly.Examples.MergeSortedStreams.

## Monoid Style

Each of the semigroup compositions described has an identity that can be used to fold a possibly empty container. An empty stream is represented by nil which can be represented in various standard forms as mempty, empty or mzero. Some fold utilities are also provided by the library for convenience:

• foldWith folds a Foldable container of stream computations using the given composition operator.
• foldMapWith folds like foldWith but also maps a function before folding.
• forEachWith is like foldMapwith but the container argument comes before the function argument.
• The each primitive from Streamly.Prelude folds a Foldable container using the <> operator:

All of the following are equivalent:

import Streamly
import Streamly.Prelude

main = do
toList . serially $foldWith (<>) (map return [1..10]) >>= print toList . serially$ foldMapWith (<>) return [1..10]       >>= print
toList . serially $forEachWith (<>) [1..10] return >>= print toList . serially$ each [1..10]                          >>= print


# Transforming Streams

The previous section discussed ways to merge the elements of two streams without doing any transformation on them. In this section we will explore how to transform streams using Functor, Applicative or Monad style compositions. The applicative and monad composition of all Streaming types behave exactly the same way as a list transformer. For simplicity of illustration we are using streams of pure values in the following examples. However, the real application of streams arises when these streams are generated using monadic actions.

In functional programmer's parlance the Monad instance of Streaming types implement non-determinism, exploring all possible combination of choices from both the streams. From an imperative programmer's point of view it behaves like nested loops i.e. for each element in the first stream and for each element in the second stream apply the body of the loop. If you are familiar with list transformer this behavior is exactly the same as that of a list transformer.

Just like we saw in sum style compositions earlier, monadic composition also has multiple variants each of which exactly corresponds to one of the sum style composition variant.

### Serial Composition (StreamT)

When we interpret the monadic composition as StreamT we get a standard list transformer like serial composition.

import Streamly
import Streamly.Prelude

main = runStreamT $do x <- each [3,2,1] delay x  ThreadId 30: Delay 3 ThreadId 30: Delay 2 ThreadId 30: Delay 1  As you can see the code after the each statement is run three times, once for each value of x. All the three iterations are serial and run in the same thread one after another. When compared to imperative programming, this can also be viewed as a for loop with three iterations. A console echo loop copying standard input to standard output can simply be written like this: import Streamly import Data.Semigroup (cycle1) main = runStreamT$ cycle1 (liftIO getLine) >>= liftIO . putStrLn


When multiple streams are composed using this style they nest in a DFS manner i.e. nested iterations of an iteration are executed before we proceed to the next iteration at higher level. This behaves just like nested for loops in imperative programming.

import Streamly
import Streamly.Prelude

main = runStreamT $do x <- each [1,2] y <- each [3,4] liftIO$ putStrLn $show (x, y)  (1,3) (1,4) (2,3) (2,4)  You will also notice that this is the monadic equivalent of the sum style composition using <>. ### Async Composition (AsyncT) When we interpret the monadic composition as AsyncT we get a concurrent list-transformer like composition. Multiple monadic continuations (or loop iterations) may be started concurrently. Concurrency is demand driven i.e. more concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer of the output stream. This is the concurrent version of StreamT. import Streamly import Streamly.Prelude main = runAsyncT$ do
x <- each [3,2,1]
delay x

ThreadId 40: Delay 1


As you can see the code after the each statement is run three times, once for each value of x. All the three iterations are concurrent and run in different threads. The iteration with least delay finishes first. When compared to imperative programming, this can be viewed as a for loop with three concurrent iterations.

Concurrency is demand driven just as in the case of <|. When multiple streams are composed using this style the iterations are triggered in a DFS manner just like StreamT i.e. nested iterations are executed before we proceed to the next iteration at higher level. However, unlike StreamT more than one iterations may be started concurrently, and based on the demand from the consumer.

import Streamly
import Streamly.Prelude

main = runAsyncT $do x <- each [1,2] y <- each [3,4] liftIO$ putStrLn $show (x, y)  (1,3) (1,4) (2,3) (2,4)  You will notice that this is the monadic equivalent of the <| style sum composition. The same caveats apply to this as the <| operation. ### Interleaved Composition (InterleavedT) When we interpret the monadic composition as InterleavedT we get a serial but fairly interleaved list-transformer like composition. The monadic continuations or iterations of the outer loop are fairly interleaved with the continuations or iterations of the inner loop. import Streamly import Streamly.Prelude main = runInterleavedT$ do
x <- each [1,2]
y <- each [3,4]
liftIO $putStrLn$ show (x, y)

(1,3)
(2,3)
(1,4)
(2,4)


You will notice that this is the monadic equivalent of the <=> style sum composition. The same caveats apply to this as the <=> operation.

### Fair Concurrent Composition (ParallelT)

When we interpret the monadic composition as ParallelT we get a concurrent list-transformer like composition just like AsyncT. The difference is that this is fully parallel with all iterations starting concurrently instead of the demand driven concurrency of AsyncT.

import Streamly
import Streamly.Prelude

main = runParallelT $do x <- each [3,2,1] delay x  ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3  You will notice that this is the monadic equivalent of the <|> style sum composition. The same caveats apply to this as the <|> operation. ### Exercise The streamly code is usually written in a way that is agnostic of the specific monadic composition type. We use a polymorphic type with a Streaming type class constraint. When running the stream we can choose the specific mode of composition. For example look at the following code. import Streamly import Streamly.Prelude composed :: Streaming t => t m a composed = do sz <- sizes cl <- colors sh <- shapes liftIO$ putStrLn $show (sz, cl, sh) where sizes = each [1, 2, 3] colors = each ["red", "green", "blue"] shapes = each ["triangle", "square", "circle"]  Now we can interpret this in whatever way we want: main = runStreamT composed main = runAsyncT composed main = runInterleavedT composed main = runParallelT composed  Equivalently, we can also write it using the type adapter combinators, like this: main = runStreaming$ serially     $composed main = runStreaming$ asyncly      $composed main = runStreaming$ interleaving $composed main = runStreaming$ parallely    $composed  As an exercise try to figure out the output of this code for each mode of composition. ## Applicative Applicative is precisely the same as the ap operation of Monad. For zipping and parallel applicatives separate types ZipStream and ZipAsync are provided. The following example runs all iterations serially and takes a total 17 seconds (1 + 3 + 4 + 2 + 3 + 4): import Streamly import Streamly.Prelude import Control.Concurrent s1 = d 1 <> d 2 s2 = d 3 <> d 4 d n = delay n >> return n main = (toList . serially$ (,) <$> s1 <*> s2) >>= print  ThreadId 36: Delay 1 ThreadId 36: Delay 3 ThreadId 36: Delay 4 ThreadId 36: Delay 2 ThreadId 36: Delay 3 ThreadId 36: Delay 4 [(1,3),(1,4),(2,3),(2,4)]  Similalrly interleaving runs the iterations in an interleaved order but since it is serial it takes a total of 17 seconds: main = (toList . interleaving$ (,) <$> s1 <*> s2) >>= print  ThreadId 36: Delay 1 ThreadId 36: Delay 3 ThreadId 36: Delay 2 ThreadId 36: Delay 3 ThreadId 36: Delay 4 ThreadId 36: Delay 4 [(1,3),(2,3),(1,4),(2,4)]  AsyncT can run the iterations concurrently and therefore takes a total of 10 seconds (1 + 2 + 3 + 4): main = (toList . asyncly$ (,) <$> s1 <*> s2) >>= print  ThreadId 34: Delay 1 ThreadId 36: Delay 2 ThreadId 35: Delay 3 ThreadId 36: Delay 3 ThreadId 35: Delay 4 ThreadId 36: Delay 4 [(1,3),(2,3),(1,4),(2,4)]  Similalrly ParallelT as well can run the iterations concurrently and therefore takes a total of 10 seconds (1 + 2 + 3 + 4): main = (toList . parallely$ (,) <$> s1 <*> s2) >>= print  ThreadId 34: Delay 1 ThreadId 36: Delay 2 ThreadId 35: Delay 3 ThreadId 36: Delay 3 ThreadId 35: Delay 4 ThreadId 36: Delay 4 [(1,3),(2,3),(1,4),(2,4)]  ## Functor fmap transforms a stream by mapping a function on all elements of the stream. The functor instance of each stream type defines fmap to be precisely the same as liftM, and therefore fmap is always serial irrespective of the type. For concurrent mapping, alternative versions of fmap, namely, asyncMap and parMap are provided. import Streamly main = (toList$ serially $fmap show$ each [1..10]) >>= print


Also see the mapM and sequence functions for mapping actions, in the Streamly.Prelude module.

# Zipping Streams

Zipping is a special transformation where the corresponding elements of two streams are combined together using a zip function producing a new stream of outputs. Two different types are provided for serial and concurrent zipping. These types provide an applicative instance that zips the argument streams. Also see the zipping function in the Streamly.Prelude module.

## Serial Zipping

ZipStream zips streams serially:

import Streamly
import Streamly.Prelude
import Control.Concurrent

d n = delay n >> return n
s1 = adapt . serially $d 1 <> d 2 s2 = adapt . serially$ d 3 <> d 4

main = (toList . zipping $(,) <$> s1 <*> s2) >>= print


This takes total 10 seconds to zip, which is (1 + 2 + 3 + 4) since everything runs serially:

ThreadId 29: Delay 1
[(1,3),(2,4)]


## Parallel Zipping

ZipAsync zips streams concurrently:

import Streamly
import Streamly.Prelude
import Control.Concurrent
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))

d n = delay n >> return n
s1 = adapt . serially $d 1 <> d 2 s2 = adapt . serially$ d 3 <> d 4

main = do
liftIO $hSetBuffering stdout LineBuffering (toList . zippingAsync$ (,) <$> s1 <*> s2) >>= print  This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 are produced concurrently, and 2 and 4 are produced concurrently: ThreadId 32: Delay 1 ThreadId 32: Delay 2 ThreadId 33: Delay 3 ThreadId 33: Delay 4 [(1,3),(2,4)]  # Summary of Compositions The following table summarizes the types for monadic compositions and the operators for sum style compositions. This table captures the essence of streamly. +-----+--------------+------------+ | | Serial | Concurrent | +=====+==============+============+ | DFS | StreamT | AsyncT | | +--------------+------------+ | | <> | <| | +-----+--------------+------------+ | BFS | InterleavedT | ParallelT | | +--------------+------------+ | | <=> | <|> | +-----+--------------+------------+  # Concurrent Programming When writing concurrent programs there are two distinct places where the programmer chooses the type of concurrency. First, when generating a stream by combining other streams we can use one of the sum style operators to combine them concurrently or serially. Second, when processing a stream in a monadic composition we can choose one of the monad composition types to choose the desired type of concurrency. In the following example the squares of x and y are computed concurrently using the <| operator and the square roots of their sum are also computed concurrently by using the asyncly combinator. We can choose different combinators e.g. <> and serially, to control the concurrency. import Streamly import Streamly.Prelude (toList) import Data.List (sum) main = do z <- toList$ asyncly     -- Concurrent monadic processing (sqrt below)
$do x2 <- forEachWith (<|) [1..100]$ -- Concurrent "for" loop
\x -> return $x * x -- body of the loop y2 <- forEachWith (<|) [1..100]$
\y -> return $y * y return$ sqrt (x2 + y2)
print $sum z  You can see how this directly maps to the imperative style OpenMP model, we use combinators and operators instead of the ugly pragmas. For more concurrent programming examples see, Streamly.Examples.ListDirRecursive, Streamly.Examples.MergeSortedStreams and Streamly.Examples.SearchEngineQuery. # Reactive Programming Reactive programming is nothing but concurrent streaming which is what streamly is all about. With streamly we can generate streams of events, merge streams that are generated concurrently and process events concurrently. We can do all this without any knowledge about the specifics of the implementation of concurrency. In the following example you will see that the code is just regular Haskell code without much streamly APIs used (active hyperlinks are the streamly APIs) and yet it is a reactive application. This application has two independent and concurrent sources of event streams, acidRain and userAction. acidRain continuously generates events that deteriorate the health of the game character. userAction can be "potion" or "quit". When the user types "potion" the health improves and the game continues. {-# LANGUAGE FlexibleContexts #-} import Streamly import Control.Concurrent (threadDelay) import Control.Monad (when) import Control.Monad.State import Data.Semigroup (cycle1) data Event = Harm Int | Heal Int | Quit deriving (Show) userAction :: MonadIO m => StreamT m Event userAction = cycle1$ liftIO askUser
where
command <- getLine
case command of
"potion" -> return (Heal 10)
"quit"   -> return  Quit
_        -> putStrLn "What?" >> askUser

acidRain :: MonadIO m => StreamT m Event
acidRain = cycle1 $liftIO (threadDelay 1000000) >> return (Harm 1) game :: (MonadAsync m, MonadState Int m) => StreamT m () game = do event <- userAction <|> acidRain case event of Harm n -> modify$ \h -> h - n
Heal n -> modify $\h -> h + n Quit -> fail "quit" h <- get when (h <= 0)$ fail "You die!"
liftIO $putStrLn$ "Health = " ++ show h

main = do
putStrLn "Your health is deteriorating due to acid rain,\
\ type \"potion\" or \"quit\""
_ <- runStateT (runStreamT game) 60
return ()


You can also find the source of this example in Streamly.Examples.AcidRainGame. It has been adapted from Gabriel's pipes-concurrency package. This is much simpler compared to the pipes version because of the builtin concurrency in streamly. You can also find a SDL based reactive programming example adapted from Yampa in Streamly.Examples.CirclingSquare.

# Performance

Streamly is highly optimized for performance, it is designed for serious high performing, concurrent and scalable applications. We have created the streaming-benchmarks package which is specifically and carefully designed to measure the performance of Haskell streaming libraries fairly and squarely in the right way. Streamly performs at par or even better than most streaming libraries for common operations even though it needs to deal with the concurrency capability.

# Interoperation with Streaming Libraries

We can use unfoldr and uncons to convert one streaming type to another. We will assume the following common code to be available in the examples demonstrated below.

import Streamly
import Streamly.Prelude
import System.IO (stdin)

unconsE s = uncons s >>= maybe (return $Left ()) (return . Right) stdinLn = serially$ fromHandle stdin


Interop with pipes:

import qualified Pipes as P
import qualified Pipes.Prelude as P

main = do
-- streamly to pipe
P.runEffect $P.for (P.unfoldr unconsE stdinLn) (lift . putStrLn) -- pipe to streamly -- Adapt P.next to return a Maybe instead of Either let nextM p = P.next p >>= either (\_ -> return Nothing) (return . Just) runStreamT$ unfoldrM nextM P.stdinLn >>= lift . putStrLn


Interop with streaming:

import qualified Streaming as S
import qualified Streaming.Prelude as S

main = do
-- streamly to streaming
S.stdoutLn $S.unfoldr unconsE stdinLn -- streaming to streamly runStreamT$ unfoldrM S.uncons S.stdinLn >>= lift . putStrLn



Interop with conduit:

import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import qualified Data.Conduit.Combinators as C

-- streamly to conduit
main = (C.unfoldM uncons stdinLn) C. C.print


# Comparison with Existing Packages

Streamly unifies non-determinism, streaming, concurrency and FRP functionality that is otherwise covered by several disparate packages, and it does that with a surprisingly concise API. Here is a list of popular and well-known packages in all these areas:

+-----------------+----------------+
| Non-determinism | list-t         |
|                 +----------------+
|                 | logict         |
+-----------------+----------------+
| Streaming       | streaming      |
|                 +----------------+
|                 | conduit        |
|                 +----------------+
|                 | pipes          |
|                 +----------------+
|                 | simple-conduit |
+-----------------+----------------+
| Concurrency     | async          |
|                 +----------------+
|                 | transient      |
+-----------------+----------------+
| FRP             | Yampa          |
|                 +----------------+
|                 | dunai          |
|                 +----------------+
|                 | reflex         |
+-----------------+----------------+


Streamly covers all the functionality provided by both the non-determinism packages listed above and provides better performance in comparison to those. In fact, at the core streamly is a list transformer but it naturally integrates the concurrency dimension to the basic list transformer functionality.

When it comes to streaming, in terms of core concepts, simple-conduit is the package that is closest to streamly if we set aside the concurrency dimension, both are streaming packages with list transformer like monad composition. However, in terms of API streamly is more like the streaming package. Streamly can be used to achieve more or less the functionality provided by any of the streaming packages listed above. The types and API of streamly are much simpler in comparison to conduit and pipes. It is more or less like the standard Haskell list APIs.

When it comes to concurrency, streamly can do everything that the async package can do and more. async provides applicative concurrency whereas streamly provides both applicative and monadic concurrency. The ZipAsync type behaves like the applicative instance of async. This work was originally inspired by the concurrency implementation in transient though it has no resemblence with that. Streamly provides concurrency as transient does but in a sort of dual manner, it can lazily stream the output. In comparison to transient streamly has a first class streaming interface and is a monad transformer that can be used universally in any Haskell monad transformer stack.

The non-determinism, concurrency and streaming combination make streamly a strong FRP capable library as well. FRP is fundamentally stream of events that can be processed concurrently. The example in this tutorial as well as the Streamly.Examples.CirclingSquare example from Yampa demonstrate the basic FRP capability of streamly. In core concepts streamly is strikingly similar to dunai. dunai was designed from a FRP perspective and streamly was originally designed from a concurrency perspective. However, both have similarity at the core.