-- | High-level interface for a Worker
--
-- Runs forever, @FETCH@-ing Jobs from the given Queue and handing each to your
-- processing function.
module Faktory.Worker
  ( WorkerHalt (..)
  , runWorker
  , runWorkerEnv
  , jobArg
  )
where

import Faktory.Prelude

import Control.Concurrent (killThread)
import Data.Aeson
import Data.Aeson.Casing
import qualified Data.Text as T
import Faktory.Client
import Faktory.Job (Job, JobId, jobArg, jobJid, jobReserveForMicroseconds)
import Faktory.Settings
import GHC.Generics
import GHC.Stack
import System.Timeout (timeout)

-- | If processing functions @'throw'@ this, @'runWorker'@ will exit
data WorkerHalt = WorkerHalt
  deriving stock (WorkerHalt -> WorkerHalt -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: WorkerHalt -> WorkerHalt -> Bool
$c/= :: WorkerHalt -> WorkerHalt -> Bool
== :: WorkerHalt -> WorkerHalt -> Bool
$c== :: WorkerHalt -> WorkerHalt -> Bool
Eq, Int -> WorkerHalt -> ShowS
[WorkerHalt] -> ShowS
WorkerHalt -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerHalt] -> ShowS
$cshowList :: [WorkerHalt] -> ShowS
show :: WorkerHalt -> String
$cshow :: WorkerHalt -> String
showsPrec :: Int -> WorkerHalt -> ShowS
$cshowsPrec :: Int -> WorkerHalt -> ShowS
Show)
  deriving anyclass (Show WorkerHalt
Typeable WorkerHalt
SomeException -> Maybe WorkerHalt
WorkerHalt -> String
WorkerHalt -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: WorkerHalt -> String
$cdisplayException :: WorkerHalt -> String
fromException :: SomeException -> Maybe WorkerHalt
$cfromException :: SomeException -> Maybe WorkerHalt
toException :: WorkerHalt -> SomeException
$ctoException :: WorkerHalt -> SomeException
Exception)

newtype BeatPayload = BeatPayload
  { BeatPayload -> WorkerId
_bpWid :: WorkerId
  }
  deriving stock (forall x. Rep BeatPayload x -> BeatPayload
forall x. BeatPayload -> Rep BeatPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep BeatPayload x -> BeatPayload
$cfrom :: forall x. BeatPayload -> Rep BeatPayload x
Generic)

instance ToJSON BeatPayload where
  toJSON :: BeatPayload -> Value
toJSON = forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
  toEncoding :: BeatPayload -> Encoding
toEncoding = forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase

newtype AckPayload = AckPayload
  { AckPayload -> String
_apJid :: JobId
  }
  deriving stock (forall x. Rep AckPayload x -> AckPayload
forall x. AckPayload -> Rep AckPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep AckPayload x -> AckPayload
$cfrom :: forall x. AckPayload -> Rep AckPayload x
Generic)

instance ToJSON AckPayload where
  toJSON :: AckPayload -> Value
toJSON = forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
  toEncoding :: AckPayload -> Encoding
toEncoding = forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase

data FailPayload = FailPayload
  { FailPayload -> Text
_fpMessage :: Text
  , FailPayload -> String
_fpErrtype :: String
  , FailPayload -> String
_fpJid :: JobId
  , FailPayload -> [String]
_fpBacktrace :: [String]
  }
  deriving stock (forall x. Rep FailPayload x -> FailPayload
forall x. FailPayload -> Rep FailPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep FailPayload x -> FailPayload
$cfrom :: forall x. FailPayload -> Rep FailPayload x
Generic)

instance ToJSON FailPayload where
  toJSON :: FailPayload -> Value
toJSON = forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
  toEncoding :: FailPayload -> Encoding
toEncoding = forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase

runWorker
  :: (HasCallStack, FromJSON args)
  => Settings
  -> WorkerSettings
  -> (Job args -> IO ())
  -> IO ()
runWorker :: forall args.
(HasCallStack, FromJSON args) =>
Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
runWorker Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f = do
  WorkerId
workerId <- forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO WorkerId
randomWorkerId forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Maybe WorkerId
settingsId WorkerSettings
workerSettings
  Client
client <- HasCallStack => Settings -> Maybe WorkerId -> IO Client
newClient Settings
settings forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just WorkerId
workerId
  ThreadId
beatThreadId <- IO () -> IO ThreadId
forkIOWithThrowToParent forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ Client -> WorkerId -> IO ()
heartBeat Client
client WorkerId
workerId

  forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (forall arg.
(HasCallStack, FromJSON arg) =>
Client -> Settings -> WorkerSettings -> (Job arg -> IO ()) -> IO ()
processorLoop Client
client Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f)
    forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` (\(WorkerHalt
_ex :: WorkerHalt) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
    forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` (ThreadId -> IO ()
killThread ThreadId
beatThreadId forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Client -> IO ()
closeClient Client
client)

runWorkerEnv :: FromJSON args => (Job args -> IO ()) -> IO ()
runWorkerEnv :: forall args. FromJSON args => (Job args -> IO ()) -> IO ()
runWorkerEnv Job args -> IO ()
f = do
  Settings
settings <- IO Settings
envSettings
  WorkerSettings
workerSettings <- IO WorkerSettings
envWorkerSettings
  forall args.
(HasCallStack, FromJSON args) =>
Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
runWorker Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f

processorLoop
  :: (HasCallStack, FromJSON arg)
  => Client
  -> Settings
  -> WorkerSettings
  -> (Job arg -> IO ())
  -> IO ()
processorLoop :: forall arg.
(HasCallStack, FromJSON arg) =>
Client -> Settings -> WorkerSettings -> (Job arg -> IO ()) -> IO ()
processorLoop Client
client Settings
settings WorkerSettings
workerSettings Job arg -> IO ()
f = do
  let
    namespace :: Namespace
namespace = ConnectionInfo -> Namespace
connectionInfoNamespace forall a b. (a -> b) -> a -> b
$ Settings -> ConnectionInfo
settingsConnection Settings
settings
    processAndAck :: Job arg -> IO ()
processAndAck Job arg
job = do
      Maybe ()
mResult <- forall a. Int -> IO a -> IO (Maybe a)
timeout (forall arg. Job arg -> Int
jobReserveForMicroseconds Job arg
job) forall a b. (a -> b) -> a -> b
$ Job arg -> IO ()
f Job arg
job
      case Maybe ()
mResult of
        Maybe ()
Nothing -> Settings -> String -> IO ()
settingsLogError Settings
settings String
"Job reservation period expired."
        Just () -> forall args. HasCallStack => Client -> Job args -> IO ()
ackJob Client
client Job arg
job

  Either String (Maybe (Job arg))
emJob <-
    forall args.
FromJSON args =>
Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob Client
client forall a b. (a -> b) -> a -> b
$
      Namespace -> Queue -> Queue
namespaceQueue Namespace
namespace forall a b. (a -> b) -> a -> b
$
        WorkerSettings -> Queue
settingsQueue
          WorkerSettings
workerSettings

  case Either String (Maybe (Job arg))
emJob of
    Left String
err -> Settings -> String -> IO ()
settingsLogError Settings
settings forall a b. (a -> b) -> a -> b
$ String
"Invalid Job: " forall a. Semigroup a => a -> a -> a
<> String
err
    Right Maybe (Job arg)
Nothing -> Int -> IO ()
threadDelaySeconds forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Int
settingsIdleDelay WorkerSettings
workerSettings
    Right (Just Job arg
job) ->
      Job arg -> IO ()
processAndAck Job arg
job
        forall (m :: * -> *) a.
(MonadCatch m, MonadThrow m) =>
m a -> [Handler m a] -> m a
`catches` [ forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler forall a b. (a -> b) -> a -> b
$ \(WorkerHalt
ex :: WorkerHalt) -> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw WorkerHalt
ex
                  , forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: SomeException) ->
                      forall args. HasCallStack => Client -> Job args -> Text -> IO ()
failJob Client
client Job arg
job forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show SomeException
ex
                  ]

-- | <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#heartbeat>
heartBeat :: Client -> WorkerId -> IO ()
heartBeat :: Client -> WorkerId -> IO ()
heartBeat Client
client WorkerId
workerId = do
  Int -> IO ()
threadDelaySeconds Int
25
  Client -> ByteString -> [ByteString] -> IO ()
command_ Client
client ByteString
"BEAT" [forall a. ToJSON a => a -> ByteString
encode forall a b. (a -> b) -> a -> b
$ WorkerId -> BeatPayload
BeatPayload WorkerId
workerId]

fetchJob
  :: FromJSON args => Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob :: forall args.
FromJSON args =>
Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob Client
client Queue
queue = forall a.
FromJSON a =>
Client
-> ByteString -> [ByteString] -> IO (Either String (Maybe a))
commandJSON Client
client ByteString
"FETCH" [Queue -> ByteString
queueArg Queue
queue]

ackJob :: HasCallStack => Client -> Job args -> IO ()
ackJob :: forall args. HasCallStack => Client -> Job args -> IO ()
ackJob Client
client Job args
job = HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
commandOK Client
client ByteString
"ACK" [forall a. ToJSON a => a -> ByteString
encode forall a b. (a -> b) -> a -> b
$ String -> AckPayload
AckPayload forall a b. (a -> b) -> a -> b
$ forall arg. Job arg -> String
jobJid Job args
job]

failJob :: HasCallStack => Client -> Job args -> Text -> IO ()
failJob :: forall args. HasCallStack => Client -> Job args -> Text -> IO ()
failJob Client
client Job args
job Text
message =
  HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
commandOK Client
client ByteString
"FAIL" [forall a. ToJSON a => a -> ByteString
encode forall a b. (a -> b) -> a -> b
$ Text -> String -> String -> [String] -> FailPayload
FailPayload Text
message String
"" (forall arg. Job arg -> String
jobJid Job args
job) []]