(c) 2017 Harendra Kumar. License: BSD3. Maintainer: harendra.kumar@gmail.com. Stability: experimental. Portability: GHC.

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

rateLow: The lower rate limit.
rateGoal: The target rate we want to achieve.
rateHigh: The upper rate limit.
rateBuffer: Maximum slack from the goal.

An SVar or a Stream Var is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time at any pace. The SVar would run the streams asynchronously and accumulate results. An SVar may not really execute the stream completely and accumulate all the results. 
However, it ensures that the reader can read the results at whatever pace it wants. The SVar monitors and adapts to the consumer's pace.

An SVar is a mini scheduler: it has an associated workLoop that holds the stream tasks to be picked and run by a pool of worker threads. It has an associated output queue where the output stream elements are placed by the worker threads. An outputDoorBell is used by the worker threads to notify the consumer thread about availability of new results in the output queue. More workers are added to the SVar by fromStreamVar on demand if the output produced is not keeping pace with the consumer. On bounded SVars, workers block on the output queue to provide throttling of the producer when the consumer is not pulling fast enough. The number of workers may even get reduced depending on the consuming pace.

New work is enqueued either at the time of creation of the SVar or as a result of executing the parallel combinators, i.e. <| and <|>, when the already enqueued computations get evaluated. See joinStreamVarAsync.

Identify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.

Sorting out-of-turn outputs in a heap for Ahead style streams.

Events that a child thread may send to a parent thread.

Adapt the stream state from one type to another.

Convert a number of seconds to a string. The string will consist of four decimal places, followed by a short description of the time units.

This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.

This is safe even if we are adding more threads concurrently because if a child thread is adding another thread then the set of worker threads will not be empty anyway.

In contrast to pushWorker which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. 
So we need to use a thread safe modification of workerThreads. Alternatively, we can use a CreateThread event to avoid using a CAS based modification.

This is a magic number and it is overloaded, and used at several places to achieve batching:

1. If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
2. Collected latencies are computed and transferred to measured latency after a minimum of this period.

Another magic number! When we have to start more workers to cover up a number of yields that we are lagging by, we cannot start one worker for each yield because that may be a very big number, and if the latency of the workers is low this number of yields could be very high. We assume that we run each extra worker for at least this much time.

Get the worker latency without resetting workerPendingLatency. Returns (total yield count, base time, measured latency). CAUTION! Keep it in sync with collectLatency.

Write a stream to an SVar in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.

A monadic continuation, it is a function that yields a value of type "a" and calls the argument (a -> m r) as a continuation with that value. We can also think of it as a callback with a handler (a -> m r). Category theorists call it a codensity type, a special type of right Kan extension.

A terminal function that has no continuation to follow.

Same as IsStream.

Class of types that can represent a stream of elements of some type a in some monad m.

Constructs a stream by adding a monadic action at the head of an existing stream. 
For example:

    > toList $ getLine `consM` getLine `consM` nil
    hello
    world
    ["hello","world"]

Concurrent (do not use parallely to construct infinite streams)

Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.

    > toList $ getLine |: getLine |: nil
    hello
    world
    ["hello","world"]

    let delay = threadDelay 1000000 >> print 1
    runStream $ serially $ delay |: delay |: delay |: nil
    runStream $ parallely $ delay |: delay |: delay |: nil

Concurrent (do not use parallely to construct infinite streams)

The type Stream m a represents a monadic stream of values of type a constructed using actions in monad m. It uses stop, singleton and yield continuations equivalent to the following direct style type:

    data Stream m a = Stop | Singleton a | Yield a (Stream m a)

To facilitate parallel composition we maintain a local state in an SVar that is shared across and is used for synchronization of the streams being composed.

The singleton case can be expressed in terms of stop and yield but we have it as a separate case to optimize composition operations for streams with a single element. We build singleton streams in the implementation of pure for Applicative and Monad, and in lift for MonadTrans.

XXX remove the Stream type parameter from State as it is always constant. We can remove it from SVar as well.

Adapt any specific stream type to any other specific stream type.

Build a stream from an SVar, a stop continuation, a singleton stream continuation and a yield continuation.

Make an empty stream from a stop function.

Make a singleton stream from a yield function.

Add a yield function at the head of the stream.

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. 
The stream will not use the SVar passed via State.

Polymorphic version of the Semigroup operation <> of SerialT. Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

An empty stream.

    > toList nil
    []

Detach a stream from an SVar.

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas consM is concurrent. For example:

    > toList $ 1 `cons` 2 `cons` 3 `cons` nil
    [1,2,3]

Operator equivalent of cons.

    > toList $ 1 .: 2 .: 3 .: nil
    [1,2,3]

Same as yieldM.

    repeatM = fix . cons
    repeatM = cycle1 . yield

Generate an infinite stream by repeating a pure value.

    fromFoldable = foldr cons nil

Construct a stream from a Foldable containing pure values.

Lazy right associative fold.

Lazy right fold with a monadic step function.

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. 
The suffix x is a mnemonic for extraction.

Note that the accumulator is always evaluated including the initial value.

Strict left associative fold.

Like foldx, but with a monadic step function.

Like foldl' but with a monadic step function.

    runStream = foldl' (\_ _ -> ()) ()
    runStream = mapM_ (\_ -> return ())

Extract the last element of the stream, if any.

Apply a monadic action to each element of the stream and discard the output of the action.

Zip two streams serially using a pure zipping function.

Zip two streams serially using a monadic zipping function.

(c) 2018 Harendra Kumar. License: BSD3. Maintainer: harendra.kumar@gmail.com. Stability: experimental. Portability: GHC.

A stream consists of a step function that generates the next step given a current state, and the current state.

A stream is a succession of Steps. A Yield produces a single value and the next state of the stream. Stop indicates there are no more values in the stream.

Map a monadic function over a Stream.

An empty Stream.

Can fuse but has O(n^2) complexity.

Can be used to enumerate unbounded integrals. This does not check for overflow or underflow for bounded integrals.

Create a singleton Stream from a pure value.

Create a singleton Stream from a monadic action.

Convert a list of monadic actions to a Stream.

Convert a list of pure values to a Stream.

Run a streaming composition, discard the results.

Execute a monadic action for each element of the Stream.

Compare two streams lexicographically.

Pull a stream from an SVar.

Write a stream to an SVar in a non-blocking manner. The stream can then be read back from the SVar using fromSVar. 
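The write-then-read-back idea described above can be illustrated with a toy model built only on base. This is a minimal sketch under stated assumptions, not streamly's implementation: ToyVar, toToyVar and fromToyVar are hypothetical names, the channel is unbounded, and there is no worker pool, rate control or end-of-stream signalling as in a real SVar.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (replicateM)
import Data.List (sort)

-- Hypothetical stand-in for an SVar: an unbounded channel of outputs.
type ToyVar a = Chan a

-- Start a producer in its own thread and return immediately, so the
-- caller is never blocked by the producer (the "non-blocking write").
toToyVar :: ToyVar a -> [a] -> IO ()
toToyVar var xs = do
    _ <- forkIO (mapM_ (writeChan var) xs)
    return ()

-- Read back exactly n outputs, blocking until each one is available.
fromToyVar :: ToyVar a -> Int -> IO [a]
fromToyVar var n = replicateM n (readChan var)

-- Write two producers to the same ToyVar and collect all four outputs.
-- The interleaving between producers depends on scheduling, so the
-- result is sorted before it is returned.
demo :: IO [Int]
demo = do
    var <- newChan
    toToyVar var [1, 2]
    toToyVar var [3, 4]
    sort <$> fromToyVar var 4
```

Outputs of a single producer arrive in the order it wrote them, while the relative order across producers is not fixed, which mirrors the ordering guarantee stated for the concurrent stream types in this documentation.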
    fromList = foldr cons nil

Construct a stream from a list of pure values. This is more efficient than fromFoldable for serial streams.

Convert a stream into a list in the underlying monad.

Strict left associative fold.

Compare two streams for equality.

Compare two streams.

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

    foldWith async $ map return [1..3]

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream sum operation.

    foldMapWith async return [1..3]

Like foldMapWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Wide serial composition or serial composition with a breadth first traversal. The Semigroup instance of WSerialT traverses the two streams in a breadth first manner. In other words, it interleaves two streams, yielding one element from each stream alternately.

    import Streamly
    import qualified Streamly.Prelude as S

    main = (toList . wSerially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print

    [1,3,2,4]

Similarly, the Monad instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner.

    main = runStream . wSerially $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        S.yieldM $ print (x, y)

    (1,3)
    (2,3)
    (1,4)
    (2,4)

Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.

A serial IO stream of elements of type a. See SerialT documentation for more details.

Deep serial composition or serial composition with depth first traversal. 
The Semigroup instance of SerialT appends two streams serially in a depth first manner, yielding all elements from the first stream, and then all elements from the second stream.

    import Streamly
    import qualified Streamly.Prelude as S

    main = (toList . serially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print

    [1,2,3,4]

The Monad instance runs the monadic continuation for each element of the stream, serially.

    main = runStream . serially $ do
        x <- return 1 <> return 2
        S.yieldM $ print x

    1
    2

SerialT nests streams serially in a depth first manner.

    main = runStream . serially $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        S.yieldM $ print (x, y)

    (1,3)
    (1,4)
    (2,3)
    (2,4)

This behavior of SerialT is exactly like a list transformer. We call the monadic code being run for each element of the stream a monadic continuation. In the imperative paradigm we can think of this composition as nested for loops where the monadic continuation is the body of the loop. The loop iterates over all elements of the stream.

The serially combinator can be omitted as the default stream type is SerialT. Note that serial composition with depth first traversal can be used to combine an infinite number of streams as it explores only one stream at a time.

Fix the type of a polymorphic stream as SerialT.

    map = fmap

Same as fmap.

    > S.toList $ S.map (+1) $ S.fromList [1,2,3]
    [2,3,4]

Fix the type of a polymorphic stream as WSerialT.

Same as wSerially.

Polymorphic version of the Semigroup operation <> of WSerialT. Interleaves two streams, yielding one element from each stream alternately.

Same as wSerial.

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Async composition with simultaneous traversal of all streams.

The Semigroup instance of ParallelT concurrently merges two streams, running both strictly concurrently and yielding elements from both streams as they arrive. 
When multiple streams are combined using ParallelT each one is evaluated in its own thread and the results produced are presented in the combined stream on a first come first serve basis.

AsyncT and WAsyncT are concurrent lookahead streams each with a specific type of consumption pattern (depth first or breadth first). Since they are lookahead, they may introduce certain default latency in starting more concurrent tasks for efficiency reasons or may put a default limitation on the resource consumption (e.g. number of concurrent threads for lookahead). If we look at the implementation detail, they both can share a pool of worker threads to evaluate the streams in the desired pattern and at the desired rate. However, ParallelT uses a separate runtime thread to evaluate each stream.

WAsyncT is similar to ParallelT, as both of them evaluate the constituent streams fairly in a round robin fashion. However, the key difference is that WAsyncT is lazy or pull driven whereas ParallelT is strict or push driven. ParallelT immediately starts concurrent evaluation of both the streams (in separate threads) and later picks the results whereas WAsyncT may wait for a certain latency threshold before initiating concurrent evaluation of the next stream. The concurrent scheduling of the next stream or the degree of concurrency is driven by the feedback from the consumer. In case of ParallelT each stream is evaluated in a separate thread and results are pushed to a shared output buffer; the evaluation rate is controlled by blocking when the buffer is full.

Concurrent lookahead streams are generally more efficient than ParallelT and can work pretty efficiently even for smaller tasks because they do not necessarily use a separate thread for each task. So they should be preferred over ParallelT especially when efficiency is a concern and simultaneous strict evaluation is not a requirement. ParallelT is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them e.g. 
when we want to race two tasks and want to start both strictly at the same time or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. We can say that ParallelT is almost the same (modulo some implementation differences) as WAsyncT when the latter is used with unlimited lookahead and zero latency in initiating lookahead.

    main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print

    [1,3,2,4]

When streams with more than one element are merged, it yields whichever stream yields first without any bias, unlike the Async style streams.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . parallely $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3

Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.

XXX we can implement it more efficiently by directly implementing instead of combining streams using parallel.

Polymorphic version of the Semigroup operation <> of ParallelT. Merges two streams concurrently.

Parallel function application operator for streams; just like the regular function application operator $ except that it is concurrent. The following code prints a value every second even though each stage adds a 1 second delay. 
    runStream $
        S.mapM (\x -> threadDelay 1000000 >> print x)
            |$ S.repeatM (threadDelay 1000000 >> return 1)

Concurrent

Parallel reverse function application operator for streams; just like the regular reverse function application operator & except that it is concurrent.

    runStream $
        S.repeatM (threadDelay 1000000 >> return 1)
            |& S.mapM (\x -> threadDelay 1000000 >> print x)

Concurrent

Parallel function application operator; applies a run or fold function to a stream such that the fold consumer and the stream producer run in parallel. A run or fold function reduces the stream to a value in the underlying monad. The . at the end of the operator is a mnemonic for termination of the stream.

    S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
        |$. S.repeatM (threadDelay 1000000 >> return 1)

Concurrent

Parallel reverse function application operator for applying a run or fold function to a stream. Just like |$. except that the operands are reversed.

    S.repeatM (threadDelay 1000000 >> return 1)
        |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Fix the type of a polymorphic stream as ParallelT.

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. 
A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite streams, or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  * The maxThreads limit
  * The maxBuffer limit
  * The maximum rate that the stream producer can achieve
  * The maximum rate that the stream consumer can achieve

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times; the upper limit is double the specified rate.

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. 
As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times; the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Specify the average latency, in nanoseconds, of a single threaded action in a concurrent composition. Streamly can measure the latencies, but that is possible only after at least one task has completed. This combinator can be used to provide a latency hint so that rate control using rate can take that into account right from the beginning. When not specified, a default behavior is chosen which could be too slow or too fast, and would be restricted by any other control parameters configured. A value of 0 indicates default behavior, a negative value means there is no limit i.e. zero latency. This would normally be useful only in high latency and high throughput cases.

Print debug information about an SVar when the stream ends.

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Wide async composition or async composition with breadth first traversal. The Semigroup instance of WAsyncT concurrently traverses the composed streams using a breadth first traversal, that is, in a round robin fashion, yielding elements from both streams alternately.

    main = (toList . wAsyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print

    [1,3,2,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of WAsyncT runs all iterations fairly concurrently using a round robin scheduling.

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . wAsyncly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3

Unlike AsyncT all iterations are guaranteed to run fairly concurrently, unconditionally.

Note that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Deep async composition or async composition with depth first traversal. In a left to right Semigroup composition it tries to yield elements from the left stream as long as it can, but it can run the right stream in parallel if it needs to, based on demand. The right stream can be run if the left stream blocks on IO or cannot produce elements fast enough for the consumer.

    main = (toList . asyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print

    [1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. 
However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the monad instance of AsyncT may run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . asyncly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.

Note that async composition with depth first traversal can be used to combine an infinite number of streams as it explores only a bounded number of streams at a time.

Make a stream asynchronous: triggers the computation and returns a stream in the underlying monad representing the output generated by the original computation. The returned action is exhaustible and must be drained once. If it is not drained fully we may have a thread blocked forever, and once exhausted it will always return empty.

Create a new SVar and enqueue one stream computation on it.

Join two computations on the currently running SVar queue for concurrent execution. When we are using parallel composition, an SVar is passed around as a state variable. We try to schedule a new parallel computation on the SVar passed to us. The first time, when no SVar exists, a new SVar is created. Subsequently, joinStreamVarAsync may get called when a computation already scheduled on the SVar is further evaluated. For example, when (a parallel b) is evaluated it calls joinStreamVarAsync to put a and b on the current scheduler queue.

The SVar style required by the current composition context is passed as one of the parameters. 
If the scheduling and composition style of the new computation being scheduled is different from the style of the current SVar, then we create a new SVar and schedule it on that. The newly created SVar joins as one of the computations on the current SVar queue.

Cases when we need to switch to a new SVar:

  * (x parallel y) parallel (t parallel u) -- all of them get scheduled on the same SVar
  * (x parallel y) parallel (t async u) -- t and u get scheduled on a new child SVar because of the scheduling policy change.
  * if we adapt a stream of type async to a stream of type Parallel, we create a new SVar at the transitioning bind.
  * When the stream is switching from disjunctive composition to conjunctive composition and vice-versa we create a new SVar to isolate the scheduling of the two.

Polymorphic version of the Semigroup operation <> of AsyncT. Merges two streams possibly concurrently, preferring the elements from the left one when available.

Same as async.

XXX we can implement it more efficiently by directly implementing instead of combining streams using async.

Fix the type of a polymorphic stream as AsyncT.

XXX we can implement it more efficiently by directly implementing instead of combining streams using wAsync.

Polymorphic version of the Semigroup operation <> of WAsyncT. Merges two streams concurrently choosing elements from both fairly.

Fix the type of a polymorphic stream as WAsyncT.

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Deep ahead composition or ahead composition with depth first traversal. The semigroup composition of AheadT appends streams in a depth first manner just like SerialT except that it can produce elements concurrently ahead of time. It is like AsyncT except that AsyncT produces the output as it arrives whereas AheadT orders the output in the traversal order.

    main = (toList . aheadly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print

    [1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream.

Similarly, the monad instance of AheadT may run each iteration concurrently ahead of time but presents the results in the same order as SerialT.

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . aheadly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.

Note that ahead composition with depth first traversal can be used to combine an infinite number of streams as it explores only a bounded number of streams at a time.

Polymorphic version of the Semigroup operation <> of AheadT. Merges two streams sequentially but with concurrent lookahead.

XXX we can implement it more efficiently by directly implementing instead of combining streams using ahead.

Fix the type of a polymorphic stream as AheadT.

Types that can be enumerated as a stream. The operations in this type class are equivalent to those in the Enum type class, except that these generate a stream instead of a list. Use the functions in Streamly.Enumeration module to define new instances.

enumerateFrom from generates a stream starting with the element from, enumerating up to maxBound when the type is Bounded or generating an infinite stream when the type is not Bounded.

    > S.toList $ S.take 4 $ S.enumerateFrom (0 :: Int)
    [0,1,2,3]

For Fractional types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

    > S.toList $ S.take 4 $ S.enumerateFrom 1.1
    [1.1,2.1,3.1,4.1]

Generate a finite stream starting with the element from, enumerating the type up to the value to. 
If to is smaller than from then an empty stream is returned.

    > S.toList $ S.enumerateFromTo 0 4
    [0,1,2,3,4]

For Fractional types, the last element is equal to the specified to value after rounding to the nearest integral value.

    > S.toList $ S.enumerateFromTo 1.1 4
    [1.1,2.1,3.1,4.1]
    > S.toList $ S.enumerateFromTo 1.1 4.6
    [1.1,2.1,3.1,4.1,5.1]

enumerateFromThen: enumerateFromThen from then generates a stream whose first element is from, the second element is then, and the successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For Bounded types the stream ends when maxBound is reached; for unbounded types it keeps enumerating infinitely.

    > S.toList $ S.take 4 $ S.enumerateFromThen 0 2
    [0,2,4,6]
    > S.toList $ S.take 4 $ S.enumerateFromThen 0 (-2)
    [0,-2,-4,-6]

enumerateFromThenTo: enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then, and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

    > S.toList $ S.enumerateFromThenTo 0 2 6
    [0,2,4,6]
    > S.toList $ S.enumerateFromThenTo 0 (-2) (-6)
    [0,-2,-4,-6]

enumerateFromStepIntegral: enumerateFromStepIntegral from step generates an infinite stream whose first element is from and the successive elements are in increments of step.

CAUTION: This function is not safe for finite integral types. It does not check for overflow, underflow or bounds.

    > S.toList $ S.take 4 $ S.enumerateFromStepIntegral 0 2
    [0,2,4,6]
    > S.toList $ S.take 3 $ S.enumerateFromStepIntegral 0 (-2)
    [0,-2,-4]

enumerateFromIntegral: Enumerate an Integral type. enumerateFromIntegral from generates a stream whose first element is from and the successive elements are in increments of 1. The stream is bounded by the size of the Integral type.

    > S.toList $ S.take 4 $ S.enumerateFromIntegral (0 :: Int)
    [0,1,2,3]

enumerateFromThenIntegral: Enumerate an Integral type in steps.
enumerateFromThenIntegral from then generates a stream whose first element is from, the second element is then, and the successive elements are in increments of then - from. The stream is bounded by the size of the Integral type.

    > S.toList $ S.take 4 $ S.enumerateFromThenIntegral (0 :: Int) 2
    [0,2,4,6]
    > S.toList $ S.take 4 $ S.enumerateFromThenIntegral (0 :: Int) (-2)
    [0,-2,-4,-6]

enumerateFromToIntegral: Enumerate an Integral type up to a given limit. enumerateFromToIntegral from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

    > S.toList $ S.enumerateFromToIntegral 0 4
    [0,1,2,3,4]

enumerateFromThenToIntegral: Enumerate an Integral type in steps up to a given limit. enumerateFromThenToIntegral from then to generates a finite stream whose first element is from, the second element is then, and the successive elements are in increments of then - from up to to.

    > S.toList $ S.enumerateFromThenToIntegral 0 2 6
    [0,2,4,6]
    > S.toList $ S.enumerateFromThenToIntegral 0 (-2) (-6)
    [0,-2,-4,-6]

enumerateFromFractional: Numerically stable enumeration from a Fractional number in steps of size 1. enumerateFromFractional from generates a stream whose first element is from and the successive elements are in increments of 1. No overflow or underflow checks are performed.

This is the equivalent of enumFrom for Fractional types. For example:

    > S.toList $ S.take 4 $ S.enumerateFromFractional 1.1
    [1.1,2.1,3.1,4.1]

enumerateFromThenFractional: Numerically stable enumeration from a Fractional number in steps. enumerateFromThenFractional from then generates a stream whose first element is from, the second element is then, and the successive elements are in increments of then - from. No overflow or underflow checks are performed.

This is the equivalent of enumFromThen for Fractional types.
For example:

    > S.toList $ S.take 4 $ S.enumerateFromThenFractional 1.1 2.1
    [1.1,2.1,3.1,4.1]
    > S.toList $ S.take 4 $ S.enumerateFromThenFractional 1.1 (-2.1)
    [1.1,-2.1,-5.300000000000001,-8.500000000000002]

enumerateFromToFractional: Numerically stable enumeration from a Fractional number to a given limit. enumerateFromToFractional from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

This is the equivalent of enumFromTo for Fractional types. For example:

    > S.toList $ S.enumerateFromToFractional 1.1 4
    [1.1,2.1,3.1,4.1]
    > S.toList $ S.enumerateFromToFractional 1.1 4.6
    [1.1,2.1,3.1,4.1,5.1]

Notice that the last element is equal to the specified to value after rounding to the nearest integer.

enumerateFromThenToFractional: Numerically stable enumeration from a Fractional number in steps up to a given limit. enumerateFromThenToFractional from then to generates a finite stream whose first element is from, the second element is then, and the successive elements are in increments of then - from up to to.

This is the equivalent of enumFromThenTo for Fractional types. For example:

    > S.toList $ S.enumerateFromThenToFractional 0.1 2 6
    [0.1,2.0,3.9,5.799999999999999]
    > S.toList $ S.enumerateFromThenToFractional 0.1 (-2) (-6)
    [0.1,-2.0,-4.1000000000000005,-6.200000000000001]

enumerateFromToSmall: enumerateFromTo for Enum types not larger than Int.

enumerateFromThenToSmall: enumerateFromThenTo for Enum types not larger than Int.

enumerateFromThenSmallBounded: enumerateFromThen for Enum types not larger than Int.

Note: We convert the Enum to Int and enumerate the Int. If a type is bounded but does not have a Bounded instance then we can go on enumerating it beyond the legal values of the type, resulting in the failure of toEnum when converting back to Enum.
Therefore we require a Bounded instance for this function to be safely used.

enumerate: Enumerate a Bounded type from its minBound to maxBound.

    enumerate = enumerateFrom minBound

enumerateTo: Enumerate a Bounded type from its minBound to the specified value.

    enumerateTo = enumerateFromTo minBound

enumerateFromBounded: enumerateFrom for Bounded Enum types.

    enumerateFromBounded = enumerateFromTo from maxBound

(c) 2017 Harendra Kumar, BSD3, harendra.kumar@gmail.com, experimental, GHC

ZipAsync: An IO stream whose applicative instance zips streams wAsyncly.

ZipAsyncM: Like ZipSerialM but zips in parallel; it generates all the elements to be zipped concurrently.

    main = (toList . zipAsyncly $ (,,) <$> s1 <*> s2 <*> s3) >>= print
        where s1 = fromFoldable [1, 2]
              s2 = fromFoldable [3, 4]
              s3 = fromFoldable [5, 6]

    [(1,3,5),(2,4,6)]

The Semigroup instance of this type works the same way as that of SerialT.

ZipSerial: An IO stream whose applicative instance zips streams serially.

ZipSerialM: The applicative instance of ZipSerialM zips a number of streams serially, i.e. it produces one element from each stream serially and then zips all those elements.

    main = (toList . zipSerially $ (,,) <$> s1 <*> s2 <*> s3) >>= print
        where s1 = fromFoldable [1, 2]
              s2 = fromFoldable [3, 4]
              s3 = fromFoldable [5, 6]

    [(1,3,5),(2,4,6)]

The Semigroup instance of this type works the same way as that of SerialT.

zipSerially: Fix the type of a polymorphic stream as ZipSerialM.

zipping: Same as zipSerially.

zipAsyncWith: Like zipWith but zips concurrently, i.e. both the streams being zipped are generated concurrently.

zipAsyncWithM: Like zipWithM but zips concurrently, i.e. both the streams being zipped are generated concurrently.

zipAsyncly: Fix the type of a polymorphic stream as ZipAsyncM.

zippingAsync: Same as zipAsyncly.

(c) 2017 Harendra Kumar, BSD3, harendra.kumar@gmail.com, experimental, GHC

uncons: Decompose a stream into its head and tail. If the stream is empty, returns Nothing.
If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

unfoldr:

    unfoldr step s =
        case step s of
            Nothing -> nil
            Just (a, b) -> a `cons` unfoldr step b

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

    let f b =
            if b > 3
            then Nothing
            else Just (b, b + 1)
    in toList $ unfoldr f 0

    [0,1,2,3]

unfoldrM: Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

    let f b =
            if b > 3
            then return Nothing
            else print b >> return (Just (b, b + 1))
    in runStream $ unfoldrM f 0

    0
    1
    2
    3

When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.

    (asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
        & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Since: 0.1.0

yield:

    yield a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

    yield = pure
    yield = yieldM . pure

In Zip applicative streams yield is not the same as pure because in that case pure is equivalent to repeat instead. yield and pure are equally efficient; in other cases yield may be slightly more efficient than the other equivalent definitions.

yieldM:

    yieldM m = m `consM` nil

Create a singleton stream from a monadic action.

    > toList $ yieldM getLine
    hello
    ["hello"]

fromIndices:

    fromIndices f = let g i = f i `cons` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a function f applied on the corresponding index. Index starts at 0.
    > S.toList $ S.take 5 $ S.fromIndices id
    [0,1,2,3,4]

fromIndicesM:

    fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0

Generate an infinite stream, whose values are the output of a monadic function f applied on the corresponding index. Index starts at 0.

replicateM:

    replicateM n = take n . repeatM

Generate a stream by performing a monadic action n times. For example:

    runStream $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
    runStream $ asyncly  $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)

Concurrent

replicate:

    replicate n = take n . repeat

Generate a stream of length n by repeating a value n times.

repeatM:

    repeatM = fix . consM
    repeatM = cycle1 . yieldM

Generate a stream by repeatedly executing a monadic action forever.

    runStream $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
    runStream $ asyncly  $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)

Concurrent, infinite (do not use with parallely)

iterate:

    iterate f x = x `cons` iterate f (f x)

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

    > S.toList $ S.take 5 $ S.iterate (+1) 1
    [1,2,3,4,5]

iterateM:

    iterateM f m = m `consM` iterateM f (m >>= f)

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.

    runStream $ serially $ S.take 10 $ S.iterateM
        (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0

    runStream $ asyncly  $ S.take 10 $ S.iterateM
        (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0

Concurrent

fromListM:

    fromListM = foldr consM nil

Construct a stream from a list of monadic actions.
This is more efficient than fromFoldableM for serial streams.

fromFoldableM:

    fromFoldableM = foldr consM nil

Construct a stream from a Foldable containing monadic actions.

    runStream $ serially $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)
    runStream $ asyncly  $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)

Concurrent (do not use with parallely on infinite containers)

each: Same as fromFoldable.

fromHandle: Read lines from an IO Handle into a stream of Strings.

foldrM: Lazy right fold with a monadic step function. For example, to fold a stream into a list:

    > S.foldrM (\x xs -> return (x : xs)) [] $ fromList [1,2,3]
    [1,2,3]

foldr: Lazy right associative fold. For lists a foldr looks like:

    foldr f z []     = z
    foldr f z (x:xs) = x `f` foldr f z xs

The recursive expression is the second argument of the fold step f. Therefore, the evaluation of the recursive call depends on f. It can terminate recursion by not inspecting the second argument based on a condition. When expanded fully, it results in the following right associated expression:

    foldr f z xs == x1 `f` (x2 `f` ...(xn `f` z))

When f is a constructor, we can see that the first deconstruction of this expression would be x1 on the left and the recursive expression on the right. Therefore, we can deconstruct it to access the input elements in the first-in-first-out (FIFO) order and consume the reconstructed structure lazily. The recursive expression on the right gets evaluated incrementally as demanded by the consumer. For example:

    > S.foldr (:) [] $ S.fromList [1,2,3,4]
    [1,2,3,4]

When f is a function strict in its second argument, the right side of the expression gets evaluated as follows:

    foldr f z xs == x1 `f` tail1
    tail1        == x2 `f` tail2
    tail2        == x3 `f` tail3
    ...
    tailn        == xn `f` z

In foldl' we have both the arguments of f available at each step, therefore, each step can be reduced immediately.
However, in foldr the second argument to f is a recursive call, therefore, it ends up building the whole expression in memory before it can be reduced, consuming the whole input. This makes foldr much less efficient for reduction compared to foldl'. For example:

    > S.foldr (+) 0 $ S.fromList [1,2,3,4]
    10

When the underlying monad m is strict (e.g. IO), then foldr ends up evaluating all of its input because of strict evaluation of the recursive call:

    > S.foldr (\_ _ -> []) [] $ S.fromList (1:undefined)
    *** Exception: Prelude.undefined

In a lazy monad, we can consume the input lazily, and terminate the fold by conditionally not inspecting the recursive expression.

    > runIdentity $ S.foldr (\x rest -> if x == 3 then [] else x : rest) [] $ S.fromList (4:1:3:undefined)
    [4,1]

The arguments to the folding function (a -> b -> b) are in the head and tail order of the output: a is the head and b is the tail. Remember, in a right fold the zero is on the right, it is the tail end.

foldr1: Lazy right fold for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

foldx: Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

foldl': Strict left associative fold. For lists a foldl looks like:

    foldl f z []     = z
    foldl f z (x:xs) = foldl f (z `f` x) xs

The recursive call at the head of the output expression is bound to be evaluated until recursion terminates, deconstructing the whole input container and building the following left associated expression:

    foldl f z xs == (((z `f` x1) `f` x2) ...) `f` xn

When f is a constructor, we can see that the first deconstruction of this expression would be the recursive expression on the left and xn on the right.
Therefore, it can access the input elements only in the reverse (LIFO) order. For example:

    > S.foldl' (flip (:)) [] $ S.fromList [1,2,3,4]
    [4,3,2,1]

The strict left fold foldl' forces the reduction of its argument z `f` x before using it, therefore it never builds the whole expression in memory. Thus, z `f` x1 would get reduced to z1 and then z1 `f` x2 would get reduced to z2 and so on, incrementally reducing the expression as it recurses. However, it evaluates the accumulator only to WHNF; it may further help to use a strict data structure as accumulator. For example:

    > S.foldl' (+) 0 $ S.fromList [1,2,3,4]
    10

    0 + 1
    (0 + 1) + 2
    ((0 + 1) + 2) + 3
    (((0 + 1) + 2) + 3) + 4

foldl strictly deconstructs the whole input container irrespective of whether it needs it or not:

    > S.foldl' (\acc x -> if x == 3 then acc else x : acc) [] $ S.fromList (4:1:3:undefined)
    *** Exception: Prelude.undefined

However, evaluation of the items contained in the input container is lazy as demanded by the fold step function:

    > S.foldl' (\acc x -> if x == 3 then acc else x : acc) [] $ S.fromList [4,1,3,undefined]
    [4,1]

To perform a left fold without consuming all the input one can use scanl to stream the intermediate results of the fold and use them lazily.

In stateful or event-driven programming, we can consider z as the initial state and the stream being folded as a stream of events, thus foldl' processes all the events in the stream updating the state on each event and then ultimately returning the final state.

The arguments to the folding function (b -> a -> b) are in the head and tail order of the output expression: b is the head and a is the tail. Remember, in a left fold the zero is on the left, at the head of the expression.

foldl1': Strict left fold, for non-empty streams, using first element as the starting value.
Returns Nothing if the stream is empty.

foldxM: Like foldx, but with a monadic step function.

foldlM': Like foldl' but with a monadic step function.

runStream: Run a stream, discarding the results. By default it interprets the stream as SerialT; to run other types of streams use the type adapting combinators, for example runStream . asyncly.

runN: Run maximum up to n iterations of a stream.

    runN n = runStream . take n

runWhile: Run a stream as long as the predicate holds true.

    runWhile p = runStream . takeWhile p

null: Determine whether the stream is empty.

head: Extract the first element of the stream, if any.

    head = (!! 0)

tail: Extract all but the first element of the stream, if any.

init: Extract all but the last element of the stream, if any.

last: Extract the last element of the stream, if any.

    last xs = xs !! (length xs - 1)

elem: Determine whether an element is present in the stream.

notElem: Determine whether an element is not present in the stream.

length: Determine the length of the stream.

all: Determine whether all elements of a stream satisfy a predicate.

any: Determine whether any of the elements of a stream satisfy a predicate.

and: Determines if all elements of a boolean stream are True.

or: Determines whether at least one element of a boolean stream is True.

sum: Determine the sum of all elements of a stream of numbers. Returns 0 when the stream is empty. Note that this is not numerically stable for floating point numbers.

product: Determine the product of all elements of a stream of numbers.
Returns 1 when the stream is empty.

minimum: Determine the minimum element in a stream.

    minimum = minimumBy compare

minimumBy: Determine the minimum element in a stream using the supplied comparison function.

maximum: Determine the maximum element in a stream.

    maximum = maximumBy compare

maximumBy: Determine the maximum element in a stream using the supplied comparison function.

(!!): Lookup the element at the given index.

lookup: In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

    lookup = snd <$> find ((==) . fst)

find: Like findM but with a non-monadic predicate.

    find p = findM (return . p)

findM: Returns the first element that satisfies the given predicate.

findIndices: Find all the indices where the element in the stream satisfies the given predicate.

findIndex: Returns the first index that satisfies the given predicate.

elemIndices: Find all the indices where the value of the element in the stream is equal to the given value.

elemIndex: Returns the first index where a given value is found in the stream.

    elemIndex a = findIndex (== a)

concatMap: Map each element to a stream and then flatten the results into a single stream.

    concatMap f = concatMapM (return . f)

concatMapM: Map each element to a stream using a monadic function and then flatten the results into a single stream.

isPrefixOf: Returns True if the first stream is the same as or a prefix of the second.

    > S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
    True

isSubsequenceOf: Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is treated as a subsequence of itself.

    > S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char)
    True

stripPrefix: Drops the given prefix from a stream. Returns Nothing if the stream does not start with the given prefix.
Returns Just nil when the prefix is the same as the stream.

mapM_: Apply a monadic action to each element of the stream and discard the output of the action.

toList: Convert a stream into a list in the underlying monad. Same as S.foldr (:) []:

    toList = S.foldr (:) []

toHandle: Write a stream of Strings to an IO Handle.

    toHandle h = S.mapM_ $ hPutStrLn h

scanx: Strict left scan with an extraction function. Like scanl', but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

scanlM': Like scanl' but with a monadic fold function.

scanl': Strict left scan.

    > S.toList $ S.scanl' (+) 0 $ fromList [1,2,3,4]
    [0,1,3,6,10]

    > S.toList $ S.scanl' (flip (:)) [] $ S.fromList [1,2,3,4]
    [[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of scanl' is the initial value of the accumulator followed by all the intermediate steps and the final result of foldl'.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following example, computing the sum and the product of the elements in a stream in one go using a foldl':

    > S.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ S.fromList [1,2,3,4]
    (10,24)

Using scanl' we can compute the sum in the first stage and pass it down to the next stage for computing the product:

    > S.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)
        $ S.scanl' (\(s, _) x -> (s + x, x)) (0,1)
        $ S.fromList [1,2,3,4]
    (10,24)

IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid building lazy expressions inside the accumulator, it is recommended that a strict data structure is used for the accumulator.

postscanl': Like scanl' but does not stream the initial value of the accumulator.
    postscanl' f z xs = S.drop 1 $ scanl' f z xs

postscanlM': Like postscanl' but with a monadic step function.

prescanl': Like scanl' but does not stream the final value of the accumulator.

prescanlM': Like prescanl' but with a monadic step function.

scanl1M': Like scanl1' but with a monadic step function.

scanl1': Like scanl' but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty.

    > S.toList $ S.scanl1 (+) $ fromList [1,2,3,4]
    [1,3,6,10]

filter: Include only those elements that pass a predicate.

filterM: Same as filter but with a monadic predicate.

uniq: Drop repeated elements that are adjacent to each other.

the: Ensures that all the elements of the stream are identical and then returns that unique element.

take: Take first n elements from the stream and discard the rest.

takeWhile: End the stream as soon as the predicate fails on an element.

takeWhileM: Same as takeWhile but with a monadic predicate.

drop: Discard first n elements from the stream and take the rest.

dropWhile: Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

dropWhileM: Same as dropWhile but with a monadic predicate.

mapM: Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

    mapM f = sequence . map f

    > runStream $ S.mapM putStr $ S.fromList ["a", "b", "c"]
    abc

    runStream $ S.replicateM 10 (return 1)
              & (serially . S.mapM (\x -> threadDelay 1000000 >> print x))

    runStream $ S.replicateM 10 (return 1)
              & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x))

Concurrent (do not use with parallely on infinite streams)

sequence: Replace the elements of a stream of monadic actions with the outputs of those actions.

    sequence = mapM id

    > runStream $ S.sequence $ S.fromList [putStr "a", putStr "b", putStrLn "c"]
    abc

    runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
              & (serially .
                 S.sequence)

    runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
              & (asyncly . S.sequence)

Concurrent (do not use with parallely on infinite streams)

mapMaybe: Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.

mapMaybeM: Like mapMaybe but maps a monadic function.

Concurrent (do not use with parallely on infinite streams)

reverse: Returns the elements of the stream in reverse order. The stream must be finite.

intersperseM: Generate a stream by performing a monadic action between consecutive elements of the given stream.

Concurrent (do not use with parallely on infinite streams)

    > S.toList $ S.intersperseM (putChar 'a' >> return ',') $ S.fromList "hello"
    aaaa"h,e,l,l,o"

insertBy: insertBy cmp elem stream inserts elem before the first element in stream that is not less than elem when compared using cmp.

    insertBy cmp x = mergeBy cmp (yield x)

    > S.toList $ S.insertBy compare 2 $ S.fromList [1,3,5]
    [1,2,3,5]

deleteBy: Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

    > S.toList $ S.deleteBy (==) 3 $ S.fromList [1,3,3,5]
    [1,3,5]

indexed: Pair each element in a stream with its index.

    indexed = S.zipWith (,) (S.intFrom 0)

    > S.toList $ S.indexed $ S.fromList "hello"
    [(0,h),(1,e),(2,l),(3,l),(4,o)]

indexedR: Pair each element in a stream with its index, starting from the given index n and counting down.

    indexedR n = S.zipWith (,) (S.intFromThen n (n - 1))

    > S.toList $ S.indexedR 10 $ S.fromList "hello"
    [(9,h),(8,e),(7,l),(6,l),(5,o)]

zipWithM: Like zipWith but using a monadic zipping function.

zipWith: Zip two streams serially using a pure zipping function.

    > S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
    [5,7,9]

eqBy: Compare two streams for equality using an equality function.

cmpBy: Compare two streams lexicographically using a comparison function.

mergeBy: Merge two streams using a comparison function.
The head elements of both the streams are compared and the smaller of the two elements is emitted; if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

    > S.toList $ S.mergeBy compare (S.fromList [1,3,5]) (S.fromList [2,4,6,8])
    [1,2,3,4,5,6,8]

mergeByM: Like mergeBy but with a monadic comparison function.

Merge two streams randomly:

    > randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
    > S.toList $ S.mergeByM randomly (S.fromList [1,1,1,1]) (S.fromList [2,2,2,2])
    [2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

    proportionately m n = do
        ref <- newIORef $ cycle $ concat [replicate m LT, replicate n GT]
        return $ \_ _ -> do
            r <- readIORef ref
            writeIORef ref $ tail r
            return $ head r

    main = do
        f <- proportionately 2 1
        xs <- S.toList $ S.mergeByM f (S.fromList [1,1,1,1,1,1]) (S.fromList [2,2,2])
        print xs

    [1,1,2,1,1,2,1,1,2]

mergeAsyncBy: Like mergeBy but merges concurrently (i.e. both the elements being merged are generated concurrently).

mergeAsyncByM: Like mergeByM but merges concurrently (i.e. both the elements being merged are generated concurrently).

(c) 2018 Composewell Technologies, BSD3, harendra.kumar@gmail.com, experimental, GHC

ZipList: Just like List except that it has a zipping Applicative instance and no Monad instance.

List: List a is a replacement for [a].

Cons: A list constructor and pattern that deconstructs a List into its head and tail. Corresponds to : for Haskell lists.

Nil: An empty list constructor and pattern that matches an empty List. Corresponds to '[]' for Haskell lists.

fromZipList: Convert a ZipList to a regular List.

toZipList: Convert a regular List to a ZipList.

(c) 2017 Harendra Kumar, BSD3, harendra.kumar@gmail.com, experimental, GHC

runStreaming: Same as runStream.

runStreamT: Same as runStream.

runInterleavedT: Same as runStream .
wSerially.

runParallelT: Same as runStream . parallely.

runAsyncT: Same as runStream . asyncly.

runZipStream: Same as runStream . zipping.

runZipAsync: Same as runStream . zippingAsync.

(c) 2018 Composewell Technologies, BSD3, harendra.kumar@gmail.com, experimental, GHC

(c) 2017 Harendra Kumar, BSD3, harendra.kumar@gmail.com, experimental, GHC, Safe

periodic: Run an action forever periodically at the given frequency specified in per second (Hz).

withClock: Run a computation on every clock tick; the clock runs at the specified frequency. It allows running a computation at high frequency efficiently by maintaining a local clock and adjusting it with the provided base clock at longer intervals. The first argument is a base clock returning some notion of time in microseconds. The second argument is the frequency in per second (Hz). The third argument is the action to run; the action is provided the local time as an argument.

(c) 2017 Harendra Kumar, BSD3, harendra.kumar@gmail.com
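The idea behind periodic (run an action repeatedly at a frequency given in Hz) can be sketched with base alone. This is only an illustration under stated assumptions, not the library's implementation: periodicN and countTicks are hypothetical names introduced here, the loop is bounded so that it terminates, and the time taken by the action itself is not compensated for.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (replicateM_)
import Data.IORef (newIORef, modifyIORef', readIORef)

-- | Hypothetical helper (not part of streamly): run an action a fixed
-- number of times, sleeping roughly 1/freq seconds after each run.
-- A frequency of zero or less runs nothing.
periodicN :: Int -> Int -> IO () -> IO ()
periodicN ticks freq action
  | freq <= 0 = return ()
  | otherwise = replicateM_ ticks (action >> threadDelay (1000000 `div` freq))

-- | Hypothetical helper: count how many times the action actually ran.
countTicks :: Int -> Int -> IO Int
countTicks ticks freq = do
  ref <- newIORef 0
  periodicN ticks freq (modifyIORef' ref (+ 1))
  readIORef ref
```

In GHCi, `countTicks 5 1000` runs the counter action five times at roughly 1 kHz. A production version would typically measure elapsed time against a clock rather than rely on a fixed sleep, which is the concern the withClock description addresses.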
Package: streamly-0.6.0 (streamly-0.6.0-Fm7I6UzL4kZCflELs6As7V)

Modules: Streamly, Streamly.Prelude, Streamly.Internal, Streamly.Time, Streamly.SVar, Streamly.Streams.StreamK.Type, Streamly.Streams.StreamK, Streamly.Streams.StreamD.Type, Streamly.Streams.StreamD, Streamly.Streams.SVar, Streamly.Streams.Prelude, Streamly.Streams.Serial, Streamly.Streams.Parallel, Streamly.Streams.Combinators, Streamly.Streams.Async, Streamly.Streams.Ahead, Streamly.Enumeration, Streamly.Streams.Zip, Streamly.List, Streamly.String, Streamly.Tutorial
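The fold and scan behaviour documented for foldr, foldl', scanl' and postscanl' can be tried out on ordinary lists, which serve here as a stand-in for streams. The sketch below uses only base; the function names are hypothetical and chosen for illustration, and list folds are assumed to mirror the stream versions only for the pure, lazy-monad cases described in the documentation.

```haskell
import Data.List (foldl')

-- Lazy right fold: stops inspecting the input once it sees a 3,
-- because the recursive argument 'rest' is never demanded after that.
takeUntilThree :: [Int] -> [Int]
takeUntilThree = foldr (\x rest -> if x == 3 then [] else x : rest) []

-- Strict left fold computing sum and product in one pass.
-- foldl' only forces the accumulator to WHNF (the tuple constructor),
-- so the components are forced explicitly with seq.
sumAndProduct :: [Int] -> (Int, Int)
sumAndProduct = foldl' step (0, 1)
  where
    step (s, p) x =
      let s' = s + x
          p' = p * x
      in s' `seq` p' `seq` (s', p')

-- Left scan: the initial accumulator followed by every intermediate result.
runningSums :: [Int] -> [Int]
runningSums = scanl (+) 0

-- Post-scan: the same scan without the initial accumulator value.
postRunningSums :: [Int] -> [Int]
postRunningSums = drop 1 . runningSums
```

For instance, `takeUntilThree (4:1:3:undefined)` terminates without touching the undefined tail, while a left fold over the same input would have to deconstruct the whole list.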