module Data.Pool
(
Pool(idleTime, maxResources, numStripes)
, createPool
, withResource
) where
import Control.Applicative ((<$>))
import Control.Concurrent (forkIO, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, catch)
import Control.Monad (forM_, forever, join, liftM2, unless, when)
import Control.Monad.CatchIO (MonadCatchIO, onException)
import Control.Monad.IO.Class (liftIO)
import Data.Hashable (hash)
import Data.List (partition)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Prelude hiding (catch)
import System.Mem.Weak (addFinalizer)
import qualified Data.Vector as V
data Entry a = Entry {
entry :: a
, lastUse :: UTCTime
}
data LocalPool a = LocalPool {
inUse :: TVar Int
, entries :: TVar [Entry a]
}
data Pool a = Pool {
create :: IO a
, destroy :: a -> IO ()
, numStripes :: Int
, idleTime :: NominalDiffTime
, maxResources :: Int
, localPools :: V.Vector (LocalPool a)
}
instance Show (Pool a) where
show Pool{..} = "Pool {numStripes = " ++ show numStripes ++ ", " ++
"idleTime = " ++ show idleTime ++ ", " ++
"maxResources = " ++ show maxResources ++ "}"
createPool
:: IO a
-> (a -> IO ())
-> Int
-> NominalDiffTime
-> Int
-> IO (Pool a)
createPool create destroy numStripes idleTime maxResources = do
when (numStripes < 1) $
modError "pool " $ "invalid stripe count " ++ show numStripes
when (idleTime < 0.5) $
modError "pool " $ "invalid idle time " ++ show idleTime
when (maxResources < 1) $
modError "pool " $ "invalid maximum resource count " ++ show maxResources
localPools <- atomically . V.replicateM numStripes $
liftM2 LocalPool (newTVar 0) (newTVar [])
reaperId <- forkIO $ reaper destroy idleTime localPools
let p = Pool {
create
, destroy
, numStripes
, idleTime
, maxResources
, localPools
}
addFinalizer p $ killThread reaperId
return p
reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
threadDelay (1 * 1000000)
now <- getCurrentTime
let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
V.forM_ pools $ \LocalPool{..} -> do
resources <- atomically $ do
(stale,fresh) <- partition isStale <$> readTVar entries
unless (null stale) $ do
writeTVar entries fresh
modifyTVar_ inUse (subtract (length stale))
return (map entry stale)
forM_ resources $ \resource -> do
destroy resource `catch` \(_::SomeException) -> return ()
withResource :: MonadCatchIO m => Pool a -> (a -> m b) -> m b
withResource Pool{..} act = do
i <- liftIO $ ((`mod` numStripes) . hash) <$> myThreadId
let LocalPool{..} = localPools V.! i
resource <- liftIO . join . atomically $ do
ents <- readTVar entries
case ents of
(Entry{..}:es) -> writeTVar entries es >> return create
[] -> do
used <- readTVar inUse
when (used == maxResources) retry
writeTVar inUse $! used + 1
return $
create `onException` atomically (modifyTVar_ inUse (subtract 1))
ret <- act resource `onException` (liftIO $ do
destroy resource `catch` \(_::SomeException) -> return ()
atomically (modifyTVar_ inUse (subtract 1)))
liftIO $ do
now <- getCurrentTime
atomically $ modifyTVar_ entries (Entry resource now:)
return ret
#if __GLASGOW_HASKELL__ >= 700
#endif
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a
modError :: String -> String -> a
modError func msg =
error $ "Data.Pool." ++ func ++ ": " ++ msg