{-# LANGUAGE MultiWayIf, LambdaCase #-}
module Control.Concurrent.Throttled
( throttle, throttle_
, throttleMaybe, throttleMaybe_
) where
import Control.Concurrent (getNumCapabilities, threadDelay)
import Control.Concurrent.Async (replicateConcurrently_)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Monad (void)
import Data.Bifunctor (bimap)
import Data.Foldable (traverse_)
import Data.Functor (($>))
-- | Shared state for a single throttled run. One 'Pool' is created per call
-- and handed to every worker thread.
data Pool a b = Pool
  { threads :: !Word
    -- ^ How many worker threads take part in the run.
  , waiters :: !(TVar Word)
    -- ^ How many workers currently have nothing to do because they found
    -- 'source' empty.
  , job     :: !(TVar JobStatus)
    -- ^ Overall status of the run; flips to 'Failed' when a worker gives up.
  , source  :: !(TQueue a)
    -- ^ Items that still have to be processed.
  , target  :: !(TQueue b)
    -- ^ Queue handed to the result handler together with each result.
  }
-- | Build a fresh 'Pool': one worker slot per capability, no waiters, status
-- 'Fantastic', a source queue pre-filled with every element of @xs@ (in
-- traversal order), and an empty target queue.
newPool :: Foldable f => f a -> IO (Pool a b)
newPool xs = do
  caps   <- getNumCapabilities
  idle   <- newTVarIO 0
  status <- newTVarIO Fantastic
  -- Create and fill the input queue in one transaction.
  inbox  <- atomically $ do
    q <- newTQueue
    traverse_ (writeTQueue q) xs
    pure q
  outbox <- atomically newTQueue
  pure $ Pool (fromIntegral caps) idle status inbox outbox
-- | What a worker thread is doing, as far as the waiter count is concerned.
data ThreadStatus
  = Waiting
    -- ^ Found the source queue empty and has been added to the waiter count.
  | Working
    -- ^ Not included in the waiter count.
-- | Overall outcome of a throttled run.
data JobStatus
  = Failed
    -- ^ Some invocation of the user function returned 'Nothing'.
  | Fantastic
    -- ^ No failure has been recorded; this is the initial status.
-- | Apply @f@ concurrently to every element of @xs@ and collect the results
-- in a 'TQueue', in whatever order the workers produce them.
--
-- @f@ also receives the source queue, so it may enqueue further work items.
throttle :: Foldable f => (TQueue a -> a -> IO b) -> f a -> IO (TQueue b)
throttle f xs = do
  outcome <- throttleGen enqueue alwaysJust xs
  -- The wrapped function never yields 'Nothing', so both sides of the
  -- 'Either' are simply the result queue.
  pure $ either id id outcome
  where
    enqueue q = atomically . writeTQueue q
    alwaysJust q = fmap Just . f q
-- | Like 'throttle', but the results of @f@ are thrown away and nothing is
-- returned.
throttle_ :: Foldable f => (TQueue a -> a -> IO b) -> f a -> IO ()
throttle_ f xs = void $ throttleGen discard alwaysJust xs
  where
    -- Result handler that ignores both the queue and the value.
    discard _ _ = pure ()
    alwaysJust q a = Just <$> f q a
-- | Apply a function that may fail concurrently to every element of @xs@.
--
-- Successful results are written to the result queue. The queue comes back
-- as 'Left' if any invocation returned 'Nothing' and as 'Right' otherwise.
throttleMaybe :: Foldable f => (TQueue a -> a -> IO (Maybe b)) -> f a -> IO (Either (TQueue b) (TQueue b))
throttleMaybe f xs = throttleGen enqueue f xs
  where
    enqueue q item = atomically (writeTQueue q item)
-- | Like 'throttleMaybe', but results are discarded; only the overall
-- outcome is reported: @Left ()@ if any invocation returned 'Nothing',
-- @Right ()@ otherwise.
throttleMaybe_ :: Foldable f => (TQueue a -> a -> IO (Maybe b)) -> f a -> IO (Either () ())
throttleMaybe_ f xs = do
  outcome <- throttleGen (\_ _ -> pure ()) f xs
  pure $ case outcome of
    Left  _ -> Left ()
    Right _ -> Right ()
-- | The engine behind every exported @throttle*@ function.
--
-- Spawns one worker per pool thread. Each worker repeatedly takes an item
-- from the source queue, runs @f@ on it and hands a successful result to @g@
-- together with the target queue. @f@ receives the source queue and may push
-- additional items onto it.
--
-- * @g@ - what to do with each successful result.
-- * @f@ - the work function; returning 'Nothing' marks the whole run as
--   'Failed' and makes the workers stop as soon as they notice.
-- * @xs@ - the initial work items.
--
-- Returns the target queue, wrapped in 'Left' if the run failed and in
-- 'Right' otherwise.
--
-- A run finishes normally once the source queue is empty and every worker
-- is idle, since only a busy worker can add new items.
throttleGen
  :: Foldable f
  => (TQueue b -> b -> IO ())
  -> (TQueue a -> a -> IO (Maybe b))
  -> f a
  -> IO (Either (TQueue b) (TQueue b))
throttleGen g f xs = do
  p <- newPool xs
  replicateConcurrently_ (fromIntegral $ threads p) (check p Working)
  outcome <- readTVarIO (job p)
  pure $ jobStatus outcome (target p)
  where
    -- Decide atomically what this worker does next, then do it.
    check p s = do
      next <- atomically (plan p s)
      next

    -- One transaction covers the status check, the dequeue and the waiter
    -- bookkeeping, so the waiter count can never include a worker that has
    -- already taken an item. The result is the IO action to run next.
    plan p s = readTVar (job p) >>= \case
      Failed    -> pure (pure ())
      Fantastic -> case s of
        Working -> tryReadTQueue (source p) >>= \case
          Just x  -> pure (work p x)
          Nothing -> do
            -- Nothing to do right now: register as idle and start waiting.
            modifyTVar' (waiters p) succ
            pure (check p Waiting)
        Waiting -> do
          ws      <- readTVar (waiters p)
          drained <- isEmptyTQueue (source p)
          if drained && ws == threads p
            -- Queue empty and nobody busy: no more work can ever appear.
            then pure (pure ())
            else do
              -- 'readTQueue' blocks while the queue is empty; the
              -- transaction is re-run when the queue, the waiter count or
              -- the job status changes, so no polling is needed.
              x <- readTQueue (source p)
              modifyTVar' (waiters p) pred
              pure (work p x)

    -- Process one item; a 'Nothing' from @f@ fails the whole run.
    work p x = f (source p) x >>= \case
      Nothing  -> atomically $ writeTVar (job p) Failed
      Just res -> g (target p) res >> check p Working
-- | Pick the 'Either' constructor that represents a 'JobStatus':
-- 'Left' for 'Failed', 'Right' for 'Fantastic'.
jobStatus :: JobStatus -> (a -> Either a a)
jobStatus = \case
  Failed    -> Left
  Fantastic -> Right