module Hasql.Pool
  ( -- * Pool
    Pool,
    acquire,
    acquireDynamically,
    release,
    use,

    -- * Errors
    UsageError (..),
  )
where

import Control.Exception (onException)
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Prelude
import Hasql.Session (Session)
import qualified Hasql.Session as Session

-- | A pool of database connections.
data Pool = Pool
  { -- | Action producing the settings used whenever a new connection
    -- gets established.
    poolFetchConnectionSettings :: IO Connection.Settings,
    -- | How long 'use' may wait for a connection, in microseconds.
    -- With 'Nothing' it waits without a time limit.
    poolAcquisitionTimeout :: Maybe Int,
    -- | Idle connections, ready to be handed out.
    poolConnectionQueue :: TQueue Connection,
    -- | Number of connections that may still be established.
    -- The pool size limits the sum of this counter, the length of
    -- 'poolConnectionQueue' and the number of connections currently
    -- in use.
    poolCapacity :: TVar Int,
    -- | Reuse flag of the current pool generation. A connection taken
    -- from the pool remembers the inner variable that was current at
    -- that moment and is only put back into the queue while that
    -- variable still holds 'True'. 'release' sets the current inner
    -- variable to 'False' and installs a fresh one holding 'True'.
    poolReuse :: TVar (TVar Bool)
  }

-- | Create a connection pool with fixed connection settings.
--
-- This function does not establish any connections itself. That is left
-- to 'use'.
acquire ::
  -- | Pool size.
  Int ->
  -- | Connection acquisition timeout, in microseconds.
  Maybe Int ->
  -- | Connection settings.
  Connection.Settings ->
  IO Pool
acquire poolSize timeout =
  -- Constant settings are the special case of a settings-fetching action
  -- that always yields the same value.
  acquireDynamically poolSize timeout . pure

-- | Create a connection pool whose settings are fetched on demand.
--
-- Unlike 'acquire', the provided action is run every time a new connection
-- is about to be established, so each connection may get fresh settings.
-- This may be useful for some security models.
--
-- This function does not establish any connections itself. That is left
-- to 'use'.
acquireDynamically ::
  -- | Pool size.
  Int ->
  -- | Connection acquisition timeout, in microseconds.
  Maybe Int ->
  -- | Action fetching connection settings.
  IO Connection.Settings ->
  IO Pool
acquireDynamically poolSize timeout fetchConnectionSettings = do
  idleQueue <- newTQueueIO
  -- The whole pool size is available initially.
  capacity <- newTVarIO poolSize
  -- Reuse flag of the first generation, wrapped in the variable that
  -- always points at the current generation's flag.
  firstReuseFlag <- newTVarIO True
  currentReuse <- newTVarIO firstReuseFlag
  pure
    Pool
      { poolFetchConnectionSettings = fetchConnectionSettings,
        poolAcquisitionTimeout = timeout,
        poolConnectionQueue = idleQueue,
        poolCapacity = capacity,
        poolReuse = currentReuse
      }

-- | Release every idle connection of the pool and mark the connections
-- that are currently in use to be released as soon as they are given back.
-- Connections acquired after this call are newly established.
release :: Pool -> IO ()
release Pool {..} = do
  idleConns <- atomically $ do
    -- Invalidate the current generation, so that in-use connections
    -- get released instead of re-queued when they are given back.
    staleFlag <- readTVar poolReuse
    writeTVar staleFlag False
    -- Start a new generation for connections acquired from now on.
    writeTVar poolReuse =<< newTVar True
    -- Take all idle connections out and hand their slots back.
    idle <- flushTQueue poolConnectionQueue
    modifyTVar' poolCapacity (+ length idle)
    return idle
  -- The connections themselves are closed outside of the transaction.
  forM_ idleConns Connection.release

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
--
-- The connection is obtained by the first of these alternatives that
-- succeeds: taking an idle connection from the queue, claiming a free
-- capacity slot and establishing a new connection, or, once the configured
-- acquisition timeout has elapsed, giving up with
-- 'AcquisitionTimeoutUsageError'. While none of them applies the call blocks.
--
-- Session failing with a 'Session.ClientError' gets interpreted as a loss of
-- connection. In such case the connection gets released instead of being
-- returned to the pool and a slot gets freed up for a new connection to be
-- established the next time one is needed. The error still gets returned
-- from this function.
--
-- If fetching the settings, establishing the connection or running the
-- session throws an exception, the claimed slot is handed back (and the
-- connection, if there is one, gets released) before the exception is
-- rethrown, so the pool does not lose capacity.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess = do
  -- STM check yielding 'True' once the acquisition timeout has elapsed.
  -- Without a configured timeout it constantly yields 'False'.
  timeout <- case poolAcquisitionTimeout of
    Just delta -> readTVar <$> registerDelay delta
    Nothing -> return (return False)
  -- The transaction only decides how to proceed and does the bookkeeping;
  -- the IO action it produces is executed right after it commits.
  join . atomically $ do
    reuseVar <- readTVar poolReuse
    asum
      [ -- An idle connection is available.
        readTQueue poolConnectionQueue <&> onConn reuseVar,
        -- No idle connection: claim a slot for a new one, if any is left.
        do
          capVal <- readTVar poolCapacity
          if capVal > 0
            then do
              writeTVar poolCapacity $! pred capVal
              return $ onNewConn reuseVar
            else retry,
        -- Neither worked: fail on timeout or keep waiting.
        do
          timedOut <- timeout
          if timedOut
            then return . return . Left $ AcquisitionTimeoutUsageError
            else retry
      ]
  where
    -- Hand a claimed slot back to the pool.
    restoreSlot =
      atomically $ modifyTVar' poolCapacity succ

    -- Establish a new connection for an already claimed slot and run the
    -- session on it.
    onNewConn reuseVar = do
      connRes <-
        (poolFetchConnectionSettings >>= Connection.acquire)
          `onException` restoreSlot
      case connRes of
        Left connErr -> do
          restoreSlot
          return $ Left $ ConnectionUsageError connErr
        Right conn -> onConn reuseVar conn

    -- Run the session on the given connection and dispose of the connection
    -- according to the outcome.
    onConn reuseVar conn = do
      sessRes <- Session.run sess conn `onException` discardConn
      case sessRes of
        Left err -> case err of
          Session.QueryError _ _ (Session.ClientError _) -> do
            -- The connection is considered lost: do not reuse it.
            discardConn
            return $ Left $ SessionUsageError err
          _ -> do
            returnConn
            return $ Left $ SessionUsageError err
        Right res -> do
          returnConn
          return $ Right res
      where
        -- Free the slot and close the connection.
        discardConn = do
          restoreSlot
          Connection.release conn
        -- Put the connection back into the queue, unless the pool
        -- generation it was taken from has been released in the meantime.
        returnConn =
          join . atomically $ do
            reuse <- readTVar reuseVar
            if reuse
              then writeTQueue poolConnectionQueue conn $> return ()
              else do
                modifyTVar' poolCapacity succ
                return $ Connection.release conn

-- | All the ways in which 'use' can fail.
data UsageError
  = -- | Establishing a new connection failed.
    ConnectionUsageError Connection.ConnectionError
  | -- | Running the session failed.
    SessionUsageError Session.QueryError
  | -- | No connection became available before the acquisition timeout
    -- elapsed.
    AcquisitionTimeoutUsageError
  deriving (Show, Eq)

-- | Lets a 'UsageError' be thrown and caught as an exception.
instance Exception UsageError