{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RankNTypes #-}
-- -----------------------------------------------------------------------------
--
-- (c) The University of Glasgow 2012
--
-- -----------------------------------------------------------------------------

-- | Monadic streams
module GHC.Data.Stream (
    Stream(..), StreamS(..), runStream, yield, liftIO,
    collect,  consume, fromList,
    map, mapM, mapAccumL_
  ) where

import GHC.Prelude hiding (map,mapM)

import Control.Monad hiding (mapM)
import Control.Monad.IO.Class

-- |
-- @Stream m a b@ is a computation in some Monad @m@ that delivers a sequence
-- of elements of type @a@ followed by a result of type @b@.
--
-- More concretely, a value of type @Stream m a b@ can be run using @runStreamInternal@
-- in the Monad @m@, and it delivers either
--
--  * the final result: @Done b@, or
--  * @Yield a str@ where @a@ is the next element in the stream, and @str@
--     is the rest of the stream
--  * @Effect mstr@ where @mstr@ is some action running in @m@ which
--  generates the rest of the stream.
--
-- Stream is itself a Monad, and provides an operation 'yield' that
-- produces a new element of the stream.  This makes it convenient to turn
-- existing monadic computations into streams.
--
-- The idea is that Stream is useful for making a monadic computation
-- that produces values from time to time.  This can be used for
-- knitting together two complex monadic operations, so that the
-- producer does not have to produce all its values before the
-- consumer starts consuming them.  We make the producer into a
-- Stream, and the consumer pulls on the stream each time it wants a
-- new value.
--
-- 'Stream' is implemented in the "yoneda" style for efficiency. By
-- representing a stream in this manner 'fmap' and '>>=' operations are
-- accumulated in the function parameters before being applied once when
-- the stream is destroyed. In the old implementation each usage of 'mapM'
-- and '>>=' would traverse the entire stream in order to apply the
-- substitution at the leaves.
--
-- The >>= operation for 'Stream' was a hot-spot in the ticky profile for
-- the "ManyConstructors" test which called the 'cg' function many times in
-- @StgToCmm.hs@
--
newtype Stream m a b =
          Stream { Stream m a b
-> forall r' r.
   (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r
runStreamInternal :: forall r' r .
                                        (a -> m r') -- For fusing calls to `map` and `mapM`
                                     -> (b -> StreamS m r' r)  -- For fusing `>>=`
                                     -> StreamS m r' r }

runStream :: Applicative m => Stream m r' r -> StreamS m r' r
runStream :: Stream m r' r -> StreamS m r' r
runStream Stream m r' r
st = Stream m r' r
-> (r' -> m r') -> (r -> StreamS m r' r) -> StreamS m r' r
forall (m :: * -> *) a b.
Stream m a b
-> forall r' r.
   (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r
runStreamInternal Stream m r' r
st r' -> m r'
forall (f :: * -> *) a. Applicative f => a -> f a
pure r -> StreamS m r' r
forall (m :: * -> *) a b. b -> StreamS m a b
Done

data StreamS m a b = Yield a (StreamS m a b)
                   | Done b
                   | Effect (m (StreamS m a b))

instance Monad m => Functor (StreamS m a) where
  fmap :: (a -> b) -> StreamS m a a -> StreamS m a b
fmap = (a -> b) -> StreamS m a a -> StreamS m a b
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM

instance Monad m => Applicative (StreamS m a) where
  pure :: a -> StreamS m a a
pure = a -> StreamS m a a
forall (m :: * -> *) a b. b -> StreamS m a b
Done
  <*> :: StreamS m a (a -> b) -> StreamS m a a -> StreamS m a b
(<*>) = StreamS m a (a -> b) -> StreamS m a a -> StreamS m a b
forall (m :: * -> *) a b. Monad m => m (a -> b) -> m a -> m b
ap

instance Monad m => Monad (StreamS m a) where
  StreamS m a a
a >>= :: StreamS m a a -> (a -> StreamS m a b) -> StreamS m a b
>>= a -> StreamS m a b
k = case StreamS m a a
a of
                      Done a
r -> a -> StreamS m a b
k a
r
                      Yield a
a StreamS m a a
s -> a -> StreamS m a b -> StreamS m a b
forall (m :: * -> *) a b. a -> StreamS m a b -> StreamS m a b
Yield a
a (StreamS m a a
s StreamS m a a -> (a -> StreamS m a b) -> StreamS m a b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> StreamS m a b
k)
                      Effect m (StreamS m a a)
m -> m (StreamS m a b) -> StreamS m a b
forall (m :: * -> *) a b. m (StreamS m a b) -> StreamS m a b
Effect ((StreamS m a a -> StreamS m a b)
-> m (StreamS m a a) -> m (StreamS m a b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (StreamS m a a -> (a -> StreamS m a b) -> StreamS m a b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> StreamS m a b
k) m (StreamS m a a)
m)

instance Functor (Stream f a) where
  fmap :: (a -> b) -> Stream f a a -> Stream f a b
fmap = (a -> b) -> Stream f a a -> Stream f a b
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM

instance Applicative (Stream m a) where
  pure :: a -> Stream m a a
pure a
a = (forall r' r.
 (a -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a a
forall (m :: * -> *) a b.
(forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
Stream ((forall r' r.
  (a -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r)
 -> Stream m a a)
-> (forall r' r.
    (a -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a a
forall a b. (a -> b) -> a -> b
$ \a -> m r'
_f a -> StreamS m r' r
g -> a -> StreamS m r' r
g a
a
  <*> :: Stream m a (a -> b) -> Stream m a a -> Stream m a b
(<*>) = Stream m a (a -> b) -> Stream m a a -> Stream m a b
forall (m :: * -> *) a b. Monad m => m (a -> b) -> m a -> m b
ap

instance Monad (Stream m a) where
  Stream forall r' r. (a -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r
m >>= :: Stream m a a -> (a -> Stream m a b) -> Stream m a b
>>= a -> Stream m a b
k = (forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
forall (m :: * -> *) a b.
(forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
Stream ((forall r' r.
  (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
 -> Stream m a b)
-> (forall r' r.
    (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
forall a b. (a -> b) -> a -> b
$ \a -> m r'
f b -> StreamS m r' r
h -> (a -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r
forall r' r. (a -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r
m a -> m r'
f (\a
a -> Stream m a b
-> (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r
forall (m :: * -> *) a b.
Stream m a b
-> forall r' r.
   (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r
runStreamInternal (a -> Stream m a b
k a
a) a -> m r'
f b -> StreamS m r' r
h)

instance MonadIO m => MonadIO (Stream m b) where
  liftIO :: IO a -> Stream m b a
liftIO IO a
io = (forall r' r.
 (b -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b a
forall (m :: * -> *) a b.
(forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
Stream ((forall r' r.
  (b -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r)
 -> Stream m b a)
-> (forall r' r.
    (b -> m r') -> (a -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b a
forall a b. (a -> b) -> a -> b
$ \b -> m r'
_f a -> StreamS m r' r
g -> m (StreamS m r' r) -> StreamS m r' r
forall (m :: * -> *) a b. m (StreamS m a b) -> StreamS m a b
Effect (a -> StreamS m r' r
g (a -> StreamS m r' r) -> m a -> m (StreamS m r' r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO a
io)

yield :: Monad m => a -> Stream m a ()
yield :: a -> Stream m a ()
yield a
a = (forall r' r.
 (a -> m r') -> (() -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a ()
forall (m :: * -> *) a b.
(forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
Stream ((forall r' r.
  (a -> m r') -> (() -> StreamS m r' r) -> StreamS m r' r)
 -> Stream m a ())
-> (forall r' r.
    (a -> m r') -> (() -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a ()
forall a b. (a -> b) -> a -> b
$ \a -> m r'
f () -> StreamS m r' r
rest -> m (StreamS m r' r) -> StreamS m r' r
forall (m :: * -> *) a b. m (StreamS m a b) -> StreamS m a b
Effect ((r' -> StreamS m r' r -> StreamS m r' r)
-> StreamS m r' r -> r' -> StreamS m r' r
forall a b c. (a -> b -> c) -> b -> a -> c
flip r' -> StreamS m r' r -> StreamS m r' r
forall (m :: * -> *) a b. a -> StreamS m a b -> StreamS m a b
Yield (() -> StreamS m r' r
rest ())  (r' -> StreamS m r' r) -> m r' -> m (StreamS m r' r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> m r'
f a
a)

-- | Turn a Stream into an ordinary list, by demanding all the elements.
collect :: Monad m => Stream m a () -> m [a]
collect :: Stream m a () -> m [a]
collect Stream m a ()
str = [a] -> StreamS m a () -> m [a]
forall (m :: * -> *) a. Monad m => [a] -> StreamS m a () -> m [a]
go [] (Stream m a () -> StreamS m a ()
forall (m :: * -> *) r' r.
Applicative m =>
Stream m r' r -> StreamS m r' r
runStream Stream m a ()
str)
 where
  go :: [a] -> StreamS m a () -> m [a]
go [a]
acc (Done ()) = [a] -> m [a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([a] -> [a]
forall a. [a] -> [a]
reverse [a]
acc)
  go [a]
acc (Effect m (StreamS m a ())
m) = m (StreamS m a ())
m m (StreamS m a ()) -> (StreamS m a () -> m [a]) -> m [a]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [a] -> StreamS m a () -> m [a]
go [a]
acc
  go [a]
acc (Yield a
a StreamS m a ()
k) = [a] -> StreamS m a () -> m [a]
go (a
aa -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
acc) StreamS m a ()
k

consume :: (Monad m, Monad n) => Stream m a b -> (forall a . m a -> n a) -> (a -> n ()) -> n b
consume :: Stream m a b -> (forall a. m a -> n a) -> (a -> n ()) -> n b
consume Stream m a b
str forall a. m a -> n a
l a -> n ()
f = StreamS m a b -> n b
go (Stream m a b -> StreamS m a b
forall (m :: * -> *) r' r.
Applicative m =>
Stream m r' r -> StreamS m r' r
runStream Stream m a b
str)
  where
    go :: StreamS m a b -> n b
go (Done b
r) = b -> n b
forall (m :: * -> *) a. Monad m => a -> m a
return b
r
    go (Yield a
a StreamS m a b
p) = a -> n ()
f a
a n () -> n b -> n b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamS m a b -> n b
go StreamS m a b
p
    go (Effect m (StreamS m a b)
m)  = m (StreamS m a b) -> n (StreamS m a b)
forall a. m a -> n a
l m (StreamS m a b)
m n (StreamS m a b) -> (StreamS m a b -> n b) -> n b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= StreamS m a b -> n b
go

-- | Turn a list into a 'Stream', by yielding each element in turn.
fromList :: Monad m => [a] -> Stream m a ()
fromList :: [a] -> Stream m a ()
fromList = (a -> Stream m a ()) -> [a] -> Stream m a ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ a -> Stream m a ()
forall (m :: * -> *) a. Monad m => a -> Stream m a ()
yield

-- | Apply a function to each element of a 'Stream', lazily
map :: Monad m => (a -> b) -> Stream m a x -> Stream m b x
map :: (a -> b) -> Stream m a x -> Stream m b x
map a -> b
f Stream m a x
str = (forall r' r.
 (b -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b x
forall (m :: * -> *) a b.
(forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
Stream ((forall r' r.
  (b -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r)
 -> Stream m b x)
-> (forall r' r.
    (b -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b x
forall a b. (a -> b) -> a -> b
$ \b -> m r'
g x -> StreamS m r' r
h -> Stream m a x
-> (a -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r
forall (m :: * -> *) a b.
Stream m a b
-> forall r' r.
   (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r
runStreamInternal Stream m a x
str (b -> m r'
g (b -> m r') -> (a -> b) -> a -> m r'
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
f) x -> StreamS m r' r
h

-- | Apply a monadic operation to each element of a 'Stream', lazily
mapM :: Monad m => (a -> m b) -> Stream m a x -> Stream m b x
mapM :: (a -> m b) -> Stream m a x -> Stream m b x
mapM a -> m b
f Stream m a x
str = (forall r' r.
 (b -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b x
forall (m :: * -> *) a b.
(forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
Stream ((forall r' r.
  (b -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r)
 -> Stream m b x)
-> (forall r' r.
    (b -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b x
forall a b. (a -> b) -> a -> b
$ \b -> m r'
g x -> StreamS m r' r
h -> Stream m a x
-> (a -> m r') -> (x -> StreamS m r' r) -> StreamS m r' r
forall (m :: * -> *) a b.
Stream m a b
-> forall r' r.
   (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r
runStreamInternal Stream m a x
str (b -> m r'
g (b -> m r') -> (a -> m b) -> a -> m r'
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< a -> m b
f) x -> StreamS m r' r
h

-- | Note this is not very efficient because it traverses the whole stream
-- before rebuilding it, avoid using it if you can. mapAccumL used to
-- implemented but it wasn't used anywhere in the compiler and has similar
-- effiency problems.
mapAccumL_ :: forall m a b c r . Monad m => (c -> a -> m (c,b)) -> c -> Stream m a r
           -> Stream m b (c, r)
mapAccumL_ :: (c -> a -> m (c, b)) -> c -> Stream m a r -> Stream m b (c, r)
mapAccumL_ c -> a -> m (c, b)
f c
c Stream m a r
str = (forall r' r.
 (b -> m r') -> ((c, r) -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b (c, r)
forall (m :: * -> *) a b.
(forall r' r.
 (a -> m r') -> (b -> StreamS m r' r) -> StreamS m r' r)
-> Stream m a b
Stream ((forall r' r.
  (b -> m r') -> ((c, r) -> StreamS m r' r) -> StreamS m r' r)
 -> Stream m b (c, r))
-> (forall r' r.
    (b -> m r') -> ((c, r) -> StreamS m r' r) -> StreamS m r' r)
-> Stream m b (c, r)
forall a b. (a -> b) -> a -> b
$ \b -> m r'
f (c, r) -> StreamS m r' r
h -> c
-> (b -> m r')
-> ((c, r) -> StreamS m r' r)
-> StreamS m a r
-> StreamS m r' r
forall r' r1.
c
-> (b -> m r')
-> ((c, r) -> StreamS m r' r1)
-> StreamS m a r
-> StreamS m r' r1
go c
c b -> m r'
f (c, r) -> StreamS m r' r
h (Stream m a r -> StreamS m a r
forall (m :: * -> *) r' r.
Applicative m =>
Stream m r' r -> StreamS m r' r
runStream Stream m a r
str)

  where
    go :: c
             -> (b -> m r')
             -> ((c, r) -> StreamS m r' r1)
             -> StreamS m a r
             -> StreamS m r' r1
    go :: c
-> (b -> m r')
-> ((c, r) -> StreamS m r' r1)
-> StreamS m a r
-> StreamS m r' r1
go c
c b -> m r'
_f1 (c, r) -> StreamS m r' r1
h1 (Done r
r) = (c, r) -> StreamS m r' r1
h1 (c
c, r
r)
    go c
c b -> m r'
f1 (c, r) -> StreamS m r' r1
h1 (Yield a
a StreamS m a r
p) = m (StreamS m r' r1) -> StreamS m r' r1
forall (m :: * -> *) a b. m (StreamS m a b) -> StreamS m a b
Effect (c -> a -> m (c, b)
f c
c a
a m (c, b) -> ((c, b) -> m (StreamS m r' r1)) -> m (StreamS m r' r1)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\(c
c', b
b) -> b -> m r'
f1 b
b
                                           m r' -> (r' -> m (StreamS m r' r1)) -> m (StreamS m r' r1)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \r'
r' -> StreamS m r' r1 -> m (StreamS m r' r1)
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamS m r' r1 -> m (StreamS m r' r1))
-> StreamS m r' r1 -> m (StreamS m r' r1)
forall a b. (a -> b) -> a -> b
$ r' -> StreamS m r' r1 -> StreamS m r' r1
forall (m :: * -> *) a b. a -> StreamS m a b -> StreamS m a b
Yield r'
r' (c
-> (b -> m r')
-> ((c, r) -> StreamS m r' r1)
-> StreamS m a r
-> StreamS m r' r1
forall r' r1.
c
-> (b -> m r')
-> ((c, r) -> StreamS m r' r1)
-> StreamS m a r
-> StreamS m r' r1
go c
c' b -> m r'
f1 (c, r) -> StreamS m r' r1
h1 StreamS m a r
p)))
    go c
c b -> m r'
f1 (c, r) -> StreamS m r' r1
h1 (Effect m (StreamS m a r)
m) = m (StreamS m r' r1) -> StreamS m r' r1
forall (m :: * -> *) a b. m (StreamS m a b) -> StreamS m a b
Effect (c
-> (b -> m r')
-> ((c, r) -> StreamS m r' r1)
-> StreamS m a r
-> StreamS m r' r1
forall r' r1.
c
-> (b -> m r')
-> ((c, r) -> StreamS m r' r1)
-> StreamS m a r
-> StreamS m r' r1
go c
c b -> m r'
f1 (c, r) -> StreamS m r' r1
h1 (StreamS m a r -> StreamS m r' r1)
-> m (StreamS m a r) -> m (StreamS m r' r1)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (StreamS m a r)
m)