-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Exception
-- Copyright   : (c) 2020 Composewell Technologies and Contributors
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.StreamD.Exception
    (
      gbracket_
    , gbracket
    , before
    , after_
    , after
    , bracket_
    , bracket'
    , onException
    , finally_
    , finally
    , ghandle
    , handle
    , retry
    )
where

#include "inline.hs"

import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_)
import Data.Map.Strict (Map)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.IOFinalizer
    (newIOFinalizer, runIOFinalizer, clearingIOFinalizer)

import qualified Control.Monad.Catch as MC
import qualified Data.Map.Strict as Map

import Streamly.Internal.Data.Stream.StreamD.Type

data GbracketState s1 s2 v
    = GBracketInit
    | GBracketNormal s1 v
    | GBracketException s2

-- | Like 'gbracket' but with following differences:
--
-- * alloc action @m c@ runs with async exceptions enabled
-- * cleanup action @c -> m d@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * does not require a 'MonadAsync' constraint.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL gbracket_ #-}
gbracket_
    :: Monad m
    => m c                                  -- ^ before
    -> (forall s. m s -> m (Either e s))    -- ^ try (exception handling)
    -> (c -> m d)                           -- ^ after, on normal stop
    -> (c -> e -> Stream m b -> Stream m b) -- ^ on exception
    -> (c -> Stream m b)                    -- ^ stream generator
    -> Stream m b
gbracket_ :: m c
-> (forall s. m s -> m (Either e s))
-> (c -> m d)
-> (c -> e -> Stream m b -> Stream m b)
-> (c -> Stream m b)
-> Stream m b
gbracket_ m c
bef forall s. m s -> m (Either e s)
exc c -> m d
aft c -> e -> Stream m b -> Stream m b
fexc c -> Stream m b
fnormal =
    (State Stream m b
 -> GbracketState (Stream m b) (Stream m b) c
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> GbracketState (Stream m b) (Stream m b) c -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b
-> GbracketState (Stream m b) (Stream m b) c
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
step GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. GbracketState s1 s2 v
GBracketInit

    where

    {-# INLINE_LATE step #-}
    step :: State Stream m b
-> GbracketState (Stream m b) (Stream m b) c
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
step State Stream m b
_ GbracketState (Stream m b) (Stream m b) c
GBracketInit = do
        c
r <- m c
bef
        Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (GbracketState (Stream m b) (Stream m b) c
 -> Step (GbracketState (Stream m b) (Stream m b) c) b)
-> GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall a b. (a -> b) -> a -> b
$ Stream m b -> c -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal (c -> Stream m b
fnormal c
r) c
r

    step State Stream m b
gst (GBracketNormal (UnStream State Stream m b -> s -> m (Step s b)
step1 s
st) c
v) = do
        Either e (Step s b)
res <- m (Step s b) -> m (Either e (Step s b))
forall s. m s -> m (Either e s)
exc (m (Step s b) -> m (Either e (Step s b)))
-> m (Step s b) -> m (Either e (Step s b))
forall a b. (a -> b) -> a -> b
$ State Stream m b -> s -> m (Step s b)
step1 State Stream m b
gst s
st
        case Either e (Step s b)
res of
            Right Step s b
r -> case Step s b
r of
                Yield b
x s
s ->
                    Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b -> c -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s) c
v)
                Skip s
s -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (Stream m b -> c -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s1 -> v -> GbracketState s1 s2 v
GBracketNormal ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s) c
v)
                Step s b
Stop -> c -> m d
aft c
v m d
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. Step s a
Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e
e ->
                Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (Stream m b -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException (c -> e -> Stream m b -> Stream m b
fexc c
v e
e ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream State Stream m b -> s -> m (Step s b)
step1 s
st)))
    step State Stream m b
gst (GBracketException (UnStream State Stream m b -> s -> m (Step s b)
step1 s
st)) = do
        Step s b
res <- State Stream m b -> s -> m (Step s b)
step1 State Stream m b
gst s
st
        case Step s b
res of
            Yield b
x s
s -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s))
            Skip s
s    -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketState (Stream m b) (Stream m b) c) b
 -> m (Step (GbracketState (Stream m b) (Stream m b) c) b))
-> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall a b. (a -> b) -> a -> b
$ GbracketState (Stream m b) (Stream m b) c
-> Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. s -> Step s a
Skip (Stream m b -> GbracketState (Stream m b) (Stream m b) c
forall s1 s2 v. s2 -> GbracketState s1 s2 v
GBracketException ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s))
            Step s b
Stop      -> Step (GbracketState (Stream m b) (Stream m b) c) b
-> m (Step (GbracketState (Stream m b) (Stream m b) c) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketState (Stream m b) (Stream m b) c) b
forall s a. Step s a
Stop

data GbracketIOState s1 s2 v wref
    = GBracketIOInit
    | GBracketIONormal s1 v wref
    | GBracketIOException s2

-- | Run the alloc action @m c@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Use the
-- output @c@ as input to @c -> Stream m b@ to generate an output stream. When
-- generating the stream use the supplied @try@ operation @forall s. m s -> m
-- (Either e s)@ to catch synchronous exceptions. If an exception occurs run
-- the exception handler @c -> e -> Stream m b -> m (Stream m b)@. Note that
-- 'gbracket' does not rethrow the exception, it has to be done by the
-- exception handler if desired.
--
-- The cleanup action @c -> m d@, runs whenever the stream ends normally, due
-- to a sync or async exception or if it gets garbage collected after a partial
-- lazy evaluation.  See 'bracket' for the semantics of the cleanup action.
--
-- 'gbracket' can express all other exception handling combinators.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
{-# INLINE_NORMAL gbracket #-}
gbracket
    :: (MonadIO m, MonadBaseControl IO m)
    => m c -- ^ before
    -> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
    -> (c -> m d1) -- ^ on normal stop
    -> (c -> m d2) -- ^ on GC without normal stop or exception
    -> (c -> e -> Stream m b -> m (Stream m b)) -- ^ on exception
    -> (c -> Stream m b) -- ^ stream generator
    -> Stream m b
gbracket :: m c
-> (forall s. m s -> m (Either e s))
-> (c -> m d1)
-> (c -> m d2)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (c -> Stream m b)
-> Stream m b
gbracket m c
bef forall s. m s -> m (Either e s)
exc c -> m d1
aft c -> m d2
gc c -> e -> Stream m b -> m (Stream m b)
fexc c -> Stream m b
fnormal =
    (State Stream m b
 -> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. GbracketIOState s1 s2 v wref
GBracketIOInit

    where

    -- If the stream is never evaluated the "aft" action will never be
    -- called. For that to occur we will need the user of this API to pass a
    -- weak pointer to us.
    {-# INLINE_LATE step #-}
    step :: State Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step State Stream m b
_ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
GBracketIOInit = do
        -- We mask asynchronous exceptions to make the execution
        -- of 'bef' and the registration of 'aft' atomic.
        -- A similar thing is done in the resourcet package: https://git.io/JvKV3
        -- Tutorial: https://markkarpov.com/tutorial/exceptions.html
        (c
r, IOFinalizer
ref) <- (IO (StM m (c, IOFinalizer)) -> IO (StM m (c, IOFinalizer)))
-> m (c, IOFinalizer) -> m (c, IOFinalizer)
forall (b :: * -> *) (m :: * -> *) a c.
MonadBaseControl b m =>
(b (StM m a) -> b (StM m c)) -> m a -> m c
liftBaseOp_ IO (StM m (c, IOFinalizer)) -> IO (StM m (c, IOFinalizer))
forall a. IO a -> IO a
mask_ (m (c, IOFinalizer) -> m (c, IOFinalizer))
-> m (c, IOFinalizer) -> m (c, IOFinalizer)
forall a b. (a -> b) -> a -> b
$ do
            c
r <- m c
bef
            IOFinalizer
ref <- m d2 -> m IOFinalizer
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
m a -> m IOFinalizer
newIOFinalizer (c -> m d2
gc c
r)
            (c, IOFinalizer) -> m (c, IOFinalizer)
forall (m :: * -> *) a. Monad m => a -> m a
return (c
r, IOFinalizer
ref)
        Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
 -> Step
      (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall a b. (a -> b) -> a -> b
$ Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (c -> Stream m b
fnormal c
r) c
r IOFinalizer
ref

    step State Stream m b
gst (GBracketIONormal (UnStream State Stream m b -> s -> m (Step s b)
step1 s
st) c
v IOFinalizer
ref) = do
        Either e (Step s b)
res <- m (Step s b) -> m (Either e (Step s b))
forall s. m s -> m (Either e s)
exc (m (Step s b) -> m (Either e (Step s b)))
-> m (Step s b) -> m (Either e (Step s b))
forall a b. (a -> b) -> a -> b
$ State Stream m b -> s -> m (Step s b)
step1 State Stream m b
gst s
st
        case Either e (Step s b)
res of
            Right Step s b
r -> case Step s b
r of
                Yield b
x s
s ->
                    Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Skip s
s ->
                    Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Step s b
Stop ->
                    IOFinalizer -> m d1 -> m d1
forall (m :: * -> *) a.
MonadBaseControl IO m =>
IOFinalizer -> m a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> m d1
aft c
v) m d1
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. Step s a
Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e
e -> do
                -- Clearing of finalizer and running of exception handler must
                -- be atomic wrt async exceptions. Otherwise if we have cleared
                -- the finalizer and have not run the exception handler then we
                -- may leak the resource.
                Stream m b
stream <- IOFinalizer -> m (Stream m b) -> m (Stream m b)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
IOFinalizer -> m a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> e -> Stream m b -> m (Stream m b)
fexc c
v e
e ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream State Stream m b -> s -> m (Step s b)
step1 s
st))
                Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException Stream m b
stream)
    step State Stream m b
gst (GBracketIOException (UnStream State Stream m b -> s -> m (Step s b)
step1 s
st)) = do
        Step s b
res <- State Stream m b -> s -> m (Step s b)
step1 State Stream m b
gst s
st
        case Step s b
res of
            Yield b
x s
s ->
                Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s))
            Skip s
s    -> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
step1 s
s))
            Step s b
Stop      -> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. Step s a
Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.before'.
--
{-# INLINE_NORMAL before #-}
before :: Monad m => m b -> Stream m a -> Stream m a
before :: m b -> Stream m a -> Stream m a
before m b
action (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a -> Maybe s -> m (Step (Maybe s) a))
-> Maybe s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> Maybe s -> m (Step (Maybe s) a)
step' Maybe s
forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a -> Maybe s -> m (Step (Maybe s) a)
step' State Stream m a
_ Maybe s
Nothing = m b
action m b -> m (Step (Maybe s) a) -> m (Step (Maybe s) a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (Maybe s) a -> m (Step (Maybe s) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe s -> Step (Maybe s) a
forall s a. s -> Step s a
Skip (s -> Maybe s
forall a. a -> Maybe a
Just s
state))

    step' State Stream m a
gst (Just s
st) = do
        Step s a
res <- State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step (Maybe s) a -> m (Step (Maybe s) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe s) a -> m (Step (Maybe s) a))
-> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe s -> Step (Maybe s) a
forall s a. a -> s -> Step s a
Yield a
x (s -> Maybe s
forall a. a -> Maybe a
Just s
s)
            Skip s
s    -> Step (Maybe s) a -> m (Step (Maybe s) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe s) a -> m (Step (Maybe s) a))
-> Step (Maybe s) a -> m (Step (Maybe s) a)
forall a b. (a -> b) -> a -> b
$ Maybe s -> Step (Maybe s) a
forall s a. s -> Step s a
Skip (s -> Maybe s
forall a. a -> Maybe a
Just s
s)
            Step s a
Stop      -> Step (Maybe s) a -> m (Step (Maybe s) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Maybe s) a
forall s a. Step s a
Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.after_'.
--
{-# INLINE_NORMAL after_ #-}
after_ :: Monad m => m b -> Stream m a -> Stream m a
after_ :: m b -> Stream m a -> Stream m a
after_ m b
action (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step' s
state

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a -> s -> m (Step s a)
step' State Stream m a
gst s
st = do
        Step s a
res <- State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ a -> s -> Step s a
forall s a. a -> s -> Step s a
Yield a
x s
s
            Skip s
s    -> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ s -> Step s a
forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> m b
action m b -> m (Step s a) -> m (Step s a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step s a
forall s a. Step s a
Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.after'.
--
{-# INLINE_NORMAL after #-}
after :: (MonadIO m, MonadBaseControl IO m)
    => m b -> Stream m a -> Stream m a
after :: m b -> Stream m a -> Stream m a
after m b
action (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a
 -> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a))
-> Maybe (s, IOFinalizer) -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' Maybe (s, IOFinalizer)
forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' State Stream m a
_ Maybe (s, IOFinalizer)
Nothing = do
        IOFinalizer
ref <- m b -> m IOFinalizer
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
m a -> m IOFinalizer
newIOFinalizer m b
action
        Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. s -> Step s a
Skip (Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a)
-> Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall a b. (a -> b) -> a -> b
$ (s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
state, IOFinalizer
ref)
    step' State Stream m a
gst (Just (s
st, IOFinalizer
ref)) = do
        Step s a
res <- State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. a -> s -> Step s a
Yield a
x ((s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Skip s
s    -> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. s -> Step s a
Skip ((s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Step s a
Stop      -> do
                IOFinalizer -> m ()
forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
                Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Maybe (s, IOFinalizer)) a
forall s a. Step s a
Stop

-- XXX For high performance error checks in busy streams we may need another
-- Error constructor in step.
--
-- | See 'Streamly.Internal.Data.Stream.IsStream.onException'.
--
{-# INLINE_NORMAL onException #-}
onException :: MonadCatch m => m b -> Stream m a -> Stream m a
onException :: m b -> Stream m a -> Stream m a
onException m b
action Stream m a
str =
    m ()
-> (forall s. m s -> m (Either SomeException s))
-> (() -> m ())
-> (() -> SomeException -> Stream m a -> Stream m a)
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) c e d b.
Monad m =>
m c
-> (forall s. m s -> m (Either e s))
-> (c -> m d)
-> (c -> e -> Stream m b -> Stream m b)
-> (c -> Stream m b)
-> Stream m b
gbracket_ (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) ((m s -> m (Either SomeException s))
-> m s -> m (Either SomeException s)
forall a. a -> a
inline m s -> m (Either SomeException s)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return
        (\()
_ (SomeException
e :: MC.SomeException) Stream m a
_ -> m Any -> Stream m a
forall (m :: * -> *) b a. Applicative m => m b -> Stream m a
nilM (m b
action m b -> m Any -> m Any
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m Any
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e))
        (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
str)

{-# INLINE_NORMAL _onException #-}
_onException :: MonadCatch m => m b -> Stream m a -> Stream m a
_onException :: m b -> Stream m a -> Stream m a
_onException m b
action (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step' s
state

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a -> s -> m (Step s a)
step' State Stream m a
gst s
st = do
        Step s a
res <- State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st m (Step s a) -> m b -> m (Step s a)
forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`MC.onException` m b
action
        case Step s a
res of
            Yield a
x s
s -> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ a -> s -> Step s a
forall s a. a -> s -> Step s a
Yield a
x s
s
            Skip s
s    -> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ s -> Step s a
forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step s a
forall s a. Step s a
Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.bracket_'.
--
{-# INLINE_NORMAL bracket_ #-}
bracket_ :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket_ :: m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket_ m b
bef b -> m c
aft =
    m b
-> (forall s. m s -> m (Either SomeException s))
-> (b -> m c)
-> (b -> SomeException -> Stream m a -> Stream m a)
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) c e d b.
Monad m =>
m c
-> (forall s. m s -> m (Either e s))
-> (c -> m d)
-> (c -> e -> Stream m b -> Stream m b)
-> (c -> Stream m b)
-> Stream m b
gbracket_ m b
bef ((m s -> m (Either SomeException s))
-> m s -> m (Either SomeException s)
forall a. a -> a
inline m s -> m (Either SomeException s)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) b -> m c
aft
        (\b
a (SomeException
e :: SomeException) Stream m a
_ -> m Any -> Stream m a
forall (m :: * -> *) b a. Applicative m => m b -> Stream m a
nilM (b -> m c
aft b
a m c -> m Any -> m Any
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m Any
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e))

-- | See 'Streamly.Internal.Data.Stream.IsStream.bracket'.
--
{-# INLINE_NORMAL bracket' #-}
bracket' :: (MonadAsync m, MonadCatch m) =>
       m b
    -> (b -> m c)
    -> (b -> m d)
    -> (b -> m e)
    -> (b -> Stream m a)
    -> Stream m a
bracket' :: m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket' m b
bef b -> m c
aft b -> m d
exc b -> m e
gc =
    m b
-> (forall s. m s -> m (Either SomeException s))
-> (b -> m c)
-> (b -> m e)
-> (b -> SomeException -> Stream m a -> m (Stream m a))
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) c e d1 d2 b.
(MonadIO m, MonadBaseControl IO m) =>
m c
-> (forall s. m s -> m (Either e s))
-> (c -> m d1)
-> (c -> m d2)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (c -> Stream m b)
-> Stream m b
gbracket m b
bef ((m s -> m (Either SomeException s))
-> m s -> m (Either SomeException s)
forall a. a -> a
inline m s -> m (Either SomeException s)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) b -> m c
aft b -> m e
gc
        (\b
a (SomeException
e :: SomeException) Stream m a
_ -> b -> m d
exc b
a m d -> m (Stream m a) -> m (Stream m a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Stream m a -> m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (m Any -> Stream m a
forall (m :: * -> *) b a. Applicative m => m b -> Stream m a
nilM (SomeException -> m Any
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e)))

data BracketState s v = BracketInit | BracketRun s v

-- | Alternate (custom) implementation of 'bracket'.
--
{-# INLINE_NORMAL _bracket #-}
_bracket :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket :: m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket m b
bef b -> m c
aft b -> Stream m a
bet = (State Stream m a
 -> BracketState (Stream m a) b
 -> m (Step (BracketState (Stream m a) b) a))
-> BracketState (Stream m a) b -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a
-> BracketState (Stream m a) b
-> m (Step (BracketState (Stream m a) b) a)
step' BracketState (Stream m a) b
forall s v. BracketState s v
BracketInit

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> BracketState (Stream m a) b
-> m (Step (BracketState (Stream m a) b) a)
step' State Stream m a
_ BracketState (Stream m a) b
BracketInit = m b
bef m b
-> (b -> m (Step (BracketState (Stream m a) b) a))
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \b
x -> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (BracketState (Stream m a) b -> Step (BracketState (Stream m a) b) a
forall s a. s -> Step s a
Skip (Stream m a -> b -> BracketState (Stream m a) b
forall s v. s -> v -> BracketState s v
BracketRun (b -> Stream m a
bet b
x) b
x))

    -- NOTE: It is important to use UnStream instead of the Stream pattern
    -- here, otherwise we get huge perf degradation, see note in concatMap.
    step' State Stream m a
gst (BracketRun (UnStream State Stream m a -> s -> m (Step s a)
step s
state) b
v) = do
        -- res <- step gst state `MC.onException` aft v
        Either SomeException (Step s a)
res <- (m (Step s a) -> m (Either SomeException (Step s a)))
-> m (Step s a) -> m (Either SomeException (Step s a))
forall a. a -> a
inline m (Step s a) -> m (Either SomeException (Step s a))
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (m (Step s a) -> m (Either SomeException (Step s a)))
-> m (Step s a) -> m (Either SomeException (Step s a))
forall a b. (a -> b) -> a -> b
$ State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
state
        case Either SomeException (Step s a)
res of
            Left (SomeException
e :: SomeException) -> b -> m c
aft b
v m c -> m Any -> m Any
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m Any
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e m Any
-> m (Step (BracketState (Stream m a) b) a)
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (BracketState (Stream m a) b) a
forall s a. Step s a
Stop
            Right Step s a
r -> case Step s a
r of
                Yield a
x s
s -> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (BracketState (Stream m a) b) a
 -> m (Step (BracketState (Stream m a) b) a))
-> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a b. (a -> b) -> a -> b
$ a
-> BracketState (Stream m a) b
-> Step (BracketState (Stream m a) b) a
forall s a. a -> s -> Step s a
Yield a
x (Stream m a -> b -> BracketState (Stream m a) b
forall s v. s -> v -> BracketState s v
BracketRun ((State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step s
s) b
v)
                Skip s
s    -> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (BracketState (Stream m a) b) a
 -> m (Step (BracketState (Stream m a) b) a))
-> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall a b. (a -> b) -> a -> b
$ BracketState (Stream m a) b -> Step (BracketState (Stream m a) b) a
forall s a. s -> Step s a
Skip (Stream m a -> b -> BracketState (Stream m a) b
forall s v. s -> v -> BracketState s v
BracketRun ((State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step s
s) b
v)
                Step s a
Stop      -> b -> m c
aft b
v m c
-> m (Step (BracketState (Stream m a) b) a)
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (BracketState (Stream m a) b) a
-> m (Step (BracketState (Stream m a) b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (BracketState (Stream m a) b) a
forall s a. Step s a
Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.finally_'.
--
{-# INLINE finally_ #-}
finally_ :: MonadCatch m => m b -> Stream m a -> Stream m a
finally_ :: m b -> Stream m a -> Stream m a
finally_ m b
action Stream m a
xs = m () -> (() -> m b) -> (() -> Stream m a) -> Stream m a
forall (m :: * -> *) b c a.
MonadCatch m =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket_ (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) (m b -> () -> m b
forall a b. a -> b -> a
const m b
action) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
xs)

-- | See 'Streamly.Internal.Data.Stream.IsStream.finally'.
--
-- finally action xs = after action $ onException action xs
--
{-# INLINE finally #-}
finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a
finally :: m b -> Stream m a -> Stream m a
finally m b
action Stream m a
xs = m ()
-> (() -> m b)
-> (() -> m b)
-> (() -> m b)
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket' (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) () -> m b
forall p. p -> m b
act () -> m b
forall p. p -> m b
act () -> m b
forall p. p -> m b
act (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
xs)
    where act :: p -> m b
act p
_ = m b
action

-- | See 'Streamly.Internal.Data.Stream.IsStream.ghandle'.
--
{-# INLINE_NORMAL ghandle #-}
ghandle :: (MonadCatch m, Exception e)
    => (e -> Stream m a -> Stream m a) -> Stream m a -> Stream m a
ghandle :: (e -> Stream m a -> Stream m a) -> Stream m a -> Stream m a
ghandle e -> Stream m a -> Stream m a
f Stream m a
str =
    m ()
-> (forall s. m s -> m (Either e s))
-> (() -> m ())
-> (() -> e -> Stream m a -> Stream m a)
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) c e d b.
Monad m =>
m c
-> (forall s. m s -> m (Either e s))
-> (c -> m d)
-> (c -> e -> Stream m b -> Stream m b)
-> (c -> Stream m b)
-> Stream m b
gbracket_ (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) ((m s -> m (Either e s)) -> m s -> m (Either e s)
forall a. a -> a
inline m s -> m (Either e s)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ((e -> Stream m a -> Stream m a)
-> () -> e -> Stream m a -> Stream m a
forall a b. a -> b -> a
const e -> Stream m a -> Stream m a
f) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
str)

-- | See 'Streamly.Internal.Data.Stream.IsStream.handle'.
--
{-# INLINE_NORMAL handle #-}
handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
handle :: (e -> Stream m a) -> Stream m a -> Stream m a
handle e -> Stream m a
f Stream m a
str =
    m ()
-> (forall s. m s -> m (Either e s))
-> (() -> m ())
-> (() -> e -> Stream m a -> Stream m a)
-> (() -> Stream m a)
-> Stream m a
forall (m :: * -> *) c e d b.
Monad m =>
m c
-> (forall s. m s -> m (Either e s))
-> (c -> m d)
-> (c -> e -> Stream m b -> Stream m b)
-> (c -> Stream m b)
-> Stream m b
gbracket_ (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) ((m s -> m (Either e s)) -> m s -> m (Either e s)
forall a. a -> a
inline m s -> m (Either e s)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try) () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return (\()
_ e
e Stream m a
_ -> e -> Stream m a
f e
e) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
str)

-- | Alternate (custom) implementation of 'handle'.
--
{-# INLINE_NORMAL _handle #-}
_handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
_handle :: (e -> Stream m a) -> Stream m a -> Stream m a
_handle e -> Stream m a
f (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a
 -> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a))
-> Either s (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a
-> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a)
step' (s -> Either s (Stream m a)
forall a b. a -> Either a b
Left s
state)

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> Either s (Stream m a) -> m (Step (Either s (Stream m a)) a)
step' State Stream m a
gst (Left s
st) = do
        Either e (Step s a)
res <- (m (Step s a) -> m (Either e (Step s a)))
-> m (Step s a) -> m (Either e (Step s a))
forall a. a -> a
inline m (Step s a) -> m (Either e (Step s a))
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (m (Step s a) -> m (Either e (Step s a)))
-> m (Step s a) -> m (Either e (Step s a))
forall a b. (a -> b) -> a -> b
$ State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st
        case Either e (Step s a)
res of
            Left e
e -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. s -> Step s a
Skip (Either s (Stream m a) -> Step (Either s (Stream m a)) a)
-> Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Either s (Stream m a)
forall a b. b -> Either a b
Right (e -> Stream m a
f e
e)
            Right Step s a
r -> case Step s a
r of
                Yield a
x s
s -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ a -> Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (s -> Either s (Stream m a)
forall a b. a -> Either a b
Left s
s)
                Skip s
s    -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. s -> Step s a
Skip (s -> Either s (Stream m a)
forall a b. a -> Either a b
Left s
s)
                Step s a
Stop      -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Either s (Stream m a)) a
forall s a. Step s a
Stop

    step' State Stream m a
gst (Right (UnStream State Stream m a -> s -> m (Step s a)
step1 s
st)) = do
        Step s a
res <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ a -> Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (Stream m a -> Either s (Stream m a)
forall a b. b -> Either a b
Right ((State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step1 s
s))
            Skip s
s    -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m a)) a
 -> m (Step (Either s (Stream m a)) a))
-> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m a) -> Step (Either s (Stream m a)) a
forall s a. s -> Step s a
Skip (Stream m a -> Either s (Stream m a)
forall a b. b -> Either a b
Right ((State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step1 s
s))
            Step s a
Stop      -> Step (Either s (Stream m a)) a
-> m (Step (Either s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Either s (Stream m a)) a
forall s a. Step s a
Stop

data RetryState emap s1 s2
    = RetryWithMap emap s1
    | RetryDefault s2

-- | See 'Streamly.Internal.Data.Stream.IsStream.retry'
--
{-# INLINE_NORMAL retry #-}
retry
    :: forall e m a. (Exception e, Ord e, MonadCatch m)
    => Map e Int
       -- ^ map from exception to retry count
    -> (e -> Stream m a)
       -- ^ default handler for those exceptions that are not in the map
    -> Stream m a
    -> Stream m a
retry :: Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
retry Map e Int
emap0 e -> Stream m a
defaultHandler (Stream State Stream m a -> s -> m (Step s a)
step0 s
state0) = (State Stream m a
 -> RetryState (Map e Int) s (Stream m a)
 -> m (Step (RetryState (Map e Int) s (Stream m a)) a))
-> RetryState (Map e Int) s (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a
-> RetryState (Map e Int) s (Stream m a)
-> m (Step (RetryState (Map e Int) s (Stream m a)) a)
forall a.
(Ord a, Num a) =>
State Stream m a
-> RetryState (Map e a) s (Stream m a)
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
step RetryState (Map e Int) s (Stream m a)
forall s2. RetryState (Map e Int) s s2
state

    where

    state :: RetryState (Map e Int) s s2
state = Map e Int -> s -> RetryState (Map e Int) s s2
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e Int
emap0 s
state0

    {-# INLINE_LATE step #-}
    step :: State Stream m a
-> RetryState (Map e a) s (Stream m a)
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
step State Stream m a
gst (RetryWithMap Map e a
emap s
st) = do
        Either e (Step s a)
eres <- m (Step s a) -> m (Either e (Step s a))
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (m (Step s a) -> m (Either e (Step s a)))
-> m (Step s a) -> m (Either e (Step s a))
forall a b. (a -> b) -> a -> b
$ State Stream m a -> s -> m (Step s a)
step0 State Stream m a
gst s
st
        case Either e (Step s a)
eres of
            Left e
e -> e
-> Map e a -> s -> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall (m :: * -> *) a s1 a.
(Monad m, Ord a, Num a) =>
e
-> Map e a
-> s1
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
handler e
e Map e a
emap s
st
            Right Step s a
res ->
                Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
                    (Step (RetryState (Map e a) s (Stream m a)) a
 -> m (Step (RetryState (Map e a) s (Stream m a)) a))
-> Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
res of
                          Yield a
x s
st1 -> a
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Map e a -> s -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap s
st1
                          Skip s
st1 -> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. s -> Step s a
Skip (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Map e a -> s -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap s
st1
                          Step s a
Stop -> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. Step s a
Stop
    step State Stream m a
gst (RetryDefault (UnStream State Stream m a -> s -> m (Step s a)
step1 s
state1)) = do
        Step s a
res <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
state1
        Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (RetryState (Map e a) s (Stream m a)) a
 -> m (Step (RetryState (Map e a) s (Stream m a)) a))
-> Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
res of
                  Yield a
x s
st1 -> a
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault ((State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step1 s
st1)
                  Skip s
st1 -> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. s -> Step s a
Skip (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault ((State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step1 s
st1)
                  Step s a
Stop -> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. Step s a
Stop

    {-# INLINE handler #-}
    handler :: e
-> Map e a
-> s1
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
handler e
e Map e a
emap s1
st =
        Step (RetryState (Map e a) s1 (Stream m a)) a
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (RetryState (Map e a) s1 (Stream m a)) a
 -> m (Step (RetryState (Map e a) s1 (Stream m a)) a))
-> Step (RetryState (Map e a) s1 (Stream m a)) a
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ RetryState (Map e a) s1 (Stream m a)
-> Step (RetryState (Map e a) s1 (Stream m a)) a
forall s a. s -> Step s a
Skip
            (RetryState (Map e a) s1 (Stream m a)
 -> Step (RetryState (Map e a) s1 (Stream m a)) a)
-> RetryState (Map e a) s1 (Stream m a)
-> Step (RetryState (Map e a) s1 (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ case e -> Map e a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup e
e Map e a
emap of
                  Just a
i
                      | a
i a -> a -> Bool
forall a. Ord a => a -> a -> Bool
> a
0 ->
                          let emap1 :: Map e a
emap1 = e -> a -> Map e a -> Map e a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert e
e (a
i a -> a -> a
forall a. Num a => a -> a -> a
- a
1) Map e a
emap
                           in Map e a -> s1 -> RetryState (Map e a) s1 (Stream m a)
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap1 s1
st
                      | Bool
otherwise -> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault (Stream m a -> RetryState (Map e a) s1 (Stream m a))
-> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall a b. (a -> b) -> a -> b
$ e -> Stream m a
defaultHandler e
e
                  Maybe a
Nothing -> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault (Stream m a -> RetryState (Map e a) s1 (Stream m a))
-> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall a b. (a -> b) -> a -> b
$ e -> Stream m a
defaultHandler e
e