úΖµŒeo      !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmn(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +;<=>?DQV]V–ŽA monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be .oDThe type 'Stream m a' represents a monadic stream of values of type a% constructed using actions in monad ma. It uses stop, singleton and yield continuations equivalent to the following direct style type:;data Stream m a = Stop | Singleton a | Yield a (Stream m a)žTo facilitate parallel composition we maintain a local state in an SVar that is shared across and is used for synchronization of the streams being composed.ÜThe singleton case can be expressed in terms of stop and yield but we have it as a separate case to optimize composition operations for streams with single element. We build singleton streams in the implementation of p$ for Applicative and Monad, and in q for MonadTrans.rÿVAn SVar or a Stream Var is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time at any pace. The SVar would run the streams asynchronously and accumulate results. An SVar may not really execute the stream completely and accumulate all the results. However, it ensures that the reader can read the results at whatever paces it wants to read. The SVar monitors and adapts to the consumer's pace.ÿ™An SVar is a mini scheduler, it has an associated runqueue that holds the stream tasks to be picked and run by a pool of worker threads. It has an associated output queue where the output stream elements are placed by the worker threads. A doorBell is used by the worker threads to intimate the consumer thread about availability of new results in the output queue. More workers are added to the SVar by sÿ" on demand if the output produced is not keeping pace with the consumer. On bounded SVars, workers block on the output queue to provide throttling of the producer when the consumer is not pulling fast enough. The number of workers may even get reduced depending on the consuming pace.{New work is enqueued either at the time of creation of the SVar or as a result of executing the parallel combinators i.e. <| and <|>< when the already enqueued computations get evaluated. See t.uhIdentify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.vÐConjunction is used for monadic/product style composition. Disjunction is used for fold/sum style composition. We need to distinguish the two types of SVars so that the scheduling of the two is independent.w7Events that a child thread may send to a parent thread.xSame as  once . returnySame as consM . returnzˆConcatenates two streams sequentially i.e. the first stream is exhausted completely before yielding any element from the second stream.{}This is safe even if we are adding more threads concurrently because if a child thread is adding another thread then anyway | will not be empty.}ÿ=In contrast to pushWorker which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread safe modification of runningThreads. Alternatively, we can use a CreateThread event to avoid using a CAS based modification.sPull a stream from an SVar.~Create a new empty SVar.;Create a new SVar and enqueue one stream computation on it.€<Create a new SVar and enqueue two stream computations on it.Write a stream to an rQ in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.t/Join two computations on the currently running rÿ queue for concurrent execution. When we are using parallel composition, an SVar is passed around as a state variable. We try to schedule a new parallel computation on the SVar passed to us. The first time, when no SVar exists, a new SVar is created. Subsequently, tn may get called when a computation already scheduled on the SVar is further evaluated. For example, when (a ‚ b) is evaluated it calls a t to put a and b! on the current scheduler queue.The uÿ\ required by the current composition context is passed as one of the parameters. If the scheduling and composition style of the new computation being scheduled is different than the style of the current SVar, then we create a new SVar and schedule it on that. The newly created SVar joins as one of the computations on the current SVar queue.+Cases when we need to switch to a new SVar:(x ‚ y) ‚ (t ‚1 u) -- all of them get scheduled on the same SVar(x ‚ y) ‚ (t ƒ u) -- t and uN get scheduled on a new child SVar because of the scheduling policy change.if we adapt a stream of type ƒ to a stream of type Parallel1, we create a new SVar at the transitioning bind.¤When the stream is switching from disjunctive composition to conjunctive composition and vice-versa we create a new SVar to isolate the scheduling of the two.$o„…ru†‡ˆ‰Šv‹ŒŽxyz‘s~€tƒ’‚“”•–—o„…r ˜™š›œ|žŸ u†‡ˆ‰Šv‹Œw¡¢(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone +0;<=>?KQV5ù;>An IO stream whose applicative instance zips streams wAsyncly.>An IO stream whose applicative instance zips streams serially.4A parallely composing IO stream of elements of type a. See  documentation for more details.@A round robin parallely composing IO stream of elements of type a. See  documentation for more details. OA demand driven left biased parallely composing IO stream of elements of type a. See  documentation for more details. 5An interleaving serial IO stream of elements of type a. See ! documentation for more details. 'A serial IO stream of elements of type a. See ! documentation for more details. Like P but zips in parallel, it generates all the elements to be zipped concurrently. main = (toList . 0‘ $ (,,) <$> s1 <*> s2 <*> s3) >>= print where s1 = fromFoldable [1, 2] s2 = fromFoldable [3, 4] s3 = fromFoldable [5, 6]  [(1,3,5),(2,4,6)] The 6 instance of this type works the same way as that of . The applicative instance of } zips a number of streams serially i.e. it produces one element from each stream serially and then zips all those elements. main = (toList . .‘ $ (,,) <$> s1 <*> s2 <*> s3) >>= print where s1 = fromFoldable [1, 2] s2 = fromFoldable [3, 4] s3 = fromFoldable [5, 6]  [(1,3,5),(2,4,6)] The 6 instance of this type works the same way as that of .=Async composition with simultaneous traversal of all streams.The Semigroup instance of  concurrently merges” two streams, running both strictly concurrently and yielding elements from both streams as they arrive. When multiple streams are combined using ‹ each one is evaluated in its own thread and the results produced are presented in the combined stream on a first come first serve basis. and  are concurrent lookahead streamsÿî each with a specific type of consumption pattern (depth first or breadth first). Since they are lookahead, they may introduce certain default latency in starting more concurrent tasks for efficiency reasons or may put a default limitation on the resource consumption (e.g. number of concurrent threads for lookahead). If we look at the implementation detail, they both can share a pool of worker threads to evaluate the streams in the desired pattern and at the desired rate. However, 9 uses a separate runtime thread to evaluate each stream. is similar to z, as both of them evaluate the constituent streams fairly in a round robin fashion. However, the key difference is that ! is lazy or pull driven whereas  is strict or push driven. z immediately starts concurrent evaluation of both the streams (in separate threads) and later picks the results whereas ë may wait for a certain latency threshold before initiating concurrent evaluation of the next stream. The concurrent scheduling of the next stream or the degree of concurrency is driven by the feedback from the consumer. In case of @ each stream is evaluated in a separate thread and results are pushedd to a shared output buffer, the evaluation rate is controlled by blocking when the buffer is full.@Concurrent lookahead streams are generally more efficient than   and can work pretty efficiently even for smaller tasks because they do not necessarily use a separate thread for each task. So they should be preferred over d especially when efficiency is a concern and simultaneous strict evaluation is not a requirement. ÿ] is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them e.g. when we want to race two tasks and want to start both strictly at the same time or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. We can say that A is almost the same (modulo some implementation differences) as \ when the latter is used with unlimited lookahead and zero latency in initiating lookahead. main = (toList . -; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,3,2,4] zWhen streams with more than one element are merged, it yields whichever stream yields first without any bias, unlike the   style streams.ÿŠAny exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.Similarly, the £ instance of  runs all& iterations of the loop concurrently. import Streamly# import Control.Concurrent main =  . -¯ $ do n <- return 3 <> return 2 <> return 1 liftIO $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)  ?ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3 ƒNote that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.eWide async composition or async composition with breadth first traversal. The Semigroup instance of  concurrently  traverses‚ the composed streams using a depth first travesal or in a round robin fashion, yielding elements from both streams alternately. main = (toList . ,; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,3,2,4] ÿŠAny exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.Similarly, the £ instance of  runs all@ iterations fairly concurrently using a round robin scheduling. import Streamly# import Control.Concurrent main =  . ,¯ $ do n <- return 3 <> return 2 <> return 1 liftIO $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)  ?ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3 Unlike L all iterations are guaranteed to run fairly concurrently, unconditionally.žNote that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.\Deep async composition or async composition with depth first traversal. In a left to right ÿ composition it tries to yield elements from the left stream as long as it can, but it can run the right stream in parallel if it needs to, based on demand. The right stream can be run if the left stream blocks on IO or cannot produce elements fast enough for the consumer. main = (toList . +; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,2,3,4] ÿŠAny exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.!Similarly, the monad instance of  may´ run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer. import Streamly# import Control.Concurrent main =  . +¯ $ do n <- return 3 <> return 2 <> return 1 liftIO $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)  ?ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3 ?All iterations may run in the same thread if they do not block.¥Note that async composition with depth first traversal can be used to combine infinite number of streams as it explores only a bounded number of streams at a time.SWide serial composition or serial composition with a breadth first traversal. The  instance of – traverses the two streams in a breadth first manner. In other words, it interleaves two streams, yielding one element from each stream alternately. main = (toList . ); $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,3,2,4] Similarly, the £o instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner. main =  . )\ $ do x <- return 1 <> return 2 y <- return 3 <> return 4 liftIO $ print (x, y)  (1,3) (2,3) (1,4) (2,4) ¡Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.ODeep serial composition or serial composition with depth first traversal. The  instance of “ appends two streams serially in a depth first manner, yielding all elements from the first stream, and then all elements from the second stream. main = (toList . (; $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print   [1,2,3,4] The £ instance runs the monadic continuation+ for each element of the stream, serially. main =  . (9 $ do x <- return 1 <> return 2 liftIO $ print x  1 2 0 nests streams serially in a depth first manner. main =  . (\ $ do x <- return 1 <> return 2 y <- return 3 <> return 4 liftIO $ print (x, y)  (1,3) (1,4) (2,3) (2,4) This behavior of Ä is exactly like a list transformer. We call the monadic code being run for each element of the stream a monadic continuation. In imperative paradigm we can think of this composition as nested foro loops and the monadic continuation is the body of the loop. The loop iterates for all elements of the stream.The (: combinator can be omitted as the default stream type is ›. Note that serial composition with depth first traversal can be used to combine an infinite number of streams as it explores only one stream at a time.Same as .DClass of types that can represent a stream of elements of some type a in some monad m.An empty stream. > toList nil [] _Constructs a stream by adding a monadic action at the head of an existing stream. For example: M> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"] Operator equivalent of . C> toList $ getLine |: getLine |: nil hello world ["hello","world"] VConstruct a stream by adding a pure value at the head of an existing stream. Same as consM . return. For example: 2> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3] Operator equivalent of . &> toList $ 1 .: 2 .: 3 .: nil [1,2,3] ¤üBuild a stream from its church encoding. The function passed maps directly to the underlying representation of the stream type. The second parameter to the function is the "yield" function yielding a value and the remaining stream if any otherwise ¥9. The third parameter is to represent an "empty" stream.¦2Build a singleton stream from a callback function.§Read an SVar to get a stream.¨×Fold a stream using its church encoding. The second argument is the "step" function consuming an element and the remaining stream, if any. The third argument is for consuming an "empty" stream that yields nothing.ZRun a streaming composition, discard the results. By default it interprets the stream as O, to run other types of streams use the type adapting combinators for example  runStream . +.Same as ©Write a stream to an rQ in a non-blocking manner. The stream can then be read back from the SVar using §.ÿEMake a stream asynchronous, triggers the computation and returns a stream in the underlying monad representing the output generated by the original computation. The returned action is exhaustible and must be drained once. If not drained fully we may have a thread blocked forever and once exhausted it will always return empty. Polymorphic version of the  operation  of €. Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.!Polymorphic version of the  operation  of N. Interleaves two streams, yielding one element from each stream alternately."Same as !.#Polymorphic version of the  operation  of g. Merges two streams possibly concurrently, preferring the elements from the left one when available.$Same as #.%Polymorphic version of the  operation  of F. Merges two streams concurrently choosing elements from both fairly.&Polymorphic version of the  operation  of " Merges two streams concurrently.'AAdapt any specific stream type to any other specific stream type.((Fix the type of a polymorphic stream as .)(Fix the type of a polymorphic stream as .*Same as ).+(Fix the type of a polymorphic stream as .,(Fix the type of a polymorphic stream as .-(Fix the type of a polymorphic stream as ..(Fix the type of a polymorphic stream as ./Same as ..0(Fix the type of a polymorphic stream as  .1Same as 0.2Same as  runStream.3Same as runStream . wSerially.4Same as runStream . asyncly.5Same as runStream . parallely.6Same as runStream . zipping.7Same as runStream . zippingAsync.8 A variant of  that allows you to fold a ª@ container of streams using the specified stream sum operation.  foldWith # $ map return [1..3]9 A variant of «9 that allows you to map a monadic streaming action on a ªF container and then fold it using the specified stream sum operation.  foldMapWith # return [1..3]:Like 9d but with the last two arguments reversed i.e. the monadic streaming function is the last argument.Iru†‡ˆ‰Šv‹Œ~ ¬­¤¦§¨© !"#$%&'()*+,-./0123456789: ®¯°±²³´µ¶·¸¹º»¬­5555"5(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone ;<=>?QV|,2;<Build a Stream by unfolding pure steps starting from a seed.<?Build a Stream by unfolding monadic steps starting from a seed.=Construct a stream from a ª container.>Same as =.?GCreate a singleton stream by executing a monadic action once. Same as  m `consM` nil but more efficient. (> toList $ once getLine hello ["hello"] @1Generate a stream by performing a monadic action n times.ACGenerate a stream by repeatedly executing a monadic action forever.BIIterate a pure function from a seed value, streaming the results forever.CMIterate a monadic function from a seed value, streaming the results forever.D6Read lines from an IO Handle into a stream of Strings.EGLazy right associative fold. For example, to fold a stream into a list: H>> runIdentity $ foldr (:) [] (serially $ fromFoldable [1,2,3]) [1,2,3] FYLazy right fold with a monadic step function. For example, to fold a stream into a list: `>> runIdentity $ foldrM (\x xs -> return (x : xs)) [] (serially $ fromFoldable [1,2,3]) [1,2,3] G3Strict left scan with an extraction function. Like Iy, but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.HIStrict left scan. Like Lê, but returns the folded value at each step, generating a stream of all intermediate fold results. The first element of the stream is the user supplied initial value, and the last element of the stream is the same as the result of L.JÜStrict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.KLStrict left associative fold.MLike J#, but with a monadic step function.NOLike L" but with a monadic step function.PLDecompose a stream into its head and tail. If the stream is empty, returns ¥<. If the stream is non-empty, returns 'Just (a, ma)', where a is the head of the stream and ma its tail.Q*Write a stream of Strings to an IO Handle.R5Convert a stream into a list in the underlying monad.S Take first n/ elements from the stream and discard the rest.T2Include only those elements that pass a predicate.U<End the stream as soon as the predicate fails on an element.VDiscard first n, elements from the stream and take the rest.WdDrop elements in the stream as long as the predicate succeeds and then take the rest of the stream.X?Determine whether all elements of a stream satisfy a predicate.YFDetermine whether any of the elements of a stream satisfy a predicate.Z8Determine the sum of all elements of a stream of numbers[<Determine the product of all elements of a stream of numbers\0Extract the first element of the stream, if any.]8Extract all but the first element of the stream, if any.^/Extract the last element of the stream, if any._&Determine whether the stream is empty.`6Determine whether an element is present in the stream.a:Determine whether an element is not present in the stream.b#Determine the length of the stream.cPReturns the elements of the stream in reverse order. The stream must be finite.d*Determine the minimum element in a stream.e*Determine the maximum element in a stream.f_Replace each element of the stream with the result of a monadic action applied on the element.g[Apply a monadic action to each element of the stream and discard the output of the action.hOReduce a stream of monadic actions to a stream of the output of those actions.i7Zip two streams serially using a pure zipping function.j:Zip two streams serially using a monadic zipping function.k}Zip two streams concurrently (i.e. both the elements being zipped are generated concurrently) using a pure zipping function.l{Zip two streams asyncly (i.e. both the elements being zipped are generated concurrently) using a monadic zipping function.7;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijkl7;<?@ABC=PEFLOJMgRXY\]^_b`aedZ[IGTSUVWcfhijklDQ>HKN(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCNone)6  !"#$%&'()*+,-./0123456789:7 !#%&()+,-.0' 89:234567 */1"$(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.com experimentalGHCSafe‹HmXRun an action forever periodically at the given frequency specified in per second (Hz).nÿàRun a computation on every clock tick, the clock runs at the specified frequency. It allows running a computation at high frequency efficiently by maintaining a local clock and adjusting it with the provided base clock at longer intervals. The first argument is a base clock returning some notion of time in microseconds. The second argument is the frequency in per second (Hz). The third argument is the action to run, the action is provided the local time as an argument.mnmn(c) 2017 Harendra KumarBSD3harendra.kumar@gmail.comNoneŒ_¼   !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz {|}~€‚ƒ„…†&+‡ˆ‰Š‹Œ1.z(ƒŽ‘’“#J$”,0•tv–—€˜™š›œžŸ ¡ {¢£ {¤¥¦§¨ © ª«¬­®¯°±² ³´%streamly-0.2.1-KbXhVsx87QhDbpq25OCmpzStreamlyStreamly.Prelude Streamly.Time Streamly.CoreStreamly.Streams Data.FoldablefoldStreamly.TutorialbaseData.Semigroup<> Semigroupstimessconcat MonadAsyncZipAsync ZipSerialParallelWAsyncAsyncWSerialSerial ZipAsyncM ZipStream ZipSerialM ParallelTWAsyncTAsyncT InterleavedTWSerialTStreamTSerialT StreamingIsStreamnilconsM|:cons.: runStream runStreamingmkAsyncserialwSerial<=>async<|wAsyncparalleladaptserially wSerially interleavingasynclywAsyncly parallely zipSeriallyzipping zipAsyncly zippingAsync runStreamTrunInterleavedT runAsyncT runParallelT runZipStream runZipAsyncfoldWith foldMapWith forEachWithunfoldrunfoldrM fromFoldableeachonce replicateMrepeatMiterateiterateM fromHandlefoldrfoldrMscanxscanscanl'foldxfoldlfoldl'foldxMfoldlMfoldlM'unconstoHandletoListtakefilter takeWhiledrop dropWhileallanysumproductheadtaillastnullelemnotElemlengthreverseminimummaximummapMmapM_sequencezipWithzipWithM zipAsyncWith zipAsyncWithMperiodic withClockStreamGHC.Basepuretransformers-0.5.2.0Control.Monad.Trans.ClassliftSVar fromStreamVarjoinStreamVar2 SVarStyleSVarTag ChildEvent singletonallThreadsDonerunningThreads pushWorkerPar newEmptySVar newStreamVar1 newStreamVar2 toStreamVar SVarSchedLIFOFIFOPar Conjunction Disjunctionrepeatalt withLocalwithCatchError outputQueuedoorBellsirenenqueuerunqueue queueEmpty activeWorkers svarStyle ChildYield ChildStopMonad streamBuildNothing fromCallbackfromSVar streamFoldtoSVarFoldablefoldMaptoStream fromStream getZipAsyncM getZipSerialM getParallelT getWAsyncT getAsyncT getWSerialT getSerialT