-- | -- Module : Streamly.Internal.Data.Unfold.Exception -- Copyright : (c) 2019 Composewell Technologies -- License : BSD3 -- Maintainer : streamly@composewell.com -- Stability : experimental -- Portability : GHC -- -- Lifted resource management primitives. module Streamly.Internal.Data.Unfold.Exception ( gbracket , after , finally , bracket ) where #include "inline.hs" import Control.Exception (mask_) import Control.Monad.Catch (MonadCatch) import Streamly.Internal.Control.Concurrent (MonadRunInIO, MonadAsync, withRunInIO) import Streamly.Internal.Data.IOFinalizer.Lifted (newIOFinalizer, runIOFinalizer, clearingIOFinalizer) import Streamly.Internal.Data.Stream.StreamD.Type (Step(..)) import qualified Control.Monad.Catch as MC import Streamly.Internal.Data.Unfold.Type -- | Run the alloc action @a -> m c@ with async exceptions disabled but keeping -- blocking operations interruptible (see 'Control.Exception.mask'). Use the -- output @c@ as input to @Unfold m c b@ to generate an output stream. When -- unfolding use the supplied @try@ operation @forall s. m s -> m (Either e s)@ -- to catch synchronous exceptions. If an exception occurs run the exception -- handling unfold @Unfold m (c, e) b@. -- -- The cleanup action @c -> m d@, runs whenever the stream ends normally, due -- to a sync or async exception or if it gets garbage collected after a partial -- lazy evaluation. See 'bracket' for the semantics of the cleanup action. -- -- 'gbracket' can express all other exception handling combinators. -- -- /Inhibits stream fusion/ -- -- /Pre-release/ {-# INLINE_NORMAL gbracket #-} gbracket :: MonadRunInIO m => (a -> m c) -- ^ before -> (c -> m d) -- ^ after, on normal stop, or GC -> Unfold m (c, e) b -- ^ on exception -> (forall s. m s -> m (Either e s)) -- ^ try (exception handling) -> Unfold m c b -- ^ unfold to run -> Unfold m a b gbracket bef aft (Unfold estep einject) ftry (Unfold step1 inject1) = Unfold step inject where inject x = do -- Mask asynchronous exceptions to make the execution of 'bef' and -- the registration of 'aft' atomic. See comment in 'D.gbracketIO'. (r, ref) <- withRunInIO $ \run -> mask_ $ run $ do r <- bef x ref <- newIOFinalizer (aft r) return (r, ref) s <- inject1 r return $ Right (s, r, ref) {-# INLINE_LATE step #-} step (Right (st, v, ref)) = do res <- ftry $ step1 st case res of Right r -> case r of Yield x s -> return $ Yield x (Right (s, v, ref)) Skip s -> return $ Skip (Right (s, v, ref)) Stop -> do runIOFinalizer ref return Stop -- XXX Do not handle async exceptions, just rethrow them. Left e -> do -- Clearing of finalizer and running of exception handler must -- be atomic wrt async exceptions. Otherwise if we have cleared -- the finalizer and have not run the exception handler then we -- may leak the resource. r <- clearingIOFinalizer ref (einject (v, e)) return $ Skip (Left r) step (Left st) = do res <- estep st return $ case res of Yield x s -> Yield x (Left s) Skip s -> Skip (Left s) Stop -> Stop -- | Unfold the input @a@ using @Unfold m a b@, run an action on @a@ whenever -- the unfold stops normally, or if it is garbage collected after a partial -- lazy evaluation. -- -- The semantics of the action @a -> m c@ are similar to the cleanup action -- semantics in 'bracket'. -- -- /See also 'after_'/ -- -- /Pre-release/ {-# INLINE_NORMAL after #-} after :: MonadRunInIO m => (a -> m c) -> Unfold m a b -> Unfold m a b after action (Unfold step1 inject1) = Unfold step inject where inject x = do s <- inject1 x ref <- newIOFinalizer (action x) return (s, ref) {-# INLINE_LATE step #-} step (st, ref) = do res <- step1 st case res of Yield x s -> return $ Yield x (s, ref) Skip s -> return $ Skip (s, ref) Stop -> do runIOFinalizer ref return Stop -- | Unfold the input @a@ using @Unfold m a b@, run an action on @a@ whenever -- the unfold stops normally, aborts due to an exception or if it is garbage -- collected after a partial lazy evaluation. -- -- The semantics of the action @a -> m c@ are similar to the cleanup action -- semantics in 'bracket'. -- -- @ -- finally release = bracket return release -- @ -- -- /See also 'finally_'/ -- -- /Inhibits stream fusion/ -- -- /Pre-release/ {-# INLINE_NORMAL finally #-} finally :: (MonadAsync m, MonadCatch m) => (a -> m c) -> Unfold m a b -> Unfold m a b finally action (Unfold step1 inject1) = Unfold step inject where inject x = do s <- inject1 x ref <- newIOFinalizer (action x) return (s, ref) {-# INLINE_LATE step #-} step (st, ref) = do res <- step1 st `MC.onException` runIOFinalizer ref case res of Yield x s -> return $ Yield x (s, ref) Skip s -> return $ Skip (s, ref) Stop -> do runIOFinalizer ref return Stop -- | Run the alloc action @a -> m c@ with async exceptions disabled but keeping -- blocking operations interruptible (see 'Control.Exception.mask'). Use the -- output @c@ as input to @Unfold m c b@ to generate an output stream. -- -- @c@ is usually a resource under the state of monad @m@, e.g. a file -- handle, that requires a cleanup after use. The cleanup action @c -> m d@, -- runs whenever the stream ends normally, due to a sync or async exception or -- if it gets garbage collected after a partial lazy evaluation. -- -- 'bracket' only guarantees that the cleanup action runs, and it runs with -- async exceptions enabled. The action must ensure that it can successfully -- cleanup the resource in the face of sync or async exceptions. -- -- When the stream ends normally or on a sync exception, cleanup action runs -- immediately in the current thread context, whereas in other cases it runs in -- the GC context, therefore, cleanup may be delayed until the GC gets to run. -- -- /See also: 'bracket_', 'gbracket'/ -- -- /Inhibits stream fusion/ -- -- /Pre-release/ {-# INLINE_NORMAL bracket #-} bracket :: (MonadAsync m, MonadCatch m) => (a -> m c) -> (c -> m d) -> Unfold m c b -> Unfold m a b bracket bef aft (Unfold step1 inject1) = Unfold step inject where inject x = do -- Mask asynchronous exceptions to make the execution of 'bef' and -- the registration of 'aft' atomic. See comment in 'D.gbracketIO'. (r, ref) <- withRunInIO $ \run -> mask_ $ run $ do r <- bef x ref <- newIOFinalizer (aft r) return (r, ref) s <- inject1 r return (s, ref) {-# INLINE_LATE step #-} step (st, ref) = do res <- step1 st `MC.onException` runIOFinalizer ref case res of Yield x s -> return $ Yield x (s, ref) Skip s -> return $ Skip (s, ref) Stop -> do runIOFinalizer ref return Stop