{-# LANGUAGE CPP #-} -- generic AMQP publisher import Control.Concurrent (threadDelay) import qualified Control.Exception as X import Control.Monad (forever) import qualified Data.ByteString.Lazy.Char8 as BL #if MIN_VERSION_hinotify(0,3,10) import qualified Data.ByteString.Char8 as BS #endif import Data.List (isSuffixOf) import Data.Maybe import qualified Data.Text as T import Data.Time import Data.Time.Clock.POSIX import Data.Version (showVersion) import Data.Word (Word64) import Magic import Network.AMQP import Network.AMQP.Types import Network.AMQP.Utils.Connection import Network.AMQP.Utils.Helpers import Network.AMQP.Utils.Options import Paths_amqp_utils (version) import System.Environment import System.INotify import qualified System.Posix.Files as F main :: IO () main = do hr "starting" args <- getArgs >>= parseargs "agitprop" printparam' "client version" $ "amqp-utils " ++ (showVersion version) printparam' "routing key" $ rKey args isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory if isDir then printparam' "hotfolder" $ inputFile args else printparam' "input file" $ (inputFile args) ++ if (lineMode args) then " (line-by-line)" else "" (conn, chan) <- connect args printparam' "confirm mode" $ show $ confirm args if (confirm args) then do confirmSelect chan False addConfirmationListener chan confirmCallback else return () let publishOneMsg = publishOneMsg' chan args X.catch (if isDir then do inotify <- initINotify wd <- addWatch inotify [CloseWrite, MoveIn] #if MIN_VERSION_hinotify(0,3,10) (BS.pack (inputFile args)) #else (inputFile args) #endif (handleEvent publishOneMsg (suffix args) (inputFile args)) hr $ "BEGIN watching " ++ (inputFile args) _ <- forever $ threadDelay 1000000 removeWatch wd hr $ "END watching " ++ (inputFile args) else do hr $ "BEGIN sending" messageFile <- BL.readFile (inputFile args) if (lineMode args) then mapM_ (publishOneMsg Nothing) (BL.lines messageFile) else publishOneMsg (Just (inputFile args)) messageFile hr "END sending") (\exception -> printparam' "exception" $ show (exception :: X.SomeException)) -- all done. wait and close. if (confirm args) then waitForConfirms chan >>= (printparam' "confirmed") . show else return () closeConnection conn hr "connection closed" -- | The handler for publisher confirms confirmCallback :: (Word64, Bool, AckType) -> IO () confirmCallback (deliveryTag, isAll, ackType) = printparam' "confirmed" ((show deliveryTag) ++ (if isAll then " all " else " this ") ++ (show ackType)) -- | Hotfolder event handler handleEvent :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> String -> Event -> IO () -- just handle closewrite and movedin events #if MIN_VERSION_hinotify(0,3,10) handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ (BS.unpack x)) handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ (BS.unpack x)) #else handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x) handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x) #endif handleEvent _ _ _ _ = return () -- | Hotfolder file handler handleFile :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO () handleFile _ _ ('.':_) = return () -- ignore hidden files handleFile f s@(_:_) x = if any (flip isSuffixOf x) s then handleFile f [] x else return () handleFile f [] x = X.catch (BL.readFile x >>= f (Just x)) (\exception -> printparam' "exception in handleFile" $ show (exception :: X.SomeException)) -- | Publish one message with our settings publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO () publishOneMsg' c a fn f = do printparam "sending" fn (mtype, mencoding) <- if (magic a) && isJust fn then do m <- magicOpen [MagicMimeType] magicLoadDefault m t <- magicFile m (fromJust fn) magicSetFlags m [MagicMimeEncoding] e <- magicFile m (fromJust fn) return (Just (T.pack t), Just (T.pack e)) else return ((contenttype a), (contentencoding a)) now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds r <- publishMsg c (T.pack $ currentExchange a) (T.pack $ rKey a) newMsg { msgBody = f , msgDeliveryMode = persistent a , msgTimestamp = Just now , msgID = msgid a , msgType = msgtype a , msgUserID = userid a , msgApplicationID = appid a , msgClusterID = clusterid a , msgContentType = mtype , msgContentEncoding = mencoding , msgReplyTo = replyto a , msgPriority = prio a , msgCorrelationID = corrid a , msgExpiration = msgexp a , msgHeaders = substheader (fnheader a) fn $ msgheader a } printparam "sent" $ fmap show r where substheader :: [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable substheader (s:r) (Just fname) old = substheader r (Just fname) (addheader old (s ++ "=" ++ fname)) substheader _ _ old = old