{-# LANGUAGE Safe #-}
{- |
Building blocks for running 'IO' actions on a bounded number of worker threads.

Two families of helpers live here:

* 'T', 'fork' and 'runLimited' limit the number of simultaneously running
  worker threads.

* 'forkTry', 'takeMVarTry' and 'runTry' keep track of the forked threads
  so that an exception in one worker can be re-raised in the caller
  after the remaining workers have been killed.
-}
module Control.Concurrent.PooledIO.Monad where

import qualified Control.Concurrent.Split.MVar as MVar
import Control.Concurrent
          (ThreadId, forkIO, getNumCapabilities, myThreadId, killThread)
import Control.DeepSeq (NFData, ($!!))
import Control.Exception (SomeException, finally, try, throw, throwIO)

import qualified Control.Monad.Trans.State as MS
import qualified Control.Monad.Trans.Reader as MR
import qualified Control.Monad.Trans.Class as MT
import Control.Monad.IO.Class (liftIO)
import Control.Monad (replicateM_, liftM2)
import Control.Functor.HT (void)

import qualified Data.Foldable as Fold
import qualified Data.Set as Set
import Data.Set (Set)


{- |
Monad in which worker threads are scheduled.

The reader environment holds the write end and the read end
of the \"a worker has finished\" channel.
The 'Int' state is the number of thread slots that have not been used yet;
'runLimited' initialises it and 'fork' decrements it.
-}
type T = MR.ReaderT (MVar.In (), MVar.Out ()) (MS.StateT Int IO)

{- |
Start an action in a worker thread
and return an action that waits for its result.

While unused slots remain, one slot is consumed and the worker starts at once.
Once the slots are used up, 'fork' first waits
until some earlier worker signals its completion.

The result is evaluated to normal form inside the worker thread
before it is handed over.

If the action raises an exception,
the completion signal is still sent (see 'forkFinally'),
but nothing is written to the result variable.
NOTE(review): the returned action presumably blocks in that case -
use 'forkTry' and 'takeMVarTry' when actions may fail.
-}
fork :: (NFData a) => IO a -> T (IO a)
fork act = do
   (completeIn, completeOut) <- MR.ask
   initial <- MT.lift MS.get
   if initial > 0
     then MT.lift $ MS.put (initial - 1)
     else liftIO $ MVar.take completeOut
   liftIO $ do
      (resultIn, resultOut) <- MVar.newEmpty
      forkFinally completeIn $ (MVar.put resultIn $!!) =<< act
      return $ MVar.take resultOut

{- |
Run an action in a new thread and put @()@ into the given variable afterwards,
no matter whether the action returned or raised an exception.
The 'ThreadId' of the new thread is discarded.
-}
forkFinally :: MVar.In () -> IO () -> IO ()
forkFinally mvar act =
   void $ forkIO $ finally act $ MVar.put mvar ()

{- |
Run an action in a new thread and report its outcome through the variable.

The report consists of the worker's own 'ThreadId'
and either the caught exception or the result evaluated to normal form.
The 'ThreadId' of the new thread is added to the set of running threads.
-}
forkTry ::
   (NFData a) =>
   MVar.In (ThreadId, Either SomeException a) ->
   IO a -> MS.StateT (Set ThreadId) IO ()
forkTry mvar act = do
   thread <-
      liftIO $ forkIO $
         applyStrictRight (MVar.put mvar) =<< liftM2 (,) myThreadId (try act)
   MS.modify (Set.insert thread)

{- |
Apply a function to a tagged 'Either'
after evaluating a 'Right' payload to normal form.
A 'Left' payload is passed on without being forced.
-}
applyStrictRight ::
   (NFData a) => ((id, Either e a) -> b) -> (id, Either e a) -> b
applyStrictRight f (thread, ee) =
   case ee of
      Left e -> f (thread, Left e)
      Right a -> f . (,) thread . Right $!! a

{- |
Wait for the next report written by a 'forkTry' worker
and remove the reporting thread from the set of running threads.

If the worker failed, all threads remaining in the set are killed
and the worker's exception is re-raised in the calling thread.
Otherwise the worker's result is returned.
-}
takeMVarTry ::
   MVar.Out (ThreadId, Either SomeException a) ->
   MS.StateT (Set ThreadId) IO a
takeMVarTry mvar = do
   (thread, ee) <- liftIO $ MVar.take mvar
   MS.modify (Set.delete thread)
   case ee of
      Left e -> do
         liftIO . Fold.mapM_ killThread =<< MS.get
         -- 'throwIO' raises the exception when the IO action is executed,
         -- i.e. only after the kill loop above has run.
         liftIO $ throwIO e
      Right a -> return a

{- |
Run a thread-tracking computation, starting with no recorded threads.
-}
runTry :: MS.StateT (Set ThreadId) IO a -> IO a
runTry act = MS.evalStateT act Set.empty


{- |
Use the given number of threads,
or the result of 'getNumCapabilities' if none is given.
-}
chooseNumCapabilities :: Maybe Int -> IO Int
chooseNumCapabilities = maybe getNumCapabilities return

{- |
Call a runner with the result of 'getNumCapabilities' as its first argument.
-}
withNumCapabilities :: (Int -> a -> IO b) -> a -> IO b
withNumCapabilities run acts = do
   numCaps <- getNumCapabilities
   run numCaps acts

{- |
Run a scheduling computation with at most @maxThreads@ workers at a time.

After the computation itself has finished, 'runLimited' waits for one
completion signal per consumed slot. These are exactly the signals that
'fork' has not consumed already, so no worker started by 'fork' is still
running when 'runLimited' returns.

A limit below one is treated as one.
With zero slots the first 'fork' would wait for the completion
of a worker that has never been started.
-}
runLimited :: Int -> T a -> IO a
runLimited maxThreads m = do
   let limit = max 1 maxThreads
   complete@(_, completeOut) <- MVar.newEmpty
   (result, unused) <-
      MS.runStateT (MR.runReaderT m complete) limit
   replicateM_ (limit - unused) $ MVar.take completeOut
   return result