{-# LANGUAGE OverloadedStrings #-}

-- generic amqp consumer
--
-- compile:
-- ghc -O2 -threaded --make konsum.hs
--
-- run:
-- ./konsum
-- ./konsum -o amqp.example.com -p 5673 -T -k amqp-key.pem -c amqp-crt.pem -y vhost -x exchange -X./callback.sh -a -c -a callback.config.sh -f 2 -r routing.key.# -l 500
-- ./konsum -o amqp.example.com -U user -P pass -q queue -t
--
-- custom CA cert via environment:
-- $ env SYSTEM_CERTIFICATE_PATH=/etc/amqp/cacert.crt ./konsum -T -y vhost -x exchange
--
-- Stop with ^C
import Control.Concurrent
import qualified Control.Exception as X
import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Text as T
import Data.Time
import Data.Version (showVersion)
import Network.AMQP
import Network.AMQP.Utils.Connection
import Network.AMQP.Utils.Helpers
import Network.AMQP.Utils.Options
import Paths_amqp_utils (version)
import System.Environment
import System.Exit
import System.IO
import System.Process

-- | Entry point: parse the command line, connect, subscribe to a queue
-- and then idle until an exception reaches the main thread.
--
-- Callback worker threads forward their failures to this thread (see
-- 'optionalFileStuff'), so the handler around the idle loop is also what
-- ends the program when a callback process cannot be run.
main :: IO ()
main = do
  hr "starting"
  tid <- myThreadId
  args <- getArgs >>= parseargs "konsum"
  -- NOTE(review): the reversal presumably restores command-line order
  -- after option parsing accumulated the values back to front; confirm
  -- against the option parser.
  let addiArgs = reverse $ additionalArgs args
  printparam' "client version" $ "amqp-utils " ++ showVersion version
  (conn, chan) <- connect args
  -- set prefetch
  printparam' "prefetch" $ show $ preFetch args
  qos chan 0 (fromIntegral $ preFetch args) False
  -- attach to given queue? or build exclusive queue and bind it?
  queue <-
    maybe
      (tempQueue chan (tmpQName args) (bindings args) (currentExchange args))
      return
      (fmap T.pack (qName args))
  printparam' "queue name" $ T.unpack queue
  printparam "shown body chars" $ fmap show $ anRiss args
  printparam "temp dir" $ tempDir args
  printparam "callback" $ fileProcess args
  printparam "callback args" $ listToMaybeUnwords addiArgs
  -- subscribe to the queue
  ctag <-
    consumeMsgs
      chan
      queue
      Ack
      (myCallback (anRiss args) (fileProcess args) (tempDir args) addiArgs tid)
  printparam' "consumer tag" $ T.unpack ctag
  hr "entering main loop"
  -- Sleep in 5 second slices forever; any exception delivered to this
  -- thread is printed and then falls through to the connection shutdown.
  X.catch
    (forever $ threadDelay 5000000)
    (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
  closeConnection conn

-- | exclusive temp queue
--
-- Declares an exclusive queue named @tmpqname@ and binds it to every
-- @(exchange, binding key)@ pair in @bindlist@. When @bindlist@ is empty
-- the queue is bound to exchange @x@ with binding key @#@ instead.
-- Each binding is printed. Returns the queue name reported by the
-- declaration.
tempQueue :: Channel -> String -> [(String, String)] -> String -> IO T.Text
tempQueue chan tmpqname bindlist x = do
  (q, _, _) <-
    declareQueue
      chan
      newQueue {queueExclusive = True, queueName = T.pack tmpqname}
  mapM_
    (\(xchange, bkey) ->
       bindQueue chan q (T.pack xchange) (T.pack bkey) >>
       printparam' "binding" (xchange ++ ":" ++ bkey))
    (if null bindlist
       then [(x, "#")]
       else bindlist)
  return q

-- | process received message
--
-- Prints the message framed by @BEGIN@ / @END@ rules carrying the
-- delivery tag, then hands it to 'optionalFileStuff'. Any exception
-- raised there is printed as @ERROR@ and the message is rejected with
-- the requeue flag set.
myCallback ::
     Maybe Int
  -> Maybe String
  -> Maybe String
  -> [String]
  -> ThreadId
  -> (Message, Envelope)
  -> IO ()
myCallback anR filePr tempD addi tid m@(_, envi) = do
  let numstring = show $ envDeliveryTag envi
  hr $ "BEGIN " ++ numstring
  now <- getZonedTime
  callbackoptions <- printmsg m anR now
  either
    (\e ->
       printparam' "ERROR" (show (e :: X.SomeException)) >> rejectEnv envi True)
    return =<<
    X.try (optionalFileStuff m callbackoptions addi numstring tempD filePr tid)
  hr $ "END " ++ numstring

-- | if the message is to be saved
-- and maybe processed further
--
-- The body is written to a temp file when a temp dir is configured. A
-- callback command line exists only when both a callback executable and
-- a saved file path are available; without one the message is
-- acknowledged right away. With one, the callback runs in a new thread
-- and 'doProc' acknowledges or rejects the message. An exception in that
-- thread is re-thrown into the thread identified by @tid@.
optionalFileStuff ::
     (Message, Envelope)
  -> [String]
  -> [String]
  -> String
  -> Maybe String
  -> Maybe String
  -> ThreadId
  -> IO ()
optionalFileStuff (msg, envi) callbackoptions addi numstring tempD filePr tid = do
  path <- saveFile tempD numstring (msgBody msg)
  printparam "saved to" path
  let callbackcmdline =
        liftM2
          (constructCallbackCmdLine callbackoptions addi numstring)
          filePr
          path
  printparam "calling" $ fmap unwords callbackcmdline
  maybe
    (ackEnv envi)
    (\c ->
       void $
       forkFinally (doProc numstring envi c) (either (throwTo tid) return))
    callbackcmdline

-- | save message into temp file
--
-- Returns 'Nothing' without touching the file system when no temp dir is
-- given. Otherwise a new file is created in @tempD@ from the template
-- @konsum-\<numstring\>-.tmp@, the body is written to it and the path of
-- the created file is returned.
saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
saveFile Nothing _ _ = return Nothing
saveFile (Just tempD) numstring body =
  -- bracket guarantees the handle is closed even when the write fails
  -- (e.g. disk full); the failure still propagates to the caller.
  X.bracket
    (openBinaryTempFileWithDefaultPermissions
       tempD
       ("konsum-" ++ numstring ++ "-.tmp"))
    (hClose . snd)
    (\(p, h) -> BL.hPut h body >> return (Just p))

-- | construct cmdline for callback script
--
-- Result layout: @exe -f path -n num opts... addi...@
constructCallbackCmdLine ::
     [String] -> [String] -> String -> String -> String -> [String]
constructCallbackCmdLine opts addi num exe path =
  exe : "-f" : path : "-n" : num : opts ++ addi

-- | call callback script
--
-- Runs the command line (executable followed by its arguments), waits
-- for it to finish and prints the exit code. Exit code zero acknowledges
-- the message; any other exit code rejects it with the requeue flag set.
doProc :: String -> Envelope -> [String] -> IO ()
doProc numstring envi (exe:args) = do
  (_, _, _, processhandle) <-
    createProcess (proc exe args) {std_out = Inherit, std_err = Inherit}
  exitcode <- waitForProcess processhandle
  printparam' (numstring ++ " call returned") $ show exitcode
  case exitcode of
    ExitSuccess -> ackEnv envi
    ExitFailure _ -> rejectEnv envi True
-- An empty command line does nothing (the message is neither acked nor
-- rejected). 'constructCallbackCmdLine' always starts its result with
-- the executable, so this equation only keeps the function total.
doProc _ _ _ = return ()