-- |
-- Module: Data.Pool
--
-- A striped pool of reusable resources.
--
-- A pool is split into a fixed number of independent stripes; the calling
-- thread's id selects the stripe it works with. Resources are created on
-- demand with a user-supplied action, handed back to the stripe's idle list
-- after use, and destroyed by a background thread once they have been idle
-- for longer than the pool's idle time.
module Data.Pool
( Pool(nStripes, idleTime, maxResources)
, LocalPool
, createPool
, destroyResource
, purgePool
, putResource
, takeResource
, tryTakeResource
, tryWithResource
, withResource
)
where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Catch (MonadMask)
import qualified Control.Monad.Catch as E
import Control.Monad.IO.Class
import Data.Hashable (hash)
import Data.IORef (IORef, mkWeakIORef, newIORef)
import Data.List (partition)
import Data.Time.Clock
import Data.Vector (Vector, (!))
import qualified Data.Vector as V
import Data.Word
-- | An idle pool entry: the pooled value plus the moment it was handed back.
data Resource a = Resource
  { resource :: !a
    -- ^ The pooled value itself.
  , lastUse :: !UTCTime
    -- ^ Time at which the value was last returned to the pool; the reaper
    -- compares this against the pool's idle time.
  }
-- | One stripe of a pool. Field order matters: other functions in this
-- module match on the constructor positionally.
data LocalPool a = LocalPool
  { inUse :: !(TVar Word32)
    -- ^ Number of resources this stripe currently accounts for. It is
    -- incremented when a resource is created and decremented when one is
    -- destroyed, reaped or purged, so it covers idle entries as well as
    -- resources that are checked out.
  , resources :: !(TVar [Resource a])
    -- ^ Idle resources, most recently returned first.
  , lfin :: !(IORef ())
    -- ^ Carries no data; it only serves as the key for this stripe's
    -- finalizer.
  }
-- | A striped resource pool.
data Pool a = Pool
  { create :: IO a
    -- ^ Action used to allocate a fresh resource.
  , destroy :: a -> IO ()
    -- ^ Action used to release a resource for good.
  , nStripes :: !Word32
    -- ^ Number of independent stripes (sub-pools).
  , idleTime :: !NominalDiffTime
    -- ^ How long an idle resource may sit unused before the reaper
    -- destroys it.
  , maxResources :: !Word32
    -- ^ Upper bound on the resource count of a single stripe.
  , localPools :: !(Vector (LocalPool a))
    -- ^ The stripes themselves.
  , fin :: !(IORef ())
    -- ^ Carries no data; it only serves as the key for the pool's
    -- finalizer.
  }
-- | Shows only the pool's configuration; the create/destroy actions and the
-- internal state are not rendered.
instance Show (Pool a) where
  show p = concat
    [ "Pool {nStripes = ", show (nStripes p), ", "
    , "idleTime = ", show (idleTime p), ", "
    , "maxResources = ", show (maxResources p), "}"
    ]
-- | Create a striped resource pool and start its reaper thread.
--
-- Throws an 'IOError' (built with 'userError') when the stripe count or the
-- per-stripe resource limit is zero: a pool with no stripes cannot select a
-- stripe at all, and a limit of zero would make 'takeResource' wait forever.
--
-- Finalizers are attached to the pool and to every stripe; when they run,
-- the reaper thread is killed and the idle resources are destroyed.
createPool :: IO a
           -- ^ Action that creates a new resource.
           -> (a -> IO ())
           -- ^ Action that destroys an existing resource.
           -> Word32
           -- ^ Number of stripes; must be at least 1.
           -> NominalDiffTime
           -- ^ How long an idle resource is kept before it is destroyed.
           -- The reaper wakes up once per second, so the effective
           -- resolution is about one second.
           -> Word32
           -- ^ Maximum number of resources per stripe; must be at least 1.
           -> IO (Pool a)
createPool create destroy nStripes idleTime maxResources = do
  -- Reject configurations that can never hand out a resource.
  when (nStripes < 1) $
    invalid $ "invalid stripe count " ++ show nStripes
  when (maxResources < 1) $
    invalid $ "invalid maximum resource count " ++ show maxResources
  locals <- V.replicateM (fromIntegral nStripes) $ LocalPool
    <$> newTVarIO 0
    <*> newTVarIO []
    <*> newIORef ()
  reapId <- forkIO $ reaper destroy idleTime locals
  pool <- Pool create destroy nStripes idleTime maxResources locals
    <$> newIORef ()
  -- Pool-level finalizer: stop the reaper, then drop whatever is idle.
  addFinalizer (fin pool) $ do
    killThread reapId
    purgePool pool
  -- Stripe-level finalizers: each stripe also cleans up after itself.
  V.forM_ locals $ \ lp -> addFinalizer (lfin lp) $ purgeLocalPool destroy lp
  return pool
  where
    addFinalizer ref = void . mkWeakIORef ref
    invalid msg = ioError . userError $ "Data.Pool.createPool: " ++ msg
-- | Destroy every idle resource in every stripe of the pool. Resources that
-- are currently checked out are not touched.
purgePool :: Pool a -> IO ()
purgePool p = V.mapM_ purgeStripe (localPools p)
  where
    purgeStripe = purgeLocalPool (destroy p)
-- | Run an action with a resource taken from the pool.
--
-- The resource is acquired with asynchronous exceptions masked; the action
-- itself runs with the caller's masking state restored. If the action
-- throws, the resource is destroyed and the exception propagates; otherwise
-- the resource goes back to its stripe and the action's result is returned.
withResource :: (MonadIO m, MonadMask m) => Pool a -> (a -> m b) -> m b
withResource p act = E.mask $ \ unmask -> do
  (r, lp) <- takeResource p
  let discard = destroyResource p lp r
  outcome <- unmask (act r) `E.onException` discard
  putResource lp r
  return outcome
-- | Non-blocking variant of 'withResource'.
--
-- Returns 'Nothing' without running the action when 'tryTakeResource'
-- yields no resource. Otherwise behaves like 'withResource' and wraps the
-- action's result in 'Just'.
tryWithResource :: (MonadIO m, MonadMask m)
                => Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource p act = E.mask $ \ unmask -> do
  taken <- tryTakeResource p
  case taken of
    Nothing -> unmask (return Nothing)
    Just (r, lp) -> do
      -- A failing action costs us the resource: destroy it and rethrow.
      outcome <- unmask (act r) `E.onException` destroyResource p lp r
      putResource lp r
      return (Just outcome)
-- | Take a resource from the calling thread's stripe, together with that
-- stripe.
--
-- An idle resource is reused when one is available. Otherwise a new one is
-- created, unless the stripe has already reached 'maxResources', in which
-- case the call waits until that changes. This function does not return
-- the resource to the pool itself; see 'putResource' and 'destroyResource'.
takeResource :: MonadIO m => Pool a -> m (a, LocalPool a)
takeResource p = do
  lp <- getLocalPool p
  r <- liftIO (join (atomically (acquire lp)))
  return (r, lp)
  where
    -- The transaction decides what to do and returns the IO action that
    -- produces the resource, so that creation happens outside of STM.
    acquire lp = do
      idle <- readTVar (resources lp)
      case idle of
        [] -> do
          used <- readTVar (inUse lp)
          when (used == maxResources p) retry
          -- Reserve the slot before the resource actually exists.
          writeTVar (inUse lp) $! used + 1
          return (createFor lp)
        (x:rest) -> do
          writeTVar (resources lp) rest
          return (return (resource x))
    -- Give the reserved slot back if creation fails.
    createFor lp = create p `E.onException` modify_ (inUse lp) (subtract 1)
-- | Non-blocking variant of 'takeResource'.
--
-- Returns 'Nothing' when the calling thread's stripe has no idle resource
-- and has already reached 'maxResources'; otherwise returns a reused or
-- newly created resource together with its stripe.
tryTakeResource :: MonadIO m => Pool a -> m (Maybe (a, LocalPool a))
tryTakeResource p = do
  lp <- getLocalPool p
  found <- liftIO (join (atomically (acquire lp)))
  return (fmap (\ r -> (r, lp)) found)
  where
    -- The transaction decides what to do and returns the IO action that
    -- produces the result, so that creation happens outside of STM.
    acquire lp = do
      idle <- readTVar (resources lp)
      case idle of
        (x:rest) -> do
          writeTVar (resources lp) rest
          return (return (Just (resource x)))
        [] -> do
          used <- readTVar (inUse lp)
          if used == maxResources p
            then return (return Nothing)
            else do
              -- Reserve the slot before the resource actually exists.
              writeTVar (inUse lp) $! used + 1
              return (Just <$> createFor lp)
    -- Give the reserved slot back if creation fails.
    createFor lp = create p `E.onException` modify_ (inUse lp) (subtract 1)
-- | Hand a resource back to the given stripe, stamping it with the current
-- time. The resource is placed at the front of the stripe's idle list.
putResource :: MonadIO m => LocalPool a -> a -> m ()
putResource lp r = liftIO $ do
  stamp <- getCurrentTime
  let entry = Resource r stamp
  atomically $ modifyTVar' (resources lp) (entry :)
-- | Destroy a resource instead of returning it to the pool, and release its
-- slot in the stripe's counter. Exceptions thrown by the pool's destroy
-- action are swallowed.
destroyResource :: MonadIO m => Pool a -> LocalPool a -> a -> m ()
destroyResource p lp r = liftIO $
  ignoreExceptions (destroy p r) >> modify_ (inUse lp) (subtract 1)
-- | Background loop that evicts stale idle resources.
--
-- Once per second every stripe is scanned: entries whose last use lies
-- further back than the idle time are removed from the idle list and
-- subtracted from the stripe's counter in one transaction, then destroyed
-- outside of it. Exceptions from the destroy action are swallowed.
reaper :: (a -> IO ()) -> NominalDiffTime -> Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
  threadDelay 1000000
  now <- getCurrentTime
  let expired entry = diffUTCTime now (lastUse entry) > idleTime
  V.forM_ pools $ \ (LocalPool used idle _) -> do
    victims <- atomically $ do
      (stale, fresh) <- partition expired <$> readTVar idle
      -- Only write when something actually has to go.
      unless (null stale) $ do
        writeTVar idle fresh
        modifyTVar' used (subtract (fromIntegral (length stale)))
      return (map resource stale)
    mapM_ (ignoreExceptions . destroy) victims
-- | Empty one stripe's idle list and destroy everything that was in it.
--
-- The idle entries are removed and subtracted from the stripe's counter in
-- a single transaction; the destroy action then runs on each of them, with
-- its exceptions swallowed.
purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool destroy (LocalPool used idle _) = do
  drained <- atomically $ do
    entries <- readTVar idle
    modifyTVar' used (subtract (fromIntegral (length entries)))
    writeTVar idle []
    return entries
  mapM_ (ignoreExceptions . destroy . resource) drained
-- | Select the stripe for the calling thread by hashing its thread id.
--
-- The index is reduced modulo the actual number of stripes held by the
-- pool rather than the 'nStripes' field. That field is exported and can be
-- changed with record-update syntax, which would otherwise allow an index
-- past the end of the stripe vector.
getLocalPool :: MonadIO m => Pool a -> m (LocalPool a)
getLocalPool p = liftIO $ do
  tid <- myThreadId
  let stripes = localPools p
  -- `mod` with a positive divisor is never negative, so the index is
  -- always within bounds for a non-empty vector.
  return $ stripes ! (hash tid `mod` V.length stripes)
-- | Strictly apply a function to a 'TVar' in its own transaction.
modify_ :: TVar a -> (a -> a) -> IO ()
modify_ t = atomically . modifyTVar' t
-- | Run an action and discard any exception it throws.
ignoreExceptions :: IO () -> IO ()
ignoreExceptions = E.handleAll swallow
  where
    swallow _ = return ()