{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE NamedFieldPuns #-} -- | -- Module: Data.Pool -- Copyright: (c) 2013 Kim Altintop, (c) 2011 MailRank, Inc. -- License: BSD3 -- Maintainer: Kim Altintop -- Stability: experimental -- Portability: portable -- -- A high-performance striped pooling abstraction for managing flexibly-sized -- collections of resources such as database connections. -- -- This module is based on @resource-pool@. For more comprehensive -- documentation, please refer to the original package: -- -- module Data.Pool ( Pool(nStripes, idleTime, maxResources) , LocalPool , createPool , destroyResource , purgePool , putResource , takeResource , tryTakeResource , tryWithResource , withResource ) where import Control.Applicative import Control.Concurrent import Control.Concurrent.STM import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Data.Hashable (hash) import Data.IORef (IORef, mkWeakIORef, newIORef) import Data.List (partition) import Data.Time.Clock import Data.Vector (Vector, (!)) import qualified Data.Vector as V import Data.Word data Resource a = Resource { resource :: !a , lastUse :: !UTCTime } data LocalPool a = LocalPool { inUse :: !(TVar Word32) , resources :: !(TVar [Resource a]) , lfin :: !(IORef ()) } data Pool a = Pool { create :: IO a , destroy :: a -> IO () , nStripes :: !Word32 , idleTime :: !NominalDiffTime , maxResources :: !Word32 , localPools :: !(Vector (LocalPool a)) , fin :: !(IORef ()) } instance Show (Pool a) where show p = "Pool {nStripes = " ++ show (nStripes p) ++ ", " ++ "idleTime = " ++ show (idleTime p) ++ ", " ++ "maxResources = " ++ show (maxResources p) ++ "}" createPool :: IO a -- ^ Action to create a new resource -> (a -> IO ()) -- ^ Action to destroy a resource -> Word32 -- ^ Stripe count -> NominalDiffTime -- ^ Amount of time after which an unused resource can be released -> Word32 -- ^ Maximum number of resources per stripe -> IO (Pool a) createPool create destroy nStripes idleTime maxResources = do locals <- V.replicateM (fromIntegral nStripes) $ LocalPool <$> newTVarIO 0 <*> newTVarIO [] <*> newIORef () reapId <- forkIO $ reaper destroy idleTime locals pool <- Pool create destroy nStripes idleTime maxResources locals <$> newIORef () addFinalizer (fin pool) $ do killThread reapId purgePool pool V.forM_ locals $ \ lp -> addFinalizer (lfin lp) $ purgeLocalPool destroy lp return pool where addFinalizer ref = void . mkWeakIORef ref -- | Destroys all resources currently not in use and removes them from the pool. -- -- Note that resources are automatically released when the 'Pool' is -- garbage-collected. This function is however useful in situations where a -- 'Pool' is explicitly discarded and resources should be freed immediately. purgePool :: Pool a -> IO () purgePool p = V.forM_ (localPools p) $ purgeLocalPool (destroy p) withResource :: (MonadIO m, MonadCatch m) => Pool a -> (a -> m b) -> m b {-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-} withResource p act = mask $ \ restore -> do (r, lp) <- takeResource p res <- restore (act r) `onException` destroyResource p lp r putResource lp r return res {-# INLINABLE withResource #-} -- | Similar to 'withResource', but only performs the action if a resource could -- be taken from the pool /without blocking/. Otherwise, 'tryWithResource' -- returns immediately with 'Nothing' (ie. the action function is /not/ called). -- Conversely, if a resource can be borrowed from the pool without blocking, the -- action is performed and it's result is returned, wrapped in a 'Just'. tryWithResource :: (MonadIO m, MonadCatch m) => Pool a -> (a -> m b) -> m (Maybe b) {-# SPECIALIZE tryWithResource :: Pool a -> (a -> IO b) -> IO (Maybe b) #-} tryWithResource p act = mask $ \ restore -> do mres <- tryTakeResource p case mres of Just (r, lp) -> do res <- restore (act r) `onException` destroyResource p lp r putResource lp r return (Just res) Nothing -> restore $ return Nothing {-# INLINABLE tryWithResource #-} takeResource :: MonadIO m => Pool a -> m (a, LocalPool a) {-# SPECIALIZE takeResource :: Pool a -> IO (a, LocalPool a) #-} takeResource p = do lp <- getLocalPool p r <- liftIO . join . atomically $ do rs <- readTVar (resources lp) case rs of (x:xs) -> do writeTVar (resources lp) xs return . return . resource $ x [] -> do used <- readTVar (inUse lp) when (used == maxResources p) retry writeTVar (inUse lp) $! used + 1 return $ liftIO (create p) `onException` modify_ (inUse lp) (subtract 1) return (r, lp) {-# INLINABLE takeResource #-} -- | A non-blocking version of 'takeResource'. The 'tryTakeResource' function -- returns immediately, with 'Nothing' if the pool is exhausted, or @'Just' (a, -- 'LocalPool' a)@ if a resource could be borrowed from the pool successfully. tryTakeResource :: MonadIO m => Pool a -> m (Maybe (a, LocalPool a)) {-# SPECIALIZE tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a)) #-} tryTakeResource p = do lp <- getLocalPool p r <- liftIO . join . atomically $ do rs <- readTVar (resources lp) case rs of (x:xs) -> do writeTVar (resources lp) xs return . return . Just . resource $ x [] -> do used <- readTVar (inUse lp) if used == maxResources p then return . return $ Nothing else do writeTVar (inUse lp) $! used + 1 return $ Just <$> liftIO (create p) `onException` modify_ (inUse lp) (subtract 1) return $ flip (,) lp <$> r {-# INLINABLE tryTakeResource #-} putResource :: MonadIO m => LocalPool a -> a -> m () {-# SPECIALIZE putResource :: LocalPool a -> a -> IO () #-} putResource lp r = liftIO $ do now <- getCurrentTime atomically $ modifyTVar' (resources lp) (Resource r now:) {-# INLINABLE putResource #-} destroyResource :: MonadIO m => Pool a -> LocalPool a -> a -> m () {-# SPECIALIZE destroyResource :: Pool a -> LocalPool a -> a -> IO () #-} destroyResource p lp r = liftIO $ do destroy p r `catch` \(_::SomeException) -> return () modify_ (inUse lp) (subtract 1) {-# INLINABLE destroyResource #-} -------------------------------------------------------------------------------- -- Internal -- -------------------------------------------------------------------------------- reaper :: (a -> IO ()) -> NominalDiffTime -> Vector (LocalPool a) -> IO () reaper destroy idleTime pools = forever $ do threadDelay (1 * 1000000) now <- getCurrentTime let isStale r = now `diffUTCTime` lastUse r > idleTime V.forM_ pools $ \ (LocalPool inUse resources _) -> do rs <- atomically $ do (stale,fresh) <- partition isStale <$> readTVar resources unless (null stale) $ do writeTVar resources fresh modifyTVar' inUse $ subtract (fromIntegral (length stale)) return (map resource stale) forM_ rs $ \ r -> liftIO (destroy r) `catch` \(_::SomeException) -> return () purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO () purgeLocalPool destroy (LocalPool inUse resources _) = do rs <- atomically $ do rs <- readTVar resources modifyTVar' inUse $ subtract (fromIntegral (length rs)) modifyTVar' resources $ const [] return rs forM_ rs $ \ r -> liftIO (destroy (resource r)) `catch` \(_::SomeException) -> return () {-# INLINABLE purgeLocalPool #-} getLocalPool :: MonadIO m => Pool a -> m (LocalPool a) {-# SPECIALIZE getLocalPool :: Pool a -> IO (LocalPool a) #-} getLocalPool p = do i <- liftIO $ ((`mod` fromIntegral (nStripes p)) . hash) <$> myThreadId return $ localPools p ! i {-# INLINABLE getLocalPool #-} modify_ :: TVar a -> (a -> a) -> IO () modify_ t f = atomically $ modifyTVar' t f {-# INLINABLE modify_ #-}