{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Command-line tool that connects to an MQTT broker, subscribes to the
-- topic filters given on the command line and prints every message it
-- receives (topic, payload and, optionally, the message properties).
module Main where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, link)
import Control.Concurrent.STM (TChan, atomically, newTChanIO, readTChan, writeTChan)
import Control.Exception (Handler (..), IOException, catches)
import Control.Monad (foldM_, forever, when, (<=<))
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BCS
import qualified Data.IORef as R
import Data.Maybe (fromJust)
import qualified Data.Text as T
import qualified Data.Text.IO as TIO
import Data.Word (Word32)
import Network.MQTT.Client
import Network.MQTT.Topic (Filter, mkFilter, unTopic)
import Network.MQTT.Types (ConnACKFlags (..), SessionReuse (..), qosFromInt)
import Network.URI
import Options.Applicative (Parser, argument, auto, eitherReader, execParser, fullDesc, help, helper, info, long, maybeReader, metavar, option, progDesc, short, showDefault, some, switch, value, (<**>))
import System.IO (stdout)
import Text.Read (readMaybe)

-- | A received message as handed from the MQTT callback to the printer
-- thread: topic, payload and the properties delivered with it.
data Msg = Msg Topic BL.ByteString [Property]

-- | Parsed command-line options.
data Options = Options
  { optUri :: URI
    -- ^ broker URI (@--mqtt-uri@ / @-u@)
  , optHideProps :: Bool
    -- ^ suppress property output (@-p@)
  , optSessionTime :: Word32
    -- ^ session expiry interval; 0 requests a clean session
  , optVerbose :: Bool
    -- ^ enable debug logging (@-v@ / @--verbose@)
  , optQoS :: QoS
    -- ^ QoS used for the subscriptions (@--qos@ / @-q@)
  , optSubResume :: Bool
    -- ^ subscribe even when an existing session was resumed
  , optTopics :: [Filter]
    -- ^ one or more topic filters to subscribe to
  }

-- | Command-line parser for 'Options'.
options :: Parser Options
options =
  Options
    <$> option
      (maybeReader parseURI)
      ( long "mqtt-uri"
          <> short 'u'
          <> showDefault
          -- The default is a literal that is expected to always parse, so
          -- the 'fromJust' here is not reachable with a 'Nothing'.
          <> value (fromJust $ parseURI "mqtt://localhost/")
          <> help "mqtt broker URI"
      )
    <*> switch (short 'p' <> help "hide properties")
    <*> option
      auto
      ( long "session-timeout"
          <> showDefault
          <> value 300
          <> help "mqtt session timeout (0 == clean)"
      )
    <*> switch (short 'v' <> long "verbose" <> help "enable debug logging")
    <*> option
      (eitherReader pQoS)
      ( long "qos"
          <> short 'q'
          <> showDefault
          <> value QoS0
          <> help "QoS level (0-2)"
      )
    <*> switch (long "always-subscribe" <> help "subscribe even when resuming a connection")
    <*> some (argument (maybeReader (mkFilter . T.pack)) (metavar "topics..."))
  where
    -- Read an integer and convert it to a QoS level; anything that does not
    -- parse or is rejected by 'qosFromInt' yields the error message.
    pQoS = maybe (Left "Only QoS 0, 1, and 2 are supported") Right . (qosFromInt <=< readMaybe)

-- | Print messages read from the channel, forever.
--
-- For each message the topic and payload are written on one line, followed
-- by one line per property that passes the visibility filter. The second
-- argument enables property output at all; the third (verbose) additionally
-- shows 'PropTopicAlias' properties, which are otherwise hidden.
printer :: TChan Msg -> Bool -> Bool -> IO ()
printer ch showProps verbose = forever $ do
  (Msg t m props) <- atomically $ readTChan ch
  TIO.putStr $ mconcat [unTopic t, " → "]
  BL.hPut stdout m
  putStrLn ""
  -- 'drop 4' removes the first four characters of the shown property
  -- (the "Prop" prefix of the constructor names matched below).
  mapM_
    (putStrLn . (" " <>) . drop 4 . show)
    (filter (viewableProp showProps verbose) props)
  where
    viewableProp False _ _ = False
    viewableProp _ True _ = True
    viewableProp _ _ (PropTopicAlias _) = False
    viewableProp _ _ _ = True

-- | Run the watcher: start the printer thread, then connect and reconnect
-- in an endless loop. 'MQTTException's and 'IOException's raised by a
-- connection attempt are reported and followed by a one second pause
-- before the next attempt.
run :: Options -> IO ()
run Options{..} = do
  ch <- newTChanIO
  -- 'link' makes a failure of the printer thread propagate to this thread.
  async (printer ch (not optHideProps) optVerbose) >>= link
  uref <- R.newIORef optUri
  forever $
    catches
      (go ch uref)
      [ Handler (\(ex :: MQTTException) -> handler (show ex))
      , Handler (\(ex :: IOException) -> handler (show ex))
      ]
  where
    -- One connection lifetime: connect, optionally subscribe, then wait
    -- for the client and print the result of 'waitForClient'.
    go ch uref = do
      uri <- R.readIORef uref
      when optVerbose $ putStrLn ("Connecting to " <> show uri)
      mc <-
        connectURI
          mqttConfig
            { _msgCB = SimpleCallback (showme ch)
            , _protocol = Protocol50
            , _cleanSession = optSessionTime == 0
            , _connProps =
                [ PropReceiveMaximum 65535
                , PropTopicAliasMaximum 10000
                , PropSessionExpiryInterval optSessionTime
                , PropRequestResponseInformation 1
                , PropRequestProblemInformation 1
                ]
            }
          uri
      (ConnACKFlags sp _ props) <- connACK mc
      -- Only remember a broker-assigned client identifier when a session
      -- lifetime was requested.
      when (optSessionTime > 0) $ updateURI uref props
      -- Previously both branches were the empty string, so the verbose log
      -- never said which kind of session was established.
      when optVerbose $
        putStrLn
          ( if sp == ExistingSession
              then "Resuming existing session"
              else "Started new session"
          )
      when optVerbose $ putStrLn ("Properties: " <> show props)
      -- A resumed session is only re-subscribed when --always-subscribe
      -- was given.
      when (sp == NewSession || optSubResume) $ do
        subres <- subscribe mc [(t, subOptions{_subQoS = optQoS}) | t <- optTopics] mempty
        when optVerbose $ print subres
      print =<< waitForClient mc

    -- Message callback: forward everything to the printer thread.
    showme ch _ t m props = atomically $ writeTChan ch $ Msg t m props

    -- Report the error, then sleep 1,000,000 microseconds before retrying.
    handler e = putStrLn ("ERROR: " <> e) >> threadDelay 1000000

    -- Store a broker-assigned client identifier in the URI fragment of the
    -- saved URI, which is the URI used for the next connection attempt.
    -- NOTE(review): presumably 'connectURI' takes the client id from the
    -- fragment -- confirm against the net-mqtt documentation.
    updateURI uref = foldM_ up ()
      where
        up _ (PropAssignedClientIdentifier i) =
          -- Strict modify so the reference never holds a chain of
          -- unevaluated record updates across reconnects.
          R.modifyIORef' uref (\u -> u{uriFragment = '#' : BCS.unpack i})
        up a _ = pure a

-- | Parse the command line and start the watcher.
main :: IO ()
main = run =<< execParser opts
  where
    opts =
      info
        (options <**> helper)
        (fullDesc <> progDesc "Watch stuff.")