module Control.Concurrent.PooledIO.Monad where
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, putMVar)
import Control.Concurrent (ThreadId, forkIO, getNumCapabilities, myThreadId, killThread)
import Control.DeepSeq (NFData, ($!!))
import Control.Exception (SomeException, finally, try, throw)
import qualified Control.Monad.Trans.State as MS
import qualified Control.Monad.Trans.Reader as MR
import qualified Control.Monad.Trans.Class as MT
import Control.Monad.IO.Class (liftIO)
import Control.Monad (replicateM_, liftM2)
import Control.Functor.HT (void)
import qualified Data.Foldable as Fold
import qualified Data.Set as Set; import Data.Set (Set)
-- | Monad in which pooled actions are scheduled.
--
-- The reader environment is the 'MVar' that worker threads fill when they
-- finish (see 'forkFinally'); the 'Int' state is the number of thread slots
-- that may still be claimed without waiting (initialised by 'runLimited',
-- decremented by 'fork').
type T = MR.ReaderT (MVar ()) (MS.StateT Int IO)
-- | Schedule @act@ on the pool and return an action that waits for its result.
--
-- While the slot counter in the state is positive, a slot is claimed by
-- decrementing it. Once the counter has reached zero, 'fork' blocks on the
-- shared completion 'MVar' until a previously forked thread signals that it
-- has finished, and only then starts the new thread.
--
-- The result is fully evaluated (via '$!!') inside the worker thread before
-- it is handed over through the result 'MVar'.
--
-- NOTE(review): if @act@ throws, the result 'MVar' is never filled, so the
-- returned action would block -- confirm that callers expect this.
fork :: (NFData a) => IO a -> T (IO a)
fork act = do
  complete <- MR.ask
  initial <- MT.lift MS.get
  if initial > 0
    -- A free slot is available: claim it without waiting.
    then MT.lift $ MS.put (initial - 1)
    -- All slots are taken: wait until some worker reports completion.
    else liftIO $ takeMVar complete
  liftIO $ do
    result <- newEmptyMVar
    forkFinally complete $ (putMVar result $!!) =<< act
    return $ takeMVar result
-- | Run @act@ in a fresh thread and put @()@ into @mvar@ once it ends,
-- whether it returned normally or was terminated by an exception.
-- The 'ThreadId' of the new thread is discarded.
forkFinally :: MVar () -> IO () -> IO ()
forkFinally mvar act = do
  _ <- forkIO (act `finally` putMVar mvar ())
  return ()
-- | Start @act@ in a new thread and record that thread in the state.
--
-- The worker determines its own 'ThreadId', runs @act@ under 'try' and
-- delivers the pair of id and outcome to @mvar@ through 'applyStrictRight'.
--
-- NOTE(review): the deep evaluation of a successful result happens in
-- 'applyStrictRight', i.e. after 'try' has returned, so an exception raised
-- while forcing the value is not reported as 'Left' -- confirm this is intended.
forkTry ::
  (NFData a) =>
  MVar (ThreadId, Either SomeException a) ->
  IO a ->
  MS.StateT (Set ThreadId) IO ()
forkTry mvar act = do
  worker <- liftIO . forkIO $ do
    self <- myThreadId
    outcome <- try act
    applyStrictRight (putMVar mvar) (self, outcome)
  MS.modify (Set.insert worker)
-- | Apply @f@ to a tagged 'Either', deeply evaluating a 'Right' payload
-- (via '$!!') before @f@ is applied. A 'Left' is passed on without forcing.
applyStrictRight :: (NFData a) => ((id, Either e a) -> b) -> (id, Either e a) -> b
applyStrictRight f (tag, Left err) = f (tag, Left err)
applyStrictRight f (tag, Right val) = (\forced -> f (tag, Right forced)) $!! val
-- | Wait for the next worker outcome on @mvar@.
--
-- The reporting thread is removed from the set of tracked threads. A 'Right'
-- value is returned to the caller; on 'Left' every thread still in the set
-- is killed and the exception is rethrown.
takeMVarTry ::
  MVar (ThreadId, Either SomeException a) ->
  MS.StateT (Set ThreadId) IO a
takeMVarTry mvar = do
  (finished, outcome) <- liftIO (takeMVar mvar)
  MS.modify (Set.delete finished)
  either abort return outcome
  where
    -- Stop all remaining workers, then propagate the failure.
    abort err = do
      remaining <- MS.get
      liftIO (Fold.mapM_ killThread remaining)
      liftIO (throw err)
-- | Run a computation that tracks worker threads, starting from an empty
-- set and discarding the final set.
runTry :: MS.StateT (Set ThreadId) IO a -> IO a
runTry = flip MS.evalStateT Set.empty
-- | Use the explicitly requested number if one is given, otherwise ask the
-- runtime via 'getNumCapabilities'.
chooseNumCapabilities :: Maybe Int -> IO Int
chooseNumCapabilities Nothing = getNumCapabilities
chooseNumCapabilities (Just n) = return n
-- | Call @run@ with the result of 'getNumCapabilities' as its first
-- argument and @acts@ as its second.
withNumCapabilities :: (Int -> a -> IO b) -> a -> IO b
withNumCapabilities run acts =
  getNumCapabilities >>= \caps -> run caps acts
-- | Run a pooled computation with at most @maxThreads@ slots.
--
-- The slot counter starts at @maxThreads@ and a fresh, empty completion
-- 'MVar' is supplied as the reader environment. After @m@ has finished, the
-- counter tells how many slots were never claimed; the difference
-- @maxThreads - uninitialized@ is the number of completion signals that have
-- not been consumed yet, so that many are taken from the 'MVar' before the
-- result is returned.
runLimited :: Int -> T a -> IO a
runLimited maxThreads m = do
  complete <- newEmptyMVar
  (result, uninitialized) <-
    MS.runStateT (MR.runReaderT m complete) maxThreads
  -- Drain one completion signal per slot that was actually claimed.
  replicateM_ (maxThreads - uninitialized) $ takeMVar complete
  return result