{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}
{-# OPTIONS_HADDOCK hide #-}
module Data.Array.Accelerate.Async (
Async,
async, asyncOn, asyncBound,
wait, poll, cancel,
) where
import Control.Exception
import Control.Concurrent
import GHC.Exts
import GHC.Conc
import GHC.IO
data Async a = Async {-# UNPACK #-} !ThreadId
{-# UNPACK #-} !(MVar (Either SomeException a))
async :: IO a -> IO (Async a)
async = inline asyncUsing rawForkIO
asyncOn :: Int -> IO a -> IO (Async a)
asyncOn cpu = inline asyncUsing (rawForkOn cpu)
asyncBound :: IO a -> IO (Async a)
asyncBound = inline asyncUsing forkOS
asyncUsing :: (IO () -> IO ThreadId) -> IO a -> IO (Async a)
asyncUsing fork action = do
var <- newEmptyMVar
tid <- mask $ \restore ->
fork $ try (restore action) >>= putMVar var
return (Async tid var)
{-# INLINE wait #-}
wait :: Async a -> IO a
wait (Async _ var) = either throwIO return =<< readMVar var
{-# INLINE poll #-}
poll :: Async a -> IO (Maybe a)
poll (Async _ var) =
maybe (return Nothing) (either throwIO (return . Just)) =<< tryReadMVar var
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel (Async tid _) = throwTo tid ThreadKilled
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \s ->
case fork# action s of
(# s', tid #) -> (# s', ThreadId tid #)
{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) action = IO $ \s ->
case forkOn# cpu action s of
(# s', tid #) -> (# s', ThreadId tid #)