-- generic amqp consumer
--
-- compile:
-- ghc -O2 -threaded --make konsum.hs
--
-- run:
-- ./konsum
-- ./konsum -o amqp.example.com -p 5673 -T -k amqp-key.pem -c amqp-crt.pem -y vhost -x exchange -X./callback.sh -a -c -a callback.config.sh -f 2 routing.key.# 500
-- ./konsum -o amqp.example.com -U user -P pass -q queue -t
--
-- custom CA cert via environment:
-- $ env SYSTEM_CERTIFICATE_PATH=/etc/amqp/cacert.crt ./konsum -T -y vhost -x exchange
--
-- Stop with ^C
import Paths_amqp_utils ( version )
import Data.Version ( showVersion )
import System.Environment
import System.Console.GetOpt
import System.IO
import System.Process
import System.Exit
import Control.Monad
import Control.Concurrent
import qualified Control.Exception as X
import Data.List
import Data.Maybe
import qualified Data.Text as T
import qualified Data.ByteString as B
import qualified Data.Map as M
import Network.AMQP
import Network.AMQP.Types
import Network.Connection
import Network.TLS
import Network.TLS.Extra
import System.X509
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.Time
import Data.Time.Clock.POSIX
import Data.Default.Class
import Text.Read ( readMaybe )

-- Entry point: parse the command line, connect, subscribe to a queue and
-- then idle in the main thread until an exception (e.g. ^C) arrives.
main :: IO ()
main = do
  hr "starting"
  tid <- myThreadId
  args <- getArgs >>= parseargs
  -- -a options are collected by prepending, so restore command line order
  let addiArgs = reverse $ additionalArgs args
  printparam' "client version" $ "amqp-utils " ++ (showVersion version)
  printparam' "server" $ server args
  printparam' "port" $ show $ port args
  printparam' "vhost" $ vHost args
  globalCertificateStore <- getSystemCertificateStore
  let myTLS =
        TLSSettings
          (defaultParamsClient "" B.empty)
            { clientShared =
                def
                  { sharedValidationCache = def
                  , sharedCAStore = globalCertificateStore
                  }
            , clientSupported = def { supportedCiphers = ciphersuite_all }
            , clientHooks =
                def { onCertificateRequest = myCert (cert args) (key args) }
            }
  conn <-
    openConnection''
      defaultConnectionOpts
        { coAuth =
            [ SASLMechanism (T.pack "EXTERNAL") B.empty Nothing
            , plain (T.pack (user args)) (T.pack (pass args))
            ]
        , coVHost = T.pack $ vHost args
        , coTLSSettings =
            if (tls args)
              then Just (TLSCustom myTLS)
              else Nothing
        , coServers = [ (server args, fromIntegral $ port args) ]
        , coHeartbeatDelay = fmap fromIntegral $ heartBeat args
        }
  chan <- openChannel conn
  -- on a channel exception: close the connection, log, and stop the main thread
  addChannelExceptionHandler
    chan
    (\exception ->
       closeConnection conn >>
       printparam' "exiting" (show exception) >>
       killThread tid)
  -- set prefetch
  printparam' "prefetch" $ show $ preFetch args
  qos chan 0 (fromIntegral $ preFetch args) False
  -- attach to given queue? or build exclusive queue and bind it?
  queue <-
    maybe
      (tempQueue chan (exChange args) (bindingKey args))
      (return)
      (fmap T.pack (qName args))
  printparam' "queue name" $ T.unpack queue
  printparam "head chars" $ fmap show $ anRiss args
  printparam "temp dir" $ tempDir args
  printparam "callback" $ fileProcess args
  printparam "callback args" $ listToMaybeUnwords addiArgs
  -- subscribe to the queue
  ctag <-
    consumeMsgs
      chan
      queue
      Ack
      (myCallback (anRiss args) (fileProcess args) (tempDir args) addiArgs tid)
  printparam' "consumer tag" $ T.unpack ctag
  hr "entering main loop"
  -- sleep forever; any exception delivered to this thread ends the loop
  X.catch
    (forever $ threadDelay 1000000)
    (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
  closeConnection conn

-- -- noop sharedValidationCache, handy when debugging
-- noValidation :: ValidationCache
-- noValidation = ValidationCache
--                  (\_ _ _ -> return ValidationCachePass)
--                  (\_ _ _ -> return ())

-- client certificate
-- Loads the credential only if both a certificate and a key file were given;
-- a load failure is logged and answered with Nothing.
myCert :: Maybe FilePath -> Maybe FilePath -> t -> IO (Maybe Credential)
myCert (Just cert') (Just key') _ = do
  result <- credentialLoadX509 cert' key'
  case result of
    Left x -> printparam' "ERROR" x >> return Nothing
    Right x -> return $ Just x
myCert _ _ _ = return Nothing

-- exclusive temp queue
-- Declares an exclusive queue, binds it to the given exchange with the given
-- binding key and returns the queue name.
tempQueue :: Channel -> String -> String -> IO T.Text
tempQueue chan xchange bkey = do
  (q, _, _) <- declareQueue chan newQueue { queueExclusive = True }
  bindQueue chan q (T.pack xchange) (T.pack bkey)
  printparam' "exchange" xchange
  printparam' "binding key" bkey
  return q

-- log cmdline options
-- Nothing for an empty list, otherwise the words joined by spaces.
listToMaybeUnwords :: [String] -> Maybe String
listToMaybeUnwords [] = Nothing
listToMaybeUnwords x = Just $ unwords x

-- Strings or ByteStrings with label, or nothing at all
printwithlabel :: String -> Maybe (IO ()) -> IO ()
printwithlabel _ Nothing = return ()
printwithlabel labl (Just i) = do
  mapM_ putStr [ " --- ", labl, ": " ]
  i
  hFlush stdout

-- optional parameters
printparam :: String -> Maybe String -> IO ()
printparam labl ms = printwithlabel labl $ fmap putStrLn ms

-- required parameters
printparam' :: String -> String -> IO ()
printparam' d s = printparam d (Just s)

-- head chars of body
printbody :: (String, Maybe BL.ByteString) -> IO ()
printbody (labl, ms) =
  printwithlabel labl $ fmap (\s -> putStrLn "" >> BL.putStrLn s) ms

-- options for everybody
data Args = Args
  { server :: String
  , port :: Int
  , tls :: Bool
  , vHost :: String
  , exChange :: String
  , bindingKey :: String
  , anRiss :: Maybe Int
  , fileProcess :: Maybe String
  , qName :: Maybe String
  , cert :: Maybe String
  , key :: Maybe String
  , user :: String
  , pass :: String
  , preFetch :: Int
  , heartBeat :: Maybe Int
  , tempDir :: Maybe String
  , additionalArgs :: [String]
  }

-- default values, in the field order of Args
instance Default Args where
  def =
    Args
      "localhost"
      5672
      False
      "/"
      "default"
      "#"
      Nothing
      Nothing
      Nothing
      Nothing
      Nothing
      "guest"
      "guest"
      1
      Nothing
      Nothing
      []

-- callback script used when -X is given without an argument
callback :: String
callback = "/usr/lib/haskell-amqp-utils/callback"

-- Parse the value of a numeric option. On malformed input this fails with a
-- message naming the option and the offending value instead of the anonymous
-- "Prelude.read: no parse". The record fields are lazy, so the error is
-- raised when the field is first evaluated.
readOpt :: Read a => String -> String -> a
readOpt optname s =
  case readMaybe s of
    Just v -> v
    Nothing -> error $ "invalid value for " ++ optname ++ ": " ++ show s

options :: [OptDescr (Args -> Args)]
options =
  [ Option
      [ 'o' ]
      [ "server" ]
      (ReqArg (\s o -> o { server = s }) "SERVER")
      ("AMQP Server (default: " ++ server def ++ ")")
  , Option
      [ 'y' ]
      [ "vhost" ]
      (ReqArg (\s o -> o { vHost = s }) "VHOST")
      ("AMQP Virtual Host (default: " ++ vHost def ++ ")")
  , Option
      [ 'x' ]
      [ "exchange" ]
      (ReqArg (\s o -> o { exChange = s }) "EXCHANGE")
      ("AMQP Exchange (default: " ++ exChange def ++ ")")
  , Option
      [ 'X' ]
      [ "execute" ]
      (OptArg
         (\s o ->
            o
              { fileProcess = Just (fromMaybe callback s)
              -- keep an already chosen temp dir, otherwise fall back to /tmp
              , tempDir = Just (fromMaybe "/tmp" (tempDir o))
              })
         "EXE")
      ("Callback Script File (implies -t) (-X without arg: " ++ callback ++ ")")
  , Option
      [ 'p' ]
      [ "port" ]
      (ReqArg (\s o -> o { port = readOpt "--port" s }) "PORT")
      ("Server Port Number (default: " ++ show (port def) ++ ")")
  , Option
      [ 'T' ]
      [ "tls" ]
      (NoArg (\o -> o { tls = not (tls o) }))
      ("Toggle TLS (default: " ++ show (tls def) ++ ")")
  , Option
      [ 'q' ]
      [ "queue" ]
      (ReqArg (\s o -> o { qName = Just s }) "QUEUENAME")
      "Ignore Exchange and bind to existing Queue"
  , Option
      [ 'c' ]
      [ "cert" ]
      (ReqArg (\s o -> o { cert = Just s }) "CERTFILE")
      ("TLS Client Certificate File")
  , Option
      [ 'k' ]
      [ "key" ]
      (ReqArg (\s o -> o { key = Just s }) "KEYFILE")
      ("TLS Client Private Key File")
  , Option
      [ 'U' ]
      [ "user" ]
      (ReqArg (\s o -> o { user = s }) "USERNAME")
      ("Username for Auth")
  , Option
      [ 'P' ]
      [ "pass" ]
      (ReqArg (\s o -> o { pass = s }) "PASSWORD")
      ("Password for Auth")
  , Option
      [ 'f' ]
      [ "prefetch" ]
      (ReqArg (\s o -> o { preFetch = readOpt "--prefetch" s }) "INT")
      ("Prefetch count. (0=unlimited, 1=off, default: " ++
       show (preFetch def) ++ ")")
  , Option
      [ 'r' ]
      [ "bindingkey" ]
      (ReqArg (\s o -> o { bindingKey = s }) "BINDINGKEY")
      ("AMQP binding key (default: " ++ bindingKey def ++ ")")
  , Option
      [ 's' ]
      [ "heartbeats" ]
      (ReqArg (\s o -> o { heartBeat = Just (readOpt "--heartbeats" s) }) "INT")
      "heartbeat interval (0=disable, default: set by server)"
  , Option
      [ 't' ]
      [ "tempdir", "target" ]
      (OptArg (\s o -> o { tempDir = Just (fromMaybe "/tmp" s) }) "DIR")
      "tempdir (default: no file creation, -t without arg: /tmp)"
  , Option
      [ 'a' ]
      [ "args" ]
      (ReqArg (\s o -> o { additionalArgs = s : (additionalArgs o) }) "ARG")
      "additional argument for -X callback"
  ]

-- header line(s) for the usage message
usage :: String
usage =
  "\namqp-utils " ++
  showVersion version ++
  "\n\nUsage:\n" ++
  "konsum [options] [bindingkey] [body byte count to show]\n\n" ++
  "Options:"

-- special options
-- Interprets a positional (non-option) argument: anything containing
-- '.', '#' or '*' is a binding key, a string of digits is the number of body
-- bytes to show, everything else is ignored.
parsearg :: Args -> String -> Args
parsearg args arg
  -- binding keys
  | elem '.' arg = args { bindingKey = arg }
  | elem '#' arg = args { bindingKey = arg }
  | elem '*' arg = args { bindingKey = arg }
  -- number -> show how many bytes
  -- (readMaybe rejects the empty string, which passes the digit test vacuously)
  | all (`elem` ['0' .. '9']) arg
  , Just n <- readMaybe arg = args { anRiss = Just n }
  | otherwise = args

-- apply both options and parsearg onto argstring
-- Option parse errors are raised as a user error together with the usage text.
parseargs :: [String] -> IO Args
parseargs argstring =
  case getOpt Permute options argstring of
    (o, n, []) -> return $ foldl' parsearg (foldl' (flip id) def o) n
    (_, _, errs) -> ioError $ userError $ concat errs ++ usageInfo usage options

-- process received message
-- Logs the message, then saves/processes it; any exception from that step is
-- logged and the message is rejected with requeue.
myCallback ::
     Maybe Int
  -> Maybe String
  -> Maybe String
  -> [String]
  -> ThreadId
  -> (Message, Envelope)
  -> IO ()
myCallback anR filePr tempD addi tid m@(_, envi) = do
  let numstring = show $ envDeliveryTag envi
  hr $ "BEGIN " ++ numstring
  now <- getZonedTime
  callbackoptions <- printmsg m anR now
  either
    (\e ->
       printparam' "ERROR" (show (e :: X.SomeException)) >> rejectEnv envi True)
    return =<<
    X.try (optionalFileStuff m callbackoptions addi numstring tempD filePr tid)
  hr $ "END " ++ numstring

-- if the message is to be saved
-- and maybe processed further
-- Without a callback command line the message is acked right away; otherwise
-- the callback runs in its own thread, and an exception escaping that thread
-- is thrown to the thread given by tid.
optionalFileStuff ::
     (Message, Envelope)
  -> [String]
  -> [String]
  -> String
  -> Maybe String
  -> Maybe String
  -> ThreadId
  -> IO ()
optionalFileStuff (msg, envi) callbackoptions addi numstring tempD filePr tid = do
  path <- saveFile tempD numstring (msgBody msg)
  printparam "saved to" path
  -- only defined when both a callback and a saved file exist
  let callbackcmdline =
        liftM2
          (constructCallbackCmdLine callbackoptions addi numstring)
          filePr
          path
  printparam "calling" $ fmap unwords callbackcmdline
  maybe
    (ackEnv envi)
    (\c ->
       forkFinally (doProc numstring envi c) (either (throwTo tid) return) >>
       return ())
    callbackcmdline

-- save message into temp file
-- Returns the path of the created file, or Nothing when no temp dir is set.
saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
saveFile Nothing _ _ = return Nothing
saveFile (Just tempD) numstring body = do
  (p, h) <-
    openBinaryTempFileWithDefaultPermissions
      tempD
      ("konsum-" ++ numstring ++ "-.tmp")
  -- close the handle even if writing the body fails
  BL.hPut h body `X.finally` hClose h
  return $ Just p

-- construct cmdline for callback script
constructCallbackCmdLine ::
     [String] -> [String] -> String -> String -> String -> [String]
constructCallbackCmdLine opts addi num exe path =
  exe : "-f" : path : "-n" : num : opts ++ addi

-- call callback script
-- Runs the command, waits for it to finish and acks the message on success
-- or rejects it (with requeue) on a non-zero exit code.
doProc :: String -> Envelope -> [String] -> IO ()
doProc numstring envi (exe : args) = do
  (_, _, _, processhandle) <-
    createProcess (proc exe args) { std_out = Inherit, std_err = Inherit }
  exitcode <- waitForProcess processhandle
  printparam' (numstring ++ " call returned") $ show exitcode
  case exitcode of
    ExitSuccess -> ackEnv envi
    ExitFailure _ -> rejectEnv envi True
doProc _ _ _ = return ()

-- log marker
-- Prints a line of dashes with the given text embedded, cut off at 72 chars.
hr :: String -> IO ()
hr x = putStrLn hr' >> hFlush stdout
  where
    hr' = take 72 $ (take 25 hr'') ++ " " ++ x ++ " " ++ hr''
    hr'' = repeat '-'

-- apply a formatter to every header field and concatenate the results
formatheaders :: ((T.Text, FieldValue) -> [a]) -> FieldTable -> [a]
formatheaders f (FieldTable ll) = concatMap f $ M.toList ll

-- log formatting
fieldshow :: (T.Text, FieldValue) -> String
fieldshow (k, v) = "\n " ++ T.unpack k ++ ": " ++ valueshow v

-- callback cmdline formatting
fieldshow' :: (T.Text, FieldValue) -> [String]
fieldshow' (k, v) = [ "-h", T.unpack k ++ "=" ++ valueshow v ]

-- showing a FieldValue
valueshow :: FieldValue -> String
valueshow (FVString value) = T.unpack value
valueshow (FVInt32 value) = show value
valueshow value = show value

-- skip showing body head if binary type
-- True for content types starting with "application" or "image", except
-- application/xml and application/json.
isimage :: Maybe String -> Bool
isimage Nothing = False
isimage (Just ctype)
  | isPrefixOf "application/xml" ctype = False
  | isPrefixOf "application/json" ctype = False
  | otherwise = any (flip isPrefixOf ctype) [ "application", "image" ]

-- show the first bytes of message body
anriss' :: Maybe Int -> BL.ByteString -> BL.ByteString
anriss' x =
  case x of
    Nothing -> id
    Just y -> BL.take (fromIntegral y)

-- callback cmdline with optional parameters
printopt :: (String, Maybe String) -> [String]
printopt (_, Nothing) = []
printopt (opt, Just s) = [ opt, s ]

-- prints header and head on STDOUT and returns cmdline options to callback
printmsg :: (Message, Envelope) -> Maybe Int -> ZonedTime -> IO [String]
printmsg (msg, envi) anR now = do
  mapM_
    (uncurry printparam)
    [ ("routing key", rkey)
    , ("message-id", messageid)
    , ("headers", headers)
    , ("content-type", contenttype)
    , ("content-encoding", contentencoding)
    , ("redelivered", redeliv)
    , ("timestamp", timestamp'')
    , ("time now", now')
    , ("size", size)
    , ("priority", prio)
    , ("type", mtype)
    , ("user id", muserid)
    , ("application id", mappid)
    , ("cluster id", mclusterid)
    , ("reply to", mreplyto)
    , ("correlation id", mcorrid)
    , ("expiration", mexp)
    , ("delivery mode", mdelivmode)
    ]
  printbody (label, anriss)
  return $
    concat
      (map
         printopt
         [ ("-r", rkey)
         , ("-m", contenttype)
         , ("-e", contentencoding)
         , ("-i", messageid)
         , ("-t", timestamp)
         , ("-p", prio)
         ] ++
       maybeToList headers')
  where
    headers = fmap (formatheaders fieldshow) $ msgHeaders msg
    headers' = fmap (formatheaders fieldshow') $ msgHeaders msg
    body = msgBody msg
    anriss =
      if isimage contenttype
        then Nothing
        else Just (anriss' anR body) :: Maybe BL.ByteString
    anriss'' = maybe "" (\a -> "first " ++ (show a) ++ " bytes of ") anR
    label = anriss'' ++ "body"
    contenttype = fmap T.unpack $ msgContentType msg
    contentencoding = fmap T.unpack $ msgContentEncoding msg
    rkey = Just . T.unpack $ envRoutingKey envi
    messageid = fmap T.unpack $ msgID msg
    prio = fmap show $ msgPriority msg
    mtype = fmap show $ msgType msg
    muserid = fmap show $ msgUserID msg
    mappid = fmap show $ msgApplicationID msg
    mclusterid = fmap show $ msgClusterID msg
    mreplyto = fmap show $ msgReplyTo msg
    mcorrid = fmap show $ msgCorrelationID msg
    mexp = fmap show $ msgExpiration msg
    mdelivmode = fmap show $ msgDeliveryMode msg
    size = Just . show $ BL.length body
    redeliv =
      if envRedelivered envi
        then Just "YES"
        else Nothing
    tz = zonedTimeZone now
    nowutc = zonedTimeToUTCFLoor now
    msgtime = msgTimestamp msg
    msgtimeutc = fmap (posixSecondsToUTCTime . realToFrac) msgtime
    timestamp = fmap show msgtime
    timediff = fmap (difftime nowutc) msgtimeutc
    -- "time now" is omitted when the message timestamp equals the current time
    now' =
      case timediff of
        Just "now" -> Nothing
        _ -> showtime tz $ Just nowutc
    timestamp' = showtime tz msgtimeutc
    -- raw timestamp, local time and distance to now; Nothing without timestamp
    timestamp'' =
      liftM3
        (\a b c -> a ++ " (" ++ b ++ ") (" ++ c ++ ")")
        timestamp
        timestamp'
        timediff

-- current time as UTC, rounded down to whole seconds
zonedTimeToUTCFLoor :: ZonedTime -> UTCTime
zonedTimeToUTCFLoor x =
  posixSecondsToUTCTime $
  realToFrac ((floor . utcTimeToPOSIXSeconds . zonedTimeToUTC) x :: Timestamp)

-- render an optional UTC time in the given time zone
showtime :: TimeZone -> Maybe UTCTime -> Maybe String
showtime tz = fmap (show . (utcToZonedTime tz))

-- describe the distance between now and the message time
difftime :: UTCTime -> UTCTime -> String
difftime now msg
  | now == msg = "now"
  | now > msg = diff ++ " ago"
  | otherwise = diff ++ " in the future"
  where
    diff = show (diffUTCTime now msg)