-- | High-level interface for a Worker
--
-- Runs forever, @FETCH@-ing Jobs from the given Queue and handing each to your
-- processing function.
--
module Faktory.Worker
  ( WorkerHalt(..)
  , runWorker
  , runWorkerEnv
  , jobArg
  )
where

import Faktory.Prelude

import Control.Concurrent (killThread)
import Data.Aeson
import Data.Aeson.Casing
import qualified Data.Text as T
import Faktory.Client
import Faktory.Job (Job, JobId, jobArg, jobJid, jobReserveForMicroseconds)
import Faktory.Settings
import GHC.Generics
import GHC.Stack
import System.Timeout (timeout)

-- | If processing functions @'throw'@ this, @'runWorker'@ will exit
data WorkerHalt = WorkerHalt
  deriving stock (WorkerHalt -> WorkerHalt -> Bool
(WorkerHalt -> WorkerHalt -> Bool)
-> (WorkerHalt -> WorkerHalt -> Bool) -> Eq WorkerHalt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: WorkerHalt -> WorkerHalt -> Bool
$c/= :: WorkerHalt -> WorkerHalt -> Bool
== :: WorkerHalt -> WorkerHalt -> Bool
$c== :: WorkerHalt -> WorkerHalt -> Bool
Eq, Int -> WorkerHalt -> ShowS
[WorkerHalt] -> ShowS
WorkerHalt -> String
(Int -> WorkerHalt -> ShowS)
-> (WorkerHalt -> String)
-> ([WorkerHalt] -> ShowS)
-> Show WorkerHalt
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerHalt] -> ShowS
$cshowList :: [WorkerHalt] -> ShowS
show :: WorkerHalt -> String
$cshow :: WorkerHalt -> String
showsPrec :: Int -> WorkerHalt -> ShowS
$cshowsPrec :: Int -> WorkerHalt -> ShowS
Show)
  deriving anyclass Show WorkerHalt
Typeable WorkerHalt
Typeable WorkerHalt
-> Show WorkerHalt
-> (WorkerHalt -> SomeException)
-> (SomeException -> Maybe WorkerHalt)
-> (WorkerHalt -> String)
-> Exception WorkerHalt
SomeException -> Maybe WorkerHalt
WorkerHalt -> String
WorkerHalt -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: WorkerHalt -> String
$cdisplayException :: WorkerHalt -> String
fromException :: SomeException -> Maybe WorkerHalt
$cfromException :: SomeException -> Maybe WorkerHalt
toException :: WorkerHalt -> SomeException
$ctoException :: WorkerHalt -> SomeException
$cp2Exception :: Show WorkerHalt
$cp1Exception :: Typeable WorkerHalt
Exception

newtype BeatPayload = BeatPayload
  { BeatPayload -> WorkerId
_bpWid :: WorkerId
  }
  deriving stock (forall x. BeatPayload -> Rep BeatPayload x)
-> (forall x. Rep BeatPayload x -> BeatPayload)
-> Generic BeatPayload
forall x. Rep BeatPayload x -> BeatPayload
forall x. BeatPayload -> Rep BeatPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep BeatPayload x -> BeatPayload
$cfrom :: forall x. BeatPayload -> Rep BeatPayload x
Generic

instance ToJSON BeatPayload where
  toJSON :: BeatPayload -> Value
toJSON = Options -> BeatPayload -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON (Options -> BeatPayload -> Value)
-> Options -> BeatPayload -> Value
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
  toEncoding :: BeatPayload -> Encoding
toEncoding = Options -> BeatPayload -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding (Options -> BeatPayload -> Encoding)
-> Options -> BeatPayload -> Encoding
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase

newtype AckPayload = AckPayload
  { AckPayload -> String
_apJid :: JobId
  }
  deriving stock (forall x. AckPayload -> Rep AckPayload x)
-> (forall x. Rep AckPayload x -> AckPayload) -> Generic AckPayload
forall x. Rep AckPayload x -> AckPayload
forall x. AckPayload -> Rep AckPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep AckPayload x -> AckPayload
$cfrom :: forall x. AckPayload -> Rep AckPayload x
Generic

instance ToJSON AckPayload where
  toJSON :: AckPayload -> Value
toJSON = Options -> AckPayload -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON (Options -> AckPayload -> Value) -> Options -> AckPayload -> Value
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
  toEncoding :: AckPayload -> Encoding
toEncoding = Options -> AckPayload -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding (Options -> AckPayload -> Encoding)
-> Options -> AckPayload -> Encoding
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase

data FailPayload = FailPayload
  { FailPayload -> Text
_fpMessage :: Text
  , FailPayload -> String
_fpErrtype :: String
  , FailPayload -> String
_fpJid :: JobId
  , FailPayload -> [String]
_fpBacktrace :: [String]
  }
  deriving stock (forall x. FailPayload -> Rep FailPayload x)
-> (forall x. Rep FailPayload x -> FailPayload)
-> Generic FailPayload
forall x. Rep FailPayload x -> FailPayload
forall x. FailPayload -> Rep FailPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep FailPayload x -> FailPayload
$cfrom :: forall x. FailPayload -> Rep FailPayload x
Generic

instance ToJSON FailPayload where
  toJSON :: FailPayload -> Value
toJSON = Options -> FailPayload -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON (Options -> FailPayload -> Value)
-> Options -> FailPayload -> Value
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
  toEncoding :: FailPayload -> Encoding
toEncoding = Options -> FailPayload -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding (Options -> FailPayload -> Encoding)
-> Options -> FailPayload -> Encoding
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase

runWorker
  :: (HasCallStack, FromJSON args)
  => Settings
  -> WorkerSettings
  -> (Job args -> IO ())
  -> IO ()
runWorker :: Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
runWorker Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f = do
  WorkerId
workerId <- IO WorkerId
-> (WorkerId -> IO WorkerId) -> Maybe WorkerId -> IO WorkerId
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO WorkerId
randomWorkerId WorkerId -> IO WorkerId
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe WorkerId -> IO WorkerId) -> Maybe WorkerId -> IO WorkerId
forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Maybe WorkerId
settingsId WorkerSettings
workerSettings
  Client
client <- HasCallStack => Settings -> Maybe WorkerId -> IO Client
Settings -> Maybe WorkerId -> IO Client
newClient Settings
settings (Maybe WorkerId -> IO Client) -> Maybe WorkerId -> IO Client
forall a b. (a -> b) -> a -> b
$ WorkerId -> Maybe WorkerId
forall a. a -> Maybe a
Just WorkerId
workerId
  ThreadId
beatThreadId <- IO () -> IO ThreadId
forkIOWithThrowToParent (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Client -> WorkerId -> IO ()
heartBeat Client
client WorkerId
workerId

  IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Client
-> Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
forall arg.
(HasCallStack, FromJSON arg) =>
Client -> Settings -> WorkerSettings -> (Job arg -> IO ()) -> IO ()
processorLoop Client
client Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f)
    IO () -> (WorkerHalt -> IO ()) -> IO ()
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` (\(WorkerHalt
_ex :: WorkerHalt) -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
    IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` (ThreadId -> IO ()
killThread ThreadId
beatThreadId IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Client -> IO ()
closeClient Client
client)

runWorkerEnv :: FromJSON args => (Job args -> IO ()) -> IO ()
runWorkerEnv :: (Job args -> IO ()) -> IO ()
runWorkerEnv Job args -> IO ()
f = do
  Settings
settings <- IO Settings
envSettings
  WorkerSettings
workerSettings <- IO WorkerSettings
envWorkerSettings
  Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
forall args.
(HasCallStack, FromJSON args) =>
Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
runWorker Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f

processorLoop
  :: (HasCallStack, FromJSON arg)
  => Client
  -> Settings
  -> WorkerSettings
  -> (Job arg -> IO ())
  -> IO ()
processorLoop :: Client -> Settings -> WorkerSettings -> (Job arg -> IO ()) -> IO ()
processorLoop Client
client Settings
settings WorkerSettings
workerSettings Job arg -> IO ()
f = do
  let
    namespace :: Namespace
namespace = ConnectionInfo -> Namespace
connectionInfoNamespace (ConnectionInfo -> Namespace) -> ConnectionInfo -> Namespace
forall a b. (a -> b) -> a -> b
$ Settings -> ConnectionInfo
settingsConnection Settings
settings
    processAndAck :: Job arg -> IO ()
processAndAck Job arg
job = do
      Maybe ()
mResult <- Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (Job arg -> Int
forall arg. Job arg -> Int
jobReserveForMicroseconds Job arg
job) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ Job arg -> IO ()
f Job arg
job
      case Maybe ()
mResult of
        Maybe ()
Nothing -> Settings -> String -> IO ()
settingsLogError Settings
settings String
"Job reservation period expired."
        Just () -> Client -> Job arg -> IO ()
forall args. HasCallStack => Client -> Job args -> IO ()
ackJob Client
client Job arg
job

  Either String (Maybe (Job arg))
emJob <- Client -> Queue -> IO (Either String (Maybe (Job arg)))
forall args.
FromJSON args =>
Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob Client
client (Queue -> IO (Either String (Maybe (Job arg))))
-> Queue -> IO (Either String (Maybe (Job arg)))
forall a b. (a -> b) -> a -> b
$ Namespace -> Queue -> Queue
namespaceQueue Namespace
namespace (Queue -> Queue) -> Queue -> Queue
forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Queue
settingsQueue
    WorkerSettings
workerSettings

  case Either String (Maybe (Job arg))
emJob of
    Left String
err -> Settings -> String -> IO ()
settingsLogError Settings
settings (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Invalid Job: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
err
    Right Maybe (Job arg)
Nothing -> Int -> IO ()
threadDelaySeconds (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Int
settingsIdleDelay WorkerSettings
workerSettings
    Right (Just Job arg
job) ->
      Job arg -> IO ()
processAndAck Job arg
job
        IO () -> [Handler IO ()] -> IO ()
forall (m :: * -> *) a.
(MonadCatch m, MonadThrow m) =>
m a -> [Handler m a] -> m a
`catches` [ (WorkerHalt -> IO ()) -> Handler IO ()
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((WorkerHalt -> IO ()) -> Handler IO ())
-> (WorkerHalt -> IO ()) -> Handler IO ()
forall a b. (a -> b) -> a -> b
$ \(WorkerHalt
ex :: WorkerHalt) -> WorkerHalt -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw WorkerHalt
ex
                  , (SomeException -> IO ()) -> Handler IO ()
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((SomeException -> IO ()) -> Handler IO ())
-> (SomeException -> IO ()) -> Handler IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: SomeException) ->
                    Client -> Job arg -> Text -> IO ()
forall args. HasCallStack => Client -> Job args -> Text -> IO ()
failJob Client
client Job arg
job (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ SomeException -> String
forall a. Show a => a -> String
show SomeException
ex
                  ]

-- | <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#heartbeat>
heartBeat :: Client -> WorkerId -> IO ()
heartBeat :: Client -> WorkerId -> IO ()
heartBeat Client
client WorkerId
workerId = do
  Int -> IO ()
threadDelaySeconds Int
25
  Client -> ByteString -> [ByteString] -> IO ()
command_ Client
client ByteString
"BEAT" [BeatPayload -> ByteString
forall a. ToJSON a => a -> ByteString
encode (BeatPayload -> ByteString) -> BeatPayload -> ByteString
forall a b. (a -> b) -> a -> b
$ WorkerId -> BeatPayload
BeatPayload WorkerId
workerId]

fetchJob
  :: FromJSON args => Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob :: Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob Client
client Queue
queue = Client
-> ByteString
-> [ByteString]
-> IO (Either String (Maybe (Job args)))
forall a.
FromJSON a =>
Client
-> ByteString -> [ByteString] -> IO (Either String (Maybe a))
commandJSON Client
client ByteString
"FETCH" [Queue -> ByteString
queueArg Queue
queue]

ackJob :: HasCallStack => Client -> Job args -> IO ()
ackJob :: Client -> Job args -> IO ()
ackJob Client
client Job args
job = HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
Client -> ByteString -> [ByteString] -> IO ()
commandOK Client
client ByteString
"ACK" [AckPayload -> ByteString
forall a. ToJSON a => a -> ByteString
encode (AckPayload -> ByteString) -> AckPayload -> ByteString
forall a b. (a -> b) -> a -> b
$ String -> AckPayload
AckPayload (String -> AckPayload) -> String -> AckPayload
forall a b. (a -> b) -> a -> b
$ Job args -> String
forall arg. Job arg -> String
jobJid Job args
job]

failJob :: HasCallStack => Client -> Job args -> Text -> IO ()
failJob :: Client -> Job args -> Text -> IO ()
failJob Client
client Job args
job Text
message =
  HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
Client -> ByteString -> [ByteString] -> IO ()
commandOK Client
client ByteString
"FAIL" [FailPayload -> ByteString
forall a. ToJSON a => a -> ByteString
encode (FailPayload -> ByteString) -> FailPayload -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> String -> String -> [String] -> FailPayload
FailPayload Text
message String
"" (Job args -> String
forall arg. Job arg -> String
jobJid Job args
job) []]