module Hasql.Pool
( Pool,
Settings (..),
acquire,
release,
UsageError (..),
use,
)
where
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Prelude
import Hasql.Session (Session)
import qualified Hasql.Session as Session
-- | A pool of database connections.
--
-- The constructor fields are positional; every function in this module
-- pattern-matches on them in this order.
data Pool
  = Pool
      -- Settings handed to 'Connection.acquire' whenever a new connection
      -- has to be established.
      Connection.Settings
      -- Established connections that are currently not in use, each tagged
      -- with the timestamp at which it was put back.
      (TQueue ActiveConnection)
      -- Number of connections that may still be established.
      (TVar Int)
      -- Whether the pool is alive; set to 'False' by 'release'.
      (TVar Bool)
-- | An established connection sitting idle in the pool's queue.
data ActiveConnection = ActiveConnection
  { -- | Timestamp (as produced by @getMillisecondsSinceEpoch@) taken when
    -- the connection was put into the queue.
    activeConnectionLastUseTimestamp :: Int,
    -- | The underlying database connection.
    activeConnectionConnection :: Connection
  }
-- | Background loop that closes pooled connections which have been idle for
-- longer than @timeout@, and closes every pooled connection once the pool is
-- marked as not alive.
--
-- Arguments, in order:
--
-- * @timeout@: maximum idle time. It is added to timestamps obtained from
--   @getMillisecondsSinceEpoch@, so it is expressed in milliseconds.
-- * @establishedQueue@: queue of idle connections.
-- * @slotsAvailVar@: number of connections that may still be established;
--   incremented once for every idle connection closed here.
-- * @aliveVar@: when it reads 'False' the queue is flushed, the flushed
--   connections are closed and the loop terminates.
loopCollectingGarbage :: Int -> TQueue ActiveConnection -> TVar Int -> TVar Bool -> IO ()
loopCollectingGarbage timeout establishedQueue slotsAvailVar aliveVar =
  decide
  where
    -- One iteration of the loop. The STM transaction only decides what to
    -- do and returns the follow-up IO action, which is then run by 'join'
    -- outside of the transaction.
    decide :: IO ()
    decide = do
      ts <- getMillisecondsSinceEpoch
      join . atomically $ do
        alive <- readTVar aliveVar
        if alive
          then collectOutdated ts
          else do
            -- The pool has been released: close whatever is still queued
            -- and stop looping (no further call to 'decide').
            remaining <- flushTQueue establishedQueue
            return (releaseAll (fmap activeConnectionConnection remaining))

    -- Inspects the front of the queue relative to the timestamp @ts@.
    -- Blocks (via 'retry') while the queue is empty; the transaction is
    -- re-run when the queue or the alive flag changes.
    collectOutdated ts =
      tryReadTQueue establishedQueue >>= \case
        Nothing ->
          retry
        Just entry@(ActiveConnection lastUseTs connection) ->
          let outdatingTs = lastUseTs + timeout
           in if outdatingTs < ts
                then do
                  slotsAvail <- readTVar slotsAvailVar
                  -- The connection taken off the queue frees a slot too,
                  -- hence 'succ' already for the first collected entry.
                  collectAndRelease (succ slotsAvail) [connection] outdatingTs
                else do
                  -- Front entry is still fresh: put it back and wake up
                  -- when it is due to expire.
                  unGetTQueue establishedQueue entry
                  return (sleep outdatingTs *> decide)
      where
        -- Keeps taking outdated entries off the front of the queue,
        -- counting one freed slot per entry. Stops at the first entry that
        -- is not outdated (which is put back) or when the queue is empty.
        -- @wakeUpTs@ is the timestamp to sleep until before looping again.
        collectAndRelease !slotsAvail !outdatedList wakeUpTs =
          tryReadTQueue establishedQueue >>= \case
            Nothing ->
              -- Queue drained. @wakeUpTs@ is already in the past here, so
              -- the sleep is a no-op and the next iteration blocks in
              -- 'retry' until a connection is queued again.
              finalizeAndRelease slotsAvail outdatedList wakeUpTs
            Just entry@(ActiveConnection lastUseTs connection) ->
              let outdatingTs = lastUseTs + timeout
               in if outdatingTs < ts
                    then collectAndRelease (succ slotsAvail) (connection : outdatedList) outdatingTs
                    else do
                      unGetTQueue establishedQueue entry
                      finalizeAndRelease slotsAvail outdatedList outdatingTs

        -- Publishes the new slot count and returns the IO action that
        -- closes the collected connections, sleeps and loops.
        finalizeAndRelease slotsAvail outdatedList wakeUpTs = do
          writeTVar slotsAvailVar slotsAvail
          return (releaseAll outdatedList *> sleep wakeUpTs *> decide)

    -- Suspends the thread until the given millisecond timestamp; returns
    -- immediately when that moment has already passed.
    sleep :: Int -> IO ()
    sleep untilTs = do
      now <- getMillisecondsSinceEpoch
      let diff = untilTs - now
      if diff > 0
        then threadDelay (diff * 1000) -- 'threadDelay' takes microseconds
        else return ()

    -- Closes every given connection. Named differently from the top-level
    -- 'release' to avoid shadowing it.
    releaseAll :: [Connection] -> IO ()
    releaseAll =
      traverse_ Connection.release
-- | Settings of the connection pool, consisting of:
--
-- * Pool size: the maximum number of connections that may be established
--   at a time. A size that is not positive yields a pool that is not alive.
--
-- * Timeout: how long, in milliseconds, an unused connection is kept open
--   before the background collector closes it.
--
-- * Connection settings, passed to 'Connection.acquire'.
type Settings =
  (Int, Int, Connection.Settings)
-- | Given the pool settings, create a connection pool.
--
-- No connection is established here; connections are opened on demand by
-- 'use'. A background thread running 'loopCollectingGarbage' is forked to
-- close connections that stay idle for longer than the timeout.
--
-- When the requested size is not positive the pool starts out as not alive,
-- so every 'use' reports 'PoolIsReleasedUsageError'.
acquire :: Settings -> IO Pool
acquire (size, timeout, connectionSettings) = do
  establishedQueue <- newTQueueIO
  slotsAvailVar <- newTVarIO size
  aliveVar <- newTVarIO (size > 0)
  -- The collector terminates on its own once the alive flag is cleared,
  -- so its thread id is not retained.
  _ <- forkIO (loopCollectingGarbage timeout establishedQueue slotsAvailVar aliveVar)
  return (Pool connectionSettings establishedQueue slotsAvailVar aliveVar)
-- | Mark the pool as released.
--
-- This only clears the pool's alive flag. Closing the idle connections is
-- done by the background collector thread once it observes the cleared
-- flag, and subsequent calls to 'use' report 'PoolIsReleasedUsageError'.
release :: Pool -> IO ()
release (Pool _ _ _ aliveVar) =
  atomically (writeTVar aliveVar False)
-- | A union over the errors that 'use' can report.
data UsageError
  = -- | Establishing a new connection failed.
    ConnectionUsageError Connection.ConnectionError
  | -- | Running the session on a connection failed.
    SessionUsageError Session.QueryError
  | -- | The pool was not alive at the time of the call.
    PoolIsReleasedUsageError
  deriving (Show, Eq)
-- | Run a session on a connection from the pool.
--
-- An idle connection from the queue is preferred. When the queue is empty
-- and a slot is available, a new connection is established. When neither is
-- the case the call blocks until a connection is returned or a slot is
-- freed. When the pool is not alive, 'PoolIsReleasedUsageError' is returned
-- without touching any connection.
--
-- After the session the connection is put back into the queue, unless the
-- session failed with a client error, in which case the connection is
-- closed and its slot is freed.
--
-- NOTE(review): nothing here handles exceptions. If 'Session.run' can throw,
-- the connection would be neither returned nor closed and its slot would
-- stay taken; confirm against the hasql documentation.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use (Pool connectionSettings establishedQueue slotsAvailVar aliveVar) session =
  -- The transaction picks the follow-up IO action; 'join' then runs it
  -- outside of STM.
  join . atomically $ do
    alive <- readTVar aliveVar
    if alive
      then
        tryReadTQueue establishedQueue >>= \case
          Just (ActiveConnection _ connection) ->
            return (useConnectionThenPutItToQueue connection)
          Nothing -> do
            slotsAvail <- readTVar slotsAvailVar
            if slotsAvail > 0
              then do
                -- Reserve the slot before leaving the transaction.
                writeTVar slotsAvailVar $! pred slotsAvail
                return acquireConnectionThenUseThenPutItToQueue
              else -- No idle connection and no free slot: wait.
                retry
      else return (return (Left PoolIsReleasedUsageError))
  where
    -- Establishes a fresh connection for the already reserved slot; gives
    -- the slot back when establishing fails.
    acquireConnectionThenUseThenPutItToQueue =
      Connection.acquire connectionSettings >>= \case
        Left acquisitionError -> do
          atomically (modifyTVar' slotsAvailVar succ)
          return (Left (ConnectionUsageError acquisitionError))
        Right connection ->
          useConnectionThenPutItToQueue connection

    -- Runs the session and then disposes of the connection according to
    -- the outcome.
    useConnectionThenPutItToQueue connection =
      Session.run session connection >>= \case
        Left queryError -> do
          case queryError of
            -- A client error leads to the connection being dropped
            -- instead of reused.
            Session.QueryError _ _ (Session.ClientError _) ->
              releaseConnection connection
            _ ->
              putConnectionToPool connection
          return (Left (SessionUsageError queryError))
        Right res -> do
          putConnectionToPool connection
          return (Right res)

    -- Queues the connection as idle, stamped with the current time.
    putConnectionToPool connection = do
      ts <- getMillisecondsSinceEpoch
      atomically (writeTQueue establishedQueue (ActiveConnection ts connection))

    -- Frees the connection's slot and closes the connection.
    releaseConnection connection = do
      atomically (modifyTVar' slotsAvailVar succ)
      Connection.release connection