streamly-0.2.0: Beautiful Streaming, Concurrent and Reactive Composition

Streamly.Tutorial

Description

Streamly is a general computing framework based on streaming IO. The IO monad and pure lists are special cases of streamly. On one hand, streamly extends lists of pure values to lists of monadic actions; on the other hand, it extends the IO monad with concurrent non-determinism. In simple imperative terms, streamly extends the IO monad with for loops and nested for loops with concurrency support. This analogy will become clearer once you have gone through this tutorial.

Streaming in general enables writing modular, composable and scalable applications with ease, and concurrency allows you to make them scale and perform well. Streamly enables writing scalable concurrent applications without being aware of threads or synchronization. No explicit thread control is needed. Where applicable, the concurrency rate is automatically controlled based on the demand from the consumer. However, combinators can be used to fine tune the concurrency control.

Streaming and concurrency together enable expressing reactive applications conveniently. See the CirclingSquare example in the examples directory for a simple SDL based FRP example. To summarize, streamly provides a unified computing framework for streaming, non-determinism and functional reactive programming in an elegant and simple API that is a natural extension of pure lists to monadic streams.

In this tutorial we will go over the basic concepts and how to use the library. See the last section for further reading resources.

# Streams

Just as a list represents a sequence of pure values, a stream represents a sequence of monadic actions. The monadic stream API offered by Streamly is very close to the Haskell Prelude API for pure lists; it can be considered a natural extension of lists to monadic actions. Streamly streams also provide concurrent composition and merging of streams, so a stream can be considered a concurrent list transformer. In contrast to the Prelude lists, merging or appending streams of arbitrary length is scalable and inexpensive.

The basic stream type is Serial. It represents a sequence of IO actions, and is a Monad. The Serial monad is almost a drop in replacement for the IO monad, and the IO monad is a special case of the Serial monad: the IO monad represents a single IO action whereas the Serial monad represents a series of IO actions. The only changes you need to make to go from IO to Serial are to use runStream to run the monad and to prefix the IO actions with either once or liftIO. If you use liftIO you can switch from Serial to the IO monad by simply removing the runStream function; no other changes are needed unless you have used some stream specific composition or combinators.

Similarly, the Serial type is almost a drop in replacement for pure lists, and pure lists are a special case of monadic streams. If you use nil in place of '[]' and |: in place of : you can replace a list with a Serial stream. The only differences are that the elements must be of a monadic type and that, to operate on the streams, we must use the corresponding functions from Streamly.Prelude instead of the base Prelude.

# Flavors of Streams

There are a few more types similar to Serial that all represent streams and differ only in the Semigroup, Applicative and Monad compositions of the streams.

The composition of two or more streams is distinguished based on three characteristics: traversal order, execution order and consumption order. Traversal of a composition of streams could be deep or wide. Deep goes depth first i.e. each stream is traversed fully before we traverse the next stream. Wide goes breadth first i.e. one element from each stream is traversed before coming back to the first stream again. Execution could be serial (i.e. synchronous) or asynchronous. In serial execution we execute an action in the next stream only after the first has finished executing. In asynchronous execution actions in both streams can be executed asynchronously i.e. the next action can start executing even before the first one has finished. The third characteristic is consumption order, that is, the order in which the output generated by the composition is consumed. Consumption could be serial or asynchronous. In serial consumption, the outputs are consumed in the traversal order; in asynchronous consumption the outputs are consumed as they arrive i.e. in first come first serve order.
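As a rough illustration of the two traversal orders, here is a minimal sketch that uses only pure lists from base rather than streamly itself. The names deep and wide are hypothetical and chosen only for this illustration; they are not part of the streamly API.

```haskell
-- Pure-list analogy for traversal order; this is not streamly code.

-- Deep (depth first): exhaust the first list, then move to the second.
deep :: [a] -> [a] -> [a]
deep = (++)

-- Wide (breadth first): take one element from each list in turn.
wide :: [a] -> [a] -> [a]
wide []     ys = ys
wide (x:xs) ys = x : wide ys xs

main :: IO ()
main = do
    print (deep [1, 2] [3, 4 :: Int])  -- prints [1,2,3,4]
    print (wide [1, 2] [3, 4 :: Int])  -- prints [1,3,2,4]
```

Pure lists have no effects, so this sketch says nothing about execution or consumption order; it only shows the order in which elements are visited.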

The following table summarizes different styles of streams based on how they compose. All these types are monads and they differ in Semigroup, Applicative and Monad compositions:

+------------+-----------+--------------+--------------+
| Type       | Traversal | Execution    | Consumption  |
+============+===========+==============+==============+
| Serial     | Deep      | Serial       | Serial       |
+------------+-----------+--------------+--------------+
| WSerial    | Wide      | Serial       | Serial       |
+------------+-----------+--------------+--------------+
| Async      | Deep      | Asynchronous | Asynchronous |
+------------+-----------+--------------+--------------+
| WAsync     | Wide      | Asynchronous | Asynchronous |
+------------+-----------+--------------+--------------+
| Parallel   | Parallel  | Asynchronous | Asynchronous |
+------------+-----------+--------------+--------------+


Other than these types there are also ZipSerial and ZipAsync types that zip streams serially or concurrently using the Applicative operation. These types are not monads; they are only applicatives, and they do not differ in Semigroup composition.
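The difference between a zipping Applicative and the ordinary product style Applicative can be seen with pure lists. The sketch below uses ZipList from base as an analogy only; it is not streamly code, and the names crossed and zipped are made up for this illustration.

```haskell
import Control.Applicative (ZipList (..))

-- Pure-list analogy using only base; this is not streamly code.

-- The ordinary list Applicative pairs every element of the first list
-- with every element of the second.
crossed :: [(Int, Int)]
crossed = (,) <$> [1, 2] <*> [3, 4]

-- The ZipList Applicative pairs elements position by position instead.
zipped :: [(Int, Int)]
zipped = getZipList ((,) <$> ZipList [1, 2] <*> ZipList [3, 4])

main :: IO ()
main = do
    print crossed  -- prints [(1,3),(1,4),(2,3),(2,4)]
    print zipped   -- prints [(1,3),(2,4)]
```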

All these types can be freely inter-converted using type conversion combinators or type annotations, without any cost, to achieve the desired composition style. To force a particular type of composition we coerce the stream type using the corresponding type adapting combinator from serially, wSerially, asyncly, wAsyncly, parallely, zipSerially or zipAsyncly. The default stream type is inferred as Serial unless you change it by using one of the combinators or a type annotation.

# Imports and Supporting Code

In most of the example snippets we do not repeat the imports. Where imports are not explicitly specified, use the imports shown below.

import Streamly
import Streamly.Prelude ((|:), nil)
import qualified Streamly.Prelude as S

import Control.Concurrent


To illustrate concurrent vs serial composition aspects, we will use the following delay function to introduce a sleep or delay specified in seconds. After the delay it prints the number of seconds it slept.

delay n = S.once $ do
    threadDelay (n * 1000000)
    tid <- myThreadId
    putStrLn (show tid ++ ": Delay " ++ show n)

# Generating Streams

We will assume the following imports in this tutorial. Go ahead, fire up a GHCi session and import these lines to start playing.

> import Streamly
> import Streamly.Prelude ((|:))
> import qualified Streamly.Prelude as S

nil represents an empty stream and consM or its operator form |: adds a monadic action at the head of the stream.

> S.toList S.nil
[]
> S.toList $ getLine |: getLine |: S.nil
hello
world
["hello","world"]


To create a singleton stream from a pure value use pure and to create a singleton stream from a monadic action use once.

> S.toList $ pure 1
[1]
> S.toList $ S.once getLine
hello
["hello"]


To create a stream from pure values in a Foldable container use fromFoldable which is equivalent to a fold using cons and nil:

> S.toList $ S.fromFoldable [1..3]
[1,2,3]
> S.toList $ foldr S.cons S.nil [1..3]
[1,2,3]


To create a stream from monadic actions in a Foldable container just use a right fold using consM and nil:

> runStream $ foldr (|:) S.nil [putStr "Hello ", putStrLn "world!"]
Hello world!

For more ways to construct a stream see the module Streamly.Prelude.

# Eliminating Streams

We have already seen runStream and toList to eliminate a stream in the examples above. runStream runs a stream discarding the results i.e. only for effects. toList runs the stream and collects the results in a list.

For other ways to eliminate a stream see the Folding section in Streamly.Prelude module.

# Transforming Streams

Transformation over a stream is the equivalent of a for loop construct in imperative paradigm. We iterate over every element in the stream and perform certain transformations for each element. Transformations may involve mapping functions over the elements, filtering elements from the stream or folding all the elements in the stream into a single value. Streamly streams are exactly like lists and you can perform all the transformations in the same way as you would on lists.

Here is a simple console echo program that just echoes every input line, forever:

> runStream $ S.repeatM getLine & S.mapM putStrLn


The following code snippet reads lines from standard input, filters blank lines, drops the first non-blank line, takes the next two, up cases them, numbers them and prints them:

import Streamly
import qualified Streamly.Prelude as S
import Data.Char (toUpper)
import Data.Function ((&))

main = runStream $
      S.repeatM getLine
    & S.filter (not . null)
    & S.drop 1
    & S.take 2
    & fmap (map toUpper)
    & S.zipWith (\n s -> show n ++ " " ++ s) (S.fromFoldable [1..])
    & S.mapM putStrLn

# Merging Streams

## Semigroup Style

We can combine two streams into a single stream using the semigroup composition operation <>. Streams can be combined in many different ways as described in the following sections; the <> operation behaves differently depending on the stream type in effect. The stream type and therefore the composition style can be changed at any point using one of the type combinators as discussed earlier.

### Deep Serial Composition (Serial)

The Semigroup operation <> of the Serial type combines the two streams in a serial depth first manner. We use the serially type combinator to effect Serial style of composition. We can also use an explicit Serial type annotation for the stream to achieve the same effect. However, since Serial is the default type unless explicitly specified by using a combinator, we can omit using an explicit combinator or type annotation for this style of composition.

When two streams with multiple elements are combined in this manner, the monadic actions in the two streams are performed sequentially i.e. first all actions in the first stream are performed sequentially and then all actions in the second stream are performed sequentially. We call it serial depth first as the full depth of one stream is fully traversed before we move to the next. The following example prints the sequence 1, 2, 3, 4:

main = runStream $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)

1
2
3
4


All actions in both the streams are performed serially in the same thread. In the following example we can see that all actions are performed in the same thread and take a combined total of 3 + 2 + 1 = 6 seconds:

main = runStream $ delay 3 <> delay 2 <> delay 1

ThreadId 36: Delay 3
ThreadId 36: Delay 2
ThreadId 36: Delay 1

The polymorphic version of the binary operation <> of the Serial type is serial. We can use serial to join streams in a sequential manner irrespective of the type of stream:

main = runStream $ (print 1 |: print 2 |: nil) `serial` (print 3 |: print 4 |: nil)


### Wide Serial Composition (WSerial)

The Semigroup operation <> of the WSerial type combines the two streams in a serial breadth first manner. We use the wSerially type combinator to effect WSerial style of composition. We can also use the WSerial type annotation for the stream to achieve the same effect.

When two streams with multiple elements are combined in this manner, we traverse all the streams in a breadth first manner i.e. one action from each stream is performed and yielded to the resulting stream before we come back to the first stream again and so on. The following example prints the sequence 1, 3, 2, 4:

main = runStream . wSerially $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)

1
3
2
4

Even though the monadic actions of the two streams are performed in an interleaved manner they are all performed serially in the same thread. In the following example we can see that all actions are performed in the same thread and take a combined total of 3 + 2 + 1 = 6 seconds:

main = runStream . wSerially $ delay 3 <> delay 2 <> delay 1

ThreadId 36: Delay 3


The polymorphic version of the WSerial binary operation <> is called wSerial. We can use wSerial to join streams in an interleaved manner irrespective of the type, notice that we have not used the wSerially combinator in the following example:

main = runStream $ (print 1 |: print 2 |: nil) `wSerial` (print 3 |: print 4 |: nil)

1
3
2
4

Note that this composition cannot be used to fold an infinite number of streams since it requires preserving the state until a stream is finished.

### Deep Asynchronous Composition (Async)

The Semigroup operation <> of the Async type combines the two streams in a depth first manner with parallel look ahead. We use the asyncly type combinator to effect Async style of composition. We can also use the Async type annotation for the stream type to achieve the same effect.

When two streams with multiple elements are combined in this manner, the streams are traversed in depth first manner just like Serial; however, it can execute the next stream concurrently and return the results from it as they arrive i.e. the results from the next stream may be yielded even before the results from the first stream. Concurrent execution of the next stream(s) is performed if the first stream blocks or if it cannot produce output at a rate that is enough to meet the consumer demand. Multiple streams can be executed concurrently to meet the demand. In the following example the first stream does not block, therefore the first stream is completely exhausted before the second.

main = runStream . asyncly $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)

1
2
3
4


If the first stream blocks, we can yield from the second. In the example below each yield in the stream has a constant delay of 1 second therefore 1 and 3 would be yielded first and then 2 and 4 would be yielded.

main = runStream . asyncly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
    where p n = threadDelay 1000000 >> print n

1
3
2
4

In the following example we can see that new threads are started when a computation blocks. Notice that the output from the stream with the shortest delay is printed first. The whole computation takes maximum of (3, 2, 1) = 3 seconds:

main = runStream . asyncly $ delay 3 <> delay 2 <> delay 1

ThreadId 42: Delay 1


When we have a tree of computations composed using this style, the tree is traversed in DFS style just like the Serial style; the only difference is that here we can move on to executing the next stream if a stream blocks. However, we will not start new threads if we have sufficient output to saturate the consumer. This is why we call it left-biased demand driven or adaptive concurrency style: the concurrency tends to stay on the left side of the composition as long as possible. More threads are started based on the pull rate of the consumer. The following example prints an output every second as all of the actions are concurrent.

main = runStream . asyncly $ (delay 1 <> delay 2) <> (delay 3 <> delay 4)

1
2
3
4

All the computations may even run in a single thread when more threads are not needed. As you can see, in the following example the computations are run in a single thread one after another, because none of them blocks. However, if the thread consuming the stream were faster than the producer then it would have started parallel threads for each computation to keep up even if none of them blocks:

main = runStream . asyncly $ traced (sqrt 9) <> traced (sqrt 16) <> traced (sqrt 25)
    where traced m = S.once (myThreadId >>= print) >> return m

ThreadId 40


Note that the order of printing in the above examples may change due to variations in scheduling latencies for concurrent threads.

The polymorphic version of the Async binary operation <> is called async. We can use async to join streams in a left biased adaptively concurrent manner irrespective of the type, notice that we have not used the asyncly combinator in the following example:

main = runStream $ delay 3 `async` delay 2 `async` delay 1

ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3

Since the concurrency provided by this operator is demand driven it cannot be used when the composed computations start timers that are relative to each other because all computations may not be started at the same time and therefore timers in all of them may not start at the same time. When relative timing among all computations is important or when we need to start all computations at once for any reason Parallel style must be used instead.

Async style should be preferred over Parallel or WAsync unless you really need those. It utilizes the resources optimally. It should be used when we know that the computations can run in parallel but we do not care if they actually run in parallel or not; that decision can be left to the scheduler based on demand. Also, note that this operator can be used to fold an infinite number of streams in contrast to the Parallel or WAsync styles, because it does not require us to run all of them at the same time in a fair manner.

### Wide Asynchronous Composition (WAsync)

The Semigroup operation <> of the WAsync type combines two streams in a concurrent manner using breadth first traversal. We use the wAsyncly type combinator to effect WAsync style of composition. We can also use the WAsync type annotation for the stream to achieve the same effect.

When streams with multiple elements are combined in this manner, we traverse all the streams concurrently in a breadth first manner i.e. one action from each stream is performed and yielded to the resulting stream before we come back to the first stream again and so on. Even though we execute the actions in a breadth first order the outputs may be consumed in a different order because they are consumed on a first come first serve basis.

In the following example we can see that outputs are produced in the breadth first traversal order but this is not guaranteed.

main = runStream . wAsyncly $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)

1
3
2
4


The polymorphic version of the binary operation <> of the WAsync type is wAsync. We can use wAsync to join streams using a breadth first concurrent traversal irrespective of the type, notice that we have not used the wAsyncly combinator in the following example:

main = runStream $ delay 3 `wAsync` delay 2 `wAsync` delay 1

ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3

Since the concurrency provided by this style is demand driven it may not be used when the composed computations start timers that are relative to each other because all computations may not be started at the same time and therefore timers in all of them may not start at the same time. When relative timing among all computations is important or when we need to start all computations at once for any reason Parallel style must be used instead.

### Parallel Asynchronous Composition (Parallel)

The Semigroup operation <> of the Parallel type combines the two streams in a fairly concurrent manner with round robin scheduling. We use the parallely type combinator to effect Parallel style of composition. We can also use the Parallel type annotation for the stream type to achieve the same effect.

When two streams with multiple elements are combined in this manner, the monadic actions in both the streams are performed concurrently with a fair round robin scheduling. The outputs are yielded in the order in which the actions complete. This is pretty similar to the WAsync type; the difference is that WAsync is adaptive to the consumer demand and may or may not execute all actions in parallel depending on the demand, whereas Parallel runs all the streams in parallel irrespective of the demand.

The following example sends a query to all the three search engines in parallel and prints the name of the search engines in the order in which the responses arrive:

import Streamly
import qualified Streamly.Prelude as S
import Network.HTTP.Simple

main = runStream . parallely $ google <> bing <> duckduckgo
    where
        get s = S.once (httpNoBody (parseRequest_ s) >> putStrLn (show s))


The polymorphic version of the binary operation <> of the Parallel type is parallel. We can use parallel to join streams in a fairly concurrent manner irrespective of the type, notice that we have not used the parallely combinator in the following example:

main = runStream $ delay 3 `parallel` delay 2 `parallel` delay 1

ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3

Note that this style of composition cannot be used to combine an infinite number of streams, as it will lead to an infinite sized scheduling queue.

### Custom composition

The mkAsync API can be used to create references to asynchronously running stream computations. We can then use uncons to explore the streams arbitrarily and then recompose individual elements to create a new stream. This way we can dynamically decide which stream to explore at any given time. Take an example of a merge sort of two sorted streams. We need to keep consuming items from the stream which has the lowest item in the sort order. This can be achieved using async references to streams. See "MergeSort.hs" in the examples directory.

## Monoid Style

We can use Monoid instances to fold a container of streams in the desired style using fold or foldMap. We have also provided some fold utilities to fold streams using the polymorphic combine operations:

• foldWith is like fold, it folds a Foldable container of streams using the given composition operator.
• foldMapWith is like foldMap, it folds like foldWith but also maps a function before folding.
• forEachWith is like foldMapWith but the container argument comes before the function argument.

All of the following are equivalent and start ten concurrent tasks each with a delay from 1 to 10 seconds, resulting in the printing of each number every second:

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = do
    runStream $ asyncly $ foldMap delay [1..10]
    runStream $ foldWith    async (map delay [1..10])
    runStream $ foldMapWith async delay [1..10]
    runStream $ forEachWith async [1..10] delay
    where delay n = S.once $ threadDelay (n * 1000000) >> print n

# Nesting Streams

Till now we discussed ways to apply transformations on a stream or to merge streams together to create another stream. We mentioned earlier that transforming a stream is similar to a for loop in the imperative paradigm. We will now discuss the concept of a nested composition of streams which is analogous to nested for loops in the imperative paradigm. Functional programmers call this style of composition a list transformer or ListT. Logic programmers call it a logic monad or non-deterministic composition, but for ordinary imperative minded people like me it is easier to think in terms of good old nested for loops.

## Monad

In functional programmer's parlance the Monad instances of different IsStream types implement non-determinism, exploring all possible combinations of choices from both the streams. From an imperative programmer's point of view it behaves like nested loops i.e. for each element in the first stream and for each element in the second stream execute the body of the loop.

The Monad instances of Serial, WSerial, Async and WAsync stream types support different flavors of nested looping. In other words, they are all variants of list transformer. The nesting behavior of these types corresponds exactly to the way they merge streams as we discussed in the previous section.

### Deep Serial Nesting (Serial)

The Monad composition of the Serial type behaves like a standard list transformer. This is the default when we do not use an explicit type combinator. However, the serially type combinator can be used to switch to this style of composition. We will see how this style of composition works in the following examples.

Let's start with an example with a simple for loop without any nesting. For simplicity of illustration we are using streams of pure values in all the examples. However, the streams could also be made of monadic actions instead.
import Streamly
import qualified Streamly.Prelude as S

main = runStream $ do
    x <- S.fromFoldable [3,2,1]
    delay x

ThreadId 30: Delay 3


As we can see, the code after the fromFoldable statement is run three times, once for each value of x drawn from the stream. All the three iterations are serial and run in the same thread one after another. In imperative terms this is equivalent to a for loop with three iterations.

A console echo loop copying standard input to standard output can simply be written like this:

import Streamly
import qualified Streamly.Prelude as S
import Control.Monad (forever)

main = runStream $ forever $ S.once getLine >>= S.once . putStrLn


When multiple streams are composed using this style they nest in a DFS manner i.e. nested iterations of a loop are executed before we proceed to the next iteration of the parent loop. This behaves just like nested for loops in imperative programming.

import Streamly
import qualified Streamly.Prelude as S

main = runStream $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.once $ putStrLn $ show (x, y)

(1,3)
(1,4)
(2,3)
(2,4)

Notice that this is analogous to merging streams of type Serial or merging streams using serial.

### Wide Serial Nesting (WSerial)

The Monad composition of WSerial type interleaves the iterations of outer and inner loops in a nested loop composition. This works exactly the same way as the merging of two streams in wSerially fashion works. The wSerially type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as WSerial.

import Streamly
import qualified Streamly.Prelude as S

main = runStream . wSerially $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.once $ putStrLn $ show (x, y)

(1,3)
(2,3)
(1,4)
(2,4)
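The two serial nesting orders shown so far can be mimicked with pure lists. The following is a minimal sketch using only base, not streamly; interleave, deepNest, wideNest and pairs are hypothetical names introduced for this illustration, and wideNest is only meant to reproduce the two-by-two case above, not to model how WSerial schedules larger compositions.

```haskell
-- Pure-list analogy for nested composition; this is not streamly code.

-- Alternate between two lists, one element at a time.
interleave :: [a] -> [a] -> [a]
interleave []     ys = ys
interleave (x:xs) ys = x : interleave ys xs

-- Deep nesting: finish the inner loop for one x before the next x.
deepNest :: [a] -> (a -> [b]) -> [b]
deepNest xs f = concatMap f xs

-- Wide nesting: interleave the inner loops of successive x values.
wideNest :: [a] -> (a -> [b]) -> [b]
wideNest xs f = foldr (interleave . f) [] xs

-- The nested loop from the examples, parameterized by nesting style.
pairs :: ([Int] -> (Int -> [(Int, Int)]) -> [(Int, Int)]) -> [(Int, Int)]
pairs nest = nest [1, 2] (\x -> map (\y -> (x, y)) [3, 4])

main :: IO ()
main = do
    print (pairs deepNest)  -- prints [(1,3),(1,4),(2,3),(2,4)]
    print (pairs wideNest)  -- prints [(1,3),(2,3),(1,4),(2,4)]
```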


### Deep Asynchronous Nesting (Async)

The Monad composition of Async type can perform the iterations of a loop concurrently. Concurrency is demand driven i.e. more concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer of the output stream. This works exactly the same way as the merging of two streams asyncly works. This is the concurrent analogue of Serial style monadic composition.

The asyncly type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Async.

import Streamly
import qualified Streamly.Prelude as S

main = runStream . asyncly $ do
    x <- S.fromFoldable [3,2,1]
    delay x

ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

As we can see the code after the fromFoldable statement is run three times, once for each value of x. All the three iterations are concurrent and run in different threads. The iteration with least delay finishes first. When compared to imperative programming, this can be viewed as a for loop with three concurrent iterations.

Concurrency is demand driven just as in the case of async merging. When multiple streams are composed using this style, the iterations are triggered in a depth first manner just like Serial i.e. nested iterations are executed before we proceed to the next iteration at higher level. However, unlike Serial more than one iteration may be started concurrently based on the demand from the consumer of the stream.

import Streamly
import qualified Streamly.Prelude as S

main = runStream . asyncly $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.once $ putStrLn $ show (x, y)

(1,3)
(1,4)
(2,3)
(2,4)


### Wide Asynchronous Nesting (WAsync)

Just like Async the Monad composition of WAsync runs the iterations of a loop concurrently. The difference is in the nested loop behavior. The nested loops in this type are traversed and executed in a breadth first manner rather than the depth first manner of Async style. The loop nesting works exactly the same way as the merging of streams wAsyncly works. The wAsyncly type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as WAsync.

import Streamly
import qualified Streamly.Prelude as S

main = runStream . wAsyncly $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.once $ putStrLn $ show (x, y)

(1,3)
(2,3)
(1,4)
(2,4)

### Parallel Asynchronous Nesting (Parallel)

Just like Async or WAsync the Monad composition of Parallel runs the iterations of a loop concurrently. The difference is in the nested loop behavior. The streams at each nest level are run fully concurrently irrespective of the demand. The loop nesting works exactly the same way as the merging of streams parallely works. The parallely type combinator can be used to switch to this style of composition. Alternatively, a type annotation can be used to specify the type of the stream as Parallel.

import Streamly
import qualified Streamly.Prelude as S

main = runStream . parallely $ do
    x <- S.fromFoldable [3,2,1]
    delay x

ThreadId 40: Delay 1


### Exercise

Streamly code is usually written in a way that is agnostic of the specific monadic composition type. We use a polymorphic type with an IsStream type class constraint. When running the stream we can choose the specific mode of composition. For example take a look at the following code.

import Streamly
import qualified Streamly.Prelude as S

composed :: (IsStream t, Monad (t IO)) => t IO ()
composed = do
    sz <- sizes
    cl <- colors
    sh <- shapes
    S.once $ putStrLn $ show (sz, cl, sh)

    where

    sizes  = S.fromFoldable [1, 2, 3]
    colors = S.fromFoldable ["red", "green", "blue"]
    shapes = S.fromFoldable ["triangle", "square", "circle"]


Now we can interpret this in whatever way we want:

main = runStream . serially  $ composed
main = runStream . wSerially $ composed
main = runStream . asyncly   $ composed
main = runStream . wAsyncly  $ composed
s2 = serially $ d 3 <> d 4

main = do
    hSetBuffering stdout LineBuffering
    (S.toList . zipAsyncly $ (,) <$> s1 <*> s2) >>= print

This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 are produced concurrently, and 2 and 4 are produced concurrently:

ThreadId 32: Delay 1
ThreadId 32: Delay 2
ThreadId 33: Delay 3
ThreadId 33: Delay 4
[(1,3),(2,4)]

# Monad transformers

To represent streams in an arbitrary monad use the more general monad transformer types. For example, the monad transformer type corresponding to the Serial type is SerialT. SerialT m a represents a stream of values of type a in some underlying monad m. For example, SerialT IO Int is a stream of Int in the IO monad. In fact, the type Serial is a synonym for SerialT IO.

Similarly we have monad transformer types for other stream types as well viz. WSerialT, AsyncT, WAsyncT and ParallelT.

To lift a value from an underlying monad in a monad transformer stack into a singleton stream use lift and to lift from an IO action use liftIO.

> runStream $ liftIO $ putStrLn "Hello world!"
Hello world!
> runStream $ lift $ putStrLn "Hello world!"
Hello world!

# Concurrent Programming

When writing concurrent programs there are two distinct places where the programmer can control the concurrency. First, when composing a stream by merging multiple streams we can choose an appropriate sum style operator to combine them concurrently or serially. Second, when processing a stream in a monadic composition we can choose one of the monad composition types to get the desired type of concurrency.

In the following example the squares of x and y are computed concurrently using the async operation and the square roots of their sum are computed serially because of the serially combinator. We can choose different combinators for the monadic processing and the stream generation, to control the concurrency. We can also use the asyncly combinator instead of explicitly folding with async.

import Streamly
import qualified Streamly.Prelude as S
import Data.List (sum)

main = do
    z <-   S.toList
         $ serially     -- Serial monadic processing (sqrt below)
         $ do
             x2 <- forEachWith async [1..100] $ -- Concurrent "for" loop
                         \x -> return $ x * x   -- body of the loop
             y2 <- forEachWith async [1..100] $
                         \y -> return $ y * y
             return $ sqrt (x2 + y2)
    print $ sum z

We can see how this maps directly to the imperative style OpenMP model; we use combinators and operators instead of the ugly pragmas.

For more concurrent programming examples see "ListDir.hs", "MergeSort.hs" and "SearchQuery.hs" in the examples directory.

# Reactive Programming

Reactive programming is nothing but concurrent streaming, which is what streamly is all about. With streamly we can generate streams of events, merge streams that are generated concurrently and process events concurrently. We can do all this without any knowledge about the specifics of the implementation of concurrency. In the following example you will see that the code is just regular Haskell code that uses very few streamly APIs (active hyperlinks are the streamly APIs) and yet it is a reactive application.

This application has two independent and concurrent sources of event streams, acidRain and userAction. acidRain continuously generates events that deteriorate the health of the character in the game. userAction can be "potion" or "quit". When the user types "potion" the health improves and the game continues.

{-# LANGUAGE FlexibleContexts #-}

import Streamly
import Control.Concurrent (threadDelay)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.State (MonadState, get, modify, runStateT)
import Data.Semigroup (cycle1)

data Event = Harm Int | Heal Int | Quit deriving (Show)

userAction :: MonadIO m => SerialT m Event
userAction = cycle1 $ liftIO askUser
    where
    askUser = do
        command <- getLine
        case command of
            "potion" -> return (Heal 10)
            "quit"   -> return  Quit
            _        -> putStrLn "What?" >> askUser

acidRain :: MonadIO m => SerialT m Event
acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1)

game :: (MonadAsync m, MonadState Int m) => SerialT m ()
game = do
    -- Merge the two event sources concurrently
    event <- userAction `parallel` acidRain
    case event of
        Harm n -> modify $ \h -> h - n
        Heal n -> modify $ \h -> h + n
        Quit   -> fail "quit"

    h <- get
    when (h <= 0) $ fail "You die!"
    liftIO $ putStrLn $ "Health = " ++ show h

main = do
    putStrLn "Your health is deteriorating due to acid rain,\
             \ type \"potion\" or \"quit\""
    _ <- runStateT (runStream game) 60
    return ()


You can also find the source of this example in the examples directory as "AcidRain.hs". It has been adapted from Gabriel's pipes-concurrency package. It is much simpler than the pipes version because concurrency is built into streamly. You can also find an SDL-based reactive programming example adapted from Yampa in Streamly.Examples.CirclingSquare.
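
The event-handling rule inside game can also be exercised on its own, without concurrency or user input. The following is a minimal sketch using only base, not part of the package: the names step and play are hypothetical, and Either stands in for the fail calls of the original. It applies the same Harm, Heal and Quit rules to a fixed list of events.

```haskell
import Control.Monad (foldM)

data Event = Harm Int | Heal Int | Quit deriving (Show)

-- Apply one event to the current health (hypothetical helper).
-- Left carries the reason the game ended, mirroring the fail calls in game.
step :: Int -> Event -> Either String Int
step _ Quit = Left "quit"
step h ev
    | h' <= 0   = Left "You die!"
    | otherwise = Right h'
  where
    h' = case ev of
        Harm n -> h - n
        Heal n -> h + n
        Quit   -> h  -- not reached: Quit is handled by the first equation

-- Fold a fixed list of events over an initial health (hypothetical helper).
play :: Int -> [Event] -> Either String Int
play = foldM step

main :: IO ()
main = do
    print (play 60 [Harm 1, Harm 1, Heal 10])  -- Right 68
    print (play 2  [Harm 1, Harm 1, Heal 10])  -- Left "You die!"
    print (play 60 [Harm 1, Quit, Heal 10])    -- Left "quit"
```

Keeping the rule pure like this is only one possible design; the tutorial version keeps the health in a State monad so that the same stream code can also print it after every event.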

# Performance

Streamly is highly optimized for performance; it is designed for serious high-performance, concurrent and scalable applications. We have created the streaming-benchmarks package, which is specifically designed to measure the performance of Haskell streaming libraries fairly. Streamly performs on par with or better than most streaming libraries for serial operations, even though it also has to support concurrency.

# Interoperation with Streaming Libraries

We can use unfoldr and uncons to convert one streaming type to another.
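
The idea behind this conversion can be seen with pure data, independent of any streaming library. The sketch below uses only base and is an illustration, not code from the package: Chain, unconsChain and unfoldrChain are hypothetical names standing in for "some other sequence type" and its API. One side takes the sequence apart with uncons while the other side rebuilds it with unfoldr.

```haskell
import Data.List (unfoldr)
import qualified Data.List as L

-- A hypothetical second sequence type standing in for another library.
data Chain a = End | Link a (Chain a)
    deriving (Show, Eq)

-- uncons for Chain: split off the head, or report that it is exhausted.
unconsChain :: Chain a -> Maybe (a, Chain a)
unconsChain End         = Nothing
unconsChain (Link x xs) = Just (x, xs)

-- unfoldr for Chain: build a Chain from a step function and a seed.
unfoldrChain :: (s -> Maybe (a, s)) -> s -> Chain a
unfoldrChain step seed = case step seed of
    Nothing      -> End
    Just (x, s') -> Link x (unfoldrChain step s')

-- Chain to list: the list's unfoldr consumes the Chain's uncons.
chainToList :: Chain a -> [a]
chainToList = unfoldr unconsChain

-- List to Chain: the Chain's unfoldr consumes the list's uncons.
listToChain :: [a] -> Chain a
listToChain = unfoldrChain L.uncons

main :: IO ()
main = do
    print (chainToList (Link 1 (Link 2 (Link 3 End))) :: [Int])  -- [1,2,3]
    print (listToChain [1, 2, 3 :: Int])  -- Link 1 (Link 2 (Link 3 End))
```

The library examples that follow have the same shape, except that the step function runs in a monad and its result sometimes has to be adapted between Maybe and Either.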

Interop with vector:

import Streamly
import qualified Streamly.Prelude as S
import qualified Data.Vector.Fusion.Stream.Monadic as V

main = do
    -- streamly to vector
    V.toList (V.unfoldrM S.uncons (S.fromFoldable [1..3])) >>= print

    -- vector to streamly
    S.toList (S.unfoldrM unconsV (V.fromList [1..3])) >>= print

    where
    -- Build an uncons for the vector stream from null, head and tail
    unconsV v = do
        r <- V.null v
        if r
        then return Nothing
        else do
            h <- V.head v
            return $ Just (h, V.tail v)

Interop with pipes:

import Streamly
import qualified Streamly.Prelude as S
import qualified Pipes as P
import qualified Pipes.Prelude as P

main = do
    -- streamly to pipe
    P.toListM (P.unfoldr unconsS (S.fromFoldable [1..3])) >>= print

    -- pipe to streamly
    S.toList (S.unfoldrM unconsP (P.each [1..3])) >>= print

    where

    -- Adapt P.next to return a Maybe instead of Either
    unconsP p = P.next p >>= either (\_ -> return Nothing) (return . Just)

    -- Adapt S.uncons to return an Either instead of Maybe
    unconsS s = S.uncons s >>= maybe (return $ Left ()) (return . Right)


Interop with streaming:

import Streamly
import qualified Streamly.Prelude as S
import qualified Streaming as SG
import qualified Streaming.Prelude as SG

main = do
    -- streamly to streaming
    SG.toList (SG.unfoldr unconsS (S.fromFoldable [1..3])) >>= print

    -- streaming to streamly
    S.toList (S.unfoldrM SG.uncons (SG.each [1..3])) >>= print

    where

    unconsS s = S.uncons s >>= maybe (return $ Left ()) (return . Right)


Interop with conduit:

import Streamly
import qualified Streamly.Prelude as S
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import qualified Data.Conduit.Combinators as C

main = do
    -- streamly to conduit
    C.runConduit (C.unfoldM S.uncons (S.fromFoldable [1..3]) C..| C.sinkList) >>= print

    -- It seems there is no way out of a conduit as it does not provide an
    -- uncons or a tail function.


# Comparison with Existing Packages

List transformers and logic programming monads also provide a product style composition similar to streamly. However, streamly generalizes it with the time dimension, allowing streams to be composed in an asynchronous and concurrent fashion in many different ways. It also provides multiple alternative ways of composing streams, e.g. serial, interleaved or concurrent.
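
For readers unfamiliar with the term, product style composition is what the ordinary list monad already does: every value of one stream is combined with every value of the next, like a nested loop. The sketch below uses only base and is an illustration rather than streamly code; pairs and triples are hypothetical names. It shows the serial, pure case that streamly generalizes.

```haskell
import Control.Monad (guard)

-- Every x is combined with every y, like a nested for loop.
pairs :: [(Int, Int)]
pairs = do
    x <- [1, 2]
    y <- [10, 20]
    return (x, y)

-- Logic-programming flavour: guard prunes the branches that fail.
triples :: Int -> [(Int, Int, Int)]
triples n = do
    a <- [1 .. n]
    b <- [a .. n]
    c <- [b .. n]
    guard (a * a + b * b == c * c)
    return (a, b, c)

main :: IO ()
main = do
    print pairs         -- [(1,10),(1,20),(2,10),(2,20)]
    print (triples 13)  -- [(3,4,5),(5,12,13),(6,8,10)]
```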

This seemingly simple addition of asynchronicity and concurrency to product style streaming composition unifies a number of disparate abstractions into one powerful, concise and elegant abstraction. A wide variety of programming problems can be solved elegantly with this abstraction. In particular, it unifies three major programming domains namely non-deterministic (logic) programming, concurrent programming and functional reactive programming. In other words, you can do everything with this one abstraction that you could do with the popular libraries listed under these categories in the list below.

+-----------------+----------------+
| Non-determinism | pipes          |
|                 +----------------+
|                 | list-t         |
|                 +----------------+
|                 | logict         |
+-----------------+----------------+
| Streaming       | vector         |
|                 +----------------+
|                 | streaming      |
|                 +----------------+
|                 | pipes          |
|                 +----------------+
|                 | conduit        |
+-----------------+----------------+
| Concurrency     | async          |
|                 +----------------+
|                 | transient      |
+-----------------+----------------+
| FRP             | Yampa          |
|                 +----------------+
|                 | dunai          |
|                 +----------------+
|                 | reflex         |
+-----------------+----------------+


Streamly is a list-transformer. It provides all the functionality provided by any of the list transformer and logic programming packages listed above. In addition, Streamly naturally integrates the concurrency dimension to the basic list transformer functionality.

When it comes to streaming, in terms of the streaming API streamly is almost identical to the vector package. Streamly, vector and streaming packages all represent a stream as data and are therefore similar in the fundamental approach to streaming. The fundamental difference is that streamly adds concurrency support and the monad instance provides concurrent looping. Other streaming libraries like pipes, conduit and machines represent and compose stream processors rather than the stream data and therefore fall in another class of streaming libraries and have comparatively more complicated types.

When it comes to concurrency, streamly can do everything that the async package can do and more. async provides applicative concurrency whereas streamly provides both applicative and monadic concurrency. The ZipAsync type behaves like the applicative instance of async. In comparison to transient, streamly has a first class streaming interface and is a monad transformer that can be used universally in any Haskell monad transformer stack. Streamly was in fact originally inspired by the concurrency implementation in transient, though it bears no resemblance to it and takes a lazy pull approach versus transient's strict push approach.

The combination of non-determinism, concurrency and streaming makes streamly a strong FRP capable library as well. FRP is fundamentally a stream of events that can be processed concurrently. The example in this tutorial as well as the Streamly.Examples.CirclingSquare example from Yampa demonstrate the basic FRP capability of streamly. In core concepts streamly is strikingly similar to dunai. dunai was designed from an FRP perspective and streamly was originally designed from a concurrency perspective. However, the two are similar at the core.

# Where to go next?

• Read the documentation of the Streamly module
• Read the documentation of the Streamly.Prelude module
• See the examples in the "examples" directory of the package
• See the tests in the "test" directory of the package