úÎ!ÌMµM      !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œ(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone+;<=>?CDFKV]U„streamlyŽA monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be .streamly6Specifies the stream yield rate in yields per second (Hertz*). We keep accumulating yield credits at S. At any point of time we allow only as many yields as we have accumulated as per { since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed ’. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than  $ we try to recover only as much as  . puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly,  ÿ puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.If the ; is 0 or negative the stream never yields a value. If the  / is 0 or negative we do not attempt to recover.streamlyThe lower rate limitstreamly"The target rate we want to achieve streamlyThe upper rate limit streamlyMaximum slack from the goalstreamlyÿVAn SVar or a Stream Var is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time at any pace. The SVar would run the streams asynchronously and accumulate results. An SVar may not really execute the stream completely and accumulate all the results. However, it ensures that the reader can read the results at whatever paces it wants to read. The SVar monitors and adapts to the consumer's pace.ÿŸAn SVar is a mini scheduler, it has an associated workLoop that holds the stream tasks to be picked and run by a pool of worker threads. It has an associated output queue where the output stream elements are placed by the worker threads. A outputDoorBell is used by the worker threads to intimate the consumer thread about availability of new results in the output queue. More workers are added to the SVar by  fromStreamVarÿ" on demand if the output produced is not keeping pace with the consumer. On bounded SVars, workers block on the output queue to provide throttling of the producer when the consumer is not pulling fast enough. The number of workers may even get reduced depending on the consuming pace.{New work is enqueued either at the time of creation of the SVar or as a result of executing the parallel combinators i.e. <| and <|>< when the already enqueued computations get evaluated. See joinStreamVarAsync.žstreamlyhIdentify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.Ÿstreamly=Sorting out-of-turn outputs in a heap for Ahead style streams streamly7Events that a child thread may send to a parent thread.¡streamlyŒThis function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.¢streamly}This is safe even if we are adding more threads concurrently because if a child thread is adding another thread then anyway £ will not be empty.¤streamlyÿ<In contrast to pushWorker which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread safe modification of workerThreads. Alternatively, we can use a CreateThread event to avoid using a CAS based modification.¥streamly]This is a magic number and it is overloaded, and used at several places to achieve batching: «If we have to sleep to slowdown this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.hCollected latencies are computed and transferred to measured latency after a minimum of this period.¦streamlyÿ`Another magic number! When we have to start more workers to cover up a number of yields that we are lagging by then we cannot start one worker for each yield because that may be a very big number and if the latency of the workers is low these number of yields could be very high. We assume that we run each extra worker for at least this much time.§streamly¥Get the worker latency without resetting workerPendingLatency Returns (total yield count, base time, measured latency) CAUTION! keep it in sync with collectLatency¨streamlyWrite a stream to an ©Q in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.ª«¬­®¯©°±²³´µ¶·¸¹º»¼½¾£¿ÀÁÂÃÄÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖרÙÚÛÜÝ ÞßàážâãäåŸæç èéêëìíîïðñòóôõö÷øùúûüýþ¡ÿ     ¤¨(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone+.;<=>?DQV]‘™ streamlySame as  . streamlyDClass of types that can represent a stream of elements of some type a in some monad m. streamly_Constructs a stream by adding a monadic action at the head of an existing stream. For example: M> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"] Concurrent (do not use  parallely to construct infinite streams)streamlyOperator equivalent of  . We can read it as "parallel colon" to remember that | comes before :. C> toList $ getLine |: getLine |: nil hello world ["hello","world"]  ™let delay = threadDelay 1000000 >> print 1 runStream $ serially $ delay |: delay |: delay |: nil runStream $ parallely $ delay |: delay |: delay |: nil Concurrent (do not use  parallely to construct infinite streams)streamly The type  Stream m a/ represents a monadic stream of values of type a% constructed using actions in monad ma. It uses stop, singleton and yield continuations equivalent to the following direct style type: <data Stream m a = Stop | Singleton a | Yield a (Stream m a) CTo facilitate parallel composition we maintain a local state in an ©W that is shared across and is used for synchronization of the streams being composed.ÜThe singleton case can be expressed in terms of stop and yield but we have it as a separate case to optimize composition operations for streams with single element. We build singleton streams in the implementation of $ for Applicative and Monad, and in  for MonadTrans.streamlyAAdapt any specific stream type to any other specific stream type.streamlyBuild a stream from an ©Q, a stop continuation, a singleton stream continuation and a yield continuation.streamlyAn empty stream. > toList nil [] streamlyuConstruct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` rM but more efficient. For concurrent streams this is not concurrent whereas   is concurrent. For example: 2> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3] streamlyOperator equivalent of . &> toList $ 1 .: 2 .: 3 .: nil [1,2,3] streamly.Make an empty stream from a callback function. streamly:Make a singleton stream from a one shot callback function.!streamly,Construct a stream from a callback function.streamlySame as yieldMstreamly6Generate an infinite stream by repeating a pure value.streamlyConstruct a stream from a " containing pure values.#streamlylFold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation.$streamlyLazy right associative fold.%streamly-Lazy right fold with a monadic step function.&streamlyÜStrict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.'streamlyStrict left associative fold.(streamlyLike &#, but with a monadic step function.)streamlyLike '" but with a monadic step function.*streamly/Extract the last element of the stream, if any.+streamly[Apply a monadic action to each element of the stream and discard the output of the action.,streamly7Zip two streams serially using a pure zipping function.-streamly:Zip two streams serially using a monadic zipping function..streamlyˆConcatenates two streams sequentially i.e. the first stream is exhausted completely before yielding any element from the second stream.G /0123 !456789:#$%;&'()<=>?@ABCD*EFGHI+JKLMNOPQRSTUVW,-.XY 5555(c) 2018 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone+;<=>?CDQV]Ÿí ZstreamlypA stream consists of a step function that generates the next step given a current state, and the current state.[streamlyA stream is a succession of [s. A \< produces a single value and the next state of the stream. ]3 indicates there are no more values in the stream.^streamly An empty Z._streamly#Can fuse but has O(n^2) complexity.`streamlyCreate a singleton Z from a pure value.astreamlyCreate a singleton Z from a monadic action.bstreamly'Convert a list of monadic actions to a Zcstreamly#Convert a list of pure values to a Zdstreamly1Run a streaming composition, discard the results.estreamly1Execute a monadic action for each element of the ZfstreamlyMap a monadic function over a Z2Zg[\]^_hijkl`abcmnopqdrstuvwxyz{e|}~€‚ƒ„…†f‡ˆ‰Š‹(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +.0;<=>?KÐÌ streamlystreamly5An interleaving serial IO stream of elements of type a. See ! documentation for more details.streamlySWide serial composition or serial composition with a breadth first traversal. The  instance of – traverses the two streams in a breadth first manner. In other words, it interleaves two streams, yielding one element from each stream alternately. !import Streamly import qualified Streamly.Prelude as S main = (toList . ; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,3,2,4] Similarly, the Œo instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner. main =  runStream . ^ $ do x <- return 1 <> return 2 y <- return 3 <> return 4 S.yieldM $ print (x, y)  (1,3) (2,3) (1,4) (2,4) ¡Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.streamlystreamly'A serial IO stream of elements of type a. See ! documentation for more details.streamlyODeep serial composition or serial composition with depth first traversal. The  instance of “ appends two streams serially in a depth first manner, yielding all elements from the first stream, and then all elements from the second stream. !import Streamly import qualified Streamly.Prelude as S main = (toList . ; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,2,3,4] The Œ instance runs the monadic continuation+ for each element of the stream, serially. main =  runStream . ; $ do x <- return 1 <> return 2 S.yieldM $ print x  1 2 0 nests streams serially in a depth first manner. main =  runStream . ^ $ do x <- return 1 <> return 2 y <- return 3 <> return 4 S.yieldM $ print (x, y)  (1,3) (1,4) (2,3) (2,4) This behavior of Ä is exactly like a list transformer. We call the monadic code being run for each element of the stream a monadic continuation. In imperative paradigm we can think of this composition as nested foro loops and the monadic continuation is the body of the loop. The loop iterates for all elements of the stream.The : combinator can be omitted as the default stream type is ›. Note that serial composition with depth first traversal can be used to combine an infinite number of streams as it explores only one stream at a time.streamly(Fix the type of a polymorphic stream as .streamlyPolymorphic version of the  operation  of €. Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.streamlySame as .streamly(Fix the type of a polymorphic stream as . streamlySame as .!streamlyPolymorphic version of the  operation  of N. Interleaves two streams, yielding one element from each stream alternately."streamlySame as !.Ž !""5(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +;<=>?DQV]k streamlyPull a stream from an SVar.streamlyWrite a stream to an ©Q in a non-blocking manner. The stream can then be read back from the SVar using ‘.#streamlyêSpecify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.ÿÿWhen the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.$streamlyÿ;Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.CAUTION! using an unbounded $: value (i.e. a negative value) coupled with an unbounded #€ value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when  is used in  ZipAsyncM streams as Š in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.%streamly&Specify the pull rate of a stream. A ’æ value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of $ specifications is documented under N. The effective maximum production rate achieved by a stream is governed by:The # limitThe $ limit5The maximum rate that the stream producer can achieve5The maximum rate that the stream consumer can achieve&streamlySame as )rate (Just $ Rate (r/2) r (2*r) maxBound)YSpecifies the average production rate of a stream in number of yields per second (i.e. Hertzæ). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.'streamlySame as %rate (Just $ Rate r r (2*r) maxBound)ÿSpecifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.(streamlySame as %rate (Just $ Rate (r/2) r r maxBound)ÿsSpecifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.)streamlySame as rate (Just $ Rate r r r 0)ÿ-Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.“streamlyÿSpecify the average latency, in nanoseconds, of a single threaded action in a concurrent composition. Streamly can measure the latencies, but that is possible only after at least one task has completed. This combinator can be used to provide a latency hint so that rate control using %ÿ can take that into account right from the beginning. When not specified then a default behavior is chosen which could be too slow or too fast, and would be restricted by any other control parameters configured. A value of 0 indicates default behavior, a negative value means there is no limit i.e. zero latency. This would normally be useful only in high latency and high throughput cases. ‘#$%&'()” (c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +.0;<=>?KQVÈ*streamly A variant of   that allows you to fold a "@ container of streams using the specified stream sum operation.  foldWith async $ map return [1..3]+streamly A variant of •9 that allows you to map a monadic streaming action on a "F container and then fold it using the specified stream sum operation.  foldMapWith async return [1..3],streamlyLike +d but with the last two arguments reversed i.e. the monadic streaming function is the last argument.*+, (c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +.0;<=>?Kv -streamly4A parallely composing IO stream of elements of type a. See . documentation for more details..streamly=Async composition with simultaneous traversal of all streams.The Semigroup instance of . concurrently merges” two streams, running both strictly concurrently and yielding elements from both streams as they arrive. When multiple streams are combined using .‹ each one is evaluated in its own thread and the results produced are presented in the combined stream on a first come first serve basis.AsyncT and WAsyncT are concurrent lookahead streamsÿî each with a specific type of consumption pattern (depth first or breadth first). Since they are lookahead, they may introduce certain default latency in starting more concurrent tasks for efficiency reasons or may put a default limitation on the resource consumption (e.g. number of concurrent threads for lookahead). If we look at the implementation detail, they both can share a pool of worker threads to evaluate the streams in the desired pattern and at the desired rate. However, .9 uses a separate runtime thread to evaluate each stream.WAsyncT is similar to .z, as both of them evaluate the constituent streams fairly in a round robin fashion. However, the key difference is that WAsyncT! is lazy or pull driven whereas . is strict or push driven. .z immediately starts concurrent evaluation of both the streams (in separate threads) and later picks the results whereas WAsyncTë may wait for a certain latency threshold before initiating concurrent evaluation of the next stream. The concurrent scheduling of the next stream or the degree of concurrency is driven by the feedback from the consumer. In case of .@ each stream is evaluated in a separate thread and results are pushedd to a shared output buffer, the evaluation rate is controlled by blocking when the buffer is full.@Concurrent lookahead streams are generally more efficient than .  and can work pretty efficiently even for smaller tasks because they do not necessarily use a separate thread for each task. So they should be preferred over .d especially when efficiency is a concern and simultaneous strict evaluation is not a requirement. .ÿ] is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them e.g. when we want to race two tasks and want to start both strictly at the same time or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. We can say that .A is almost the same (modulo some implementation differences) as WAsyncT\ when the latter is used with unlimited lookahead and zero latency in initiating lookahead. main = (toList . 4; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,3,2,4] zWhen streams with more than one element are merged, it yields whichever stream yields first without any bias, unlike the Async style streams.ÿŠAny exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.Similarly, the Œ instance of . runs all& iterations of the loop concurrently. import Streamly import qualified Streamly.Prelude( as S import Control.Concurrent main =  runStream . 4± $ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)  ?ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3 ƒNote that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.–streamlynXXX we can implement it more efficienty by directly implementing instead of combining streams using parallel./streamlyPolymorphic version of the  operation  of ." Merges two streams concurrently.0streamlyiParallel function application operator for streams; just like the regular function application operator —} except that it is concurrent. The following code prints a value every second even though each stage adds a 1 second delay. qrunStream $ S.mapM (\x -> threadDelay 1000000 >> print x) |$ S.repeatM (threadDelay 1000000 >> return 1)  Concurrent1streamlyyParallel reverse function application operator for streams; just like the regular reverse function application operator & except that it is concurrent. rrunStream $ S.repeatM (threadDelay 1000000 >> return 1) |& S.mapM (\x -> threadDelay 1000000 >> print x)  Concurrent2streamly2Parallel function application operator; applies a run or fold_ function to a stream such that the fold consumer and the stream producer run in parallel. A run or foldF function reduces the stream to a value in the underlying monad. The .I at the end of the operator is a mnemonic for termination of the stream. o S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () |$. S.repeatM (threadDelay 1000000 >> return 1)  Concurrent3streamlylParallel reverse function application operator for applying a run or fold functions to a stream. Just like 2' except that the operands are reversed. p S.repeatM (threadDelay 1000000 >> return 1) |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()  Concurrent4streamly(Fix the type of a polymorphic stream as .. -./˜0123400112031 (c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +.0;<=>?K]ÕÊ5streamly@A round robin parallely composing IO stream of elements of type a. See 6 documentation for more details.6streamlyeWide async composition or async composition with breadth first traversal. The Semigroup instance of 6 concurrently  traverses‚ the composed streams using a depth first travesal or in a round robin fashion, yielding elements from both streams alternately. main = (toList . >; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,3,2,4] ÿŠAny exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.Similarly, the Œ instance of 6 runs all@ iterations fairly concurrently using a round robin scheduling. import Streamly import qualified Streamly.Prelude( as S import Control.Concurrent main =  runStream . >± $ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)  ?ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3 Unlike 8L all iterations are guaranteed to run fairly concurrently, unconditionally.žNote that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.7streamlyOA demand driven left biased parallely composing IO stream of elements of type a. See 8 documentation for more details.8streamly\Deep async composition or async composition with depth first traversal. In a left to right ÿ composition it tries to yield elements from the left stream as long as it can, but it can run the right stream in parallel if it needs to, based on demand. The right stream can be run if the left stream blocks on IO or cannot produce elements fast enough for the consumer. main = (toList . <; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,2,3,4] ÿŠAny exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.!Similarly, the monad instance of 8 may´ run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer. import Streamly import qualified Streamly.Prelude( as S import Control.Concurrent main =  runStream . <± $ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)  ?ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3 ?All iterations may run in the same thread if they do not block.¥Note that async composition with depth first traversal can be used to combine infinite number of streams as it explores only a bounded number of streams at a time.9streamlyÿEMake a stream asynchronous, triggers the computation and returns a stream in the underlying monad representing the output generated by the original computation. The returned action is exhaustible and must be drained once. If not drained fully we may have a thread blocked forever and once exhausted it will always return empty.™streamly;Create a new SVar and enqueue one stream computation on it.šstreamly/Join two computations on the currently running ©ÿ queue for concurrent execution. When we are using parallel composition, an SVar is passed around as a state variable. We try to schedule a new parallel computation on the SVar passed to us. The first time, when no SVar exists, a new SVar is created. Subsequently, ›n may get called when a computation already scheduled on the SVar is further evaluated. For example, when (a parallel b) is evaluated it calls a › to put a and b! on the current scheduler queue.The žÿ\ required by the current composition context is passed as one of the parameters. If the scheduling and composition style of the new computation being scheduled is different than the style of the current SVar, then we create a new SVar and schedule it on that. The newly created SVar joins as one of the computations on the current SVar queue.+Cases when we need to switch to a new SVar:(x parallel y) parallel (t parallel1 u) -- all of them get scheduled on the same SVar(x parallel y) parallel (t : u) -- t and uN get scheduled on a new child SVar because of the scheduling policy change.if we  a stream of type : to a stream of type Parallel1, we create a new SVar at the transitioning bind.¤When the stream is switching from disjunctive composition to conjunctive composition and vice-versa we create a new SVar to isolate the scheduling of the two.:streamlyPolymorphic version of the  operation  of 8g. Merges two streams possibly concurrently, preferring the elements from the left one when available.;streamlySame as :.œstreamlykXXX we can implement it more efficienty by directly implementing instead of combining streams using async.<streamly(Fix the type of a polymorphic stream as 8.streamlylXXX we can implement it more efficienty by directly implementing instead of combining streams using wAsync.=streamlyPolymorphic version of the  operation  of 6F. Merges two streams concurrently choosing elements from both fairly.>streamly(Fix the type of a polymorphic stream as 6. 56789ž:;<=>(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +.0;<=>?Kò[?streamly'A serial IO stream of elements of type a" with concurrent lookahead. See @ documentation for more details.@streamlyfDeep ahead composition or ahead composition with depth first traversal. The semigroup composition of @4 appends streams in a depth first manner just like SerialTM except that it can produce elements concurrently ahead of time. It is like AsyncT except that AsyncT, produces the output as it arrives whereas @* orders the output in the traversal order. main = (toList . A; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,2,3,4] VAny exceptions generated by a constituent stream are propagated to the output stream.!Similarly, the monad instance of @c may run each iteration concurrently ahead of time but presents the results in the same order as SerialT. import Streamly import qualified Streamly.Prelude( as S import Control.Concurrent main =  runStream . A± $ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)  ?ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3 ?All iterations may run in the same thread if they do not block.¥Note that ahead composition with depth first traversal can be used to combine infinite number of streams as it explores only a bounded number of streams at a time.ŸstreamlykXXX we can implement it more efficienty by directly implementing instead of combining streams using ahead.Astreamly(Fix the type of a polymorphic stream as @.BstreamlyPolymorphic version of the  operation  of @A. Merges two streams sequentially but with concurrent lookahead.?@AB(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +.0;<=>?K . Cstreamly>An IO stream whose applicative instance zips streams wAsyncly.DstreamlyLike GP but zips in parallel, it generates all the elements to be zipped concurrently. main = (toList . L‘ $ (,,) <$> s1 <*> s2 <*> s3) >>= print where s1 = fromFoldable [1, 2] s2 = fromFoldable [3, 4] s3 = fromFoldable [5, 6]  [(1,3,5),(2,4,6)] The 6 instance of this type works the same way as that of SerialT.Estreamly>An IO stream whose applicative instance zips streams serially.FstreamlyGstreamlyThe applicative instance of G} zips a number of streams serially i.e. it produces one element from each stream serially and then zips all those elements. main = (toList . H‘ $ (,,) <$> s1 <*> s2 <*> s3) >>= print where s1 = fromFoldable [1, 2] s2 = fromFoldable [3, 4] s3 = fromFoldable [5, 6]  [(1,3,5),(2,4,6)] The 6 instance of this type works the same way as that of SerialT.Hstreamly(Fix the type of a polymorphic stream as G.IstreamlySame as H.Jstreamly}Zip two streams concurrently (i.e. both the elements being zipped are generated concurrently) using a pure zipping function.Kstreamly{Zip two streams asyncly (i.e. both the elements being zipped are generated concurrently) using a monadic zipping function.Lstreamly(Fix the type of a polymorphic stream as D.MstreamlySame as L. ,-CDEFGHIJKLM(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone ;<=>?QV›rENstreamlyLDecompose a stream into its head and tail. If the stream is empty, returns ’&. If the stream is non-empty, returns  Just (a, ma), where a is the head of the stream and ma its tail.OstreamlyBuild a stream by unfolding a pure” step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns ’# and the stream ends. For example, elet f b = if b > 3 then Nothing else Just (b, b + 1) in toList $ unfoldr f 0  [0,1,2,3] PstreamlyBuild a stream by unfolding a monadic• step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns ’# and the stream ends. For example, „let f b = if b > 3 then return Nothing else print b >> return (Just (b, b + 1)) in runStream $ unfoldrM f 0   0 1 2 3 îWhen run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step. –(asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0) & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()  Concurrent Since: 0.1.0QstreamlyACreate a singleton stream from a pure value. In monadic streams,  or   can be used in place of Q', however, in Zip applicative streams  is equivalent to ¡.Rstreamly9Create a singleton stream from a monadic action. Same as  m `consM` nil but more efficient. *> toList $ yieldM getLine hello ["hello"] Sstreamly1Generate a stream by performing a monadic action n times. ”runStream $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) runStream $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)  ConcurrentTstreamlyCGenerate a stream by repeatedly executing a monadic action forever.  runStream $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) runStream $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) &Concurrent, infinite (do not use with  parallely)UstreamlyIIterate a pure function from a seed value, streaming the results forever.VstreamlyMIterate a monadic function from a seed value, streaming the results forever.ñWhen run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration. ÝrunStream $ serially $ S.take 10 $ S.iterateM (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0 runStream $ asyncly $ S.take 10 $ S.iterateM (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0  ConcurrentWstreamlyXConstruct a stream from a list containing pure values. This can be more efficient than # for lists as it can fuse the list.Xstreamly\Construct a stream from a list containing monadic actions. This can be more efficient than Y8 especially for serial streams as it can fuse the list.YstreamlyConstruct a stream from a " containing monadic actions. ®runStream $ serially $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1) runStream $ asyncly $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1) Concurrent (do not use with  parallely on infinite containers)ZstreamlySame as  fromFoldable.[streamly6Read lines from an IO Handle into a stream of Strings.\streamlyYLazy right fold with a monadic step function. For example, to fold a stream into a list: `>> runIdentity $ foldrM (\x xs -> return (x : xs)) [] (serially $ fromFoldable [1,2,3]) [1,2,3] ]streamlyGLazy right associative fold. For example, to fold a stream into a list: H>> runIdentity $ foldr (:) [] (serially $ fromFoldable [1,2,3]) [1,2,3] ^streamlyWRight fold, for non-empty streams, using first element as the starting value. Returns ’ if the stream is empty._streamlyÜStrict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.`streamlyastreamlyStrict left associative fold.bstreamly]Strict left fold, for non-empty streams, using first element as the starting value. Returns ’ if the stream is empty.cstreamlyLike _#, but with a monadic step function.dstreamlyestreamlyLike a" but with a monadic step function.fstreamly&Determine whether the stream is empty.gstreamly0Extract the first element of the stream, if any.hstreamly8Extract all but the first element of the stream, if any.istreamly7Extract all but the last element of the stream, if any.jstreamly/Extract the last element of the stream, if any.kstreamly6Determine whether an element is present in the stream.lstreamly:Determine whether an element is not present in the stream.mstreamly#Determine the length of the stream.nstreamly?Determine whether all elements of a stream satisfy a predicate.ostreamlyFDetermine whether any of the elements of a stream satisfy a predicate.pstreamly8Determines if all elements of a boolean stream are True.qstreamlyCDetermines wheter at least one element of a boolean stream is True.rstreamly8Determine the sum of all elements of a stream of numberssstreamly<Determine the product of all elements of a stream of numberststreamly*Determine the minimum element in a stream.ustreamly*Determine the maximum element in a stream.vstreamlyILooks the given key up, treating the given stream as an association list.wstreamlyPReturns the first element of the stream satisfying the given predicate, if any.xstreamlyAFinds all the indices of elements satisfying the given predicate.ystreamlyLGives the index of the first stream element satisfying the given preficate.zstreamlyLFinds the index of all elements in the stream which are equal to the given.{streamlyJGives the first index of an element in the stream, which equals the given.|streamly[Apply a monadic action to each element of the stream and discard the output of the action.}streamly5Convert a stream into a list in the underlying monad.~streamly*Write a stream of Strings to an IO Handle.streamly3Strict left scan with an extraction function. Like ‚y, but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.€streamlystreamlyLike ‚" but with a monadic step function.‚streamlyStrict left scan. Like aê, but returns the folded value at each step, generating a stream of all intermediate fold results. The first element of the stream is the user supplied initial value, and the last element of the stream is the same as the result of a.ƒstreamly2Include only those elements that pass a predicate.„streamlySame as ƒ but with a monadic predicate.…streamly Take first n/ elements from the stream and discard the rest.†streamly<End the stream as soon as the predicate fails on an element.‡streamlySame as † but with a monadic predicate.ˆstreamlyDiscard first n, elements from the stream and take the rest.‰streamlydDrop elements in the stream as long as the predicate succeeds and then take the rest of the stream.ŠstreamlySame as ‰ but with a monadic predicate.‹streamly_Replace each element of the stream with the result of a monadic action applied on the element. ÜrunStream $ S.replicateM 10 (return 1) & (serially . S.mapM (\x -> threadDelay 1000000 >> print x)) runStream $ S.replicateM 10 (return 1) & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x)) Concurrent (do not use with  parallely on infinite streams)ŒstreamlyOReduce a stream of monadic actions to a stream of the output of those actions. ÔrunStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) & (serially . S.sequence) runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) & (asyncly . S.sequence) Concurrent (do not use with  parallely on infinite streams)streamlyMap a ¢0 returning function to a stream, filter out the ’9 elements, and return a stream of values extracted from £.ŽstreamlyLike  but maps a monadic function.Concurrent (do not use with  parallely on infinite streams)streamlyPReturns the elements of the stream in reverse order. The stream must be finite.streamly_Generate a stream by performing the monadic action inbetween all elements of the given stream.‘streamly:Zip two streams serially using a monadic zipping function.’streamly7Zip two streams serially using a pure zipping function.P JKNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’P NOPSTUVQRWXY[]^\abe_cghjifk{lvwynopqmutrs|}~‹Œ‚ƒ„…†‡ˆ‰ŠxzŽ’‘JKZ€`d(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone§/“streamlyZRun a streaming composition, discard the results. By default it interprets the stream as O, to run other types of streams use the type adapting combinators for example  runStream . <.”streamlySame as “•streamlySame as  runStream.–streamlySame as runStream . wSerially.—streamlySame as runStream . parallely.˜streamlySame as runStream . asyncly.™streamlySame as runStream . zipping.šstreamlySame as runStream . zippingAsync.K  !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHILM“”•–—˜™šK@86.GD“01239!B:=/#$ %&'()*+, <A>4HL?75-EC ”•–˜—™šF IM";(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCSafe´4›streamlyXRun an action forever periodically at the given frequency specified in per second (Hz).œstreamlyÿàRun a computation on every clock tick, the clock runs at the specified frequency. It allows running a computation at high frequency efficiently by maintaining a local clock and adjusting it with the provided base clock at longer intervals. The first argument is a base clock returning some notion of time in microseconds. The second argument is the frequency in per second (Hz). The third argument is the action to run, the action is provided the local time as an argument.›œ›œ(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.comNoneµI¤ !"#$%&'()*+,-./0123456789:; < = > ? @ A B C D E F G H I J K L M N O PQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~€‚ƒ„…†‡ˆ‰Š‹ŒŽ‘’“”•–—˜™š›œžŸ ¡¢£¤¥¦§¨©ª«¬­®¯°±²³´µ¶·¸¹º»¼½¾¿ÀÁ»ÂÃÄÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖרØÙÚÛÜÝÞßàáââãäåæçèéêëì¯íîïðñòóôõö÷øøùùúûüýþÿ      !"#$%&'()*+,-./ 01onqsuw|ޤ£/23'45`abcdi6p¥xyz{}~€†‡ˆ‰Š7‘”•—˜š›0ž¢Ÿ89':;<"#cdji¥Ž'`ba&=6nowsxyz|}~€‡†7“—™˜šœ›–•0Ÿ £¤>?@ABCDE F GH I J K L M N OPQR&STU%streamly-0.5.1-FKJ3hi3XQpHD3jenl3eplZStreamlyStreamly.Prelude Streamly.Time Streamly.SVarStreamly.Streams.StreamKStreamly.Streams.StreamDStreamly.Streams.SerialStreamly.Streams.SVarStreamly.Streams.Prelude Data.FoldablefoldStreamly.Streams.ParallelStreamly.Streams.AsyncStreamly.Streams.AheadStreamly.Streams.ZipStreamly.TutorialbaseGHC.Base<> Semigroupstimessconcat MonadAsyncRaterateLowrateGoalrateHigh rateBuffer StreamingIsStreamconsM|:adaptnilcons.:oncerepeat fromFoldable InterleavedTWSerialWSerialTStreamTSerialSerialTseriallyserialmap wSerially interleavingwSerial<=> maxThreads maxBufferrateavgRateminRatemaxRate constRatefoldWith foldMapWith forEachWithParallel ParallelTparallel|$|&|$.|&. parallelyWAsyncWAsyncTAsyncAsyncTmkAsyncasync<|asynclywAsyncwAsynclyAheadAheadTaheadlyaheadZipAsync ZipAsyncM ZipSerial ZipStream ZipSerialM zipSeriallyzipping zipAsyncWith zipAsyncWithM zipAsyncly zippingAsyncunconsunfoldrunfoldrMyieldyieldM replicateMrepeatMiterateiterateMfromList fromListM fromFoldableMeach fromHandlefoldrMfoldrfoldr1foldxfoldlfoldl'foldl1'foldxMfoldlMfoldlM'nullheadtailinitlastelemnotElemlengthallanyandorsumproductminimummaximumlookupfind findIndices findIndex elemIndices elemIndexmapM_toListtoHandlescanxscanscanlM'scanl'filterfilterMtake takeWhile takeWhileMdrop dropWhile dropWhileMmapMsequencemapMaybe mapMaybeMreverse intersperseMzipWithMzipWith runStream runStreaming runStreamTrunInterleavedT runParallelT runAsyncT runZipStream runZipAsyncperiodic withClock WorkerInfo SVarStyleAheadHeapEntry ChildEventsendallThreadsDone workerThreads pushWorkerParminThreadDelayrateRecoveryTimegetWorkerLatency toStreamVarSVarHeapDequeueResultClearingWaitingReadyState streamVar svarStyle outputQueueoutputDoorBell readOutputQ postProcessmaxWorkerLimitmaxBufferLimit remainingWork yieldRateInfoenqueue isWorkDone isQueueDone needDoorBellworkLoop workerCount accountThreadworkerStopMVar svarStatssvarRefLimit UnlimitedLimited SVarStats maxHeapSizetotalDispatches maxWorkers maxOutQSize maxWorkQSizeavgWorkerLatencyminWorkerLatencymaxWorkerLatency svarStopTime YieldRateInfosvarLatencyTargetsvarLatencyRangesvarRateBuffersvarGainedLostYieldssvarAllTimeLatencyworkerBootstrapLatencyworkerPollingIntervalworkerPendingLatencyworkerCollectedLatencyworkerMeasuredLatencyworkerYieldMaxworkerYieldCountworkerLatencyStartAsyncVar WAsyncVar ParallelVarAheadVarAheadEntryPureAheadEntryStream ChildYield ChildStop ThreadAbortNanoSecsdefStaterstState setYieldLimit getYieldLimit setMaxThreads getMaxThreads setMaxBuffer getMaxBuffer setStreamRate getStreamRatesetStreamLatency cleanupSVarcleanupSVarFromWorkeratomicModifyIORefCASdecrementYieldLimitdecrementYieldLimitPostincrementYieldLimitworkerUpdateLatencyupdateYieldCountworkerRateControl sendYieldsendStop enqueueLIFO enqueueFIFO enqueueAheadreEnqueueAheadqueueEmptyAhead dequeueAhead withIORefdequeueFromHeapdequeueFromHeapSeqrequeueOnHeapTop updateHeapSeq delThreadisBeyondMaxRatecollectLatencydispatchWorkerPacedreadOutputQBoundedreadOutputQPacedpostProcessBoundedpostProcessPacedgetYieldRateInfosendFirstWorker newAheadVarnewParallelVarStreampuretransformers-0.5.5.0Control.Monad.Trans.ClassliftmkStreamnilKyieldKconsKFoldable foldStreamtoStream fromStreamunStream consMSerial fromStreamK toStreamKbindWith withLocalStepYieldStop enumFromStepNMonadfmap fromStreamVartoSVarfromSVarNothing_serialLatency maxYieldsfoldMap consMParallel$ mkParallel newWAsyncVar forkSVarAsyncjoinStreamVarAsync consMAsync consMWAsyncmkAsync' consMAheadreturnGHC.ListMaybeJust