streamly: Beautiful Streaming, Concurrent and Reactive Composition

[ array, bsd3, concurrency, dataflow, filesystem, library, list, logic, network, non-determinism, parsing, pipes, reactivity, streaming, streamly, time, unicode ]

Streamly is a monad transformer unifying non-determinism (list-t/logict), concurrency (async), streaming (conduit/pipes), and FRP (Yampa/reflex) functionality in a concise and intuitive API. High-level concurrency makes concurrent applications almost indistinguishable from non-concurrent ones: by changing a single combinator you can control whether the code runs serially or concurrently. It integrates concurrency with streaming naturally rather than adding it as an afterthought, and it interoperates with the popular streaming libraries.

See the README for an overview and the haddock documentation for full reference. It is recommended to read the comprehensive tutorial module Streamly.Tutorial first. Also see Streamly.Examples for some working examples.


Flags

Manual Flags

Name              Description                               Default
dev               Build development version                 Disabled
extra-benchmarks  Include comparative benchmarks            Disabled
examples          Build examples                            Disabled
examples-sdl      Include examples that use SDL dependency  Disabled

Use -f <flag> to enable a flag, or -f -<flag> to disable that flag.

Downloads

Note: This package has metadata revisions in the cabal description newer than included in the tarball. To unpack the package including the revisions, use 'cabal get'.

Versions 0.1.0, 0.1.1, 0.1.2, 0.2.0, 0.2.1, 0.3.0, 0.4.0, 0.4.1, 0.5.0, 0.5.1, 0.5.2, 0.6.0, 0.6.1, 0.7.0, 0.7.1, 0.7.2, 0.7.3, 0.7.3.1, 0.7.3.2, 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.3, 0.9.0, 0.10.0, 0.10.1
Change log Changelog.md
Dependencies atomic-primops (>=0.8 && <0.9), base (>=4.8 && <5), containers (>=0.5 && <0.6), exceptions (>=0.8 && <0.9), lifted-base (>=0.2 && <0.3), lockfree-queue (>=0.2.3 && <0.3), monad-control (>=1.0 && <2), mtl (>=2.2 && <3), semigroups (>=0.18 && <0.19), stm (>=2.4.3 && <2.5), transformers (>=0.4 && <0.6), transformers-base (>=0.4 && <0.5)
License BSD-3-Clause
Copyright 2017 Harendra Kumar
Author Harendra Kumar
Maintainer harendra.kumar@gmail.com
Revised Revision 1 made by harendra at 2017-12-05T15:36:45Z
Category Control, Concurrency, Streaming, Reactivity
Home page https://github.com/composewell/streamly
Bug tracker https://github.com/composewell/streamly/issues
Source repo head: git clone https://github.com/composewell/streamly
Uploaded by harendra at 2017-12-05T15:16:20Z
Distributions LTSHaskell:0.10.1, NixOS:0.10.1, Stackage:0.10.1
Reverse Dependencies 33 direct, 4 indirect
Executables parallel-loops, nested-loops, loops
Downloads 17039 total (280 in the last 30 days)
Rating 2.5 (votes: 6) [estimated by Bayesian average]
Status Docs available
Last success reported on 2017-12-05

Readme for streamly-0.1.0

Streamly

Streaming Concurrently

See the haddock documentation for full reference. It is recommended to read the comprehensive tutorial module Streamly.Tutorial first. Also see Streamly.Examples for some working examples.

Non-determinism

The monad instance composes like a list monad.

loops = do
    x <- each [1,2]
    y <- each [3,4]
    liftIO $ putStrLn $ show (x, y)

main = runStreaming $ serially $ loops
(1,3)
(1,4)
(2,3)
(2,4)
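
For comparison, the same nesting written with the ordinary list monad from base yields the same four pairs in the same order. This is only an analogy using plain lists, not Streamly's API, and the name pairs is made up for illustration.

```haskell
-- Plain list-monad analogy (base only, not Streamly).
-- `pairs` is a hypothetical name used for illustration.
pairs :: [(Int, Int)]
pairs = do
    x <- [1, 2]      -- outer loop
    y <- [3, 4]      -- inner loop, run once for every x
    return (x, y)

main :: IO ()
main = mapM_ print pairs
```

The difference in Streamly is that each step of such a loop may also perform effects in the underlying monad, as the putStrLn call in loops does.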

Magical Concurrency

To run the above code with demand-driven concurrency, i.e. with each loop iteration able to run concurrently depending on the rate at which the consumer demands results:

main = runStreaming $ asyncly $ loops

To run it with full parallelism irrespective of demand:

main = runStreaming $ parallely $ loops

To run it serially but interleaving the outer and inner loop iterations:

main = runStreaming $ interleaving $ loops

You can fold multiple streams or IO actions using parallel combinators such as <| and <|>. For example, to concurrently generate the squares and then concurrently sum the square roots of all combinations:

main = do
  print $ sum $ asyncly $ do
      -- Squaring is concurrent (<|)
      x2 <- forEachWith (<|) [1..100] $ \x -> return $ x * x
      y2 <- forEachWith (<|) [1..100] $ \y -> return $ y * y
      -- sqrt is concurrent (asyncly)
      return $ sqrt (x2 + y2)

Of course, the actions running in parallel could be arbitrary IO actions. To concurrently list the contents of a directory tree recursively:

import Path.IO (listDir, getCurrentDir)
import Streamly

main = runStreaming $ serially $ getCurrentDir >>= readdir
   where readdir d = do
            (dirs, files) <- lift $ listDir d
            liftIO $ mapM_ putStrLn $ map show files
            -- read the subdirs concurrently
            foldMapWith (<|>) readdir dirs

In the above examples we do not think in terms of threads, locking or synchronization; rather, we think in terms of what can run in parallel, and the rest is taken care of automatically. With asyncly and <| the programmer does not have to worry about how many threads to create; the number is adjusted automatically based on the demand of the consumer.
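
To make concrete what is being automated, here is a rough sketch of the manual alternative using only forkIO and MVar from base. It is not how Streamly is implemented. The names spawn and squaresSum are hypothetical, and the sketch naively forks one thread per element and ignores exceptions.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)

-- Hypothetical helper: run an action in its own thread and
-- return an MVar that will eventually hold its result.
spawn :: IO a -> IO (MVar a)
spawn act = do
    var <- newEmptyMVar
    _ <- forkIO (act >>= putMVar var)
    return var

-- Square every element in a separate thread, then sum the results.
-- Thread creation and result collection are both explicit here.
squaresSum :: [Int] -> IO Int
squaresSum xs = do
    vars <- mapM (\x -> spawn (return $! x * x)) xs
    results <- mapM takeMVar vars   -- blocks until each thread has finished
    return (sum results)

main :: IO ()
main = squaresSum [1 .. 10] >>= print
```

Even this small version has to decide how many threads to fork and how to wait for them, which are the decisions the text above says asyncly and <| take over.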

The concurrency facilities provided by streamly can be compared with OpenMP and Cilk, but with a more declarative expression. Concurrency support does not compromise performance in non-concurrent cases: the performance of the library is on par with or better than most of the existing streaming libraries.

Streaming

Streaming is effortless, simple and straightforward. The Streamly data type behaves just like a list, and Streamly.Prelude provides combinators to transform or fold streams. As in the streaming library, and unlike most others, each combinator explicitly consumes a stream and produces a stream, so no special operator is needed to join stream stages: forward ($) or reverse (&) function application is enough.

import Streamly
import qualified Streamly.Prelude as S
import Data.Function ((&))

main = S.each [1..10]
     & fmap (+ 1)
     & S.drop 2
     & S.filter even
     & fmap (* 3)
     & S.takeWhile (< 25)
     & S.mapM (\x -> putStrLn ("saw " ++ show x) >> return x)
     & S.toList . serially
     >>= print
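
To trace what each stage of the pipeline above does to the data, the pure stages can be mirrored on an ordinary list. This sketch uses only base, leaves out the effectful mapM stage, and introduces the hypothetical name pipeline.

```haskell
import Data.Function ((&))

-- List analogy of the pure stages (base only, not Streamly).
-- The comment on each line shows the list after that stage.
pipeline :: [Int]
pipeline = [1 .. 10]
    & fmap (+ 1)          -- [2,3,4,5,6,7,8,9,10,11]
    & drop 2              -- [4,5,6,7,8,9,10,11]
    & filter even         -- [4,6,8,10]
    & fmap (* 3)          -- [12,18,24,30]
    & takeWhile (< 25)    -- [12,18,24]

main :: IO ()
main = print pipeline
```

Because the mapM stage sits after takeWhile, it only ever sees the three surviving elements.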

Fold-style combinators can be used to fold purely or monadically. You can also use the beautiful foldl library for folding.

main = S.each [1..10]
     & serially
     & S.foldl (+) 0 id
     >>= print
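
The three arguments in the fold above read as a step function, an initial accumulator, and a final extraction function, in the style of the foldl library. Assuming that reading is right, the same shape on plain lists looks like the sketch below. foldlWith, total and mean are hypothetical names, not part of Streamly.

```haskell
import Data.List (foldl')

-- Left fold with a final extraction step (base only, not Streamly).
foldlWith :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
foldlWith step begin done = done . foldl' step begin

-- With `id` as the extractor this is an ordinary sum.
total :: Int
total = foldlWith (+) 0 id [1 .. 10]

-- A non-trivial extractor: accumulate (sum, count), then divide.
mean :: Double
mean = foldlWith step (0, 0) done [1 .. 10]
  where
    step (s, n) x = (s + x, n + 1 :: Int)
    done (s, n)   = s / fromIntegral n

main :: IO ()
main = do
    print total   -- 55
    print mean    -- 5.5
```

The extraction function lets the accumulator type differ from the result type, which is why id appears when no conversion is needed.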

Streams can be combined together in multiple ways:

return 1 <> return 2               -- serial, combine atoms
S.each [1..10] <> S.each [11..20]  -- serial
S.each [1..10] <| S.each [11..20]  -- demand driven parallel
S.each [1..10] <=> S.each [11..20] -- serial but interleaved
S.each [1..10] <|> S.each [11..20] -- fully parallel
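
As a rough illustration of the difference between serial and interleaved composition, here are both on plain lists. The round-robin order shown is an assumption about what "interleaved" means here, and interleave is a hypothetical helper written for this sketch, not a Streamly function.

```haskell
-- Alternate elements from two lists, starting with the first.
interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs

serial :: [Int]
serial = [1 .. 3] ++ [11 .. 13]

interleaved :: [Int]
interleaved = interleave [1 .. 3] [11 .. 13]

main :: IO ()
main = do
    print serial        -- [1,2,3,11,12,13]
    print interleaved   -- [1,11,2,12,3,13]
```

The parallel variants have no faithful list analogue, since the order of their results can depend on timing.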

As we have already seen, streams can be combined using monadic composition in a non-deterministic manner. This allows arbitrary manipulation and combining of streams. See Streamly.Examples.MergeSortedStreams for a more complicated example.

Reactive Programming (FRP)

Streamly is also a foundation for first-class reactive programming, by virtue of integrating concurrency and streaming. See Streamly.Examples.AcidRainGame and Streamly.Examples.CirclingSquare for SDL-based animation examples.

Contributing

The code is available under the BSD-3 license on GitHub. Join the Gitter chat channel for discussions. All contributions are welcome!

This library was originally inspired by the transient package authored by Alberto G. Corona.