-- | A small manager for threads started with 'forkIO', plus helpers that run
-- lists of 'IO' actions on a bounded number of worker threads.
--
-- 'ThreadStatus' is exported together with its constructors because it is
-- the result type of 'getStatus' and 'waitFor'; without the export callers
-- could neither name the type nor pattern-match on the outcome.
module Control.Concurrent.ManagedThreads
  ( ThreadManager
  , ThreadStatus (..)
  , newManager
  , forkManaged
  , getStatus
  , waitFor
  , waitAll
  , nParSequenceIO
  , nParSequenceIO_
  , nParMapIO
  , nParMapIO_
  , nParCmd_
  , parCmd_
  ) where
import Control.Concurrent
import Control.Exception (IOException, SomeException, fromException, try)
import Control.Monad
import Data.List (sortBy)
import qualified Data.Map as M
import Data.Ord (comparing)
import GHC.Conc (numCapabilities)
-- | The state of a managed thread as reported by the manager.
data ThreadStatus
  = Running
    -- ^ The thread has not yet reported an outcome.
  | Finished
    -- ^ The thread's action completed without a recorded exception.
  | Threw IOException
    -- ^ The thread's action terminated with the carried exception.
  deriving (Show)
-- | Handle to a set of managed threads.
--
-- Internally a shared map from each thread's 'ThreadId' to the 'MVar' that
-- will receive that thread's final 'ThreadStatus'.
newtype ThreadManager
  = Mgr (MVar (M.Map ThreadId (MVar ThreadStatus)))
  deriving (Eq)
-- | Create a manager that does not yet track any threads.
newManager :: IO ThreadManager
newManager = fmap Mgr (newMVar M.empty)
-- | Fork @body@ in a new thread and register it with the manager.
--
-- The thread's outcome is written to a per-thread 'MVar' once @body@ ends:
-- 'Finished' on normal completion, 'Threw' if it raised an exception.
--
-- Every exception is caught, not only 'IOException'. Previously any other
-- exception killed the thread before its status was written, leaving
-- 'waitFor' and 'waitAll' blocked on an 'MVar' that would never be filled.
-- Exceptions that are not an 'IOException' are recorded as a 'userError'
-- carrying the 'show'n original, so the 'Threw' constructor keeps its type.
--
-- The fork happens while the manager's map is held, so the new thread is
-- registered before any other manager operation can observe the map.
forkManaged :: ThreadManager -> IO () -> IO ThreadId
forkManaged (Mgr mgr) body =
  modifyMVar mgr $ \m -> do
    state <- newEmptyMVar
    tid <- forkIO $ do
      result <- try body
      putMVar state (either (Threw . toIOException) (const Finished) result)
    return (M.insert tid state m, tid)
  where
    -- Keep a genuine IOException as-is; wrap anything else.
    toIOException :: SomeException -> IOException
    toIOException e =
      case fromException e of
        Just ioe -> ioe
        Nothing -> userError (show e)
-- | Poll a managed thread without blocking on its completion.
--
-- Returns 'Nothing' when the manager holds no entry for the thread,
-- @Just Running@ when its outcome has not been written yet, and otherwise
-- the final status. A final status is consumed: the thread's entry is
-- removed from the manager, so a later query for it yields 'Nothing'.
getStatus :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)
getStatus (Mgr mgr) tid =
  modifyMVar mgr $ \threads ->
    case M.lookup tid threads of
      Nothing -> return (threads, Nothing)
      Just statusVar -> do
        outcome <- tryTakeMVar statusVar
        return $ case outcome of
          Nothing -> (threads, Just Running)
          Just status -> (M.delete tid threads, Just status)
-- | Block until a managed thread has reported its outcome.
--
-- Returns 'Nothing' when the manager holds no entry for the thread.
-- Otherwise the entry is removed from the manager first, and the wait on
-- the thread's status happens only after the manager's map has been
-- released again.
waitFor :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)
waitFor (Mgr mgr) tid = do
  found <- modifyMVar mgr $ \threads ->
    return $ case M.lookup tid threads of
      Nothing -> (threads, Nothing)
      Just statusVar -> (M.delete tid threads, Just statusVar)
  maybe (return Nothing) (fmap Just . takeMVar) found
-- | Block until every thread registered at the time of the call has
-- reported its outcome.
--
-- The manager's map is emptied up front; the statuses are then awaited one
-- after another, in the order given by 'M.elems'.
waitAll :: ThreadManager -> IO ()
waitAll (Mgr mgr) = do
  pending <- modifyMVar mgr $ \threads -> return (M.empty, M.elems threads)
  mapM_ takeMVar pending
-- | Remove and return the first element of the list stored in the 'MVar'.
--
-- Yields 'Nothing' and leaves the list empty when there is nothing to take.
takeHead :: MVar [a] -> IO (Maybe a)
takeHead queue =
  modifyMVar queue $ \items ->
    return $ case items of
      [] -> ([], Nothing)
      (first : rest) -> (rest, Just first)
-- | Run the actions on @max 1 n@ worker threads and collect their results.
--
-- Each action is tagged with its position in the input list; the collected
-- results are sorted by that position before being returned, so the output
-- follows input order regardless of which worker finished first.
nParSequenceIO :: Int -> [IO a] -> IO [a]
nParSequenceIO n ios = do
  jobs <- newMVar (zip [(1 :: Int) ..] ios)
  results <- newMVar []
  mgr <- newManager
  replicateM_ (max 1 n) $ forkManaged mgr (drain jobs results)
  waitAll mgr
  collected <- takeMVar results
  return (map snd (sortBy (comparing fst) collected))
  where
    -- A worker keeps pulling jobs from the shared queue until it is empty.
    drain jobs results = loop
      where
        loop = takeHead jobs >>= maybe (return ()) run
        run (idx, action) = do
          value <- action
          modifyMVar_ results (return . ((idx, value) :))
          loop
-- | Run the actions on a bounded number of worker threads, discarding their
-- results.
--
-- The worker count is clamped to at least one, matching 'nParSequenceIO'.
-- Without the clamp a non-positive @n@ forked no workers at all, so the
-- call returned immediately without running any of the actions.
nParSequenceIO_ :: Int -> [IO a] -> IO ()
nParSequenceIO_ n ios = do
  inMv <- newMVar ios
  mgr <- newManager
  mapM_ (forkManaged mgr) (replicate (max 1 n) $ worker inMv)
  waitAll mgr
  where
    -- A worker keeps pulling actions from the shared queue until it is empty.
    worker inMv = do
      nextJob <- takeHead inMv
      case nextJob of
        Nothing -> return ()
        Just io -> io >> worker inMv
-- | Apply @f@ to every element and run the resulting actions through
-- 'nParSequenceIO' with the given thread count.
nParMapIO :: Int -> (a -> IO b) -> [a] -> IO [b]
nParMapIO n f xs = nParSequenceIO n (map f xs)
-- | Apply @f@ to every element and run the resulting actions through
-- 'nParSequenceIO_' with the given thread count, discarding the results.
nParMapIO_ :: Int -> (a -> IO b) -> [a] -> IO ()
nParMapIO_ n f xs = nParSequenceIO_ n (map f xs)
-- | Run commands in parallel while a dedicated thread displays the messages
-- they send over a shared channel.
--
-- Every command receives the channel. The display thread reads exactly
-- @length cmds@ messages from it and calls @display total i msg@ for each,
-- where @i@ counts the messages from 1. The call returns once both the
-- display thread and the thread running the commands have finished.
nParCmd_ :: Int -> (Int -> Int -> b -> IO ()) -> [Chan b -> IO a] -> IO ()
nParCmd_ nThreads display cmds = do
  chan <- newChan
  mgr <- newManager
  _ <- forkManaged mgr (showMessages chan)
  _ <- forkManaged mgr (nParMapIO_ nThreads ($ chan) cmds)
  waitAll mgr
  where
    total = length cmds
    -- One channel read per command, numbered 1 .. total.
    showMessages ch =
      forM_ [1 .. total] $ \i ->
        readChan ch >>= display total i
-- | 'nParCmd_' with the thread count set to 'numCapabilities' plus one.
parCmd_ :: (Int -> Int -> b -> IO ()) -> [Chan b -> IO a] -> IO ()
parCmd_ = nParCmd_ (numCapabilities + 1)