{-# LANGUAGE BangPatterns #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE UndecidableInstances #-} -- XXX -- | -- Module : Streamly.Prelude -- Copyright : (c) 2017 Harendra Kumar -- -- License : BSD3 -- Maintainer : harendra.kumar@gmail.com -- Stability : experimental -- Portability : GHC -- -- This module is designed to be imported qualified: -- -- @ -- import qualified Streamly.Prelude as S -- @ -- -- Functions with the suffix @M@ are general functions that work on monadic -- arguments. The corresponding functions without the suffix @M@ work on pure -- arguments and can in general be derived from their monadic versions but are -- provided for convenience and for consistency with other pure APIs in the -- @base@ package. -- -- Deconstruction and folds accept a 'SerialT' type instead of a polymorphic -- type to ensure that streams always have a concrete monomorphic type by -- default, reducing type errors. In case you want to use any other type of -- stream you can use one of the type combinators provided in the "Streamly" -- module to convert the stream type. module Streamly.Prelude ( -- * Construction -- | Primitives to construct or inspect a stream. nil , consM , (|:) , cons , (.:) -- * General Unfold , unfoldr , unfoldrM -- * Special Generation -- | Generate a monadic stream from an input structure, a seed or a -- generation function. , once , replicateM , repeatM , iterate , iterateM , fromFoldable -- * Deconstruction , uncons -- * Folding -- ** General Folds , foldr , foldrM , foldl' , foldlM' , foldx , foldxM -- ** Special Folds , mapM_ , toList , all , any , head , tail , last , null , length , elem , notElem , maximum , minimum , sum , product -- * Scans , scanl' , scanx -- * Filtering , filter , take , takeWhile , drop , dropWhile -- * Reordering , reverse -- * Mapping , mapM , sequence -- * Zipping , zipWith , zipWithM , zipAsyncWith , zipAsyncWithM -- * IO , fromHandle , toHandle -- * Deprecated , each , scan , foldl , foldlM ) where import Control.Monad (void) import Control.Monad.IO.Class (MonadIO(..)) import Data.Semigroup (Semigroup(..)) import Prelude hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr, foldl, mapM, mapM_, sequence, all, any, sum, product, elem, notElem, maximum, minimum, head, last, tail, length, null, reverse, iterate) import qualified Prelude import qualified System.IO as IO import qualified Streamly.Core as S import Streamly.Core (Stream(Stream)) import Streamly.Streams ------------------------------------------------------------------------------ -- Construction ------------------------------------------------------------------------------ -- | Build a Stream by unfolding pure steps starting from a seed. -- -- @since 0.1.0 unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a unfoldr step = fromStream . go where go s = Stream $ \_ stp _ yld -> case step s of Nothing -> stp Just (a, b) -> yld a (go b) -- | Build a Stream by unfolding monadic steps starting from a seed. -- -- @since 0.1.0 unfoldrM :: (IsStream t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a unfoldrM step = fromStream . go where go s = Stream $ \_ stp _ yld -> do mayb <- step s case mayb of Nothing -> stp Just (a, b) -> yld a (go b) -- | Construct a stream from a 'Foldable' container. -- -- @since 0.2.0 {-# INLINE fromFoldable #-} fromFoldable :: (IsStream t, Foldable f) => f a -> t m a fromFoldable = Prelude.foldr cons nil -- | Same as 'fromFoldable'. -- -- @since 0.1.0 {-# DEPRECATED each "Please use fromFoldable instead." #-} {-# INLINE each #-} each :: (IsStream t, Foldable f) => f a -> t m a each = fromFoldable -- | Create a singleton stream by executing a monadic action once. Same as -- @m \`consM` nil@ but more efficient. -- -- @ -- > toList $ once getLine -- hello -- ["hello"] -- @ -- -- @since 0.2.0 once :: (IsStream t, Monad m) => m a -> t m a once = fromStream . S.once -- | Generate a stream by performing a monadic action @n@ times. -- -- @since 0.1.1 replicateM :: (IsStream t, Monad m) => Int -> m a -> t m a replicateM n m = fromStream $ go n where go cnt = Stream $ \_ stp _ yld -> if cnt <= 0 then stp else m >>= \a -> yld a (go (cnt - 1)) -- | Generate a stream by repeatedly executing a monadic action forever. -- -- @since 0.2.0 repeatM :: (IsStream t, Monad m) => m a -> t m a repeatM = fromStream . go where go m = Stream $ \_ _ _ yld -> m >>= \a -> yld a (go m) -- | Iterate a pure function from a seed value, streaming the results forever. -- -- @since 0.1.2 iterate :: IsStream t => (a -> a) -> a -> t m a iterate step = fromStream . go where go s = S.cons s (go (step s)) -- | Iterate a monadic function from a seed value, streaming the results -- forever. -- -- @since 0.1.2 iterateM :: (IsStream t, Monad m) => (a -> m a) -> a -> t m a iterateM step = fromStream . go where go s = Stream $ \_ _ _ yld -> do a <- step s yld s (go a) -- | Read lines from an IO Handle into a stream of Strings. -- -- @since 0.1.0 fromHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String fromHandle h = fromStream go where go = Stream $ \_ stp _ yld -> do eof <- liftIO $ IO.hIsEOF h if eof then stp else do str <- liftIO $ IO.hGetLine h yld str go ------------------------------------------------------------------------------ -- Elimination ------------------------------------------------------------------------------ -- | Lazy right associative fold. For example, to fold a stream into a list: -- -- @ -- >> runIdentity $ foldr (:) [] (serially $ fromFoldable [1,2,3]) -- [1,2,3] -- @ -- -- @since 0.1.0 foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b foldr step acc m = go (toStream m) where go m1 = let stop = return acc single a = return (step a acc) yield a r = go r >>= \b -> return (step a b) in (S.runStream m1) Nothing stop single yield -- | Lazy right fold with a monadic step function. For example, to fold a -- stream into a list: -- -- @ -- >> runIdentity $ foldrM (\\x xs -> return (x : xs)) [] (serially $ fromFoldable [1,2,3]) -- [1,2,3] -- @ -- -- @since 0.2.0 {-# INLINE foldrM #-} foldrM :: Monad m => (a -> b -> m b) -> b -> SerialT m a -> m b foldrM step acc m = go (toStream m) where go m1 = let stop = return acc single a = step a acc yield a r = go r >>= step a in (S.runStream m1) Nothing stop single yield -- | Strict left scan with an extraction function. Like 'scanl'', but applies a -- user supplied extraction function (the third argument) at each step. This is -- designed to work with the @foldl@ library. The suffix @x@ is a mnemonic for -- extraction. -- -- @since 0.2.0 {-# INLINE scanx #-} scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b scanx step begin done m = cons (done begin) $ fromStream $ go (toStream m) begin where go m1 !acc = Stream $ \_ stp sng yld -> let single a = sng (done $ step acc a) yield a r = let s = step acc a in yld (done s) (go r s) in S.runStream m1 Nothing stp single yield -- | -- @since 0.1.1 {-# DEPRECATED scan "Please use scanx instead." #-} scan :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b scan = scanx -- | Strict left scan. Like 'foldl'', but returns the folded value at each -- step, generating a stream of all intermediate fold results. The first -- element of the stream is the user supplied initial value, and the last -- element of the stream is the same as the result of 'foldl''. -- -- @since 0.2.0 {-# INLINE scanl' #-} scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b scanl' step begin m = scanx step begin id m -- | Strict left fold with an extraction function. Like the standard strict -- left fold, but applies a user supplied extraction function (the third -- argument) to the folded value at the end. This is designed to work with the -- @foldl@ library. The suffix @x@ is a mnemonic for extraction. -- -- @since 0.2.0 {-# INLINE foldx #-} foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b foldx step begin done m = get $ go (toStream m) begin where {-# NOINLINE get #-} get m1 = let single = return . done in (S.runStream m1) Nothing undefined single undefined -- Note, this can be implemented by making a recursive call to "go", -- however that is more expensive because of unnecessary recursion -- that cannot be tail call optimized. Unfolding recursion explicitly via -- continuations is much more efficient. go m1 !acc = Stream $ \_ _ sng yld -> let stop = sng acc single a = sng $ step acc a yield a r = let stream = go r (step acc a) in (S.runStream stream) Nothing undefined sng yld in (S.runStream m1) Nothing stop single yield -- | -- @since 0.1.0 {-# DEPRECATED foldl "Please use foldx instead." #-} foldl :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b foldl = foldx -- | Strict left associative fold. -- -- @since 0.2.0 {-# INLINE foldl' #-} foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b foldl' step begin m = foldx step begin id m -- XXX replace the recursive "go" with explicit continuations. -- | Like 'foldx', but with a monadic step function. -- -- @since 0.2.0 foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b foldxM step begin done m = go begin (toStream m) where go !acc m1 = let stop = acc >>= done single a = acc >>= \b -> step b a >>= done yield a r = acc >>= \b -> go (step b a) r in (S.runStream m1) Nothing stop single yield -- | -- @since 0.1.0 {-# DEPRECATED foldlM "Please use foldxM instead." #-} foldlM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b foldlM = foldxM -- | Like 'foldl'' but with a monadic step function. -- -- @since 0.2.0 foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b foldlM' step begin m = foldxM step (return begin) return m -- | Decompose a stream into its head and tail. If the stream is empty, returns -- 'Nothing'. If the stream is non-empty, returns 'Just (a, ma)', where 'a' is -- the head of the stream and 'ma' its tail. -- -- @since 0.1.0 uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) uncons m = let stop = return Nothing single a = return (Just (a, nil)) yield a r = return (Just (a, fromStream r)) in (S.runStream (toStream m)) Nothing stop single yield -- | Write a stream of Strings to an IO Handle. -- -- @since 0.1.0 toHandle :: MonadIO m => IO.Handle -> SerialT m String -> m () toHandle h m = go (toStream m) where go m1 = let stop = return () single a = liftIO (IO.hPutStrLn h a) yield a r = liftIO (IO.hPutStrLn h a) >> go r in (S.runStream m1) Nothing stop single yield ------------------------------------------------------------------------------ -- Special folds ------------------------------------------------------------------------------ -- | Convert a stream into a list in the underlying monad. -- -- @since 0.1.0 {-# INLINABLE toList #-} toList :: Monad m => SerialT m a -> m [a] toList = foldrM (\a xs -> return (a : xs)) [] -- | Take first 'n' elements from the stream and discard the rest. -- -- @since 0.1.0 {-# INLINE take #-} take :: IsStream t => Int -> t m a -> t m a take n m = fromStream $ go n (toStream m) where go n1 m1 = Stream $ \ctx stp sng yld -> let yield a r = yld a (go (n1 - 1) r) in if n1 <= 0 then stp else (S.runStream m1) ctx stp sng yield -- | Include only those elements that pass a predicate. -- -- @since 0.1.0 {-# INLINE filter #-} filter :: IsStream t => (a -> Bool) -> t m a -> t m a filter p m = fromStream $ go (toStream m) where go m1 = Stream $ \ctx stp sng yld -> let single a | p a = sng a | otherwise = stp yield a r | p a = yld a (go r) | otherwise = (S.runStream r) ctx stp single yield in (S.runStream m1) ctx stp single yield -- | End the stream as soon as the predicate fails on an element. -- -- @since 0.1.0 {-# INLINE takeWhile #-} takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a takeWhile p m = fromStream $ go (toStream m) where go m1 = Stream $ \ctx stp sng yld -> let single a | p a = sng a | otherwise = stp yield a r | p a = yld a (go r) | otherwise = stp in (S.runStream m1) ctx stp single yield -- | Discard first 'n' elements from the stream and take the rest. -- -- @since 0.1.0 drop :: IsStream t => Int -> t m a -> t m a drop n m = fromStream $ go n (toStream m) where go n1 m1 = Stream $ \ctx stp sng yld -> let single _ = stp yield _ r = (S.runStream $ go (n1 - 1) r) ctx stp sng yld -- Somehow "<=" check performs better than a ">" in if n1 <= 0 then (S.runStream m1) ctx stp sng yld else (S.runStream m1) ctx stp single yield -- | Drop elements in the stream as long as the predicate succeeds and then -- take the rest of the stream. -- -- @since 0.1.0 {-# INLINE dropWhile #-} dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a dropWhile p m = fromStream $ go (toStream m) where go m1 = Stream $ \ctx stp sng yld -> let single a | p a = stp | otherwise = sng a yield a r | p a = (S.runStream r) ctx stp single yield | otherwise = yld a r in (S.runStream m1) ctx stp single yield -- | Determine whether all elements of a stream satisfy a predicate. -- -- @since 0.1.0 all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool all p m = go (toStream m) where go m1 = let single a | p a = return True | otherwise = return False yield a r | p a = go r | otherwise = return False in (S.runStream m1) Nothing (return True) single yield -- | Determine whether any of the elements of a stream satisfy a predicate. -- -- @since 0.1.0 any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool any p m = go (toStream m) where go m1 = let single a | p a = return True | otherwise = return False yield a r | p a = return True | otherwise = go r in (S.runStream m1) Nothing (return False) single yield -- | Determine the sum of all elements of a stream of numbers -- -- @since 0.1.0 sum :: (Monad m, Num a) => SerialT m a -> m a sum = foldl (+) 0 id -- | Determine the product of all elements of a stream of numbers -- -- @since 0.1.1 product :: (Monad m, Num a) => SerialT m a -> m a product = foldl (*) 1 id -- | Extract the first element of the stream, if any. -- -- @since 0.1.0 head :: Monad m => SerialT m a -> m (Maybe a) head m = let stop = return Nothing single a = return (Just a) yield a _ = return (Just a) in (S.runStream (toStream m)) Nothing stop single yield -- | Extract all but the first element of the stream, if any. -- -- @since 0.1.1 tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) tail m = let stop = return Nothing single _ = return $ Just nil yield _ r = return $ Just $ fromStream r in (S.runStream (toStream m)) Nothing stop single yield -- | Extract the last element of the stream, if any. -- -- @since 0.1.1 {-# INLINE last #-} last :: Monad m => SerialT m a -> m (Maybe a) last = foldl (\_ y -> Just y) Nothing id -- | Determine whether the stream is empty. -- -- @since 0.1.1 null :: Monad m => SerialT m a -> m Bool null m = let stop = return True single _ = return False yield _ _ = return False in (S.runStream (toStream m)) Nothing stop single yield -- | Determine whether an element is present in the stream. -- -- @since 0.1.0 elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool elem e m = go (toStream m) where go m1 = let stop = return False single a = return (a == e) yield a r = if a == e then return True else go r in (S.runStream m1) Nothing stop single yield -- | Determine whether an element is not present in the stream. -- -- @since 0.1.0 notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool notElem e m = go (toStream m) where go m1 = let stop = return True single a = return (a /= e) yield a r = if a == e then return False else go r in (S.runStream m1) Nothing stop single yield -- | Determine the length of the stream. -- -- @since 0.1.0 length :: Monad m => SerialT m a -> m Int length = foldl (\n _ -> n + 1) 0 id -- | Returns the elements of the stream in reverse order. -- The stream must be finite. -- -- @since 0.1.1 reverse :: (IsStream t) => t m a -> t m a reverse m = fromStream $ go S.nil (toStream m) where go rev rest = Stream $ \svr stp sng yld -> let run x = S.runStream x svr stp sng yld stop = run rev single a = run $ a `S.cons` rev yield a r = run $ go (a `S.cons` rev) r in S.runStream rest svr stop single yield -- XXX replace the recursive "go" with continuation -- | Determine the minimum element in a stream. -- -- @since 0.1.0 minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) minimum m = go Nothing (toStream m) where go res m1 = let stop = return res single a = return $ min_ a res yield a r = go (min_ a res) r in (S.runStream m1) Nothing stop single yield min_ a res = case res of Nothing -> Just a Just e -> Just $ min a e -- XXX replace the recursive "go" with continuation -- | Determine the maximum element in a stream. -- -- @since 0.1.0 maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) maximum m = go Nothing (toStream m) where go res m1 = let stop = return res single a = return $ max_ a res yield a r = go (max_ a res) r in (S.runStream m1) Nothing stop single yield max_ a res = case res of Nothing -> Just a Just e -> Just $ max a e ------------------------------------------------------------------------------ -- Transformation ------------------------------------------------------------------------------ -- XXX Parallel variants of these? mapMWith et al. sequenceWith. -- | Replace each element of the stream with the result of a monadic action -- applied on the element. -- -- @since 0.1.0 {-# INLINE mapM #-} mapM :: (IsStream t, Monad m) => (a -> m b) -> t m a -> t m b mapM f m = fromStream $ go (toStream m) where go m1 = Stream $ \_ stp sng yld -> let single a = f a >>= sng yield a r = f a >>= \b -> yld b (go r) in (S.runStream m1) Nothing stp single yield -- | Apply a monadic action to each element of the stream and discard the -- output of the action. -- -- @since 0.1.0 mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () mapM_ f m = go (toStream m) where go m1 = let stop = return () single a = void (f a) yield a r = f a >> go r in (S.runStream m1) Nothing stop single yield -- | Reduce a stream of monadic actions to a stream of the output of those -- actions. -- -- @since 0.1.0 sequence :: (IsStream t, Monad m) => t m (m a) -> t m a sequence m = fromStream $ go (toStream m) where go m1 = Stream $ \_ stp sng yld -> let single ma = ma >>= sng yield ma r = ma >>= \b -> yld b (go r) in (S.runStream m1) Nothing stp single yield ------------------------------------------------------------------------------ -- Serially Zipping Streams ------------------------------------------------------------------------------ -- | Zip two streams serially using a pure zipping function. -- -- @since 0.1.0 zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c zipWith f m1 m2 = fromStream $ S.zipWith f (toStream m1) (toStream m2) -- | Zip two streams serially using a monadic zipping function. -- -- @since 0.1.0 zipWithM :: IsStream t => (a -> b -> t m c) -> t m a -> t m b -> t m c zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2) where go mx my = Stream $ \_ stp sng yld -> do let merge a ra = let run x = S.runStream x Nothing stp sng yld single2 b = run $ toStream (f a b) yield2 b rb = run $ toStream (f a b) <> go ra rb in (S.runStream my) Nothing stp single2 yield2 let single1 a = merge a S.nil yield1 a ra = merge a ra (S.runStream mx) Nothing stp single1 yield1 ------------------------------------------------------------------------------ -- Parallely Zipping Streams ------------------------------------------------------------------------------ -- | Zip two streams concurrently (i.e. both the elements being zipped are -- generated concurrently) using a pure zipping function. -- -- @since 0.1.0 zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c zipAsyncWith f m1 m2 = fromStream $ S.zipAsyncWith f (toStream m1) (toStream m2) -- | Zip two streams asyncly (i.e. both the elements being zipped are generated -- concurrently) using a monadic zipping function. -- -- @since 0.1.0 zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> t m c) -> t m a -> t m b -> t m c zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp sng yld -> do ma <- mkAsync m1 mb <- mkAsync m2 (S.runStream (toStream (zipWithM f ma mb))) Nothing stp sng yld