-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Beautiful Streaming, Concurrent and Reactive Composition -- -- Streamly, short for streaming concurrently, provides monadic streams, -- with a simple API, almost identical to standard lists, and an in-built -- support for concurrency. By using stream-style combinators on stream -- composition, streams can be generated, merged, chained, mapped, -- zipped, and consumed concurrently – providing a generalized high level -- programming framework unifying streaming and concurrency. Controlled -- concurrency allows even infinite streams to be evaluated concurrently. -- Concurrency is auto scaled based on feedback from the stream consumer. -- The programmer does not have to be aware of threads, locking or -- synchronization to write scalable concurrent programs. -- -- The basic streaming functionality of streamly is equivalent to that -- provided by streaming libraries like vector, streaming, -- pipes, and conduit. In addition to providing streaming -- functionality, streamly subsumes the functionality of list transformer -- libraries like pipes or list-t and also the logic -- programming library logict. On the concurrency side, it -- subsumes the functionality of the async package. Because it -- supports streaming with concurrency we can write FRP applications -- similar in concept to Yampa or reflex. -- -- For file IO, currently the library provides only one API to stream the -- lines in the file as Strings. Future versions will provide better -- streaming file IO options. Streamly interworks with the popular -- streaming libraries, see the interworking section in -- Streamly.Tutorial. -- -- Why use streamly? -- --
-- import qualified Streamly.Prelude as S ---- -- Functions with the suffix M are general functions that work -- on monadic arguments. The corresponding functions without the suffix -- M work on pure arguments and can in general be derived from -- their monadic versions but are provided for convenience and for -- consistency with other pure APIs in the base package. -- -- Functions having a MonadAsync constraint work concurrently when -- used with appropriate stream type combinator. Please be careful to not -- use parallely with infinite streams. -- -- Deconstruction and folds accept a SerialT type instead of a -- polymorphic type to ensure that streams always have a concrete -- monomorphic type by default, reducing type errors. In case you want to -- use any other type of stream you can use one of the type combinators -- provided in the Streamly module to convert the stream type. module Streamly.Prelude -- | An empty stream. -- --
-- > toList nil -- [] --nil :: IsStream t => t m a -- | Constructs a stream by adding a monadic action at the head of an -- existing stream. For example: -- --
-- > toList $ getLine `consM` getLine `consM` nil -- hello -- world -- ["hello","world"] ---- -- Concurrent (do not use parallely to construct infinite -- streams) consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 `consM` -- | Operator equivalent of consM. We can read it as "parallel -- colon" to remember that | comes before :. -- --
-- > toList $ getLine |: getLine |: nil -- hello -- world -- ["hello","world"] ---- --
-- let delay = threadDelay 1000000 >> print 1 -- runStream $ serially $ delay |: delay |: delay |: nil -- runStream $ parallely $ delay |: delay |: delay |: nil ---- -- Concurrent (do not use parallely to construct infinite -- streams) (|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 |: -- | Construct a stream by adding a pure value at the head of an existing -- stream. For pure values it can be faster than consM. For -- example: -- --
-- > toList $ 1 `cons` 2 `cons` 3 `cons` nil -- [1,2,3] --cons :: IsStream t => a -> t m a -> t m a infixr 5 `cons` -- | Operator equivalent of cons. -- --
-- > toList $ 1 .: 2 .: 3 .: nil -- [1,2,3] --(.:) :: IsStream t => a -> t m a -> t m a infixr 5 .: -- | Build a stream by unfolding a pure step function starting from -- a seed. The step function returns the next element in the stream and -- the next seed value. When it is done it returns Nothing and the -- stream ends. For example, -- --
-- let f b = -- if b > 3 -- then Nothing -- else Just (b, b + 1) -- in toList $ unfoldr f 0 ---- --
-- [0,1,2,3] --unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a -- | Build a stream by unfolding a monadic step function starting -- from a seed. The step function returns the next element in the stream -- and the next seed value. When it is done it returns Nothing and -- the stream ends. For example, -- --
-- let f b = -- if b > 3 -- then return Nothing -- else print b >> return (Just (b, b + 1)) -- in runStream $ unfoldrM f 0 ---- --
-- 0 -- 1 -- 2 -- 3 ---- -- When run concurrently, the next unfold step can run concurrently with -- the processing of the output of the previous step. Note that more than -- one step cannot run concurrently as the next step depends on the -- output of the previous step. -- --
-- (asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0) -- & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () ---- -- Concurrent -- -- Since: 0.1.0 unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a -- | Create a singleton stream by executing a monadic action once. Same as -- m `consM` nil but more efficient. -- --
-- > toList $ once getLine -- hello -- ["hello"] --once :: (IsStream t, Monad m) => m a -> t m a -- | Generate a stream by performing a monadic action n times. -- --
-- runStream $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) -- runStream $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) ---- -- Concurrent replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -- | Generate a stream by repeatedly executing a monadic action forever. -- --
-- runStream $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) -- runStream $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) ---- -- Concurrent, infinite (do not use with parallely) repeatM :: (IsStream t, MonadAsync m) => m a -> t m a -- | Iterate a pure function from a seed value, streaming the results -- forever. iterate :: IsStream t => (a -> a) -> a -> t m a -- | Iterate a monadic function from a seed value, streaming the results -- forever. -- -- When run concurrently, the next iteration can run concurrently with -- the processing of the previous iteration. Note that more than one -- iteration cannot run concurrently as the next iteration depends on the -- output of the previous iteration. -- --
-- runStream $ serially $ S.take 10 $ S.iterateM -- (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0 -- -- runStream $ asyncly $ S.take 10 $ S.iterateM -- (\x -> threadDelay 1000000 >> print x >> return (x + 1)) 0 ---- -- Concurrent iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> a -> t m a -- | Construct a stream from a Foldable containing pure values. fromFoldable :: (IsStream t, Foldable f) => f a -> t m a -- | Construct a stream from a Foldable containing monadic actions. -- --
-- runStream $ serially $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1) -- runStream $ asyncly $ S.fromFoldableM $ replicate 10 (threadDelay 1000000 >> print 1) ---- -- Concurrent (do not use with parallely on infinite -- containers) fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a -- | Decompose a stream into its head and tail. If the stream is empty, -- returns Nothing. If the stream is non-empty, returns Just -- (a, ma), where a is the head of the stream and -- ma its tail. uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) -- | Lazy right associative fold. For example, to fold a stream into a -- list: -- --
-- >> runIdentity $ foldr (:) [] (serially $ fromFoldable [1,2,3]) -- [1,2,3] --foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b -- | Lazy right fold with a monadic step function. For example, to fold a -- stream into a list: -- --
-- >> runIdentity $ foldrM (\x xs -> return (x : xs)) [] (serially $ fromFoldable [1,2,3]) -- [1,2,3] --foldrM :: Monad m => (a -> b -> m b) -> b -> SerialT m a -> m b -- | Strict left associative fold. foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b -- | Like foldl' but with a monadic step function. foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b -- | Strict left fold with an extraction function. Like the standard strict -- left fold, but applies a user supplied extraction function (the third -- argument) to the folded value at the end. This is designed to work -- with the foldl library. The suffix x is a mnemonic -- for extraction. foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b -- | Like foldx, but with a monadic step function. foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b -- | Apply a monadic action to each element of the stream and discard the -- output of the action. mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () -- | Convert a stream into a list in the underlying monad. toList :: Monad m => SerialT m a -> m [a] -- | Determine whether all elements of a stream satisfy a predicate. all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool -- | Determine whether any of the elements of a stream satisfy a predicate. any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool -- | Extract the first element of the stream, if any. head :: Monad m => SerialT m a -> m (Maybe a) -- | Extract all but the first element of the stream, if any. tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) -- | Extract the last element of the stream, if any. last :: Monad m => SerialT m a -> m (Maybe a) -- | Determine whether the stream is empty. null :: Monad m => SerialT m a -> m Bool -- | Determine the length of the stream. length :: Monad m => SerialT m a -> m Int -- | Determine whether an element is present in the stream. elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool -- | Determine whether an element is not present in the stream. notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool -- | Determine the maximum element in a stream. maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) -- | Determine the minimum element in a stream. minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) -- | Determine the sum of all elements of a stream of numbers sum :: (Monad m, Num a) => SerialT m a -> m a -- | Determine the product of all elements of a stream of numbers product :: (Monad m, Num a) => SerialT m a -> m a -- | Strict left scan. Like foldl', but returns the folded value at -- each step, generating a stream of all intermediate fold results. The -- first element of the stream is the user supplied initial value, and -- the last element of the stream is the same as the result of -- foldl'. scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b -- | Strict left scan with an extraction function. Like scanl', but -- applies a user supplied extraction function (the third argument) at -- each step. This is designed to work with the foldl library. -- The suffix x is a mnemonic for extraction. scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b -- | Include only those elements that pass a predicate. filter :: IsStream t => (a -> Bool) -> t m a -> t m a -- | Take first n elements from the stream and discard the rest. take :: IsStream t => Int -> t m a -> t m a -- | End the stream as soon as the predicate fails on an element. takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a -- | Discard first n elements from the stream and take the rest. drop :: IsStream t => Int -> t m a -> t m a -- | Drop elements in the stream as long as the predicate succeeds and then -- take the rest of the stream. dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a -- | Returns the elements of the stream in reverse order. The stream must -- be finite. reverse :: (IsStream t) => t m a -> t m a -- | Replace each element of the stream with the result of a monadic action -- applied on the element. -- --
-- runStream $ S.replicateM 10 (return 1) -- & (serially . S.mapM (\x -> threadDelay 1000000 >> print x)) -- -- runStream $ S.replicateM 10 (return 1) -- & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x)) ---- -- Concurrent (do not use with parallely on infinite -- streams) mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b -- | Map a Maybe returning function to a stream, filter out the -- Nothing elements, and return a stream of values extracted from -- Just. mapMaybe :: (IsStream t) => (a -> Maybe b) -> t m a -> t m b -- | Like mapMaybe but maps a monadic function. -- -- Concurrent (do not use with parallely on infinite -- streams) mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b -- | Reduce a stream of monadic actions to a stream of the output of those -- actions. -- --
-- runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) -- & (serially . S.sequence) -- -- runStream $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) -- & (asyncly . S.sequence) ---- -- Concurrent (do not use with parallely on infinite -- streams) sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a -- | Zip two streams serially using a pure zipping function. zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c -- | Zip two streams serially using a monadic zipping function. zipWithM :: IsStream t => (a -> b -> t m c) -> t m a -> t m b -> t m c -- | Zip two streams concurrently (i.e. both the elements being zipped are -- generated concurrently) using a pure zipping function. zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c -- | Zip two streams asyncly (i.e. both the elements being zipped are -- generated concurrently) using a monadic zipping function. zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> t m c) -> t m a -> t m b -> t m c -- | Read lines from an IO Handle into a stream of Strings. fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String -- | Write a stream of Strings to an IO Handle. toHandle :: MonadIO m => Handle -> SerialT m String -> m () -- | Same as fromFoldable. -- | Deprecated: Please use fromFoldable instead. each :: (IsStream t, Foldable f) => f a -> t m a -- | Deprecated: Please use scanx instead. scan :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b -- | Deprecated: Please use foldx instead. foldl :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b -- | Deprecated: Please use foldxM instead. foldlM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b -- | The way a list represents a sequence of pure values, a stream -- represents a sequence of monadic actions. The monadic stream API -- offered by Streamly is very close to the Haskell Prelude pure -- lists' API, it can be considered as a natural extension of lists to -- monadic actions. Streamly streams provide concurrent composition and -- merging of streams. It can be considered as a concurrent list -- transformer. In contrast to the Prelude lists, merging or -- appending streams of arbitrary length is scalable and inexpensive. -- -- The basic stream type is Serial, it represents a sequence of IO -- actions, and is a Monad. The type SerialT is a monad -- transformer that can represent a sequence of actions in an arbitrary -- monad. The type Serial is in fact a synonym for SerialT -- IO. There are a few more types similar to SerialT, all of -- them represent a stream and differ only in the Semigroup, -- Applicative and Monad compositions of the stream. -- Serial and WSerial types compose serially whereas -- Async and WAsync types compose concurrently. All these -- types can be freely inter-converted using type combinators without any -- cost. You can freely switch to any type of composition at any point in -- the program. When no type annotation or explicit stream type -- combinators are used, the default stream type is inferred as -- Serial. -- -- Here is a simple console echo program example: -- --
-- > runStream $ S.repeatM getLine & S.mapM putStrLn ---- -- For more details please see the Streamly.Tutorial module and -- the examples directory in this package. -- -- This module exports stream types, instances and some basic operations. -- Functionality exported by this module include: -- --
-- import Streamly --module Streamly -- | A monad that can perform concurrent or parallel IO operations. Streams -- that can be composed concurrently require the underlying monad to be -- MonadAsync. type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) -- | Deep serial composition or serial composition with depth first -- traversal. The Semigroup instance of SerialT appends two -- streams serially in a depth first manner, yielding all elements from -- the first stream, and then all elements from the second stream. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- -- main = (toList . serially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- The Monad instance runs the monadic continuation for -- each element of the stream, serially. -- --
-- main = runStream . serially $ do -- x <- return 1 <> return 2 -- S.once $ print x ---- --
-- 1 -- 2 ---- -- SerialT nests streams serially in a depth first manner. -- --
-- main = runStream . serially $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- S.once $ print (x, y) ---- --
-- (1,3) -- (1,4) -- (2,3) -- (2,4) ---- -- This behavior of SerialT is exactly like a list transformer. We -- call the monadic code being run for each element of the stream a -- monadic continuation. In imperative paradigm we can think of this -- composition as nested for loops and the monadic continuation -- is the body of the loop. The loop iterates for all elements of the -- stream. -- -- The serially combinator can be omitted as the default stream -- type is SerialT. Note that serial composition with depth first -- traversal can be used to combine an infinite number of streams as it -- explores only one stream at a time. data SerialT m a -- | Wide serial composition or serial composition with a breadth first -- traversal. The Semigroup instance of WSerialT traverses -- the two streams in a breadth first manner. In other words, it -- interleaves two streams, yielding one element from each stream -- alternately. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- -- main = (toList . wSerially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- Similarly, the Monad instance interleaves the iterations of the -- inner and the outer loop, nesting loops in a breadth first manner. -- --
-- main = runStream . wSerially $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- S.once $ print (x, y) ---- --
-- (1,3) -- (2,3) -- (1,4) -- (2,4) ---- -- Note that a serial composition with breadth first traversal can only -- combine a finite number of streams as it needs to retain state for -- each unfinished stream. data WSerialT m a -- | Deep ahead composition or ahead composition with depth first -- traversal. The semigroup composition of AheadT appends streams -- in a depth first manner just like SerialT except that it can -- produce elements concurrently ahead of time. It is like AsyncT -- except that AsyncT produces the output as it arrives whereas -- AheadT orders the output in the traversal order. -- --
-- main = (toList . aheadly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. -- -- Similarly, the monad instance of AheadT may run each iteration -- concurrently ahead of time but presents the results in the same order -- as SerialT. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . aheadly $ do -- n <- return 3 <> return 2 <> return 1 -- S.once $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- All iterations may run in the same thread if they do not block. -- -- Note that ahead composition with depth first traversal can be used to -- combine infinite number of streams as it explores only a bounded -- number of streams at a time. data AheadT m a -- | Deep async composition or async composition with depth first -- traversal. In a left to right Semigroup composition it tries to -- yield elements from the left stream as long as it can, but it can run -- the right stream in parallel if it needs to, based on demand. The -- right stream can be run if the left stream blocks on IO or cannot -- produce elements fast enough for the consumer. -- --
-- main = (toList . asyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,2,3,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the monad instance of AsyncT may run each -- iteration concurrently based on demand. More concurrent iterations are -- started only if the previous iterations are not able to produce enough -- output for the consumer. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . asyncly $ do -- n <- return 3 <> return 2 <> return 1 -- S.once $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- All iterations may run in the same thread if they do not block. -- -- Note that async composition with depth first traversal can be used to -- combine infinite number of streams as it explores only a bounded -- number of streams at a time. data AsyncT m a -- | Wide async composition or async composition with breadth first -- traversal. The Semigroup instance of WAsyncT concurrently -- traverses the composed streams using a depth first travesal or -- in a round robin fashion, yielding elements from both streams -- alternately. -- --
-- main = (toList . wAsyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the Monad instance of WAsyncT runs all -- iterations fairly concurrently using a round robin scheduling. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . wAsyncly $ do -- n <- return 3 <> return 2 <> return 1 -- S.once $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- Unlike AsyncT all iterations are guaranteed to run fairly -- concurrently, unconditionally. -- -- Note that async composition with breadth first traversal can only -- combine a finite number of streams as it needs to retain state for -- each unfinished stream. data WAsyncT m a -- | Async composition with simultaneous traversal of all streams. -- -- The Semigroup instance of ParallelT concurrently merges -- two streams, running both strictly concurrently and yielding elements -- from both streams as they arrive. When multiple streams are combined -- using ParallelT each one is evaluated in its own thread and the -- results produced are presented in the combined stream on a first come -- first serve basis. -- -- AsyncT and WAsyncT are concurrent lookahead -- streams each with a specific type of consumption pattern (depth -- first or breadth first). Since they are lookahead, they may introduce -- certain default latency in starting more concurrent tasks for -- efficiency reasons or may put a default limitation on the resource -- consumption (e.g. number of concurrent threads for lookahead). If we -- look at the implementation detail, they both can share a pool of -- worker threads to evaluate the streams in the desired pattern and at -- the desired rate. However, ParallelT uses a separate runtime -- thread to evaluate each stream. -- -- WAsyncT is similar to ParallelT, as both of them -- evaluate the constituent streams fairly in a round robin fashion. -- However, the key difference is that WAsyncT is lazy or pull -- driven whereas ParallelT is strict or push driven. -- ParallelT immediately starts concurrent evaluation of both the -- streams (in separate threads) and later picks the results whereas -- WAsyncT may wait for a certain latency threshold before -- initiating concurrent evaluation of the next stream. The concurrent -- scheduling of the next stream or the degree of concurrency is driven -- by the feedback from the consumer. In case of ParallelT each -- stream is evaluated in a separate thread and results are pushed -- to a shared output buffer, the evaluation rate is controlled by -- blocking when the buffer is full. -- -- Concurrent lookahead streams are generally more efficient than -- ParallelT and can work pretty efficiently even for smaller -- tasks because they do not necessarily use a separate thread for each -- task. So they should be preferred over ParallelT especially -- when efficiency is a concern and simultaneous strict evaluation is not -- a requirement. ParallelT is useful for cases when the streams -- are required to be evaluated simultaneously irrespective of how the -- consumer consumes them e.g. when we want to race two tasks and want to -- start both strictly at the same time or if we have timers in the -- parallel tasks and our results depend on the timers being started at -- the same time. We can say that ParallelT is almost the same -- (modulo some implementation differences) as WAsyncT when the -- latter is used with unlimited lookahead and zero latency in initiating -- lookahead. -- --
-- main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print ---- --
-- [1,3,2,4] ---- -- When streams with more than one element are merged, it yields -- whichever stream yields first without any bias, unlike the -- Async style streams. -- -- Any exceptions generated by a constituent stream are propagated to the -- output stream. The output and exceptions from a single stream are -- guaranteed to arrive in the same order in the resulting stream as they -- were generated in the input stream. However, the relative ordering of -- elements from different streams in the resulting stream can vary -- depending on scheduling and generation delays. -- -- Similarly, the Monad instance of ParallelT runs -- all iterations of the loop concurrently. -- --
-- import Streamly -- import qualified Streamly.Prelude as S -- import Control.Concurrent -- -- main = runStream . parallely $ do -- n <- return 3 <> return 2 <> return 1 -- S.once $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- Note that parallel composition can only combine a finite number of -- streams as it needs to retain state for each unfinished stream. data ParallelT m a -- | The applicative instance of ZipSerialM zips a number of streams -- serially i.e. it produces one element from each stream serially and -- then zips all those elements. -- --
-- main = (toList . zipSerially $ (,,) <$> s1 <*> s2 <*> s3) >>= print -- where s1 = fromFoldable [1, 2] -- s2 = fromFoldable [3, 4] -- s3 = fromFoldable [5, 6] ---- --
-- [(1,3,5),(2,4,6)] ---- -- The Semigroup instance of this type works the same way as that -- of SerialT. data ZipSerialM m a -- | Like ZipSerialM but zips in parallel, it generates all the -- elements to be zipped concurrently. -- --
-- main = (toList . zipAsyncly $ (,,) <$> s1 <*> s2 <*> s3) >>= print -- where s1 = fromFoldable [1, 2] -- s2 = fromFoldable [3, 4] -- s3 = fromFoldable [5, 6] ---- --
-- [(1,3,5),(2,4,6)] ---- -- The Semigroup instance of this type works the same way as that -- of SerialT. data ZipAsyncM m a -- | Run a streaming composition, discard the results. By default it -- interprets the stream as SerialT, to run other types of streams -- use the type adapting combinators for example runStream . -- asyncly. runStream :: Monad m => SerialT m a -> m () -- | Parallel function application operator for streams; just like the -- regular function application operator $ except that it is -- concurrent. The following code prints a value every second even though -- each stage adds a 1 second delay. -- --
-- runStream $ -- S.mapM (\x -> threadDelay 1000000 >> print x) -- |$ S.repeatM (threadDelay 1000000 >> return 1) ---- -- Concurrent (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 |$ -- | Parallel reverse function application operator for streams; just like -- the regular reverse function application operator & -- except that it is concurrent. -- --
-- runStream $ -- S.repeatM (threadDelay 1000000 >> return 1) -- |& S.mapM (\x -> threadDelay 1000000 >> print x) ---- -- Concurrent (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 |& -- | Parallel function application operator; applies a run or -- fold function to a stream such that the fold consumer and the -- stream producer run in parallel. A run or fold -- function reduces the stream to a value in the underlying monad. The -- . at the end of the operator is a mnemonic for termination of -- the stream. -- --
-- S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () -- |$. S.repeatM (threadDelay 1000000 >> return 1) ---- -- Concurrent (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 |$. -- | Parallel reverse function application operator for applying a run or -- fold functions to a stream. Just like |$. except that the -- operands are reversed. -- --
-- S.repeatM (threadDelay 1000000 >> return 1) -- |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () ---- -- Concurrent (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 |&. -- | Make a stream asynchronous, triggers the computation and returns a -- stream in the underlying monad representing the output generated by -- the original computation. The returned action is exhaustible and must -- be drained once. If not drained fully we may have a thread blocked -- forever and once exhausted it will always return empty. mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) -- | Polymorphic version of the Semigroup operation <> -- of SerialT. Appends two streams sequentially, yielding all -- elements from the first stream, and then all elements from the second -- stream. serial :: IsStream t => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of WSerialT. Interleaves two streams, yielding one element from -- each stream alternately. wSerial :: IsStream t => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of AheadT. Merges two streams sequentially but with concurrent -- lookahead. ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of AsyncT. Merges two streams possibly concurrently, preferring -- the elements from the left one when available. async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of WAsyncT. Merges two streams concurrently choosing elements -- from both fairly. wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Polymorphic version of the Semigroup operation <> -- of ParallelT Merges two streams concurrently. parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | A variant of fold that allows you to fold a Foldable -- container of streams using the specified stream sum operation. -- --
-- foldWith async $ map return [1..3] --foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a -- | A variant of foldMap that allows you to map a monadic streaming -- action on a Foldable container and then fold it using the -- specified stream sum operation. -- --
-- foldMapWith async return [1..3] --foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b -- | Like foldMapWith but with the last two arguments reversed i.e. -- the monadic streaming function is the last argument. forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b -- | Class of types that can represent a stream of elements of some type -- a in some monad m. class IsStream t -- | Fix the type of a polymorphic stream as SerialT. serially :: IsStream t => SerialT m a -> t m a -- | Fix the type of a polymorphic stream as WSerialT. wSerially :: IsStream t => WSerialT m a -> t m a -- | Fix the type of a polymorphic stream as AsyncT. asyncly :: IsStream t => AsyncT m a -> t m a -- | Fix the type of a polymorphic stream as AheadT. aheadly :: IsStream t => AheadT m a -> t m a -- | Fix the type of a polymorphic stream as WAsyncT. wAsyncly :: IsStream t => WAsyncT m a -> t m a -- | Fix the type of a polymorphic stream as ParallelT. parallely :: IsStream t => ParallelT m a -> t m a -- | Fix the type of a polymorphic stream as ZipSerialM. zipSerially :: IsStream t => ZipSerialM m a -> t m a -- | Fix the type of a polymorphic stream as ZipAsyncM. zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a -- | Adapt any specific stream type to any other specific stream type. adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a -- | A serial IO stream of elements of type a. See SerialT -- documentation for more details. type Serial a = SerialT IO a -- | An interleaving serial IO stream of elements of type a. See -- WSerialT documentation for more details. type WSerial a = WSerialT IO a -- | A serial IO stream of elements of type a with concurrent -- lookahead. See AheadT documentation for more details. type Ahead a = AheadT IO a -- | A demand driven left biased parallely composing IO stream of elements -- of type a. See AsyncT documentation for more details. type Async a = AsyncT IO a -- | A round robin parallely composing IO stream of elements of type -- a. See WAsyncT documentation for more details. type WAsync a = WAsyncT IO a -- | A parallely composing IO stream of elements of type a. See -- ParallelT documentation for more details. type Parallel a = ParallelT IO a -- | An IO stream whose applicative instance zips streams serially. type ZipSerial a = ZipSerialM IO a -- | An IO stream whose applicative instance zips streams wAsyncly. type ZipAsync a = ZipAsyncM IO a -- | The class of semigroups (types with an associative binary operation). class Semigroup a -- | An associative operation. -- --
-- (a <> b) <> c = a <> (b <> c) ---- -- If a is also a Monoid we further require -- --
-- (<>) = mappend --(<>) :: Semigroup a => a -> a -> a -- | Reduce a non-empty list with <> -- -- The default definition should be sufficient, but this can be -- overridden for efficiency. sconcat :: Semigroup a => NonEmpty a -> a -- | Repeat a value n times. -- -- Given that this works on a Semigroup it is allowed to fail if -- you request 0 or fewer repetitions, and the default definition will do -- so. -- -- By making this a member of the class, idempotent semigroups and -- monoids can upgrade this to execute in O(1) by picking -- stimes = stimesIdempotent or stimes = -- stimesIdempotentMonoid respectively. stimes :: (Semigroup a, Integral b) => b -> a -> a -- | Same as IsStream. -- | Deprecated: Please use IsStream instead. type Streaming = IsStream -- | Same as runStream -- | Deprecated: Please use runStream instead. runStreaming :: (Monad m, IsStream t) => t m a -> m () -- | Same as runStream. -- | Deprecated: Please use runStream instead. runStreamT :: Monad m => SerialT m a -> m () -- | Same as runStream . wSerially. -- | Deprecated: Please use 'runStream . interleaving' instead. runInterleavedT :: Monad m => InterleavedT m a -> m () -- | Same as runStream . asyncly. -- | Deprecated: Please use 'runStream . asyncly' instead. runAsyncT :: Monad m => AsyncT m a -> m () -- | Same as runStream . parallely. -- | Deprecated: Please use 'runStream . parallely' instead. runParallelT :: Monad m => ParallelT m a -> m () -- | Same as runStream . zipping. -- | Deprecated: Please use 'runStream . zipSerially instead. runZipStream :: Monad m => ZipSerialM m a -> m () -- | Same as runStream . zippingAsync. -- | Deprecated: Please use 'runStream . zipAsyncly instead. runZipAsync :: Monad m => ZipAsyncM m a -> m () -- | Deprecated: Please use SerialT instead. type StreamT = SerialT -- | Deprecated: Please use WSerialT instead. type InterleavedT = WSerialT -- | Deprecated: Please use ZipSerialM instead. type ZipStream = ZipSerialM -- | Same as wSerially. -- | Deprecated: Please use wSerially instead. interleaving :: IsStream t => WSerialT m a -> t m a -- | Same as zipSerially. -- | Deprecated: Please use zipSerially instead. zipping :: IsStream t => ZipSerialM m a -> t m a -- | Same as zipAsyncly. -- | Deprecated: Please use zipAsyncly instead. zippingAsync :: IsStream t => ZipAsyncM m a -> t m a -- | Same as wSerial. -- | Deprecated: Please use wSerial instead. (<=>) :: IsStream t => t m a -> t m a -> t m a infixr 5 <=> -- | Same as async. -- | Deprecated: Please use async instead. (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -- | Time utilities for reactive programming. module Streamly.Time -- | Run an action forever periodically at the given frequency specified in -- per second (Hz). periodic :: Int -> IO () -> IO () -- | Run a computation on every clock tick, the clock runs at the specified -- frequency. It allows running a computation at high frequency -- efficiently by maintaining a local clock and adjusting it with the -- provided base clock at longer intervals. The first argument is a base -- clock returning some notion of time in microseconds. The second -- argument is the frequency in per second (Hz). The third argument is -- the action to run, the action is provided the local time as an -- argument. withClock :: IO Int -> Int -> (Int -> IO ()) -> IO () -- | Streamly is a general computing framework based on streaming IO. The -- IO monad and pure lists are a special case of streamly. On one hand, -- streamly extends the lists of pure values to lists of monadic actions, -- on the other hand it extends the IO monad with concurrrent -- non-determinism. In simple imperative terms we can say that streamly -- extends the IO monad with for loops and nested for -- loops with concurrency support. You can understand this analogy better -- once you can go through this tutorial. -- -- Streaming in general enables writing modular, composable and scalable -- applications with ease, and concurrency allows you to make them scale -- and perform well. Streamly enables writing scalable concurrent -- applications without being aware of threads or synchronization. No -- explicit thread control is needed, where applicable the concurrency -- rate is automatically controlled based on the demand by the consumer. -- However, combinators can be used to fine tune the concurrency control. -- -- Streaming and concurrency together enable expressing reactive -- applications conveniently. See the CirclingSquare example in -- the examples directory for a simple SDL based FRP example. To -- summarize, streamly provides a unified computing framework for -- streaming, non-determinism and functional reactive programming in an -- elegant and simple API that is a natural extension of pure lists to -- monadic streams. -- -- In this tutorial we will go over the basic concepts and how to use the -- library. See the last section for further reading resources. module Streamly.Tutorial