-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Beautiful Streaming, Concurrent and Reactive Composition -- -- Streamly is a monad transformer unifying non-determinism -- (list-t/logict), concurrency (async), streaming -- (conduit/pipes), and FRP (Yampa/reflex) -- functionality in a concise and intuitive API. High level concurrency -- makes concurrent applications almost indistinguishable from -- non-concurrent ones. By changing a single combinator you can control -- whether the code runs serially or concurrently. It naturally -- integrates concurrency with streaming rather than adding it as an -- afterthought. Moreover, it interworks with the popular streaming -- libraries. -- -- See the README for an overview and the haddock documentation for full -- reference. It is recommended to read the comprehensive tutorial module -- Streamly.Tutorial first. Also see Streamly.Examples for -- some working examples. @package streamly @version 0.1.2 module Streamly.Prelude -- | Represents an empty stream just like [] represents an empty -- list. nil :: Streaming t => t m a -- | Constructs a stream by adding a pure value at the head of an existing -- stream, just like : constructs lists. For example: -- --
-- > let stream = 1 `cons` 2 `cons` 3 `cons` nil -- > (toList . serially) stream -- [1,2,3] -- cons :: (Streaming t) => a -> t m a -> t m a infixr 5 `cons` -- | Operator equivalent of cons so that you can construct a stream -- of pure values more succinctly like this: -- --
-- > let stream = 1 .: 2 .: 3 .: nil -- > (toList . serially) stream -- [1,2,3] ---- -- .: constructs a stream just like : constructs a list. -- -- Also note that another equivalent way of building streams from pure -- values is: -- --
-- > let stream = pure 1 <> pure 2 <> pure 3 -- > (toList . serially) stream -- [1,2,3] ---- -- In the first method we construct a stream by adding one element at a -- time. In the second method we first construct singleton streams using -- pure and then compose all those streams together using the -- Semigroup style composition of streams. The former method is a -- bit more efficient than the latter. (.:) :: (Streaming t) => a -> t m a -> t m a infixr 5 .: -- | Build a Stream by unfolding pure steps starting from a seed. unfoldr :: Streaming t => (b -> Maybe (a, b)) -> b -> t m a -- | Build a Stream by unfolding monadic steps starting from a seed. unfoldrM :: (Streaming t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a -- | Same as foldWith (<>) but more efficient. each :: (Streaming t, Foldable f) => f a -> t m a -- | Iterate a pure function from a seed value, streaming the results -- forever iterate :: Streaming t => (a -> a) -> a -> t m a -- | Iterate a monadic function from a seed value, streaming the results -- forever iterateM :: (Streaming t, Monad m) => (a -> m a) -> a -> t m a -- | Right fold. foldr :: (Streaming t, Monad m) => (a -> b -> b) -> b -> t m a -> m b -- | Right fold with a monadic step function. See toList for an -- example use. foldrM :: Streaming t => (a -> m b -> m b) -> m b -> t m a -> m b -- | Scan left. A strict left fold which accumulates the result of its -- reduction steps inside a stream, from left. scan :: Streaming t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b -- | Strict left fold. This is typed to work with the foldl package. To use -- it normally just pass id as the third argument. foldl :: (Streaming t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b -- | Strict left fold, with monadic step function. This is typed to work -- with the foldl package. To use directly pass id as the third -- argument. 
foldlM :: (Streaming t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b -- | Decompose a stream into its head and tail. If the stream is empty, -- returns Nothing. If the stream is non-empty, returns 'Just (a, -- ma)', where a is the head of the stream and ma its -- tail. uncons :: (Streaming t, Monad m) => t m a -> m (Maybe (a, t m a)) -- | Convert a stream into a list in the underlying monad. toList :: (Streaming t, Monad m) => t m a -> m [a] -- | Determine whether all elements of a stream satisfy a predicate. all :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool -- | Determine whether any of the elements of a stream satisfy a predicate. any :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool -- | Extract the first element of the stream, if any. head :: (Streaming t, Monad m) => t m a -> m (Maybe a) -- | Extract all but the first element of the stream, if any. tail :: (Streaming t, Monad m) => t m a -> m (Maybe (t m a)) -- | Extract the last element of the stream, if any. last :: (Streaming t, Monad m) => t m a -> m (Maybe a) -- | Determine whether the stream is empty. null :: (Streaming t, Monad m) => t m a -> m Bool -- | Determine the length of the stream. length :: (Streaming t, Monad m) => t m a -> m Int -- | Determine whether an element is present in the stream. elem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool -- | Determine whether an element is not present in the stream. notElem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool -- | Returns the elements of the stream in reverse order. The stream must -- be finite. reverse :: (Streaming t) => t m a -> t m a -- | Determine the maximum element in a stream. maximum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a) -- | Determine the minimum element in a stream. 
minimum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a) -- | Determine the sum of all elements of a stream of numbers sum :: (Streaming t, Monad m, Num a) => t m a -> m a -- | Determine the product of all elements of a stream of numbers product :: (Streaming t, Monad m, Num a) => t m a -> m a -- | Include only those elements that pass a predicate. filter :: Streaming t => (a -> Bool) -> t m a -> t m a -- | Take first n elements from the stream and discard the rest. take :: Streaming t => Int -> t m a -> t m a -- | End the stream as soon as the predicate fails on an element. takeWhile :: Streaming t => (a -> Bool) -> t m a -> t m a -- | Discard first n elements from the stream and take the rest. drop :: Streaming t => Int -> t m a -> t m a -- | Drop elements in the stream as long as the predicate succeeds and then -- take the rest of the stream. dropWhile :: Streaming t => (a -> Bool) -> t m a -> t m a -- | Replace each element of the stream with the result of a monadic action -- applied on the element. mapM :: (Streaming t, Monad m) => (a -> m b) -> t m a -> t m b -- | Apply a monadic action to each element of the stream and discard the -- output of the action. mapM_ :: (Streaming t, Monad m) => (a -> m b) -> t m a -> m () -- | Reduce a stream of monadic actions to a stream of the output of those -- actions. sequence :: (Streaming t, Monad m) => t m (m a) -> t m a -- | Generate a stream by performing an action n times. replicateM :: (Streaming t, Monad m) => Int -> m a -> t m a -- | Zip two streams serially using a pure zipping function. zipWith :: Streaming t => (a -> b -> c) -> t m a -> t m b -> t m c -- | Zip two streams serially using a monadic zipping function. zipWithM :: Streaming t => (a -> b -> t m c) -> t m a -> t m b -> t m c -- | Zip two streams asyncly (i.e. both the elements being zipped are -- generated concurrently) using a pure zipping function. 
zipAsyncWith :: (Streaming t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c -- | Zip two streams asyncly (i.e. both the elements being zipped are -- generated concurrently) using a monadic zipping function. zipAsyncWithM :: (Streaming t, MonadAsync m) => (a -> b -> t m c) -> t m a -> t m b -> t m c -- | Read lines from an IO Handle into a stream of Strings. fromHandle :: (Streaming t, MonadIO m) => Handle -> t m String -- | Write a stream of Strings to an IO Handle. toHandle :: (Streaming t, MonadIO m) => Handle -> t m String -> m () module Streamly -- | A monad that can perform asynchronous/concurrent IO operations. -- Streams that can be composed concurrently require the underlying monad -- to be MonadAsync. type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) -- | Class of types that can represent a stream of elements of some type -- a in some monad m. class Streaming t -- | The Monad instance of StreamT runs the monadic -- continuation for each element of the stream, serially. -- --
-- main = runStreamT $ do -- x <- return 1 <> return 2 -- liftIO $ print x ---- --
-- 1 -- 2 ---- -- StreamT nests streams serially in a depth first manner. -- --
-- main = runStreamT $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- liftIO $ print (x, y) ---- --
-- (1,3) -- (1,4) -- (2,3) -- (2,4) ---- -- This behavior is exactly like a list transformer. We call the monadic -- code being run for each element of the stream a monadic continuation. -- In the imperative paradigm we can think of this composition as nested -- for loops in which the monadic continuation is the body of the -- loop. The loop iterates over all elements of the stream. data StreamT m a -- | Like StreamT but with different nesting behavior. It fairly -- interleaves the iterations of the inner and the outer loop, nesting -- loops in a breadth first manner. -- --
-- main = runInterleavedT $ do -- x <- return 1 <> return 2 -- y <- return 3 <> return 4 -- liftIO $ print (x, y) ---- --
-- (1,3) -- (2,3) -- (1,4) -- (2,4) -- data InterleavedT m a -- | Like StreamT but may run each iteration concurrently -- using demand driven concurrency. More concurrent iterations are -- started only if the previous iterations are not able to produce enough -- output for the consumer. -- --
-- import Streamly -- import Control.Concurrent -- -- main = runAsyncT $ do -- n <- return 3 <> return 2 <> return 1 -- liftIO $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- All iterations may run in the same thread if they do not block. data AsyncT m a -- | Like StreamT but runs all iterations fairly concurrently -- using round-robin scheduling. -- --
-- import Streamly -- import Control.Concurrent -- -- main = runParallelT $ do -- n <- return 3 <> return 2 <> return 1 -- liftIO $ do -- threadDelay (n * 1000000) -- myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n) ---- --
-- ThreadId 40: Delay 1 -- ThreadId 39: Delay 2 -- ThreadId 38: Delay 3 ---- -- Unlike AsyncT all iterations are guaranteed to run fairly -- concurrently, unconditionally. data ParallelT m a -- | ZipStream zips serially i.e. it produces one element from each -- stream serially and then zips the two elements. Note, for convenience -- we have used the zipping combinator in the following example -- instead of using a type annotation. -- --
-- main = (toList . zipping $ (,) <$> s1 <*> s2) >>= print -- where s1 = pure 1 <> pure 2 -- s2 = pure 3 <> pure 4 ---- --
-- [(1,3),(2,4)] ---- -- This applicative operation can be seen as the zipping equivalent of -- interleaving with <=>. data ZipStream m a -- | Like ZipStream but zips in parallel, i.e. it generates both the -- elements to be zipped concurrently. -- --
-- main = (toList . zippingAsync $ (,) <$> s1 <*> s2) >>= print -- where s1 = pure 1 <> pure 2 -- s2 = pure 3 <> pure 4 ---- --
-- [(1,3),(2,4)] ---- -- This applicative operation can be seen as the zipping equivalent of -- parallel composition with <|>. data ZipAsync m a -- | Sequential interleaved composition, in contrast to <> -- this operator fairly interleaves two streams instead of appending -- them; yielding one element from each stream alternately. -- --
-- main = (toList . serially $ (return 1 <> return 2) <=> (return 3 <> return 4)) >>= print ---- --
-- [1,3,2,4] ---- -- This operator corresponds to the InterleavedT style. Unlike -- <>, this operator cannot be used to fold infinite -- containers since that might accumulate too many partially drained -- streams. To be clear, it can combine infinite streams but not an -- infinite number of streams. (<=>) :: Streaming t => t m a -> t m a -> t m a infixr 5 <=> -- | Demand driven concurrent composition. In contrast to <|> -- this operator concurrently "merges" streams in a left biased manner -- rather than fairly interleaving them. It keeps yielding from the -- stream on the left as long as it can. If the left stream blocks or -- cannot keep up with the pace of the consumer, it can concurrently -- yield from the stream on the right. -- --
-- main = (toList . serially $ (return 1 <> return 2) <| (return 3 <> return 4)) >>= print ---- --
-- [1,2,3,4] ---- -- Unlike <|> it can be used to fold infinite containers of -- streams. This operator corresponds to the AsyncT type for -- product style composition. (<|) :: (Streaming t, MonadAsync m) => t m a -> t m a -> t m a -- | Make a stream asynchronous, triggers the computation and returns a -- stream in the underlying monad representing the output generated by -- the original computation. The returned action is exhaustible and must -- be drained once. If not drained fully we may have a thread blocked -- forever and once exhausted it will always return empty. async :: (Streaming t, MonadAsync m) => t m a -> m (t m a) -- | Interpret an ambiguously typed stream as StreamT. serially :: StreamT m a -> StreamT m a -- | Interpret an ambiguously typed stream as InterleavedT. interleaving :: InterleavedT m a -> InterleavedT m a -- | Interpret an ambiguously typed stream as AsyncT. asyncly :: AsyncT m a -> AsyncT m a -- | Interpret an ambiguously typed stream as ParallelT. parallely :: ParallelT m a -> ParallelT m a -- | Interpret an ambiguously typed stream as ZipStream. zipping :: ZipStream m a -> ZipStream m a -- | Interpret an ambiguously typed stream as ZipAsync. zippingAsync :: ZipAsync m a -> ZipAsync m a -- | Adapt one streaming type to another. adapt :: (Streaming t1, Streaming t2) => t1 m a -> t2 m a -- | Run a streaming composition, discard the results. runStreaming :: (Monad m, Streaming t) => t m a -> m () -- | Same as runStreaming . serially. runStreamT :: Monad m => StreamT m a -> m () -- | Same as runStreaming . interleaving. runInterleavedT :: Monad m => InterleavedT m a -> m () -- | Same as runStreaming . asyncly. runAsyncT :: Monad m => AsyncT m a -> m () -- | Same as runStreaming . parallely. runParallelT :: Monad m => ParallelT m a -> m () -- | Same as runStreaming . zipping. runZipStream :: Monad m => ZipStream m a -> m () -- | Same as runStreaming . zippingAsync. 
runZipAsync :: Monad m => ZipAsync m a -> m () -- | Like the Prelude fold but allows you to specify a -- binary sum style stream composition operator to fold a container of -- streams. -- --
-- foldWith (<>) $ map return [1..3] -- foldWith :: (Streaming t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a -- | Like foldMap but allows you to specify a binary sum style -- composition operator to fold a container of streams. Maps a monadic -- streaming action on the container before folding it. -- --
-- foldMapWith (<>) return [1..3] -- foldMapWith :: (Streaming t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b -- | Like foldMapWith but with the last two arguments reversed i.e. -- the monadic streaming function is the last argument. forEachWith :: (Streaming t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b -- | The class of monoids (types with an associative binary operation that -- has an identity). Instances should satisfy the following laws: -- --
-- mappend mempty x = x
-- mappend x mempty = x
-- mappend x (mappend y z) = mappend (mappend x y) z
-- mconcat = foldr mappend mempty
-- (a <> b) <> c = a <> (b <> c) ---- -- If a is also a Monoid we further require -- --
-- (<>) = mappend --(<>) :: Semigroup a => a -> a -> a -- | Reduce a non-empty list with <> -- -- The default definition should be sufficient, but this can be -- overridden for efficiency. sconcat :: Semigroup a => NonEmpty a -> a -- | Repeat a value n times. -- -- Given that this works on a Semigroup it is allowed to fail if -- you request 0 or fewer repetitions, and the default definition will do -- so. -- -- By making this a member of the class, idempotent semigroups and -- monoids can upgrade this to execute in O(1) by picking -- stimes = stimesIdempotent or stimes = -- stimesIdempotentMonoid respectively. stimes :: (Semigroup a, Integral b) => b -> a -> a -- | A monoid on applicative functors. -- -- If defined, some and many should be the least solutions -- of the equations: -- -- class Applicative f => Alternative (f :: * -> *) -- | The identity of <|> empty :: Alternative f => f a -- | An associative binary operation (<|>) :: Alternative f => f a -> f a -> f a -- | One or more. some :: Alternative f => f a -> f [a] -- | Zero or more. many :: Alternative f => f a -> f [a] -- | Monads that also support choice and failure. class (Alternative m, Monad m) => MonadPlus (m :: * -> *) -- | the identity of mplus. It should also satisfy the equations -- --
-- mzero >>= f = mzero -- v >> mzero = mzero -- mzero :: MonadPlus m => m a -- | an associative operation mplus :: MonadPlus m => m a -> m a -> m a -- | Monads in which IO computations may be embedded. Any monad -- built by applying a sequence of monad transformers to the IO -- monad will be an instance of this class. -- -- Instances should satisfy the following laws, which state that -- liftIO is a transformer of monads: -- -- class Monad m => MonadIO (m :: * -> *) -- | Lift a computation from the IO monad. liftIO :: MonadIO m => IO a -> m a -- | The class of monad transformers. Instances should satisfy the -- following laws, which state that lift is a monad -- transformation: -- -- class MonadTrans (t :: (* -> *) -> * -> *) -- | Lift a computation from the argument monad to the constructed monad. lift :: (MonadTrans t, Monad m) => m a -> t m a -- | Time utilities for reactive programming. module Streamly.Time -- | Run an action forever, periodically, at the given frequency specified -- per second (Hz). periodic :: Int -> IO () -> IO () -- | Run a computation on every clock tick; the clock runs at the specified -- frequency. It allows running a computation at high frequency -- efficiently by maintaining a local clock and adjusting it with the -- provided base clock at longer intervals. The first argument is a base -- clock returning some notion of time in microseconds. The second -- argument is the frequency per second (Hz). The third argument is -- the action to run; the action is given the local time as an -- argument. withClock :: IO Int -> Int -> (Int -> IO ()) -> IO () -- | Streamly, short for stream concurrently, combines the essence of -- non-determinism, streaming and concurrency in functional programming. -- Concurrent and non-concurrent applications are almost -- indistinguishable; the concurrency capability does not at all impact -- the performance of the non-concurrent case. 
Streaming enables writing modular, -- composable and scalable applications with ease, and concurrency allows -- you to make them scale and perform well. Streamly enables writing -- concurrent applications without being aware of threads or -- synchronization. No explicit thread control is needed; where -- applicable, the concurrency rate is automatically controlled based on -- the demand from the consumer. However, combinators are provided to -- fine-tune the concurrency control. Streaming and concurrency together -- enable expressing reactive applications conveniently. See -- Streamly.Examples for a simple SDL based FRP example. -- -- Streamly streams are very much like Haskell lists, and most of the -- functions that work on lists have a counterpart that works on streams. -- However, streamly streams can be generated, consumed or combined -- concurrently. In this tutorial we will go over the basic concepts and -- how to use the library. The documentation of the Streamly module -- has more details on core APIs. For more APIs for constructing, -- folding, filtering, mapping and zipping etc. see the documentation of -- the Streamly.Prelude module. For examples and other ways to use the -- library see the module Streamly.Examples as well. module Streamly.Tutorial
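-- The list-like usage described above can be sketched roughly as follows.
-- This is an illustrative sketch only, assuming streamly 0.1.2 and using
-- only functions listed in this file (each, filter, take, toList,
-- serially, runStreamT). The explicit import of liftIO is a precaution
-- and may be redundant, since the Streamly module appears to re-export it.

```haskell
import Control.Monad.IO.Class (liftIO)
import Streamly
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- Build a stream from a list, transform it with list-like
    -- combinators, and collect the result in the underlying monad.
    xs <- S.toList . serially
            $ S.take 3
            $ S.filter even
            $ S.each [1 .. 10 :: Int]
    print xs  -- expected: [2,4,6]

    -- The StreamT monad nests streams serially, like nested for loops:
    -- the body runs once for every (x, y) combination, depth first.
    runStreamT $ do
        x <- S.each [1, 2 :: Int]
        y <- S.each [3, 4 :: Int]
        liftIO $ print (x, y)
```

-- Swapping runStreamT for runInterleavedT, runAsyncT or runParallelT
-- should change only how the iterations of the loop body are scheduled,
-- not the code inside the do block.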