{-# LANGUAGE CPP #-} module Control.Concurrent.Spawn ( -- * Spawn spawn -- * Spawn with @'try'@ , Result , spawnTry -- * Higher-level functions , parMapIO , parMapIO_ , (|*|) -- * Limiting concurrency , pool ) where import Control.Concurrent import Control.Exception import Control.Monad -- | Two ways a computation of type @'IO' a@ can end. type Result a = Either SomeException a -- | Spawn a concurrent computation. Produces an action which -- demands a @'Result'@. spawnTry :: IO a -> IO (IO (Result a)) -- We block asynchronous exceptions around 'forkIO', then restore -- the parent thread's exception mask state inside 'try'. This -- prevents an exception from interrupting 'putMVar', which would -- deadlock the parent. -- -- The API for doing this changed in base-4.3 with GHC 7.0. #if MIN_VERSION_base(4,3,0) spawnTry m = do v <- newEmptyMVar _ <- mask $ \restore -> forkIO (try (restore m) >>= putMVar v) return (readMVar v) #else spawnTry m = do v <- newEmptyMVar b <- blocked _ <- block $ forkIO (try (if b then m else unblock m) >>= putMVar v) return (readMVar v) #endif -- | Spawn a concurrent computation. Produces an action which -- demands the result. Any exception from the original computation -- is re-thrown when and where the result is demanded. spawn :: IO a -> IO (IO a) spawn m = do r <- spawnTry m return (r >>= either throwIO return) -- | Given /n/, produces a function to wrap @'IO'@ actions. -- No more than /n/ wrapped actions will be in progress at -- one time. pool :: Int -> IO (IO a -> IO a) pool n = do s <- newQSem n return $ bracket_ (waitQSem s) (signalQSem s) -- | Execute a separate thread of IO for each element of a list, and -- collect results. -- -- The analogy to @parMap@ is misleading. The concurrent execution -- of these actions is non-deterministic and can affect results. -- However, @'parMapIO'@ is expected to be most useful for actions -- which do not interact. parMapIO :: (a -> IO b) -> [a] -> IO [b] parMapIO f xs = mapM (spawn . f) xs >>= sequence -- | Execute a separate thread of IO for each element of a list. -- -- Results are discarded, but the @'parMapIO_'@ action does not -- complete until all threads have finished. parMapIO_ :: (a -> IO b) -> [a] -> IO () parMapIO_ f xs = mapM (spawn . f) xs >>= sequence_ infixl 4 |*| -- | A concurrent version of @'ap'@ or @(\<*\>)@ for @'IO'@. -- -- Spawns a thread for the right-hand action, while executing the -- left-hand action in the current thread. (|*|) :: IO (a -> b) -> IO a -> IO b mf |*| mx = spawn mx >>= (mf `ap`)