module Control.Concurrent.Throttled
( throttled
, throttled_
) where
import Control.Concurrent (getNumCapabilities, threadDelay)
import Control.Concurrent.Async (replicateConcurrently_)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Monad (void)
import Data.Foldable (traverse_)
import Data.Functor (($>))
-- | Shared state for one run of the worker pool.
--
-- Field summary (field order matters: 'newPool' builds the record positionally):
--
-- * @threads@: how many workers are spawned; 'newPool' takes it from
--   'getNumCapabilities'.
-- * @waiters@: how many workers have found @source@ empty and registered
--   themselves as idle. Workers compare it against @threads@ to decide
--   whether the whole pool is finished.
-- * @source@: pending inputs. It is also handed to the user's worker
--   function, so that function can enqueue further work.
-- * @target@: queue that is returned to the caller of 'throttled'; results
--   are delivered to it through the collector callback of 'throttledGen'.
data Pool a b = Pool { threads :: !Word
, waiters :: !(TVar Word)
, source :: !(TQueue a)
, target :: !(TQueue b) }
-- | Build a fresh 'Pool' whose source queue is pre-loaded with every element
-- of the given container (in 'Foldable' traversal order).
--
-- The worker count is the current number of capabilities, the idle-worker
-- counter starts at zero, and the target queue starts empty.
newPool :: Foldable f => f a -> IO (Pool a b)
newPool xs = do
  caps <- getNumCapabilities
  idle <- newTVarIO 0
  -- Create and fill the input queue inside a single transaction.
  pending <- atomically $ do
    q <- newTQueue
    traverse_ (writeTQueue q) xs
    pure q
  results <- atomically newTQueue
  pure Pool
    { threads = fromIntegral caps
    , waiters = idle
    , source  = pending
    , target  = results
    }
-- | State of a single worker inside 'throttledGen'.
--
-- A worker is 'Waiting' once it has found the source queue empty and added
-- itself to the pool's @waiters@ counter; otherwise it is 'Working'.
data Status = Waiting | Working
-- | Apply an 'IO' action to every element of a container using a pool of
-- worker threads, collecting the results.
--
-- The action receives the queue of pending inputs as its first argument, so
-- it may enqueue additional work items while running. The returned queue
-- holds one result per processed item and is only handed back once every
-- worker has finished. Results are written by whichever worker produced
-- them, so their order need not match the input order.
throttled :: Foldable f => (TQueue a -> a -> IO b) -> f a -> IO (TQueue b)
throttled f xs = throttledGen enqueue f xs
  where
    -- Append one result to the output queue.
    enqueue out = atomically . writeTQueue out
-- | Like 'throttled', but for actions run purely for their side effects.
--
-- Nothing is written to the result queue and the queue itself is thrown
-- away; the call returns once every worker has finished.
throttled_ :: Foldable f => (TQueue a -> a -> IO ()) -> f a -> IO ()
throttled_ f = void . throttledGen discard f
  where
    -- Collector that ignores both the output queue and the result.
    discard _ _ = pure ()
-- | Shared engine behind 'throttled' and 'throttled_'.
--
-- Spawns one worker per pool thread. Each worker repeatedly takes an item
-- from the source queue, runs @f@ on it (passing the source queue along so
-- @f@ can schedule more work) and hands the result to the collector @g@
-- together with the target queue. The call returns the target queue after
-- all workers have stopped.
--
-- A worker stops only when the source queue is empty /and/ every worker in
-- the pool is idle, because a busy worker may still enqueue new items.
--
-- Every "dequeue + update the idle counter" step is a single STM
-- transaction. Doing the reads and the counter update in separate
-- transactions lets a worker that has just dequeued an item still be
-- counted as idle, which can make its peers quit while work is still being
-- produced. Idle workers block inside STM rather than polling on a timer,
-- so they wake as soon as work arrives or the pool drains.
throttledGen
  :: Foldable f
  => (TQueue b -> b -> IO ())   -- ^ collector: target queue and one result
  -> (TQueue a -> a -> IO b)    -- ^ worker: source queue and one input
  -> f a                        -- ^ initial inputs
  -> IO (TQueue b)
throttledGen g f xs = do
  p <- newPool xs
  replicateConcurrently_ (fromIntegral $ threads p) (work p Working)
  pure $ target p
  where
    -- Main loop of one worker; @s@ records whether this worker is currently
    -- counted in @waiters p@.
    work p s = do
      mx <- atomically (claim p s)
      case (mx, s) of
        (Just x, _)        -> f (source p) x >>= g (target p) >> work p Working
        (Nothing, Working) -> work p Waiting  -- 'claim' already registered us as idle
        (Nothing, Waiting) -> pure ()         -- queue empty and everyone idle: done

    -- Try to take the next item, keeping the idle counter consistent with
    -- the outcome in the same transaction.
    claim p Working = do
      mx <- tryReadTQueue (source p)
      case mx of
        Nothing -> modifyTVar' (waiters p) succ $> mx
        Just _  -> pure mx
    claim p Waiting = do
      idle <- readTVar (waiters p)
      mx <- if idle == threads p
        -- Nobody is left who could produce more work: take a final
        -- non-blocking look and let the caller exit on 'Nothing'.
        then tryReadTQueue (source p)
        -- Somebody is still busy. 'readTQueue' retries on an empty queue,
        -- which suspends this whole transaction until either the queue or
        -- the idle counter changes.
        else Just <$> readTQueue (source p)
      case mx of
        Nothing -> pure mx
        Just _  -> modifyTVar' (waiters p) pred $> mx