{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE UndecidableInstances #-}
module Streamly.Streams.StreamK
(
IsStream (..)
, adapt
, Stream (..)
, mkStream
, nil
, cons
, (.:)
, nilK
, yieldK
, consK
, uncons
, unfoldr
, unfoldrM
, repeat
, yield
, yieldM
, fromFoldable
, fromList
, fromStreamK
, foldStream
, foldr
, foldrM
, foldl'
, foldlM'
, foldx
, foldxM
, runStream
, null
, head
, tail
, elem
, notElem
, all
, any
, last
, minimum
, maximum
, mapM_
, toList
, toStreamK
, scanl'
, scanx
, filter
, take
, takeWhile
, drop
, dropWhile
, map
, mapM
, sequence
, mapMaybe
, zipWith
, zipWithM
, serial
, consMSerial
, bindWith
, withLocal
, Streaming
, once
)
where
import Control.Monad (void)
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Semigroup (Semigroup(..))
import Prelude
hiding (foldl, foldr, last, map, mapM, mapM_, repeat, sequence,
take, filter, all, any, takeWhile, drop, dropWhile, minimum,
maximum, elem, notElem, null, head, tail, zipWith)
import qualified Prelude
import Streamly.SVar
newtype Stream m a =
Stream {
unStream :: forall r.
State Stream m a
-> m r
-> (a -> m r)
-> (a -> Stream m a -> m r)
-> m r
}
infixr 5 `consM`
infixr 5 |:
class IsStream t where
toStream :: t m a -> Stream m a
fromStream :: Stream m a -> t m a
consM :: MonadAsync m => m a -> t m a -> t m a
(|:) :: MonadAsync m => m a -> t m a -> t m a
{-# DEPRECATED Streaming "Please use IsStream instead." #-}
type Streaming = IsStream
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
adapt = fromStream . toStream
mkStream:: IsStream t
=> (forall r. State Stream m a
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> m r)
-> t m a
mkStream k = fromStream $ Stream $ \st stp sng yld ->
let yieldk a r = yld a (toStream r)
in k (rstState st) stp sng yieldk
nil :: IsStream t => t m a
nil = fromStream $ Stream $ \_ stp _ _ -> stp
infixr 5 `cons`
cons :: IsStream t => a -> t m a -> t m a
cons a r = fromStream $ Stream $ \_ _ _ yld -> yld a (toStream r)
infixr 5 .:
(.:) :: IsStream t => a -> t m a -> t m a
(.:) = cons
{-# INLINE consMSerial #-}
consMSerial :: (Monad m) => m a -> Stream m a -> Stream m a
consMSerial m r = Stream $ \_ _ _ yld -> m >>= \a -> yld a r
nilK :: IsStream t => (forall r. m r -> m r) -> t m a
nilK k = fromStream $ Stream $ \_ stp _ _ -> k stp
yieldK :: IsStream t => (forall r. (a -> m r) -> m r) -> t m a
yieldK k = fromStream $ Stream $ \_ _ sng _ -> k sng
consK :: IsStream t => (forall r. (a -> m r) -> m r) -> t m a -> t m a
consK k r = fromStream $ Stream $ \_ _ _ yld -> k (\x -> yld x (toStream r))
instance IsStream Stream where
toStream = id
fromStream = id
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> Stream IO a -> Stream IO a #-}
consM :: Monad m => m a -> Stream m a -> Stream m a
consM = consMSerial
{-# INLINE (|:) #-}
{-# SPECIALIZE (|:) :: IO a -> Stream IO a -> Stream IO a #-}
(|:) :: Monad m => m a -> Stream m a -> Stream m a
(|:) = consMSerial
{-# INLINE uncons #-}
uncons :: (IsStream t, Monad m) => t m a -> m (Maybe (a, t m a))
uncons m =
let stop = return Nothing
single a = return (Just (a, nil))
yieldk a r = return (Just (a, fromStream r))
in (unStream (toStream m)) defState stop single yieldk
{-# INLINE unfoldr #-}
unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a
unfoldr step = fromStream . go
where
go s = Stream $ \_ stp _ yld ->
case step s of
Nothing -> stp
Just (a, b) -> yld a (go b)
{-# INLINE unfoldrM #-}
unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM step = go
where
go s = fromStream $ Stream $ \svr stp sng yld -> do
mayb <- step s
case mayb of
Nothing -> stp
Just (a, b) ->
unStream (toStream (return a |: go b)) svr stp sng yld
yield :: IsStream t => a -> t m a
yield a = fromStream $ Stream $ \_ _ single _ -> single a
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM m = fromStream $ Stream $ \_ _ single _ -> m >>= single
{-# DEPRECATED once "Please use yieldM instead." #-}
{-# INLINE once #-}
once :: (Monad m, IsStream t) => m a -> t m a
once = yieldM
repeat :: IsStream t => a -> t m a
repeat a = let x = cons a x in x
{-# INLINE fromFoldable #-}
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
fromFoldable = Prelude.foldr cons nil
{-# INLINE fromList #-}
fromList :: IsStream t => [a] -> t m a
fromList = fromFoldable
{-# INLINE fromStreamK #-}
fromStreamK :: Stream m a -> Stream m a
fromStreamK = id
foldStream
:: IsStream t
=> State Stream m a
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> t m a
-> m r
foldStream st blank single step m =
let yieldk a x = step a (fromStream x)
in (unStream (toStream m)) st blank single yieldk
foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
foldr step acc m = go (toStream m)
where
go m1 =
let stop = return acc
single a = return (step a acc)
yieldk a r = go r >>= \b -> return (step a b)
in (unStream m1) defState stop single yieldk
{-# INLINE foldrM #-}
foldrM :: (IsStream t, Monad m) => (a -> b -> m b) -> b -> t m a -> m b
foldrM step acc m = go (toStream m)
where
go m1 =
let stop = return acc
single a = step a acc
yieldk a r = go r >>= step a
in (unStream m1) defState stop single yieldk
{-# INLINE foldx #-}
foldx :: (IsStream t, Monad m)
=> (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
foldx step begin done m = get $ go (toStream m) begin
where
{-# NOINLINE get #-}
get m1 =
let single = return . done
in (unStream m1) undefined undefined single undefined
go m1 !acc = Stream $ \_ _ sng yld ->
let stop = sng acc
single a = sng $ step acc a
yieldk a r =
let stream = go r (step acc a)
in (unStream stream) defState undefined sng yld
in (unStream m1) defState stop single yieldk
{-# INLINE foldl' #-}
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
foldl' step begin m = foldx step begin id m
foldxM :: (IsStream t, Monad m)
=> (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
foldxM step begin done m = go begin (toStream m)
where
go !acc m1 =
let stop = acc >>= done
single a = acc >>= \b -> step b a >>= done
yieldk a r = acc >>= \b -> step b a >>= \x -> go (return x) r
in (unStream m1) defState stop single yieldk
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b
foldlM' step begin m = foldxM step (return begin) return m
{-# INLINE runStream #-}
runStream :: (Monad m, IsStream t) => t m a -> m ()
runStream m = go (toStream m)
where
go m1 =
let stop = return ()
single _ = return ()
yieldk _ r = go (toStream r)
in unStream m1 defState stop single yieldk
{-# INLINE null #-}
null :: (IsStream t, Monad m) => t m a -> m Bool
null m =
let stop = return True
single _ = return False
yieldk _ _ = return False
in unStream (toStream m) defState stop single yieldk
{-# INLINE head #-}
head :: (IsStream t, Monad m) => t m a -> m (Maybe a)
head m =
let stop = return Nothing
single a = return (Just a)
yieldk a _ = return (Just a)
in unStream (toStream m) defState stop single yieldk
{-# INLINE tail #-}
tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
tail m =
let stop = return Nothing
single _ = return $ Just nil
yieldk _ r = return $ Just $ fromStream r
in unStream (toStream m) defState stop single yieldk
{-# INLINE elem #-}
elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
elem e m = go (toStream m)
where
go m1 =
let stop = return False
single a = return (a == e)
yieldk a r = if a == e then return True else go r
in (unStream m1) defState stop single yieldk
{-# INLINE notElem #-}
notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
notElem e m = go (toStream m)
where
go m1 =
let stop = return True
single a = return (a /= e)
yieldk a r = if a == e then return False else go r
in (unStream m1) defState stop single yieldk
all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
all p m = go (toStream m)
where
go m1 =
let single a | p a = return True
| otherwise = return False
yieldk a r | p a = go r
| otherwise = return False
in unStream m1 defState (return True) single yieldk
any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
any p m = go (toStream m)
where
go m1 =
let single a | p a = return True
| otherwise = return False
yieldk a r | p a = return True
| otherwise = go r
in unStream m1 defState (return False) single yieldk
{-# INLINE last #-}
last :: (IsStream t, Monad m) => t m a -> m (Maybe a)
last = foldx (\_ y -> Just y) Nothing id
{-# INLINE minimum #-}
minimum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
minimum m = go Nothing (toStream m)
where
go Nothing m1 =
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in unStream m1 defState stop single yieldk
go (Just res) m1 =
let stop = return (Just res)
single a =
if res <= a
then return (Just res)
else return (Just a)
yieldk a r =
if res <= a
then go (Just res) r
else go (Just a) r
in unStream m1 defState stop single yieldk
{-# INLINE maximum #-}
maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
maximum m = go Nothing (toStream m)
where
go Nothing m1 =
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in unStream m1 defState stop single yieldk
go (Just res) m1 =
let stop = return (Just res)
single a =
if res <= a
then return (Just a)
else return (Just res)
yieldk a r =
if res <= a
then go (Just a) r
else go (Just res) r
in unStream m1 defState stop single yieldk
mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m ()
mapM_ f m = go (toStream m)
where
go m1 =
let stop = return ()
single a = void (f a)
yieldk a r = f a >> go r
in (unStream m1) defState stop single yieldk
{-# INLINABLE toList #-}
toList :: (IsStream t, Monad m) => t m a -> m [a]
toList = foldr (:) []
{-# INLINE toStreamK #-}
toStreamK :: Stream m a -> Stream m a
toStreamK = id
{-# INLINE scanx #-}
scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx step begin done m =
cons (done begin) $ fromStream $ go (toStream m) begin
where
go m1 !acc = Stream $ \st stp sng yld ->
let single a = sng (done $ step acc a)
yieldk a r =
let s = step acc a
in yld (done s) (go r s)
in unStream m1 (rstState st) stp single yieldk
{-# INLINE scanl' #-}
scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b
scanl' step begin m = scanx step begin id m
{-# INLINE filter #-}
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter p m = fromStream $ go (toStream m)
where
go m1 = Stream $ \st stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yieldk a r | p a = yld a (go r)
| otherwise = (unStream r) (rstState st) stp single yieldk
in unStream m1 (rstState st) stp single yieldk
{-# INLINE take #-}
take :: IsStream t => Int -> t m a -> t m a
take n m = fromStream $ go n (toStream m)
where
go n1 m1 = Stream $ \st stp sng yld ->
let yieldk a r = yld a (go (n1 - 1) r)
in if n1 <= 0
then stp
else unStream m1 (rstState st) stp sng yieldk
{-# INLINE takeWhile #-}
takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
takeWhile p m = fromStream $ go (toStream m)
where
go m1 = Stream $ \st stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yieldk a r | p a = yld a (go r)
| otherwise = stp
in unStream m1 (rstState st) stp single yieldk
drop :: IsStream t => Int -> t m a -> t m a
drop n m = fromStream $ Stream $ \st stp sng yld ->
unStream (go n (toStream m)) (rstState st) stp sng yld
where
go n1 m1 = Stream $ \st stp sng yld ->
let single _ = stp
yieldk _ r = (unStream $ go (n1 - 1) r) st stp sng yld
in if n1 <= 0
then unStream m1 st stp sng yld
else unStream m1 st stp single yieldk
{-# INLINE dropWhile #-}
dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
dropWhile p m = fromStream $ go (toStream m)
where
go m1 = Stream $ \st stp sng yld ->
let single a | p a = stp
| otherwise = sng a
yieldk a r | p a = (unStream r) (rstState st) stp single yieldk
| otherwise = yld a r
in unStream m1 (rstState st) stp single yieldk
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map f m = fromStream $ Stream $ \st stp sng yld ->
let single = sng . f
yieldk a r = yld (f a) (fmap f r)
in unStream (toStream m) (rstState st) stp single yieldk
{-# INLINE mapM #-}
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
mapM f m = go (toStream m)
where
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single a = f a >>= sng
yieldk a r = unStream (toStream (f a |: (go r))) st stp sng yld
in (unStream m1) (rstState st) stp single yieldk
{-# INLINE sequence #-}
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence m = go (toStream m)
where
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single ma = ma >>= sng
yieldk ma r = unStream (toStream $ ma |: go r) st stp sng yld
in (unStream m1) (rstState st) stp single yieldk
{-# INLINE mapMaybe #-}
mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b
mapMaybe f m = go (toStream m)
where
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single a = case f a of
Just b -> sng b
Nothing -> stp
yieldk a r = case f a of
Just b -> yld b (toStream $ go r)
Nothing -> (unStream r) (rstState st) stp single yieldk
in unStream m1 (rstState st) stp single yieldk
{-# INLINE zipWithS #-}
zipWithS :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWithS f m1 m2 = go m1 m2
where
go mx my = Stream $ \st stp sng yld -> do
let merge a ra =
let single2 b = sng (f a b)
yield2 b rb = yld (f a b) (go ra rb)
in unStream my (rstState st) stp single2 yield2
let single1 a = merge a nil
yield1 a ra = merge a ra
unStream mx (rstState st) stp single1 yield1
{-# INLINABLE zipWith #-}
zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f m1 m2 = fromStream $ zipWithS f (toStream m1) (toStream m2)
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
where
go mx my = Stream $ \st stp sng yld -> do
let merge a ra =
let runIt x = unStream x (rstState st) stp sng yld
single2 b = f a b >>= sng
yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb)
in unStream my (rstState st) stp single2 yield2
let single1 a = merge a nil
yield1 a ra = merge a ra
unStream mx (rstState st) stp single1 yield1
{-# INLINE serial #-}
serial :: Stream m a -> Stream m a -> Stream m a
serial m1 m2 = go m1
where
go (Stream m) = Stream $ \st stp sng yld ->
let stop = (unStream m2) (rstState st) stp sng yld
single a = yld a m2
yieldk a r = yld a (go r)
in m (rstState st) stop single yieldk
instance Semigroup (Stream m a) where
(<>) = serial
instance Monoid (Stream m a) where
mempty = nil
mappend = (<>)
instance Monad m => Functor (Stream m) where
fmap = map
{-# INLINE bindWith #-}
bindWith
:: (forall c. Stream m c -> Stream m c -> Stream m c)
-> Stream m a
-> (a -> Stream m b)
-> Stream m b
bindWith par m f = go m
where
go (Stream g) =
Stream $ \st stp sng yld ->
let run x = (unStream x) st stp sng yld
single a = run $ f a
yieldk a r = run $ f a `par` go r
in g (rstState st) stp single yieldk
_alt :: Stream m a -> Stream m a -> Stream m a
_alt m1 m2 = Stream $ \st stp sng yld ->
let stop = unStream m2 (rstState st) stp sng yld
in unStream m1 (rstState st) stop sng yld
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m =
Stream $ \st stp sng yld ->
let single = local f . sng
yieldk a r = local f $ yld a (withLocal f r)
in (unStream m) (rstState st) (local f stp) single yieldk
instance MonadTrans Stream where
lift = yieldM