Streamly.Core

Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: harendra.kumar@gmail.com
Stability: experimental
Portability: GHC

MonadAsync: A monad that can perform asynchronous/concurrent IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Stream: The type 'Stream m a' represents a monadic stream of values of type a constructed using actions in monad m. It uses a stop continuation and a yield continuation. You can consider it a rough equivalent of the direct style type:

    data Stream m a = Stop | Yield a (Maybe (Stream m a))

Our goal is to be able to represent finite as well as infinite streams and to compose a large number of small streams efficiently. In addition we want to compose streams in parallel; to facilitate that we maintain a local state in an SVar that is shared across and is used for synchronization of the streams being composed.

Using this type, there are two ways to indicate the end of a stream: one is by calling the stop continuation and the other is by yielding the last value along with Nothing as the rest of the stream.

Why do we have this redundancy? Why can't we use (a -> Stream m a -> m r) as the type of the yield continuation and always use the stop continuation to indicate the end of the stream? The reason is that when we compose a large number of short or singleton streams, using the stop continuation becomes expensive: just to know that there is no next element we have to call the continuation, introducing an indirection. It seems that when using CPS, GHC is not able to optimize this out as efficiently as it can in direct style, because of the function call involved. In direct style it will just be a constructor check and a memory access instead of a function call.
So we could use:

    data Stream m a = Stop | Yield a (Stream m a)

In CPS style, when we use the Maybe argument of yield to indicate the end, then just like direct style we can figure out that there is no next element without a function call.

Then why not get rid of the stop continuation and use only yield to indicate the end of stream? The answer is, in that case to indicate the end of the stream we would have to yield at least one element, so there is no way to represent an empty stream.

Whenever we make a singleton stream, or in general when we build a stream strictly, i.e. when we know all the elements of the stream in advance, we can use the last yield to indicate the end of the stream, because we know in advance at the time of the last yield that the stream is ending. We build singleton streams in the implementation of pure for Applicative and Monad, and in lift for MonadTrans; in these places we use yield with Nothing to indicate the end of the stream. Note that the only advantage of Maybe is when we have to build a large number of singleton or short streams. For larger streams the overhead of a separate stop continuation is not significant anyway. This could be significant when we break down a large stream into its elements, process them in some way and then recompose it from the pieces. Zipping streams is one such example. Zipping with streamly is the fastest among all streaming libraries.

However, in a lazy computation we cannot know in advance that the stream is ending, therefore we cannot use Maybe; we use the stop continuation in that case. For example, when building a stream from a lazy container using a right fold.

SVar: An SVar or a Stream Var is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time at any pace. The SVar would run the streams asynchronously and accumulate results.
An SVar may not really execute the stream completely and accumulate all the results. However, it ensures that the reader can read the results at whatever pace it wants to read. The SVar monitors and adapts to the consumer's pace.

An SVar is a mini scheduler; it has an associated runqueue that holds the stream tasks to be picked and run by a pool of worker threads. It has an associated output queue where the output stream elements are placed by the worker threads. A doorBell is used by the worker threads to notify the consumer thread about availability of new results in the output queue. More workers are added to the SVar by fromStreamVar on demand if the output produced is not keeping pace with the consumer. On bounded SVars, workers block on the output queue to provide throttling of the producer when the consumer is not pulling fast enough. The number of workers may even get reduced depending on the consuming pace.

New work is enqueued either at the time of creation of the SVar or as a result of executing the parallel combinators, i.e. <| and <|>, when the already enqueued computations get evaluated. See joinStreamVar2.

SVarStyle: Identify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.

SVarSched: For fairly interleaved parallel composition the sched policy is FIFO whereas for left biased parallel composition it is LIFO.

SVarTag: Conjunction is used for monadic/product style composition. Disjunction is used for fold/sum style composition. We need to distinguish the two types of SVars so that the scheduling of the two is independent.

ChildEvent: Events that a child thread may send to a parent thread.

interleave: Same as <=>.

fromStreamVar: Pull a stream from an SVar.

newEmptySVar: Create a new empty SVar.

newStreamVar1: Create a new SVar and enqueue one stream computation on it.

newStreamVar2: Create a new SVar and enqueue two stream computations on it.

toStreamVar: Write a stream to an SVar in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.

joinStreamVar2: Join two computations on the currently running SVar queue for concurrent execution.
The SVarStyle required by the current composition context is passed as one of the parameters. If the style does not match the style of the current SVar we create a new SVar and schedule the computations on that. The newly created SVar joins as one of the computations on the current SVar queue.

When we are using parallel composition, an SVar is passed around as a state variable. We try to schedule a new parallel computation on the SVar passed to us. The first time, when no SVar exists, a new SVar is created. Subsequently, joinStreamVar2 may get called when a computation already scheduled on the SVar is further evaluated. For example, when (a <|> b) is evaluated it calls joinStreamVar2 to put a and b on the current scheduler queue. However, if the scheduling and composition style of the new computation being scheduled is different from the style of the current SVar, then we create a new SVar and schedule it on that. For example:

    (x <|> y) <|> (t <|> u) -- all of them get scheduled on the same SVar
    (x <|> y) <|> (t <| u)  -- t and u get scheduled on a new child SVar
                            -- because of the scheduling policy change.

If we adapt a stream of type AsyncT to a stream of type ParallelT, we create a new SVar at the transitioning bind.

When the stream is switching from disjunctive composition to conjunctive composition and vice-versa we create a new SVar to isolate the scheduling of the two.

parAlt: Same as <|>. Since this schedules all the composed streams fairly you cannot fold an infinite number of streams using this operation.

parLeft: Same as <|. Since this schedules the left side computation first you can right fold an infinite container using this operator. However a left fold will not work well as it first unpeels the whole structure before scheduling a computation, requiring an amount of memory proportional to the size of the structure.

Semigroup instance of Stream: <> concatenates two streams sequentially, i.e. the first stream is exhausted completely before yielding any element from the second stream.

Alternative instance of Stream: empty represents an action that takes non-zero time to complete. Since all actions take non-zero time, an Alternative composition (<|>) is a monoidal composition executing all actions in parallel; it is similar to <> except that it runs all the actions in parallel and interleaves their results fairly.

Streamly.Streams

ZipAsync: Like ZipStream but zips in parallel; it generates both the elements to be zipped concurrently.

    main = (toList . zippingAsync $ (,) <$> s1 <*> s2) >>= print
        where s1 = pure 1 <> pure 2
              s2 = pure 3 <> pure 4

    [(1,3),(2,4)]

This applicative operation can be seen as the zipping equivalent of parallel composition with <|>.

ZipStream: ZipStream zips serially, i.e. it produces one element from each stream serially and then zips the two elements. Note, for convenience we have used the zipping combinator in the following example instead of using a type annotation.

    main = (toList . zipping $ (,) <$> s1 <*> s2) >>= print
        where s1 = pure 1 <> pure 2
              s2 = pure 3 <> pure 4

    [(1,3),(2,4)]

This applicative operation can be seen as the zipping equivalent of interleaving with <=>.

ParallelT: Like StreamT but runs all iterations fairly concurrently using a round robin scheduling.

    import Streamly
    import Control.Concurrent

    main = runParallelT $ do
        n <- return 3 <> return 2 <> return 1
        liftIO $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3

Unlike AsyncT all iterations are guaranteed to run fairly concurrently, unconditionally.

AsyncT: Like StreamT but may run each iteration concurrently using demand driven concurrency. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.

    import Streamly
    import Control.Concurrent

    main = runAsyncT $ do
        n <- return 3 <> return 2 <> return 1
        liftIO $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.

InterleavedT: Like StreamT but different in nesting behavior. It fairly interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner.

    main = runInterleavedT $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        liftIO $ print (x, y)

    (1,3)
    (2,3)
    (1,4)
    (2,4)

StreamT: The Monad instance of StreamT runs the monadic continuation for each element of the stream, serially.

    main = runStreamT $ do
        x <- return 1 <> return 2
        liftIO $ print x

    1
    2

StreamT nests streams serially in a depth first manner.

    main = runStreamT $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        liftIO $ print (x, y)

    (1,3)
    (1,4)
    (2,3)
    (2,4)

This behavior is exactly like a list transformer. We call the monadic code being run for each element of the stream a monadic continuation. In the imperative paradigm we can think of this composition as nested for loops, where the monadic continuation is the body of the loop. The loop iterates over all elements of the stream.

Streaming: Class of types that can represent a stream of elements of some type a in some monad m.

nil: Represents an empty stream just like [] represents an empty list.

cons: Constructs a stream by adding a pure value at the head of an existing stream, just like : constructs lists. For example:

    > let stream = 1 `cons` 2 `cons` 3 `cons` nil
    > (toList . serially) stream
    [1,2,3]

(.:): Operator equivalent of cons so that you can construct a stream of pure values more succinctly like this:

    > let stream = 1 .: 2 .: 3 .: nil
    > (toList . serially) stream
    [1,2,3]

.: constructs a stream just like : constructs a list.

Also note that another equivalent way of building streams from pure values is:

    > let stream = pure 1 <> pure 2 <> pure 3
    > (toList . serially) stream
    [1,2,3]

In the first method we construct a stream by adding one element at a time. In the second method we first construct singleton streams using pure and then compose all those streams together using the Semigroup style composition of streams. The former method is a bit more efficient than the latter.

streamBuild: Build a stream from its church encoding. The function passed maps directly to the underlying representation of the stream type. The second parameter to the function is the "yield" function, yielding a value and the remaining stream if any, otherwise Nothing. The third parameter is to represent an "empty" stream.

fromCallback: Build a singleton stream from a callback function.

fromSVar: Read an SVar to get a stream.

streamFold: Fold a stream using its church encoding. The second argument is the "step" function consuming an element and the remaining stream, if any. The third argument is for consuming an "empty" stream that yields nothing.

runStreaming: Run a streaming composition, discard the results.

toSVar: Write a stream to an SVar in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.

async: Make a stream asynchronous; triggers the computation and returns a stream in the underlying monad representing the output generated by the original computation. The returned action is exhaustible and must be drained once. If not drained fully we may have a thread blocked forever, and once exhausted it will always return empty.

zipWith: Zip two streams serially using a pure zipping function.

zipAsyncWith: Zip two streams asyncly (i.e. both the elements being zipped are generated concurrently) using a pure zipping function.

adapt: Adapt one streaming type to another.

serially: Interpret an ambiguously typed stream as StreamT.

interleaving: Interpret an ambiguously typed stream as InterleavedT.

asyncly: Interpret an ambiguously typed stream as AsyncT.

parallely: Interpret an ambiguously typed stream as ParallelT.

zipping: Interpret an ambiguously typed stream as ZipStream.

zippingAsync: Interpret an ambiguously typed stream as ZipAsync.

runStreamT: Same as runStreaming . serially.

runInterleavedT: Same as runStreaming . interleaving.

runAsyncT: Same as runStreaming . asyncly.

runParallelT: Same as runStreaming . parallely.

runZipStream: Same as runStreaming . zipping.

runZipAsync: Same as runStreaming . zippingAsync.

(<=>): Sequential interleaved composition. In contrast to <>, this operator fairly interleaves two streams instead of appending them, yielding one element from each stream alternately.

    main = (toList . serially $ (return 1 <> return 2) <=> (return 3 <> return 4)) >>= print

    [1,3,2,4]

This operator corresponds to the InterleavedT style. Unlike <>, this operator cannot be used to fold infinite containers since that might accumulate too many partially drained streams. To be clear, it can combine infinite streams but not an infinite number of streams.

(<|): Demand driven concurrent composition. In contrast to <|>, this operator concurrently "merges" streams in a left biased manner rather than fairly interleaving them. It keeps yielding from the stream on the left as long as it can. If the left stream blocks or cannot keep up with the pace of the consumer it can concurrently yield from the stream on the right in parallel.

    main = (toList . serially $ (return 1 <> return 2) <| (return 3 <> return 4)) >>= print

    [1,2,3,4]

Unlike <|> it can be used to fold infinite containers of streams. This operator corresponds to the AsyncT type for product style composition.

foldWith: Like the Prelude fold but allows you to specify a binary sum style stream composition operator to fold a container of streams.

    foldWith (<>) $ map return [1..3]

foldMapWith: Like foldMap but allows you to specify a binary sum style composition operator to fold a container of streams. Maps a monadic streaming action on the container before folding it.

    foldMapWith (<>) return [1..3]

forEachWith: Like foldMapWith but with the last two arguments reversed, i.e. the monadic streaming function is the last argument.

Streamly.Prelude

unfoldr: Build a Stream by unfolding pure steps starting from a seed.

unfoldrM: Build a Stream by unfolding monadic steps starting from a seed.

each: Same as foldWith (<>) but more efficient.

iterate: Iterate a pure function from a seed value, streaming the results forever.

iterateM: Iterate a monadic function from a seed value, streaming the results forever.

fromHandle: Read lines from an IO Handle into a stream of Strings.

foldr: Right fold.

foldrM: Right fold with a monadic step function. See toList for an example use.

scan: Scan left. A strict left fold which accumulates the result of its reduction steps inside a stream, from left.

foldl: Strict left fold. This is typed to work with the foldl package. To use it normally just pass id as the third argument.

foldlM: Strict left fold, with monadic step function. This is typed to work with the foldl package. To use directly pass id as the third argument.

uncons: Decompose a stream into its head and tail. If the stream is empty, returns Nothing.
If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

toHandle: Write a stream of Strings to an IO Handle.

toList: Convert a stream into a list in the underlying monad.

take: Take first n elements from the stream and discard the rest.

filter: Include only those elements that pass a predicate.

takeWhile: End the stream as soon as the predicate fails on an element.

drop: Discard first n elements from the stream and take the rest.

dropWhile: Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

all: Determine whether all elements of a stream satisfy a predicate.

any: Determine whether any of the elements of a stream satisfy a predicate.

sum: Determine the sum of all elements of a stream of numbers.

product: Determine the product of all elements of a stream of numbers.

head: Extract the first element of the stream, if any.

tail: Extract all but the first element of the stream, if any.

last: Extract the last element of the stream, if any.

null: Determine whether the stream is empty.

elem: Determine whether an element is present in the stream.

notElem: Determine whether an element is not present in the stream.

length: Determine the length of the stream.

reverse: Returns the elements of the stream in reverse order. The stream must be finite.

minimum: Determine the minimum element in a stream.

maximum: Determine the maximum element in a stream.

mapM: Replace each element of the stream with the result of a monadic action applied on the element.

mapM_: Apply a monadic action to each element of the stream and discard the output of the action.

sequence: Reduce a stream of monadic actions to a stream of the output of those actions.

replicateM: Generate a stream by performing an action n times.

zipWithM: Zip two streams serially using a monadic zipping function.

zipAsyncWithM: Zip two streams asyncly (i.e. both the elements being zipped are generated concurrently) using a monadic zipping function.

Streamly.Time

periodic: Run an action forever periodically at the given frequency, specified per second (Hz).

withClock: Run a computation on every clock tick; the clock runs at the specified frequency. It allows running a computation at high frequency efficiently by maintaining a local clock and adjusting it with the provided base clock at longer intervals. The first argument is a base clock returning some notion of time in microseconds. The second argument is the frequency per second (Hz). The third argument is the action to run; the action is provided the local time as an argument.

Name index

Package: streamly-0.1.2-9l33z8bhuSFIhT28JWsVBQ

Modules: Streamly, Streamly.Prelude, Streamly.Time, Streamly.Core, Streamly.Streams, Streamly.Tutorial

External packages and names: base, Data.Semigroup, <>, GHC.Base, mempty, mappend, mconcat, Semigroup, Monoid, stimes, sconcat, Control.Monad.IO.Class, liftIO, MonadIO, many, some, <|>, empty, Alternative, mplus, mzero, MonadPlus, transformers-0.5.5.0-1bbDhu8ypp8LC8lJMFju65, Control.Monad.Trans.Class, MonadTrans, lift

Documented names: MonadAsync, ZipAsync, ZipStream, ParallelT, AsyncT, InterleavedT, StreamT, Streaming, nil, cons, .:, runStreaming, async, zipWith, zipAsyncWith, adapt, serially, interleaving, asyncly, parallely, zipping, zippingAsync, runStreamT, runInterleavedT, runAsyncT, runParallelT, runZipStream, runZipAsync, <=>, <|, foldWith, foldMapWith, forEachWith, unfoldr, unfoldrM, each, iterate, iterateM, fromHandle, foldr, foldrM, scan, foldl, foldlM, uncons, toHandle, toList, take, filter, takeWhile, drop, dropWhile, all, any, sum, product, head, tail, last, null, elem, notElem, length, reverse, minimum, maximum, mapM, mapM_, sequence, replicateM, zipWithM, zipAsyncWithM, periodic, withClock

Other referenced names: Stream, Nothing, Maybe, pure, SVar, fromStreamVar, joinStreamVar2, SVarStyle, SVarSched, SVarTag, ChildEvent, interleave, newEmptySVar, newStreamVar1, newStreamVar2, toStreamVar, parAlt, parLeft, $fSemigroupStream, $fAlternativeStream, runStream, LIFO, FIFO, Conjunction, Disjunction, scons, srepeat, snil, outputQueue, doorBell, enqueue, runqueue, runningThreads, queueEmpty, svarStyle, ChildYield, ChildStop, Monad, streamBuild, fromCallback, fromSVar, streamFold, toSVar, Data.Foldable, foldMap, toStream, fromStream, getZipAsync, getZipStream, getParallelT, getAsyncT, getInterleavedT, getStreamT, id
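
The stop/yield representation described in the Stream notes at the start of this document can be illustrated with a small standalone sketch. It is a simplified assumption, not the library's definition: the SVar state that the real type threads through is omitted, and singleton, fromList and append are hypothetical helper names introduced only for this example (nil, cons and toList are re-implemented locally rather than imported from streamly).

```haskell
{-# LANGUAGE RankNTypes #-}
module Main (main) where

-- Simplified CPS stream: a stop continuation and a yield continuation.
-- Assumption: the SVar state argument mentioned in the text is left out.
newtype Stream m a = Stream
    { runStream :: forall r.
           m r                                 -- stop: the stream is empty
        -> (a -> Maybe (Stream m a) -> m r)    -- yield: element, optional rest
        -> m r
    }

-- The empty stream can only be expressed through the stop continuation.
nil :: Stream m a
nil = Stream (\stp _ -> stp)

-- A strictly built singleton ends itself by yielding Nothing as the rest,
-- so the consumer never has to call a stop continuation.
singleton :: a -> Stream m a
singleton a = Stream (\_ yld -> yld a Nothing)

cons :: a -> Stream m a -> Stream m a
cons a r = Stream (\_ yld -> yld a (Just r))

-- A lazy right fold cannot know which element is last, so the end of the
-- stream is signalled by nil, i.e. by the stop continuation.
fromList :: [a] -> Stream m a
fromList = foldr cons nil

-- Sequential append: when the first stream ends (either way), continue
-- with the second.
append :: Stream m a -> Stream m a -> Stream m a
append s1 s2 = Stream (\stp yld ->
    runStream s1
        (runStream s2 stp yld)
        (\a r -> yld a (Just (maybe s2 (`append` s2) r))))

toList :: Monad m => Stream m a -> m [a]
toList s = runStream s (return []) yield
  where
    yield a Nothing  = return [a]            -- end known from the yield itself
    yield a (Just r) = (a :) <$> toList r

main :: IO ()
main = do
    toList (singleton 1 `append` fromList [2, 3 :: Int]) >>= print  -- [1,2,3]
    toList (nil :: Stream IO Int) >>= print                         -- []
```

Both ways of ending a stream appear here: singleton finishes through the Nothing argument of yield, while fromList finishes through nil and the stop continuation, which is also the only way to express an empty stream.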