module Network.MQTT
(
connect
, MQTT
, disconnect
, reconnect
, onReconnect
, resubscribe
, MQTTConfig
, defaultConfig
, cHost
, cPort
, cClean
, cWill
, cUsername
, cPassword
, cKeepAlive
, cClientID
, cConnectTimeout
, cReconnPeriod
, cLogger
, subscribe
, unsubscribe
, publish
, send
, addHandler
, removeHandler
, awaitMsg
, awaitMsg'
, module Network.MQTT.Types
) where
import Control.Applicative (pure, (<$>), (<*>), (<$))
import Control.Concurrent
import Control.Exception hiding (handle)
import Control.Monad hiding (sequence_)
import Data.Attoparsec.ByteString (parseOnly)
import Data.Bits ((.&.))
import Data.ByteString (hGet, ByteString)
import qualified Data.ByteString as BS
import Data.Foldable (for_, sequence_, traverse_)
import Data.Maybe (isJust, fromJust)
import Data.Singletons (withSomeSing, SingI(..))
import Data.Singletons.Decide
import Data.Text (Text)
import Data.Traversable (for)
import Data.Typeable (Typeable)
import Data.Unique
import Data.Word
import Network
import Prelude hiding (sequence_)
import System.IO (Handle, hClose, hIsEOF, hSetBinaryMode)
import System.Timeout (timeout)
import Network.MQTT.Types
import Network.MQTT.Parser (mqttBody, mqttHeader)
import Network.MQTT.Encoding
import qualified Network.MQTT.Logger as L
data MQTT
= MQTT
{ config :: MQTTConfig
, handle :: MVar Handle
, handlers :: MVar [MessageHandler]
, topicHandlers :: MVar [TopicHandler]
, recvThread :: MVar ThreadId
, reconnectHandler :: MVar (IO ())
, keepAliveThread :: MVar ThreadId
, sendSem :: Maybe QSem
}
data TopicHandler
= TopicHandler
{ thTopic :: Topic
, thQoS :: QoS
, thHandler :: Topic -> ByteString -> IO ()
}
data MessageHandler where
MessageHandler :: SingI t
=> Unique
-> (Message t -> IO ())
-> MessageHandler
data MQTTConfig
= MQTTConfig
{ cHost :: HostName
, cPort :: PortNumber
, cClean :: Bool
, cWill :: Maybe Will
, cUsername :: Maybe Text
, cPassword :: Maybe Text
, cKeepAlive :: Maybe Int
, cClientID :: Text
, cConnectTimeout :: Maybe Int
, cReconnPeriod :: Maybe Int
, cLogger :: L.Logger
}
defaultConfig :: MQTTConfig
defaultConfig = MQTTConfig
{ cHost = "localhost"
, cPort = 1883
, cClean = True
, cWill = Nothing
, cUsername = Nothing
, cPassword = Nothing
, cKeepAlive = Nothing
, cClientID = "mqtt-haskell"
, cConnectTimeout = Nothing
, cReconnPeriod = Nothing
, cLogger = L.stdLogger
}
connect :: MQTTConfig -> IO (Maybe MQTT)
connect conf = do
h <- connectTo (cHost conf) (PortNumber $ cPort conf)
hSetBinaryMode h True
mqtt <- MQTT conf
<$> newMVar h
<*> newMVar []
<*> newMVar []
<*> newEmptyMVar
<*> newEmptyMVar
<*> newEmptyMVar
<*> for (cKeepAlive conf) (const (newQSem 0))
mCode <- handshake mqtt
if mCode == Just 0
then Just mqtt <$ do forkIO (recvLoop mqtt) >>= putMVar (recvThread mqtt)
forkIO (keepAliveLoop mqtt) >>=
putMVar (keepAliveThread mqtt)
addHandler mqtt (publishHandler mqtt)
else Nothing <$ hClose h
send :: MQTT -> Message t -> IO ()
send mqtt msg = do
logInfo mqtt $ "Sending " ++ show (toMsgType msg)
h <- readMVar (handle mqtt)
writeTo h msg
for_ (sendSem mqtt) signalQSem
handshake :: MQTT -> IO (Maybe Word8)
handshake mqtt = do
let timeout' = maybe (fmap Just) (timeout . (* 1000000))
(cConnectTimeout (config mqtt))
sendConnect mqtt
msg <- timeout' (getMessage mqtt) `catch` \e ->
Nothing <$ logError mqtt (show (e :: MQTTException) ++
" while waiting for CONNACK")
return $ case msg of
Just (SomeMessage (Message _ (MConnAck (ConnAck code)))) -> Just code
_ -> Nothing
sendConnect :: MQTT -> IO ()
sendConnect mqtt = send mqtt connect
where
conf = config mqtt
connect = Message
(Header False NoConfirm False)
(MConnect $ Connect
(cClean conf)
(cWill conf)
(MqttText $ cClientID conf)
(MqttText <$> cUsername conf)
(MqttText <$> cPassword conf)
(maybe 0 fromIntegral $ cKeepAlive conf))
awaitMsg :: SingI t => MQTT -> SMsgType t -> Maybe MsgID -> IO (Message t)
awaitMsg mqtt _ mMsgID = do
var <- newEmptyMVar
handlerID <- addHandler mqtt (putMVar var)
let wait = do
msg <- readMVar var
if isJust mMsgID
then if mMsgID == getMsgID (body msg)
then removeHandler mqtt handlerID >> return msg
else wait
else removeHandler mqtt handlerID >> return msg
wait
awaitMsg' :: SingI t => MQTT -> Maybe MsgID -> IO (Message t)
awaitMsg' mqtt mMsgID = awaitMsg mqtt sing mMsgID
addHandler :: SingI t => MQTT -> (Message t -> IO ()) -> IO Unique
addHandler mqtt handler = do
mhID <- newUnique
modifyMVar_ (handlers mqtt) $ \hs ->
return $ MessageHandler mhID handler : hs
return mhID
removeHandler :: MQTT -> Unique -> IO ()
removeHandler mqtt mhID = modifyMVar_ (handlers mqtt) $ \hs ->
return $ filter (\(MessageHandler mhID' _) -> mhID' /= mhID) hs
subscribe :: MQTT -> QoS -> Topic -> (Topic -> ByteString -> IO ())
-> IO QoS
subscribe mqtt qos topic handler = do
qosGranted <- sendSubscribe mqtt qos topic
modifyMVar_ (topicHandlers mqtt) $ \hs ->
return $ TopicHandler topic qosGranted handler : hs
return qosGranted
sendSubscribe :: MQTT -> QoS -> Topic -> IO QoS
sendSubscribe mqtt qos topic = do
msgID <- fromIntegral . hashUnique <$> newUnique
send mqtt $ Message
(Header False Confirm False)
(MSubscribe $ Subscribe
msgID
[(topic, qos)])
msg <- awaitMsg mqtt SSUBACK (Just msgID)
case msg of
(Message _ (MSubAck (SubAck _ [qosGranted]))) -> return qosGranted
_ -> fail $ "Received invalid message as response to subscribe: "
++ show (toMsgType msg)
unsubscribe :: MQTT -> Topic -> IO ()
unsubscribe mqtt topic = do
modifyMVar_ (topicHandlers mqtt) $ return . filter ((== topic) . thTopic)
msgID <- fromIntegral . hashUnique <$> newUnique
send mqtt $ Message
(Header False Confirm False)
(MUnsubscribe $ Unsubscribe msgID [topic])
void $ awaitMsg mqtt SUNSUBACK (Just msgID)
publish :: MQTT -> QoS -> Bool -> Topic -> ByteString -> IO ()
publish mqtt qos retain topic body = do
msgID <- if qos > NoConfirm
then Just . fromIntegral . hashUnique <$> newUnique
else return Nothing
send mqtt $ Message
(Header False qos retain)
(MPublish $ Publish topic msgID body)
case qos of
NoConfirm -> return ()
Confirm -> void $ awaitMsg mqtt SPUBACK msgID
Handshake -> do
void $ awaitMsg mqtt SPUBREC msgID
send mqtt $ Message
(Header False Confirm False)
(MPubRel $ SimpleMsg (fromJust msgID))
void $ awaitMsg mqtt SPUBCOMP msgID
disconnect :: MQTT -> IO ()
disconnect mqtt = do
h <- takeMVar $ handle mqtt
writeTo h $
Message
(Header False NoConfirm False)
MDisconnect
readMVar (recvThread mqtt) >>= killThread
readMVar (keepAliveThread mqtt) >>= killThread
hClose h
reconnect :: MQTT -> Int -> IO ()
reconnect mqtt period = do
_ <- takeMVar (handle mqtt)
logInfo mqtt "Reconnecting..."
handleVar <- newEmptyMVar
go (mqtt { handle = handleVar })
readMVar handleVar >>= putMVar (handle mqtt)
tryReadMVar (reconnectHandler mqtt) >>= traverse_ (void . forkIO)
logInfo mqtt "Reconnect successfull"
where
go mqtt' = do
let conf = config mqtt
connectTo (cHost conf) (PortNumber $ cPort conf)
>>= putMVar (handle mqtt')
mCode <- handshake mqtt'
unless (mCode == Just 0) $ do
void $ takeMVar (handle mqtt')
threadDelay (period * 10^6)
go mqtt'
`catch`
\e -> do
logWarning mqtt $ "reconnect: " ++ show (e :: IOException)
threadDelay (period * 10^6)
go mqtt'
onReconnect :: MQTT -> IO () -> IO ()
onReconnect mqtt io = do
let mvar = reconnectHandler mqtt
empty <- isEmptyMVar mvar
unless empty (void $ takeMVar mvar)
putMVar mvar io
resubscribe :: MQTT -> IO [QoS]
resubscribe mqtt = do
ths <- readMVar (topicHandlers mqtt)
mapM (\th -> sendSubscribe mqtt (thQoS th) (thTopic th)) ths
maybeReconnect :: MQTT -> IO ()
maybeReconnect mqtt = do
catch
(readMVar (handle mqtt) >>= hClose)
(const (pure ()) :: IOException -> IO ())
for_ (cReconnPeriod $ config mqtt) $ reconnect mqtt
logInfo :: MQTT -> String -> IO ()
logInfo mqtt = L.logInfo (cLogger (config mqtt))
logWarning :: MQTT -> String -> IO ()
logWarning mqtt = L.logWarning (cLogger (config mqtt))
logError :: MQTT -> String -> IO ()
logError mqtt = L.logError (cLogger (config mqtt))
recvLoop :: MQTT -> IO ()
recvLoop mqtt = forever $ do
h <- readMVar (handle mqtt)
eof <- hIsEOF h
if eof
then do
logError mqtt "EOF in recvLoop"
maybeReconnect mqtt
else getMessage mqtt >>= dispatchMessage mqtt
`catches`
[ Handler $ \e -> do
logError mqtt $ "recvLoop: Caught " ++ show (e :: IOException)
maybeReconnect mqtt
, Handler $ \e ->
logWarning mqtt $ "recvLoop: Caught " ++ show (e :: MQTTException)
]
dispatchMessage :: MQTT -> SomeMessage -> IO ()
dispatchMessage mqtt (SomeMessage (msg :: Message t)) =
readMVar (handlers mqtt) >>= mapM_ applyMsg
where
typeSing :: SMsgType t
typeSing = toSMsgType msg
applyMsg :: MessageHandler -> IO ()
applyMsg (MessageHandler _ (handler :: Message t' -> IO ())) =
case typeSing %~ (sing :: SMsgType t') of
Proved Refl -> void $ forkIO $ handler msg
Disproved _ -> return ()
keepAliveLoop :: MQTT -> IO ()
keepAliveLoop mqtt =
sequence_ (loop <$> cKeepAlive (config mqtt) <*> sendSem mqtt)
where
loop period sem = forever $ do
rslt <- timeout (period * 1000000) $ waitQSem sem
case rslt of
Nothing -> (do send mqtt $ Message
(Header False NoConfirm False)
MPingReq
void $ awaitMsg mqtt SPINGRESP Nothing)
`catch`
(\e -> logError mqtt $ "keepAliveLoop: " ++ show (e :: IOException))
Just _ -> return ()
publishHandler :: MQTT -> Message PUBLISH -> IO ()
publishHandler mqtt (Message header (MPublish body)) = do
case (qos header, pubMsgID body) of
(Confirm, Just msgid) ->
send mqtt $ Message
(Header False NoConfirm False)
(MPubAck $ SimpleMsg msgid)
(Handshake, Just msgid) -> do
send mqtt $ Message
(Header False NoConfirm False)
(MPubRec $ SimpleMsg msgid)
void $ awaitMsg mqtt SPUBREL (Just msgid)
send mqtt $ Message
(Header False NoConfirm False)
(MPubComp $ SimpleMsg msgid)
_ -> return ()
callbacks <- filter (matches (topic body) . thTopic)
<$> readMVar (topicHandlers mqtt)
for_ callbacks $ \th -> thHandler th (topic body) (payload body)
getMessage :: MQTT -> IO SomeMessage
getMessage mqtt = do
h <- readMVar (handle mqtt)
headerByte <- hGet' h 1
remaining <- getRemaining h 0
rest <- hGet' h remaining
let parseRslt = do
(mType, header) <- parseOnly mqttHeader headerByte
withSomeSing mType $ \sMsgType ->
parseOnly
(SomeMessage . Message header
<$> mqttBody header sMsgType (fromIntegral remaining))
rest
case parseRslt of
Left err -> logError mqtt ("Error while parsing: " ++ err) >>
throw (ParseError err)
Right msg -> msg <$
logInfo mqtt ("Received " ++ show (toMsgType' msg))
getRemaining :: Handle -> Int -> IO Int
getRemaining h n = go n 1
where
go acc fac = do
b <- getByte h
let acc' = acc + (b .&. 127) * fac
if b .&. 128 == 0
then return acc'
else go acc' (fac * 128)
getByte :: Handle -> IO Int
getByte h = fromIntegral . BS.head <$> hGet' h 1
hGet' :: Handle -> Int -> IO BS.ByteString
hGet' h n = do
bs <- hGet h n
if BS.length bs < n
then throw EOF
else return bs
data MQTTException
= EOF
| ParseError String
deriving (Show, Typeable)
instance Exception MQTTException where