module Faktory.Worker
( WorkerHalt(..)
, runWorker
)
where
import Faktory.Prelude
import Control.Concurrent (killThread)
import Data.Aeson
import Data.Aeson.Casing
import qualified Data.Text as T
import Faktory.Client
import Faktory.Job (Job, JobId, jobArg, jobJid)
import Faktory.Settings
import GHC.Generics
import GHC.Stack
-- | Exception used to stop a worker.
--
-- 'runWorker' catches this exception around its processing loop and then
-- returns normally, so throwing it from a job handler ends the worker instead
-- of marking the job as failed.
data WorkerHalt = WorkerHalt
  deriving (Eq, Show, Exception)
-- | JSON argument sent with the @BEAT@ command (see 'heartBeat').
newtype BeatPayload = BeatPayload
  { _bpWid :: WorkerId
  -- ^ Id of the worker sending the heartbeat.
  }
  deriving Generic

-- | Generic encoding with field names rewritten by @aesonPrefix snakeCase@
-- (presumably yielding the key @wid@ -- confirm against aeson-casing).
instance ToJSON BeatPayload where
  toJSON = genericToJSON (aesonPrefix snakeCase)
  toEncoding = genericToEncoding (aesonPrefix snakeCase)
-- | JSON argument sent with the @ACK@ command (see 'ackJob').
newtype AckPayload = AckPayload
  { _apJid :: JobId
  -- ^ Id of the job being acknowledged.
  }
  deriving Generic

-- | Generic encoding with field names rewritten by @aesonPrefix snakeCase@
-- (presumably yielding the key @jid@ -- confirm against aeson-casing).
instance ToJSON AckPayload where
  toJSON = genericToJSON (aesonPrefix snakeCase)
  toEncoding = genericToEncoding (aesonPrefix snakeCase)
-- | JSON argument sent with the @FAIL@ command (see 'failJob').
data FailPayload = FailPayload
  { _fpMessage :: Text
  -- ^ Failure message.
  , _fpErrtype :: String
  -- ^ Error type; 'failJob' currently always sends the empty string.
  , _fpJid :: JobId
  -- ^ Id of the job that failed.
  , _fpBacktrace :: [String]
  -- ^ Backtrace lines; 'failJob' currently always sends an empty list.
  }
  deriving Generic

-- | Generic encoding with field names rewritten by @aesonPrefix snakeCase@
-- (presumably yielding the keys @message@, @errtype@, @jid@ and @backtrace@
-- -- confirm against aeson-casing).
instance ToJSON FailPayload where
  toJSON = genericToJSON (aesonPrefix snakeCase)
  toEncoding = genericToEncoding (aesonPrefix snakeCase)
-- | Run a worker until it is halted.
--
-- Creates a client tagged with a fresh random worker id, forks a thread that
-- repeatedly calls 'heartBeat', and then runs 'processorLoop' forever, passing
-- each fetched job's argument to the given handler.
--
-- A 'WorkerHalt' escaping the loop stops the worker and makes this function
-- return normally. Any other exception escaping the loop propagates to the
-- caller. In both cases the heartbeat thread is killed first and the client is
-- closed afterwards.
runWorker :: FromJSON args => Settings -> (args -> IO ()) -> IO ()
runWorker settings f = do
  workerId <- randomWorkerId
  client <- newClient settings $ Just workerId
  -- Register the client's cleanup before anything else touches it, so the
  -- client is closed even when starting the heartbeat thread fails or an
  -- exception arrives before the processing loop is entered.
  runWithHeartBeat client workerId `finally` closeClient client
 where
  -- Fork the heartbeat thread and make sure it is killed once the processing
  -- loop ends, whether normally or exceptionally.
  runWithHeartBeat client workerId = do
    beatThreadId <- forkIOWithThrowToParent $ forever $ heartBeat client workerId
    processJobs client `finally` killThread beatThreadId

  -- Loop over jobs until a 'WorkerHalt' is thrown, which is swallowed here.
  processJobs client =
    forever (processorLoop client settings f)
      `catch` (\(_ex :: WorkerHalt) -> pure ())
-- | One iteration of the worker: fetch at most one job and handle it.
--
-- * If the fetch result is a 'Left', the error is logged through
--   'settingsLogError' with the prefix @"Invalid Job: "@.
-- * If no job was returned, the thread sleeps for 'settingsWorkerIdleDelay'.
-- * Otherwise the handler is run on the job's argument and the job is
--   acknowledged. A 'WorkerHalt' raised while doing so is re-thrown; any other
--   exception caught here (from the handler or from the acknowledgement) is
--   reported with 'failJob', using @show@ of the exception as the message.
processorLoop :: FromJSON arg => Client -> Settings -> (arg -> IO ()) -> IO ()
processorLoop client settings f = do
  fetched <- fetchJob client (settingsQueue settings)
  case fetched of
    Left fetchError ->
      settingsLogError settings ("Invalid Job: " <> fetchError)
    Right Nothing ->
      threadDelaySeconds (settingsWorkerIdleDelay settings)
    Right (Just job) ->
      performAndAck job
        `catches` [Handler propagateHalt, Handler (reportFailure job)]
 where
  -- Run the user handler, then acknowledge the job.
  performAndAck job = f (jobArg job) >> ackJob client job

  -- Listed first so a halt request is never reported as a job failure.
  propagateHalt :: WorkerHalt -> IO ()
  propagateHalt halt = throw halt

  reportFailure :: Job a -> SomeException -> IO ()
  reportFailure job failure = failJob client job (T.pack (show failure))
-- | Delay via @threadDelaySeconds 25@, then send a @BEAT@ command whose single
-- argument is the JSON-encoded 'BeatPayload' for the given worker id.
heartBeat :: Client -> WorkerId -> IO ()
heartBeat client workerId =
  threadDelaySeconds 25 >> command_ client "BEAT" [beatArgument]
 where
  beatArgument = encode (BeatPayload workerId)
-- | Issue a @FETCH@ command for the given queue through 'commandJSON' and
-- return its result: 'Left' with an error message, 'Right' 'Nothing', or
-- 'Right' with a job.
fetchJob
  :: FromJSON args => Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob client queue = commandJSON client "FETCH" fetchArguments
 where
  fetchArguments = [queueArg queue]
-- | Send an @ACK@ command for the given job through 'commandOK', with the
-- JSON-encoded 'AckPayload' holding the job's id as its single argument.
ackJob :: HasCallStack => Client -> Job args -> IO ()
ackJob client job = commandOK client "ACK" [encode ackPayload]
 where
  ackPayload = AckPayload (jobJid job)
-- | Send a @FAIL@ command for the given job through 'commandOK'.
--
-- The JSON-encoded 'FailPayload' carries the supplied message and the job's
-- id; the error type is sent as the empty string and the backtrace as an
-- empty list.
failJob :: HasCallStack => Client -> Job args -> Text -> IO ()
failJob client job message = commandOK client "FAIL" [encode failPayload]
 where
  failPayload = FailPayload
    { _fpMessage = message
    , _fpErrtype = ""
    , _fpJid = jobJid job
    , _fpBacktrace = []
    }