{-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-}
#if !MIN_VERSION_base(4,3,0)
{-# LANGUAGE RankNTypes #-}
#endif
module Data.Pool
(
Pool(idleTime, maxResources, numStripes)
, LocalPool
, Stats(..)
, PoolStats(..)
, createPool
, withResource
, takeResource
, tryWithResource
, tryTakeResource
, destroyResource
, putResource
, destroyAllResources
, stats
) where
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM5, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Pool.WaiterQueue (WaiterQueue, newQueueIO, push, pop)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Typeable (Typeable)
import GHC.Conc.Sync (labelThread)
import qualified Control.Exception as E
import qualified Data.Vector as V
import UnliftIO (MonadUnliftIO, mask, withRunInIO)
-- | An idle resource held by a pool stripe, paired with a timestamp.
data Entry a = Entry {
      entry :: a
    -- ^ The pooled resource itself.
    , lastUse :: UTCTime
    -- ^ Timestamp the reaper compares against the current time: an entry
    -- whose age exceeds the pool's 'idleTime' is treated as stale and
    -- destroyed.  (Presumably set when the resource is returned to the
    -- pool; the code that writes it is outside this section.)
    }
-- | A snapshot of usage counters.
--
-- NOTE(review): the function that fills in these fields ('stats') is not
-- visible in this section; the field descriptions below are inferred from
-- the per-stripe counters kept in 'LocalPool' and should be confirmed
-- against it.
data PoolStats = PoolStats {
      highwaterUsage :: Int
    -- ^ Highest usage observed (presumably from 'highwaterVar').
    , currentUsage :: Int
    -- ^ Usage at the time of the snapshot (presumably from 'inUse').
    , takes :: Int
    -- ^ Number of take operations (presumably from 'takeVar').
    , creates :: Int
    -- ^ Number of resource creations (presumably from 'createVar').
    , createFailures :: Int
    -- ^ Number of failed creations (presumably from 'createFailureVar').
    } deriving (Show)
-- | Pool statistics: one 'PoolStats' per stripe plus a single 'PoolStats'
-- for the pool as a whole.
--
-- NOTE(review): how 'poolStats' is derived from 'perStripe' is decided by
-- 'stats', which is outside this section.
data Stats = Stats {
      perStripe :: V.Vector PoolStats
    -- ^ Statistics for each stripe.
    , poolStats :: PoolStats
    -- ^ Statistics for the whole pool.
    } deriving (Show)
-- | The state of a single stripe of a 'Pool'.
data LocalPool a = LocalPool {
      inUse :: TVar Int
    -- ^ Number of resources that currently exist for this stripe.  It is
    -- incremented when a new resource is about to be created and
    -- decremented when idle entries are reaped or purged, so it covers
    -- both idle and handed-out resources.
    , entries :: TVar [Entry a]
    -- ^ Idle resources available to be taken.
    , highwaterVar :: TVar Int
    -- ^ Largest value 'inUse' has reached via 'takeResource'.
    , takeVar :: TVar Int
    -- ^ Incremented on every 'takeResource' call against this stripe.
    , createVar :: TVar Int
    -- ^ Incremented each time 'takeResource' decides to create a resource
    -- because the stripe is below its limit.
    , createFailureVar :: TVar Int
    -- ^ Incremented when such a creation throws an exception.
    , waiters :: WaiterQueue (TMVar (Maybe (Entry a)))
    -- ^ Threads blocked in 'takeResource' because the stripe is at its
    -- limit.  A waiter that receives @Just e@ uses the existing entry; one
    -- that receives @Nothing@ creates a fresh resource itself.
    , lfin :: IORef ()
    -- ^ Carries the weak finalizer (attached in 'createPool') that purges
    -- this stripe's idle resources.
    } deriving (Typeable)
-- | A striped pool of resources of type @a@.
data Pool a = Pool {
      create :: IO a
    -- ^ Action that creates a new resource.
    , destroy :: a -> IO ()
    -- ^ Action that disposes of a resource.
    , numStripes :: Int
    -- ^ Number of stripes (independent 'LocalPool's).  'createPool'
    -- rejects values below 1.
    , idleTime :: NominalDiffTime
    -- ^ How long an idle resource may stay in the pool before the reaper
    -- destroys it.  'createPool' rejects values below 0.5.
    , maxResources :: Int
    -- ^ Maximum number of resources per stripe: each stripe's 'inUse'
    -- counter is compared against this limit.  'createPool' rejects values
    -- below 1.
    , localPools :: V.Vector (LocalPool a)
    -- ^ The stripes; one is selected per calling thread by 'getLocalPool'.
    , fin :: IORef ()
    -- ^ Carries the weak finalizer (attached in 'createPool') that kills
    -- the reaper thread.
    } deriving (Typeable)
-- | Shows only the plain configuration values ('numStripes', 'idleTime'
-- and 'maxResources'); the actions, stripes and finalizer reference are
-- omitted.
instance Show (Pool a) where
    show Pool{..} = "Pool {numStripes = " ++ show numStripes ++ ", " ++
                    "idleTime = " ++ show idleTime ++ ", " ++
                    "maxResources = " ++ show maxResources ++ "}"
-- | Create a striped resource pool and start its reaper thread.
--
-- The arguments are validated first; an invalid value is reported through
-- @modError@ (defined elsewhere in this module).  One 'LocalPool' is then
-- allocated per stripe with all counters at zero, no idle entries and an
-- empty waiter queue.
--
-- Two kinds of weak finalizers are registered:
--
-- * on the pool's 'fin' reference, to kill the reaper thread;
--
-- * on each stripe's 'lfin' reference, to purge that stripe's idle
--   resources via 'purgeLocalPool'.
createPool
    :: IO a
    -- ^ Action that creates a new resource.
    -> (a -> IO ())
    -- ^ Action that destroys an existing resource.
    -> Int
    -- ^ Stripe count; must be at least 1.
    -> NominalDiffTime
    -- ^ Time an unused resource is kept before the reaper destroys it;
    -- must be at least 0.5.
    -> Int
    -- ^ Maximum number of resources per stripe; must be at least 1.
    -> IO (Pool a)
createPool create destroy numStripes idleTime maxResources = do
  when (numStripes < 1) $
    modError "pool " $ "invalid stripe count " ++ show numStripes
  when (idleTime < 0.5) $
    modError "pool " $ "invalid idle time " ++ show idleTime
  when (maxResources < 1) $
    modError "pool " $ "invalid maximum resource count " ++ show maxResources
  -- Field order: inUse, entries, highwaterVar, takeVar, createVar,
  -- createFailureVar, waiters, lfin.
  localPools <- V.replicateM numStripes $
    LocalPool <$> newTVarIO 0
              <*> newTVarIO []
              <*> newTVarIO 0
              <*> newTVarIO 0
              <*> newTVarIO 0
              <*> newTVarIO 0
              <*> newQueueIO
              <*> newIORef ()
  reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
                unmask $ reaper destroy idleTime localPools
  fin <- newIORef ()
  let p = Pool {
            create
          , destroy
          , numStripes
          , idleTime
          , maxResources
          , localPools
          , fin
          }
  mkWeakIORef fin (killThread reaperId) >>
    V.mapM_ (\lp -> mkWeakIORef (lfin lp) (purgeLocalPool destroy lp)) localPools
  return p
-- | Fork a thread with 'forkIOWithUnmask' and label it before running the
-- supplied action.
--
-- The fork happens inside 'mask_', so the new thread starts with
-- asynchronous exceptions masked; the action receives the @unmask@
-- function from 'forkIOWithUnmask' to re-enable them where it chooses.
forkIOLabeledWithUnmask :: String
                        -- ^ Label to attach to the new thread.
                        -> ((forall a. IO a -> IO a) -> IO ())
                        -- ^ Thread body, given the unmasking function.
                        -> IO ThreadId
forkIOLabeledWithUnmask label m = mask_ $ forkIOWithUnmask $ \unmask -> do
  tid <- myThreadId
  labelThread tid label
  m unmask
-- | Loop forever, periodically destroying idle resources that have been
-- unused for longer than the given idle time.
--
-- Each iteration sleeps for one second, takes the current time, and then
-- for every stripe atomically removes the stale entries from 'entries' and
-- lowers 'inUse' by the number removed.  The removed resources are
-- destroyed outside the transaction.
reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
  threadDelay (1 * 1000000)
  now <- getCurrentTime
  let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
  V.forM_ pools $ \LocalPool{..} -> do
    resources <- atomically $ do
      (stale,fresh) <- partition isStale <$> readTVar entries
      -- Skip the writes when there is nothing to reap.
      unless (null stale) $ do
        writeTVar entries fresh
        modifyTVar_ inUse (subtract (length stale))
      return (map entry stale)
    forM_ resources $ \resource -> do
      -- Best effort: an exception thrown while destroying one resource is
      -- discarded so the remaining resources are still destroyed and the
      -- reaper keeps running.
      destroy resource `E.catch` \(_::SomeException) -> return ()
-- | Destroy every idle resource held by a stripe.
--
-- The idle list is atomically swapped for the empty list and 'inUse' is
-- lowered by the number of entries removed; the resources are then
-- destroyed outside the transaction.  Resources currently handed out are
-- not affected.
purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool destroy LocalPool{..} = do
  resources <- atomically $ do
    idle <- swapTVar entries []
    modifyTVar_ inUse (subtract (length idle))
    return (map entry idle)
  forM_ resources $ \resource ->
    -- Best effort: an exception thrown while destroying one resource is
    -- discarded so the remaining resources are still destroyed.
    destroy resource `E.catch` \(_::SomeException) -> return ()
-- | Take a resource from the pool, run an action with it, and return it to
-- the pool afterwards.
--
-- The whole operation runs under 'mask'; only the user action is run with
-- the previous masking state restored.  If the action throws, the resource
-- is passed to 'destroyResource' instead of being returned to the pool,
-- and the exception propagates.
withResource :: MonadUnliftIO m => Pool a -> (a -> m b) -> m b
{-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-}
withResource pool act = withRunInIO $ \runInIO -> mask $ \restore -> do
  (resource, local) <- takeResource pool
  ret <- restore (runInIO (act resource)) `onException`
            destroyResource pool local resource
  putResource local resource
  return ret
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE withResource #-}
#endif
-- | Take a resource from the calling thread's stripe, blocking if the
-- stripe is at its limit.  Returns the resource together with the stripe
-- it came from.
--
-- A single STM transaction decides what to do and returns the 'IO' action
-- that finishes the job:
--
-- * If an idle entry exists it is removed from 'entries' and returned.
--
-- * Otherwise, if 'inUse' has not reached 'maxResources', the slot is
--   reserved ('inUse', 'highwaterVar' and 'createVar' are updated) and the
--   resource is created outside the transaction.  If creation throws,
--   'createFailureVar' is bumped and @destroyResourceSTM@ is run before
--   the exception propagates.
--
-- * Otherwise the caller enqueues a fresh 'TMVar' in 'waiters' and blocks
--   on it.  Receiving @Just e@ yields the existing entry; receiving
--   @Nothing@ makes the caller create a new resource itself.
--
-- 'takeVar' is incremented on every call.
--
-- NOTE(review): the waiter branch that creates a resource after receiving
-- @Nothing@ does not update 'createVar' or 'createFailureVar'.
takeResource :: Pool a -> IO (a, LocalPool a)
takeResource pool@Pool{..} = do
  local@LocalPool{..} <- getLocalPool pool
  resource <- join . atomically $ do
    modifyTVar_ takeVar (+ 1)
    ents <- readTVar entries
    case ents of
      (Entry{..}:es) -> writeTVar entries es >> return (return entry)
      [] -> do
        used <- readTVar inUse
        case used == maxResources of
          False -> do
            writeTVar inUse $! used + 1
            modifyTVar_ highwaterVar (`max` (used + 1))
            modifyTVar_ createVar (+ 1)
            return $
              create `onException`
                atomically (modifyTVar_ createFailureVar (+ 1) >> destroyResourceSTM local)
          True -> do
            var <- newEmptyTMVar
            removeSelf <- push waiters var
            let getResource x = case x of
                  Just y -> pure (entry y)
                  Nothing -> create `onException` atomically (destroyResourceSTM local)
            -- Cleanup run if the blocked wait is interrupted by an
            -- exception: leave the queue, and if something was already
            -- handed to us, pass it on rather than losing it.
            let dequeue = do
                  maybeEntry <- atomically $ do
                    removeSelf
                    tryTakeTMVar var
                  atomically $ case maybeEntry of
                    Nothing -> pure ()
                    Just Nothing -> destroyResourceSTM local
                    Just (Just v) -> putResourceSTM local v
            return (getResource =<< (atomically (takeTMVar var) `onException` dequeue))
  return (resource, local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE takeResource #-}
#endif
-- | Non-blocking variant of 'withResource'.
--
-- Uses 'tryTakeResource' to obtain a resource.  If one is obtained, the
-- action is run with it and its result is returned wrapped in 'Just'; the
-- resource is returned to the pool afterwards, or passed to
-- 'destroyResource' if the action throws.  If no resource is available,
-- 'Nothing' is returned and the action is not run.
tryWithResource :: forall m a b. MonadUnliftIO m => Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource pool act = withRunInIO $ \runInIO -> mask $ \restore -> do
  res <- tryTakeResource pool
  case res of
    Just (resource, local) -> do
      ret <- restore (runInIO (Just <$> act resource)) `onException`
                destroyResource pool local resource
      putResource local resource
      return ret
    Nothing -> restore . runInIO $ return (Nothing :: Maybe b)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryWithResource #-}
#endif
-- | Non-blocking variant of 'takeResource'.
--
-- Returns an idle entry from the calling thread's stripe if there is one.
-- Otherwise, if 'inUse' equals 'maxResources', returns 'Nothing'
-- immediately; if not, reserves a slot by incrementing 'inUse' and creates
-- a new resource outside the transaction, running @destroyResourceSTM@ if
-- creation throws.
--
-- NOTE(review): unlike 'takeResource', this function does not update
-- 'takeVar', 'highwaterVar', 'createVar' or 'createFailureVar'.
tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource pool@Pool{..} = do
  local@LocalPool{..} <- getLocalPool pool
  resource <- join . atomically $ do
    ents <- readTVar entries
    case ents of
      (Entry{..}:es) -> writeTVar entries es >> return (return . Just $ entry)
      [] -> do
        used <- readTVar inUse
        if used == maxResources
          then return (return Nothing)
          else do
            writeTVar inUse $! used + 1
            return $ Just <$>
              create `onException` atomically (destroyResourceSTM local)
  return $ (flip (,) local) <$> resource
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryTakeResource #-}
#endif
-- | Select the stripe ('LocalPool') that the calling thread should use.
--
-- The stripe index is the hash of the current 'ThreadId' taken modulo
-- 'numStripes'. Haskell's 'mod' yields a non-negative result for a positive
-- divisor, so a negative hash still produces a valid index.
--
-- NOTE(review): assumes @numStripes > 0@ and that 'localPools' holds exactly
-- 'numStripes' elements; otherwise 'mod' or 'V.!' will fail. Confirm against
-- 'createPool'.
getLocalPool :: Pool a -> IO (LocalPool a)
getLocalPool Pool{..} = do
  tid <- myThreadId
  return $ localPools V.! (hash tid `mod` numStripes)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE getLocalPool #-}
#endif
-- | Destroy a resource and release its slot in the given stripe.
--
-- The pool's 'destroy' action is run on the resource first. Any exception it
-- throws is deliberately discarded (best effort), so that the stripe
-- bookkeeping in 'destroyResourceSTM' always runs afterwards.
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool{..} local resource = do
  -- Best-effort teardown: a failing destructor must not prevent the slot
  -- from being released below.
  E.handle (\(_ :: SomeException) -> return ()) (destroy resource)
  atomically $ destroyResourceSTM local
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE destroyResource #-}
#endif
-- | Return a resource to the given stripe.
--
-- The resource is wrapped in an 'Entry' stamped with the current time as its
-- 'lastUse', and then handed to 'putResourceSTM' in a single transaction.
putResource :: LocalPool a -> a -> IO ()
putResource lp resource = do
  now <- getCurrentTime
  atomically $ putResourceSTM lp Entry { entry = resource, lastUse = now }
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}
#endif
-- | Transactionally return an 'Entry' to a stripe.
--
-- A waiter is popped from the stripe's 'waiters' queue. If there is one, the
-- entry is handed to it directly by putting @Just entry@ into its 'TMVar'.
-- If there is none, the entry is pushed onto the front of the stripe's
-- 'entries' list.
putResourceSTM :: LocalPool a -> Entry a -> STM ()
putResourceSTM LocalPool{..} resourceEntry =
  pop waiters >>= maybe stash handOff
  where
    -- Nobody is waiting: keep the entry in the stripe's list.
    stash = modifyTVar_ entries (resourceEntry :)
    -- Somebody is waiting: give them the entry without touching the list.
    handOff w = putTMVar w (Just resourceEntry)
{-# INLINE putResourceSTM #-}
-- | Transactionally account for a resource that has been destroyed.
--
-- A waiter is popped from the stripe's 'waiters' queue. If there is none,
-- the 'inUse' counter is decremented. If there is one, 'inUse' is left
-- untouched and 'Nothing' is put into the waiter's 'TMVar'.
--
-- NOTE(review): presumably the 'Nothing' tells the waiter that it now owns
-- the freed slot and must create its own resource; confirm against the code
-- that reads the waiter 'TMVar'.
destroyResourceSTM :: LocalPool a -> STM ()
destroyResourceSTM LocalPool{..} =
  pop waiters >>= maybe releaseSlot passSlot
  where
    -- Nobody is waiting: the slot simply becomes free.
    releaseSlot = modifyTVar_ inUse (subtract 1)
    -- Somebody is waiting: notify them instead of changing the counter.
    passSlot w = putTMVar w Nothing
{-# INLINE destroyResourceSTM #-}
-- | Run 'purgeLocalPool' with the pool's 'destroy' action on every stripe
-- in 'localPools', in order.
--
-- NOTE(review): the exact effect on each stripe is determined by
-- 'purgeLocalPool', which is defined elsewhere in this module.
destroyAllResources :: Pool a -> IO ()
destroyAllResources Pool{..} = V.mapM_ (purgeLocalPool destroy) localPools
-- | Collect usage statistics for the pool.
--
-- Each stripe is sampled in its own transaction, so the per-stripe numbers
-- are internally consistent, but the stripes are not sampled atomically with
-- respect to each other.
--
-- When the second argument is 'True', each stripe's 'takes', 'creates' and
-- 'createFailures' counters are reset to zero after being read, and its
-- high-water mark is reset to the stripe's current usage.
--
-- The result carries the per-stripe statistics ('perStripe') and a pool-wide
-- summary ('poolStats') obtained by adding the stripes together field by
-- field. Note that this means the pool-wide 'highwaterUsage' is the sum of
-- the per-stripe high-water marks.
stats :: Pool a -> Bool -> IO Stats
stats Pool{..} reset = do
  per <- V.mapM stripeStats localPools
  return $ Stats per (V.foldr merge (PoolStats 0 0 0 0 0) per)
  where
    -- Read (and optionally reset) one stripe's counters atomically.
    stripeStats :: LocalPool b -> IO PoolStats
    stripeStats LocalPool{..} = atomically $ do
      s <- liftM5 PoolStats
             (readTVar highwaterVar)
             (readTVar inUse)
             (readTVar takeVar)
             (readTVar createVar)
             (readTVar createFailureVar)
      when reset $ do
        mapM_ (\v -> writeTVar v 0) [takeVar, createVar, createFailureVar]
        -- The new high-water mark starts from what is in use right now.
        writeTVar highwaterVar $! currentUsage s
      return s

    -- Field-wise sum of two stripes' statistics.
    merge :: PoolStats -> PoolStats -> PoolStats
    merge (PoolStats hw1 cu1 t1 c1 f1) (PoolStats hw2 cu2 t2 c2 f2) =
      PoolStats (hw1 + hw2) (cu1 + cu2) (t1 + t2) (c1 + c2) (f1 + f2)
-- | Apply a function to the contents of a 'TVar'.
--
-- The new value is forced to weak head normal form before it is written
-- (via '$!'), so repeated modifications do not build up a chain of thunks
-- inside the variable.
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ v f = do
  a <- readTVar v
  writeTVar v $! f a
-- | Raise an 'error' whose message is prefixed with this module's name and
-- the name of the offending function, in the form
-- @Data.Pool.\<func\>: \<msg\>@.
modError :: String -> String -> a
modError func msg =
  error ("Data.Pool." ++ func ++ ": " ++ msg)