-- generic amqp consumer import Control.Concurrent import qualified Control.Exception as X import Control.Monad import qualified Data.ByteString.Lazy.Char8 as BL import qualified Data.Text as T import Data.Time import Data.Version (showVersion) import Network.AMQP import Network.AMQP.Utils.Connection import Network.AMQP.Utils.Helpers import Network.AMQP.Utils.Options import Paths_amqp_utils (version) import System.Environment import System.Exit import System.IO import System.Process main :: IO () main = do hr "starting" tid <- myThreadId args <- getArgs >>= parseargs "konsum" let addiArgs = reverse $ additionalArgs args printparam' "client version" $ "amqp-utils " ++ (showVersion version) (conn, chan) <- connect args -- set prefetch printparam' "prefetch" $ show $ preFetch args qos chan 0 (fromIntegral $ preFetch args) False -- attach to given queue? or build exclusive queue and bind it? queue <- maybe (tempQueue chan (tmpQName args) (bindings args) (currentExchange args)) (return) (fmap T.pack (qName args)) printparam' "queue name" $ T.unpack queue printparam "shown body chars" $ fmap show $ anRiss args printparam "temp dir" $ tempDir args printparam "callback" $ fileProcess args printparam "callback args" $ listToMaybeUnwords addiArgs -- subscribe to the queue ctag <- consumeMsgs chan queue (if ack args then Ack else NoAck) (myCallback args addiArgs tid) printparam' "consumer tag" $ T.unpack ctag printparam' "send acks" $ show (ack args) printparam "requeue if rejected" $ if (ack args) then Just (show (requeuenack args)) else Nothing hr "entering main loop" X.catch (forever $ threadDelay 5000000) (\exception -> printparam' "exception" $ show (exception :: X.SomeException)) closeConnection conn -- | exclusive temp queue tempQueue :: Channel -> String -> [(String, String)] -> String -> IO T.Text tempQueue chan tmpqname bindlist x = do (q, _, _) <- declareQueue chan newQueue {queueExclusive = True, queueName = T.pack tmpqname} mapM_ (\(xchange, bkey) -> bindQueue chan q (T.pack xchange) (T.pack bkey) >> printparam' "binding" (xchange ++ ":" ++ bkey)) (if null bindlist then [(x, "#")] else bindlist) return q -- | process received message myCallback :: Args -> [String] -> ThreadId -> (Message, Envelope) -> IO () myCallback a addi tid m@(_, envi) = do let numstring = show $ envDeliveryTag envi hr $ "BEGIN " ++ numstring now <- getZonedTime callbackoptions <- printmsg m (anRiss a) now either (\e -> printparam' "ERROR" (show (e :: X.SomeException)) >> reje envi a) return =<< X.try (optionalFileStuff m callbackoptions addi numstring a tid) hr $ "END " ++ numstring -- | if the message is to be saved -- and maybe processed further optionalFileStuff :: (Message, Envelope) -> [String] -> [String] -> String -> Args -> ThreadId -> IO () optionalFileStuff (msg, envi) callbackoptions addi numstring a tid = do path <- saveFile (tempDir a) numstring (msgBody msg) printparam "saved to" path let callbackcmdline = liftM2 (constructCallbackCmdLine callbackoptions addi numstring) (fileProcess a) path printparam "calling" $ fmap unwords callbackcmdline maybe (acke envi a) (\c -> forkFinally (doProc a numstring envi c) (either (throwTo tid) return) >> return ()) callbackcmdline -- | save message into temp file saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String) saveFile Nothing _ _ = return Nothing saveFile (Just tempD) numstring body = do (p, h) <- openBinaryTempFileWithDefaultPermissions tempD ("konsum-" ++ numstring ++ "-.tmp") BL.hPut h body hClose h return $ Just p -- | construct cmdline for callback script constructCallbackCmdLine :: [String] -> [String] -> String -> String -> String -> [String] constructCallbackCmdLine opts addi num exe path = exe : "-f" : path : "-n" : num : opts ++ addi -- | call callback script doProc :: Args -> String -> Envelope -> [String] -> IO () doProc a numstring envi (exe:args) = do (_, _, _, processhandle) <- createProcess (proc exe args) {std_out = Inherit, std_err = Inherit} exitcode <- waitForProcess processhandle printparam' (numstring ++ " call returned") $ show exitcode case exitcode of ExitSuccess -> acke envi a ExitFailure _ -> reje envi a doProc _ _ _ _ = return () -- | ack acke :: Envelope -> Args -> IO () acke envi a | (ack a) = ackEnv envi | otherwise = return () -- | reject reje :: Envelope -> Args -> IO () reje envi a | (ack a) = rejectEnv envi (requeuenack a) | otherwise = return ()