{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
module Streamly.Prelude
(
cons
, nil
, unfoldr
, unfoldrM
, each
, fromHandle
, foldr
, foldrM
, foldl
, foldlM
, uncons
, toList
, toHandle
, all
, any
, sum
, product
, head
, last
, length
, elem
, notElem
, maximum
, minimum
, filter
, take
, takeWhile
, drop
, dropWhile
, mapM
, mapM_
, sequence
, zipWith
, zipWithM
, zipAsyncWith
, zipAsyncWithM
)
where
import Control.Monad (liftM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Semigroup (Semigroup(..))
import Prelude hiding (filter, drop, dropWhile, take,
takeWhile, zipWith, foldr, foldl,
mapM, mapM_, sequence, all, any,
sum, product, elem, notElem,
maximum, minimum, head, last,
length)
import qualified Prelude as Prelude
import qualified System.IO as IO
import Streamly.Core
import Streamly.Streams
-- | Build a stream from a seed using a pure step function.
--
-- The step function is applied to the current seed. 'Nothing' ends the
-- stream; @Just (x, seed')@ yields @x@ and continues from @seed'@.
unfoldr :: Streaming t => (b -> Maybe (a, b)) -> b -> t m a
unfoldr next seed0 = fromStream (generate seed0)
  where
    -- The stream of everything that can be produced from the given seed.
    generate seed = Stream $ \_ stop yield ->
        maybe stop (\(x, seed') -> yield x (Just (generate seed'))) (next seed)
-- | Build a stream from a seed using a monadic step function.
--
-- The step action is run on the current seed. A 'Nothing' result ends the
-- stream; @Just (x, seed')@ yields @x@ and continues from @seed'@.
unfoldrM :: (Streaming t, Monad m) => (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM next seed0 = fromStream (generate seed0)
  where
    -- The stream of everything that can be produced from the given seed.
    generate seed = Stream $ \_ stop yield ->
        next seed >>=
            maybe stop (\(x, seed') -> yield x (Just (generate seed')))
-- | Turn any 'Foldable' container into a stream by right-folding it with
-- 'cons' onto 'nil'.
{-# INLINE each #-}
each :: (Foldable f, Streaming t) => f a -> t m a
each = Prelude.foldr cons nil
-- | Stream the contents of a 'IO.Handle' one line at a time.
--
-- Before each element the handle is checked with 'IO.hIsEOF'; at end of file
-- the stream stops, otherwise the next line is read with 'IO.hGetLine' and
-- yielded.
fromHandle :: (MonadIO m, Streaming t) => IO.Handle -> t m String
fromHandle handle = fromStream readLines
  where
    -- The same stream value is reused as its own continuation because all
    -- the state lives in the handle.
    readLines = Stream $ \_ stop yield -> do
        atEnd <- liftIO (IO.hIsEOF handle)
        if atEnd
            then stop
            else liftIO (IO.hGetLine handle) >>= \line ->
                    yield line (Just readLines)
-- | Right fold over a stream with a pure combining function.
--
-- The rest of the stream is folded first and the current element is then
-- combined with that result; an empty stream produces the initial value.
foldr :: (Monad m, Streaming t) => (a -> b -> b) -> b -> t m a -> m b
foldr step acc m = walk (toStream m)
  where
    walk s = runStream s Nothing (return acc) onYield

    -- Last element: combine with the initial value.
    onYield x Nothing     = return (step x acc)
    -- More to come: fold the remainder, then combine.
    onYield x (Just rest) = liftM (step x) (walk rest)
-- | Right fold over a stream where the combining function receives the fold
-- of the remaining stream as an unevaluated monadic action.
--
-- The step function decides whether and when to run that action. The second
-- argument is the action used for the end of the stream.
{-# INLINE foldrM #-}
foldrM :: Streaming t => (a -> m b -> m b) -> m b -> t m a -> m b
foldrM step acc m = descend (toStream m)
  where
    descend s = runStream s Nothing acc $ \x next ->
        case next of
            Nothing   -> step x acc
            Just rest -> step x (descend rest)
-- | Strict left fold over a stream.
--
-- Takes a step function, an initial accumulator and an extraction function
-- that is applied to the final accumulator to produce the result. The
-- accumulator is forced to weak head normal form on every iteration.
foldl :: (Monad m, Streaming t)
      => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
foldl step begin done m = loop begin (toStream m)
  where
    loop !acc s = runStream s Nothing (return (done acc)) $ \x next ->
        let acc' = step acc x
        in case next of
            Nothing   -> return (done acc')
            Just rest -> loop acc' rest
-- | Left fold over a stream with monadic step, initial and extraction
-- actions.
--
-- The accumulator is carried as a monadic action: it is run, the step is
-- applied to its result and the current element, and the resulting action
-- is passed on. At the end of the stream the extraction function is applied
-- to the accumulated value.
foldlM :: (Monad m, Streaming t)
       => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
foldlM step begin done m = loop begin (toStream m)
  where
    loop !accM s = runStream s Nothing (accM >>= done) $ \x next ->
        case next of
            Nothing -> do
                acc  <- accM
                acc' <- step acc x
                done acc'
            Just rest -> do
                acc <- accM
                loop (step acc x) rest
-- | Split a stream into its first element and the remaining stream.
--
-- Returns 'Nothing' for an empty stream. When the first element is also the
-- last one, the remainder is 'nil'.
uncons :: (Streaming t, Monad m) => t m a -> m (Maybe (a, t m a))
uncons m = runStream (toStream m) Nothing (return Nothing) onYield
  where
    onYield x next = case next of
        Nothing   -> return (Just (x, nil))
        Just rest -> return (Just (x, fromStream rest))
-- | Write every string of a stream to a 'IO.Handle', one per line, using
-- 'IO.hPutStrLn'.
toHandle :: (Streaming t, MonadIO m) => IO.Handle -> t m String -> m ()
toHandle h m = drain (toStream m)
  where
    emit line = liftIO (IO.hPutStrLn h line)

    drain s = runStream s Nothing (return ()) $ \line next ->
        case next of
            Nothing   -> emit line
            Just rest -> emit line >> drain rest
-- | Collect all elements of a stream into a list, preserving stream order.
-- Implemented as a right fold that conses each element onto the list built
-- from the rest of the stream.
{-# INLINABLE toList #-}
toList :: (Monad m, Streaming t) => t m a -> m [a]
toList = foldrM prepend (return [])
  where
    prepend x rest = liftM (x :) rest
-- | Keep at most the first @n@ elements of a stream.
--
-- A count of zero or less yields the empty stream.
take :: Streaming t => Int -> t m a -> t m a
take n m = fromStream (limit n (toStream m))
  where
    limit remaining s = Stream $ \ctx stop yield ->
        let -- Pass the element on; the continuation gets one less to take.
            forward x Nothing     = yield x Nothing
            forward x (Just rest) =
                yield x (Just (limit (remaining - 1) rest))
        in if remaining > 0
            then runStream s ctx stop forward
            else stop
-- | Keep only the elements that satisfy the predicate.
--
-- Expressed through the stream monad: each element is replaced by a
-- singleton stream if it passes, or by 'nil' otherwise.
{-# INLINE filter #-}
filter :: (Streaming t, Monad (t m)) => (a -> Bool) -> t m a -> t m a
filter p m = m >>= keep
  where
    keep x
        | p x       = return x
        | otherwise = nil
-- | Keep the longest prefix of the stream whose elements all satisfy the
-- predicate; the stream stops at the first element that fails it.
takeWhile :: Streaming t => (a -> Bool) -> t m a -> t m a
takeWhile p m = fromStream (prefix (toStream m))
  where
    prefix s = Stream $ \ctx stop yield ->
        runStream s ctx stop $ \x next ->
            case next of
                Nothing
                    | p x       -> yield x Nothing
                    | otherwise -> stop
                Just rest
                    | p x       -> yield x (Just (prefix rest))
                    | otherwise -> stop
-- | Discard the first @n@ elements of a stream and return the rest.
--
-- A count of zero or less returns the stream unchanged; if the stream has
-- @n@ or fewer elements the result is empty.
drop :: Streaming t => Int -> t m a -> t m a
drop n m = fromStream (skip n (toStream m))
  where
    skip remaining s = Stream $ \ctx stop yield ->
        let -- Throw the element away and keep skipping in the remainder.
            discard _ next = case next of
                Nothing   -> stop
                Just rest ->
                    runStream (skip (remaining - 1) rest) ctx stop yield
        in runStream s ctx stop (if remaining > 0 then discard else yield)
-- | Discard the longest prefix of the stream whose elements all satisfy the
-- predicate and return the remainder unchanged.
--
-- If every element satisfies the predicate the result is empty.
dropWhile :: Streaming t => (a -> Bool) -> t m a -> t m a
dropWhile p m = fromStream $ go (toStream m)
  where
    go m1 = Stream $ \ctx stp yld ->
        let yield a Nothing
                | p a       = stp
                | otherwise = yld a Nothing
            yield a (Just x)
                -- Recurse with the caller's continuation @yld@, not the
                -- local @yield@ wrapper. Passing @yield@ stacked one wrapper
                -- per dropped element, so the first kept element had the
                -- predicate re-evaluated once for every dropped element.
                | p a       = runStream (go x) ctx stp yld
                | otherwise = yld a (Just x)
        in runStream m1 ctx stp yield
-- | Check whether every element of the stream satisfies the predicate.
--
-- Returns 'True' for an empty stream and stops consuming the stream at the
-- first element that fails the predicate.
all :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool
all p m = check (toStream m)
  where
    check s = runStream s Nothing (return True) $ \x next ->
        case next of
            -- The last element decides the result on its own.
            Nothing   -> return $! p x
            Just rest -> if p x then check rest else return False
-- | Check whether at least one element of the stream satisfies the
-- predicate.
--
-- Returns 'False' for an empty stream and stops consuming the stream at the
-- first element that satisfies the predicate.
any :: (Streaming t, Monad m) => (a -> Bool) -> t m a -> m Bool
any p m = search (toStream m)
  where
    search s = runStream s Nothing (return False) $ \x next ->
        case next of
            -- The last element decides the result on its own.
            Nothing   -> return $! p x
            Just rest -> if p x then return True else search rest
-- | Add up all elements of a stream with a left fold starting from @0@;
-- an empty stream therefore sums to @0@.
sum :: (Streaming t, Monad m, Num a) => t m a -> m a
sum m = foldl (+) 0 id m
-- | Multiply all elements of a stream with a left fold.
--
-- The fold starts from the multiplicative identity @1@, so an empty stream
-- has product @1@. (Seeding the fold with @0@ would make every product @0@.)
product :: (Streaming t, Monad m, Num a) => t m a -> m a
product = foldl (*) 1 id
-- | Return the first element of a stream, or 'Nothing' if the stream is
-- empty. The rest of the stream is not run.
head :: (Streaming t, Monad m) => t m a -> m (Maybe a)
head m =
    runStream (toStream m) Nothing (return Nothing) (\x _ -> return (Just x))
-- | Return the final element of a stream, or 'Nothing' if the stream is
-- empty. The whole stream is traversed.
last :: (Streaming t, Monad m) => t m a -> m (Maybe a)
last m = seek (toStream m)
  where
    -- An element with no continuation is the last one; otherwise keep going.
    seek s = runStream s Nothing (return Nothing) $ \x next ->
        maybe (return (Just x)) seek next
-- | Check whether the given value occurs in the stream.
--
-- Returns 'False' for an empty stream and stops consuming the stream as
-- soon as a matching element is found.
elem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool
elem e m = search (toStream m)
  where
    search s = runStream s Nothing (return False) found

    found x Nothing = return (x == e)
    found x (Just rest)
        | x == e    = return True
        | otherwise = search rest
-- | Check that the given value does not occur anywhere in the stream.
--
-- Returns 'True' for an empty stream and stops consuming the stream as soon
-- as a matching element is found.
notElem :: (Streaming t, Monad m, Eq a) => a -> t m a -> m Bool
notElem e m = search (toStream m)
  where
    search s = runStream s Nothing (return True) missing

    missing x Nothing = return (x /= e)
    missing x (Just rest)
        | x == e    = return False
        | otherwise = search rest
-- | Count the elements of a stream with a left fold that ignores the
-- elements themselves and increments a counter starting at @0@.
length :: (Streaming t, Monad m) => t m a -> m Int
length m = foldl bump 0 id m
  where
    bump n _ = n + 1
-- | Return the smallest element of a stream according to 'Ord', or
-- 'Nothing' if the stream is empty.
--
-- The running minimum is kept strict: the accumulator is forced on every
-- step and each comparison is evaluated as soon as it is made. This avoids
-- building a chain of unevaluated 'min' applications as long as the stream.
minimum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a)
minimum m = go Nothing (toStream m)
  where
    go !r m1 =
        let stop = return r
            yield a Nothing  = return $! min_ a r
            yield a (Just x) = go (min_ a r) x
        in runStream m1 Nothing stop yield

    -- Combine a new element with the minimum seen so far, if any.
    min_ a Nothing  = Just a
    min_ a (Just e) = Just $! min a e
-- | Return the largest element of a stream according to 'Ord', or
-- 'Nothing' if the stream is empty.
--
-- The running maximum is kept strict: the accumulator is forced on every
-- step and each comparison is evaluated as soon as it is made. This avoids
-- building a chain of unevaluated 'max' applications as long as the stream.
maximum :: (Streaming t, Monad m, Ord a) => t m a -> m (Maybe a)
maximum m = go Nothing (toStream m)
  where
    go !r m1 =
        let stop = return r
            yield a Nothing  = return $! max_ a r
            yield a (Just x) = go (max_ a r) x
        in runStream m1 Nothing stop yield

    -- Combine a new element with the maximum seen so far, if any.
    max_ a Nothing  = Just a
    max_ a (Just e) = Just $! max a e
-- | Apply a monadic function to every element of a stream, producing the
-- stream of results. The function is run for an element when that element
-- is demanded from the resulting stream.
mapM :: (Streaming t, Monad m) => (a -> m b) -> t m a -> t m b
mapM f m = fromStream (transform (toStream m))
  where
    -- The source stream is always run with no context ('Nothing').
    transform s = Stream $ \_ stop yield ->
        runStream s Nothing stop $ \x next ->
            case next of
                Nothing -> do
                    y <- f x
                    yield y Nothing
                Just rest -> do
                    y <- f x
                    yield y (Just (transform rest))
-- | Run a monadic function on every element of a stream, in order, and
-- discard the results.
mapM_ :: (Streaming t, Monad m) => (a -> m b) -> t m a -> m ()
mapM_ f m = consume (toStream m)
  where
    consume s = runStream s Nothing (return ()) $ \x next ->
        case next of
            Nothing   -> f x >> return ()
            Just rest -> f x >> consume rest
-- | Turn a stream of monadic actions into the stream of their results.
-- Each action is run when its element is demanded from the resulting
-- stream.
sequence :: (Streaming t, Monad m) => t m (m a) -> t m a
sequence m = fromStream (runEach (toStream m))
  where
    -- The source stream is always run with no context ('Nothing').
    runEach s = Stream $ \_ stop yield ->
        runStream s Nothing stop $ \action next ->
            case next of
                Nothing -> do
                    x <- action
                    yield x Nothing
                Just rest -> do
                    x <- action
                    yield x (Just (runEach rest))
-- | Zip two streams with a function that returns a stream for each pair.
--
-- One element is taken from the first stream, then one from the second;
-- the stream produced by the zipping function for that pair is emitted,
-- followed by the zip of the two remainders. The result ends as soon as
-- either input is exhausted.
zipWithM :: Streaming t => (a -> b -> t m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = fromStream (pairUp (toStream m1) (toStream m2))
  where
    -- The zipping function, viewed on the underlying 'Stream' type.
    combine a b = toStream (f a b)

    pairUp left right = Stream $ \_ stop yield ->
        let -- Given an element of the left stream and the left remainder,
            -- pull one element from the right stream and emit the pair.
            withLeft a restLeft =
                runStream right Nothing stop $ \b next ->
                    case next of
                        Nothing ->
                            runStream (combine a b) Nothing stop yield
                        Just restRight ->
                            runStream
                                (combine a b <> pairUp restLeft restRight)
                                Nothing stop yield
        in runStream left Nothing stop $ \a next ->
            case next of
                -- Left stream exhausted after this element: continue with
                -- the empty stream so the recursion stops at the next step.
                Nothing       -> withLeft a snil
                Just restLeft -> withLeft a restLeft
-- | Like 'zipWithM', but each input stream is first passed through 'async'
-- and the streams it returns are the ones zipped.
--
-- NOTE(review): 'async' comes from another module; presumably it starts
-- evaluating the stream concurrently -- confirm against its definition.
zipAsyncWithM :: (Streaming t, MonadAsync m)
              => (a -> b -> t m c) -> t m a -> t m b -> t m c
zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stop yield ->
    async m1 >>= \left ->
    async m2 >>= \right ->
    runStream (toStream (zipWithM f left right)) Nothing stop yield