module Hasql.Pool
  ( -- * Pool
    Pool,
    acquire,
    use,
    release,

    -- * Errors
    UsageError (..),
  )
where

import qualified Data.Text.Encoding as Text
import qualified Data.Text.Encoding.Error as Text
import qualified Data.UUID.V4 as Uuid
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import qualified Hasql.Pool.Config.Config as Config
import Hasql.Pool.Observation
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session

-- | A connection tagged with metadata.
data Entry = Entry
  { -- | The underlying database connection.
    entryConnection :: Connection,
    -- | Monotonic clock reading (nanoseconds) recorded when the entry was
    -- created; compared against the maximal lifetime by 'entryIsAged'.
    entryCreationTimeNSec :: Word64,
    -- | Monotonic clock reading (nanoseconds) recorded when the entry was
    -- last handed out for use; compared against the maximal idle time by
    -- 'entryIsIdle'.
    entryUseTimeNSec :: Word64,
    -- | Identifier under which this connection is reported in observations.
    entryId :: UUID
  }

-- | Check whether an entry has outlived its maximal lifetime.
--
-- Arguments, in order: the maximal lifetime in nanoseconds, the current
-- monotonic time in nanoseconds, and the entry to examine.
--
-- The result is 'True' only when the current time is strictly past the
-- creation time plus the maximal lifetime.
entryIsAged :: Word64 -> Word64 -> Entry -> Bool
entryIsAged maxLifetime now entry =
  now > entryCreationTimeNSec entry + maxLifetime

-- | Check whether an entry has been sitting unused for too long.
--
-- Arguments, in order: the maximal idle time in nanoseconds, the current
-- monotonic time in nanoseconds, and the entry to examine.
--
-- The result is 'True' only when the current time is strictly past the
-- last use time plus the maximal idle time.
entryIsIdle :: Word64 -> Word64 -> Entry -> Bool
entryIsIdle maxIdletime now entry =
  now > entryUseTimeNSec entry + maxIdletime

-- | Pool of connections to DB.
data Pool = Pool
  { -- | Pool size.
    poolSize :: Int,
    -- | Action producing the connection settings. It is run every time a new
    -- connection is about to be established.
    poolFetchConnectionSettings :: IO Connection.Settings,
    -- | Acquisition timeout, in microseconds.
    poolAcquisitionTimeout :: Int,
    -- | Maximal connection lifetime, in nanoseconds.
    poolMaxLifetime :: Word64,
    -- | Maximal connection idle time, in nanoseconds.
    poolMaxIdletime :: Word64,
    -- | Available connections, i.e. established ones that are not in use.
    poolConnectionQueue :: TQueue Entry,
    -- | Remaining capacity.
    -- The pool size limits the sum of poolCapacity, the length
    -- of poolConnectionQueue and the number of in-flight
    -- connections.
    poolCapacity :: TVar Int,
    -- | Whether to return a connection to the pool.
    -- The inner variable is the flag captured by sessions at checkout;
    -- 'release' sets it to 'False' and installs a fresh inner variable,
    -- so only connections checked out before the reset get discarded.
    poolReuseVar :: TVar (TVar Bool),
    -- | To stop the manager thread via garbage collection.
    poolReaperRef :: IORef (),
    -- | Action for reporting the observations.
    poolObserver :: Observation -> IO ()
  }

-- | Create a connection-pool.
--
-- No connections actually get established by this function. It is delegated
-- to 'use'.
--
-- A manager thread is forked which wakes up once a second, takes every
-- available connection out of the queue, closes those that exceeded the
-- aging timeout or the idleness timeout and puts the rest back. The thread
-- is killed by a finalizer attached to the pool's reaper reference.
acquire :: Config.Config -> IO Pool
acquire config = do
  connectionQueue <- newTQueueIO
  capVar <- newTVarIO (Config.size config)
  reuseVar <- newTVarIO =<< newTVarIO True
  reaperRef <- newIORef ()

  let -- Close one expired connection, hand its slot back to the pool and
      -- report the reason to the observer.
      terminate reason entry = do
        Connection.release (entryConnection entry)
        atomically $ modifyTVar' capVar succ
        observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus reason))

  managerTid <- forkIOWithUnmask $ \unmask -> unmask $ forever $ do
    threadDelay 1000000
    now <- getMonotonicTimeNSec
    -- The transaction only decides which entries survive; the actual closing
    -- is I/O and therefore returned as an action to run afterwards.
    join . atomically $ do
      entries <- flushTQueue connectionQueue
      let (agedEntries, unagedEntries) = partition (entryIsAged agingTimeoutNanos now) entries
          -- Idleness is judged against the idleness timeout. Judging it
          -- against the aging timeout could never match anything here, since
          -- the aged entries have already been split off above.
          (idleEntries, liveEntries) = partition (entryIsIdle maxIdletimeNanos now) unagedEntries
      traverse_ (writeTQueue connectionQueue) liveEntries
      return $ do
        forM_ agedEntries (terminate AgingConnectionTerminationReason)
        forM_ idleEntries (terminate IdlenessConnectionTerminationReason)

  void . mkWeakIORef reaperRef $ do
    -- When the pool goes out of scope, stop the manager.
    killThread managerTid

  return
    Pool
      { poolSize = Config.size config,
        poolFetchConnectionSettings = Config.connectionSettingsProvider config,
        poolAcquisitionTimeout = acqTimeoutMicros,
        poolMaxLifetime = agingTimeoutNanos,
        poolMaxIdletime = maxIdletimeNanos,
        poolConnectionQueue = connectionQueue,
        poolCapacity = capVar,
        poolReuseVar = reuseVar,
        poolReaperRef = reaperRef,
        poolObserver = observer
      }
  where
    observer = Config.observationHandler config
    -- The division happens in 'Integer' and only the quotient is narrowed.
    -- Narrowing the picosecond count first would wrap around for long
    -- timeouts before the division had a chance to bring it into range.
    wholeUnits picosPerUnit timeout =
      diffTimeToPicoseconds timeout `div` picosPerUnit
    acqTimeoutMicros =
      fromIntegral (wholeUnits 1_000_000 (Config.acquisitionTimeout config))
    agingTimeoutNanos =
      fromIntegral (wholeUnits 1_000 (Config.agingTimeout config))
    maxIdletimeNanos =
      fromIntegral (wholeUnits 1_000 (Config.idlenessTimeout config))

-- | Release all the idle connections in the pool, and mark the in-use connections
-- to be released after use. Any connections acquired after the call will be
-- freshly established.
--
-- The pool remains usable after this action.
-- So you can use this function to reset the connections in the pool.
-- Naturally, you can also use it to release the resources.
release :: Pool -> IO ()
release Pool {..} =
  join . atomically $ do
    -- Flip the flag held by sessions that are currently running, so that
    -- they close their connection instead of returning it.
    prevReuse <- readTVar poolReuseVar
    writeTVar prevReuse False
    -- Sessions starting from now on get a fresh flag of their own.
    newReuse <- newTVar True
    writeTVar poolReuseVar newReuse
    entries <- flushTQueue poolConnectionQueue
    -- Closing the connections is I/O, so it runs after the transaction.
    return $ forM_ entries $ \entry -> do
      Connection.release (entryConnection entry)
      atomically $ modifyTVar' poolCapacity succ
      poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason))

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
--
-- Session failing with a 'Session.ClientError' gets interpreted as a loss of
-- connection. In such case the connection does not get returned to the pool
-- and a slot gets freed up for a new connection to be established the next
-- time one is needed. The error still gets returned from this function.
--
-- __Warning:__ Due to the mechanism mentioned above you should avoid intercepting this error type from within sessions.
--
-- If no connection becomes available and no slot frees up within the
-- acquisition timeout, the result is 'AcquisitionTimeoutUsageError'.
-- An exception thrown while running the session is rethrown after the
-- connection has been handed back.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess = do
  timeout <- do
    delay <- registerDelay poolAcquisitionTimeout
    return $ readTVar delay
  join . atomically $ do
    reuseVar <- readTVar poolReuseVar
    -- Alternatives in order of preference: an established connection, a free
    -- slot for a new one, and finally giving up once the timer has fired.
    asum
      [ readTQueue poolConnectionQueue <&> onConn reuseVar,
        do
          capVal <- readTVar poolCapacity
          if capVal > 0
            then do
              writeTVar poolCapacity $! pred capVal
              return $ onNewConn reuseVar
            else retry,
        do
          timedOut <- timeout
          if timedOut
            then return (return (Left AcquisitionTimeoutUsageError))
            else retry
      ]
  where
    -- Report a status change of the connection with the given identifier.
    observe connId status =
      poolObserver (ConnectionObservation connId status)

    -- Render the optional raw error details as text for the observer.
    decodeDetails details =
      fmap (Text.decodeUtf8With Text.lenientDecode) details

    -- Establish a connection in a slot the caller has already reserved.
    onNewConn reuseVar = do
      settings <- poolFetchConnectionSettings
      now <- getMonotonicTimeNSec
      connId <- Uuid.nextRandom
      observe connId ConnectingConnectionStatus
      connRes <- Connection.acquire settings
      case connRes of
        Left connErr -> do
          observe connId (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (decodeDetails connErr)))
          -- Nothing got established, so the reserved slot goes back.
          atomically $ modifyTVar' poolCapacity succ
          return $ Left $ ConnectionUsageError connErr
        Right connection -> do
          observe connId ReadyForUseConnectionStatus
          onLiveConn reuseVar (Entry connection now now connId)

    -- Handle an entry taken from the queue: replace it if it has expired,
    -- otherwise stamp it with the current time and use it.
    onConn reuseVar entry = do
      now <- getMonotonicTimeNSec
      let replaceBecause reason = do
            Connection.release (entryConnection entry)
            observe (entryId entry) (TerminatedConnectionStatus reason)
            -- The slot of the closed connection is reused for the new one.
            onNewConn reuseVar
      if entryIsAged poolMaxLifetime now entry
        then replaceBecause AgingConnectionTerminationReason
        else
          if entryIsIdle poolMaxIdletime now entry
            then replaceBecause IdlenessConnectionTerminationReason
            else onLiveConn reuseVar entry {entryUseTimeNSec = now}

    -- Run the session on a usable connection and dispose of the connection
    -- according to the outcome.
    onLiveConn reuseVar entry = do
      observe (entryId entry) InUseConnectionStatus
      sessRes <- try @SomeException (Session.run sess (entryConnection entry))

      case sessRes of
        Left exc -> do
          returnConn
          throwIO exc
        Right (Left err) -> case err of
          Session.QueryError _ _ (Session.ClientError details) -> do
            -- Treated as a lost connection: close it and free the slot.
            Connection.release (entryConnection entry)
            atomically $ modifyTVar' poolCapacity succ
            observe (entryId entry) (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (decodeDetails details)))
            return $ Left $ SessionUsageError err
          _ -> do
            returnConn
            return $ Left $ SessionUsageError err
        Right (Right res) -> do
          returnConn
          return $ Right res
      where
        -- Hand the connection back. It is queued again and reported as ready
        -- only if the pool has not been reset since checkout; otherwise it
        -- is closed and reported as terminated, and nothing else is reported
        -- for it afterwards.
        returnConn =
          join . atomically $ do
            reuse <- readTVar reuseVar
            if reuse
              then do
                writeTQueue poolConnectionQueue entry
                return $ observe (entryId entry) ReadyForUseConnectionStatus
              else return $ do
                Connection.release (entryConnection entry)
                atomically $ modifyTVar' poolCapacity succ
                observe (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason)

-- | Union over all errors that 'use' can result in.
data UsageError
  = -- | Attempt to establish a connection failed.
    ConnectionUsageError Connection.ConnectionError
  | -- | Session execution failed.
    SessionUsageError Session.QueryError
  | -- | Timeout acquiring a connection.
    AcquisitionTimeoutUsageError
  deriving (Show, Eq)

-- | Lets a 'UsageError' be thrown and caught as an exception.
instance Exception UsageError