{-# LANGUAGE OverloadedStrings #-}

-- generic AMQP rpc client
import Control.Concurrent
import qualified Control.Exception as X
import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Text as T
import Data.Time
import Data.Time.Clock.POSIX
import Data.Version (showVersion)
import Network.AMQP
import Network.AMQP.Utils.Connection
import Network.AMQP.Utils.Helpers
import Network.AMQP.Utils.Options
import Paths_amqp_utils (version)
import System.Environment
import System.Exit

-- | Entry point: publish the contents of the input file as a request whose
-- reply-to is a freshly declared exclusive queue, then block until either
-- the consumer callback or the timeout thread signals the main thread with
-- an 'RpcException'. The connection is closed and the process exits with
-- the code chosen by 'exceptionHandler'.
main :: IO ()
main = do
  hr "starting"
  tid <- myThreadId
  args <- getArgs >>= parseargs 'p'
  -- Showing the timeout forces its evaluation; if that throws, the
  -- "invalid timeout" error is raised as well.
  X.onException
    (printparam' "timeout" $ show $ timeout args)
    (error "invalid timeout")
  printparam' "client version" $ "amqp-utils " ++ showVersion version
  printparam' "destination queue" $ tmpQName args
  (conn, chan) <- connect args
  -- Channel-level exceptions are forwarded to the main thread.
  addChannelExceptionHandler chan (X.throwTo tid)
  (q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
  ctag <- consumeMsgs chan q NoAck (rpcClientCallback tid args)
  printparam' "consumer tag" $ T.unpack ctag
  message <- BL.readFile (inputFile args)
  now <- floor . utcTimeToPOSIXSeconds <$> getCurrentTime
  let request =
        newMsg
          { msgBody = message
          , msgReplyTo = Just q
          , msgCorrelationID = corrid args
          , msgExpiration = msgexp args
          , msgTimestamp = Just now
          }
  -- Publishing and forking the timeout thread happen INSIDE the catch, so
  -- the RpcException handler is already installed when the callback or the
  -- timeout thread calls throwTo. Otherwise a fast reply (or a tiny
  -- timeout) could hit the main thread before the handler exists and kill
  -- the program with an uncaught exception.
  X.catch
    (do hr "publishing request"
        _ <-
          publishMsg
            chan
            (T.pack $ currentExchange args)
            (T.pack $ tmpQName args)
            request
        hr "waiting for answer"
        _ <-
          forkIO
            (threadDelay (floor (1000000 * timeout args)) >>
             throwTo tid TimeoutException)
        -- Idle until one of the other threads throws an RpcException here.
        forever $ threadDelay 200000)
    (\x -> do
       ec <- exceptionHandler x
       hr "closing connection"
       closeConnection conn
       printparam' "exiting" $ show ec
       exitWith ec)

-- | Translate the signal received by the main thread into a process exit
-- code: success when a message was received, failure code 1 on timeout.
exceptionHandler :: RpcException -> IO ExitCode
exceptionHandler ReceivedException = return ExitSuccess
exceptionHandler TimeoutException = return $ ExitFailure 1

-- | Consumer callback for the reply queue. Logs the delivery tag, hands the
-- message to 'printmsg', forwards any exception raised by 'printmsg' to the
-- thread @tid@, and finally signals @tid@ with 'ReceivedException'.
rpcClientCallback :: ThreadId -> Args -> (Message, Envelope) -> IO ()
rpcClientCallback tid a m@(_, env) = do
  let numstring = show $ envDeliveryTag env
  hr $ "received " ++ numstring
  now <- getZonedTime
  _ <-
    X.catch
      (printmsg m (anRiss a) now)
      (\x -> X.throwTo tid (x :: X.SomeException) >> return [])
  throwTo tid ReceivedException

-- | Asynchronous signals thrown to the main thread to end the wait loop.
data RpcException
  = ReceivedException -- ^ thrown by 'rpcClientCallback' after a message
  | TimeoutException -- ^ thrown by the timeout thread forked in 'main'
  deriving (Show)

instance X.Exception RpcException