module Data.Pool
( Pool(nStripes, idleTime, maxResources)
, LocalPool
, createPool
, destroyResource
, purgePool
, putResource
, takeResource
, tryTakeResource
, tryWithResource
, withResource
)
where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Hashable (hash)
import Data.IORef (IORef, mkWeakIORef, newIORef)
import Data.List (partition)
import Data.Time.Clock
import Data.Vector (Vector, (!))
import qualified Data.Vector as V
import Data.Word
data Resource a = Resource
{ resource :: !a
, lastUse :: !UTCTime
}
data LocalPool a = LocalPool
{ inUse :: !(TVar Word32)
, resources :: !(TVar [Resource a])
, lfin :: !(IORef ())
}
data Pool a = Pool
{ create :: IO a
, destroy :: a -> IO ()
, nStripes :: !Word32
, idleTime :: !NominalDiffTime
, maxResources :: !Word32
, localPools :: !(Vector (LocalPool a))
, fin :: !(IORef ())
}
instance Show (Pool a) where
show p = "Pool {nStripes = " ++ show (nStripes p) ++ ", " ++
"idleTime = " ++ show (idleTime p) ++ ", " ++
"maxResources = " ++ show (maxResources p) ++ "}"
createPool :: IO a
-> (a -> IO ())
-> Word32
-> NominalDiffTime
-> Word32
-> IO (Pool a)
createPool create destroy nStripes idleTime maxResources = do
locals <- V.replicateM (fromIntegral nStripes) $ LocalPool
<$> newTVarIO 0
<*> newTVarIO []
<*> newIORef ()
reapId <- forkIO $ reaper destroy idleTime locals
pool <- Pool create destroy nStripes idleTime maxResources locals
<$> newIORef ()
addFinalizer (fin pool) $ do
killThread reapId
purgePool pool
V.forM_ locals $ \ lp -> addFinalizer (lfin lp) $ purgeLocalPool destroy lp
return pool
where
addFinalizer ref = void . mkWeakIORef ref
purgePool :: Pool a -> IO ()
purgePool p = V.forM_ (localPools p) $ purgeLocalPool (destroy p)
withResource :: (MonadIO m, MonadCatch m) => Pool a -> (a -> m b) -> m b
withResource p act = mask $ \ restore -> do
(r, lp) <- takeResource p
res <- restore (act r) `onException` destroyResource p lp r
putResource lp r
return res
tryWithResource :: (MonadIO m, MonadCatch m)
=> Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource p act = mask $ \ restore -> do
mres <- tryTakeResource p
case mres of
Just (r, lp) -> do
res <- restore (act r) `onException` destroyResource p lp r
putResource lp r
return (Just res)
Nothing -> restore $ return Nothing
takeResource :: MonadIO m => Pool a -> m (a, LocalPool a)
takeResource p = do
lp <- getLocalPool p
r <- liftIO . join . atomically $ do
rs <- readTVar (resources lp)
case rs of
(x:xs) -> do
writeTVar (resources lp) xs
return . return . resource $ x
[] -> do
used <- readTVar (inUse lp)
when (used == maxResources p) retry
writeTVar (inUse lp) $! used + 1
return $ liftIO (create p)
`onException` modify_ (inUse lp) (subtract 1)
return (r, lp)
tryTakeResource :: MonadIO m => Pool a -> m (Maybe (a, LocalPool a))
tryTakeResource p = do
lp <- getLocalPool p
r <- liftIO . join . atomically $ do
rs <- readTVar (resources lp)
case rs of
(x:xs) -> do
writeTVar (resources lp) xs
return . return . Just . resource $ x
[] -> do
used <- readTVar (inUse lp)
if used == maxResources p
then return . return $ Nothing
else do
writeTVar (inUse lp) $! used + 1
return $ Just
<$> liftIO (create p)
`onException` modify_ (inUse lp) (subtract 1)
return $ flip (,) lp <$> r
putResource :: MonadIO m => LocalPool a -> a -> m ()
putResource lp r = liftIO $ do
now <- getCurrentTime
atomically $ modifyTVar' (resources lp) (Resource r now:)
destroyResource :: MonadIO m => Pool a -> LocalPool a -> a -> m ()
destroyResource p lp r = liftIO $ do
destroy p r `catch` \(_::SomeException) -> return ()
modify_ (inUse lp) (subtract 1)
reaper :: (a -> IO ()) -> NominalDiffTime -> Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
threadDelay (1 * 1000000)
now <- getCurrentTime
let isStale r = now `diffUTCTime` lastUse r > idleTime
V.forM_ pools $ \ (LocalPool inUse resources _) -> do
rs <- atomically $ do
(stale,fresh) <- partition isStale <$> readTVar resources
unless (null stale) $ do
writeTVar resources fresh
modifyTVar' inUse $ subtract (fromIntegral (length stale))
return (map resource stale)
forM_ rs $ \ r ->
liftIO (destroy r) `catch` \(_::SomeException) -> return ()
purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool destroy (LocalPool inUse resources _) = do
rs <- atomically $ do
rs <- readTVar resources
modifyTVar' inUse $ subtract (fromIntegral (length rs))
modifyTVar' resources $ const []
return rs
forM_ rs $ \ r ->
liftIO (destroy (resource r)) `catch` \(_::SomeException) -> return ()
getLocalPool :: MonadIO m => Pool a -> m (LocalPool a)
getLocalPool p = do
i <- liftIO $ ((`mod` fromIntegral (nStripes p)) . hash) <$> myThreadId
return $ localPools p ! i
modify_ :: TVar a -> (a -> a) -> IO ()
modify_ t f = atomically $ modifyTVar' t f