{-# OPTIONS_GHC -fno-warn-unused-imports #-} -- | -- Module : Streamly.Tutorial -- Copyright : (c) 2017 Harendra Kumar -- -- License : BSD3 -- Maintainer : streamly@composewell.com -- -- Streamly is a general computing framework based on concurrent data flow -- programming. The IO monad and pure lists are a special case of streamly. On -- one hand, streamly extends the lists of pure values to lists of monadic -- actions, on the other hand it extends the IO monad with concurrent -- non-determinism. In simple imperative terms we can say that streamly extends -- the IO monad with @for@ loops and nested @for@ loops with concurrency -- support. Hopefully, this analogy becomes clearer once you go through this -- tutorial. -- -- Streaming in general enables writing modular, composable and scalable -- applications with ease, and concurrency allows you to make them scale and -- perform well. Streamly enables writing scalable concurrent applications -- without being aware of threads or synchronization. No explicit thread -- control is needed. Where applicable, concurrency rate is automatically -- controlled based on the demand by the consumer. However, combinators can be -- used to fine tune the concurrency control. -- -- Streaming and concurrency together enable expressing reactive applications -- conveniently. See the @CirclingSquare@ example in the examples directory for -- a simple SDL based FRP example. To summarize, streamly provides a unified -- computing framework for streaming, non-determinism and functional reactive -- programming in an elegant and simple API that is a natural extension of pure -- lists to monadic streams. -- -- In this tutorial we will go over the basic concepts and how to use the -- library. Before you go through this tutorial we recommend that you take a -- look at: -- -- * The quick overview in the package . -- * The overview of streams and folds in the "Streamly" module. -- -- Once you finish this tutorial, see the last section for further reading -- resources. module Streamly.Tutorial ( -- * Stream Types -- $streams -- * Concurrent Streams -- $concurrentStreams -- * Combining Streams -- $flavors -- * Imports and Supporting Code -- $imports -- * Generating Streams -- $generating -- * Generating Streams Concurrently -- $generatingConcurrently -- * Eliminating Streams -- $eliminating -- * Concurrent Pipeline Stages -- $concurrentApplication -- * Transforming Streams -- $transformation -- * Mapping Concurrently -- $concurrentTransformation -- * Merging Streams -- ** Semigroup Style -- $semigroup -- *** Deep Serial Composition ('Serial') -- $serial -- *** Wide Serial Composition ('WSerial') -- $interleaved -- *** Deep Speculative Composition ('Ahead') -- $ahead -- *** Deep Asynchronous Composition ('Async') -- $async -- *** Wide Asynchronous Composition ('WAsync') -- $wasync -- *** Parallel Asynchronous Composition ('Parallel') -- $parallel -- XXX we should deprecate and remove the mkAsync API -- Custom composition -- custom -- ** Monoid Style -- $monoid -- * Nesting Streams -- $nesting -- ** Monad -- $monad -- *** Deep Serial Nesting ('Serial') -- $regularSerial -- *** Wide Serial Nesting ('WSerial') -- $interleavedNesting -- *** Deep Speculative Nesting ('Ahead') -- $aheadNesting -- *** Deep Asynchronous Nesting ('Async') -- $concurrentNesting -- *** Wide Asynchronous Nesting ('WAsync') -- $wasyncNesting -- *** Parallel Asynchronous Nesting ('Parallel') -- $parallelNesting -- *** Exercise -- $monadExercise -- ** Applicative -- $applicative -- ** Functor -- $functor -- * Zipping Streams -- $zipping -- ** Serial Zipping -- $serialzip -- ** Parallel Zipping -- $parallelzip -- * Monad transformers -- $monadtransformers -- * Concurrent Programming -- $concurrent -- * Reactive Programming -- $reactive -- * Writing Concurrent Programs -- $programs -- * Performance -- $performance -- * Interoperation with Streaming Libraries -- $interop -- * Comparison with Existing Packages -- $comparison -- * Where to go next? -- $furtherReading ) where import Streamly hiding (foldWith, foldMapWith, forEachWith) import Streamly.Prelude import Data.Semigroup import Control.Applicative import Control.Monad import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.Trans.Class (MonadTrans (lift)) -- $streams -- -- The monadic stream API offered by Streamly is very close to the Haskell -- "Prelude" pure lists' API, it can be considered as a natural extension of -- lists to monadic actions. Streamly streams provide concurrent composition -- and merging of streams. It can be considered as a concurrent list -- transformer. -- -- The basic stream type is 'Serial', it represents a sequence of IO actions, -- and is a 'Monad'. The 'Serial' monad is almost a drop in replacement for -- the 'IO' monad, IO monad is a special case of the 'Serial' monad; IO monad -- represents a single IO action whereas the 'Serial' monad represents a series -- of IO actions. The only change you need to make to go from 'IO' to 'Serial' -- is to use 'drain' to run the monad and to prefix the IO actions with -- either 'yieldM' or 'liftIO'. If you use liftIO you can switch from 'Serial' -- to IO monad by simply removing the 'drain' function; no other changes -- are needed unless you have used some stream specific composition or -- combinators. -- -- Similarly, the 'Serial' type is almost a drop in replacement for pure lists, -- pure lists are a special case of monadic streams. If you use 'nil' in place -- of '[]' and '|:' in place ':' you can replace a list with a 'Serial' stream. -- The only difference is that the elements must be monadic type and to operate -- on the streams we must use the corresponding functions from -- "Streamly.Prelude" instead of using the base "Prelude". -- $concurrentStreams -- -- Many stream operations can be done concurrently: -- -- * Streams can be generated concurrently. -- -- * Streams can be merged concurrently. -- -- * Multiple stages in a streaming pipeline can run concurrently. -- -- * Streams can be mapped and zipped concurrently. -- -- * In monadic composition they combine like a list transformer, -- providing concurrent non-determinism. -- -- There are three basic concurrent stream styles, 'Ahead', 'Async', and -- 'Parallel'. The 'Ahead' style streams are similar to 'Serial' except that -- they can speculatively execute multiple stream actions concurrently in -- advance. 'Ahead' would return exactly the same stream as 'Serial' except -- that it may execute the actions concurrently. The 'Async' style streams, -- like 'Ahead', speculatively execute multiple stream actions in advance but -- return the results in their finishing order rather than in the stream -- traversal order. 'Parallel' is like 'Async' except that it provides -- unbounded parallelism instead of controlled parallelism. -- -- For easy reference, we can classify the stream types based on /execution order/, -- /consumption order/, and /bounded or unbounded/ concurrency. -- Execution could be serial (i.e. synchronous) or asynchronous. In serial -- execution we execute the next action in the stream only after the previous -- one has finished executing. In asynchronous execution multiple actions in -- the stream can be executed asynchronously i.e. the next action can start -- executing even before the first one has finished. Consumption order -- determines the order in which the outputs generated by the composition are -- consumed. Consumption could be serial or asynchronous. In serial -- consumption, the outputs are consumed in the traversal order, in -- asynchronous consumption the outputs are consumed as they arrive i.e. first -- come first serve order. -- -- +------------+--------------+--------------+--------------+ -- | Type | Execution | Consumption | Concurrency | -- +============+==============+==============+==============+ -- | 'Serial' | Serial | Serial | None | -- +------------+--------------+--------------+--------------+ -- | 'Ahead' | Asynchronous | Serial | bounded | -- +------------+--------------+--------------+--------------+ -- | 'Async' | Asynchronous | Asynchronous | bounded | -- +------------+--------------+--------------+--------------+ -- | 'Parallel' | Asynchronous | Asynchronous | unbounded | -- +------------+--------------+--------------+--------------+ -- -- All these types can be freely inter-converted using type conversion -- combinators or type annotations, without any cost, to achieve the desired -- composition style. To force a particular type of composition, we coerce the -- stream type using the corresponding type adapting combinator from -- 'serially', 'aheadly', 'asyncly', or 'parallely'. The default stream type -- is inferred as 'Serial' unless you change it by using one of the combinators -- or by using a type annotation. -- $flavors -- -- Streams can be combined using '<>' or 'mappend' to form a -- composite. Composite streams can be interpreted in a depth first or -- breadth first manner using an appropriate type conversion before -- consumption. Deep (e.g. 'Serial') stream type variants traverse a -- composite stream in a depth first manner, such that each stream is -- traversed fully before traversing the next stream. Wide -- (e.g. 'WSerial') stream types traverse it in a breadth first -- manner, such that one element from each stream is traversed before -- coming back to the first stream again. -- -- Each stream type has a wide traversal variant prefixed by 'W'. The wide -- variant differs only in the Semigroup\/Monoid, Applicative\/Monad -- compositions of the streams. -- The following table summarizes the basic types and the corresponding wide -- variants: -- -- @ -- +------------+-----------+ -- | Deep | Wide | -- +============+===========+ -- | 'Serial' | 'WSerial' | -- +------------+-----------+ -- | 'Ahead' | 'WAhead' | -- +------------+-----------+ -- | 'Async' | 'WAsync' | -- +------------+-----------+ -- @ -- -- Other than these types there are also 'ZipSerial' and 'ZipAsync' types that -- zip streams serially or concurrently using 'Applicative' operation. These -- types are not monads they are only applicatives and they do not differ in -- 'Semigroup' composition. -- -- $programs -- -- When writing concurrent programs it is advised to not use the concurrent -- style stream combinators blindly at the top level. That might create too -- much concurrency where it is not even required, and can even degrade -- performance in some cases. In some cases it can also lead to surprising -- behavior because of some code that is supposed to be serial becoming -- concurrent. Please be aware that all concurrency capable APIs that you may -- have used under the scope of a concurrent stream combinator will become -- concurrent. For example if you have a 'repeatM' somewhere in your program -- and you use 'parallely' on top, the 'repeatM' becomes fully parallel, -- resulting into an infinite parallel execution . Instead, use the -- /Keep It Serial and Stupid/ principle, start with the default serial -- composition and enable concurrent combinators only when and where necessary. -- When you use a concurrent combinator you can use an explicit 'serially' -- combinator to suppress any unnecessary concurrency under the scope of that -- combinator. -- $monadtransformers -- -- To represent streams in an arbitrary monad use the more general monad -- transformer types for example the monad transformer type corresponding to -- the 'Serial' type is 'SerialT'. @SerialT m a@ represents a stream of values -- of type 'a' in some underlying monad 'm'. For example, @SerialT IO Int@ is a -- stream of 'Int' in 'IO' monad. In fact, the type 'Serial' is a synonym for -- @SerialT IO@. -- -- Similarly we have monad transformer types for other stream types as well viz. -- 'WSerialT', 'AsyncT', 'WAsyncT' and 'ParallelT'. -- -- To lift a value from an underlying monad in a monad transformer stack into a -- singleton stream use 'lift' and to lift from an IO action use 'liftIO'. -- -- @ -- > S.'drain' $ liftIO $ putStrLn "Hello world!" -- Hello world! -- > S.'drain' $ lift $ putStrLn "Hello world!" -- Hello world! -- @ -- -- $generating -- -- We will assume the following imports in this tutorial. Go ahead, fire up a -- GHCi session and import these lines to start playing. -- -- @ -- > import "Streamly" -- > import "Streamly.Prelude" ((|:)) -- > import qualified "Streamly.Prelude" as S -- -- > import Control.Concurrent -- @ -- -- 'nil' represents an empty stream and 'consM' or its operator form '|:' adds -- a monadic action at the head of the stream. -- -- @ -- > S.'toList' S.'nil' -- [] -- > S.'toList' $ 'getLine' |: 'getLine' |: S.'nil' -- hello -- world -- ["hello","world"] -- @ -- -- To create a singleton stream from a pure value use 'yield' or 'pure' and to -- create a singleton stream from a monadic action use 'yieldM'. Note that in -- case of Zip applicative streams "pure" repeats the value to generate an -- infinite stream. -- -- @ -- > S.'toList' $ 'pure' 1 -- [1] -- > S.'toList' $ S.'yield' 1 -- [1] -- > S.'toList' $ S.'yieldM' 'getLine' -- hello -- ["hello"] -- @ -- -- To create a stream from pure values in a 'Foldable' container use -- 'fromFoldable' which is equivalent to a fold using 'cons' and 'nil': -- -- @ -- > S.'toList' $ S.'fromFoldable' [1..3] -- [1,2,3] -- > S.'toList' $ 'Prelude.foldr' S.'cons' S.'nil' [1..3] -- [1,2,3] -- @ -- -- To create a stream from monadic actions in a 'Foldable' container just use a -- right fold using 'consM' and 'nil': -- -- @ -- > S.'drain' $ 'Prelude.foldr' ('|:') S.'nil' ['putStr' "Hello ", 'putStrLn' "world!"] -- Hello world! -- @ -- -- For more ways to construct a stream see the module "Streamly.Prelude". -- $generatingConcurrently -- -- Monadic construction and generation functions like 'consM', 'unfoldrM', -- 'replicateM', 'repeatM', 'iterateM' and 'fromFoldableM' work concurrently -- when used with appropriate stream type combinator. The pure versions of -- these APIs are not concurrent, however you can use the monadic versions even -- for pure computations by wrapping the pure value in a monad to get the -- concurrent generation capability where required. -- -- The following code finishes in 3 seconds (6 seconds when serial): -- -- @ -- > let p n = threadDelay (n * 1000000) >> return n -- > S.'toList' $ 'parallely' $ p 3 |: p 2 |: p 1 |: S.'nil' -- [1,2,3] -- > S.'toList' $ 'aheadly' $ p 3 |: p 2 |: p 1 |: S.'nil' -- [3,2,1] -- @ -- The following finishes in 10 seconds (100 seconds when serial): -- -- @ -- > S.drain $ 'asyncly' $ S.'replicateM' 10 $ p 10 -- @ -- -- $eliminating -- -- We have already seen 'drain' and 'toList' to eliminate a stream in the -- examples above. 'drain' runs a stream discarding the results i.e. only -- for effects. 'toList' runs the stream and collects the results in a list. -- -- For other ways to eliminate a stream see the @Folding@ section in -- "Streamly.Prelude" module. -- $transformation -- -- Transformation over a stream is the equivalent of a @for@ loop construct in -- imperative paradigm. We iterate over every element in the stream and perform -- certain transformations for each element. Transformations may involve -- mapping functions over the elements, filtering elements from the stream or -- folding all the elements in the stream into a single value. Streamly streams -- are exactly like lists and you can perform all the transformations in the -- same way as you would on lists. -- -- Here is a simple console echo program that just echoes every input line, -- forever: -- -- @ -- > import Data.Function ((&)) -- > S.'drain' $ S.'repeatM' getLine & S.'mapM' putStrLn -- @ -- -- The following code snippet reads lines from standard input, filters blank -- lines, drops the first non-blank line, takes the next two, up cases them, -- numbers them and prints them: -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Data.Char (toUpper) -- import Data.Function ((&)) -- -- main = S.'drain' $ -- S.'repeatM' getLine -- & S.'filter' (not . null) -- & S.'drop' 1 -- & S.'take' 2 -- & fmap (map toUpper) -- & S.'zipWith' (\\n s -> show n ++ " " ++ s) (S.'fromFoldable' [1..]) -- & S.'mapM' putStrLn -- @ -- $concurrentTransformation -- -- Monadic transformation functions 'mapM' and 'sequence' work concurrently -- when used with appropriate stream type combinators. The pure versions do not -- work concurrently, however you can use the monadic versions even for pure -- computations to get the concurrent transformation capability where required. -- -- This would print a value every second (2 seconds when serial): -- -- @ -- > let p n = threadDelay (n * 1000000) >> return n -- > S.'drain' $ S.aheadly $ S.'mapM' (\\x -> p 1 >> print x) (serially $ S.repeatM (p 1)) -- @ -- -- $concurrentApplication -- -- The concurrent function application operators '|$' and '|&' apply a stream -- argument to a stream function concurrently to compose a concurrent pipeline -- of stream processing functions: -- -- Because both the stages run concurrently, we would see a delay of only 1 -- second instead of 2 seconds in the following: -- -- @ -- > let p n = threadDelay (n * 1000000) >> return n -- > S.'drain' $ S.'repeatM' (p 1) '|&' S.'mapM' (\\x -> p 1 >> print x) -- @ -- $semigroup -- -- We can combine two streams into a single stream using semigroup composition -- operation '<>'. Streams can be combined in many different ways as described -- in the following sections, the '<>' operation behaves differently depending -- on the stream type in effect. The stream type and therefore the composition -- style can be changed at any point using one of the type combinators as -- discussed earlier. -- $imports -- -- In most of example snippets we do not repeat the imports. Where imports are -- not explicitly specified use the imports shown below. -- -- @ -- import "Streamly" -- import "Streamly.Prelude" ((|:), nil) -- import qualified "Streamly.Prelude" as S -- -- import Control.Concurrent -- import Control.Monad (forever) -- @ -- -- To illustrate concurrent vs serial composition aspects, we will use the -- following @delay@ function to introduce a sleep or delay specified in -- seconds. After the delay it prints the number of seconds it slept. -- -- @ -- delay n = S.'yieldM' $ do -- threadDelay (n * 1000000) -- tid \<- myThreadId -- putStrLn (show tid ++ ": Delay " ++ show n) -- @ -- $serial -- -- The 'Semigroup' operation '<>' of the 'Serial' type combines the two streams -- in a /serial depth first/ manner. We use the 'serially' type combinator to -- effect 'Serial' style of composition. We can also use an explicit 'Serial' -- type annotation for the stream to achieve the same effect. However, since -- 'Serial' is the default type unless explicitly specified by using a -- combinator, we can omit using an explicit combinator or type annotation for -- this style of composition. -- -- When two streams with multiple elements are combined in this manner, the -- monadic actions in the two streams are performed sequentially i.e. first all -- actions in the first stream are performed sequentially and then all actions -- in the second stream are performed sequentially. We call it -- /serial depth first/ as the full depth of one stream is fully traversed -- before we move to the next. The following example prints the sequence 1, 2, -- 3, 4: -- -- @ -- main = S.'drain' $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil) -- @ -- @ -- 1 -- 2 -- 3 -- 4 -- @ -- -- All actions in both the streams are performed serially in the same thread. -- In the following example we can see that all actions are performed in the -- same thread and take a combined total of @3 + 2 + 1 = 6@ seconds: -- -- @ -- main = S.'drain' $ delay 3 <> delay 2 <> delay 1 -- @ -- @ -- ThreadId 36: Delay 3 -- ThreadId 36: Delay 2 -- ThreadId 36: Delay 1 -- @ -- -- The polymorphic version of the binary operation '<>' of the 'Serial' type is -- 'serial'. We can use 'serial' to join streams in a sequential manner -- irrespective of the type of stream: -- -- @ -- main = S.'drain' $ (print 1 |: print 2 |: nil) \`serial` (print 3 |: print 4 |: nil) -- @ -- $interleaved -- -- The 'Semigroup' operation '<>' of the 'WSerial' type combines the two -- streams in a /serial breadth first/ manner. We use the 'wSerially' type -- combinator to effect 'WSerial' style of composition. We can also use the -- 'WSerial' type annotation for the stream to achieve the same effect. -- -- When two streams with multiple elements are combined in this manner, we -- traverse all the streams in a breadth first manner i.e. one action from each -- stream is performed and yielded to the resulting stream before we come back -- to the first stream again and so on. -- The following example prints the sequence 1, 3, 2, 4 -- -- @ -- main = S.'drain' . 'wSerially' $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil) -- @ -- @ -- 1 -- 3 -- 2 -- 4 -- @ -- -- Even though the monadic actions of the two streams are performed in an -- interleaved manner they are all performed serially in the same thread. In -- the following example we can see that all actions are performed in the same -- thread and take a combined total of @3 + 2 + 1 = 6@ seconds: -- -- @ -- main = S.'drain' . 'wSerially' $ delay 3 <> delay 2 <> delay 1 -- @ -- @ -- ThreadId 36: Delay 3 -- ThreadId 36: Delay 2 -- ThreadId 36: Delay 1 -- @ -- -- The polymorphic version of the 'WSerial' binary operation '<>' is called -- 'wSerial'. We can use 'wSerial' to join streams in an interleaved manner -- irrespective of the type, notice that we have not used the 'wSerially' -- combinator in the following example: -- -- @ -- main = S.'drain' $ (print 1 |: print 2 |: nil) \`wSerial` (print 3 |: print 4 |: nil) -- @ -- @ -- 1 -- 3 -- 2 -- 4 -- @ -- -- Note that this composition cannot be used to fold infinite number of streams -- since it requires preserving the state until a stream is finished. -- $ahead -- -- The 'Semigroup' operation '<>' of the 'Ahead' type combines two streams in a -- /serial depth first/ manner with concurrent lookahead. We use the 'aheadly' -- type combinator to effect 'Ahead' style of composition. We can also use an -- explicit 'Ahead' type annotation for the stream to achieve the same effect. -- -- When two streams are combined in this manner, the streams are traversed in -- depth first manner just like 'Serial', however it can execute the next -- stream concurrently and keep the results ready when its turn arrives. -- Concurrent execution of the next stream(s) is performed if the first stream -- blocks or if it cannot produce output at the rate that is enough to meet the -- consumer demand. Multiple streams can be executed concurrently to meet the -- demand. The following example would print the result in a second even -- though each action in each stream takes one second: -- -- @ -- main = do -- xs \<- S.'toList' . 'aheadly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) -- print xs -- where p n = threadDelay 1000000 >> return n -- @ -- @ -- [1,2,3,4] -- @ -- -- Each stream is constructed 'aheadly' and then both the streams are merged -- 'aheadly', therefore, all the actions can run concurrently but the result is -- presented in serial order. -- -- You can also use the polymorphic combinator 'ahead' in place of '<>' to -- compose any type of streams in this manner. -- $async -- -- The 'Semigroup' operation '<>' of the 'Async' type combines the two -- streams in a depth first manner with parallel look ahead. We use the -- 'asyncly' type combinator to effect 'Async' style of composition. We -- can also use the 'Async' type annotation for the stream type to achieve -- the same effect. -- -- When two streams with multiple elements are combined in this manner, the -- streams are traversed in depth first manner just like 'Serial', however it -- can execute the next stream concurrently and return the results from it -- as they arrive i.e. the results from the next stream may be yielded even -- before the results from the first stream. Concurrent execution of the next -- stream(s) is performed if the first stream blocks or if it cannot produce -- output at the rate that is enough to meet the consumer demand. Multiple -- streams can be executed concurrently to meet the demand. -- In the example below each element in the stream introduces a constant delay -- of 1 second, however, it takes just one second to produce all the results. -- The results are not guaranteed to be in any particular order: -- -- @ -- main = do -- xs \<- S.'toList' . 'asyncly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil) -- print xs -- where p n = threadDelay 1000000 >> return n -- @ -- @ -- [4,2,1,3] -- @ -- -- The constituent streams are also composed in 'Async' manner and the -- composition of streams too. We can compose the constituent streams to run -- serially, in that case it would take 2 seconds to produce all the results. -- The elements in the serial streams would be in serial order in the results: -- -- @ -- main = do -- xs \<- S.'toList' . 'asyncly' $ (serially $ p 1 |: p 2 |: nil) <> (serially $ p 3 |: p 4 |: nil) -- print xs -- where p n = threadDelay 1000000 >> return n -- @ -- @ -- [3,1,2,4] -- @ -- -- In the following example we can see that new threads are started when a -- computation blocks. Notice that the output from the stream with the -- shortest delay is printed first. The whole computation takes @maximum of -- (3, 2, 1) = 3@ seconds: -- -- @ -- main = S.'drain' . 'asyncly' $ delay 3 '<>' delay 2 '<>' delay 1 -- @ -- @ -- ThreadId 42: Delay 1 -- ThreadId 41: Delay 2 -- ThreadId 40: Delay 3 -- @ -- -- When we have a tree of computations composed using this style, the tree is -- traversed in DFS style just like the 'Serial' style, the only difference is -- that here we can move on to executing the next stream if a stream blocks. -- However, we will not start new threads if we have sufficient output to -- saturate the consumer. This is why we call it left-biased demand driven or -- adaptive concurrency style, the concurrency tends to stay on the left side -- of the composition as long as possible. More threads are started based on -- the pull rate of the consumer. The following example prints an output every -- second as all of the actions are concurrent. -- -- @ -- main = S.'drain' . 'asyncly' $ (delay 1 <> delay 2) <> (delay 3 <> delay 4) -- @ -- @ -- 1 -- 2 -- 3 -- 4 -- @ -- -- All the computations may even run in a single thread when more threads are -- not needed. As you can see, in the following example the computations are -- run in a single thread one after another, because none of them blocks. -- However, if the thread consuming the stream were faster than the producer -- then it would have started parallel threads for each computation to keep up -- even if none of them blocks: -- -- @ -- main = S.'drain' . 'asyncly' $ traced (sqrt 9) '<>' traced (sqrt 16) '<>' traced (sqrt 25) -- where traced m = S.'yieldM' (myThreadId >>= print) >> return m -- @ -- @ -- ThreadId 40 -- ThreadId 40 -- ThreadId 40 -- @ -- -- Note that the order of printing in the above examples may change due to -- variations in scheduling latencies for concurrent threads. -- -- The polymorphic version of the 'Async' binary operation '<>' is called -- 'async'. We can use 'async' to join streams in a left biased -- adaptively concurrent manner irrespective of the type, notice that we have -- not used the 'asyncly' combinator in the following example: -- -- @ -- main = S.'drain' $ delay 3 \`async` delay 2 \`async` delay 1 -- @ -- @ -- ThreadId 42: Delay 1 -- ThreadId 41: Delay 2 -- ThreadId 40: Delay 3 -- @ -- -- Since the concurrency provided by this operator is demand driven it cannot -- be used when the composed computations start timers that are relative to -- each other because all computations may not be started at the same time and -- therefore timers in all of them may not start at the same time. When -- relative timing among all computations is important or when we need to start -- all computations at once for any reason 'Parallel' style must be used -- instead. -- -- 'Async' style utilizes resources optimally and should be preferred over -- 'Parallel' or 'WAsync' unless you really need those. 'Async' should be used -- when we know that the computations can run in parallel but we do not care if -- they actually run in parallel or not, that decision can be left to the -- scheduler based on demand. Also, note that 'async' operator can be used to fold -- infinite number of streams in contrast to the 'Parallel' or 'WAsync' styles, -- because it does not require us to run all of them at the same time in a fair -- manner. -- $wasync -- -- The 'Semigroup' operation '<>' of the 'WAsync' type combines two streams in -- a concurrent manner using /breadth first traversal/. We use the 'wAsyncly' -- type combinator to effect 'WAsync' style of composition. We can also use the -- 'WAsync' type annotation for the stream to achieve the same effect. -- -- When streams with multiple elements are combined in this manner, we traverse -- all the streams concurrently in a breadth first manner i.e. one action from -- each stream is performed and yielded to the resulting stream before we come -- back to the first stream again and so on. Even though we execute the actions -- in a breadth first order the outputs are consumed on a first come first -- serve basis. -- -- In the following example we can see that outputs are produced in the breadth -- first traversal order but this is not guaranteed. -- -- @ -- main = S.'drain' . 'wAsyncly' $ (serially $ print 1 |: print 2 |: nil) <> (serially $ print 3 |: print 4 |: nil) -- @ -- @ -- 1 -- 3 -- 2 -- 4 -- @ -- -- The polymorphic version of the binary operation '<>' of the 'WAsync' type is -- 'wAsync'. We can use 'wAsync' to join streams using a breadth first -- concurrent traversal irrespective of the type, notice that we have not used -- the 'wAsyncly' combinator in the following example: -- -- @ -- main = S.'drain' $ delay 3 \`wAsync` delay 2 \`wAsync` delay 1 -- @ -- @ -- ThreadId 42: Delay 1 -- ThreadId 41: Delay 2 -- ThreadId 40: Delay 3 -- @ -- -- Since the concurrency provided by this style is demand driven it may not -- be used when the composed computations start timers that are relative to -- each other because all computations may not be started at the same time and -- therefore timers in all of them may not start at the same time. When -- relative timing among all computations is important or when we need to start -- all computations at once for any reason 'Parallel' style must be used -- instead. -- -- $parallel -- -- The 'Semigroup' operation '<>' of the 'Parallel' type combines the two -- streams in a fairly concurrent manner with round robin scheduling. We use -- the 'parallely' type combinator to effect 'Parallel' style of composition. -- We can also use the 'Parallel' type annotation for the stream type to -- achieve the same effect. -- -- When two streams with multiple elements are combined in this manner, the -- monadic actions in both the streams are performed concurrently with a fair -- round robin scheduling. The outputs are yielded in the order in which the -- actions complete. This is pretty similar to the 'WAsync' type, the -- difference is that 'WAsync' is adaptive to the consumer demand and may or -- may not execute all actions in parallel depending on the demand, whereas -- 'Parallel' runs all the streams in parallel irrespective of the demand. -- -- The following example sends a query to all the three search engines in -- parallel and prints the name of the search engines in the order in which the -- responses arrive. You need the [http-conduit](http://hackage.haskell.org/package/http-conduit) -- package to run this example: -- -- @ -- import "Streamly" -- import qualified Streamly.Prelude as S -- import Network.HTTP.Simple -- -- main = S.'drain' . 'parallely' $ google \<> bing \<> duckduckgo -- where -- google = get "https://www.google.com/search?q=haskell" -- bing = get "https://www.bing.com/search?q=haskell" -- duckduckgo = get "https://www.duckduckgo.com/?q=haskell" -- get s = S.'yieldM' (httpNoBody (parseRequest_ s) >> putStrLn (show s)) -- @ -- -- The polymorphic version of the binary operation '<>' of the 'Parallel' type -- is 'parallel'. We can use 'parallel' to join streams in a fairly concurrent -- manner irrespective of the type, notice that we have not used the -- 'parallely' combinator in the following example: -- -- @ -- main = S.'drain' $ delay 3 \`parallel` delay 2 \`wAsync` delay 1 -- @ -- @ -- ThreadId 42: Delay 1 -- ThreadId 41: Delay 2 -- ThreadId 40: Delay 3 -- @ -- -- Note that this style of composition cannot be used to combine infinite -- number of streams, as it will lead to an infinite sized scheduling queue. -- -- XXX to be removed -- $custom -- -- The 'mkAsync' API can be used to create references to asynchronously running -- stream computations. We can then use 'uncons' to explore the streams -- arbitrarily and then recompose individual elements to create a new stream. -- This way we can dynamically decide which stream to explore at any given -- time. Take an example of a merge sort of two sorted streams. We need to -- keep consuming items from the stream which has the lowest item in the sort -- order. This can be achieved using async references to streams. See -- "MergeSort.hs" in the examples directory. -- $monoid -- -- We can use 'Monoid' instances to fold a container of streams in the desired -- style using 'fold' or 'foldMap'. We have also provided some fold utilities -- to fold streams using the polymorphic combine operations: -- -- * 'foldWith' is like 'fold', it folds a 'Foldable' container of streams -- using the given composition operator. -- * 'foldMapWith' is like 'foldMap', it folds like @foldWith@ but also maps a -- function before folding. -- * 'forEachWith' is like @foldMapwith@ but the container argument comes before -- the function argument. -- -- All of the following are equivalent and start ten concurrent tasks each with -- a delay from 1 to 10 seconds, resulting in the printing of each number every -- second: -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- -- main = do -- S.'drain' $ 'asyncly' $ foldMap delay [1..10] -- S.'drain' $ S.'foldWith' 'async' (map delay [1..10]) -- S.'drain' $ S.'foldMapWith' 'async' delay [1..10] -- S.'drain' $ S.'forEachWith' 'async' [1..10] delay -- where delay n = S.'yieldM' $ threadDelay (n * 1000000) >> print n -- @ -- $nesting -- -- Till now we discussed ways to apply transformations on a stream or to merge -- streams together to create another stream. We mentioned earlier that -- transforming a stream is similar to a @for@ loop in the imperative paradigm. -- We will now discuss the concept of a nested composition of streams which is -- analogous to nested @for@ loops in the imperative paradigm. Functional -- programmers call this style of composition a list transformer or @ListT@. -- Logic programmers call it a logic monad or non-deterministic composition, -- but for ordinary imperative minded people like me it is easier to think in -- terms of good old nested @for@ loops. -- -- $monad -- -- In functional programmer's parlance the 'Monad' instances of different -- 'IsStream' types implement non-determinism, exploring all possible -- combination of choices from both the streams. From an imperative -- programmer's point of view it behaves like nested loops i.e. for each -- element in the first stream and for each element in the second stream -- execute the body of the loop. -- -- The 'Monad' instances of 'Serial', 'WSerial', 'Async' and 'WAsync' -- stream types support different flavors of nested looping. In other words, -- they are all variants of list transformer. The nesting behavior of these -- types correspond exactly to the way they merge streams as we discussed in -- the previous section. -- -- $regularSerial -- -- The 'Monad' composition of the 'Serial' type behaves like a standard list -- transformer. This is the default when we do not use an explicit type -- combinator. However, the 'serially' type combinator can be used to switch to -- this style of composition. We will see how this style of composition works -- in the following examples. -- -- Let's start with an example with a simple @for@ loop without any nesting. -- For simplicity of illustration we are using streams of pure values in all -- the examples. However, the streams could also be made of monadic actions -- instead. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' $ do -- x <- S.'fromFoldable' [3,2,1] -- delay x -- @ -- @ -- ThreadId 30: Delay 3 -- ThreadId 30: Delay 2 -- ThreadId 30: Delay 1 -- @ -- -- As we can see, the code after the @fromFoldable@ statement is run three -- times, once for each value of @x@ drawn from the stream. All the three -- iterations are serial and run in the same thread one after another. In -- imperative terms this is equivalent to a @for@ loop with three iterations. -- -- A console echo loop copying standard input to standard output can simply be -- written like this: -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- import Control.Monad (forever) -- -- main = S.'drain' $ forever $ S.yieldM getLine >>= S.yieldM . putStrLn -- @ -- -- When multiple streams are composed using this style they nest in a DFS -- manner i.e. nested iterations of a loop are executed before we proceed to -- the next iteration of the parent loop. This behaves just like nested @for@ -- loops in imperative programming. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' $ do -- x <- S.'fromFoldable' [1,2] -- y <- S.'fromFoldable' [3,4] -- S.'yieldM' $ putStrLn $ show (x, y) -- @ -- @ -- (1,3) -- (1,4) -- (2,3) -- (2,4) -- @ -- -- Notice that this is analogous to merging streams of type 'Serial' or merging -- streams using 'serial'. -- $aheadNesting -- -- The 'Monad' composition of 'Ahead' type behaves just like 'Serial' except -- that it can speculatively perform a bounded number of next iterations of a -- loop concurrently. -- -- The 'aheadly' type combinator can be used to switch to this style of -- composition. Alternatively, a type annotation can be used to specify the -- type of the stream as 'Ahead'. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- comp = S.'toList' . 'aheadly' $ do -- x <- S.'fromFoldable' [3,2,1] -- delay x >> return x -- -- main = comp >>= print -- @ -- @ -- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 -- [3,2,1] -- @ -- -- This code finishes in 3 seconds, 'Serial' would take 6 seconds. As we can -- see all the three iterations are concurrent and run in different threads, -- however, the results are returned in the serial order. -- -- Concurrency is demand driven, when multiple streams are composed using this -- style, the iterations are executed in a depth first manner just like -- 'Serial' i.e. nested iterations are executed before we proceed to the next -- outer iteration. The only difference is that we may execute multiple future -- iterations concurrently and keep the results ready. -- -- $concurrentNesting -- -- The 'Monad' composition of 'Async' type can perform the iterations of a -- loop concurrently. Concurrency is demand driven i.e. more concurrent -- iterations are started only if the previous iterations are not able to -- produce enough output for the consumer of the output stream. This works -- exactly the same way as the merging of two streams 'asyncly' works. -- This is the concurrent analogue of 'Serial' style monadic composition. -- -- The 'asyncly' type combinator can be used to switch to this style of -- composition. Alternatively, a type annotation can be used to specify the -- type of the stream as 'Async'. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' . 'asyncly' $ do -- x <- S.'fromFoldable' [3,2,1] -- delay x -- @ -- @ -- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 -- @ -- -- As we can see the code after the @fromFoldable@ statement is run three -- times, once for each value of @x@. All the three iterations are concurrent -- and run in different threads. The iteration with least delay finishes first. -- When compared to imperative programming, this can be viewed as a @for@ loop -- with three concurrent iterations. -- -- Concurrency is demand driven just as in the case of 'async' merging. -- When multiple streams are composed using this style, the iterations are -- triggered in a depth first manner just like 'Serial' i.e. nested iterations are -- executed before we proceed to the next iteration at higher level. However, -- unlike 'Serial' more than one iterations may be started concurrently based -- on the demand from the consumer of the stream. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' . 'asyncly' $ do -- x <- S.'fromFoldable' [1,2] -- y <- S.'fromFoldable' [3,4] -- S.'yieldM' $ putStrLn $ show (x, y) -- @ -- @ -- (1,3) -- (1,4) -- (2,3) -- (2,4) -- @ -- $interleavedNesting -- -- The 'Monad' composition of 'WSerial' type interleaves the iterations of -- outer and inner loops in a nested loop composition. This works exactly the -- same way as the merging of two streams in 'wSerially' fashion works. The -- 'wSerially' type combinator can be used to switch to this style of -- composition. Alternatively, a type annotation can be used to specify the -- type of the stream as 'WSerial'. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' . 'wSerially' $ do -- x <- S.'fromFoldable' [1,2] -- y <- S.'fromFoldable' [3,4] -- S.yieldM $ putStrLn $ show (x, y) -- @ -- @ -- (1,3) -- (2,3) -- (1,4) -- (2,4) -- @ -- -- $wasyncNesting -- -- Just like 'Async' the 'Monad' composition of 'WAsync' runs the iterations of -- a loop concurrently. The difference is in the nested loop behavior. The -- nested loops in this type are traversed and executed in a breadth first -- manner rather than the depth first manner of 'Async' style. -- The loop nesting works exactly the same way as the merging of streams -- 'wAsyncly' works. The 'wAsyncly' type combinator can be used to switch to -- this style of composition. Alternatively, a type annotation can be used to -- specify the type of the stream as 'WAsync'. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' . 'wAsyncly' $ do -- x <- S.'fromFoldable' [1,2] -- y <- S.'fromFoldable' [3,4] -- S.'yieldM' $ putStrLn $ show (x, y) -- @ -- @ -- (1,3) -- (2,3) -- (1,4) -- (2,4) -- @ -- $parallelNesting -- -- Just like 'Async' or 'WAsync' the 'Monad' composition of 'Parallel' runs the -- iterations of a loop concurrently. The difference is in the nested loop -- behavior. The streams at each nest level is run fully concurrently -- irrespective of the demand. The loop nesting works exactly the same way as -- the merging of streams 'parallely' works. The 'parallely' type combinator -- can be used to switch to this style of composition. Alternatively, a type -- annotation can be used to specify the type of the stream as 'Parallel'. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = S.'drain' . 'parallely' $ do -- x <- S.'fromFoldable' [3,2,1] -- delay x -- @ -- @ -- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 -- @ -- $monadExercise -- -- Streamly code is usually written in a way that is agnostic of the -- specific monadic composition type. We use a polymorphic type with a -- 'IsStream' type class constraint. When running the stream we can choose the -- specific mode of composition. For example take a look at the following code. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- composed :: (IsStream t, Monad (t IO)) => t IO () -- composed = do -- sz <- sizes -- cl <- colors -- sh <- shapes -- S.'yieldM' $ putStrLn $ show (sz, cl, sh) -- -- where -- -- sizes = S.'fromFoldable' [1, 2, 3] -- colors = S.'fromFoldable' ["red", "green", "blue"] -- shapes = S.'fromFoldable' ["triangle", "square", "circle"] -- @ -- -- Now we can interpret this in whatever way we want: -- -- @ -- main = S.'drain' . 'serially' $ composed -- main = S.'drain' . 'wSerially' $ composed -- main = S.'drain' . 'asyncly' $ composed -- main = S.'drain' . 'wAsyncly' $ composed -- main = S.'drain' . 'parallely' $ composed -- @ -- -- As an exercise try to figure out the output of this code for each mode of -- composition. -- $functor -- -- 'fmap' transforms a stream by mapping a function on all elements of the -- stream. 'fmap' behaves in the same way for all stream types, it is always -- serial. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- -- main = (S.'toList' $ fmap show $ S.'fromFoldable' [1..10]) >>= print -- @ -- -- Also see the 'mapM' and 'sequence' functions for mapping actions, in the -- "Streamly.Prelude" module. -- $applicative -- -- Applicative is precisely the same as the 'ap' operation of 'Monad'. For -- zipping applicatives separate types 'ZipSerial' and 'ZipAsync' are -- provided. -- -- The following example uses the 'Serial' applicative, it runs all iterations -- serially and takes a total 17 seconds (1 + 3 + 4 + 2 + 3 + 4): -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- -- s1 = d 1 <> d 2 -- s2 = d 3 <> d 4 -- d n = delay n >> return n -- -- main = (S.'toList' . 'serially' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 36: Delay 1 -- ThreadId 36: Delay 3 -- ThreadId 36: Delay 4 -- ThreadId 36: Delay 2 -- ThreadId 36: Delay 3 -- ThreadId 36: Delay 4 -- [(1,3),(1,4),(2,3),(2,4)] -- @ -- -- Similarly 'WSerial' applicative runs the iterations in an interleaved -- order but since it is serial it takes a total of 17 seconds: -- -- @ -- main = (S.'toList' . 'wSerially' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 36: Delay 1 -- ThreadId 36: Delay 3 -- ThreadId 36: Delay 2 -- ThreadId 36: Delay 3 -- ThreadId 36: Delay 4 -- ThreadId 36: Delay 4 -- [(1,3),(2,3),(1,4),(2,4)] -- @ -- -- 'Async' can run the iterations concurrently and therefore takes a total -- of 6 seconds which is max (1, 2) + max (3, 4): -- -- @ -- main = (S.'toList' . 'asyncly' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 34: Delay 1 -- ThreadId 36: Delay 2 -- ThreadId 35: Delay 3 -- ThreadId 36: Delay 3 -- ThreadId 35: Delay 4 -- ThreadId 36: Delay 4 -- [(1,3),(2,3),(1,4),(2,4)] -- @ -- -- Similarly 'WAsync' as well can run the iterations concurrently and -- therefore takes a total of 6 seconds (2 + 4): -- -- @ -- main = (S.'toList' . 'wAsyncly' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- @ -- ThreadId 34: Delay 1 -- ThreadId 36: Delay 2 -- ThreadId 35: Delay 3 -- ThreadId 36: Delay 3 -- ThreadId 35: Delay 4 -- ThreadId 36: Delay 4 -- [(1,3),(2,3),(1,4),(2,4)] -- @ -- $zipping -- -- Zipping is a special transformation where the corresponding elements of two -- streams are combined together using a zip function producing a new stream of -- outputs. Two different types are provided for serial and concurrent zipping. -- These types provide an applicative instance that can be used to lift -- functions to zip the argument streams. -- Also see the zipping functions in the "Streamly.Prelude" module. -- $serialzip -- -- The applicative instance of 'ZipSerial' type zips streams serially. -- 'zipSerially' type combinator can be used to switch to serial applicative -- zip composition: -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- -- d n = delay n >> return n -- s1 = 'serially' $ d 1 <> d 2 -- s2 = 'serially' $ d 3 <> d 4 -- -- main = (S.'toList' . 'zipSerially' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- -- This takes total 10 seconds to zip, which is (1 + 2 + 3 + 4) since -- everything runs serially: -- -- @ -- ThreadId 29: Delay 1 -- ThreadId 29: Delay 3 -- ThreadId 29: Delay 2 -- ThreadId 29: Delay 4 -- [(1,3),(2,4)] -- @ -- $parallelzip -- -- The applicative instance of 'ZipAsync' type zips streams concurrently. -- 'zipAsyncly' type combinator can be used to switch to parallel applicative -- zip composition: -- -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Control.Concurrent -- import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) -- -- d n = delay n >> return n -- s1 = 'serially' $ d 1 <> d 2 -- s2 = 'serially' $ d 3 <> d 4 -- -- main = do -- hSetBuffering stdout LineBuffering -- (S.'toList' . 'zipAsyncly' $ (,) \<$> s1 \<*> s2) >>= print -- @ -- -- This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 -- are produced concurrently, and 2 and 4 are produced concurrently: -- -- @ -- ThreadId 32: Delay 1 -- ThreadId 32: Delay 2 -- ThreadId 33: Delay 3 -- ThreadId 33: Delay 4 -- [(1,3),(2,4)] -- @ -- $concurrent -- -- When writing concurrent programs there are two distinct places where the -- programmer can control the concurrency. First, when /composing/ a stream by -- merging multiple streams we can choose an appropriate sum style operators to -- combine them concurrently or serially. Second, when /processing/ a stream in -- a monadic composition we can choose one of the monad composition types to -- choose the desired type of concurrency. -- -- In the following example the squares of @x@ and @y@ are computed -- concurrently using the 'async' operation and the square roots of their -- sum are computed serially because of the 'streamly' combinator. We can -- choose different combinators for the monadic processing and the stream -- generation, to control the concurrency. We can also use the 'asyncly' -- combinator instead of explicitly folding with 'async'. -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import Data.List (sum) -- -- main = do -- z \<- S.'toList' -- $ 'serially' -- Serial monadic processing (sqrt below) -- $ do -- x2 \<- 'forEachWith' 'async' [1..100] $ -- Concurrent @"for"@ loop -- \\x -> return $ x * x -- body of the loop -- y2 \<- 'forEachWith' 'async' [1..100] $ -- \\y -> return $ y * y -- return $ sqrt (x2 + y2) -- print $ sum z -- @ -- -- We can see how this directly maps to the imperative style -- model, we use combinators -- and operators instead of the ugly pragmas. -- -- For more concurrent programming examples see, -- , -- and -- in the examples directory. -- $reactive -- -- Reactive programming is nothing but concurrent streaming which is what -- streamly is all about. With streamly we can generate streams of events, -- merge streams that are generated concurrently and process events -- concurrently. We can do all this without any knowledge about the specifics -- of the implementation of concurrency. In the following example you will see -- that the code is just regular Haskell code without much streamly APIs used -- (active hyperlinks are the streamly APIs) and yet it is a reactive -- application. -- -- This application has two independent and concurrent sources of event -- streams, @acidRain@ and @userAction@. @acidRain@ continuously generates -- events that deteriorate the health of the character in the game. -- @userAction@ can be "potion" or "quit". When the user types "potion" the -- health improves and the game continues. -- -- @ -- {-\# LANGUAGE FlexibleContexts #-} -- -- import "Streamly" -- import "Streamly.Prelude" as S -- import Control.Monad (void, when) -- import Control.Monad.IO.Class (MonadIO(liftIO)) -- import Control.Monad.State (MonadState, get, modify, runStateT, put) -- -- data Event = Quit | Harm Int | Heal Int deriving (Show) -- -- userAction :: MonadAsync m => 'SerialT' m Event -- userAction = S.'repeatM' $ liftIO askUser -- where -- askUser = do -- command <- getLine -- case command of -- "potion" -> return (Heal 10) -- "harm" -> return (Harm 10) -- "quit" -> return Quit -- _ -> putStrLn "Type potion or harm or quit" >> askUser -- -- acidRain :: MonadAsync m => 'SerialT' m Event -- acidRain = 'asyncly' $ 'constRate' 1 $ S.'repeatM' $ liftIO $ return $ Harm 1 -- -- data Result = Check | Done -- -- runEvents :: (MonadAsync m, MonadState Int m) => 'SerialT' m Result -- runEvents = do -- event \<- userAction \`parallel` acidRain -- case event of -- Harm n -> modify (\\h -> h - n) >> return Check -- Heal n -> modify (\\h -> h + n) >> return Check -- Quit -> return Done -- -- data Status = Alive | GameOver deriving Eq -- -- getStatus :: (MonadAsync m, MonadState Int m) => Result -> m Status -- getStatus result = -- case result of -- Done -> liftIO $ putStrLn "You quit!" >> return GameOver -- Check -> do -- h <- get -- liftIO $ if (h <= 0) -- then putStrLn "You die!" >> return GameOver -- else putStrLn ("Health = " <> show h) >> return Alive -- -- main :: IO () -- main = do -- putStrLn "Your health is deteriorating due to acid rain,\\ -- \\ type \\"potion\\" or \\"quit\\"" -- let runGame = S.'drainWhile' (== Alive) $ S.'mapM' getStatus runEvents -- void $ runStateT runGame 60 -- @ -- -- You can also find the source of this example in the examples directory as -- . It has been adapted from Gabriel's -- -- package. -- This is much simpler compared to the pipes version because of the builtin -- concurrency in streamly. You can also find a SDL based reactive programming -- example adapted from Yampa in -- . -- $performance -- -- Streamly is highly optimized for performance, it is designed for serious -- high performing, concurrent and scalable applications. We have created the -- -- package which is specifically and carefully designed to measure the -- performance of Haskell streaming libraries fairly and squarely in the right -- way. Streamly performs at par or even better than most streaming libraries -- for serial operations even though it needs to deal with the concurrency -- capability. -- $interop -- -- We can use @unfoldr@ and @uncons@ to convert one streaming type to another. -- -- Interop with @vector@: -- -- @ -- import Streamly -- import qualified Streamly.Prelude as S -- import qualified Data.Vector.Fusion.Stream.Monadic as V -- -- -- | vector to streamly -- fromVector :: (IsStream t, Monad m) => V.Stream m a -> t m a -- fromVector = S.unfoldrM unconsV -- where -- unconsV v = do -- r <- V.null v -- if r -- then return Nothing -- else do -- h <- V.head v -- return $ Just (h, V.tail v) -- -- -- | streamly to vector -- toVector :: Monad m => SerialT m a -> V.Stream m a -- toVector = V.unfoldrM (S.uncons . adapt) -- -- main = do -- S.toList (fromVector (V.fromList [1..3])) >>= print -- V.toList (toVector (S.fromFoldable [1..3])) >>= print -- @ -- -- Interop with @pipes@: -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import qualified Pipes as P -- import qualified Pipes.Prelude as P -- -- -- | pipes to streamly -- fromPipes :: (IsStream t, Monad m) => P.Producer a m r -> t m a -- fromPipes = S.'unfoldrM' unconsP -- where -- -- Adapt P.next to return a Maybe instead of Either -- unconsP p = P.next p >>= either (\\_ -> return Nothing) (return . Just) -- -- -- | streamly to pipes -- toPipes :: Monad m => SerialT m a -> P.Producer a m () -- toPipes = P.unfoldr unconsS -- where -- -- Adapt S.uncons to return an Either instead of Maybe -- unconsS s = S.'uncons' s >>= maybe (return $ Left ()) (return . Right) -- -- main = do -- S.'toList' (fromPipes (P.each [1..3])) >>= print -- P.toListM (toPipes (S.'fromFoldable' [1..3])) >>= print -- @ -- -- Interop with @streaming@: -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import qualified Streaming as SG -- import qualified Streaming.Prelude as SG -- -- -- | streaming to streamly -- fromStreaming :: (IsStream t, MonadAsync m) => SG.Stream (SG.Of a) m r -> t m a -- fromStreaming = S.unfoldrM SG.uncons -- -- -- | streamly to streaming -- toStreaming :: Monad m => SerialT m a -> SG.Stream (SG.Of a) m () -- toStreaming = SG.unfoldr unconsS -- where -- -- Adapt S.uncons to return an Either instead of Maybe -- unconsS s = S.'uncons' s >>= maybe (return $ Left ()) (return . Right) -- -- main = do -- S.toList (fromStreaming (SG.each [1..3])) >>= print -- SG.toList (toStreaming (S.fromFoldable [1..3])) >>= print -- @ -- -- Interop with @conduit@: -- -- @ -- import "Streamly" -- import qualified "Streamly.Prelude" as S -- import qualified Data.Conduit as C -- import qualified Data.Conduit.List as C -- import qualified Data.Conduit.Combinators as C -- -- -- It seems there is no way out of a conduit as it does not provide an -- -- uncons or a tail function. We can convert streamly to conduit though. -- -- -- | streamly to conduit -- toConduit :: Monad m => SerialT m a -> C.ConduitT i a m () -- toConduit s = C.unfoldM S.'uncons' s -- -- main = do -- C.runConduit (toConduit (S.'fromFoldable' [1..3]) C..| C.sinkList) >>= print -- @ -- $comparison -- -- List transformers and logic programming monads also provide a product style -- composition similar to streamly, however streamly generalizes it with the -- time dimension; allowing streams to be composed in an asynchronous and -- concurrent fashion in many different ways. It also provides multiple -- alternative ways of composing streams e.g. serial, interleaved or -- concurrent. -- -- This seemingly simple addition of asynchronicity and concurrency to product -- style streaming composition unifies a number of disparate abstractions into -- one powerful, concise and elegant abstraction. A wide variety of -- programming problems can be solved elegantly with this abstraction. In -- particular, it unifies three major programming domains namely -- non-deterministic (logic) programming, concurrent programming and functional -- reactive programming. In other words, you can do everything with this one -- abstraction that you could do with the popular libraries listed under these -- categories in the list below. -- -- @ -- +-----------------+----------------+ -- | Non-determinism | | -- | +----------------+ -- | | | -- | +----------------+ -- | | | -- +-----------------+----------------+ -- | Streaming | | -- | +----------------+ -- | | | -- | +----------------+ -- | | | -- | +----------------+ -- | | | -- +-----------------+----------------+ -- | Concurrency | | -- | +----------------+ -- | | | -- +-----------------+----------------+ -- | FRP | | -- | +----------------+ -- | | | -- | +----------------+ -- | | | -- +-----------------+----------------+ -- @ -- -- Streamly is a list-transformer. It provides all the functionality provided -- by any of the list transformer and logic programming packages listed above. -- In addition, Streamly naturally integrates the concurrency dimension to the -- basic list transformer functionality. -- -- When it comes to streaming, in terms of the streaming API streamly is almost -- identical to the vector package. Streamly, vector and streaming packages all -- represent a stream as data and are therefore similar in the fundamental -- approach to streaming. The fundamental difference is that streamly adds -- concurrency support and the monad instance provides concurrent looping. -- Other streaming libraries like pipes, conduit and machines represent and -- compose stream processors rather than the stream data and therefore fall in -- another class of streaming libraries and have comparatively more complicated -- types. -- -- When it comes to concurrency, streamly can do everything that the @async@ -- package can do and more. async provides applicative concurrency whereas -- streamly provides both applicative and monadic concurrency. The -- 'ZipAsync' type behaves like the applicative instance of async. In -- comparison to transient streamly has a first class streaming interface and -- is a monad transformer that can be used universally in any Haskell monad -- transformer stack. Streamly was in fact originally inspired by the -- concurrency implementation in @transient@ though it has no resemblance with -- that and takes a lazy pull approach versus transient's strict push approach. -- -- The non-determinism, concurrency and streaming combination make streamly a -- strong reactive programming library as well. Reactive programming is -- fundamentally stream of events that can be processed concurrently. The -- example in this tutorial as well as the -- example from Yampa demonstrate -- the basic reactive capability of streamly. In core concepts streamly is -- strikingly similar to @dunai@. dunai was designed from a FRP perspective -- and streamly was originally designed from a concurrency perspective. -- However, both have similarity at the core. -- $furtherReading -- -- * Read the documentation of "Streamly" module -- * Read the documentation of "Streamly.Prelude" module -- * See the examples in the "examples" directory of the package -- * See the tests in the "test" directory of the package