module Network.AMQP.Worker.Worker where
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException(..))
import Control.Monad.Base (liftBase)
import Control.Monad.Catch (Exception(..), catch, MonadCatch)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad (forever)
import Data.Aeson (FromJSON)
import Data.ByteString.Lazy (ByteString)
import Data.Default (Default(..))
import Network.AMQP.Worker.Connection (Connection)
import Network.AMQP.Worker.Queue (Queue(..))
import Network.AMQP.Worker.Message (Message(..), ConsumeResult(..), ParseError(..), Microseconds, consumeNext)
worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
worker opts conn queue onError action =
forever $ do
eres <- consumeNext (pollDelay opts) conn queue
case eres of
Error (ParseError reason bd) ->
onError (MessageParseError bd reason)
Parsed msg ->
catch
(action msg)
(onError . OtherException (body msg))
liftBase $ threadDelay (loopDelay opts)
data WorkerOptions = WorkerOptions
{ pollDelay :: Microseconds
, loopDelay :: Microseconds
} deriving (Show, Eq)
instance Default WorkerOptions where
def = WorkerOptions
{ pollDelay = 10 * 1000
, loopDelay = 0
}
data WorkerException e
= MessageParseError ByteString String
| OtherException ByteString e
deriving (Show, Eq)
instance (Exception e) => Exception (WorkerException e)