{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.Exception
-- Copyright   : (c) 2020 Composewell Technologies and Contributors
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Exception
    (
      gbracket_
    , gbracket
    , before
    , afterUnsafe
    , afterIO
    , bracketUnsafe
    , bracketIO3
    , bracketIO
    , onException
    , finallyUnsafe
    , finallyIO
    , ghandle
    , handle
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(..))
import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
import GHC.Exts (inline)
import Streamly.Internal.Data.IOFinalizer
    (newIOFinalizer, runIOFinalizer, clearingIOFinalizer)

import qualified Control.Monad.Catch as MC

import Streamly.Internal.Data.Stream.Type

#include "DocTestDataStream.hs"

data GbracketState s1 s2 v
    = GBracketInit
    | GBracketNormal s1 v
    | GBracketException s2

-- | Like 'gbracket' but with following differences:
--
-- * alloc action @m c@ runs with async exceptions enabled
-- * cleanup action @c -> m d@ won't run if the stream is garbage collected
--   after partial evaluation.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL gbracket_ #-}
gbracket_
    :: Monad m
    => m c                                  -- ^ before
    -> (c -> m d)                           -- ^ after, on normal stop
    -> (c -> e -> Stream m b -> m (Stream m b)) -- ^ on exception
    -> (forall s. m s -> m (Either e s))    -- ^ try (exception handling)
    -> (c -> Stream m b)                    -- ^ stream generator
    -> Stream m b
gbracket_ :: forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_ m c
bef c -> m d
aft c -> e -> Stream m b -> m (Stream m b)
onExc forall s. m s -> m (Either e s)
ftry c -> Stream m b
action =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b
-> GbracketState (Stream m b) (Stream m b) c
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
step forall s1 s2 v. GbracketState s1 s2 v
GBracketInit

    where

    {-# INLINE_LATE step #-}
    step :: State StreamK m b
-> GbracketState (Stream m b) (Stream m b) c
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
step State StreamK m b
_ GbracketState (Stream m b) (Stream m b) c
GBracketInit = do
        c
r <- m c
bef
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal (c -> Stream m b
action c
r) c
r

    step State StreamK m b
gst (GBracketNormal (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st) c
v) = do
        Either e (Step s b)
res <- forall s. m s -> m (Either e s)
ftry forall a b. (a -> b) -> a -> b
$ State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Either e (Step s b)
res of
            Right Step s b
r -> case Step s b
r of
                Yield b
x s
s ->
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v)
                Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v)
                Step s b
Stop -> c -> m d
aft c
v forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e
e -> do
                Stream m b
strm <- c -> e -> Stream m b -> m (Stream m b)
onExc c
v e
e (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException Stream m b
strm)
    step State StreamK m b
gst (GBracketException (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)) = do
        Step s b
res <- State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Step s b
res of
            Yield b
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Step s b
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

data GbracketIOState s1 s2 v wref
    = GBracketIOInit
    | GBracketIONormal s1 v wref
    | GBracketIOException s2

-- | Run the alloc action @m c@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Use the
-- output @c@ as input to @c -> Stream m b@ to generate an output stream. When
-- generating the stream use the supplied @try@ operation @forall s. m s -> m
-- (Either e s)@ to catch synchronous exceptions. If an exception occurs run
-- the exception handler @c -> e -> Stream m b -> m (Stream m b)@. Note that
-- 'gbracket' does not rethrow the exception, it has to be done by the
-- exception handler if desired.
--
-- The cleanup action @c -> m d@, runs whenever the stream ends normally, due
-- to a sync or async exception or if it gets garbage collected after a partial
-- lazy evaluation.  See 'bracket' for the semantics of the cleanup action.
--
-- 'gbracket' can express all other exception handling combinators.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
{-# INLINE_NORMAL gbracket #-}
gbracket
    :: MonadIO m
    => IO c -- ^ before
    -> (c -> IO d1) -- ^ on normal stop
    -> (c -> e -> Stream m b -> IO (Stream m b)) -- ^ on exception
    -> (c -> IO d2) -- ^ on GC without normal stop or exception
    -> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
    -> (c -> Stream m b) -- ^ stream generator
    -> Stream m b
gbracket :: forall (m :: * -> *) c d1 e b d2.
MonadIO m =>
IO c
-> (c -> IO d1)
-> (c -> e -> Stream m b -> IO (Stream m b))
-> (c -> IO d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket IO c
bef c -> IO d1
aft c -> e -> Stream m b -> IO (Stream m b)
onExc c -> IO d2
onGC forall s. m s -> m (Either e s)
ftry c -> Stream m b
action =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step forall s1 s2 v wref. GbracketIOState s1 s2 v wref
GBracketIOInit

    where

    -- If the stream is never evaluated the "aft" action will never be
    -- called. For that to occur we will need the user of this API to pass a
    -- weak pointer to us.
    {-# INLINE_LATE step #-}
    step :: State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step State StreamK m b
_ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
GBracketIOInit = do
        -- We mask asynchronous exceptions to make the execution
        -- of 'bef' and the registration of 'aft' atomic.
        -- A similar thing is done in the resourcet package: https://git.io/JvKV3
        -- Tutorial: https://markkarpov.com/tutorial/exceptions.html
        (c
r, IOFinalizer
ref) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
            c
r <- IO c
bef
            IOFinalizer
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer (c -> IO d2
onGC c
r)
            forall (m :: * -> *) a. Monad m => a -> m a
return (c
r, IOFinalizer
ref)
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (c -> Stream m b
action c
r) c
r IOFinalizer
ref

    step State StreamK m b
gst (GBracketIONormal (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st) c
v IOFinalizer
ref) = do
        Either e (Step s b)
res <- forall s. m s -> m (Either e s)
ftry forall a b. (a -> b) -> a -> b
$ State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Either e (Step s b)
res of
            Right Step s b
r -> case Step s b
r of
                Yield b
x s
s ->
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Skip s
s ->
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Step s b
Stop ->
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (m :: * -> *) a. MonadIO m => IOFinalizer -> IO a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> IO d1
aft c
v)) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e
e -> do
                -- Clearing of finalizer and running of exception handler must
                -- be atomic wrt async exceptions. Otherwise if we have cleared
                -- the finalizer and have not run the exception handler then we
                -- may leak the resource.
                Stream m b
stream <-
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (m :: * -> *) a. MonadIO m => IOFinalizer -> IO a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> e -> Stream m b -> IO (Stream m b)
onExc c
v e
e (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)))
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException Stream m b
stream)
    step State StreamK m b
gst (GBracketIOException (UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)) = do
        Step s b
res <- State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Step s b
res of
            Yield b
x s
s ->
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Step s b
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Run the action @m b@ before the stream yields its first element.
--
-- Same as the following but more efficient due to fusion:
--
-- >>> before action xs = Stream.nilM action <> xs
-- >>> before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)
--
{-# INLINE_NORMAL before #-}
before :: Monad m => m b -> Stream m a -> Stream m a
before :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
before m b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> Maybe s -> m (Step (Maybe s) a)
step' forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> Maybe s -> m (Step (Maybe s) a)
step' State StreamK m a
_ Maybe s
Nothing = m b
action forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just s
state))

    step' State StreamK m a
gst (Just s
st) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a. a -> Maybe a
Just s
s)
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just s
s)
            Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Like 'after', with following differences:
--
-- * action @m b@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * Monad @m@ does not require any other constraints.
-- * has slightly better performance than 'after'.
--
-- Same as the following, but with stream fusion:
--
-- >>> afterUnsafe action xs = xs <> Stream.nilM action
--
-- /Pre-release/
--
{-# INLINE_NORMAL afterUnsafe #-}
afterUnsafe :: Monad m => m b -> Stream m a -> Stream m a
afterUnsafe :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
afterUnsafe m b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step' s
state

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> s -> m (Step s a)
step' State StreamK m a
gst s
st = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x s
s
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> m b
action forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Run the action @IO b@ whenever the stream is evaluated to completion, or
-- if it is garbage collected after a partial lazy evaluation.
--
-- The semantics of the action @IO b@ are similar to the semantics of cleanup
-- action in 'bracketIO'.
--
-- /See also 'afterUnsafe'/
--
{-# INLINE_NORMAL afterIO #-}
afterIO :: MonadIO m
    => IO b -> Stream m a -> Stream m a
afterIO :: forall (m :: * -> *) b a.
MonadIO m =>
IO b -> Stream m a -> Stream m a
afterIO IO b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' State StreamK m a
_ Maybe (s, IOFinalizer)
Nothing = do
        IOFinalizer
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer IO b
action
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just (s
state, IOFinalizer
ref)
    step' State StreamK m a
gst (Just (s
st, IOFinalizer
ref)) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Step s a
Stop      -> do
                forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
                forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- XXX For high performance error checks in busy streams we may need another
-- Error constructor in step.

-- | Run the action @m b@ if the stream evaluation is aborted due to an
-- exception. The exception is not caught, simply rethrown.
--
-- Observes exceptions only in the stream generation, and not in stream
-- consumers.
--
-- /Inhibits stream fusion/
--
{-# INLINE_NORMAL onException #-}
onException :: MonadCatch m => m b -> Stream m a -> Stream m a
onException :: forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
onException m b
action Stream m a
stream =
    forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_
        (forall (m :: * -> *) a. Monad m => a -> m a
return ()) -- before
        forall (m :: * -> *) a. Monad m => a -> m a
return      -- after
        (\()
_ (SomeException
e :: MC.SomeException) Stream m a
_ -> m b
action forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e)
        (forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)
        (forall a b. a -> b -> a
const Stream m a
stream)

{-# INLINE_NORMAL _onException #-}
_onException :: MonadCatch m => m b -> Stream m a -> Stream m a
_onException :: forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
_onException m b
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step' s
state

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> s -> m (Step s a)
step' State StreamK m a
gst s
st = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`MC.onException` m b
action
        case Step s a
res of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x s
s
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Like 'bracket' but with following differences:
--
-- * alloc action @m b@ runs with async exceptions enabled
-- * cleanup action @b -> m c@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * has slightly better performance than 'bracketIO'.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL bracketUnsafe #-}
bracketUnsafe :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracketUnsafe :: forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracketUnsafe m b
bef b -> m c
aft =
    forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_
        m b
bef
        b -> m c
aft
        (\b
a (SomeException
e :: SomeException) Stream m a
_ -> b -> m c
aft b
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e)
        (forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)

-- For a use case of this see the "streamly-process" package. It needs to kill
-- the process in case of exception or garbage collection, but waits for the
-- process to terminate in normal cases.

-- | Like 'bracketIO' but can use 3 separate cleanup actions depending on the
-- mode of termination:
--
-- 1. When the stream stops normally
-- 2. When the stream is garbage collected
-- 3. When the stream encounters an exception
--
-- @bracketIO3 before onStop onGC onException action@ runs @action@ using the
-- result of @before@. If the stream stops, @onStop@ action is executed, if the
-- stream is abandoned @onGC@ is executed, if the stream encounters an
-- exception @onException@ is executed.
--
-- The exception is not caught, it is rethrown.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
{-# INLINE_NORMAL bracketIO3 #-}
bracketIO3 :: (MonadIO m, MonadCatch m) =>
       IO b
    -> (b -> IO c)
    -> (b -> IO d)
    -> (b -> IO e)
    -> (b -> Stream m a)
    -> Stream m a
bracketIO3 :: forall (m :: * -> *) b c d e a.
(MonadIO m, MonadCatch m) =>
IO b
-> (b -> IO c)
-> (b -> IO d)
-> (b -> IO e)
-> (b -> Stream m a)
-> Stream m a
bracketIO3 IO b
bef b -> IO c
aft b -> IO d
onExc b -> IO e
onGC =
    forall (m :: * -> *) c d1 e b d2.
MonadIO m =>
IO c
-> (c -> IO d1)
-> (c -> e -> Stream m b -> IO (Stream m b))
-> (c -> IO d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket
        IO b
bef
        b -> IO c
aft
        (\b
a (SomeException
e :: SomeException) Stream m a
_ -> b -> IO d
onExc b
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e)
        b -> IO e
onGC
        (forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)

-- | Run the alloc action @IO b@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Use the
-- output @b@ of the IO action as input to the function @b -> Stream m a@ to
-- generate an output stream.
--
-- @b@ is usually a resource under the IO monad, e.g. a file handle, that
-- requires a cleanup after use. The cleanup action @b -> IO c@, runs whenever
-- (1) the stream ends normally, (2) due to a sync or async exception or, (3)
-- if it gets garbage collected after a partial lazy evaluation. The exception
-- is not caught, it is rethrown.
--
-- 'bracketIO' only guarantees that the cleanup action runs, and it runs with
-- async exceptions enabled. The action must ensure that it can successfully
-- cleanup the resource in the face of sync or async exceptions.
--
-- When the stream ends normally or on a sync exception, cleanup action runs
-- immediately in the current thread context, whereas in other cases it runs in
-- the GC context, therefore, cleanup may be delayed until the GC gets to run.
-- An example where GC based cleanup happens is when a stream is being folded
-- but the fold terminates without draining the entire stream or if the
-- consumer of the stream encounters an exception.
--
-- Observes exceptions only in the stream generation, and not in stream
-- consumers.
--
-- /See also: 'bracketUnsafe'/
--
-- /Inhibits stream fusion/
--
{-# INLINE bracketIO #-}
bracketIO :: (MonadIO m, MonadCatch m)
    => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO :: forall (m :: * -> *) b c a.
(MonadIO m, MonadCatch m) =>
IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO IO b
bef b -> IO c
aft = forall (m :: * -> *) b c d e a.
(MonadIO m, MonadCatch m) =>
IO b
-> (b -> IO c)
-> (b -> IO d)
-> (b -> IO e)
-> (b -> Stream m a)
-> Stream m a
bracketIO3 IO b
bef b -> IO c
aft b -> IO c
aft b -> IO c
aft

data BracketState s v = BracketInit | BracketRun s v

-- | Alternate (custom) implementation of 'bracket'.
--
{-# INLINE_NORMAL _bracket #-}
_bracket :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket :: forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket m b
bef b -> m c
aft b -> Stream m a
bet = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> BracketState (Stream m a) b
-> m (Step (BracketState (Stream m a) b) a)
step' forall s v. BracketState s v
BracketInit

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> BracketState (Stream m a) b
-> m (Step (BracketState (Stream m a) b) a)
step' State StreamK m a
_ BracketState (Stream m a) b
BracketInit = m b
bef forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \b
x -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. s -> Step s a
Skip (forall s v. s -> v -> BracketState s v
BracketRun (b -> Stream m a
bet b
x) b
x))

    -- NOTE: It is important to use UnStream instead of the Stream pattern
    -- here, otherwise we get huge perf degradation, see note in concatMap.
    step' State StreamK m a
gst (BracketRun (UnStream State StreamK m a -> s -> m (Step s a)
step s
state) b
v) = do
        -- res <- step gst state `MC.onException` aft v
        Either SomeException (Step s a)
res <- forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
state
        case Either SomeException (Step s a)
res of
            Left (SomeException
e :: SomeException) -> b -> m c
aft b
v forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
            Right Step s a
r -> case Step s a
r of
                Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall s v. s -> v -> BracketState s v
BracketRun (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step s
s) b
v)
                Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s v. s -> v -> BracketState s v
BracketRun (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step s
s) b
v)
                Step s a
Stop      -> b -> m c
aft b
v forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Like 'finally' with following differences:
--
-- * action @m b@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * has slightly better performance than 'finallyIO'.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE finallyUnsafe #-}
finallyUnsafe :: MonadCatch m => m b -> Stream m a -> Stream m a
finallyUnsafe :: forall (m :: * -> *) b a.
MonadCatch m =>
m b -> Stream m a -> Stream m a
finallyUnsafe m b
action Stream m a
xs = forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracketUnsafe (forall (m :: * -> *) a. Monad m => a -> m a
return ()) (forall a b. a -> b -> a
const m b
action) (forall a b. a -> b -> a
const Stream m a
xs)

-- | Run the action @IO b@ whenever the stream stream stops normally, aborts
-- due to an exception or if it is garbage collected after a partial lazy
-- evaluation.
--
-- The semantics of running the action @IO b@ are similar to the cleanup action
-- semantics described in 'bracketIO'.
--
-- >>> finallyIO release = Stream.bracketIO (return ()) (const release)
--
-- /See also 'finallyUnsafe'/
--
-- /Inhibits stream fusion/
--
{-# INLINE finallyIO #-}
finallyIO :: (MonadIO m, MonadCatch m) => IO b -> Stream m a -> Stream m a
finallyIO :: forall (m :: * -> *) b a.
(MonadIO m, MonadCatch m) =>
IO b -> Stream m a -> Stream m a
finallyIO IO b
action Stream m a
xs = forall (m :: * -> *) b c d e a.
(MonadIO m, MonadCatch m) =>
IO b
-> (b -> IO c)
-> (b -> IO d)
-> (b -> IO e)
-> (b -> Stream m a)
-> Stream m a
bracketIO3 (forall (m :: * -> *) a. Monad m => a -> m a
return ()) forall {p}. p -> IO b
act forall {p}. p -> IO b
act forall {p}. p -> IO b
act (forall a b. a -> b -> a
const Stream m a
xs)
    where act :: p -> IO b
act p
_ = IO b
action

-- | Like 'handle' but the exception handler is also provided with the stream
-- that generated the exception as input. The exception handler can thus
-- re-evaluate the stream to retry the action that failed. The exception
-- handler can again call 'ghandle' on it to retry the action multiple times.
--
-- This is highly experimental. In a stream of actions we can map the stream
-- with a retry combinator to retry each action on failure.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL ghandle #-}
ghandle :: (MonadCatch m, Exception e)
    => (e -> Stream m a -> m (Stream m a)) -> Stream m a -> Stream m a
ghandle :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> Stream m a -> m (Stream m a)) -> Stream m a -> Stream m a
ghandle e -> Stream m a -> m (Stream m a)
f Stream m a
stream =
    forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_ (forall (m :: * -> *) a. Monad m => a -> m a
return ()) forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> b -> a
const e -> Stream m a -> m (Stream m a)
f) (forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) (forall a b. a -> b -> a
const Stream m a
stream)

-- | When evaluating a stream if an exception occurs, stream evaluation aborts
-- and the specified exception handler is run with the exception as argument.
-- The exception is caught and handled unless the handler decides to rethrow
-- it. Note that exception handling is not applied to the stream returned by
-- the exception handler.
--
-- Observes exceptions only in the stream generation, and not in stream
-- consumers.
--
-- /Inhibits stream fusion/
--
{-# INLINE_NORMAL handle #-}
handle :: (MonadCatch m, Exception e)
    => (e -> m (Stream m a)) -> Stream m a -> Stream m a
handle :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m (Stream m a)) -> Stream m a -> Stream m a
handle e -> m (Stream m a)
f Stream m a
stream =
    forall (m :: * -> *) c d e b.
Monad m =>
m c
-> (c -> m d)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket_ (forall (m :: * -> *) a. Monad m => a -> m a
return ()) forall (m :: * -> *) a. Monad m => a -> m a
return (\()
_ e
e Stream m a
_ -> e -> m (Stream m a)
f e
e) (forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) (forall a b. a -> b -> a
const Stream m a
stream)

-- | Alternate (custom) implementation of 'handle'.
--
{-# INLINE_NORMAL _handle #-}
_handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
_handle :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> Stream m a) -> Stream m a -> Stream m a
_handle e -> Stream m a
f (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a)
step' (forall a b. a -> Either a b
Left s
state)

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a)
step' State StreamK m a
gst (Left s
st) = do
        Either e (Step s a)
res <- forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Either e (Step s a)
res of
            Left e
e -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (e -> Stream m a
f e
e)
            Right Step s a
r -> case Step s a
r of
                Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a b. a -> Either a b
Left s
s)
                Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a b. a -> Either a b
Left s
s)
                Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
gst (Right (UnStream State StreamK m a -> s -> m (Step s a)
step1 s
st)) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a b. b -> Either a b
Right (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s))
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a b. b -> Either a b
Right (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s))
            Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop