Copyright | (c) 2017 Harendra Kumar |
---|---|
License | BSD3 |
Maintainer | harendra.kumar@gmail.com |
Safe Haskell | None |
Language | Haskell2010 |
Streamly, short for stream concurrently, combines the essence of non-determinism, streaming and concurrency in functional programming. Concurrent and non-concurrent applications are almost indistinguisable, concurrency capability does not at all impact the performance of non-concurrent case. Streaming enables writing modular, composable and scalable applications with ease and concurrency allows you to make them scale and perform well. Streamly enables writing concurrent applications without being aware of threads or synchronization. No explicit thread control is needed, where applicable the concurrency rate is automatically controlled based on the demand by the consumer. However, combinators are provided to fine tune the concurrency control. Streaming and concurrency together enable expressing reactive applications conveniently. See Streamly.Examples for a simple SDL based FRP example.
Streamly streams are very much like the Haskell lists and most of the
functions that work on lists have a counterpart that works on streams.
However, streamly streams can be generated, consumed or combined
concurrently. In this tutorial we will go over the basic concepts and how to
use the library. The documentation of Streamly
module has more details on
core APIs. For more APIs for constructing, folding, filtering, mapping and
zipping etc. see the documentation of Streamly.Prelude module. For
examples and other ways to use the library see the module
Streamly.Examples as well.
Streams
Streamly provides many different stream types depending on the desired
composition style. The simplest type is StreamT
. StreamT
is a monad
transformer, the type StreamT m a
represents a stream of values of type
a
in some underlying monad m
. For example, StreamT IO Int
is a stream
of Int
in IO
monad.
Generating Streams
Pure values can be placed into the stream type using return
or pure
.
Effects in the IO monad can be lifted to the stream type using the liftIO
combinator. In a transformer stack we can lift actions from the lower monad
using the lift
combinator. Some examples of streams with a single element:
return 1 :: StreamT
IO Int
liftIO $ putStrLn "Hello world!" :: StreamT
IO ()
We can combine streams using <>
to create streams of many elements:
return 1 <> return 2 <> return 3 :: StreamT
IO Int
For more ways to construct or generate a stream see the module Streamly.Prelude.
Eliminating Streams
runStreamT
runs a composed StreamT
computation, lowering the type into
the underlying monad and discarding the result stream:
import Streamly
main = runStreamT
$ liftIO $ putStrLn "Hello world!"
toList
runs a stream computation and collects the result stream in a list
in the underlying monad. toList
is a polymorphic function that works on
multiple stream types belonging to the class Streaming
. Therefore, before
you run a stream you need to tell how you want to interpret the stream by
using one of the stream type combinators (serially
, asyncly
, parallely
etc.). The combinator serially
is equivalent to annotating the type as ::
StreamT
.
import Streamly import Streamly.Prelude main = do xs <-toList
$serially
$ return 1 <> return 2 print xs
For other ways to eliminate or fold a stream see the module Streamly.Prelude.
Combining Streams
Semigroup Style
Streams of the same type can be combined into a composite stream in many
different ways using one of the semigroup style binary composition operators
i.e. <>
, <=>
, <|
, <|>
, mplus
. These operators work on all stream
types (StreamT
, AsyncT
etc.) uniformly.
To illustrate the concurrent aspects, we will use the following delay
function to introduce a delay specified in seconds.
import Streamly import Control.Concurrent delay n = liftIO $ do threadDelay (n * 1000000) tid <- myThreadId putStrLn (show tid ++ ": Delay " ++ show n)
Serial composition (<>
)
We have already seen, the <>
operator. It composes two streams in series
i.e. the first stream is completely exhausted and then the second stream is
processed. The following example prints the sequence 3, 2, 1 and takes a
total of 6 seconds because everything is serial:
main = runStreamT
$ delay 3 <> delay 2 <> delay 1
ThreadId 36: Delay 3 ThreadId 36: Delay 2 ThreadId 36: Delay 1
Async composition (<|
)
The <|
operator can run both computations concurrently, when needed.
In the following example since the first computation blocks we start the
next one in a separate thread and so on:
main =runStreamT
$ delay 3<|
delay 2<|
delay 1
ThreadId 42: Delay 1 ThreadId 41: Delay 2 ThreadId 40: Delay 3
This is the concurrent version of the <>
operator. The computations are
triggered in the same order as <>
except that they are concurrent. When
we have a tree of computations composed using this operator, the tree is
traversed in DFS style just like <>
.
main =runStreamT
$ (p 1<|
p 2)<|
(p 3<|
p 4) where p = liftIO . print
1 2 3 4
Concurrency provided by this operator is demand driven. The second computation is run concurrently with the first only if the first computation is not producing enough output to keep the stream consumer busy otherwise the second computation is run serially after the previous one. The number of concurrent threads is adapted dynamically based on the pull rate of the consumer of the stream. As you can see, in the following example the computations are run in a single thread one after another, because none of them blocks. However, if the thread consuming the stream were faster than the producer then it would have started parallel threads for each computation to keep up even if none of them blocks:
main =runStreamT
$ traced (sqrt 9)<|
traced (sqrt 16)<|
traced (sqrt 25)
ThreadId 40 ThreadId 40 ThreadId 40
Since the concurrency provided by this operator is demand driven it cannot
be used when the composed computations have timers that are relative to each
other because all computations may not be started at the same time and
therefore timers in all of them may not start at the same time. When
relative timing among all computations is important or when we need to start
all computations at once for some reason <|>
must be used instead.
However, <|
is useful in situations when we want to optimally utilize the
resources and we know that the computations can run in parallel but we do
not care if they actually run in parallel or not, that decision is left to
the scheduler. Also, note that this operator can be used to fold infinite
containers in contrast to <|>
, because it does not require us to run all
of them at the same time.
The left bias (or the DFS style) of the operator <|
is suggested by its
shape. You can also think of this as an unbalanced version of the fairly
parallel operator <|>
.
Interleaved composition (<=>
)
The <=>
operator is serial like <>
but it interleaves the two streams
i.e. it yields one element from the first stream and then one element from
the second stream, and so on. The following example prints the sequence 1,
3, 2, 4 and takes a total of 10 seconds because everything is serial:
main =runStreamT
$ (delay 1 <> delay 2)<=>
(delay 3 <> delay 4)
ThreadId 36: Delay 1 ThreadId 36: Delay 3 ThreadId 36: Delay 2 ThreadId 36: Delay 4
Note that this operator cannot be used to fold infinite containers since it requires preserving the state until a stream is finished. To be clear, it can combine infinite streams but not infinite number of streams.
Fair Concurrent composition (<|>
)
The Alternative
composition operator <|>
, like <|
, runs the composed
computations concurrently. However, unlike <|
it runs all of the
computations in fairly parallel manner using a round robin scheduling
mechanism. This can be considered as the concurrent version of the fairly
interleaved serial operation <=>
. Note that this cannot be used on
infinite containers, as it will lead to an infinite sized scheduling queue.
The following example sends a query to three search engines in parallel and prints the name of the search engine as a response arrives:
import Streamly
import Network.HTTP.Simple
main = runStreamT
$ google <|> bing <|> duckduckgo
where
google = get "https://www.google.com/search?q=haskell"
bing = get "https://www.bing.com/search?q=haskell"
duckduckgo = get "https://www.duckduckgo.com/?q=haskell"
get s = liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s))
Custom composition
The async
API can be used to create references to asynchronously running
stream computations. We can then use uncons
to explore the streams
arbitrarily and then recompose individual elements to create a new stream.
This way we can dynamically decide which stream to explore at any given
time. Take an example of a merge sort of two sorted streams. We need to
keep consuming items from the stream which has the lowest item in the sort
order. This can be achieved using async references to streams. See
Streamly.Examples.MergeSortedStreams.
Monoid Style
Each of the semigroup compositions described has an identity that can be
used to fold a possibly empty container. An empty stream is represented by
nil
which can be represented in various standard forms as mempty
,
empty
or mzero
.
Some fold utilities are also provided by the library for convenience:
foldWith
folds aFoldable
container of stream computations using the given composition operator.foldMapWith
folds like foldWith but also maps a function before folding.forEachWith
is like foldMapwith but the container argument comes before the function argument.- The
each
primitive from Streamly.Prelude folds aFoldable
container using the<>
operator:
All of the following are equivalent:
import Streamly import Streamly.Prelude main = dotoList
.serially
$foldWith
(<>) (map return [1..10]) >>= printtoList
.serially
$foldMapWith
(<>) return [1..10] >>= printtoList
.serially
$forEachWith
(<>) [1..10] return >>= printtoList
.serially
$each
[1..10] >>= print
Transforming Streams
The previous section discussed ways to merge the elements of two streams
without doing any transformation on them. In this section we will explore
how to transform streams using Functor
, Applicative
or Monad
style
compositions. The applicative and monad composition of all Streaming
types
behave exactly the same way as a list transformer. For simplicity of
illustration we are using streams of pure values in the following examples.
However, the real application of streams arises when these streams are
generated using monadic actions.
Monad
In functional programmer's parlance the Monad
instance of Streaming
types implement non-determinism, exploring all possible combination of
choices from both the streams. From an imperative programmer's point of view
it behaves like nested loops i.e. for each element in the first stream and
for each element in the second stream apply the body of the loop. If you are
familiar with list transformer this behavior is exactly the same as that of
a list transformer.
Just like we saw in sum style compositions earlier, monadic composition also has multiple variants each of which exactly corresponds to one of the sum style composition variant.
Serial Composition (StreamT
)
When we interpret the monadic composition as StreamT
we get a standard
list transformer like serial composition.
import Streamly import Streamly.Prelude main =runStreamT
$ do x <-each
[3,2,1] delay x
ThreadId 30: Delay 3 ThreadId 30: Delay 2 ThreadId 30: Delay 1
As you can see the code after the each
statement is run three times, once
for each value of x
. All the three iterations are serial and run in the
same thread one after another. When compared to imperative programming, this
can also be viewed as a for
loop with three iterations.
A console echo loop copying standard input to standard output can simply be written like this:
import Streamly
import Data.Semigroup (cycle1)
main = runStreamT
$ cycle1 (liftIO getLine) >>= liftIO . putStrLn
When multiple streams are composed using this style they nest in a DFS
manner i.e. nested iterations of an iteration are executed before we proceed
to the next iteration at higher level. This behaves just like nested for
loops in imperative programming.
import Streamly import Streamly.Prelude main =runStreamT
$ do x <-each
[1,2] y <-each
[3,4] liftIO $ putStrLn $ show (x, y)
(1,3) (1,4) (2,3) (2,4)
You will also notice that this is the monadic equivalent of the sum style
composition using <>
.
Async Composition (AsyncT
)
When we interpret the monadic composition as AsyncT
we get a concurrent
list-transformer like composition. Multiple monadic continuations (or loop
iterations) may be started concurrently. Concurrency is demand driven
i.e. more concurrent iterations are started only if the previous iterations
are not able to produce enough output for the consumer of the output stream.
This is the concurrent version of StreamT
.
import Streamly import Streamly.Prelude main =runAsyncT
$ do x <-each
[3,2,1] delay x
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
As you can see the code after the each
statement is run three times, once
for each value of x
. All the three iterations are concurrent and run in
different threads. The iteration with least delay finishes first. When
compared to imperative programming, this can be viewed as a for
loop
with three concurrent iterations.
Concurrency is demand driven just as in the case of <|
. When multiple
streams are composed using this style the iterations are triggered in a DFS
manner just like StreamT
i.e. nested iterations are executed before we
proceed to the next iteration at higher level. However, unlike StreamT
more than one iterations may be started concurrently, and based on the
demand from the consumer.
import Streamly import Streamly.Prelude main =runAsyncT
$ do x <-each
[1,2] y <-each
[3,4] liftIO $ putStrLn $ show (x, y)
(1,3) (1,4) (2,3) (2,4)
You will notice that this is the monadic equivalent of the <|
style
sum composition. The same caveats apply to this as the <|
operation.
Interleaved Composition (InterleavedT
)
When we interpret the monadic composition as InterleavedT
we get a serial
but fairly interleaved list-transformer like composition. The monadic
continuations or iterations of the outer loop are fairly interleaved with
the continuations or iterations of the inner loop.
import Streamly import Streamly.Prelude main =runInterleavedT
$ do x <-each
[1,2] y <-each
[3,4] liftIO $ putStrLn $ show (x, y)
(1,3) (2,3) (1,4) (2,4)
You will notice that this is the monadic equivalent of the <=>
style
sum composition. The same caveats apply to this as the <=>
operation.
Fair Concurrent Composition (ParallelT
)
When we interpret the monadic composition as ParallelT
we get a
concurrent list-transformer like composition just like AsyncT
. The
difference is that this is fully parallel with all iterations starting
concurrently instead of the demand driven concurrency of AsyncT
.
import Streamly import Streamly.Prelude main =runParallelT
$ do x <-each
[3,2,1] delay x
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
You will notice that this is the monadic equivalent of the <|>
style
sum composition. The same caveats apply to this as the <|>
operation.
Exercise
The streamly code is usually written in a way that is agnostic of the
specific monadic composition type. We use a polymorphic type with a
Streaming
type class constraint. When running the stream we can choose the
specific mode of composition. For example look at the following code.
import Streamly import Streamly.Prelude composed ::Streaming
t => t m a composed = do sz <- sizes cl <- colors sh <- shapes liftIO $ putStrLn $ show (sz, cl, sh) where sizes =each
[1, 2, 3] colors =each
["red", "green", "blue"] shapes =each
["triangle", "square", "circle"]
Now we can interpret this in whatever way we want:
main =runStreamT
composed main =runAsyncT
composed main =runInterleavedT
composed main =runParallelT
composed
Equivalently, we can also write it using the type adapter combinators, like this:
main =runStreaming
$serially
$ composed main =runStreaming
$asyncly
$ composed main =runStreaming
$interleaving
$ composed main =runStreaming
$parallely
$ composed
As an exercise try to figure out the output of this code for each mode of composition.
Applicative
Applicative is precisely the same as the ap
operation of Monad
. For
zipping and parallel applicatives separate types ZipStream
and ZipAsync
are provided.
The following example runs all iterations serially and takes a total 17 seconds (1 + 3 + 4 + 2 + 3 + 4):
import Streamly import Streamly.Prelude import Control.Concurrent s1 = d 1 <> d 2 s2 = d 3 <> d 4 d n = delay n >> return n main = (toList
.serially
$ (,) <$> s1 <*> s2) >>= print
ThreadId 36: Delay 1 ThreadId 36: Delay 3 ThreadId 36: Delay 4 ThreadId 36: Delay 2 ThreadId 36: Delay 3 ThreadId 36: Delay 4 [(1,3),(1,4),(2,3),(2,4)]
Similalrly interleaving runs the iterations in an interleaved order but since it is serial it takes a total of 17 seconds:
main = (toList
.interleaving
$ (,) <$> s1 <*> s2) >>= print
ThreadId 36: Delay 1 ThreadId 36: Delay 3 ThreadId 36: Delay 2 ThreadId 36: Delay 3 ThreadId 36: Delay 4 ThreadId 36: Delay 4 [(1,3),(2,3),(1,4),(2,4)]
AsyncT
can run the iterations concurrently and therefore takes a total
of 10 seconds (1 + 2 + 3 + 4):
main = (toList
.asyncly
$ (,) <$> s1 <*> s2) >>= print
ThreadId 34: Delay 1 ThreadId 36: Delay 2 ThreadId 35: Delay 3 ThreadId 36: Delay 3 ThreadId 35: Delay 4 ThreadId 36: Delay 4 [(1,3),(2,3),(1,4),(2,4)]
Similalrly ParallelT
as well can run the iterations concurrently and
therefore takes a total of 10 seconds (1 + 2 + 3 + 4):
main = (toList
.parallely
$ (,) <$> s1 <*> s2) >>= print
ThreadId 34: Delay 1 ThreadId 36: Delay 2 ThreadId 35: Delay 3 ThreadId 36: Delay 3 ThreadId 35: Delay 4 ThreadId 36: Delay 4 [(1,3),(2,3),(1,4),(2,4)]
Functor
fmap
transforms a stream by mapping a function on all elements of the
stream. The functor instance of each stream type defines fmap
to be
precisely the same as liftM
, and therefore fmap
is always serial
irrespective of the type. For concurrent mapping, alternative versions of
fmap
, namely, asyncMap
and parMap
are provided.
import Streamly main = (toList
$serially
$ fmap show $each
[1..10]) >>= print
Also see the mapM
and sequence
functions for mapping actions, in the
Streamly.Prelude module.
Zipping Streams
Zipping is a special transformation where the corresponding elements of two streams are combined together using a zip function producing a new stream of outputs. Two different types are provided for serial and concurrent zipping. These types provide an applicative instance that zips the argument streams. Also see the zipping function in the Streamly.Prelude module.
Serial Zipping
ZipStream
zips streams serially:
import Streamly import Streamly.Prelude import Control.Concurrent d n = delay n >> return n s1 =adapt
.serially
$ d 1 <> d 2 s2 =adapt
.serially
$ d 3 <> d 4 main = (toList
.zipping
$ (,) <$> s1 <*> s2) >>= print
This takes total 10 seconds to zip, which is (1 + 2 + 3 + 4) since everything runs serially:
ThreadId 29: Delay 1 ThreadId 29: Delay 3 ThreadId 29: Delay 2 ThreadId 29: Delay 4 [(1,3),(2,4)]
Parallel Zipping
ZipAsync
zips streams concurrently:
import Streamly import Streamly.Prelude import Control.Concurrent import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) d n = delay n >> return n s1 =adapt
.serially
$ d 1 <> d 2 s2 =adapt
.serially
$ d 3 <> d 4 main = do liftIO $ hSetBuffering stdout LineBuffering (toList
.zippingAsync
$ (,) <$> s1 <*> s2) >>= print
This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 are produced concurrently, and 2 and 4 are produced concurrently:
ThreadId 32: Delay 1 ThreadId 32: Delay 2 ThreadId 33: Delay 3 ThreadId 33: Delay 4 [(1,3),(2,4)]
Summary of Compositions
The following table summarizes the types for monadic compositions and the operators for sum style compositions. This table captures the essence of streamly.
+-----+--------------+------------+ | | Serial | Concurrent | +=====+==============+============+ | DFS |StreamT
|AsyncT
| | +--------------+------------+ | |<>
|<|
| +-----+--------------+------------+ | BFS |InterleavedT
|ParallelT
| | +--------------+------------+ | |<=>
|<|>
| +-----+--------------+------------+
Concurrent Programming
When writing concurrent programs there are two distinct places where the programmer chooses the type of concurrency. First, when generating a stream by combining other streams we can use one of the sum style operators to combine them concurrently or serially. Second, when processing a stream in a monadic composition we can choose one of the monad composition types to choose the desired type of concurrency.
In the following example the squares of x
and y
are computed
concurrently using the <|
operator and the square roots of their sum are
also computed concurrently by using the asyncly
combinator. We can choose
different combinators e.g. <>
and serially
, to control the concurrency.
import Streamly import Streamly.Prelude (toList) import Data.List (sum) main = do z <-toList
$asyncly
-- Concurrent monadic processing (sqrt below) $ do x2 <-forEachWith
(<|
) [1..100] $ -- Concurrent"for"
loop \x -> return $ x * x -- body of the loop y2 <-forEachWith
(<|
) [1..100] $ \y -> return $ y * y return $ sqrt (x2 + y2) print $ sum z
You can see how this directly maps to the imperative style OpenMP model, we use combinators and operators instead of the ugly pragmas.
For more concurrent programming examples see, Streamly.Examples.ListDirRecursive, Streamly.Examples.MergeSortedStreams and Streamly.Examples.SearchEngineQuery.
Reactive Programming
Reactive programming is nothing but concurrent streaming which is what streamly is all about. With streamly we can generate streams of events, merge streams that are generated concurrently and process events concurrently. We can do all this without any knowledge about the specifics of the implementation of concurrency. In the following example you will see that the code is just regular Haskell code without much streamly APIs used (active hyperlinks are the streamly APIs) and yet it is a reactive application.
This application has two independent and concurrent sources of event
streams, acidRain
and userAction
. acidRain
continuously generates
events that deteriorate the health of the game character. userAction
can
be "potion" or "quit". When the user types "potion" the health improves and
the game continues.
{-# LANGUAGE FlexibleContexts #-} import Streamly import Control.Concurrent (threadDelay) import Control.Monad (when) import Control.Monad.State import Data.Semigroup (cycle1) data Event = Harm Int | Heal Int | Quit deriving (Show) userAction :: MonadIO m =>StreamT
m Event userAction = cycle1 $ liftIO askUser where askUser = do command <- getLine case command of "potion" -> return (Heal 10) "quit" -> return Quit _ -> putStrLn "What?" >> askUser acidRain :: MonadIO m =>StreamT
m Event acidRain = cycle1 $ liftIO (threadDelay 1000000) >> return (Harm 1) game :: (MonadAsync
m, MonadState Int m) =>StreamT
m () game = do event <- userAction <|> acidRain case event of Harm n -> modify $ \h -> h - n Heal n -> modify $ \h -> h + n Quit -> fail "quit" h <- get when (h <= 0) $ fail "You die!" liftIO $ putStrLn $ "Health = " ++ show h main = do putStrLn "Your health is deteriorating due to acid rain,\ \ type \"potion\" or \"quit\"" _ <- runStateT (runStreamT
game) 60 return ()
You can also find the source of this example in Streamly.Examples.AcidRainGame. It has been adapted from Gabriel's pipes-concurrency package. This is much simpler compared to the pipes version because of the builtin concurrency in streamly. You can also find a SDL based reactive programming example adapted from Yampa in Streamly.Examples.CirclingSquare.
Performance
Streamly is highly optimized for performance, it is designed for serious high performing, concurrent and scalable applications. We have created the streaming-benchmarks package which is specifically and carefully designed to measure the performance of Haskell streaming libraries fairly and squarely in the right way. Streamly performs at par or even better than most streaming libraries for common operations even though it needs to deal with the concurrency capability.
Interoperation with Streaming Libraries
We can use unfoldr
and uncons
to convert one streaming type to another.
We will assume the following common code to be available in the examples
demonstrated below.
import Streamly import Streamly.Prelude import System.IO (stdin) -- Adapt uncons to return an Either instead of Maybe unconsE s =uncons
s >>= maybe (return $ Left ()) (return . Right) stdinLn =serially
$fromHandle
stdin
Interop with pipes
:
import qualified Pipes as P import qualified Pipes.Prelude as P main = do -- streamly to pipe P.runEffect $ P.for (P.unfoldr unconsE stdinLn) (lift . putStrLn) -- pipe to streamly -- Adapt P.next to return a Maybe instead of Either let nextM p = P.next p >>= either (\_ -> return Nothing) (return . Just)runStreamT
$unfoldrM
nextM P.stdinLn >>= lift . putStrLn
Interop with streaming
:
import qualified Streaming as S
import qualified Streaming.Prelude as S
main = do
-- streamly to streaming
S.stdoutLn $ S.unfoldr unconsE stdinLn
-- streaming to streamly
runStreamT
$ unfoldrM S.uncons S.stdinLn >>= lift . putStrLn
Interop with conduit
:
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import qualified Data.Conduit.Combinators as C
-- streamly to conduit
main = (C.unfoldM uncons
stdinLn) C.$$ C.print
Comparison with Existing Packages
Streamly unifies non-determinism, streaming, concurrency and FRP functionality that is otherwise covered by several disparate packages, and it does that with a surprisingly concise API. Here is a list of popular and well-known packages in all these areas:
+-----------------+----------------+ | Non-determinism | list-t | | +----------------+ | | logict | +-----------------+----------------+ | Streaming | streaming | | +----------------+ | | conduit | | +----------------+ | | pipes | | +----------------+ | | simple-conduit | +-----------------+----------------+ | Concurrency | async | | +----------------+ | | transient | +-----------------+----------------+ | FRP | Yampa | | +----------------+ | | dunai | | +----------------+ | | reflex | +-----------------+----------------+
Streamly covers all the functionality provided by both the non-determinism packages listed above and provides better performance in comparison to those. In fact, at the core streamly is a list transformer but it naturally integrates the concurrency dimension to the basic list transformer functionality.
When it comes to streaming, in terms of core concepts, simple-conduit
is
the package that is closest to streamly if we set aside the concurrency
dimension, both are streaming packages with list transformer like monad
composition. However, in terms of API it is more like the streaming
package. Streamly can be used to achieve more or less the functionality
provided by any of the streaming packages listed above. The types and API of
streamly are much simpler in comparison to conduit and pipes. It is more or
less like the standard Haskell list APIs.
When it comes to concurrency, streamly can do everything that the async
package can do and more. async provides applicative concurrency whereas
streamly provides both applicative and monadic concurrency. The ZipAsync
type behaves like the applicative instance of async. This work was
originally inspired by the concurrency implementation in transient
though
it has no resemblence with that. Streamly provides concurrency as transient
does but in a sort of dual manner, it can lazily stream the output. In
comparison to transient streamly has a first class streaming interface and
is a monad transformer that can be used universally in any Haskell monad
transformer stack.
The non-determinism, concurrency and streaming combination make streamly a
strong FRP capable library as well. FRP is fundamentally stream of events
that can be processed concurrently. The example in this tutorial as well as
the Streamly.Examples.CirclingSquare example from Yampa demonstrate the
basic FRP capability of streamly. In core concepts streamly is strikingly
similar to dunai
. dunai was designed from a FRP perspective and streamly
wa original designed from a concurrency perspective. However, both have
similarity at the core.