module Control.Parallel.HdpH.Internal.Threadpool
(
ThreadM,
run,
forkThreadM,
liftSparkM,
liftCommM,
liftIO,
poolID,
putThread,
putThreads,
stealThread,
readMaxThreadCtrs
) where
import Prelude hiding (error)
import Control.Concurrent (ThreadId)
import Control.Monad.Reader (ReaderT, runReaderT, ask)
import Control.Monad.Trans (lift)
import Control.Parallel.HdpH.Internal.Comm (CommM)
import Control.Parallel.HdpH.Internal.Data.Deque
(DequeIO, pushFrontIO, popFrontIO, popBackIO, maxLengthIO)
import Control.Parallel.HdpH.Internal.Location (error)
import Control.Parallel.HdpH.Internal.Misc (Forkable, fork, rotate)
import Control.Parallel.HdpH.Internal.Sparkpool (SparkM, wakeupSched)
import qualified Control.Parallel.HdpH.Internal.Sparkpool as Sparkpool
(liftCommM, liftIO)
import Control.Parallel.HdpH.Internal.Type.Par (Thread)
-- | 'ThreadM' is a reader monad layered on top of the 'SparkM' monad;
-- the environment it carries is the thread pool 'State'.
type ThreadM m = ReaderT (State m) (SparkM m)
-- | Thread pool state: a list of pools, each pairing an 'Int' with a
-- double-ended queue of threads. The functions in this module treat the
-- head of the list as the caller's own pool and its 'Int' component as the
-- pool ID; 'getPools' rejects an empty list.
type State m = [(Int, DequeIO (Thread m))]
-- | Eliminate the 'ThreadM' layer: run the given action against the given
-- list of thread pools, yielding an action in the underlying 'SparkM' monad.
run :: [(Int, DequeIO (Thread m))] -> ThreadM m a -> SparkM m a
run = flip runReaderT
-- | Execute the given 'ThreadM' action in a new thread (created with 'fork')
-- and return that thread's 'ThreadId'. The new thread runs against the
-- current pool list after it has been passed through @rotate n@.
forkThreadM :: Int -> ThreadM m () -> ThreadM m ThreadId
forkThreadM n action = getPools >>= spawn
  where
    -- NOTE(review): presumably 'rotate' shifts the list by @n@ positions so
    -- the forked thread sees a different pool at the head -- confirm against
    -- "Control.Parallel.HdpH.Internal.Misc".
    spawn pools = lift (fork (run (rotate n pools) action))
-- | Lift an action of the underlying 'SparkM' monad into 'ThreadM'.
liftSparkM :: SparkM m a -> ThreadM m a
liftSparkM sparkAction = lift sparkAction
-- | Lift a 'CommM' action into 'ThreadM', going through the 'SparkM' layer
-- by way of the spark pool module's own @liftCommM@.
liftCommM :: CommM a -> ThreadM m a
liftCommM commAction = liftSparkM (Sparkpool.liftCommM commAction)
-- | Lift an 'IO' action into 'ThreadM', going through the 'SparkM' layer
-- by way of the spark pool module's own @liftIO@.
liftIO :: IO a -> ThreadM m a
liftIO ioAction = liftSparkM (Sparkpool.liftIO ioAction)
-- | Fetch the list of thread pools from the reader environment.
-- Calls 'error' if the list is empty, so callers may rely on the result
-- having at least one element.
getPools :: ThreadM m [(Int, DequeIO (Thread m))]
getPools = ask >>= requireNonEmpty
  where
    -- An empty pool list is rejected; any other list is returned unchanged.
    requireNonEmpty [] = error "HdpH.Internal.Threadpool.getPools: no pools"
    requireNonEmpty pools = return pools
-- | Return the ID of the caller's own pool, i.e. the 'Int' component of the
-- pool at the head of the pool list.
poolID :: ThreadM m Int
poolID = do
  -- 'getPools' calls 'error' on an empty list, so this match is not
  -- expected to fail.
  own : _ <- getPools
  return (fst own)
-- | Apply 'maxLengthIO' to the deque of every thread pool and return the
-- results, one per pool, in pool-list order.
-- NOTE(review): presumably each value is the maximum length that deque has
-- reached so far -- confirm against 'maxLengthIO'.
readMaxThreadCtrs :: ThreadM m [Int]
readMaxThreadCtrs = do
  pools <- getPools
  let deques = map snd pools
  liftIO (mapM maxLengthIO deques)
-- | Try to obtain a thread to run. The caller's own pool (head of the pool
-- list) is tried first with 'popFrontIO'; if that yields nothing, the
-- remaining pools are tried in list order with 'popBackIO'. Returns
-- 'Nothing' only if every pool came up empty.
stealThread :: ThreadM m (Maybe (Thread m))
stealThread = do
  own : others <- getPools
  local <- liftIO (popFrontIO (snd own))
  maybe (stealFrom others) found local
  where
    -- Hand back a thread that was successfully popped.
    found :: Thread m -> ThreadM m (Maybe (Thread m))
    found = return . Just

    -- Walk the other pools in order, popping from the back of each, and
    -- stop at the first pool that yields a thread.
    stealFrom :: [(Int, DequeIO (Thread m))] -> ThreadM m (Maybe (Thread m))
    stealFrom [] = return Nothing
    stealFrom (victim : rest) =
      liftIO (popBackIO (snd victim)) >>= maybe (stealFrom rest) found
-- | Push the given thread onto the front of the caller's own pool (head of
-- the pool list), then call 'wakeupSched' with argument 1.
-- NOTE(review): presumably this wakes up to one sleeping scheduler --
-- confirm against 'wakeupSched' in the spark pool module.
putThread :: Thread m -> ThreadM m ()
putThread thread = do
  own : _ <- getPools
  let deque = snd own
  liftIO (pushFrontIO deque thread)
  liftSparkM (wakeupSched 1)
-- | Push all given threads, in list order, onto the front of the caller's
-- own pool (head of the pool list); the last thread in the list therefore
-- ends up frontmost. Afterwards 'wakeupSched' is called with the smaller of
-- the number of pools and the number of threads pushed.
-- NOTE(review): presumably that argument caps how many sleeping schedulers
-- are woken -- confirm against 'wakeupSched' in the spark pool module.
putThreads :: [Thread m] -> ThreadM m ()
putThreads threads = do
  pools@(own : _) <- getPools
  let deque = snd own
      wakeups = min (length pools) (length threads)
  liftIO (mapM_ (pushFrontIO deque) threads)
  liftSparkM (wakeupSched wakeups)