{-# LANGUAGE FlexibleContexts #-}
module Network.AMQP.Worker.Worker where
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException(..))
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Catch (Exception(..), catch, MonadCatch)
import Control.Monad (forever)
import Data.Aeson (FromJSON)
import Data.ByteString.Lazy (ByteString)
import Data.Default (Default(..))
import Network.AMQP.Worker.Connection (Connection)
import Network.AMQP.Worker.Queue (Queue(..))
import Network.AMQP.Worker.Message (Message(..), ConsumeResult(..), ParseError(..), Microseconds, consumeNext)
-- | Run a consumer loop over @queue@ that never returns normally.
--
-- Each iteration:
--
-- 1. asks 'consumeNext' for the next result, handing it the 'pollDelay'
--    taken from @opts@;
-- 2. if the result is a parse failure, reports a 'MessageParseError'
--    (raw body plus reason) to @onError@;
-- 3. if the result is a parsed message, runs @action@ on it; an exception
--    escaping @action@ is wrapped in 'OtherException' together with the
--    message body and handed to @onError@;
-- 4. pauses for 'loopDelay' (via 'threadDelay') before going round again.
--
-- Only @action@ is guarded: exceptions raised by 'consumeNext' or by
-- @onError@ itself are not caught here and therefore end the loop.
--
-- NOTE(review): the handler is typed at 'SomeException', so it matches every
-- exception type; this presumably includes asynchronous exceptions delivered
-- while @action@ is running -- confirm that is intended.
worker
  :: (FromJSON a, MonadIO m, MonadCatch m)
  => Connection
  -> WorkerOptions
  -> Queue a
  -> (WorkerException SomeException -> m ())
  -> (Message a -> m ())
  -> m ()
worker conn opts queue onError action = forever step
  where
    -- One full pass: fetch, dispatch, then sleep.
    step = do
      result <- consumeNext (pollDelay opts) conn queue
      dispatch result
      liftIO (threadDelay (loopDelay opts))

    -- Route a consume result to the error callback or to the user action.
    dispatch (Error (ParseError reason bd)) =
      onError (MessageParseError bd reason)
    dispatch (Parsed msg) =
      action msg `catch` \err -> onError (OtherException (body msg) err)
-- | Timing settings read by 'worker'.
data WorkerOptions = WorkerOptions
  { pollDelay :: Microseconds
    -- ^ Passed straight through to 'consumeNext' on every iteration;
    -- presumably the wait between polls of the queue (confirm against
    -- 'consumeNext').
  , loopDelay :: Microseconds
    -- ^ How long 'worker' sleeps (with 'threadDelay') at the end of each
    -- iteration, after a result has been handled.
  }
  deriving (Eq, Show)
-- | Default settings: a 'pollDelay' of @10 * 1000@ (ten milliseconds when
-- read in the unit the 'Microseconds' type name indicates) and no extra
-- pause between loop iterations.
instance Default WorkerOptions where
  def =
    WorkerOptions
      { pollDelay = defaultPollDelay
      , loopDelay = 0
      }
    where
      defaultPollDelay = 10 * 1000
-- | Failures that 'worker' reports to its error callback. The type
-- parameter @e@ is the type of the underlying exception carried by
-- 'OtherException'.
data WorkerException e
  = MessageParseError ByteString String
    -- ^ The message could not be parsed: the raw message body, followed by
    -- the reason the parse failed.
  | OtherException ByteString e
    -- ^ An exception was thrown while handling a message: the body of that
    -- message, followed by the exception itself.
  deriving (Eq, Show)
-- | A 'WorkerException' is itself an 'Exception' whenever the exception type
-- it wraps is one; every class method keeps its default implementation.
instance Exception e => Exception (WorkerException e)