-- | Operations for running IO operations asynchronously.

-- These are the same as in the 'async' package. We do not use
-- 'async' to avoid its dependencies.

{- License for the 'async' package
Copyright (c) 2012, Simon Marlow

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above
      copyright notice, this list of conditions and the following
      disclaimer in the documentation and/or other materials provided
      with the distribution.

    * Neither the name of Simon Marlow nor the names of other
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-}

{-# LANGUAGE DeriveDataTypeable, MagicHash, UnboxedTuples #-}

module Control.Concurrent.Async (
  async, withAsync, wait, asyncThreadId, cancel, concurrently
  ) where

import Control.Concurrent.STM
import Control.Exception
import Control.Concurrent
import Control.Monad
import Data.IORef
import Data.Typeable
import GHC.Conc
import GHC.Exts
import GHC.IO hiding (onException)

-- | An asynchronous action spawned by 'async' or 'withAsync'.
-- Asynchronous actions are executed in a separate thread, and
-- operations are provided for waiting for asynchronous actions to
-- complete and obtaining their results (see e.g. 'wait').
--
data Async a = Async
  { forall a. Async a -> ThreadId
asyncThreadId :: {-# UNPACK #-} !ThreadId
                  -- ^ Returns the 'ThreadId' of the thread running
                  -- the given 'Async'.
  , forall a. Async a -> STM (Either SomeException a)
_asyncWait    :: STM (Either SomeException a)
  }

-- | Spawn an asynchronous action in a separate thread.
async :: IO a -> IO (Async a)
async :: forall a. IO a -> IO (Async a)
async = forall a. a -> a
inline forall a. (IO () -> IO ThreadId) -> IO a -> IO (Async a)
asyncUsing IO () -> IO ThreadId
rawForkIO

asyncUsing :: (IO () -> IO ThreadId)
           -> IO a -> IO (Async a)
asyncUsing :: forall a. (IO () -> IO ThreadId) -> IO a -> IO (Async a)
asyncUsing IO () -> IO ThreadId
doFork = \IO a
action -> do
   TMVar (Either SomeException a)
var <- forall a. IO (TMVar a)
newEmptyTMVarIO
   -- t <- forkFinally action (\r -> atomically $ putTMVar var r)
   -- slightly faster:
   ThreadId
t <- forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore ->
          IO () -> IO ThreadId
doFork forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => IO a -> IO (Either e a)
try (forall a. IO a -> IO a
restore IO a
action) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException a)
var
   forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. ThreadId -> STM (Either SomeException a) -> Async a
Async ThreadId
t (forall a. TMVar a -> STM a
readTMVar TMVar (Either SomeException a)
var))

-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function.  When the function returns
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
--
-- > withAsync action inner = mask $ \restore -> do
-- >   a <- async (restore action)
-- >   restore (inner a) `finally` uninterruptibleCancel a
--
-- This is a useful variant of 'async' that ensures an @Async@ is
-- never left running unintentionally.
--
-- Note: a reference to the child thread is kept alive until the call
-- to `withAsync` returns, so nesting many `withAsync` calls requires
-- linear memory.
--
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync :: forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync = forall a. a -> a
inline forall a b.
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing IO () -> IO ThreadId
rawForkIO

withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
-- The bracket version works, but is slow.  We can do better by
-- hand-coding it:
withAsyncUsing :: forall a b.
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing IO () -> IO ThreadId
doFork = \IO a
action Async a -> IO b
inner -> do
  TMVar (Either SomeException a)
var <- forall a. IO (TMVar a)
newEmptyTMVarIO
  forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
    ThreadId
t <- IO () -> IO ThreadId
doFork forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => IO a -> IO (Either e a)
try (forall a. IO a -> IO a
restore IO a
action) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException a)
var
    let a :: Async a
a = forall a. ThreadId -> STM (Either SomeException a) -> Async a
Async ThreadId
t (forall a. TMVar a -> STM a
readTMVar TMVar (Either SomeException a)
var)
    b
r <- forall a. IO a -> IO a
restore (Async a -> IO b
inner Async a
a) forall a. IO a -> (SomeException -> IO a) -> IO a
`catchAll` \SomeException
e -> do
      forall a. Async a -> IO ()
uninterruptibleCancel Async a
a
      forall e a. Exception e => e -> IO a
throwIO SomeException
e
    forall a. Async a -> IO ()
uninterruptibleCancel Async a
a
    forall (m :: * -> *) a. Monad m => a -> m a
return b
r

-- | Wait for an asynchronous action to complete, and return its
-- value.  If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait :: forall a. Async a -> IO a
wait = forall a. IO a -> IO a
tryAgain forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM a
waitSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    tryAgain :: IO a -> IO a
tryAgain IO a
f = IO a
f forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Control.Exception.catch` \BlockedIndefinitelyOnSTM
BlockedIndefinitelyOnSTM -> IO a
f

-- | Wait for an asynchronous action to complete, and return either
-- @Left e@ if the action raised an exception @e@, or @Right a@ if it
-- returned a value @a@.
--
-- > waitCatch = atomically . waitCatchSTM
--
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch :: forall a. Async a -> IO (Either SomeException a)
waitCatch = forall a. IO a -> IO a
tryAgain forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM (Either SomeException a)
waitCatchSTM
  where
    -- See: https://github.com/simonmar/async/issues/14
    tryAgain :: IO a -> IO a
tryAgain IO a
f = IO a
f forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Control.Exception.catch` \BlockedIndefinitelyOnSTM
BlockedIndefinitelyOnSTM -> IO a
f

-- | A version of 'wait' that can be used inside an STM transaction.
--
waitSTM :: Async a -> STM a
waitSTM :: forall a. Async a -> STM a
waitSTM Async a
a = do
   Either SomeException a
r <- forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
a
   forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall e a. Exception e => e -> STM a
throwSTM forall (m :: * -> *) a. Monad m => a -> m a
return Either SomeException a
r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM :: forall a. Async a -> STM (Either SomeException a)
waitCatchSTM (Async ThreadId
_ STM (Either SomeException a)
w) = STM (Either SomeException a)
w

-- | Cancel an asynchronous action by throwing the @AsyncCancelled@
-- exception to it, and waiting for the `Async` thread to quit.
-- Has no effect if the 'Async' has already completed.
--
-- > cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a
--
-- Note that 'cancel' will not terminate until the thread the 'Async'
-- refers to has terminated. This means that 'cancel' will block for
-- as long said thread blocks when receiving an asynchronous exception.
--
-- For example, it could block if:
--
-- * It's executing a foreign call, and thus cannot receive the asynchronous
-- exception;
-- * It's executing some cleanup handler after having received the exception,
-- and the handler is blocking.
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel :: forall a. Async a -> IO ()
cancel a :: Async a
a@(Async ThreadId
t STM (Either SomeException a)
_) = forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
t AsyncCancelled
AsyncCancelled forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Functor f => f a -> f ()
void (forall a. Async a -> IO (Either SomeException a)
waitCatch Async a
a)

-- | The exception thrown by `cancel` to terminate a thread.
data AsyncCancelled = AsyncCancelled
  deriving (Int -> AsyncCancelled -> ShowS
[AsyncCancelled] -> ShowS
AsyncCancelled -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [AsyncCancelled] -> ShowS
$cshowList :: [AsyncCancelled] -> ShowS
show :: AsyncCancelled -> String
$cshow :: AsyncCancelled -> String
showsPrec :: Int -> AsyncCancelled -> ShowS
$cshowsPrec :: Int -> AsyncCancelled -> ShowS
Show, AsyncCancelled -> AsyncCancelled -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: AsyncCancelled -> AsyncCancelled -> Bool
$c/= :: AsyncCancelled -> AsyncCancelled -> Bool
== :: AsyncCancelled -> AsyncCancelled -> Bool
$c== :: AsyncCancelled -> AsyncCancelled -> Bool
Eq, Typeable)

instance Exception AsyncCancelled where
#if __GLASGOW_HASKELL__ >= 708
  fromException :: SomeException -> Maybe AsyncCancelled
fromException = forall e. Exception e => SomeException -> Maybe e
asyncExceptionFromException
  toException :: AsyncCancelled -> SomeException
toException = forall e. Exception e => e -> SomeException
asyncExceptionToException
#endif

-- | Cancel an asynchronous action
--
-- This is a variant of `cancel`, but it is not interruptible.
{-# INLINE uninterruptibleCancel #-}
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel :: forall a. Async a -> IO ()
uninterruptibleCancel = forall a. IO a -> IO a
uninterruptibleMask_ forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO ()
cancel

-- | Run two @IO@ actions concurrently, and return both results.  If
-- either action throws an exception at any time, then the other
-- action is 'cancel'led, and the exception is re-thrown by
-- 'concurrently'.
--
-- > concurrently left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitBoth a b
concurrently :: IO a -> IO b -> IO (a,b)
concurrently :: forall a b. IO a -> IO b -> IO (a, b)
concurrently IO a
left IO b
right = forall a b r.
IO a
-> IO b -> (IO (Either SomeException (Either a b)) -> IO r) -> IO r
concurrently' IO a
left IO b
right (forall {e} {a} {b}.
Exception e =>
[Either a b] -> IO (Either e (Either a b)) -> IO (a, b)
collect [])
  where
    collect :: [Either a b] -> IO (Either e (Either a b)) -> IO (a, b)
collect [Left a
a, Right b
b] IO (Either e (Either a b))
_ = forall (m :: * -> *) a. Monad m => a -> m a
return (a
a,b
b)
    collect [Right b
b, Left a
a] IO (Either e (Either a b))
_ = forall (m :: * -> *) a. Monad m => a -> m a
return (a
a,b
b)
    collect [Either a b]
xs IO (Either e (Either a b))
m = do
        Either e (Either a b)
e <- IO (Either e (Either a b))
m
        case Either e (Either a b)
e of
            Left e
ex -> forall e a. Exception e => e -> IO a
throwIO e
ex
            Right Either a b
r -> [Either a b] -> IO (Either e (Either a b)) -> IO (a, b)
collect (Either a b
rforall a. a -> [a] -> [a]
:[Either a b]
xs) IO (Either e (Either a b))
m

concurrently' :: IO a -> IO b
             -> (IO (Either SomeException (Either a b)) -> IO r)
             -> IO r
concurrently' :: forall a b r.
IO a
-> IO b -> (IO (Either SomeException (Either a b)) -> IO r) -> IO r
concurrently' IO a
left IO b
right IO (Either SomeException (Either a b)) -> IO r
collect = do
    MVar (Either SomeException (Either a b))
done <- forall a. IO (MVar a)
newEmptyMVar
    forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
        -- Note: uninterruptibleMask here is because we must not allow
        -- the putMVar in the exception handler to be interrupted,
        -- otherwise the parent thread will deadlock when it waits for
        -- the thread to terminate.
        ThreadId
lid <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO a
uninterruptibleMask_ forall a b. (a -> b) -> a -> b
$
          forall a. IO a -> IO a
restore (IO a
left forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException (Either a b))
done forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left)
            forall a. IO a -> (SomeException -> IO a) -> IO a
`catchAll` (forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException (Either a b))
done forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left)
        ThreadId
rid <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO a
uninterruptibleMask_ forall a b. (a -> b) -> a -> b
$
          forall a. IO a -> IO a
restore (IO b
right forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException (Either a b))
done forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right)
            forall a. IO a -> (SomeException -> IO a) -> IO a
`catchAll` (forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException (Either a b))
done forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left)

        IORef Int
count <- forall a. a -> IO (IORef a)
newIORef (Int
2 :: Int)
        let takeDone :: IO (Either SomeException (Either a b))
takeDone = do
                Either SomeException (Either a b)
r <- forall a. MVar a -> IO a
takeMVar MVar (Either SomeException (Either a b))
done      -- interruptible
                -- Decrement the counter so we know how many takes are left.
                -- Since only the parent thread is calling this, we can
                -- use non-atomic modifications.
                -- NB. do this *after* takeMVar, because takeMVar might be
                -- interrupted.
                forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef Int
count (forall a. Num a => a -> a -> a
subtract Int
1)
                forall (m :: * -> *) a. Monad m => a -> m a
return Either SomeException (Either a b)
r

        let tryAgain :: IO a -> IO a
tryAgain IO a
f = IO a
f forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Control.Exception.catch` \BlockedIndefinitelyOnMVar
BlockedIndefinitelyOnMVar -> IO a
f

            stop :: IO ()
stop = do
                -- kill right before left, to match the semantics of
                -- the version using withAsync. (#27)
                forall a. IO a -> IO a
uninterruptibleMask_ forall a b. (a -> b) -> a -> b
$ do
                  Int
count' <- forall a. IORef a -> IO a
readIORef IORef Int
count
                  -- we only need to use killThread if there are still
                  -- children alive.  Note: forkIO here is because the
                  -- child thread could be in an uninterruptible
                  -- putMVar.
                  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
count' forall a. Ord a => a -> a -> Bool
> Int
0) forall a b. (a -> b) -> a -> b
$
                    forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ do
                      forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
rid AsyncCancelled
AsyncCancelled
                      forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
lid AsyncCancelled
AsyncCancelled
                  -- ensure the children are really dead
                  forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
count' (forall a. IO a -> IO a
tryAgain forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar MVar (Either SomeException (Either a b))
done)

        r
r <- IO (Either SomeException (Either a b)) -> IO r
collect (forall a. IO a -> IO a
tryAgain forall a b. (a -> b) -> a -> b
$ IO (Either SomeException (Either a b))
takeDone) forall a b. IO a -> IO b -> IO a
`onException` IO ()
stop
        IO ()
stop
        forall (m :: * -> *) a. Monad m => a -> m a
return r
r

catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll :: forall a. IO a -> (SomeException -> IO a) -> IO a
catchAll = forall e a. Exception e => IO a -> (e -> IO a) -> IO a
Control.Exception.catch

-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO :: IO () -> IO ThreadId
rawForkIO IO ()
action = forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
IO forall a b. (a -> b) -> a -> b
$ \ State# RealWorld
s ->
   case (forall a.
a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
fork# (forall a. IO a -> State# RealWorld -> (# State# RealWorld, a #)
unIO IO ()
action) State# RealWorld
s) of (# State# RealWorld
s1, ThreadId#
tid #) -> (# State# RealWorld
s1, ThreadId# -> ThreadId
ThreadId ThreadId#
tid #)