-- | Concurrent execution of actions on a shared @parallel-io@ worker pool.
--
-- 'CIO' is the base monad. The 'MonadCIO' class makes the concurrent
-- combinators usable from transformer stacks built on top of it.
--
-- Naming convention of the combinators: the unprimed variant is built on
-- 'sequenceConcurrently', the primed (@'@) variant on 'sequenceConcurrently'',
-- and the underscore variant on 'sequenceConcurrently_'.
module CIO
  ( -- * The monad
    CIO
  , runCIO
  , runCIO'
    -- * The class
  , MonadCIO(..)
    -- * Mapping
  , mapMConcurrently
  , mapMConcurrently'
  , mapMConcurrently_
  , forMConcurrently
  , forMConcurrently'
  , forMConcurrently_
    -- * Replicating
    -- These are defined alongside the mapping combinators and follow the same
    -- naming scheme, so they are exported with them.
  , replicateMConcurrently
  , replicateMConcurrently'
  , replicateMConcurrently_
  , distributeConcurrently
  , distributeConcurrently_
  ) where
import CIO.Prelude
import qualified Control.Concurrent.ParallelIO.Local as ParallelIO
-- | A computation that runs in 'IO' with access to a worker pool.
--
-- It wraps a reader whose environment is a pair: the @parallel-io@ pool on
-- which concurrent actions are run, and the 'Int' reported by
-- 'getPoolNumCapabilities' (the value 'runCIO' created the pool with).
newtype CIO r = CIO (ReaderT (ParallelIO.Pool, Int) IO r)
deriving (Functor, Applicative, Monad)
-- | Plain 'IO' actions are lifted through the underlying reader layer.
instance MonadIO CIO where
  liftIO = CIO . lift
-- | STM transactions are lifted by delegating to the wrapped reader's own
-- 'MonadSTM' instance.
instance MonadSTM CIO where
  liftSTM transaction = CIO (liftSTM transaction)
-- | Run a 'CIO' computation on a pool created for the given number of
-- capabilities.
--
-- The pool is obtained from @ParallelIO.withPool@, which scopes it to this
-- call. The pool and the requested number together form the environment that
-- the computation reads from.
runCIO :: Int -> CIO r -> IO r
runCIO poolSize (CIO body) =
  ParallelIO.withPool poolSize runOnPool
  where
    -- Supplies the reader environment: the pool plus the size it was made with.
    runOnPool pool = runReaderT body (pool, poolSize)
-- | Like 'runCIO', but sizes the pool by whatever 'getNumCapabilities'
-- reports at the time of the call.
runCIO' :: CIO r -> IO r
runCIO' cio =
  getNumCapabilities >>= \capabilities -> runCIO capabilities cio
-- | Monads able to run a batch of actions concurrently on a worker pool.
--
-- 'CIO' is the base instance. This module also lifts the class through
-- 'ReaderT' and 'WriterT'.
class Monad m => MonadCIO m where
  -- | The capability count associated with the pool. For 'CIO' this is the
  -- number the pool was created with in 'runCIO'.
  getPoolNumCapabilities :: m Int

  -- | Run all actions concurrently and collect their results.
  --
  -- The 'CIO' instance delegates to @parallel@ from
  -- "Control.Concurrent.ParallelIO.Local", which is documented to return the
  -- results in the order of the input actions.
  sequenceConcurrently :: [m a] -> m [a]

  -- | Run all actions concurrently and collect their results.
  --
  -- The 'CIO' instance delegates to @parallelInterleaved@ from
  -- "Control.Concurrent.ParallelIO.Local", which is documented to return the
  -- results in approximate order of completion rather than input order.
  sequenceConcurrently' :: [m a] -> m [a]

  -- | Run all actions concurrently, discarding their results.
  --
  -- The 'CIO' instance delegates to @parallel_@ from
  -- "Control.Concurrent.ParallelIO.Local".
  sequenceConcurrently_ :: [m a] -> m ()
-- | The base instance. Each batch operation reads the environment, unwraps
-- every 'CIO' action into plain 'IO' against that same environment, and hands
-- the resulting list to the matching @parallel-io@ combinator.
instance MonadCIO CIO where
  -- The capability count is the second component of the environment.
  getPoolNumCapabilities = CIO (fmap snd ask)

  sequenceConcurrently actions =
    CIO $
      ask >>= \env@(pool, _) ->
        lift (ParallelIO.parallel pool [runReaderT body env | CIO body <- actions])

  sequenceConcurrently' actions =
    CIO $
      ask >>= \env@(pool, _) ->
        lift (ParallelIO.parallelInterleaved pool [runReaderT body env | CIO body <- actions])

  sequenceConcurrently_ actions =
    CIO $
      ask >>= \env@(pool, _) ->
        lift (ParallelIO.parallel_ pool [runReaderT body env | CIO body <- actions])
-- | Lifts 'MonadCIO' through 'ReaderT': the current environment is captured
-- once and supplied to every action, and the unwrapped actions are then run
-- by the underlying monad's implementation.
instance MonadCIO m => MonadCIO (ReaderT r m) where
  getPoolNumCapabilities = lift getPoolNumCapabilities

  sequenceConcurrently readers =
    ask >>= \env ->
      lift (sequenceConcurrently [runReaderT reader env | reader <- readers])

  sequenceConcurrently' readers =
    ask >>= \env ->
      lift (sequenceConcurrently' [runReaderT reader env | reader <- readers])

  sequenceConcurrently_ readers =
    ask >>= \env ->
      lift (sequenceConcurrently_ [runReaderT reader env | reader <- readers])
-- | Lifts 'MonadCIO' through 'WriterT': every action is unwrapped, the batch
-- is run by the underlying monad, and the individual outputs are merged with
-- 'mconcat' in the order the underlying operation returned them.
instance (MonadCIO m, Monoid w) => MonadCIO (WriterT w m) where
  getPoolNumCapabilities = lift getPoolNumCapabilities

  sequenceConcurrently writers =
    WriterT $
      sequenceConcurrently (map runWriterT writers) >>= \outcomes ->
        case unzip outcomes of
          (values, outputs) -> return (values, mconcat outputs)

  sequenceConcurrently' writers =
    WriterT $
      sequenceConcurrently' (map runWriterT writers) >>= \outcomes ->
        case unzip outcomes of
          (values, outputs) -> return (values, mconcat outputs)

  -- The outputs still have to be collected, so the underlying
  -- 'sequenceConcurrently_' cannot be used here. The primed variant is used
  -- instead, which means the merged output follows that variant's result
  -- order. NOTE(review): presumably intentional, since this variant makes no
  -- ordering promise -- confirm if 'w' is a non-commutative monoid.
  sequenceConcurrently_ writers =
    WriterT $
      sequenceConcurrently' (map execWriterT writers) >>= \outputs ->
        return ((), mconcat outputs)
-- | Apply the function to every list element and run the resulting actions
-- with 'sequenceConcurrently'.
mapMConcurrently :: (MonadCIO m) => (a -> m b) -> [a] -> m [b]
mapMConcurrently f xs = sequenceConcurrently (map f xs)
-- | Apply the function to every list element and run the resulting actions
-- with 'sequenceConcurrently''.
mapMConcurrently' :: (MonadCIO m) => (a -> m b) -> [a] -> m [b]
mapMConcurrently' f xs = sequenceConcurrently' (map f xs)
-- | Apply the function to every list element and run the resulting actions
-- with 'sequenceConcurrently_', discarding the results.
mapMConcurrently_ :: (MonadCIO m) => (a -> m b) -> [a] -> m ()
mapMConcurrently_ f xs = sequenceConcurrently_ (map f xs)
-- | 'mapMConcurrently' with its arguments swapped.
forMConcurrently :: (MonadCIO m) => [a] -> (a -> m b) -> m [b]
forMConcurrently xs f = mapMConcurrently f xs
-- | 'mapMConcurrently'' with its arguments swapped.
forMConcurrently' :: (MonadCIO m) => [a] -> (a -> m b) -> m [b]
forMConcurrently' xs f = mapMConcurrently' f xs
-- | 'mapMConcurrently_' with its arguments swapped.
forMConcurrently_ :: (MonadCIO m) => [a] -> (a -> m b) -> m ()
forMConcurrently_ xs f = mapMConcurrently_ f xs
-- | Run @n@ copies of the action with 'sequenceConcurrently' and collect the
-- results.
replicateMConcurrently :: (MonadCIO m) => Int -> m a -> m [a]
replicateMConcurrently n action = sequenceConcurrently (replicate n action)
-- | Run @n@ copies of the action with 'sequenceConcurrently'' and collect the
-- results.
replicateMConcurrently' :: (MonadCIO m) => Int -> m a -> m [a]
replicateMConcurrently' n action = sequenceConcurrently' (replicate n action)
-- | Run @n@ copies of the action with 'sequenceConcurrently_', discarding the
-- results.
replicateMConcurrently_ :: (MonadCIO m) => Int -> m a -> m ()
replicateMConcurrently_ n action = sequenceConcurrently_ (replicate n action)
-- | Run one copy of the action per pool capability, as reported by
-- 'getPoolNumCapabilities', and collect the results.
--
-- The copies are run through 'replicateMConcurrently'', so the result order
-- is whatever that primed variant yields.
distributeConcurrently :: (MonadCIO m) => m a -> m [a]
distributeConcurrently action =
  getPoolNumCapabilities >>= \capabilities ->
    replicateMConcurrently' capabilities action
-- | Run one copy of the action per pool capability, as reported by
-- 'getPoolNumCapabilities', discarding the results.
distributeConcurrently_ :: (MonadCIO m) => m a -> m ()
distributeConcurrently_ action =
  getPoolNumCapabilities >>= \capabilities ->
    replicateMConcurrently_ capabilities action