module Database.PostgreSQL.Consumers.Components (
    runConsumer
  , runConsumerWithIdleSignal
  , spawnListener
  , spawnMonitor
  , spawnDispatcher
  ) where

import Control.Applicative
import Control.Concurrent.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Exception (AsyncException(ThreadKilled))
import Control.Monad
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Function
import Data.Int
import Data.Maybe
import Data.Monoid
import Data.Monoid.Utils
import Database.PostgreSQL.PQTypes
import Log
import Prelude
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.Thread.Lifted as T
import qualified Data.Foldable as F
import qualified Data.Map.Strict as M

import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
import Database.PostgreSQL.Consumers.Utils

-- | Run the consumer. The purpose of the returned monadic
-- action is to wait for currently processed jobs and clean up.
-- This function is best used in conjunction with 'finalize' to
-- seamlessly handle the finalization.
runConsumer
  :: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
     , FromSQL idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> m (m ())
runConsumer :: ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())
runConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs = ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs Maybe (TMVar Bool)
forall a. Maybe a
Nothing

runConsumerWithIdleSignal
  :: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
     , FromSQL idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> TMVar Bool
  -> m (m ())
runConsumerWithIdleSignal :: ConsumerConfig m idx job
-> ConnectionSourceM m -> TMVar Bool -> m (m ())
runConsumerWithIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs TMVar Bool
idleSignal = ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs (TMVar Bool -> Maybe (TMVar Bool)
forall a. a -> Maybe a
Just TMVar Bool
idleSignal)

-- | Run the consumer and also signal whenever the consumer is waiting for
-- getNotification or threadDelay.
runConsumerWithMaybeIdleSignal
  :: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
     , FromSQL idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> Maybe (TMVar Bool)
  -> m (m ())
runConsumerWithMaybeIdleSignal :: ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs Maybe (TMVar Bool)
mIdleSignal
  | ConsumerConfig m idx job -> Int
forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccMaxRunningJobs ConsumerConfig m idx job
cc Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1 = do
      Text -> m ()
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"ccMaxRunningJobs < 1, not starting the consumer"
      m () -> m (m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (m () -> m (m ())) -> m () -> m (m ())
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
  | Bool
otherwise = do
      MVar ()
semaphore <- () -> m (MVar ())
forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar ()
      TVar (Map ThreadId idx)
runningJobsInfo <- IO (TVar (Map ThreadId idx)) -> m (TVar (Map ThreadId idx))
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO (TVar (Map ThreadId idx)) -> m (TVar (Map ThreadId idx)))
-> IO (TVar (Map ThreadId idx)) -> m (TVar (Map ThreadId idx))
forall a b. (a -> b) -> a -> b
$ Map ThreadId idx -> IO (TVar (Map ThreadId idx))
forall a. a -> IO (TVar a)
newTVarIO Map ThreadId idx
forall k a. Map k a
M.empty
      TVar Int
runningJobs <- IO (TVar Int) -> m (TVar Int)
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO (TVar Int) -> m (TVar Int)) -> IO (TVar Int) -> m (TVar Int)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0

      Either DBException ()
skipLockedTest :: Either DBException () <-
        m () -> m (Either DBException ())
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (m () -> m (Either DBException ()))
-> (DBT m () -> m ()) -> DBT m () -> m (Either DBException ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
defaultTransactionSettings (DBT m () -> m (Either DBException ()))
-> DBT m () -> m (Either DBException ())
forall a b. (a -> b) -> a -> b
$
        SQL -> DBT m ()
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ SQL
"SELECT TRUE FOR UPDATE SKIP LOCKED"
      let useSkipLocked :: Bool
useSkipLocked = (DBException -> Bool)
-> (() -> Bool) -> Either DBException () -> Bool
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (Bool -> DBException -> Bool
forall a b. a -> b -> a
const Bool
False) (Bool -> () -> Bool
forall a b. a -> b -> a
const Bool
True) Either DBException ()
skipLockedTest

      ConsumerID
cid <- ConsumerConfig m idx job -> ConnectionSourceM m -> m ConsumerID
forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m, MonadTime m) =>
ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID
registerConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs
      [Pair] -> m (m ()) -> m (m ())
forall (m :: * -> *) a. MonadLog m => [Pair] -> m a -> m a
localData [Key
"consumer_id" Key -> String -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= ConsumerID -> String
forall a. Show a => a -> String
show ConsumerID
cid] (m (m ()) -> m (m ())) -> m (m ()) -> m (m ())
forall a b. (a -> b) -> a -> b
$ do
        ThreadId
listener <- ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadMask m) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore
        ThreadId
monitor <- Text -> m ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"monitor" (m ThreadId -> m ThreadId) -> m ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, FromSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
        ThreadId
dispatcher <- Text -> m ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"dispatcher" (m ThreadId -> m ThreadId) -> m ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ ConsumerConfig m idx job
-> Bool
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> Bool
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig m idx job
cc Bool
useSkipLocked
          ConnectionSourceM m
cs ConsumerID
cid MVar ()
semaphore TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs Maybe (TMVar Bool)
mIdleSignal
        m () -> m (m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (m () -> m (m ())) -> (m () -> m ()) -> m () -> m (m ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> m () -> m ()
forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"finalizer" (m () -> m (m ())) -> m () -> m (m ())
forall a b. (a -> b) -> a -> b
$ do
          ThreadId -> m ()
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
listener
          ThreadId -> m ()
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
dispatcher
          TVar (Map ThreadId idx) -> TVar Int -> m ()
forall (m :: * -> *) a k a.
(MonadBase IO m, MonadLog m, Num a, Eq a, Eq k, Eq a, Show a) =>
TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs
          ThreadId -> m ()
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
monitor
          ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ()
forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m) =>
ConsumerConfig n idx job
-> ConnectionSourceM m -> ConsumerID -> m ()
unregisterConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
  where
    waitForRunningJobs :: TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map k a)
runningJobsInfo TVar a
runningJobs = do
      Map k a
initialJobs <- IO (Map k a) -> m (Map k a)
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO (Map k a) -> m (Map k a)) -> IO (Map k a) -> m (Map k a)
forall a b. (a -> b) -> a -> b
$ TVar (Map k a) -> IO (Map k a)
forall a. TVar a -> IO a
readTVarIO TVar (Map k a)
runningJobsInfo
      (((Map k a -> m ()) -> Map k a -> m ()) -> Map k a -> m ()
forall a. (a -> a) -> a
`fix` Map k a
initialJobs) (((Map k a -> m ()) -> Map k a -> m ()) -> m ())
-> ((Map k a -> m ()) -> Map k a -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Map k a -> m ()
loop Map k a
jobsInfo -> do
        -- If jobs are still running, display info about them.
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Map k a -> Bool
forall k a. Map k a -> Bool
M.null Map k a
jobsInfo) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Waiting for running jobs" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
              Key
"job_id" Key -> [String] -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Map k a -> [String]
forall k. Map k a -> [String]
showJobsInfo Map k a
jobsInfo
            ]
        m (m ()) -> m ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m ()) -> m ())
-> (STM (m ()) -> m (m ())) -> STM (m ()) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (m ()) -> m (m ())
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM (m ()) -> m ()) -> STM (m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ do
          a
jobs <- TVar a -> STM a
forall a. TVar a -> STM a
readTVar TVar a
runningJobs
          if a
jobs a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0
            then m () -> STM (m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (m () -> STM (m ())) -> m () -> STM (m ())
forall a b. (a -> b) -> a -> b
$ () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            else do
              Map k a
newJobsInfo <- TVar (Map k a) -> STM (Map k a)
forall a. TVar a -> STM a
readTVar TVar (Map k a)
runningJobsInfo
              -- If jobs info didn't change, wait for it to change.
              -- Otherwise loop so it either displays the new info
              -- or exits if there are no jobs running anymore.
              if (Map k a
newJobsInfo Map k a -> Map k a -> Bool
forall a. Eq a => a -> a -> Bool
== Map k a
jobsInfo)
                then STM (m ())
forall a. STM a
retry
                else m () -> STM (m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (m () -> STM (m ())) -> m () -> STM (m ())
forall a b. (a -> b) -> a -> b
$ Map k a -> m ()
loop Map k a
newJobsInfo
      where
        showJobsInfo :: Map k a -> [String]
showJobsInfo = (a -> [String] -> [String]) -> [String] -> Map k a -> [String]
forall a b k. (a -> b -> b) -> b -> Map k a -> b
M.foldr (\a
idx [String]
acc -> a -> String
forall a. Show a => a -> String
show a
idx String -> [String] -> [String]
forall a. a -> [a] -> [a]
: [String]
acc) []

-- | Spawn a thread that generates signals for the
-- dispatcher to probe the database for incoming jobs.
spawnListener
  :: (MonadBaseControl IO m, MonadMask m)
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> MVar ()
  -> m ThreadId
spawnListener :: ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore =
  String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"listener" (m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$
  case ConsumerConfig m idx job -> Maybe Channel
forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationChannel ConsumerConfig m idx job
cc of
    Just Channel
chan ->
      ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
noTs (DBT m () -> m ())
-> (DBT_ m m Bool -> DBT m ()) -> DBT_ m m Bool -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DBT m () -> DBT m () -> DBT m () -> DBT m ()
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (Channel -> DBT m ()
forall (m :: * -> *). MonadDB m => Channel -> m ()
listen Channel
chan) (Channel -> DBT m ()
forall (m :: * -> *). MonadDB m => Channel -> m ()
unlisten Channel
chan)
      (DBT m () -> DBT m ())
-> (DBT_ m m Bool -> DBT m ()) -> DBT_ m m Bool -> DBT m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DBT_ m m Bool -> DBT m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (DBT_ m m Bool -> m ()) -> DBT_ m m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ do
      -- If there are many notifications, we need to collect them
      -- as soon as possible, because they are stored in memory by
      -- libpq. They are also not squashed, so we perform the
      -- squashing ourselves with the help of MVar ().
      DBT_ m m (Maybe Notification) -> DBT m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (DBT_ m m (Maybe Notification) -> DBT m ())
-> (Int -> DBT_ m m (Maybe Notification)) -> Int -> DBT m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> DBT_ m m (Maybe Notification)
forall (m :: * -> *). MonadDB m => Int -> m (Maybe Notification)
getNotification (Int -> DBT m ()) -> Int -> DBT m ()
forall a b. (a -> b) -> a -> b
$ ConsumerConfig m idx job -> Int
forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
      m Bool -> DBT_ m m Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m Bool
signalDispatcher
    Maybe Channel
Nothing -> m Bool -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ do
      IO () -> m ()
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO () -> m ()) -> (Int -> IO ()) -> Int -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay (Int -> m ()) -> Int -> m ()
forall a b. (a -> b) -> a -> b
$ ConsumerConfig m idx job -> Int
forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
      m Bool
signalDispatcher
  where
    signalDispatcher :: m Bool
signalDispatcher = do
      IO Bool -> m Bool
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m Bool
tryPutMVar MVar ()
semaphore ()

    noTs :: TransactionSettings
noTs = TransactionSettings
defaultTransactionSettings {
      tsAutoTransaction :: Bool
tsAutoTransaction = Bool
False
    }

-- | Spawn a thread that monitors working consumers
-- for activity and periodically updates its own.
spawnMonitor
  :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
                        Show idx, FromSQL idx)
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> m ThreadId
spawnMonitor :: ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig{Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
row -> job
SomeException -> job -> m Action
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccJobFetcher :: ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccOnException :: SomeException -> job -> m Action
ccProcessJob :: job -> m Result
ccMaxRunningJobs :: Int
ccNotificationTimeout :: Int
ccNotificationChannel :: Maybe Channel
ccJobIndex :: job -> idx
ccJobFetcher :: row -> job
ccJobSelectors :: [SQL]
ccConsumersTable :: RawSQL ()
ccJobsTable :: RawSQL ()
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
..} ConnectionSourceM m
cs ConsumerID
cid = String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"monitor" (m () -> m ThreadId) -> (m () -> m ()) -> m () -> m ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$ do
  ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m () -> m ()) -> DBT m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
    -- Update last_activity of the consumer.
    Bool
ok <- SQL -> DBT_ m m Bool
forall (m :: * -> *). (MonadDB m, MonadThrow m) => SQL -> m Bool
runSQL01 (SQL -> DBT_ m m Bool) -> SQL -> DBT_ m m Bool
forall a b. (a -> b) -> a -> b
$ [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
        SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
      , SQL
"SET last_activity = " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
      , SQL
"WHERE id =" SQL -> ConsumerID -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
      , SQL
"  AND name =" SQL -> Text -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
      ]
    if Bool
ok
      then Text -> DBT m ()
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"Activity of the consumer updated"
      else do
        Text -> DBT m ()
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ (Text -> DBT m ()) -> Text -> DBT m ()
forall a b. (a -> b) -> a -> b
$ Text
"Consumer is not registered"
        AsyncException -> DBT m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AsyncException
ThreadKilled
  -- Freeing jobs locked by inactive consumers needs to happen
  -- exactly once, otherwise it's possible to free it twice, after
  -- it was already marked as reserved by other consumer, so let's
  -- run it in serializable transaction.
  (Int
inactiveConsumers, [idx]
freedJobs) <- ConnectionSourceM m
-> TransactionSettings -> DBT m (Int, [idx]) -> m (Int, [idx])
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
tsSerializable (DBT m (Int, [idx]) -> m (Int, [idx]))
-> DBT m (Int, [idx]) -> m (Int, [idx])
forall a b. (a -> b) -> a -> b
$ do
    UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
    -- Delete all inactive (assumed dead) consumers and get their ids.
    SQL -> DBT m ()
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$ [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
        SQL
"DELETE FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
      , SQL
"  WHERE last_activity +" SQL -> Interval -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int32 -> Interval
iminutes Int32
1 SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"<= " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
      , SQL
"    AND name =" SQL -> Text -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
      , SQL
"  RETURNING id::bigint"
      ]
    [Int64]
inactive :: [Int64] <- (Identity Int64 -> Int64) -> DBT_ m m [Int64]
forall (m :: * -> *) row t.
(MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany Identity Int64 -> Int64
forall a. Identity a -> a
runIdentity
    -- Reset reserved jobs manually, do not rely on the foreign key constraint
    -- to do its job. We also reset finished_at to correctly bump number of
    -- attempts on the next try.
    [idx]
freedJobs :: [idx] <- if [Int64] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Int64]
inactive
      then [idx] -> DBT_ m m [idx]
forall (m :: * -> *) a. Monad m => a -> m a
return []
      else do
        SQL -> DBT m ()
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$ [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat
          [ SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
          , SQL
"SET reserved_by = NULL"
          , SQL
"  , finished_at = NULL"
          , SQL
"WHERE reserved_by = ANY(" SQL -> Array1 Int64 -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [Int64] -> Array1 Int64
forall a. [a] -> Array1 a
Array1 [Int64]
inactive SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
          , SQL
"RETURNING id"
          ]
        (Identity idx -> idx) -> DBT_ m m [idx]
forall (m :: * -> *) row t.
(MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany Identity idx -> idx
forall a. Identity a -> a
runIdentity
    (Int, [idx]) -> DBT m (Int, [idx])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Int64] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int64]
inactive, [idx]
freedJobs)
  Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
inactiveConsumers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Unregistered inactive consumers" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
        Key
"inactive_consumers" Key -> Int -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
inactiveConsumers
      ]
  Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([idx] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [idx]
freedJobs) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Freed locked jobs" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
        Key
"freed_jobs" Key -> [String] -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (idx -> String) -> [idx] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map idx -> String
forall a. Show a => a -> String
show [idx]
freedJobs
      ]
  IO () -> m ()
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO () -> m ()) -> (Int -> IO ()) -> Int -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay (Int -> m ()) -> Int -> m ()
forall a b. (a -> b) -> a -> b
$ Int
30 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000 -- wait 30 seconds
  where
    tsSerializable :: TransactionSettings
tsSerializable = TransactionSettings
ts {
      tsIsolationLevel :: IsolationLevel
tsIsolationLevel = IsolationLevel
Serializable
    }

-- | Spawn a thread that reserves and processes jobs.
spawnDispatcher
  :: forall m idx job. ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m
                       , Show idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> Bool
  -> ConnectionSourceM m
  -> ConsumerID
  -> MVar ()
  -> TVar (M.Map ThreadId idx)
  -> TVar Int
  -> Maybe (TMVar Bool)
  -> m ThreadId
spawnDispatcher :: ConsumerConfig m idx job
-> Bool
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig{Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
row -> job
SomeException -> job -> m Action
ccOnException :: SomeException -> job -> m Action
ccProcessJob :: job -> m Result
ccMaxRunningJobs :: Int
ccNotificationTimeout :: Int
ccNotificationChannel :: Maybe Channel
ccJobIndex :: job -> idx
ccJobFetcher :: row -> job
ccJobSelectors :: [SQL]
ccConsumersTable :: RawSQL ()
ccJobsTable :: RawSQL ()
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccJobFetcher :: ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
..} Bool
useSkipLocked ConnectionSourceM m
cs ConsumerID
cid MVar ()
semaphore
  TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs Maybe (TMVar Bool)
mIdleSignal =
  String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"dispatcher" (m () -> m ThreadId) -> (m () -> m ()) -> m () -> m ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$ do
    m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> m ()
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
takeMVar MVar ()
semaphore
    Bool
someJobWasProcessed <- Int -> m Bool
loop Int
1
    if Bool
someJobWasProcessed
      then Bool -> m ()
forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
False
      else Bool -> m ()
forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
True
  where
    setIdle :: forall m' . (MonadBaseControl IO m') => Bool -> m' ()
    setIdle :: Bool -> m' ()
setIdle Bool
isIdle = case Maybe (TMVar Bool)
mIdleSignal of
      Maybe (TMVar Bool)
Nothing -> () -> m' ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Just TMVar Bool
idleSignal -> STM () -> m' ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m' ()) -> STM () -> m' ()
forall a b. (a -> b) -> a -> b
$ do
        Maybe Bool
_ <- TMVar Bool -> STM (Maybe Bool)
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar TMVar Bool
idleSignal
        TMVar Bool -> Bool -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar Bool
idleSignal Bool
isIdle

    loop :: Int -> m Bool
    loop :: Int -> m Bool
loop Int
limit = do
      ([job]
batch, Int
batchSize) <- Int -> m ([job], Int)
reserveJobs Int
limit
      Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Processing batch" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
            Key
"batch_size" Key -> Int -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
batchSize
          ]
        -- Update runningJobs before forking so that we can
        -- adjust maxBatchSize appropriately later. We also
        -- need to mask asynchronous exceptions here as we
        -- rely on correct value of runningJobs to perform
        -- graceful termination.
        ((forall a. m a -> m a) -> m ()) -> m ()
forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
mask (((forall a. m a -> m a) -> m ()) -> m ())
-> ((forall a. m a -> m a) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> do
          STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (Int -> Int -> Int
forall a. Num a => a -> a -> a
+Int
batchSize)
          let subtractJobs :: m ()
subtractJobs = STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
                TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
batchSize)
          m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> (m () -> m ThreadId) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> m () -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"batch processor"
            (m () -> m ThreadId) -> (m () -> m ()) -> m () -> m ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (m () -> m () -> m ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` m ()
subtractJobs) (m () -> m ()) -> (m () -> m ()) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> m ()
forall a. m a -> m a
restore (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            (job -> m (job, m (Result Result)))
-> [job] -> m [(job, m (Result Result))]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM job -> m (job, m (Result Result))
startJob [job]
batch m [(job, m (Result Result))]
-> ([(job, m (Result Result))] -> m [(idx, Result)])
-> m [(idx, Result)]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ((job, m (Result Result)) -> m (idx, Result))
-> [(job, m (Result Result))] -> m [(idx, Result)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (job, m (Result Result)) -> m (idx, Result)
joinJob m [(idx, Result)] -> ([(idx, Result)] -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [(idx, Result)] -> m ()
updateJobs

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
limit) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          Int
maxBatchSize <- STM Int -> m Int
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM Int -> m Int) -> STM Int -> m Int
forall a b. (a -> b) -> a -> b
$ do
            Int
jobs <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
runningJobs
            Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
jobs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
ccMaxRunningJobs) STM ()
forall a. STM a
retry
            Int -> STM Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$ Int
ccMaxRunningJobs Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
jobs
          m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> m Bool
loop (Int -> m Bool) -> Int -> m Bool
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
maxBatchSize (Int
2Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
limit)

      Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
batchSize Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0)

    reserveJobs :: Int -> m ([job], Int)
    reserveJobs :: Int -> m ([job], Int)
reserveJobs Int
limit = ConnectionSourceM m
-> TransactionSettings -> DBT m ([job], Int) -> m ([job], Int)
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m ([job], Int) -> m ([job], Int))
-> DBT m ([job], Int) -> m ([job], Int)
forall a b. (a -> b) -> a -> b
$ do
      UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
      Int
n <- SQL -> DBT_ m m Int
forall (m :: * -> *). MonadDB m => SQL -> m Int
runSQL (SQL -> DBT_ m m Int) -> SQL -> DBT_ m m Int
forall a b. (a -> b) -> a -> b
$ [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
          SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
        , SQL
"  reserved_by =" SQL -> ConsumerID -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
        , SQL
", attempts = CASE"
        , SQL
"    WHEN finished_at IS NULL THEN attempts + 1"
        , SQL
"    ELSE 1"
        , SQL
"  END"
        , SQL
"WHERE id IN (" SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> UTCTime -> SQL
reservedJobs UTCTime
now SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> SQL
")"
        , SQL
"RETURNING" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL -> [SQL] -> SQL
forall m. Monoid m => m -> [m] -> m
mintercalate SQL
", " [SQL]
ccJobSelectors
        ]
      -- Decode lazily as we want the transaction to be as short as possible.
      (, Int
n) ([job] -> ([job], Int))
-> (QueryResult row -> [job]) -> QueryResult row -> ([job], Int)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QueryResult job -> [job]
forall (t :: * -> *) a. Foldable t => t a -> [a]
F.toList (QueryResult job -> [job])
-> (QueryResult row -> QueryResult job) -> QueryResult row -> [job]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (row -> job) -> QueryResult row -> QueryResult job
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap row -> job
ccJobFetcher (QueryResult row -> ([job], Int))
-> DBT_ m m (QueryResult row) -> DBT m ([job], Int)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> DBT_ m m (QueryResult row)
forall (m :: * -> *) row.
(MonadDB m, MonadThrow m, FromRow row) =>
m (QueryResult row)
queryResult
      where
        reservedJobs :: UTCTime -> SQL
        reservedJobs :: UTCTime -> SQL
reservedJobs UTCTime
now = [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
            SQL
"SELECT id FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
            -- Converting id to text and hashing it may seem silly,
            -- especially when we're dealing with integers in the
            -- first place, but even in such case the overhead is
            -- small enough (converting 100k integers to text and
            -- hashing them takes around 15 ms on i7) to be worth the
            -- generality. Note that even if IDs of two pending jobs
            -- produce the same hash, it just means that in the worst
            -- case they will be processed by the same consumer.
          , if Bool
useSkipLocked
            then SQL
"WHERE TRUE"
            else SQL
"WHERE pg_try_advisory_xact_lock("
                 SQL -> Text -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
                 SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> SQL
"::regclass::integer, hashtext(id::text))"
          , SQL
"       AND reserved_by IS NULL"
          , SQL
"       AND run_at IS NOT NULL"
          , SQL
"       AND run_at <= " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
          , SQL
"       ORDER BY run_at"
          , SQL
"LIMIT" SQL -> Int -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int
limit
            -- Use SKIP LOCKED if available. Otherwise utilise
            -- advisory locks.
          , if Bool
useSkipLocked
            then SQL
"FOR UPDATE SKIP LOCKED"
            else SQL
"FOR UPDATE"
          ]

    -- | Spawn each job in a separate thread.
    startJob :: job -> m (job, m (T.Result Result))
    startJob :: job -> m (job, m (Result Result))
startJob job
job = do
      (ThreadId
_, m (Result Result)
joinFork) <- ((forall a. m a -> m a) -> m (ThreadId, m (Result Result)))
-> m (ThreadId, m (Result Result))
forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
mask (((forall a. m a -> m a) -> m (ThreadId, m (Result Result)))
 -> m (ThreadId, m (Result Result)))
-> ((forall a. m a -> m a) -> m (ThreadId, m (Result Result)))
-> m (ThreadId, m (Result Result))
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> m Result -> m (ThreadId, m (Result Result))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (ThreadId, m (Result a))
T.fork (m Result -> m (ThreadId, m (Result Result)))
-> m Result -> m (ThreadId, m (Result Result))
forall a b. (a -> b) -> a -> b
$ do
        ThreadId
tid <- m ThreadId
forall (m :: * -> *). MonadBase IO m => m ThreadId
myThreadId
        m () -> m () -> m Result -> m Result
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (ThreadId -> m ()
registerJob ThreadId
tid) (ThreadId -> m ()
unregisterJob ThreadId
tid) (m Result -> m Result)
-> (m Result -> m Result) -> m Result -> m Result
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m Result -> m Result
forall a. m a -> m a
restore (m Result -> m Result) -> m Result -> m Result
forall a b. (a -> b) -> a -> b
$ do
          job -> m Result
ccProcessJob job
job
      (job, m (Result Result)) -> m (job, m (Result Result))
forall (m :: * -> *) a. Monad m => a -> m a
return (job
job, m (Result Result)
joinFork)
      where
        registerJob :: ThreadId -> m ()
registerJob ThreadId
tid = STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
          TVar (Map ThreadId idx)
-> (Map ThreadId idx -> Map ThreadId idx) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo ((Map ThreadId idx -> Map ThreadId idx) -> STM ())
-> (idx -> Map ThreadId idx -> Map ThreadId idx) -> idx -> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ThreadId -> idx -> Map ThreadId idx -> Map ThreadId idx
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert ThreadId
tid (idx -> STM ()) -> idx -> STM ()
forall a b. (a -> b) -> a -> b
$ job -> idx
ccJobIndex job
job
        unregisterJob :: ThreadId -> m ()
unregisterJob ThreadId
tid = STM () -> m ()
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
           TVar (Map ThreadId idx)
-> (Map ThreadId idx -> Map ThreadId idx) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo ((Map ThreadId idx -> Map ThreadId idx) -> STM ())
-> (Map ThreadId idx -> Map ThreadId idx) -> STM ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> Map ThreadId idx -> Map ThreadId idx
forall k a. Ord k => k -> Map k a -> Map k a
M.delete ThreadId
tid

    -- | Wait for all the jobs and collect their results.
    joinJob :: (job, m (T.Result Result)) -> m (idx, Result)
    joinJob :: (job, m (Result Result)) -> m (idx, Result)
joinJob (job
job, m (Result Result)
joinFork) = m (Result Result)
joinFork m (Result Result)
-> (Result Result -> m (idx, Result)) -> m (idx, Result)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Result Result
eres -> case Result Result
eres of
      Right Result
result -> (idx, Result) -> m (idx, Result)
forall (m :: * -> *) a. Monad m => a -> m a
return (job -> idx
ccJobIndex job
job, Result
result)
      Left SomeException
ex -> do
        Action
action <- SomeException -> job -> m Action
ccOnException SomeException
ex job
job
        Text -> Value -> m ()
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logAttention Text
"Unexpected exception caught while processing job" (Value -> m ()) -> Value -> m ()
forall a b. (a -> b) -> a -> b
$
          [Pair] -> Value
object [
            Key
"job_id" Key -> String -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= idx -> String
forall a. Show a => a -> String
show (job -> idx
ccJobIndex job
job)
          , Key
"exception" Key -> String -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= SomeException -> String
forall a. Show a => a -> String
show SomeException
ex
          , Key
"action" Key -> String -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Action -> String
forall a. Show a => a -> String
show Action
action
          ]
        (idx, Result) -> m (idx, Result)
forall (m :: * -> *) a. Monad m => a -> m a
return (job -> idx
ccJobIndex job
job, Action -> Result
Failed Action
action)

    -- | Update status of the jobs.
    updateJobs :: [(idx, Result)] -> m ()
    updateJobs :: [(idx, Result)] -> m ()
updateJobs [(idx, Result)]
results = ConnectionSourceM m -> TransactionSettings -> DBT m () -> m ()
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts (DBT m () -> m ()) -> DBT m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      UTCTime
now <- DBT_ m m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
      SQL -> DBT m ()
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ (SQL -> DBT m ()) -> SQL -> DBT m ()
forall a b. (a -> b) -> a -> b
$ [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
          SQL
"WITH removed AS ("
        , SQL
"  DELETE FROM" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
        , SQL
"  WHERE id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 [idx]
deletes SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
        , SQL
")"
        , SQL
"UPDATE" SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
        , SQL
"  reserved_by = NULL"
        , SQL
", run_at = CASE"
        , SQL
"    WHEN FALSE THEN run_at"
        ,      [SQL] -> SQL
forall m. (IsString m, Monoid m) => [m] -> m
smconcat ([SQL] -> SQL) -> [SQL] -> SQL
forall a b. (a -> b) -> a -> b
$ (Either Interval UTCTime -> [idx] -> [SQL] -> [SQL])
-> [SQL] -> Map (Either Interval UTCTime) [idx] -> [SQL]
forall k a b. (k -> a -> b -> b) -> b -> Map k a -> b
M.foldrWithKey (UTCTime -> Either Interval UTCTime -> [idx] -> [SQL] -> [SQL]
forall a t t t.
(Show a, Show t, Show t, Show t, ToSQL a, ToSQL t, ToSQL t,
 ToSQL t) =>
t -> Either t t -> [a] -> [SQL] -> [SQL]
retryToSQL UTCTime
now) [] Map (Either Interval UTCTime) [idx]
retries
        , SQL
"    ELSE NULL" -- processed
        , SQL
"  END"
        , SQL
", finished_at = CASE"
        , SQL
"    WHEN id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 [idx]
successes SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " SQL -> UTCTime -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
        , SQL
"    ELSE NULL"
        , SQL
"  END"
        , SQL
"WHERE id = ANY(" SQL -> Array1 idx -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [idx] -> Array1 idx
forall a. [a] -> Array1 a
Array1 (((idx, Result) -> idx) -> [(idx, Result)] -> [idx]
forall a b. (a -> b) -> [a] -> [b]
map (idx, Result) -> idx
forall a b. (a, b) -> a
fst [(idx, Result)]
updates) SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
        ]
      where
        retryToSQL :: t -> Either t t -> [a] -> [SQL] -> [SQL]
retryToSQL t
now (Left t
int) [a]
ids =
          (SQL
"WHEN id = ANY(" SQL -> Array1 a -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [a] -> Array1 a
forall a. [a] -> Array1 a
Array1 [a]
ids SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " SQL -> t -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> t
now SQL -> SQL -> SQL
forall a. Semigroup a => a -> a -> a
<> SQL
" +" SQL -> t -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> t
int SQL -> [SQL] -> [SQL]
forall a. a -> [a] -> [a]
:)
        retryToSQL t
_   (Right t
time) [a]
ids =
          (SQL
"WHEN id = ANY(" SQL -> Array1 a -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> [a] -> Array1 a
forall a. [a] -> Array1 a
Array1 [a]
ids SQL -> SQL -> SQL
forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN" SQL -> t -> SQL
forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> t
time SQL -> [SQL] -> [SQL]
forall a. a -> [a] -> [a]
:)

        retries :: Map (Either Interval UTCTime) [idx]
retries = ((idx, Action)
 -> Map (Either Interval UTCTime) [idx]
 -> Map (Either Interval UTCTime) [idx])
-> Map (Either Interval UTCTime) [idx]
-> [(idx, Action)]
-> Map (Either Interval UTCTime) [idx]
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (idx, Action)
-> Map (Either Interval UTCTime) [idx]
-> Map (Either Interval UTCTime) [idx]
forall a.
(a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step Map (Either Interval UTCTime) [idx]
forall k a. Map k a
M.empty ([(idx, Action)] -> Map (Either Interval UTCTime) [idx])
-> [(idx, Action)] -> Map (Either Interval UTCTime) [idx]
forall a b. (a -> b) -> a -> b
$ ((idx, Result) -> (idx, Action))
-> [(idx, Result)] -> [(idx, Action)]
forall a b. (a -> b) -> [a] -> [b]
map (idx, Result) -> (idx, Action)
forall a. (a, Result) -> (a, Action)
f [(idx, Result)]
updates
          where
            f :: (a, Result) -> (a, Action)
f (a
idx, Result
result) = case Result
result of
              Ok     Action
action -> (a
idx, Action
action)
              Failed Action
action -> (a
idx, Action
action)

            step :: (a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step (a
idx, Action
action) Map (Either Interval UTCTime) [a]
iretries = case Action
action of
              Action
MarkProcessed  -> Map (Either Interval UTCTime) [a]
iretries
              RerunAfter Interval
int -> ([a] -> [a] -> [a])
-> Either Interval UTCTime
-> [a]
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
(++) (Interval -> Either Interval UTCTime
forall a b. a -> Either a b
Left Interval
int) [a
idx] Map (Either Interval UTCTime) [a]
iretries
              RerunAt UTCTime
time   -> ([a] -> [a] -> [a])
-> Either Interval UTCTime
-> [a]
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
(++) (UTCTime -> Either Interval UTCTime
forall a b. b -> Either a b
Right UTCTime
time) [a
idx] Map (Either Interval UTCTime) [a]
iretries
              Action
Remove         -> String -> Map (Either Interval UTCTime) [a]
forall a. HasCallStack => String -> a
error
                String
"updateJobs: Remove should've been filtered out"

        successes :: [idx]
successes = ((idx, Result) -> [idx] -> [idx])
-> [idx] -> [(idx, Result)] -> [idx]
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (idx, Result) -> [idx] -> [idx]
forall a. (a, Result) -> [a] -> [a]
step [] [(idx, Result)]
updates
          where
            step :: (a, Result) -> [a] -> [a]
step (a
idx, Ok     Action
_) [a]
acc = a
idx a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc
            step (a
_,   Failed Action
_) [a]
acc =       [a]
acc

        ([idx]
deletes, [(idx, Result)]
updates) = ((idx, Result)
 -> ([idx], [(idx, Result)]) -> ([idx], [(idx, Result)]))
-> ([idx], [(idx, Result)])
-> [(idx, Result)]
-> ([idx], [(idx, Result)])
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (idx, Result)
-> ([idx], [(idx, Result)]) -> ([idx], [(idx, Result)])
forall a.
(a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step ([], []) [(idx, Result)]
results
          where
            step :: (a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step job :: (a, Result)
job@(a
idx, Result
result) ([a]
ideletes, [(a, Result)]
iupdates) = case Result
result of
              Ok     Action
Remove -> (a
idx a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
              Failed Action
Remove -> (a
idx a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
              Result
_             -> ([a]
ideletes, (a, Result)
job (a, Result) -> [(a, Result)] -> [(a, Result)]
forall a. a -> [a] -> [a]
: [(a, Result)]
iupdates)

----------------------------------------

ts :: TransactionSettings
ts :: TransactionSettings
ts = TransactionSettings
defaultTransactionSettings {
  -- PostgreSQL doesn't seem to handle very high amount of
  -- concurrent transactions that modify multiple rows in
  -- the same table well (see updateJobs) and sometimes (very
  -- rarely though) ends up in a deadlock. It doesn't matter
  -- much though, we just restart the transaction in such case.
  tsRestartPredicate :: Maybe RestartPredicate
tsRestartPredicate = RestartPredicate -> Maybe RestartPredicate
forall a. a -> Maybe a
Just (RestartPredicate -> Maybe RestartPredicate)
-> ((DetailedQueryError -> Integer -> Bool) -> RestartPredicate)
-> (DetailedQueryError -> Integer -> Bool)
-> Maybe RestartPredicate
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (DetailedQueryError -> Integer -> Bool) -> RestartPredicate
forall e. Exception e => (e -> Integer -> Bool) -> RestartPredicate
RestartPredicate
  ((DetailedQueryError -> Integer -> Bool) -> Maybe RestartPredicate)
-> (DetailedQueryError -> Integer -> Bool)
-> Maybe RestartPredicate
forall a b. (a -> b) -> a -> b
$ \DetailedQueryError
e Integer
_ -> DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e ErrorCode -> ErrorCode -> Bool
forall a. Eq a => a -> a -> Bool
== ErrorCode
DeadlockDetected
         Bool -> Bool -> Bool
|| DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e ErrorCode -> ErrorCode -> Bool
forall a. Eq a => a -> a -> Bool
== ErrorCode
SerializationFailure
}

atomically :: MonadBase IO m => STM a -> m a
atomically :: STM a -> m a
atomically = IO a -> m a
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO a -> m a) -> (STM a -> IO a) -> STM a -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
STM.atomically