{-|
Module      : Network.MQTT.Client.
Description : An MQTT client.
Copyright   : (c) Dustin Sallings, 2019
License     : BSD3
Maintainer  : dustin@spy.net
Stability   : experimental

An MQTT protocol client

Both
<http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html MQTT 3.1.1>
and
<https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html MQTT 5.0>
are supported over plain TCP, TLS, WebSockets and Secure WebSockets.
-}

{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}

module Network.MQTT.Client (
  -- * Configuring the client.
  MQTTConfig(..), MQTTClient, QoS(..), Topic, mqttConfig,  mkLWT, LastWill(..),
  ProtocolLevel(..), Property(..), SubOptions(..), subOptions, MessageCallback(..),
  -- * Running and waiting for the client.
  waitForClient,
  connectURI, isConnected,
  disconnect, normalDisconnect,
  -- * General client interactions.
  subscribe, unsubscribe, publish, publishq, pubAliased,
  svrProps, connACK, MQTTException(..),
  -- * Low-level bits
  runMQTTConduit, MQTTConduit, isConnectedSTM, connACKSTM,
  registerCorrelated, unregisterCorrelated
  ) where

import           Control.Concurrent         (myThreadId, threadDelay)
import           Control.Concurrent.Async   (Async, async, asyncThreadId, cancelWith, link, race_, wait, waitAnyCancel)
import           Control.Concurrent.STM     (STM, TChan, TVar, atomically, check, modifyTVar', newTChan, newTChanIO,
                                             newTVarIO, orElse, readTChan, readTVar, readTVarIO, registerDelay, retry,
                                             writeTChan, writeTVar)
import           Control.DeepSeq            (force)
import qualified Control.Exception          as E
import           Control.Monad              (forever, guard, unless, void, when)
import           Control.Monad.IO.Class     (liftIO)
import           Data.Bifunctor             (first)
import qualified Data.ByteString.Char8      as BCS
import qualified Data.ByteString.Lazy       as BL
import qualified Data.ByteString.Lazy.Char8 as BC
import           Data.Conduit               (ConduitT, Void, await, runConduit, yield, (.|))
import           Data.Conduit.Attoparsec    (conduitParser)
import qualified Data.Conduit.Combinators   as C
import           Data.Conduit.Network       (AppData, appSink, appSource, clientSettings, runTCPClient)
import           Data.Conduit.Network.TLS   (runTLSClient, tlsClientConfig, tlsClientTLSSettings)
import           Data.Map.Strict            (Map)
import qualified Data.Map.Strict            as Map
import           Data.Maybe                 (fromMaybe)
import           Data.Text                  (Text)
import qualified Data.Text.Encoding         as TE
import           Data.Word                  (Word16)
import           GHC.Conc                   (labelThread)
import           Network.Connection         (ConnectionParams (..), TLSSettings (..), connectTo, connectionClose,
                                             connectionGetChunk, connectionPut, initConnectionContext)
import           Network.URI                (URI (..), unEscapeString, uriPort, uriRegName, uriUserInfo)
import qualified Network.WebSockets         as WS
import           Network.WebSockets.Stream  (makeStream)
import           System.IO.Error            (catchIOError, isEOFError)
import           System.Timeout             (timeout)

import           Network.MQTT.Topic         (Filter, Topic)
import           Network.MQTT.Types         as T

data ConnState = Starting
               | Connected
               | Stopped
               | Disconnected
               | DiscoErr DisconnectRequest
               | ConnErr ConnACKFlags deriving (Eq, Show)

data DispatchType = DSubACK | DUnsubACK | DPubACK | DPubREC | DPubREL | DPubCOMP
  deriving (Eq, Show, Ord, Enum, Bounded)


-- | Callback invoked on each incoming subscribed message.
data MessageCallback = NoCallback
  | SimpleCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
  | LowLevelCallback (MQTTClient -> PublishRequest -> IO ())

-- | The MQTT client.
--
-- See 'connectURI' for the most straightforward example.
data MQTTClient = MQTTClient {
  _ch             :: TChan MQTTPkt
  , _pktID        :: TVar Word16
  , _cb           :: MessageCallback
  , _acks         :: TVar (Map (DispatchType,Word16) (TChan MQTTPkt))
  , _inflight     :: TVar (Map Word16 PublishRequest)
  , _st           :: TVar ConnState
  , _ct           :: TVar (Maybe (Async ()))
  , _outA         :: TVar (Map Topic Word16)
  , _inA          :: TVar (Map Word16 Topic)
  , _connACKFlags :: TVar ConnACKFlags
  , _corr         :: TVar (Map BL.ByteString MessageCallback)
  }

-- | Configuration for setting up an MQTT client.
data MQTTConfig = MQTTConfig{
  _cleanSession     :: Bool -- ^ False if a session should be reused.
  , _lwt            :: Maybe LastWill -- ^ LastWill message to be sent on client disconnect.
  , _msgCB          :: MessageCallback -- ^ Callback for incoming messages.
  , _protocol       :: ProtocolLevel -- ^ Protocol to use for the connection.
  , _connProps      :: [Property] -- ^ Properties to send to the broker in the CONNECT packet.
  , _hostname       :: String -- ^ Host to connect to (parsed from the URI)
  , _port           :: Int -- ^ Port number (parsed from the URI)
  , _connID         :: String -- ^ Unique connection ID (parsed from the URI)
  , _username       :: Maybe String -- ^ Optional username (parsed from the URI)
  , _password       :: Maybe String -- ^ Optional password (parsed from the URI)
  , _connectTimeout :: Int -- ^ Connection timeout (microseconds)
  , _tlsSettings    :: TLSSettings -- ^ TLS Settings for secure connections
  }

-- | A default 'MQTTConfig'.  A '_connID' /may/ be required depending on
-- your broker (or if you just want an identifiable/resumable
-- connection).  In MQTTv5, an empty connection ID may be sent and the
-- server may assign an identifier for you and return it in the
-- 'PropAssignedClientIdentifier' 'Property'.
mqttConfig :: MQTTConfig
mqttConfig = MQTTConfig{_hostname="", _port=1883, _connID="",
                        _username=Nothing, _password=Nothing,
                        _cleanSession=True, _lwt=Nothing,
                        _msgCB=NoCallback,
                        _protocol=Protocol311, _connProps=mempty,
                        _connectTimeout=180000000,
                        _tlsSettings=TLSSettingsSimple False False False}

-- | Connect to an MQTT server by URI.
--
-- @mqtt://@, @mqtts://@, @ws://@, and @wss://@ URLs are supported.
-- The host, port, username, and password will be derived from the URI
-- and the values supplied in the config will be ignored.
--
-- > main :: IO
-- > main = do
-- >   let (Just uri) = parseURI "mqtt://test.mosquitto.org"
-- >   mc <- connectURI mqttConfig{} uri
-- >   publish mc "tmp/topic" "hello!" False
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI cfg@MQTTConfig{..} uri = do
  let cf = case uriScheme uri of
             "mqtt:"  -> runClient
             "mqtts:" -> runClientTLS
             "ws:"    -> runWS uri False
             "wss:"   -> runWS uri True
             us       -> mqttFail $ "invalid URI scheme: " <> us

      (Just a) = uriAuthority uri
      (u,p) = up (uriUserInfo a)

  v <- namedTimeout "MQTT connect" _connectTimeout $
    cf cfg{Network.MQTT.Client._connID=cid _protocol (uriFragment uri),
           _hostname=uriRegName a, _port=port (uriPort a) (uriScheme uri),
           Network.MQTT.Client._username=u, Network.MQTT.Client._password=p}

  case v of
    Nothing -> mqttFail $ "connection to " <> show uri <> " timed out"
    Just x  -> pure x

  where
    port "" "mqtt:"  = 1883
    port "" "mqtts:" = 8883
    port "" "ws:"    = 80
    port "" "wss:"   = 443
    port x _         = (read . tail) x

    cid _ ['#']    = ""
    cid _ ('#':xs) = xs
    cid _ _        = ""

    up "" = (Nothing, Nothing)
    up x = let (u,r) = break (== ':') (init x) in
             (Just (unEscapeString u), if r == "" then Nothing else Just (unEscapeString $ tail r))


-- | Set up and run a client from the given config.
runClient :: MQTTConfig -> IO MQTTClient
runClient cfg@MQTTConfig{..} = tcpCompat (runTCPClient (clientSettings _port (BCS.pack _hostname))) cfg

-- | Set up and run a client connected via TLS.
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS cfg@MQTTConfig{..} = tcpCompat (runTLSClient tlsConf) cfg
  where tlsConf = (tlsClientConfig _port (BCS.pack _hostname)) {tlsClientTLSSettings=_tlsSettings}

-- Compatibility mechanisms for TCP Conduit bits.
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat mkconn = runMQTTConduit (adapt mkconn)
  where adapt mk f = mk (f . adaptor)
        adaptor ad = (appSource ad, appSink ad)

runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI{uriPath, uriQuery} secure cfg@MQTTConfig{..} =
  runMQTTConduit (adapt $ cf secure _hostname _port endpoint WS.defaultConnectionOptions hdrs) cfg

  where
    hdrs = [("Sec-WebSocket-Protocol", "mqtt")]
    adapt mk f = mk (f . adaptor)
    adaptor s = (wsSource s, wsSink s)

    endpoint = uriPath <> uriQuery

    cf :: Bool -> String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    cf False = WS.runClientWith
    cf True  = runWSS

    wsSource :: WS.Connection -> ConduitT () BCS.ByteString IO ()
    wsSource ws = forever $ do
      bs <- liftIO $ WS.receiveData ws
      unless (BCS.null bs) $ yield bs

    wsSink :: WS.Connection -> ConduitT BCS.ByteString Void IO ()
    wsSink ws = maybe (pure ()) (\bs -> liftIO (WS.sendBinaryData ws bs) >> wsSink ws) =<< await

    runWSS :: String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    runWSS host port path options hdrs' app = do
      let connectionParams = ConnectionParams
            { connectionHostname = host
            , connectionPort =  toEnum port
            , connectionUseSecure = Just _tlsSettings
            , connectionUseSocks = Nothing
            }

      context <- initConnectionContext
      E.bracket (connectTo context connectionParams) connectionClose
        (\conn -> do
            stream <- makeStream (reader conn) (writer conn)
            WS.runClientWithStream stream host path options hdrs' app)

        where
          reader conn =
            catchIOError (Just <$> connectionGetChunk conn)
            (\e -> if isEOFError e then pure Nothing else E.throwIO e)

          writer conn = maybe (pure ()) (connectionPut conn . BC.toStrict)

pingPeriod :: Int
pingPeriod = 30000000 -- 30 seconds

mqttFail :: String -> a
mqttFail = E.throw . MQTTException

-- A couple async utilities to get our stuff named.

namedAsync :: String -> IO a -> IO (Async a)
namedAsync s a = async a >>= \p -> labelThread (asyncThreadId p) s >> pure p

namedTimeout :: String -> Int -> IO a -> IO (Maybe a)
namedTimeout n to a = timeout to (myThreadId >>= \tid -> labelThread tid n >> a)

-- | MQTTConduit provides a source and sink for data as used by 'runMQTTConduit'.
type MQTTConduit = (ConduitT () BCS.ByteString IO (), ConduitT BCS.ByteString Void IO ())

-- | Set up and run a client with a conduit context function.
--
-- The provided action calls another IO action with a 'MQTTConduit' as a
-- parameter.  It is expected that this action will manage the
-- lifecycle of the conduit source/sink on behalf of the client.
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -- ^ an action providing an 'MQTTConduit' in an execution context
               -> MQTTConfig -- ^ the 'MQTTConfig'
               -> IO MQTTClient
runMQTTConduit mkconn MQTTConfig{..} = do
  _ch <- newTChanIO
  _pktID <- newTVarIO 1
  _acks <- newTVarIO mempty
  _inflight <- newTVarIO mempty
  _st <- newTVarIO Starting
  _ct <- newTVarIO Nothing
  _outA <- newTVarIO mempty
  _inA <- newTVarIO mempty
  _connACKFlags <- newTVarIO (ConnACKFlags NewSession ConnUnspecifiedError mempty)
  _corr <- newTVarIO mempty
  let _cb = _msgCB
      cli = MQTTClient{..}

  t <- namedAsync "MQTT clientThread" $ clientThread cli
  s <- atomically (waitForLaunch cli t)

  when (s == Disconnected) $ wait t

  atomically $ checkConnected cli

  pure cli

  where
    clientThread cli = E.finally connectAndRun markDisco
      where
        connectAndRun = mkconn $ \ad -> start cli ad >>= run ad
        markDisco = atomically $ do
          st <- readTVar (_st cli)
          guard $ st == Starting || st == Connected
          writeTVar (_st cli) Disconnected

    start c@MQTTClient{..} (_,sink) = do
      void . runConduit $ do
        let req = connectRequest{T._connID=BC.pack _connID,
                                 T._lastWill=_lwt,
                                 T._username=BC.pack <$> _username,
                                 T._password=BC.pack <$> _password,
                                 T._cleanSession=_cleanSession,
                                 T._connProperties=_connProps}
        yield (BL.toStrict $ toByteString _protocol req) .| sink

      pure c

    run (src,sink) c@MQTTClient{..} = do
      pch <- newTChanIO
      o <- namedAsync "MQTT out" $ onceConnected >> processOut
      p <- namedAsync "MQTT ping" $ onceConnected >> doPing
      w <- namedAsync "MQTT watchdog" $ watchdog pch
      s <- namedAsync "MQTT in" $ doSrc pch

      void $ waitAnyCancel [o, p, w, s]

      where
        doSrc pch = runConduit $ src
                    .| conduitParser (parsePacket _protocol)
                    .| C.mapM_ (\(_,x) -> liftIO (dispatch c pch x))

        onceConnected = atomically $ check . (== Connected) =<< readTVar _st

        processOut = runConduit $
          C.repeatM (liftIO (atomically $ checkConnected c >> readTChan _ch))
          .| C.map (BL.toStrict . toByteString _protocol)
          .| sink

        doPing = forever $ threadDelay pingPeriod >> sendPacketIO c PingPkt

        watchdog ch = forever $ do
          toch <- registerDelay (pingPeriod * 3)
          timedOut <- atomically $ ((check =<< readTVar toch) >> pure True) `orElse` (readTChan ch >> pure False)
          when timedOut $ killConn c Timeout

    waitForLaunch MQTTClient{..} t = do
      writeTVar _ct (Just t)
      c <- readTVar _st
      if c == Starting then retry else pure c

-- | Wait for a client to terminate its connection.
-- An exception is thrown if the client didn't terminate expectedly.
waitForClient :: MQTTClient -> IO ()
waitForClient c@MQTTClient{..} = do
  void . traverse wait =<< readTVarIO _ct
  e <- atomically $ stateX c Stopped
  case e of
    Nothing -> pure ()
    Just x  -> E.throwIO x

stateX :: MQTTClient -> ConnState -> STM (Maybe E.SomeException)
stateX MQTTClient{..} want = f <$> readTVar _st

  where
    je = Just . E.toException . MQTTException

    f :: ConnState -> Maybe E.SomeException
    f Connected    = if want == Connected then Nothing else je "unexpectedly connected"
    f Stopped      = if want == Stopped then Nothing else je "unexpectedly stopped"
    f Disconnected = je "disconnected"
    f Starting     = je "died while starting"
    f (DiscoErr x) = Just . E.toException . Discod $ x
    f (ConnErr e)  = je (show e)

data MQTTException = Timeout | BadData | Discod DisconnectRequest | MQTTException String deriving(Eq, Show)

instance E.Exception MQTTException

dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch c@MQTTClient{..} pch pkt =
  case pkt of
    (ConnACKPkt p)                            -> connACKd p
    (PublishPkt p)                            -> pub p
    (SubACKPkt (SubscribeResponse i _ _))     -> delegate DSubACK i
    (UnsubACKPkt (UnsubscribeResponse i _ _)) -> delegate DUnsubACK i
    (PubACKPkt (PubACK i _ _))                -> delegate DPubACK i
    (PubRELPkt (PubREL i _ _))                -> pubd i
    (PubRECPkt (PubREC i _ _))                -> delegate DPubREC i
    (PubCOMPPkt (PubCOMP i _ _))              -> delegate DPubCOMP i
    (DisconnectPkt req)                       -> disco req
    PongPkt                                   -> atomically . writeTChan pch $ True

    -- Not implemented
    (AuthPkt p)                               -> mqttFail ("unexpected incoming auth: " <> show p)

    -- Things clients shouldn't see
    PingPkt                                   -> mqttFail "unexpected incoming ping packet"
    (ConnPkt _ _)                             -> mqttFail "unexpected incoming connect"
    (SubscribePkt _)                          -> mqttFail "unexpected incoming subscribe"
    (UnsubscribePkt _)                        -> mqttFail "unexpected incoming unsubscribe"

  where connACKd connr@(ConnACKFlags _ val _) = case val of
                                                  ConnAccepted -> atomically $ do
                                                    writeTVar _connACKFlags connr
                                                    writeTVar _st Connected
                                                  _ -> do
                                                    t <- readTVarIO _ct
                                                    atomically $ writeTVar _st (ConnErr connr)
                                                    maybeCancelWith (MQTTException $ show connr) t

        pub p@PublishRequest{_pubQoS=QoS0} = atomically (resolve p) >>= notify Nothing
        pub p@PublishRequest{_pubQoS=QoS1, _pubPktID} =
          notify (Just (PubACKPkt (PubACK _pubPktID 0 mempty))) =<< atomically (resolve p)
        pub p@PublishRequest{_pubQoS=QoS2} = atomically $ do
          p'@PublishRequest{..} <- resolve p
          modifyTVar' _inflight (Map.insert _pubPktID p')
          sendPacket c (PubRECPkt (PubREC _pubPktID 0 mempty))

        pubd i = do
          mp <- atomically $ do
            r <- Map.lookup i <$> readTVar _inflight
            modifyTVar' _inflight (Map.delete i)
            pure r
          case mp of
            Nothing -> sendPacketIO c (PubCOMPPkt (PubCOMP i 0x92 mempty))
            Just p  -> notify (Just (PubCOMPPkt (PubCOMP i 0 mempty))) p

        notify rpkt p@PublishRequest{..} = do
          atomically $ modifyTVar' _inflight (Map.delete _pubPktID)
          corrs <- readTVarIO _corr
          E.evaluate . force =<< case maybe _cb (\cd -> Map.findWithDefault _cb cd corrs) cdata of
                                   NoCallback         -> pure ()
                                   SimpleCallback f   -> call (f c (blToText _pubTopic) _pubBody _pubProps)
                                   LowLevelCallback f -> call (f c p)

            where
              call a = link =<< namedAsync "notifier" (a >> respond)
              respond = void $ traverse (sendPacketIO c) rpkt
              cdata = foldr f Nothing _pubProps
                where f (PropCorrelationData x) _ = Just x
                      f _ o                       = o

        resolve p@PublishRequest{..} = do
          topic <- resolveTopic (foldr aliasID Nothing _pubProps)
          pure p{_pubTopic=textToBL topic}

          where
            aliasID (PropTopicAlias x) _ = Just x
            aliasID _ o                  = o

            resolveTopic Nothing = pure (blToText _pubTopic)
            resolveTopic (Just x) = do
              when (_pubTopic /= "") $ modifyTVar' _inA (Map.insert x (blToText _pubTopic))
              m <- readTVar _inA
              case Map.lookup x m of
                Nothing -> mqttFail ("failed to lookup topic alias " <> show x)
                Just t  -> pure t

        delegate dt pid = atomically $ do
          m <- readTVar _acks
          case Map.lookup (dt, pid) m of
            Nothing -> nak dt
            Just ch -> writeTChan ch pkt

            where
              nak DPubREC = sendPacket c (PubRELPkt  (PubREL  pid 0x92 mempty))
              nak _       = pure ()


        disco req = do
          atomically $ writeTVar _st (DiscoErr req)
          maybeCancelWith (Discod req) =<< readTVarIO _ct

maybeCancelWith :: E.Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e = void . traverse (`cancelWith` e)

killConn :: E.Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient{..} e = readTVarIO _ct >>= maybeCancelWith e

checkConnected :: MQTTClient -> STM ()
checkConnected mc = maybe (pure ()) E.throw =<< stateX mc Connected

-- | True if we're currently in a normally connected state (in the IO monad).
isConnected :: MQTTClient -> IO Bool
isConnected = atomically . isConnectedSTM

-- | True if we're currently in a normally connected state (in the STM monad).
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM MQTTClient{..} = (Connected ==) <$> readTVar _st

sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket c@MQTTClient{..} p = checkConnected c >> writeTChan _ch p

sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO c = atomically . sendPacket c

textToBL :: Text -> BL.ByteString
textToBL = BL.fromStrict . TE.encodeUtf8

blToText :: BL.ByteString -> Text
blToText = TE.decodeUtf8 . BL.toStrict

reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID c@MQTTClient{..} dts = do
  checkConnected c
  ch <- newTChan
  pid <- readTVar _pktID
  modifyTVar' _pktID $ if pid == maxBound then const 1 else succ
  modifyTVar' _acks (Map.union (Map.fromList [((t, pid), ch) | t <- dts]))
  pure (ch,pid)

releasePktID :: MQTTClient -> (DispatchType,Word16) -> STM ()
releasePktID c@MQTTClient{..} k = checkConnected c >> modifyTVar' _acks (Map.delete k)

releasePktIDs :: MQTTClient -> [(DispatchType,Word16)] -> STM ()
releasePktIDs c@MQTTClient{..} ks = checkConnected c >> modifyTVar' _acks deleteMany
  where deleteMany m = foldr Map.delete m ks

sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait c@MQTTClient{..} dt f = do
  (ch,pid) <- atomically $ do
    (ch,pid) <- reservePktID c [dt]
    sendPacket c (f pid)
    pure (ch,pid)

  -- Wait for the response in a separate transaction.
  atomically $ do
    st <- readTVar _st
    when (st /= Connected) $ mqttFail "disconnected waiting for response"
    releasePktID c (dt,pid)
    readTChan ch

-- | Subscribe to a list of topic filters with their respective 'QoS'es.
-- The accepted 'QoS'es are returned in the same order as requested.
subscribe :: MQTTClient -> [(Filter, SubOptions)] -> [Property] -> IO ([Either SubErr QoS], [Property])
subscribe c@MQTTClient{..} ls props = do
  r <- sendAndWait c DSubACK (\pid -> SubscribePkt $ SubscribeRequest pid ls' props)
  let (SubACKPkt (SubscribeResponse _ rs aprops)) = r
  pure (rs, aprops)

    where ls' = map (first textToBL) ls

-- | Unsubscribe from a list of topic filters.
--
-- In MQTT 3.1.1, there is no body to an unsubscribe response, so it
-- can be ignored.  If this returns, you were unsubscribed.  In MQTT
-- 5, you'll get a list of unsub status values corresponding to your
-- request filters, and whatever properties the server thought you
-- should know about.
unsubscribe :: MQTTClient -> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe c@MQTTClient{..} ls props = do
  (UnsubACKPkt (UnsubscribeResponse _ rsn rprop)) <- sendAndWait c DUnsubACK (\pid -> UnsubscribePkt $ UnsubscribeRequest pid (map textToBL ls) props)
  pure (rprop, rsn)

-- | Publish a message (QoS 0).
publish :: MQTTClient
        -> Topic         -- ^ Topic
        -> BL.ByteString -- ^ Message body
        -> Bool          -- ^ Retain flag
        -> IO ()
publish c t m r = void $ publishq c t m r QoS0 mempty

-- | Publish a message with the specified QoS and Properties list.
publishq :: MQTTClient
         -> Topic         -- ^ Topic
         -> BL.ByteString -- ^ Message body
         -> Bool          -- ^ Retain flag
         -> QoS           -- ^ QoS
         -> [Property]    -- ^ Properties
         -> IO ()
publishq c t m r q props = do
  (ch,pid) <- atomically $ reservePktID c types
  E.finally (publishAndWait ch pid) (atomically $ releasePktIDs c [(t',pid) | t' <- types])

    where
      types = [DPubACK, DPubREC, DPubCOMP]
      publishAndWait ch pid = do
        sendPacketIO c (pkt pid)
        when (q > QoS0) $ satisfyQoS ch pid

      pkt pid = PublishPkt $ PublishRequest {_pubDup = False,
                                             _pubQoS = q,
                                             _pubPktID = pid,
                                             _pubRetain = r,
                                             _pubTopic = textToBL t,
                                             _pubBody = m,
                                             _pubProps = props}

      satisfyQoS ch pid
        | q == QoS0 = pure ()
        | q == QoS1 = void $ do
            (PubACKPkt (PubACK _ st pprops)) <- atomically $ checkConnected c >> readTChan ch
            unless (isOK st) $ mqttFail ("qos 1 publish error: " <> show st <> " " <> show pprops)
        | q == QoS2 = waitRec
        | otherwise = error "invalid QoS"

        where
          isOK 0  = True -- success
          isOK 16 = True -- It worked, but nobody cares (no matching subscribers)
          isOK _  = False

          waitRec = do
            rpkt <- atomically $ checkConnected c >> readTChan ch
            case rpkt of
              PubRECPkt (PubREC _ st recprops) -> do
                unless (isOK st) $ mqttFail ("qos 2 REC publish error: " <> show st <> " " <> show recprops)
                sendPacketIO c (PubRELPkt $ PubREL pid 0 mempty)
              PubCOMPPkt (PubCOMP _ st' compprops) ->
                when (st' /= 0) $ mqttFail ("qos 2 COMP publish error: " <> show st' <> " " <> show compprops)
              wtf -> mqttFail ("unexpected packet received in QoS2 publish: " <> show wtf)

-- | Disconnect from the MQTT server.
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect c@MQTTClient{..} reason props = race_ getDisconnected orDieTrying
  where
    getDisconnected = do
      sendPacketIO c (DisconnectPkt $ DisconnectRequest reason props)
      void . traverse wait =<< readTVarIO _ct
      atomically $ writeTVar _st Stopped
    orDieTrying = threadDelay 10000000 >> killConn c Timeout

-- | Disconnect with 'DiscoNormalDisconnection' and no properties.
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect c = disconnect c DiscoNormalDisconnection mempty

-- | A convenience method for creating a 'LastWill'.
mkLWT :: Topic -> BL.ByteString -> Bool -> T.LastWill
mkLWT t m r = T.LastWill{
  T._willRetain=r,
  T._willQoS=QoS0,
  T._willTopic = textToBL t,
  T._willMsg=m,
  T._willProps=mempty
  }

-- | Get the list of properties that were sent from the broker at connect time.
svrProps :: MQTTClient -> IO [Property]
svrProps mc = p <$> atomically (connACKSTM mc)
  where p (ConnACKFlags _ _ props) = props

-- | Get the complete connection ACK packet from the beginning of this session.
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM MQTTClient{_connACKFlags} = readTVar _connACKFlags

-- | Get the complete connection aCK packet from the beginning of this session.
connACK :: MQTTClient -> IO ConnACKFlags
connACK = atomically . connACKSTM

maxAliases :: MQTTClient -> IO Word16
maxAliases mc = foldr f 0 <$> svrProps mc
  where
    f (PropTopicAliasMaximum n) _ = n
    f _ o                         = o

-- | Publish a message with the specified 'QoS' and 'Property' list.  If
-- possible, use an alias to shorten the message length.  The alias
-- list is managed by the client in a first-come, first-served basis,
-- so if you use this with more properties than the broker allows,
-- only the first N (up to TopicAliasMaximum, as specified by the
-- broker at connect time) will be aliased.
--
-- This is safe to use as a general publish mechanism, as it will
-- default to not aliasing whenver there's not already an alias and we
-- can't create any more.
pubAliased :: MQTTClient
         -> Topic         -- ^ Topic
         -> BL.ByteString -- ^ Message body
         -> Bool          -- ^ Retain flag
         -> QoS           -- ^ QoS
         -> [Property]    -- ^ Properties
         -> IO ()
pubAliased c@MQTTClient{..} t m r q props = do
  x <- maxAliases c
  (t', n) <- alias x
  let np = props <> case n of
                      0 -> mempty
                      _ -> [PropTopicAlias n]
  publishq c t' m r q np

  where
    alias mv = atomically $ do
      as <- readTVar _outA
      let n = toEnum (length as + 1)
          cur = Map.lookup t as
          v = fromMaybe (if n > mv then 0 else n) cur
      when (v > 0) $ writeTVar _outA (Map.insert t v as)
      pure (maybe t (const "") cur, v)

-- | Register a callback handler for a message with the given correlated data identifier.
--
-- This registration will remain in place until unregisterCorrelated is called to remove it.
registerCorrelated :: MQTTClient -> BL.ByteString -> MessageCallback -> STM ()
registerCorrelated MQTTClient{_corr} bs cb = modifyTVar' _corr (Map.insert bs cb)

-- | Unregister a callback handler for the given correlated data identifier.
unregisterCorrelated :: MQTTClient -> BL.ByteString -> STM ()
unregisterCorrelated MQTTClient{_corr} bs = modifyTVar' _corr (Map.delete bs)