-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Lift
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.IsStream.Lift
    (
    -- * Generalize Inner Monad
      hoist
    , generally

    -- * Transform Inner Monad
    , liftInner
    , usingReaderT
    , runReaderT
    , evalStateT
    , usingStateT
    , runStateT
    )
where

#include "inline.hs"

import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Trans.State.Strict (StateT)
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Functor.Identity (Identity (..))
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream(..), fromStreamS, toStreamS, fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.Serial (SerialT)

import qualified Streamly.Internal.Data.Stream.StreamD as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif

------------------------------------------------------------------------------
-- Generalize the underlying monad
------------------------------------------------------------------------------

-- | Transform the inner monad of a stream using a natural transformation.
--
-- The stream type is fixed to 'SerialT'; only the underlying monad changes,
-- from @m@ to @n@. The element type @a@ is left untouched.
--
-- /Internal/
--
{-# INLINE hoist #-}
hoist :: (Monad m, Monad n)
    => (forall x. m x -> n x) -> SerialT m a -> SerialT n a
-- Unwrap to the underlying stream representation, hoist there, and wrap the
-- result back up as a 'SerialT'.
hoist f xs = fromStreamS $ S.hoist f (toStreamS xs)

-- | Generalize the inner monad of the stream from 'Identity' to any monad.
--
-- Every 'Identity' action is unwrapped with 'runIdentity' and its result is
-- injected into the target monad @m@ with 'return'.
--
-- /Internal/
--
{-# INLINE generally #-}
generally :: (IsStream t, Monad m) => t Identity a -> t m a
-- @return . runIdentity@ is the natural transformation from 'Identity' to @m@
-- that is handed to the underlying hoist.
generally xs = fromStreamS $ S.hoist (return . runIdentity) (toStreamS xs)

------------------------------------------------------------------------------
-- Add and remove a monad transformer
------------------------------------------------------------------------------

-- | Lift the inner monad @m@ of a stream @t m a@ to @tr m@ using the monad
-- transformer @tr@.
--
-- The stream type @t@ and the element type @a@ are unchanged; only the
-- underlying monad is wrapped in the transformer.
--
-- @since 0.8.0
--
{-# INLINE liftInner #-}
liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m))
    => t m a -> t (tr m) a
-- Delegates to the StreamD implementation, converting to and from the
-- polymorphic stream type at the boundaries.
liftInner xs = fromStreamD $ D.liftInner (toStreamD xs)

------------------------------------------------------------------------------
-- Sharing read only state in a stream
------------------------------------------------------------------------------

-- | Evaluate the inner monad of a stream as 'ReaderT'.
--
-- The environment is supplied as an action in the underlying monad @m@. The
-- result is a stream of the same type @t@ whose inner monad is plain @m@.
--
-- @since 0.8.0
--
{-# INLINE runReaderT #-}
runReaderT :: (IsStream t, Monad m) => m s -> t (ReaderT s m) a -> t m a
-- Delegates to the StreamD implementation, converting to and from the
-- polymorphic stream type at the boundaries.
runReaderT s xs = fromStreamD $ D.runReaderT s (toStreamD xs)

-- | Run a stream transformation using a given environment.
--
-- The input stream is first lifted into 'ReaderT' with 'liftInner', then the
-- supplied transformation is applied, and finally the 'ReaderT' layer is
-- removed again with 'runReaderT':
--
-- > usingReaderT r f xs = runReaderT r $ f $ liftInner xs
--
-- See also: 'Serial.map'
--
-- /Internal/
--
{-# INLINE usingReaderT #-}
usingReaderT
    :: (Monad m, IsStream t)
    => m r
    -> (t (ReaderT r m) a -> t (ReaderT r m) a)
    -> t m a
    -> t m a
usingReaderT r f xs = runReaderT r $ f $ liftInner xs

------------------------------------------------------------------------------
-- Sharing read write state in a stream
------------------------------------------------------------------------------

-- | Evaluate the inner monad of a stream as 'StateT'.
--
-- The initial state is supplied as an action in the underlying monad @m@.
-- Only the stream elements are emitted; the state is not part of the output.
--
-- This is supported only for 'SerialT' as concurrent state updates may not be
-- safe.
--
-- > evalStateT s = Stream.map snd . Stream.runStateT s
--
-- /Internal/
--
{-# INLINE evalStateT #-}
evalStateT ::  Monad m => m s -> SerialT (StateT s m) a -> SerialT m a
-- evalStateT s = fmap snd . runStateT s
evalStateT s xs = fromStreamD $ D.evalStateT s (toStreamD xs)

-- | Run a stateful (StateT) stream transformation using a given state.
--
-- The input stream is lifted into 'StateT' with 'liftInner', the supplied
-- transformation is applied, and the 'StateT' layer is then removed with
-- 'evalStateT', so the state itself does not appear in the output stream.
--
-- This is supported only for 'SerialT' as concurrent state updates may not be
-- safe.
--
-- > usingStateT s f = evalStateT s . f . liftInner
--
-- See also: 'scanl''
--
-- /Internal/
--
{-# INLINE usingStateT #-}
usingStateT
    :: Monad m
    => m s
    -> (SerialT (StateT s m) a -> SerialT (StateT s m) a)
    -> SerialT m a
    -> SerialT m a
usingStateT s f = evalStateT s . f . liftInner

-- | Evaluate the inner monad of a stream as 'StateT' and emit the resulting
-- state and value pair after each step.
--
-- The initial state is supplied as an action in the underlying monad @m@.
-- Each output element is a tuple @(s, a)@ with the state in the first
-- component and the stream element in the second.
--
-- This is supported only for 'SerialT' as concurrent state updates may not be
-- safe.
--
-- @since 0.8.0
--
{-# INLINE runStateT #-}
runStateT :: Monad m => m s -> SerialT (StateT s m) a -> SerialT m (s, a)
-- Delegates to the StreamD implementation, converting to and from 'SerialT'
-- at the boundaries.
runStateT s xs = fromStreamD $ D.runStateT s (toStreamD xs)