module Control.Concurrent.LoadDistribution (
evenlyDistributed,
LoadBalanced,
withResource,
map,
) where
import Prelude hiding (map)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (TVar, newTVar, readTVar, writeTVar,
modifyTVar)
import Control.Exception (bracket)
import Data.PSQueue (PSQ, minView, adjust, Binding((:->)), alter, insert)
import Data.Set (Set, member)
import qualified Data.PSQueue as Q (empty)
import qualified Data.Set as Set (foldr, map)
import qualified System.Log.Logger as L (debugM)
evenlyDistributed
:: (Ord resource)
=> IO (Set resource)
-> IO (LoadBalanced resource)
evenlyDistributed getResources = do
psQueueT <- atomically (newTVar Q.empty)
return LB {psQueueT, getResources}
data LoadBalanced resource =
LB {
psQueueT :: TVar (PSQ resource (Int, Int)),
getResources :: IO (Set resource)
}
map :: (Ord b) => (a -> b) -> LoadBalanced a -> IO (LoadBalanced b)
map f lb = do
psQueueT <- atomically (newTVar Q.empty)
return LB {psQueueT, getResources = Set.map f <$> getResources lb}
withResource
:: (Show resource, Ord resource)
=> LoadBalanced resource
-> (Maybe resource -> IO a)
-> IO a
withResource LB {psQueueT, getResources} action = do
resources <- getResources
invokeResource psQueueT resources action
invokeResource :: (Ord resource, Show resource)
=> TVar (PSQ resource (Int, Int))
-> Set resource
-> (Maybe resource -> IO a)
-> IO a
invokeResource psQueueT targets =
bracket checkout checkin
where
checkout = do
url <- atomically $ do
queue <- updateTargets <$> readTVar psQueueT
(newQueue, resource) <- getBest queue
writeTVar psQueueT newQueue
return resource
debugM ("Choosing: " ++ show url)
return url
where
getBest queue =
case minView queue of
Nothing -> return (queue, Nothing)
Just (resource :-> (p, r), newQueue) ->
if resource `member` targets
then return (
insert resource (p + 1, r + 1) newQueue,
Just resource
)
else getBest newQueue
checkin Nothing = return ()
checkin (Just resource) = atomically $
modifyTVar
psQueueT
(adjust decrement resource)
where
decrement (p, r) = (p 1, r)
updateTargets queue =
Set.foldr
(alter insertMissing)
queue
targets
where
insertMissing Nothing = Just (0, 0)
insertMissing (Just p) = Just p
debugM :: String -> IO ()
debugM = L.debugM "load-balancing"