module Hasql.Pool
  ( -- * Pool
    Pool,
    acquire,
    acquireDynamically,
    use,
    release,

    -- * Errors
    UsageError (..),
  )
where

import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session

-- | A pooled connection together with the timestamps used to decide when it
-- has to be retired.
data Entry = Entry
  { -- | The underlying database connection.
    entryConnection :: Connection,
    -- | Monotonic clock reading, in nanoseconds, sampled when the connection
    -- was being established.
    entryCreationTimeNSec :: Word64,
    -- | Monotonic clock reading, in nanoseconds, sampled when the connection
    -- was last handed out for use.
    entryUseTimeNSec :: Word64
  }

-- | Check whether a pooled connection is still within both its lifetime and
-- its idle-time budget.
--
-- The arguments are, in order: the maximal lifetime, the maximal idle time,
-- the current time and the entry to check. All of them are expressed in
-- nanoseconds of the same clock as the entry's timestamps.
--
-- The elapsed time is compared against the limit instead of adding the limit
-- to the timestamp: the 'Word64' sum wraps around for very large limits, which
-- would make a perfectly fresh connection look expired.
entryIsAlive :: Word64 -> Word64 -> Word64 -> Entry -> Bool
entryIsAlive maxLifetime maxIdletime now entry =
  withinLimit maxLifetime (entryCreationTimeNSec entry)
    && withinLimit maxIdletime (entryUseTimeNSec entry)
  where
    -- A timestamp that is not earlier than @now@ always counts as within the
    -- limit; checking that first also guards the subtraction against
    -- underflow.
    withinLimit limit since = now <= since || now - since <= limit

-- | Pool of connections to DB.
data Pool = Pool
  { -- | Pool size.
    poolSize :: Int,
    -- | Action producing the settings for every newly established connection.
    poolFetchConnectionSettings :: IO Connection.Settings,
    -- | Acquisition timeout, in microseconds.
    poolAcquisitionTimeout :: Int,
    -- | Maximal connection lifetime, in nanoseconds.
    poolMaxLifetime :: Word64,
    -- | Maximal connection idle time, in nanoseconds.
    poolMaxIdletime :: Word64,
    -- | Idle connections that are available for reuse.
    poolConnectionQueue :: TQueue Entry,
    -- | Remaining capacity.
    -- The pool size limits the sum of poolCapacity, the length
    -- of poolConnectionQueue and the number of in-flight
    -- connections.
    poolCapacity :: TVar Int,
    -- | Reuse flag of the current generation of connections. A connection is
    -- returned to the pool after use only while the flag it was handed out
    -- with is still 'True'; 'release' clears the current flag and installs a
    -- fresh one.
    poolReuseVar :: TVar (TVar Bool),
    -- | Reference carrying the finalizer that stops the manager thread once
    -- the pool gets garbage collected.
    poolReaperRef :: IORef ()
  }

-- | Create a connection-pool with fixed connection settings.
--
-- This is 'acquireDynamically' with a settings action that always yields the
-- given settings.
--
-- No connections actually get established by this function. It is delegated
-- to 'use'.
acquire ::
  -- | Pool size.
  Int ->
  -- | Connection acquisition timeout.
  DiffTime ->
  -- | Maximal connection lifetime.
  DiffTime ->
  -- | Maximal connection idle time.
  DiffTime ->
  -- | Connection settings.
  Connection.Settings ->
  IO Pool
acquire size acquisitionTimeout lifetime idletime =
  acquireDynamically size acquisitionTimeout lifetime idletime . pure

-- | Create a connection-pool.
--
-- In difference to 'acquire' new connection settings get fetched each
-- time a connection is created. This may be useful for some security models.
--
-- No connections actually get established by this function. It is delegated
-- to 'use'.
--
-- A manager thread is started that wakes up once per second and closes the
-- idle connections which have exceeded their lifetime or idle time. It gets
-- stopped when the pool is garbage collected.
--
-- Negative durations are treated as zero.
acquireDynamically ::
  -- | Pool size.
  Int ->
  -- | Connection acquisition timeout.
  DiffTime ->
  -- | Maximal connection lifetime.
  DiffTime ->
  -- | Maximal connection idle time.
  DiffTime ->
  -- | Action fetching connection settings.
  IO Connection.Settings ->
  IO Pool
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings = do
  connectionQueue <- newTQueueIO
  capVar <- newTVarIO poolSize
  reuseVar <- newTVarIO =<< newTVarIO True
  reaperRef <- newIORef ()

  managerTid <- forkIOWithUnmask $ \unmask -> unmask $ forever $ do
    threadDelay 1000000
    now <- getMonotonicTimeNSec
    -- The transaction only sorts the idle entries; the expired connections
    -- get closed by the IO action it returns, outside of STM.
    join . atomically $ do
      entries <- flushTQueue connectionQueue
      let (keep, close) = partition (entryIsAlive maxLifetimeNanos maxIdletimeNanos now) entries
      traverse_ (writeTQueue connectionQueue) keep
      return $ forM_ close $ \entry -> do
        Connection.release (entryConnection entry)
        -- The slot becomes available only after the connection is closed.
        atomically $ modifyTVar' capVar succ

  void . mkWeakIORef reaperRef $ do
    -- When the pool goes out of scope, stop the manager.
    killThread managerTid

  return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef
  where
    acqTimeoutMicros :: Int
    acqTimeoutMicros =
      fromIntegral (scaledPicos 1000000 acqTimeout)
    maxLifetimeNanos :: Word64
    maxLifetimeNanos =
      fromIntegral (scaledPicos 1000 maxLifetime)
    maxIdletimeNanos :: Word64
    maxIdletimeNanos =
      fromIntegral (scaledPicos 1000 maxIdletime)

    -- Picoseconds of the duration divided by the given factor. The division
    -- happens in 'Integer', before narrowing to the fixed-width target type:
    -- narrowing the raw picosecond count first silently wraps for durations
    -- of just a few months. Negative durations are floored at zero so that
    -- they cannot wrap into huge unsigned values.
    scaledPicos :: Integer -> DiffTime -> Integer
    scaledPicos factor duration =
      max 0 (diffTimeToPicoseconds duration) `div` factor

-- | Close every idle connection of the pool and flag the connections that are
-- currently in use, so that they get closed instead of being returned once
-- their sessions finish. Connections acquired after this call are freshly
-- established.
--
-- The pool stays usable afterwards, so this function serves both for
-- resetting the connections and for freeing the resources.
release :: Pool -> IO ()
release
  Pool
    { poolReuseVar = generationVar,
      poolConnectionQueue = idleQueue,
      poolCapacity = capacityVar
    } = do
    idleEntries <- atomically $ do
      -- Retire the current generation: in-flight connections still hold this
      -- flag and will see it cleared when they try to return.
      retiredFlag <- readTVar generationVar
      writeTVar retiredFlag False
      -- Start a new generation for the connections handed out from now on.
      writeTVar generationVar =<< newTVar True
      flushTQueue idleQueue
    -- Closing happens outside of the transaction; each slot is freed only
    -- after its connection has been closed.
    forM_ idleEntries $ \idle -> do
      Connection.release (entryConnection idle)
      atomically (modifyTVar' capacityVar succ)

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
--
-- Session failing with a 'Session.ClientError' gets interpreted as a loss of
-- connection. In such case the connection gets closed instead of being
-- returned to the pool and a slot gets freed up for a new connection to be
-- established the next time one is needed. The error still gets returned from
-- this function.
--
-- The same happens when the session gets aborted by an exception: the
-- connection is left in an unknown state, so it gets closed and the exception
-- is rethrown.
--
-- If no idle connection and no free slot become available within the
-- acquisition timeout, 'AcquisitionTimeoutUsageError' is returned.
--
-- __Warning:__ Due to the mechanism mentioned above you should avoid consuming
-- errors within sessions.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess = do
  timeout <- do
    delay <- registerDelay poolAcquisitionTimeout
    return $ readTVar delay
  -- The transaction only decides what to do; the chosen IO action runs after
  -- it has committed.
  join . atomically $ do
    reuseVar <- readTVar poolReuseVar
    asum
      [ -- Prefer an idle connection.
        readTQueue poolConnectionQueue <&> onConn reuseVar,
        -- Otherwise claim a free slot and establish a new connection.
        do
          capVal <- readTVar poolCapacity
          if capVal > 0
            then do
              writeTVar poolCapacity $! pred capVal
              return $ onNewConn reuseVar
            else retry,
        -- Otherwise give up once the acquisition timeout has fired.
        do
          timedOut <- timeout
          if timedOut
            then return . return . Left $ AcquisitionTimeoutUsageError
            else retry
      ]
  where
    -- Give the slot held by the current caller back to the pool.
    freeSlot = atomically $ modifyTVar' poolCapacity succ

    -- Close a connection that must not be reused and free its slot.
    discardConn entry = do
      Connection.release (entryConnection entry)
      freeSlot

    -- Establish a connection for an already claimed slot and run the session
    -- on it. The slot is given back on every failure path, including
    -- exceptions thrown while fetching the settings or connecting; otherwise
    -- the pool would permanently lose capacity.
    onNewConn reuseVar = do
      acquisition <- try @SomeException $ do
        settings <- poolFetchConnectionSettings
        now <- getMonotonicTimeNSec
        connRes <- Connection.acquire settings
        return (now, connRes)
      case acquisition of
        Left exc -> do
          freeSlot
          throwIO exc
        Right (_, Left connErr) -> do
          freeSlot
          return $ Left $ ConnectionUsageError connErr
        Right (now, Right conn) -> onLiveConn reuseVar (Entry conn now now)

    -- Run the session on an idle connection, replacing the connection with a
    -- new one if it has outlived its lifetime or idle time.
    onConn reuseVar entry = do
      now <- getMonotonicTimeNSec
      if entryIsAlive poolMaxLifetime poolMaxIdletime now entry
        then onLiveConn reuseVar entry {entryUseTimeNSec = now}
        else do
          Connection.release (entryConnection entry)
          onNewConn reuseVar

    -- Run the session and decide the fate of the connection by its outcome.
    onLiveConn reuseVar entry = do
      sessRes <- try @SomeException (Session.run sess (entryConnection entry))

      case sessRes of
        Left exc -> do
          -- The session was interrupted midway, so the state of the
          -- connection is unknown. Do not hand it to the next user.
          discardConn entry
          throwIO exc
        Right (Left err) -> case err of
          Session.QueryError _ _ (Session.ClientError _) -> do
            -- The connection is considered lost. Close it so that its
            -- resources are not leaked, then free the slot.
            discardConn entry
            return $ Left $ SessionUsageError err
          _ -> do
            returnConn
            return $ Left $ SessionUsageError err
        Right (Right res) -> do
          returnConn
          return $ Right res
      where
        -- Put the connection back into the queue, unless 'release' has been
        -- called since it was handed out; in that case close it.
        returnConn =
          join . atomically $ do
            reuse <- readTVar reuseVar
            if reuse
              then writeTQueue poolConnectionQueue entry $> return ()
              else return $ discardConn entry

-- | All the ways in which 'use' can fail.
data UsageError
  = -- | Establishing a connection failed.
    ConnectionUsageError Connection.ConnectionError
  | -- | Executing the session failed.
    SessionUsageError Session.QueryError
  | -- | No connection could be acquired before the timeout.
    AcquisitionTimeoutUsageError
  deriving (Show, Eq)

-- | Lets a 'UsageError' be thrown and caught as an exception, using the
-- default methods of the class.
instance Exception UsageError