{-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-}

#if !MIN_VERSION_base(4,3,0)
{-# LANGUAGE RankNTypes #-}
#endif

-- |
-- Module:      Data.Pool
-- Copyright:   (c) 2011 MailRank, Inc.
-- License:     BSD3
-- Maintainer:  Bryan O'Sullivan <bos@serpentine.com>,
--              Bas van Dijk <v.dijk.bas@gmail.com>
-- Stability:   experimental
-- Portability: portable
--
-- A high-performance striped pooling abstraction for managing
-- flexibly-sized collections of resources such as database
-- connections.
--
-- \"Striped\" means that a single 'Pool' consists of several
-- sub-pools, each managed independently.  A single stripe is fine for
-- many applications, and probably what you should choose by default.
-- More stripes will lead to reduced contention in high-performance
-- multicore applications, at a trade-off of causing the maximum
-- number of simultaneous resources in use to grow.
module Data.Pool
    (
      Pool(idleTime, maxResources, numStripes)
    , LocalPool
    , Stats(..)
    , PoolStats(..)
    , createPool
    , withResource
    , takeResource
    , tryWithResource
    , tryTakeResource
    , destroyResource
    , putResource
    , destroyAllResources
    , stats
    ) where

import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM5, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Pool.WaiterQueue (WaiterQueue, newQueueIO, push, pop)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Typeable (Typeable)
import GHC.Conc.Sync (labelThread)
import qualified Control.Exception as E
import qualified Data.Vector as V
import UnliftIO (MonadUnliftIO, mask, withRunInIO)

-- | A single resource pool entry.
data Entry a = Entry {
      entry :: a
    -- ^ The pooled resource itself.
    , lastUse :: UTCTime
    -- ^ Time of last return.
    }


-- | Stats for a single 'LocalPool'.
data PoolStats = PoolStats {
      highwaterUsage :: Int
    -- ^ Highest usage since last reset.
    , currentUsage   :: Int
    -- ^ Current number of items.
    , takes          :: Int
    -- ^ Number of takes since last reset.
    , creates        :: Int
    -- ^ Number of creates since last reset.
    , createFailures :: Int
    -- ^ Number of creation failures since last reset.
    } deriving (Show)

-- | Pool-wide stats.
data Stats = Stats {
      perStripe :: V.Vector PoolStats
    -- ^ Stats per 'LocalPool' (stripe).
    , poolStats :: PoolStats
    -- ^ Aggregate stats across pool.
    } deriving (Show)

-- | A single striped pool.
data LocalPool a = LocalPool {
      inUse :: TVar Int
    -- ^ Count of open entries (both idle and in use).
    , entries :: TVar [Entry a]
    -- ^ Idle entries.
    , highwaterVar :: TVar Int
    -- ^ Highest value of 'inUse' since last reset.
    , takeVar :: TVar Int
    -- ^ Number of takes since last reset.
    , createVar :: TVar Int
    -- ^ Number of creates since last reset.
    , createFailureVar :: TVar Int
    -- ^ Number of create failures since last reset.
    , waiters :: WaiterQueue (TMVar (Maybe (Entry a)))
    -- ^ Threads waiting for a resource.
    , lfin :: IORef ()
    -- ^ Empty value used to attach a finalizer to (internal).
    } deriving (Typeable)

-- | A striped pool of resources of type @a@, built by 'createPool'.
data Pool a = Pool {
      create :: IO a
    -- ^ Action for creating a new entry to add to the pool.
    , destroy :: a -> IO ()
    -- ^ Action for destroying an entry that is now done with.
    , numStripes :: Int
    -- ^ The number of stripes (distinct sub-pools) to maintain.
    -- The smallest acceptable value is 1.
    , idleTime :: NominalDiffTime
    -- ^ Amount of time for which an unused resource is kept alive.
    -- The smallest acceptable value is 0.5 seconds.
    --
    -- The elapsed time before closing may be a little longer than
    -- requested, as the reaper thread wakes at 1-second intervals.
    , maxResources :: Int
    -- ^ Maximum number of resources to maintain per stripe.  The
    -- smallest acceptable value is 1.
    --
    -- Requests for resources will block if this limit is reached on a
    -- single stripe, even if other stripes have idle resources
    -- available.
    , localPools :: V.Vector (LocalPool a)
    -- ^ The stripes: one 'LocalPool' per stripe ('numStripes' in total).
    , fin :: IORef ()
    -- ^ Empty value used to attach a finalizer to (internal).
    } deriving (Typeable)

-- | Shows only the pool's configuration ('numStripes', 'idleTime' and
-- 'maxResources'); the actions, stripes and finalizer anchor are omitted.
instance Show (Pool a) where
    show Pool{numStripes, idleTime, maxResources} =
        "Pool {numStripes = " ++ show numStripes ++ ", " ++
        "idleTime = " ++ show idleTime ++ ", " ++
        "maxResources = " ++ show maxResources ++ "}"

-- | Create a striped resource pool.
--
-- Although the garbage collector will destroy all idle resources when
-- the pool is garbage collected it's recommended to manually
-- 'destroyAllResources' when you're done with the pool so that the
-- resources are freed up as soon as possible.
--
-- Calls 'modError' (defined elsewhere in this module) when the stripe
-- count is below 1, the idle time is below 0.5 seconds, or the maximum
-- resource count is below 1.
createPool
    :: IO a
    -- ^ Action that creates a new resource.
    -> (a -> IO ())
    -- ^ Action that destroys an existing resource.
    -> Int
    -- ^ The number of stripes (distinct sub-pools) to maintain.
    -- The smallest acceptable value is 1.
    -> NominalDiffTime
    -- ^ Amount of time for which an unused resource is kept open.
    -- The smallest acceptable value is 0.5 seconds.
    --
    -- The elapsed time before destroying a resource may be a little
    -- longer than requested, as the reaper thread wakes at 1-second
    -- intervals.
    -> Int
    -- ^ Maximum number of resources to keep open per stripe.  The
    -- smallest acceptable value is 1.
    --
    -- Requests for resources will block if this limit is reached on a
    -- single stripe, even if other stripes have idle resources
    -- available.
    -> IO (Pool a)
createPool create destroy numStripes idleTime maxResources = do
  -- Validate the configuration before allocating anything.
  when (numStripes < 1) $
    modError "pool " $ "invalid stripe count " ++ show numStripes
  when (idleTime < 0.5) $
    modError "pool " $ "invalid idle time " ++ show idleTime
  when (maxResources < 1) $
    modError "pool " $ "invalid maximum resource count " ++ show maxResources
  -- One stripe per requested stripe: all counters start at zero, with no
  -- idle entries, an empty waiter queue and a fresh finalizer anchor.
  localPools <- V.replicateM numStripes $
                LocalPool
                  <$> newTVarIO 0   -- inUse
                  <*> newTVarIO []  -- entries
                  <*> newTVarIO 0   -- highwaterVar
                  <*> newTVarIO 0   -- takeVar
                  <*> newTVarIO 0   -- createVar
                  <*> newTVarIO 0   -- createFailureVar
                  <*> newQueueIO    -- waiters
                  <*> newIORef ()   -- lfin
  -- The reaper runs with asynchronous exceptions unmasked so that the
  -- 'killThread' issued by the finalizer below can reach it.
  reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
                unmask $ reaper destroy idleTime localPools
  fin <- newIORef ()
  let p = Pool {
            create
          , destroy
          , numStripes
          , idleTime
          , maxResources
          , localPools
          , fin
          }
  -- When 'fin' is finalized the reaper is killed; when a stripe's 'lfin'
  -- is finalized that stripe's idle resources are destroyed.
  mkWeakIORef fin (killThread reaperId) >>
    V.mapM_ (\lp -> mkWeakIORef (lfin lp) (purgeLocalPool destroy lp)) localPools
  return p

-- TODO: Propose 'forkIOLabeledWithUnmask' for the base library.

-- | Sparks off a new thread using 'forkIOWithUnmask' to run the given
-- IO computation, but first labels the thread with the given label
-- (using 'labelThread').
--
-- The implementation makes sure that asynchronous exceptions are
-- masked until the given computation is executed. This ensures the
-- thread will always be labeled which guarantees you can always
-- easily find it in the GHC event log.
--
-- Like 'forkIOWithUnmask', the given computation is given a function
-- to unmask asynchronous exceptions. See the documentation of that
-- function for the motivation of this.
--
-- Returns the 'ThreadId' of the newly created thread.
forkIOLabeledWithUnmask :: String
                        -> ((forall a. IO a -> IO a) -> IO ())
                        -> IO ThreadId
forkIOLabeledWithUnmask label m =
  -- Fork while masked: the child inherits the masked state, so it cannot
  -- be interrupted before it has labelled itself.
  mask_ $ forkIOWithUnmask $ \unmask -> do
    tid <- myThreadId
    labelThread tid label
    m unmask

-- | Periodically go through all pools, closing any resources that
-- have been left idle for too long.
--
-- Runs forever: sleeps for one second, then for each stripe removes
-- every idle entry whose 'lastUse' is more than @idleTime@ in the past,
-- and destroys the removed resources.
reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
  threadDelay (1 * 1000000)
  now <- getCurrentTime
  let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
  V.forM_ pools $ \LocalPool{..} -> do
    -- Detach the stale entries and adjust the open-entry count in a
    -- single transaction; the destroy actions run afterwards, outside STM.
    resources <- atomically $ do
      (stale,fresh) <- partition isStale <$> readTVar entries
      unless (null stale) $ do
        writeTVar entries fresh
        modifyTVar_ inUse (subtract (length stale))
      return (map entry stale)
    forM_ resources $ \resource -> do
      -- Best effort: a failing destroy action must not stop the reaper.
      destroy resource `E.catch` \(_::SomeException) -> return ()

-- | Destroy all idle resources of the given 'LocalPool' and remove them from
-- the pool.
--
-- The idle list is emptied and 'inUse' is decremented by the number of
-- removed entries in one transaction; the destroy action then runs on
-- each removed resource.
purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool destroy LocalPool{..} = do
  resources <- atomically $ do
    idle <- swapTVar entries []
    modifyTVar_ inUse (subtract (length idle))
    return (map entry idle)
  forM_ resources $ \resource ->
    -- Best effort: one failing destroy must not prevent the others.
    destroy resource `E.catch` \(_::SomeException) -> return ()

-- | Temporarily take a resource from a 'Pool', perform an action with
-- it, and return it to the pool afterwards.
--
-- * If the pool has an idle resource available, it is used
--   immediately.
--
-- * Otherwise, if the maximum number of resources has not yet been
--   reached, a new resource is created and used.
--
-- * If the maximum number of resources has been reached, this
--   function blocks until a resource becomes available.
--
-- If the action throws an exception of any type, the resource is
-- destroyed, and not returned to the pool.
--
-- It probably goes without saying that you should never manually
-- destroy a pooled resource, as doing so will almost certainly cause
-- a subsequent user (who expects the resource to be valid) to throw
-- an exception.
withResource :: MonadUnliftIO m => Pool a -> (a -> m b) -> m b
{-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-}
withResource pool act = withRunInIO $ \runInIO -> mask $ \restore -> do
  -- Asynchronous exceptions stay masked while the resource is acquired and
  -- the cleanup handler is installed; 'restore' lifts the mask only for
  -- the duration of the user's action.
  (resource, local) <- takeResource pool
  ret <- restore (runInIO (act resource)) `onException`
            destroyResource pool local resource
  putResource local resource
  return ret
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE withResource #-}
#endif

-- | Take a resource from the pool, following the same results as
-- 'withResource'. Note that this function should be used with caution, as
-- improper exception handling can lead to leaked resources.
--
-- This function returns both a resource and the @LocalPool@ it came from so
-- that it may either be destroyed (via 'destroyResource') or returned to the
-- pool (via 'putResource').
takeResource :: Pool a -> IO (a, LocalPool a)
takeResource :: Pool a -> IO (a, LocalPool a)
takeResource pool :: Pool a
pool@Pool{Int
IO a
IORef ()
NominalDiffTime
Vector (LocalPool a)
a -> IO ()
fin :: IORef ()
localPools :: Vector (LocalPool a)
maxResources :: Int
idleTime :: NominalDiffTime
numStripes :: Int
destroy :: a -> IO ()
create :: IO a
fin :: forall a. Pool a -> IORef ()
localPools :: forall a. Pool a -> Vector (LocalPool a)
destroy :: forall a. Pool a -> a -> IO ()
create :: forall a. Pool a -> IO a
numStripes :: forall a. Pool a -> Int
maxResources :: forall a. Pool a -> Int
idleTime :: forall a. Pool a -> NominalDiffTime
..} = do
  local :: LocalPool a
local@LocalPool{TVar Int
TVar [Entry a]
IORef ()
WaiterQueue (TMVar (Maybe (Entry a)))
lfin :: IORef ()
waiters :: WaiterQueue (TMVar (Maybe (Entry a)))
createFailureVar :: TVar Int
createVar :: TVar Int
takeVar :: TVar Int
highwaterVar :: TVar Int
entries :: TVar [Entry a]
inUse :: TVar Int
lfin :: forall a. LocalPool a -> IORef ()
waiters :: forall a. LocalPool a -> WaiterQueue (TMVar (Maybe (Entry a)))
createFailureVar :: forall a. LocalPool a -> TVar Int
createVar :: forall a. LocalPool a -> TVar Int
takeVar :: forall a. LocalPool a -> TVar Int
highwaterVar :: forall a. LocalPool a -> TVar Int
entries :: forall a. LocalPool a -> TVar [Entry a]
inUse :: forall a. LocalPool a -> TVar Int
..} <- Pool a -> IO (LocalPool a)
forall a. Pool a -> IO (LocalPool a)
getLocalPool Pool a
pool
  a
resource <- IO (IO a) -> IO a
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO a) -> IO a)
-> (STM (IO a) -> IO (IO a)) -> STM (IO a) -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (IO a) -> IO (IO a)
forall a. STM a -> IO a
atomically (STM (IO a) -> IO a) -> STM (IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ do
    TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
takeVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
    [Entry a]
ents <- TVar [Entry a] -> STM [Entry a]
forall a. TVar a -> STM a
readTVar TVar [Entry a]
entries
    case [Entry a]
ents of
      (Entry{a
UTCTime
lastUse :: UTCTime
entry :: a
lastUse :: forall a. Entry a -> UTCTime
entry :: forall a. Entry a -> a
..}:[Entry a]
es) -> TVar [Entry a] -> [Entry a] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [Entry a]
entries [Entry a]
es STM () -> STM (IO a) -> STM (IO a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a -> STM (IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
entry)
      [] -> do
        Int
used <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
inUse
        case Int
used Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
maxResources of
          Bool
False -> do
            TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
inUse (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int
used Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
            TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
highwaterVar (Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max` (Int
used Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1))
            TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
createVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            IO a -> STM (IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return (IO a -> STM (IO a)) -> IO a -> STM (IO a)
forall a b. (a -> b) -> a -> b
$
              IO a
create IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
`onException` STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar_ TVar Int
createFailureVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) STM () -> STM () -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> LocalPool a -> STM ()
forall a. LocalPool a -> STM ()
destroyResourceSTM LocalPool a
local)
          Bool
True -> do
            TMVar (Maybe (Entry a))
var <- STM (TMVar (Maybe (Entry a)))
forall a. STM (TMVar a)
newEmptyTMVar
            STM ()
removeSelf <- WaiterQueue (TMVar (Maybe (Entry a)))
-> TMVar (Maybe (Entry a)) -> STM (STM ())
forall a. WaiterQueue a -> a -> STM (STM ())
push WaiterQueue (TMVar (Maybe (Entry a)))
waiters TMVar (Maybe (Entry a))
var
            let getResource :: Maybe (Entry a) -> IO a
getResource Maybe (Entry a)
x = case Maybe (Entry a)
x of
                  Just Entry a
y -> a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Entry a -> a
forall a. Entry a -> a
entry Entry a
y)
                  Maybe (Entry a)
Nothing -> IO a
create IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
`onException` STM () -> IO ()
forall a. STM a -> IO a
atomically (LocalPool a -> STM ()
forall a. LocalPool a -> STM ()
destroyResourceSTM LocalPool a
local)
            let dequeue :: IO ()
dequeue = do
                  Maybe (Maybe (Entry a))
maybeEntry <- STM (Maybe (Maybe (Entry a))) -> IO (Maybe (Maybe (Entry a)))
forall a. STM a -> IO a
atomically (STM (Maybe (Maybe (Entry a))) -> IO (Maybe (Maybe (Entry a))))
-> STM (Maybe (Maybe (Entry a))) -> IO (Maybe (Maybe (Entry a)))
forall a b. (a -> b) -> a -> b
$ do
                    STM ()
removeSelf
                    TMVar (Maybe (Entry a)) -> STM (Maybe (Maybe (Entry a)))
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar TMVar (Maybe (Entry a))
var
                  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ case Maybe (Maybe (Entry a))
maybeEntry of
                    Maybe (Maybe (Entry a))
Nothing -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                    Just Maybe (Entry a)
Nothing -> LocalPool a -> STM ()
forall a. LocalPool a -> STM ()
destroyResourceSTM LocalPool a
local
                    Just (Just Entry a
v) -> LocalPool a -> Entry a -> STM ()
forall a. LocalPool a -> Entry a -> STM ()
putResourceSTM LocalPool a
local Entry a
v
            IO a -> STM (IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (Entry a) -> IO a
getResource (Maybe (Entry a) -> IO a) -> IO (Maybe (Entry a)) -> IO a
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM (Maybe (Entry a)) -> IO (Maybe (Entry a))
forall a. STM a -> IO a
atomically (TMVar (Maybe (Entry a)) -> STM (Maybe (Entry a))
forall a. TMVar a -> STM a
takeTMVar TMVar (Maybe (Entry a))
var) IO (Maybe (Entry a)) -> IO () -> IO (Maybe (Entry a))
forall a b. IO a -> IO b -> IO a
`onException` IO ()
dequeue)
  (a, LocalPool a) -> IO (a, LocalPool a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
resource, LocalPool a
local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE takeResource #-}
#endif

-- | Non-blocking counterpart of 'withResource'.
--
-- A resource is requested with 'tryTakeResource'. If none can be obtained
-- without blocking, the action is /not/ run and the result is 'Nothing'.
-- Otherwise the action is run on the borrowed resource and its result is
-- returned wrapped in 'Just'.
--
-- If the action throws, the resource is handed to 'destroyResource' and the
-- exception propagates; on normal completion the resource goes back to its
-- stripe via 'putResource'. Acquisition and release run inside 'mask'; only
-- the user action runs with the caller's masking state restored.
tryWithResource :: forall m a b. MonadUnliftIO m => Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource pool act = withRunInIO $ \runInIO -> mask $ \restore ->
  tryTakeResource pool >>= \borrowed -> case borrowed of
    -- Pool exhausted: report that without ever calling the action.
    Nothing -> restore (runInIO (return (Nothing :: Maybe b)))
    Just (resource, local) -> do
      outcome <- restore (runInIO (fmap Just (act resource)))
        `onException` destroyResource pool local resource
      putResource local resource
      return outcome
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryWithResource #-}
#endif

-- | A non-blocking version of 'takeResource'.
--
-- Returns immediately: 'Nothing' if the calling thread's stripe has no idle
-- entry and is already at 'maxResources', or @'Just' (a, 'LocalPool' a)@ if a
-- resource could be borrowed. The returned 'LocalPool' is the stripe the
-- resource must later be handed back to ('putResource') or released from
-- ('destroyResource').
--
-- When a fresh resource has to be created, the same statistics are recorded
-- as on the creation path of 'takeResource': the high-water mark is raised,
-- the creation counter is bumped, and a failing @create@ bumps the
-- creation-failure counter before the reserved slot is released again.
--
-- NOTE(review): 'takeVar' is not touched here. Confirm whether the blocking
-- 'takeResource' counts takes, and mirror it here if so.
tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource pool@Pool{..} = do
  local@LocalPool{..} <- getLocalPool pool
  -- The transaction only decides what to do and reserves a slot; the
  -- (possibly slow, possibly failing) @create@ runs afterwards in IO.
  resource <- join . atomically $ do
    ents <- readTVar entries
    case ents of
      -- Reuse the most recently returned idle entry.
      (Entry{entry} : rest) -> do
        writeTVar entries rest
        return (return (Just entry))
      [] -> do
        used <- readTVar inUse
        if used == maxResources
          then return (return Nothing)
          else do
            let used' = used + 1
            writeTVar inUse $! used'
            modifyTVar_ highwaterVar (`max` used')
            modifyTVar_ createVar (+ 1)
            return $ Just <$>
              create `onException`
                -- Creation failed: record it and give the slot back (or
                -- wake a waiter so that it can try to create instead).
                atomically (modifyTVar_ createFailureVar (+ 1) >> destroyResourceSTM local)
  return $ fmap (\r -> (r, local)) resource
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryTakeResource #-}
#endif

-- | Select the stripe ('LocalPool') used by the calling thread.
--
-- The stripe index is the hash of the current 'ThreadId' taken modulo
-- 'numStripes'.
--
-- Internal; shared by 'takeResource' and 'tryTakeResource' so the lookup is
-- not duplicated.
getLocalPool :: Pool a -> IO (LocalPool a)
getLocalPool Pool{numStripes, localPools} = do
  tid <- myThreadId
  let stripe = hash tid `mod` numStripes
  return (localPools V.! stripe)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE getLocalPool #-}
#endif

-- | Destroy a resource that was borrowed from the given 'LocalPool'.
--
-- The pool's @destroy@ action is run on the resource and any exception it
-- throws is ignored. Afterwards the stripe's bookkeeping is updated through
-- 'destroyResourceSTM'.
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool{destroy} local resource =
  E.handle swallow (destroy resource) >> atomically (destroyResourceSTM local)
  where
    -- Deliberately best-effort: a failing destructor must not prevent the
    -- slot from being released.
    swallow :: SomeException -> IO ()
    swallow _ = return ()
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE destroyResource #-}
#endif

-- | Return a resource to the given 'LocalPool'.
--
-- The resource is wrapped in an 'Entry' stamped with the current time and
-- passed to 'putResourceSTM'.
putResource :: LocalPool a -> a -> IO ()
putResource lp resource =
  getCurrentTime >>= \stamp ->
    atomically (putResourceSTM lp Entry{entry = resource, lastUse = stamp})
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}
#endif

-- Hand a returned entry back to a stripe. If a waiter is queued, the entry is
-- delivered straight to it (as @Just entry@); otherwise the entry is pushed
-- onto the front of the stripe's idle list.
putResourceSTM :: LocalPool a -> Entry a -> STM ()
putResourceSTM LocalPool{waiters, entries} resourceEntry =
  pop waiters >>= maybe stash deliver
  where
    stash = modifyTVar_ entries (resourceEntry :)
    deliver w = putTMVar w (Just resourceEntry)
{-# INLINE putResourceSTM #-}

-- Release the slot of a resource that is gone. If a waiter is queued, it is
-- woken with 'Nothing' and the slot count is left unchanged; otherwise
-- 'inUse' is decremented.
destroyResourceSTM :: LocalPool a -> STM ()
destroyResourceSTM LocalPool{waiters, inUse} =
  pop waiters >>= maybe freeSlot passSlot
  where
    freeSlot = modifyTVar_ inUse (subtract 1)
    passSlot w = putTMVar w Nothing
{-# INLINE destroyResourceSTM #-}

-- | Destroy all resources in all stripes in the pool. Note that this
-- will ignore any exceptions in the destroy function.
--
-- This function is useful when you detect that all resources in the
-- pool are broken. For example after a database has been restarted
-- all connections opened before the restart will be broken. In that
-- case it's better to close those connections so that 'takeResource'
-- won't take a broken connection from the pool but will open a new
-- connection instead.
--
-- Another use-case for this function is that when you know you are
-- done with the pool you can destroy all idle resources immediately
-- instead of waiting on the garbage collector to destroy them, thus
-- freeing up those resources sooner.
destroyAllResources :: Pool a -> IO ()
destroyAllResources Pool{localPools, destroy} =
  -- Purge every stripe in turn, using the pool's own destructor.
  V.mapM_ (purgeLocalPool destroy) localPools

-- | @stats pool reset@ returns statistics for each 'LocalPool' together with
-- a pool-wide summary obtained by adding the per-stripe figures field by
-- field.
--
-- Each stripe is sampled in its own transaction. When @reset@ is 'True', that
-- transaction also zeroes the stripe's take, create and create-failure
-- counters and sets its high-water mark to the usage just sampled. The
-- returned figures are the values from /before/ the reset.
stats :: Pool a -> Bool -> IO Stats
stats Pool{localPools} reset = do
  perStripe <- V.mapM snapshot localPools
  return (Stats perStripe (V.foldr combine zero perStripe))
  where
    zero = PoolStats 0 0 0 0 0

    -- Field-wise sum of two stripe snapshots.
    combine (PoolStats hwA cuA tA cA fA) (PoolStats hwB cuB tB cB fB) =
      PoolStats (hwA + hwB) (cuA + cuB) (tA + tB) (cA + cB) (fA + fB)

    -- Read (and optionally reset) one stripe's counters atomically.
    snapshot LocalPool{highwaterVar, inUse, takeVar, createVar, createFailureVar} =
      atomically $ do
        current <- PoolStats
          <$> readTVar highwaterVar
          <*> readTVar inUse
          <*> readTVar takeVar
          <*> readTVar createVar
          <*> readTVar createFailureVar
        when reset $ do
          forM_ [takeVar, createVar, createFailureVar] $ \counter ->
            writeTVar counter 0
          writeTVar highwaterVar $! currentUsage current
        return current

-- Apply a function to the contents of a 'TVar'. The new value is forced to
-- weak head normal form before it is written, so repeated updates do not pile
-- up thunks inside the variable.
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ v f = do
  old <- readTVar v
  let new = f old
  new `seq` writeTVar v new

-- Abort with 'error', prefixing the message with the module name and the name
-- of the reporting function: @Data.Pool.<func>: <msg>@.
modError :: String -> String -> a
modError func msg = error (concat ["Data.Pool.", func, ": ", msg])