-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Exception
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.IsStream.Exception
    (
      before
    , after_
    , after
    , bracket_
    , bracket
    , bracket'
    , onException
    , finally_
    , finally
    , ghandle
    , handle
    , retry
    )
where

import Control.Exception (Exception)
import Control.Monad.Catch (MonadCatch)
import Data.Map.Strict (Map)
import Streamly.Internal.Control.Concurrent (MonadRunInIO, MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream(..), fromStreamD, toStreamD)

import qualified Streamly.Internal.Data.Stream.StreamD as D

-- $setup
-- >>> :m
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream (nilM)

------------------------------------------------------------------------------
-- Exceptions
------------------------------------------------------------------------------

-- | Run the action @m b@ before the stream yields its first element.
--
-- Same as the following but more efficient due to fusion:
--
-- >>> before action xs = Stream.nilM action <> xs
-- >>> before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)
--
-- @since 0.7.0
{-# INLINE before #-}
before :: (IsStream t, Monad m) => m b -> t m a -> t m a
before :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
before m b
action t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.before m b
action forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs

-- | Like 'after', with following differences:
--
-- * action @m b@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * Monad @m@ does not require any other constraints.
-- * has slightly better performance than 'after'.
--
-- Same as the following, but with stream fusion:
--
-- > after_ action xs = xs <> 'nilM' action
--
-- /Pre-release/
--
{-# INLINE after_ #-}
after_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
after_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
after_ m b
action t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.after_ m b
action forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs

-- | Run the action @m b@ whenever the stream @t m a@ stops normally, or if it
-- is garbage collected after a partial lazy evaluation.
--
-- The semantics of the action @m b@ are similar to the semantics of cleanup
-- action in 'bracket'.
--
-- /See also 'after_'/
--
-- @since 0.7.0
--
{-# INLINE after #-}
after :: (IsStream t, MonadRunInIO m)
    => m b -> t m a -> t m a
after :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadRunInIO m) =>
m b -> t m a -> t m a
after m b
action t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
MonadRunInIO m =>
m b -> Stream m a -> Stream m a
D.after m b
action forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs

-- | Run the action @m b@ if the stream aborts due to an exception. The
-- exception is not caught, simply rethrown.
--
-- /Inhibits stream fusion/
--
-- @since 0.7.0
{-# INLINE onException #-}
onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
onException :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadCatch m) =>
m b -> t m a -> t m a
onException m b
action t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
D.onException m b
action forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs

-- | Like 'finally' with following differences:
--
-- * action @m b@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * does not require a 'MonadAsync' constraint.
-- * has slightly better performance than 'finally'.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE finally_ #-}
finally_ :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
finally_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadCatch m) =>
m b -> t m a -> t m a
finally_ m b
action t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
D.finally_ m b
action forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs

-- | Run the action @m b@ whenever the stream @t m a@ stops normally, aborts
-- due to an exception or if it is garbage collected after a partial lazy
-- evaluation.
--
-- The semantics of running the action @m b@ are similar to the cleanup action
-- semantics described in 'bracket'.
--
-- @
-- finally release = bracket (return ()) (const release)
-- @
--
-- /See also 'finally_'/
--
-- /Inhibits stream fusion/
--
-- @since 0.7.0
--
{-# INLINE finally #-}
finally :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a
finally :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadAsync m, MonadCatch m) =>
m b -> t m a -> t m a
finally m b
action t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(MonadAsync m, MonadCatch m) =>
m b -> Stream m a -> Stream m a
D.finally m b
action forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs

-- | Like 'bracket' but with following differences:
--
-- * alloc action @m b@ runs with async exceptions enabled
-- * cleanup action @b -> m c@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * does not require a 'MonadAsync' constraint.
-- * has slightly better performance than 'bracket'.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE bracket_ #-}
bracket_ :: (IsStream t, MonadCatch m)
    => m b -> (b -> m c) -> (b -> t m a) -> t m a
bracket_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b c a.
(IsStream t, MonadCatch m) =>
m b -> (b -> m c) -> (b -> t m a) -> t m a
bracket_ m b
bef b -> m c
aft b -> t m a
bet = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$
    forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
D.bracket_ m b
bef b -> m c
aft (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> t m a
bet)

-- | Run the alloc action @m b@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Use the
-- output @b@ as input to @b -> t m a@ to generate an output stream.
--
-- @b@ is usually a resource under the state of monad @m@, e.g. a file
-- handle, that requires a cleanup after use. The cleanup action @b -> m c@,
-- runs whenever the stream ends normally, due to a sync or async exception or
-- if it gets garbage collected after a partial lazy evaluation.
--
-- 'bracket' only guarantees that the cleanup action runs, and it runs with
-- async exceptions enabled. The action must ensure that it can successfully
-- cleanup the resource in the face of sync or async exceptions.
--
-- When the stream ends normally or on a sync exception, cleanup action runs
-- immediately in the current thread context, whereas in other cases it runs in
-- the GC context, therefore, cleanup may be delayed until the GC gets to run.
--
-- /See also: 'bracket_'/
--
-- /Inhibits stream fusion/
--
-- @since 0.7.0
--
{-# INLINE bracket #-}
bracket :: (IsStream t, MonadAsync m, MonadCatch m)
    => m b -> (b -> m c) -> (b -> t m a) -> t m a
bracket :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b c a.
(IsStream t, MonadAsync m, MonadCatch m) =>
m b -> (b -> m c) -> (b -> t m a) -> t m a
bracket m b
bef b -> m c
aft = forall (t :: (* -> *) -> * -> *) (m :: * -> *) b c d e a.
(IsStream t, MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> t m a) -> t m a
bracket' m b
bef b -> m c
aft b -> m c
aft b -> m c
aft

-- For a use case of this see the "streamly-process" package. It needs to kill
-- the process in case of exception or garbage collection, but waits for the
-- process to terminate in normal cases.
--
-- | Like 'bracket' but can use separate cleanup actions depending on the mode
-- of termination.  @bracket' before onStop onGC onException action@ runs
-- @action@ using the result of @before@. If the stream stops, @onStop@ action
-- is executed, if the stream is abandoned @onGC@ is executed, if the stream
-- encounters an exception @onException@ is executed.
--
-- /Pre-release/
{-# INLINE bracket' #-}
bracket' :: (IsStream t, MonadAsync m, MonadCatch m)
    => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> t m a) -> t m a
bracket' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b c d e a.
(IsStream t, MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> t m a) -> t m a
bracket' m b
bef b -> m c
aft b -> m d
gc b -> m e
exc b -> t m a
bet = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$
    forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
D.bracket' m b
bef b -> m c
aft b -> m e
exc b -> m d
gc (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> t m a
bet)

-- | Like 'handle' but the exception handler is also provided with the stream
-- that generated the exception as input. The exception handler can thus
-- re-evaluate the stream to retry the action that failed. The exception
-- handler can again call 'ghandle' on it to retry the action multiple times.
--
-- This is highly experimental. In a stream of actions we can map the stream
-- with a retry combinator to retry each action on failure.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE ghandle #-}
ghandle :: (IsStream t, MonadCatch m, Exception e)
    => (e -> t m a -> t m a) -> t m a -> t m a
ghandle :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) e a.
(IsStream t, MonadCatch m, Exception e) =>
(e -> t m a -> t m a) -> t m a -> t m a
ghandle e -> t m a -> t m a
handler =
      forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD
    forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> Stream m a -> Stream m a) -> Stream m a -> Stream m a
D.ghandle (\e
e Stream m a
xs -> forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall a b. (a -> b) -> a -> b
$ e -> t m a -> t m a
handler e
e (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD Stream m a
xs))
    forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD

-- | When evaluating a stream if an exception occurs, stream evaluation aborts
-- and the specified exception handler is run with the exception as argument.
--
-- /Inhibits stream fusion/
--
-- @since 0.7.0
{-# INLINE handle #-}
handle :: (IsStream t, MonadCatch m, Exception e)
    => (e -> t m a) -> t m a -> t m a
handle :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) e a.
(IsStream t, MonadCatch m, Exception e) =>
(e -> t m a) -> t m a -> t m a
handle e -> t m a
handler t m a
xs =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> Stream m a) -> Stream m a -> Stream m a
D.handle (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> t m a
handler) forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs


-- | @retry@ takes 3 arguments
--
-- 1. A map @m@ whose keys are exceptions and values are the number of times to
-- retry the action given that the exception occurs.
--
-- 2. A handler @han@ that decides how to handle an exception when the exception
-- cannot be retried.
--
-- 3. The stream itself that we want to run this mechanism on.
--
-- When evaluating a stream if an exception occurs,
--
-- 1. The stream evaluation aborts
--
-- 2. The exception is looked up in @m@
--
--    a. If the exception exists and the mapped value is > 0 then,
--
--       i. The value is decreased by 1.
--
--       ii. The stream is resumed from where the exception was called, retrying
--       the action.
--
--    b. If the exception exists and the mapped value is == 0 then the stream
--    evaluation stops.
--
--    c. If the exception does not exist then we handle the exception using
--    @han@.
--
-- /Internal/
--
{-# INLINE retry #-}
retry :: (IsStream t, MonadCatch m, Exception e, Ord e)
    => Map e Int
       -- ^ map from exception to retry count
    -> (e -> t m a)
       -- ^ default handler for those exceptions that are not in the map
    -> t m a
    -> t m a
retry :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) e a.
(IsStream t, MonadCatch m, Exception e, Ord e) =>
Map e Int -> (e -> t m a) -> t m a -> t m a
retry Map e Int
emap e -> t m a
handler t m a
inp =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall e (m :: * -> *) a.
(Exception e, Ord e, MonadCatch m) =>
Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
D.retry Map e Int
emap (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> t m a
handler) forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
inp