{-# OPTIONS_GHC -fno-warn-unused-imports #-}
-- |
-- Module      : User.Tutorials.ConcurrentStreams
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
--
-- THIS TUTORIAL IS OBSOLETE.
--
-- In this tutorial we will show how streamly can be used for idiomatic and
-- declarative concurrent programming. Before you go through this tutorial we
-- recommend that you take a look at the Streamly serial streams tutorial.

module User.Tutorials.ConcurrentStreams
    (
    -- * Imports
    -- $setup

    -- * Concurrent Streams
    -- $concurrentStreams

    -- * Combining Streams
    -- $flavors

    -- * Imports and Supporting Code
    -- $imports

    -- * Generating Streams Concurrently
    -- $generatingConcurrently

    -- * Concurrent Pipeline Stages
    -- $concurrentApplication

    -- * Mapping Concurrently
    -- $concurrentTransformation

    -- * Merging Streams

    -- ** Semigroup Style
    -- *** Deep Speculative Composition ('Ahead')
    -- $ahead

    -- *** Deep Asynchronous Composition ('Async')
    -- $async

    -- *** Wide Asynchronous Composition ('WAsync')
    -- $wasync

    -- *** Parallel Asynchronous Composition ('Parallel')
    -- $parallel

    -- XXX we should deprecate and remove the mkAsync API
    -- Custom composition
    -- custom

    -- ** Monoid Style
    -- $monoid

    -- * Nesting Streams

    -- ** Monad
    -- *** Deep Speculative Nesting ('Ahead')
    -- $aheadNesting

    -- *** Deep Asynchronous Nesting ('Async')
    -- $concurrentNesting

    -- *** Wide Asynchronous Nesting ('WAsync')
    -- $wasyncNesting

    -- *** Parallel Asynchronous Nesting ('Parallel')
    -- $parallelNesting

    -- ** Applicative
    -- $applicative

    -- * Zipping Streams

    -- ** Parallel Zipping
    -- $parallelzip

    -- * Concurrent Programming
    -- $concurrent

    -- * Writing Concurrent Programs
    -- $programs

    -- * Where to go next?
    -- $furtherReading
    )
where

import Streamly.Data.Stream
import Data.Semigroup
import Control.Applicative
import Control.Monad
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans (lift))

-- CAUTION: please keep setup and imports sections in sync

-- XXX This tutorial has to be rewritten.

-- $setup
-- In most of the example snippets we do not repeat the imports. Where imports
-- are not explicitly specified, use the imports shown below.
--
-- >>> :m
-- >>> :set -fno-warn-deprecations
-- >>> import Data.Function ((&))
-- >>> import Streamly.Prelude ((|:), (|&))
-- >>> import Streamly.Internal.Data.Stream.Cross (CrossStream(..))
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
--
-- To illustrate concurrent vs serial composition aspects, we will use the
-- following @delay@ function to introduce a sleep or delay specified in
-- seconds. After the delay it prints the number of seconds it slept.
--
-- >>> import Control.Concurrent (threadDelay, myThreadId)
-- >>> :{
-- delay n = Stream.fromEffect $ do
--     threadDelay (n * 1000000)
--     tid <- myThreadId
--     putStrLn (show tid ++ ": Delay " ++ show n)
-- :}
--
-- For concurrent examples, use line buffering, otherwise output from different
-- threads may get mixed:
--
-- >>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
-- >>> hSetBuffering stdout LineBuffering
--

-- $concurrentStreams
--
-- Many stream operations can be done concurrently:
--
-- * Streams can be generated concurrently.
--
-- * Streams can be merged concurrently.
--
-- * Multiple stages in a streaming pipeline can run concurrently.
--
-- * Streams can be mapped and zipped concurrently.
--
-- * In monadic composition they combine like a list transformer,
-- providing concurrent non-determinism.
--
-- There are three basic concurrent stream styles, 'Ahead', 'Async', and
-- 'Parallel'.
-- The 'Ahead' style streams are similar to 'Serial' except that
-- they can speculatively execute multiple stream actions concurrently in
-- advance. 'Ahead' would return exactly the same stream as 'Serial' except
-- that it may execute the actions concurrently. The 'Async' style streams,
-- like 'Ahead', speculatively execute multiple stream actions in advance but
-- return the results in their finishing order rather than in the stream
-- traversal order. 'Parallel' is like 'Async' except that it provides
-- unbounded parallelism instead of controlled parallelism.
--
-- For easy reference, we can classify the stream types based on /execution order/,
-- /consumption order/, and /bounded or unbounded/ concurrency.
-- Execution could be serial (i.e. synchronous) or asynchronous. In serial
-- execution we execute the next action in the stream only after the previous
-- one has finished executing. In asynchronous execution multiple actions in
-- the stream can be executed asynchronously i.e. the next action can start
-- executing even before the first one has finished. Consumption order
-- determines the order in which the outputs generated by the composition are
-- consumed. Consumption could be serial or asynchronous. In serial
-- consumption, the outputs are consumed in the traversal order, in
-- asynchronous consumption the outputs are consumed as they arrive i.e. first
-- come first serve order.
--
-- +------------+--------------+--------------+--------------+
-- | Type       | Execution    | Consumption  | Concurrency  |
-- +============+==============+==============+==============+
-- | 'Serial'   | Serial       | Serial       | None         |
-- +------------+--------------+--------------+--------------+
-- | 'Ahead'    | Asynchronous | Serial       | bounded      |
-- +------------+--------------+--------------+--------------+
-- | 'Async'    | Asynchronous | Asynchronous | bounded      |
-- +------------+--------------+--------------+--------------+
-- | 'Parallel' | Asynchronous | Asynchronous | unbounded    |
-- +------------+--------------+--------------+--------------+
--
-- All these types can be freely inter-converted using type conversion
-- combinators or type annotations, without any cost, to achieve the desired
-- composition style. To force a particular type of composition, we coerce the
-- stream type using the corresponding type adapting combinator from
-- 'fromSerial', 'fromAhead', 'fromAsync', or 'fromParallel'. The default stream type
-- is inferred as 'Serial' unless you change it by using one of the combinators
-- or by using a type annotation.

-- $flavors
--
-- Streams can be combined using '<>' or 'mappend' to form a
-- composite. Composite streams can be interpreted in a depth first or
-- breadth first manner using an appropriate type conversion before
-- consumption. Deep (e.g. 'Serial') stream type variants traverse a
-- composite stream in a depth first manner, such that each stream is
-- traversed fully before traversing the next stream. Wide
-- (e.g. 'WSerial') stream types traverse it in a breadth first
-- manner, such that one element from each stream is traversed before
-- coming back to the first stream again.
--
-- Each stream type has a wide traversal variant prefixed by 'W'. The wide
-- variant differs only in the Semigroup\/Monoid, Applicative\/Monad
-- compositions of the streams.
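--
-- As a minimal sketch of the two traversal orders, using pure lists chosen
-- only for illustration and assuming the imports from the setup section, the
-- deep 'Serial' type exhausts the first stream before the second, while the
-- wide 'WSerial' type alternates between them:
--
-- >>> Stream.toList $ Stream.fromSerial $ Stream.fromList [1,2,3] <> Stream.fromList [4,5,6]
-- [1,2,3,4,5,6]
--
-- >>> Stream.toList $ Stream.fromWSerial $ Stream.fromList [1,2,3] <> Stream.fromList [4,5,6]
-- [1,4,2,5,3,6]
--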
-- The following table summarizes the basic types and the corresponding wide
-- variants:
--
-- @
-- +------------+-----------+
-- | Deep       | Wide      |
-- +============+===========+
-- | 'Serial'   | 'WSerial' |
-- +------------+-----------+
-- | 'Ahead'    | 'WAhead'  |
-- +------------+-----------+
-- | 'Async'    | 'WAsync'  |
-- +------------+-----------+
-- @
--
-- Other than these types there are also 'ZipSerial' and 'ZipAsync' types that
-- zip streams serially or concurrently using the 'Applicative' operation.
-- These types are not monads, they are only applicatives, and they do not
-- differ in 'Semigroup' composition.
--

-- $programs
--
-- When writing concurrent programs it is advised to not use the concurrent
-- style stream combinators blindly at the top level. That might create too
-- much concurrency where it is not even required, and can even degrade
-- performance in some cases. In some cases it can also lead to surprising
-- behavior because some code that is supposed to be serial becomes
-- concurrent. Please be aware that all concurrency capable APIs that you may
-- have used under the scope of a concurrent stream combinator will become
-- concurrent. For example if you have a 'repeatM' somewhere in your program
-- and you use 'fromParallel' on top, the 'repeatM' becomes fully parallel,
-- resulting in an infinite parallel execution. Instead, use the
-- /Keep It Serial and Stupid/ principle, start with the default serial
-- composition and enable concurrent combinators only when and where necessary.
-- When you use a concurrent combinator you can use an explicit 'fromSerial'
-- combinator to suppress any unnecessary concurrency under the scope of that
-- combinator.

-- $generatingConcurrently
--
-- Monadic construction and generation functions like 'consM', 'unfoldrM',
-- 'replicateM', 'repeatM', 'iterateM' and 'fromFoldableM' work concurrently
-- when used with an appropriate stream type combinator.
-- The pure versions of
-- these APIs are not concurrent, however you can use the monadic versions even
-- for pure computations by wrapping the pure value in a monad to get the
-- concurrent generation capability where required.
--
-- The following code finishes in 3 seconds (6 seconds when serial):
--
-- >>> let p n = threadDelay (n * 1000000) >> return n
-- >>> Stream.toList $ Stream.fromParallel $ p 3 |: p 2 |: p 1 |: Stream.nil
-- [1,2,3]
--
-- >>> Stream.toList $ Stream.fromAhead $ p 3 |: p 2 |: p 1 |: Stream.nil
-- [3,2,1]
--
-- The following finishes in 10 seconds (100 seconds when serial):
--
-- >>> Stream.drain $ Stream.fromAsync $ Stream.replicateM 10 $ p 10
--

-- $concurrentTransformation
--
-- Monadic transformation functions 'mapM' and 'sequence' work concurrently
-- when used with the appropriate stream type combinators. The pure versions do
-- not work concurrently, however you can use the monadic versions even for pure
-- computations to get the concurrent transformation capability where required.
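--
-- As a minimal sketch of that last point, a pure function (squaring, chosen
-- only for illustration) can be lifted with 'return' and mapped in the 'Ahead'
-- style, assuming the imports from the setup section. 'Ahead' preserves the
-- stream order, so the result is the same as the serial one:
--
-- >>> :{
-- Stream.fromList [1..5]
--     & Stream.fromSerial                   -- generation stays serial
--     & Stream.mapM (\x -> return (x * x))  -- pure function lifted with return
--     & Stream.fromAhead                    -- mapM may run concurrently
--     & Stream.toList
-- :}
-- [1,4,9,16,25]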
--
-- This would print a value every second (2 seconds when serial):
--
-- >>> let p n = threadDelay (n * 1000000) >> return n
-- >>> :{
-- parMap =
--       Stream.repeatM (p 1)
--     & Stream.fromSerial  -- repeatM is serial
--     & Stream.mapM (\x -> p 1 >> print x)
--     & Stream.fromAhead   -- mapM is concurrent using Ahead style
--     & Stream.drain
-- :}
--

-- $concurrentApplication
--
-- The concurrent function application operators '|$' and '|&' apply a stream
-- argument to a stream function concurrently to compose a concurrent pipeline
-- of stream processing functions.
--
-- Because both the stages run concurrently, we would see a delay of only 1
-- second instead of 2 seconds in the following:
--
-- >>> let p n = threadDelay (n * 1000000) >> return n
-- >>> :{
-- parApp =
--        Stream.repeatM (p 1)
--     |& Stream.mapM (\x -> p 1 >> print x)
--      & Stream.drain
-- :}

-- $ahead
--
-- The 'Semigroup' operation '<>' of the 'Ahead' type combines two streams in a
-- /serial depth first/ manner with concurrent lookahead. We use the 'fromAhead'
-- type combinator to effect 'Ahead' style of composition. We can also use an
-- explicit 'Ahead' type annotation for the stream to achieve the same effect.
--
-- When two streams are combined in this manner, the streams are traversed in
-- depth first manner just like 'Serial', however it can execute the next
-- stream concurrently and keep the results ready when its turn arrives.
-- Concurrent execution of the next stream(s) is performed if the first stream
-- blocks or if it cannot produce output at the rate that is enough to meet the
-- consumer demand. Multiple streams can be executed concurrently to meet the
-- demand.
-- The following example would print the result in a second even
-- though each action in each stream takes one second:
--
-- >>> p n = threadDelay 1000000 >> return n
-- >>> stream1 = p 1 |: p 2 |: Stream.nil
-- >>> stream2 = p 3 |: p 4 |: Stream.nil
-- >>> Stream.toList $ Stream.fromAhead $ stream1 <> stream2
-- [1,2,3,4]
--
-- Each stream is constructed 'fromAhead' and then both the streams are merged
-- 'fromAhead', therefore, all the actions can run concurrently but the result is
-- presented in serial order.
--
-- You can also use the polymorphic combinator 'ahead' in place of '<>' to
-- compose any type of streams in this manner.

-- $async
--
-- The 'Semigroup' operation '<>' of the 'Async' type combines the two
-- streams in a depth first manner with parallel look ahead. We use the
-- 'fromAsync' type combinator to effect 'Async' style of composition. We
-- can also use the 'Async' type annotation for the stream type to achieve
-- the same effect.
--
-- When two streams with multiple elements are combined in this manner, the
-- streams are traversed in depth first manner just like 'Serial', however it
-- can execute the next stream concurrently and return the results from it
-- as they arrive i.e. the results from the next stream may be yielded even
-- before the results from the first stream. Concurrent execution of the next
-- stream(s) is performed if the first stream blocks or if it cannot produce
-- output at the rate that is enough to meet the consumer demand. Multiple
-- streams can be executed concurrently to meet the demand.
-- In the example below each element in the stream introduces a constant delay
-- of 1 second, however, it takes just one second to produce all the results.
-- The results are not guaranteed to be in any particular order:
--
-- >>> p n = threadDelay 1000000 >> return n
-- >>> stream1 = p 1 |: p 2 |: Stream.nil
-- >>> stream2 = p 3 |: p 4 |: Stream.nil
-- >>> Stream.toList $ Stream.fromAsync $ stream1 <> stream2
-- ...
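--
-- Since the order is not deterministic, one way to check such a result is to
-- compare only its contents. The following is a minimal sketch that sorts the
-- collected list, assuming @stream1@ and @stream2@ as defined above; the use
-- of @sort@ from "Data.List" is our choice for illustration:
--
-- >>> import Data.List (sort)
-- >>> fmap sort $ Stream.toList $ Stream.fromAsync $ stream1 <> stream2
-- [1,2,3,4]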
--
-- Both the constituent streams and their composition use the 'Async' style
-- here. We can instead compose the constituent streams to run serially, in
-- which case it would take 2 seconds to produce all the results.
-- The elements in the serial streams would be in serial order in the results:
--
-- >>> p n = threadDelay 1000000 >> return n
-- >>> stream = (Stream.fromSerial stream1) <> (Stream.fromSerial stream2)
-- >>> Stream.toList $ Stream.fromAsync stream
-- ...
--
-- In the following example we can see that new threads are started when a
-- computation blocks. Notice that the output from the stream with the
-- shortest delay is printed first. The whole computation takes @maximum of
-- (3, 2, 1) = 3@ seconds:
--
-- >>> Stream.drain $ Stream.fromAsync $ delay 3 <> delay 2 <> delay 1
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
--
-- When we have a tree of computations composed using this style, the tree is
-- traversed in DFS style just like the 'Serial' style, the only difference is
-- that here we can move on to executing the next stream if a stream blocks.
-- However, we will not start new threads if we have sufficient output to
-- saturate the consumer. This is why we call it left-biased demand driven or
-- adaptive concurrency style, the concurrency tends to stay on the left side
-- of the composition as long as possible. More threads are started based on
-- the pull rate of the consumer. The following example prints an output every
-- second as all of the actions are concurrent.
--
-- >>> Stream.drain $ Stream.fromAsync $ (delay 1 <> delay 2) <> (delay 3 <> delay 4)
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
-- ThreadId ...: Delay 4
--
-- All the computations may even run in a single thread when more threads are
-- not needed. As you can see, in the following example the computations are
-- run in a single thread one after another, because none of them blocks.
-- However, if the thread consuming the stream were faster than the producer
-- then it would have started parallel threads for each computation to keep up
-- even if none of them blocks:
--
-- >>> :{
-- traced m = Stream.fromEffect (myThreadId >>= print) >> return m
-- stream = traced (sqrt 9) <> traced (sqrt 16) <> traced (sqrt 25)
-- main = Stream.drain $ Stream.fromAsync stream
-- :}
--
-- Note that the order of printing in the above examples may change due to
-- variations in scheduling latencies for concurrent threads.
--
-- The polymorphic version of the 'Async' binary operation '<>' is called
-- 'async'. We can use 'async' to join streams in a left biased
-- adaptively concurrent manner irrespective of the type, notice that we have
-- not used the 'fromAsync' combinator in the following example:
--
-- >>> Stream.drain $ delay 3 `Stream.async` delay 2 `Stream.async` delay 1
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
--
-- Since the concurrency provided by this operator is demand driven it cannot
-- be used when the composed computations start timers that are relative to
-- each other because all computations may not be started at the same time and
-- therefore timers in all of them may not start at the same time. When
-- relative timing among all computations is important or when we need to start
-- all computations at once for any reason 'Parallel' style must be used
-- instead.
--
-- 'Async' style utilizes resources optimally and should be preferred over
-- 'Parallel' or 'WAsync' unless you really need those. 'Async' should be used
-- when we know that the computations can run in parallel but we do not care if
-- they actually run in parallel or not, that decision can be left to the
-- scheduler based on demand.
-- Also, note that the 'async' operator can be used to fold
-- an infinite number of streams in contrast to the 'Parallel' or 'WAsync'
-- styles, because it does not require us to run all of them at the same time
-- in a fair manner.

-- $wasync
--
-- The 'Semigroup' operation '<>' of the 'WAsync' type combines two streams in
-- a concurrent manner using /breadth first traversal/. We use the 'fromWAsync'
-- type combinator to effect 'WAsync' style of composition. We can also use the
-- 'WAsync' type annotation for the stream to achieve the same effect.
--
-- When streams with multiple elements are combined in this manner, we traverse
-- all the streams concurrently in a breadth first manner i.e. one action from
-- each stream is performed and yielded to the resulting stream before we come
-- back to the first stream again and so on. Even though we execute the actions
-- in a breadth first order the outputs are consumed on a first come first
-- serve basis.
--
-- In the following example we can see that outputs are produced in the breadth
-- first traversal order but this is not guaranteed.
--
-- >>> stream1 = print 1 |: print 2 |: Stream.nil
-- >>> stream2 = print 3 |: print 4 |: Stream.nil
-- >>> Stream.drain $ Stream.fromWAsync $ stream1 <> stream2
-- 1
-- 3
-- 2
-- 4
--
-- The polymorphic version of the binary operation '<>' of the 'WAsync' type is
-- 'wAsync'.
-- We can use 'wAsync' to join streams using a breadth first
-- concurrent traversal irrespective of the type, notice that we have not used
-- the 'fromWAsync' combinator in the following example:
--
-- >>> Stream.drain $ delay 3 `Stream.wAsync` delay 2 `Stream.wAsync` delay 1
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
--
-- Since the concurrency provided by this style is demand driven it may not
-- be used when the composed computations start timers that are relative to
-- each other because all computations may not be started at the same time and
-- therefore timers in all of them may not start at the same time. When
-- relative timing among all computations is important or when we need to start
-- all computations at once for any reason 'Parallel' style must be used
-- instead.
--

-- $parallel
--
-- The 'Semigroup' operation '<>' of the 'Parallel' type combines the two
-- streams in a fairly concurrent manner with round robin scheduling. We use
-- the 'fromParallel' type combinator to effect 'Parallel' style of composition.
-- We can also use the 'Parallel' type annotation for the stream type to
-- achieve the same effect.
--
-- When two streams with multiple elements are combined in this manner, the
-- monadic actions in both the streams are performed concurrently with a fair
-- round robin scheduling. The outputs are yielded in the order in which the
-- actions complete. This is pretty similar to the 'WAsync' type, the
-- difference is that 'WAsync' is adaptive to the consumer demand and may or
-- may not execute all actions in parallel depending on the demand, whereas
-- 'Parallel' runs all the streams in parallel irrespective of the demand.
--
-- The polymorphic version of the binary operation '<>' of the 'Parallel' type
-- is 'parallel'.
-- We can use 'parallel' to join streams in a fairly concurrent
-- manner irrespective of the type, notice that we have not used the
-- 'fromParallel' combinator in the following example:
--
-- >>> Stream.drain $ delay 3 `Stream.parallel` delay 2 `Stream.parallel` delay 1
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
--
-- Note that this style of composition cannot be used to combine an infinite
-- number of streams, as it will lead to an infinite sized scheduling queue.
--

-- XXX to be removed
-- $custom
--
-- The 'mkAsync' API can be used to create references to asynchronously running
-- stream computations. We can then use 'uncons' to explore the streams
-- arbitrarily and then recompose individual elements to create a new stream.
-- This way we can dynamically decide which stream to explore at any given
-- time. Take an example of a merge sort of two sorted streams. We need to
-- keep consuming items from the stream which has the lowest item in the sort
-- order. This can be achieved using async references to streams. See
-- "MergeSort.hs" in .

-- $monoid
--
-- All of the following are equivalent and start ten concurrent tasks each with
-- a delay from 1 to 10 seconds, resulting in the printing of each number every
-- second:
--
-- >>> :{
-- main = do
--     Stream.drain $ Stream.fromAsync $ foldMap delay [1..10]
--     Stream.drain $ Stream.concatFoldableWith Stream.async (map delay [1..10])
--     Stream.drain $ Stream.concatMapFoldableWith Stream.async delay [1..10]
--     Stream.drain $ Stream.concatForFoldableWith Stream.async [1..10] delay
-- :}
--

-- $aheadNesting
--
-- The 'Monad' composition of 'Ahead' type behaves just like 'Serial' except
-- that it can speculatively perform a bounded number of next iterations of a
-- loop concurrently.
--
-- >>> :{
-- Stream.toList $ Stream.fromAhead $ do
--     x <- Stream.fromFoldable [3,2,1]
--     delay x
--     return x
-- :}
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
-- [3,2,1]
--
-- This code finishes in 3 seconds, 'Serial' would take 6 seconds. As we can
-- see all the three iterations are concurrent and run in different threads,
-- however, the results are returned in the serial order.
--
-- Concurrency is demand driven, when multiple streams are composed using this
-- style, the iterations are executed in a depth first manner just like
-- 'Serial' i.e. nested iterations are executed before we proceed to the next
-- outer iteration. The only difference is that we may execute multiple future
-- iterations concurrently and keep the results ready.
--
-- The 'fromAhead' type combinator can be used to switch to this style of
-- composition. Alternatively, a type annotation can be used to specify the
-- type of the stream as 'Ahead'.
--

-- $concurrentNesting
--
-- The 'Monad' composition of 'Async' type can perform the iterations of a
-- loop concurrently.
--
-- >>> :{
-- Stream.drain $ Stream.fromAsync $ do
--     x <- Stream.fromFoldable [3,2,1]
--     delay x
-- :}
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
--
-- As we can see the code after the @fromFoldable@ statement is run three
-- times, once for each value of @x@. All the three iterations are concurrent
-- and run in different threads. The iteration with least delay finishes first.
-- When compared to imperative programming, this can be viewed as a @for@ loop
-- with three concurrent iterations.
--
-- Concurrency is demand driven i.e. more concurrent iterations are started
-- only if the previous iterations are not able to saturate the consumer of the
-- output stream. This works exactly the same way as the merging of two
-- streams using 'async' works.
--
-- The 'fromAsync' type combinator can be used to switch to this style of
-- composition.
-- Alternatively, a type annotation can be used to specify the
-- type of the stream as 'Async'.
--
-- When multiple streams are nested using this style, the iterations are
-- concurrently evaluated in a depth first manner:
--
-- >>> :{
-- Stream.drain $ Stream.fromAsync $ do
--     x <- Stream.fromFoldable [1,2]
--     y <- Stream.fromFoldable [3,4]
--     Stream.fromEffect $ putStrLn $ show (x, y)
-- :}
-- (1,3)
-- ...
-- ...
-- ...
--
-- Nested iterations are given preference for concurrent evaluation i.e.
-- (1,4) will be scheduled in preference to (2,3).

-- $wasyncNesting
--
-- Like 'Async', the 'Monad' composition of 'WAsync' runs the iterations of a
-- loop concurrently. It differs from 'Async' in the nested loop behavior. Like
-- 'WSerial', the nested loops in this type are traversed and executed in a
-- breadth first manner rather than the depth first manner of 'Async' style.
--
-- >>> :{
-- Stream.drain $ Stream.fromWAsync $ do
--     x <- Stream.fromSerial $ Stream.fromFoldable [1,2]
--     y <- Stream.fromSerial $ Stream.fromFoldable [3,4]
--     Stream.fromEffect $ putStrLn $ show (x, y)
-- :}
-- (1,3)
-- (1,4)
-- (2,3)
-- (2,4)
--
-- Note that (2,3) is preferred to (1,4) when evaluating the iterations
-- concurrently. This works exactly the same way as the merging of two streams
-- using 'wAsync' works.
--
-- The 'fromWAsync' type combinator can be used to switch to this style of
-- composition. Alternatively, a type annotation can be used to specify the
-- type of the stream as 'WAsync'.
--

-- $parallelNesting
--
-- Just like 'Async' or 'WAsync' the 'Monad' composition of 'Parallel' runs the
-- iterations of a loop concurrently.
--
-- >>> :{
-- Stream.drain $ Stream.fromParallel $ do
--     x <- Stream.fromFoldable [3,2,1]
--     delay x
-- :}
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
--
-- It differs from 'Async' and 'WAsync' in the nested loop behavior. All
-- iterations of the loop are run fully concurrently irrespective of the
-- demand.
-- This works exactly the same way as the merging of streams using
-- 'parallel' works.
--
-- The 'fromParallel' type combinator can be used to switch to this style of
-- composition. Alternatively, a type annotation can be used to specify the
-- type of the stream as 'Parallel'.
--

-- $applicative
--
-- 'Async' can run the iterations concurrently, therefore, it takes a total
-- of 6 seconds which is max (1, 2) + max (3, 4):
--
-- >>> (Stream.toList $ Stream.fromAsync $ (,) <$> s1 <*> s2) >>= print
-- ...
--
-- @
-- ThreadId 34: Delay 1
-- ThreadId 36: Delay 2
-- ThreadId 35: Delay 3
-- ThreadId 36: Delay 3
-- ThreadId 35: Delay 4
-- ThreadId 36: Delay 4
-- [(1,3),(2,3),(1,4),(2,4)]
-- @
--
-- Similarly, 'WAsync' as well can run the iterations concurrently, but with a
-- different style of scheduling than 'Async' as explained in the Monad
-- section, therefore, it too takes a total of 6 seconds (2 + 4):
--
-- >>> (Stream.toList $ Stream.fromWAsync $ (,) <$> s1 <*> s2) >>= print
-- ...
--
-- @
-- ThreadId 34: Delay 1
-- ThreadId 36: Delay 2
-- ThreadId 35: Delay 3
-- ThreadId 36: Delay 3
-- ThreadId 35: Delay 4
-- ThreadId 36: Delay 4
-- [(1,3),(2,3),(1,4),(2,4)]
-- @

-- $parallelzip
--
-- The applicative instance of 'ZipAsync' type zips streams concurrently.
-- 'fromZipAsync' type combinator can be used to switch to parallel applicative
-- zip composition:
--
-- >>> d n = unCrossStream (CrossStream (delay n) >> return n)
-- >>> s1 = Stream.fromSerial $ d 2 <> d 4
-- >>> s2 = Stream.fromSerial $ d 3 <> d 1
-- >>> (Stream.toList $ Stream.fromZipAsync $ (,) <$> s1 <*> s2) >>= print
-- ThreadId ...: Delay 2
-- ThreadId ...: Delay 3
-- ThreadId ...: Delay 1
-- ThreadId ...: Delay 4
-- [(2,3),(4,1)]
--
-- The actions within each stream are executed serially, however, the two
-- streams are run concurrently with respect to each other.
-- Therefore, it takes
-- 6 seconds to zip the two streams, which is the maximum of the time to
-- evaluate stream s1 (2 + 4 = 6 seconds) and stream s2 (3 + 1 = 4 seconds).

-- $concurrent
--
-- When writing concurrent programs there are two distinct places where the
-- programmer can control the concurrency. First, when /composing/ a stream by
-- merging multiple streams we can choose an appropriate sum style operator to
-- combine them concurrently or serially. Second, when /processing/ a stream in
-- a monadic composition we can choose one of the monad composition types to
-- get the desired type of concurrency.
--
-- In the following example the squares of @x@ and @y@ are computed
-- concurrently using the 'async' operation and the square roots of their
-- sum are computed serially because of the 'fromSerial' combinator. We can
-- choose different combinators for the monadic processing and the stream
-- generation, to control the concurrency. We can also use the 'fromAsync'
-- combinator instead of explicitly folding with 'async'.
--
-- >>> import Data.List (sum)
-- >>> :{
-- main = do
--     z <-   Stream.toList
--          $ Stream.fromSerial     -- Serial monadic processing (sqrt below)
--          $ unCrossStream $ do
--              -- Concurrent @"for"@ loop
--              x2 <- CrossStream (Stream.concatForFoldableWith Stream.async [1..100] $
--                          \x -> Stream.fromPure $ x * x)  -- body of the loop
--              y2 <- CrossStream (Stream.concatForFoldableWith Stream.async [1..100] $
--                          \y -> Stream.fromPure $ y * y)
--              return $ sqrt (x2 + y2)
--     print $ sum z
-- :}
--
-- We can see how this directly maps to the imperative style
-- model, we use combinators
-- and operators instead of the ugly pragmas.
--
-- For more concurrent programming examples see,
-- .

-- $furtherReading
--
-- * Read the reactive programming tutorial
-- * See the examples in repo.