{-|
Module      : Network.MQTT.Client.
Description : An MQTT client.
Copyright   : (c) Dustin Sallings, 2019
License     : BSD3
Maintainer  : dustin@spy.net
Stability   : experimental

An MQTT protocol client

Both
<http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html MQTT 3.1.1>
and
<https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html MQTT 5.0>
are supported over plain TCP, TLS, WebSockets and Secure WebSockets.
-}

{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}

module Network.MQTT.Client (
  -- * Configuring the client.
  MQTTConfig(..), MQTTClient, QoS(..), Topic, mqttConfig,  mkLWT, LastWill(..),
  ProtocolLevel(..), Property(..), SubOptions(..), subOptions, MessageCallback(..),
  -- * Running and waiting for the client.
  waitForClient,
  connectURI, isConnected,
  disconnect, normalDisconnect,
  -- * General client interactions.
  subscribe, unsubscribe, publish, publishq, pubAliased,
  svrProps, connACK, MQTTException(..),
  -- * Low-level bits
  runMQTTConduit, MQTTConduit, isConnectedSTM, connACKSTM,
  registerCorrelated, unregisterCorrelated
  ) where

import           Control.Concurrent         (myThreadId, threadDelay)
import           Control.Concurrent.Async   (Async, async, asyncThreadId, cancelWith, link, race_, wait, waitAnyCancel)
import           Control.Concurrent.STM     (STM, TChan, TVar, atomically, check, modifyTVar', newTChan, newTChanIO,
                                             newTVarIO, orElse, readTChan, readTVar, readTVarIO, registerDelay, retry,
                                             writeTChan, writeTVar)
import           Control.DeepSeq            (force)
import qualified Control.Exception          as E
import           Control.Monad              (forever, guard, unless, void, when)
import           Control.Monad.IO.Class     (liftIO)
import           Data.Bifunctor             (first)
import qualified Data.ByteString.Char8      as BCS
import qualified Data.ByteString.Lazy       as BL
import qualified Data.ByteString.Lazy.Char8 as BC
import           Data.Conduit               (ConduitT, Void, await, runConduit, yield, (.|))
import           Data.Conduit.Attoparsec    (conduitParser)
import qualified Data.Conduit.Combinators   as C
import           Data.Conduit.Network       (AppData, appSink, appSource, clientSettings, runTCPClient)
import           Data.Conduit.Network.TLS   (runTLSClient, tlsClientConfig, tlsClientTLSSettings)
import           Data.Map.Strict            (Map)
import qualified Data.Map.Strict            as Map
import           Data.Maybe                 (fromMaybe)
import           Data.Text                  (Text)
import qualified Data.Text.Encoding         as TE
import           Data.Word                  (Word16)
import           GHC.Conc                   (labelThread)
import           Network.Connection         (ConnectionParams (..), TLSSettings (..), connectTo, connectionClose,
                                             connectionGetChunk, connectionPut, initConnectionContext)
import           Network.URI                (URI (..), unEscapeString, uriPort, uriRegName, uriUserInfo)
import qualified Network.WebSockets         as WS
import           Network.WebSockets.Stream  (makeStream)
import           System.IO.Error            (catchIOError, isEOFError)
import           System.Timeout             (timeout)

import           Network.MQTT.Topic         (Filter, Topic)
import           Network.MQTT.Types         as T

data ConnState = Starting
               | Connected
               | Stopped
               | Disconnected
               | DiscoErr DisconnectRequest
               | ConnErr ConnACKFlags deriving (ConnState -> ConnState -> Bool
(ConnState -> ConnState -> Bool)
-> (ConnState -> ConnState -> Bool) -> Eq ConnState
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnState -> ConnState -> Bool
$c/= :: ConnState -> ConnState -> Bool
== :: ConnState -> ConnState -> Bool
$c== :: ConnState -> ConnState -> Bool
Eq, Int -> ConnState -> ShowS
[ConnState] -> ShowS
ConnState -> String
(Int -> ConnState -> ShowS)
-> (ConnState -> String)
-> ([ConnState] -> ShowS)
-> Show ConnState
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnState] -> ShowS
$cshowList :: [ConnState] -> ShowS
show :: ConnState -> String
$cshow :: ConnState -> String
showsPrec :: Int -> ConnState -> ShowS
$cshowsPrec :: Int -> ConnState -> ShowS
Show)

data DispatchType = DSubACK | DUnsubACK | DPubACK | DPubREC | DPubREL | DPubCOMP
  deriving (DispatchType -> DispatchType -> Bool
(DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool) -> Eq DispatchType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: DispatchType -> DispatchType -> Bool
$c/= :: DispatchType -> DispatchType -> Bool
== :: DispatchType -> DispatchType -> Bool
$c== :: DispatchType -> DispatchType -> Bool
Eq, Int -> DispatchType -> ShowS
[DispatchType] -> ShowS
DispatchType -> String
(Int -> DispatchType -> ShowS)
-> (DispatchType -> String)
-> ([DispatchType] -> ShowS)
-> Show DispatchType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DispatchType] -> ShowS
$cshowList :: [DispatchType] -> ShowS
show :: DispatchType -> String
$cshow :: DispatchType -> String
showsPrec :: Int -> DispatchType -> ShowS
$cshowsPrec :: Int -> DispatchType -> ShowS
Show, Eq DispatchType
Eq DispatchType
-> (DispatchType -> DispatchType -> Ordering)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> DispatchType)
-> (DispatchType -> DispatchType -> DispatchType)
-> Ord DispatchType
DispatchType -> DispatchType -> Bool
DispatchType -> DispatchType -> Ordering
DispatchType -> DispatchType -> DispatchType
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: DispatchType -> DispatchType -> DispatchType
$cmin :: DispatchType -> DispatchType -> DispatchType
max :: DispatchType -> DispatchType -> DispatchType
$cmax :: DispatchType -> DispatchType -> DispatchType
>= :: DispatchType -> DispatchType -> Bool
$c>= :: DispatchType -> DispatchType -> Bool
> :: DispatchType -> DispatchType -> Bool
$c> :: DispatchType -> DispatchType -> Bool
<= :: DispatchType -> DispatchType -> Bool
$c<= :: DispatchType -> DispatchType -> Bool
< :: DispatchType -> DispatchType -> Bool
$c< :: DispatchType -> DispatchType -> Bool
compare :: DispatchType -> DispatchType -> Ordering
$ccompare :: DispatchType -> DispatchType -> Ordering
$cp1Ord :: Eq DispatchType
Ord, Int -> DispatchType
DispatchType -> Int
DispatchType -> [DispatchType]
DispatchType -> DispatchType
DispatchType -> DispatchType -> [DispatchType]
DispatchType -> DispatchType -> DispatchType -> [DispatchType]
(DispatchType -> DispatchType)
-> (DispatchType -> DispatchType)
-> (Int -> DispatchType)
-> (DispatchType -> Int)
-> (DispatchType -> [DispatchType])
-> (DispatchType -> DispatchType -> [DispatchType])
-> (DispatchType -> DispatchType -> [DispatchType])
-> (DispatchType -> DispatchType -> DispatchType -> [DispatchType])
-> Enum DispatchType
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
$cenumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
enumFromTo :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromTo :: DispatchType -> DispatchType -> [DispatchType]
enumFromThen :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromThen :: DispatchType -> DispatchType -> [DispatchType]
enumFrom :: DispatchType -> [DispatchType]
$cenumFrom :: DispatchType -> [DispatchType]
fromEnum :: DispatchType -> Int
$cfromEnum :: DispatchType -> Int
toEnum :: Int -> DispatchType
$ctoEnum :: Int -> DispatchType
pred :: DispatchType -> DispatchType
$cpred :: DispatchType -> DispatchType
succ :: DispatchType -> DispatchType
$csucc :: DispatchType -> DispatchType
Enum, DispatchType
DispatchType -> DispatchType -> Bounded DispatchType
forall a. a -> a -> Bounded a
maxBound :: DispatchType
$cmaxBound :: DispatchType
minBound :: DispatchType
$cminBound :: DispatchType
Bounded)


-- | Callback invoked on each incoming subscribed message.
data MessageCallback = NoCallback
  | SimpleCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
  | LowLevelCallback (MQTTClient -> PublishRequest -> IO ())

-- | The MQTT client.
--
-- See 'connectURI' for the most straightforward example.
data MQTTClient = MQTTClient {
  MQTTClient -> TChan MQTTPkt
_ch             :: TChan MQTTPkt
  , MQTTClient -> TVar Word16
_pktID        :: TVar Word16
  , MQTTClient -> MessageCallback
_cb           :: MessageCallback
  , MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks         :: TVar (Map (DispatchType,Word16) (TChan MQTTPkt))
  , MQTTClient -> TVar (Map Word16 PublishRequest)
_inflight     :: TVar (Map Word16 PublishRequest)
  , MQTTClient -> TVar ConnState
_st           :: TVar ConnState
  , MQTTClient -> TVar (Maybe (Async ()))
_ct           :: TVar (Maybe (Async ()))
  , MQTTClient -> TVar (Map Topic Word16)
_outA         :: TVar (Map Topic Word16)
  , MQTTClient -> TVar (Map Word16 Topic)
_inA          :: TVar (Map Word16 Topic)
  , MQTTClient -> TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
  , MQTTClient -> TVar (Map ByteString MessageCallback)
_corr         :: TVar (Map BL.ByteString MessageCallback)
  }

-- | Configuration for setting up an MQTT client.
data MQTTConfig = MQTTConfig{
  MQTTConfig -> Bool
_cleanSession     :: Bool -- ^ False if a session should be reused.
  , MQTTConfig -> Maybe LastWill
_lwt            :: Maybe LastWill -- ^ LastWill message to be sent on client disconnect.
  , MQTTConfig -> MessageCallback
_msgCB          :: MessageCallback -- ^ Callback for incoming messages.
  , MQTTConfig -> ProtocolLevel
_protocol       :: ProtocolLevel -- ^ Protocol to use for the connection.
  , MQTTConfig -> [Property]
_connProps      :: [Property] -- ^ Properties to send to the broker in the CONNECT packet.
  , MQTTConfig -> String
_hostname       :: String -- ^ Host to connect to (parsed from the URI)
  , MQTTConfig -> Int
_port           :: Int -- ^ Port number (parsed from the URI)
  , MQTTConfig -> String
_connID         :: String -- ^ Unique connection ID (parsed from the URI)
  , MQTTConfig -> Maybe String
_username       :: Maybe String -- ^ Optional username (parsed from the URI)
  , MQTTConfig -> Maybe String
_password       :: Maybe String -- ^ Optional password (parsed from the URI)
  , MQTTConfig -> Int
_connectTimeout :: Int -- ^ Connection timeout (microseconds)
  , MQTTConfig -> TLSSettings
_tlsSettings    :: TLSSettings -- ^ TLS Settings for secure connections
  }

-- | A default 'MQTTConfig'.  A '_connID' /may/ be required depending on
-- your broker (or if you just want an identifiable/resumable
-- connection).  In MQTTv5, an empty connection ID may be sent and the
-- server may assign an identifier for you and return it in the
-- 'PropAssignedClientIdentifier' 'Property'.
mqttConfig :: MQTTConfig
mqttConfig :: MQTTConfig
mqttConfig = MQTTConfig :: Bool
-> Maybe LastWill
-> MessageCallback
-> ProtocolLevel
-> [Property]
-> String
-> Int
-> String
-> Maybe String
-> Maybe String
-> Int
-> TLSSettings
-> MQTTConfig
MQTTConfig{_hostname :: String
_hostname=String
"", _port :: Int
_port=Int
1883, _connID :: String
_connID=String
"",
                        _username :: Maybe String
_username=Maybe String
forall a. Maybe a
Nothing, _password :: Maybe String
_password=Maybe String
forall a. Maybe a
Nothing,
                        _cleanSession :: Bool
_cleanSession=Bool
True, _lwt :: Maybe LastWill
_lwt=Maybe LastWill
forall a. Maybe a
Nothing,
                        _msgCB :: MessageCallback
_msgCB=MessageCallback
NoCallback,
                        _protocol :: ProtocolLevel
_protocol=ProtocolLevel
Protocol311, _connProps :: [Property]
_connProps=[Property]
forall a. Monoid a => a
mempty,
                        _connectTimeout :: Int
_connectTimeout=Int
180000000,
                        _tlsSettings :: TLSSettings
_tlsSettings=Bool -> Bool -> Bool -> TLSSettings
TLSSettingsSimple Bool
False Bool
False Bool
False}

-- | Connect to an MQTT server by URI.
--
-- @mqtt://@, @mqtts://@, @ws://@, and @wss://@ URLs are supported.
-- The host, port, username, and password will be derived from the URI
-- and the values supplied in the config will be ignored.
--
-- > main :: IO
-- > main = do
-- >   let (Just uri) = parseURI "mqtt://test.mosquitto.org"
-- >   mc <- connectURI mqttConfig{} uri
-- >   publish mc "tmp/topic" "hello!" False
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} URI
uri = do
  let cf :: MQTTConfig -> IO MQTTClient
cf = case URI -> String
uriScheme URI
uri of
             String
"mqtt:"  -> MQTTConfig -> IO MQTTClient
runClient
             String
"mqtts:" -> MQTTConfig -> IO MQTTClient
runClientTLS
             String
"ws:"    -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
False
             String
"wss:"   -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
True
             String
us       -> String -> MQTTConfig -> IO MQTTClient
forall a. String -> a
mqttFail (String -> MQTTConfig -> IO MQTTClient)
-> String -> MQTTConfig -> IO MQTTClient
forall a b. (a -> b) -> a -> b
$ String
"invalid URI scheme: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
us

      (Just URIAuth
a) = URI -> Maybe URIAuth
uriAuthority URI
uri
      (Maybe String
u,Maybe String
p) = String -> (Maybe String, Maybe String)
up (URIAuth -> String
uriUserInfo URIAuth
a)

  Maybe MQTTClient
v <- String -> Int -> IO MQTTClient -> IO (Maybe MQTTClient)
forall a. String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
"MQTT connect" Int
_connectTimeout (IO MQTTClient -> IO (Maybe MQTTClient))
-> IO MQTTClient -> IO (Maybe MQTTClient)
forall a b. (a -> b) -> a -> b
$
    MQTTConfig -> IO MQTTClient
cf MQTTConfig
cfg{_connID :: String
Network.MQTT.Client._connID=ProtocolLevel -> ShowS
forall p. p -> ShowS
cid ProtocolLevel
_protocol (URI -> String
uriFragment URI
uri),
           _hostname :: String
_hostname=URIAuth -> String
uriRegName URIAuth
a, _port :: Int
_port=String -> String -> Int
forall a p. (Eq a, IsString a, Num p, Read p) => String -> a -> p
port (URIAuth -> String
uriPort URIAuth
a) (URI -> String
uriScheme URI
uri),
           _username :: Maybe String
Network.MQTT.Client._username=Maybe String
u, _password :: Maybe String
Network.MQTT.Client._password=Maybe String
p}

  case Maybe MQTTClient
v of
    Maybe MQTTClient
Nothing -> String -> IO MQTTClient
forall a. String -> a
mqttFail (String -> IO MQTTClient) -> String -> IO MQTTClient
forall a b. (a -> b) -> a -> b
$ String
"connection to " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> URI -> String
forall a. Show a => a -> String
show URI
uri String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" timed out"
    Just MQTTClient
x  -> MQTTClient -> IO MQTTClient
forall (f :: * -> *) a. Applicative f => a -> f a
pure MQTTClient
x

  where
    port :: String -> a -> p
port String
"" a
"mqtt:"  = p
1883
    port String
"" a
"mqtts:" = p
8883
    port String
"" a
"ws:"    = p
80
    port String
"" a
"wss:"   = p
443
    port String
x a
_         = (String -> p
forall a. Read a => String -> a
read (String -> p) -> ShowS -> String -> p
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ShowS
forall a. [a] -> [a]
tail) String
x

    cid :: p -> ShowS
cid p
_ [Char
'#']    = String
""
    cid p
_ (Char
'#':String
xs) = String
xs
    cid p
_ String
_        = String
""

    up :: String -> (Maybe String, Maybe String)
up String
"" = (Maybe String
forall a. Maybe a
Nothing, Maybe String
forall a. Maybe a
Nothing)
    up String
x = let (String
u,String
r) = (Char -> Bool) -> String -> (String, String)
forall a. (a -> Bool) -> [a] -> ([a], [a])
break (Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
':') (ShowS
forall a. [a] -> [a]
init String
x) in
             (String -> Maybe String
forall a. a -> Maybe a
Just (ShowS
unEscapeString String
u), if String
r String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
"" then Maybe String
forall a. Maybe a
Nothing else String -> Maybe String
forall a. a -> Maybe a
Just (ShowS
unEscapeString ShowS -> ShowS
forall a b. (a -> b) -> a -> b
$ ShowS
forall a. [a] -> [a]
tail String
r))


-- | Set up and run a client from the given config.
runClient :: MQTTConfig -> IO MQTTClient
runClient :: MQTTConfig -> IO MQTTClient
runClient cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (ClientSettings -> (AppData -> IO ()) -> IO ()
forall a. ClientSettings -> (AppData -> IO a) -> IO a
runTCPClient (Int -> ByteString -> ClientSettings
clientSettings Int
_port (String -> ByteString
BCS.pack String
_hostname))) MQTTConfig
cfg

-- | Set up and run a client connected via TLS.
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (TLSClientConfig -> (AppData -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
TLSClientConfig -> (AppData -> m a) -> m a
runTLSClient TLSClientConfig
tlsConf) MQTTConfig
cfg
  where tlsConf :: TLSClientConfig
tlsConf = (Int -> ByteString -> TLSClientConfig
tlsClientConfig Int
_port (String -> ByteString
BCS.pack String
_hostname)) {tlsClientTLSSettings :: TLSSettings
tlsClientTLSSettings=TLSSettings
_tlsSettings}

-- Compatibility mechanisms for TCP Conduit bits.
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (AppData -> IO ()) -> IO ()
mkconn = ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (((AppData -> IO ()) -> IO ()) -> (MQTTConduit -> IO ()) -> IO ()
forall ad (m :: * -> *) (m :: * -> *) c t i o.
(HasReadWrite ad, MonadIO m, MonadIO m) =>
((ad -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (AppData -> IO ()) -> IO ()
mkconn)
  where adapt :: ((ad -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (ad -> c) -> t
mk (ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f = (ad -> c) -> t
mk ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> (ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ()))
-> ad
-> c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
forall ad (m :: * -> *) (m :: * -> *) i o.
(HasReadWrite ad, MonadIO m, MonadIO m) =>
ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor)
        adaptor :: ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor ad
ad = (ad -> ConduitT i ByteString m ()
forall ad (m :: * -> *) i.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT i ByteString m ()
appSource ad
ad, ad -> ConduitT ByteString o m ()
forall ad (m :: * -> *) o.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT ByteString o m ()
appSink ad
ad)

runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI{String
uriPath :: URI -> String
uriPath :: String
uriPath, String
uriQuery :: URI -> String
uriQuery :: String
uriQuery} Bool
secure cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} =
  ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (((Connection -> IO ()) -> IO ()) -> (MQTTConduit -> IO ()) -> IO ()
forall c t. ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt (((Connection -> IO ()) -> IO ())
 -> (MQTTConduit -> IO ()) -> IO ())
-> ((Connection -> IO ()) -> IO ())
-> (MQTTConduit -> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
cf Bool
secure String
_hostname Int
_port String
endpoint ConnectionOptions
WS.defaultConnectionOptions Headers
hdrs) MQTTConfig
cfg

  where
    hdrs :: Headers
hdrs = [(CI ByteString
"Sec-WebSocket-Protocol", ByteString
"mqtt")]
    adapt :: ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt (Connection -> c) -> t
mk MQTTConduit -> c
f = (Connection -> c) -> t
mk (MQTTConduit -> c
f (MQTTConduit -> c)
-> (Connection -> MQTTConduit) -> Connection -> c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> MQTTConduit
adaptor)
    adaptor :: Connection -> MQTTConduit
adaptor Connection
s = (Connection -> ConduitT () ByteString IO ()
wsSource Connection
s, Connection -> ConduitT ByteString Void IO ()
wsSink Connection
s)

    endpoint :: String
endpoint = String
uriPath String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
uriQuery

    cf :: Bool -> String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    cf :: Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
cf Bool
False = String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
forall a.
String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
WS.runClientWith
    cf Bool
True  = String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
runWSS

    wsSource :: WS.Connection -> ConduitT () BCS.ByteString IO ()
    wsSource :: Connection -> ConduitT () ByteString IO ()
wsSource Connection
ws = ConduitT () ByteString IO () -> ConduitT () ByteString IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ConduitT () ByteString IO () -> ConduitT () ByteString IO ())
-> ConduitT () ByteString IO () -> ConduitT () ByteString IO ()
forall a b. (a -> b) -> a -> b
$ do
      ByteString
bs <- IO ByteString -> ConduitT () ByteString IO ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> ConduitT () ByteString IO ByteString)
-> IO ByteString -> ConduitT () ByteString IO ByteString
forall a b. (a -> b) -> a -> b
$ Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
WS.receiveData Connection
ws
      Bool
-> ConduitT () ByteString IO () -> ConduitT () ByteString IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ByteString -> Bool
BCS.null ByteString
bs) (ConduitT () ByteString IO () -> ConduitT () ByteString IO ())
-> ConduitT () ByteString IO () -> ConduitT () ByteString IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> ConduitT () ByteString IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield ByteString
bs

    wsSink :: WS.Connection -> ConduitT BCS.ByteString Void IO ()
    wsSink :: Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws = ConduitT ByteString Void IO ()
-> (ByteString -> ConduitT ByteString Void IO ())
-> Maybe ByteString
-> ConduitT ByteString Void IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> ConduitT ByteString Void IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (\ByteString
bs -> IO () -> ConduitT ByteString Void IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> ByteString -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendBinaryData Connection
ws ByteString
bs) ConduitT ByteString Void IO ()
-> ConduitT ByteString Void IO () -> ConduitT ByteString Void IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws) (Maybe ByteString -> ConduitT ByteString Void IO ())
-> ConduitT ByteString Void IO (Maybe ByteString)
-> ConduitT ByteString Void IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ConduitT ByteString Void IO (Maybe ByteString)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await

    runWSS :: String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    runWSS :: String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
runWSS String
host Int
port String
path ConnectionOptions
options Headers
hdrs' Connection -> IO ()
app = do
      let connectionParams :: ConnectionParams
connectionParams = ConnectionParams :: String
-> PortNumber
-> Maybe TLSSettings
-> Maybe ProxySettings
-> ConnectionParams
ConnectionParams
            { connectionHostname :: String
connectionHostname = String
host
            , connectionPort :: PortNumber
connectionPort =  Int -> PortNumber
forall a. Enum a => Int -> a
toEnum Int
port
            , connectionUseSecure :: Maybe TLSSettings
connectionUseSecure = TLSSettings -> Maybe TLSSettings
forall a. a -> Maybe a
Just TLSSettings
_tlsSettings
            , connectionUseSocks :: Maybe ProxySettings
connectionUseSocks = Maybe ProxySettings
forall a. Maybe a
Nothing
            }

      ConnectionContext
context <- IO ConnectionContext
initConnectionContext
      IO Connection
-> (Connection -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
E.bracket (ConnectionContext -> ConnectionParams -> IO Connection
connectTo ConnectionContext
context ConnectionParams
connectionParams) Connection -> IO ()
connectionClose
        (\Connection
conn -> do
            Stream
stream <- IO (Maybe ByteString) -> (Maybe ByteString -> IO ()) -> IO Stream
makeStream (Connection -> IO (Maybe ByteString)
reader Connection
conn) (Connection -> Maybe ByteString -> IO ()
writer Connection
conn)
            Stream
-> String
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
forall a.
Stream
-> String
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
WS.runClientWithStream Stream
stream String
host String
path ConnectionOptions
options Headers
hdrs' Connection -> IO ()
app)

        where
          reader :: Connection -> IO (Maybe ByteString)
reader Connection
conn =
            IO (Maybe ByteString)
-> (IOError -> IO (Maybe ByteString)) -> IO (Maybe ByteString)
forall a. IO a -> (IOError -> IO a) -> IO a
catchIOError (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> IO ByteString -> IO (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
connectionGetChunk Connection
conn)
            (\IOError
e -> if IOError -> Bool
isEOFError IOError
e then Maybe ByteString -> IO (Maybe ByteString)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ByteString
forall a. Maybe a
Nothing else IOError -> IO (Maybe ByteString)
forall e a. Exception e => e -> IO a
E.throwIO IOError
e)

          writer :: Connection -> Maybe ByteString -> IO ()
writer Connection
conn = IO () -> (ByteString -> IO ()) -> Maybe ByteString -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (Connection -> ByteString -> IO ()
connectionPut Connection
conn (ByteString -> IO ())
-> (ByteString -> ByteString) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BC.toStrict)

pingPeriod :: Int
pingPeriod :: Int
pingPeriod = Int
30000000 -- 30 seconds

mqttFail :: String -> a
mqttFail :: String -> a
mqttFail = MQTTException -> a
forall a e. Exception e => e -> a
E.throw (MQTTException -> a) -> (String -> MQTTException) -> String -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException

-- A couple async utilities to get our stuff named.

namedAsync :: String -> IO a -> IO (Async a)
namedAsync :: String -> IO a -> IO (Async a)
namedAsync String
s IO a
a = IO a -> IO (Async a)
forall a. IO a -> IO (Async a)
async IO a
a IO (Async a) -> (Async a -> IO (Async a)) -> IO (Async a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Async a
p -> ThreadId -> String -> IO ()
labelThread (Async a -> ThreadId
forall a. Async a -> ThreadId
asyncThreadId Async a
p) String
s IO () -> IO (Async a) -> IO (Async a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async a -> IO (Async a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Async a
p

namedTimeout :: String -> Int -> IO a -> IO (Maybe a)
namedTimeout :: String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
n Int
to IO a
a = Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
to (IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ThreadId
tid -> ThreadId -> String -> IO ()
labelThread ThreadId
tid String
n IO () -> IO a -> IO a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a
a)

-- | MQTTConduit provides a source and sink for data as used by 'runMQTTConduit'.
type MQTTConduit = (ConduitT () BCS.ByteString IO (), ConduitT BCS.ByteString Void IO ())

-- | Set up and run a client with a conduit context function.
--
-- The provided action calls another IO action with a 'MQTTConduit' as a
-- parameter.  It is expected that this action will manage the
-- lifecycle of the conduit source/sink on behalf of the client.
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -- ^ an action providing an 'MQTTConduit' in an execution context
               -> MQTTConfig -- ^ the 'MQTTConfig'
               -> IO MQTTClient
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (MQTTConduit -> IO ()) -> IO ()
mkconn MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = do
  TChan MQTTPkt
_ch <- IO (TChan MQTTPkt)
forall a. IO (TChan a)
newTChanIO
  TVar Word16
_pktID <- Word16 -> IO (TVar Word16)
forall a. a -> IO (TVar a)
newTVarIO Word16
1
  TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks <- Map (DispatchType, Word16) (TChan MQTTPkt)
-> IO (TVar (Map (DispatchType, Word16) (TChan MQTTPkt)))
forall a. a -> IO (TVar a)
newTVarIO Map (DispatchType, Word16) (TChan MQTTPkt)
forall a. Monoid a => a
mempty
  TVar (Map Word16 PublishRequest)
_inflight <- Map Word16 PublishRequest -> IO (TVar (Map Word16 PublishRequest))
forall a. a -> IO (TVar a)
newTVarIO Map Word16 PublishRequest
forall a. Monoid a => a
mempty
  TVar ConnState
_st <- ConnState -> IO (TVar ConnState)
forall a. a -> IO (TVar a)
newTVarIO ConnState
Starting
  TVar (Maybe (Async ()))
_ct <- Maybe (Async ()) -> IO (TVar (Maybe (Async ())))
forall a. a -> IO (TVar a)
newTVarIO Maybe (Async ())
forall a. Maybe a
Nothing
  TVar (Map Topic Word16)
_outA <- Map Topic Word16 -> IO (TVar (Map Topic Word16))
forall a. a -> IO (TVar a)
newTVarIO Map Topic Word16
forall a. Monoid a => a
mempty
  TVar (Map Word16 Topic)
_inA <- Map Word16 Topic -> IO (TVar (Map Word16 Topic))
forall a. a -> IO (TVar a)
newTVarIO Map Word16 Topic
forall a. Monoid a => a
mempty
  TVar ConnACKFlags
_connACKFlags <- ConnACKFlags -> IO (TVar ConnACKFlags)
forall a. a -> IO (TVar a)
newTVarIO (SessionReuse -> ConnACKRC -> [Property] -> ConnACKFlags
ConnACKFlags SessionReuse
NewSession ConnACKRC
ConnUnspecifiedError [Property]
forall a. Monoid a => a
mempty)
  TVar (Map ByteString MessageCallback)
_corr <- Map ByteString MessageCallback
-> IO (TVar (Map ByteString MessageCallback))
forall a. a -> IO (TVar a)
newTVarIO Map ByteString MessageCallback
forall a. Monoid a => a
mempty
  let _cb :: MessageCallback
_cb = MessageCallback
_msgCB
      cli :: MQTTClient
cli = MQTTClient :: TChan MQTTPkt
-> TVar Word16
-> MessageCallback
-> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> TVar (Map Word16 PublishRequest)
-> TVar ConnState
-> TVar (Maybe (Async ()))
-> TVar (Map Topic Word16)
-> TVar (Map Word16 Topic)
-> TVar ConnACKFlags
-> TVar (Map ByteString MessageCallback)
-> MQTTClient
MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_cb :: MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
..}

  Async ()
t <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT clientThread" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ MQTTClient -> IO ()
clientThread MQTTClient
cli
  ConnState
s <- STM ConnState -> IO ConnState
forall a. STM a -> IO a
atomically (MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient
cli Async ()
t)

  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnState
s ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Disconnected) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO a
wait Async ()
t

  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
cli

  MQTTClient -> IO MQTTClient
forall (f :: * -> *) a. Applicative f => a -> f a
pure MQTTClient
cli

  where
    clientThread :: MQTTClient -> IO ()
clientThread MQTTClient
cli = IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
E.finally IO ()
connectAndRun IO ()
markDisco
      where
        connectAndRun :: IO ()
connectAndRun = (MQTTConduit -> IO ()) -> IO ()
mkconn ((MQTTConduit -> IO ()) -> IO ())
-> (MQTTConduit -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \MQTTConduit
ad -> MQTTClient -> MQTTConduit -> IO MQTTClient
forall (m :: * -> *) a a.
Monad m =>
MQTTClient -> (a, ConduitM ByteString Void m a) -> m MQTTClient
start MQTTClient
cli MQTTConduit
ad IO MQTTClient -> (MQTTClient -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MQTTConduit -> MQTTClient -> IO ()
run MQTTConduit
ad
        markDisco :: IO ()
markDisco = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          ConnState
st <- TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli)
          Bool -> STM ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (Bool -> STM ()) -> Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ ConnState
st ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Starting Bool -> Bool -> Bool
|| ConnState
st ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Connected
          TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli) ConnState
Disconnected

    start :: MQTTClient -> (a, ConduitM ByteString Void m a) -> m MQTTClient
start c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} (a
_,ConduitM ByteString Void m a
sink) = do
      m a -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m a -> m ())
-> (ConduitT () Void m a -> m a) -> ConduitT () Void m a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConduitT () Void m a -> m a
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m a -> m ()) -> ConduitT () Void m a -> m ()
forall a b. (a -> b) -> a -> b
$ do
        let req :: ConnectRequest
req = ConnectRequest
connectRequest{_connID :: ByteString
T._connID=String -> ByteString
BC.pack String
_connID,
                                 _lastWill :: Maybe LastWill
T._lastWill=Maybe LastWill
_lwt,
                                 _username :: Maybe ByteString
T._username=String -> ByteString
BC.pack (String -> ByteString) -> Maybe String -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe String
_username,
                                 _password :: Maybe ByteString
T._password=String -> ByteString
BC.pack (String -> ByteString) -> Maybe String -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe String
_password,
                                 _cleanSession :: Bool
T._cleanSession=Bool
_cleanSession,
                                 _connProperties :: [Property]
T._connProperties=[Property]
_connProps}
        ByteString -> ConduitT () ByteString m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ ProtocolLevel -> ConnectRequest -> ByteString
forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol ConnectRequest
req) ConduitT () ByteString m ()
-> ConduitM ByteString Void m a -> ConduitT () Void m a
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ByteString Void m a
sink

      MQTTClient -> m MQTTClient
forall (f :: * -> *) a. Applicative f => a -> f a
pure MQTTClient
c

    run :: MQTTConduit -> MQTTClient -> IO ()
run (ConduitT () ByteString IO ()
src,ConduitT ByteString Void IO ()
sink) c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = do
      TChan Bool
pch <- IO (TChan Bool)
forall a. IO (TChan a)
newTChanIO
      Async ()
o <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT out" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
processOut
      Async ()
p <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT ping" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
forall b. IO b
doPing
      Async ()
w <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT watchdog" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ TChan Bool -> IO ()
forall a b. TChan a -> IO b
watchdog TChan Bool
pch
      Async ()
s <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT in" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ TChan Bool -> IO ()
doSrc TChan Bool
pch

      IO (Async (), ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Async (), ()) -> IO ()) -> IO (Async (), ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ [Async ()] -> IO (Async (), ())
forall a. [Async a] -> IO (Async a, a)
waitAnyCancel [Async ()
o, Async ()
p, Async ()
w, Async ()
s]

      where
        doSrc :: TChan Bool -> IO ()
doSrc TChan Bool
pch = ConduitT () Void IO () -> IO ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO () -> IO ())
-> ConduitT () Void IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString IO ()
src
                    ConduitT () ByteString IO ()
-> ConduitT ByteString Void IO () -> ConduitT () Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| Parser ByteString MQTTPkt
-> ConduitT ByteString (PositionRange, MQTTPkt) IO ()
forall a (m :: * -> *) b.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a (PositionRange, b) m ()
conduitParser (ProtocolLevel -> Parser ByteString MQTTPkt
parsePacket ProtocolLevel
_protocol)
                    ConduitT ByteString (PositionRange, MQTTPkt) IO ()
-> ConduitM (PositionRange, MQTTPkt) Void IO ()
-> ConduitT ByteString Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ((PositionRange, MQTTPkt) -> IO ())
-> ConduitM (PositionRange, MQTTPkt) Void IO ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_ (\(PositionRange
_,MQTTPkt
x) -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch MQTTClient
c TChan Bool
pch (MQTTPkt -> IO ()) -> MQTTPkt -> IO ()
forall a b. (a -> b) -> a -> b
$! MQTTPkt
x))

        onceConnected :: IO ()
onceConnected = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> STM ()
check (Bool -> STM ()) -> (ConnState -> Bool) -> ConnState -> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Connected) (ConnState -> STM ()) -> STM ConnState -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st

        processOut :: IO ()
processOut = ConduitT () Void IO () -> IO ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO () -> IO ())
-> ConduitT () Void IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
          IO MQTTPkt -> ConduitT () MQTTPkt IO ()
forall (m :: * -> *) a i. Monad m => m a -> ConduitT i a m ()
C.repeatM (IO MQTTPkt -> IO MQTTPkt
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (STM MQTTPkt -> IO MQTTPkt) -> STM MQTTPkt -> IO MQTTPkt
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM MQTTPkt -> STM MQTTPkt
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
_ch))
          ConduitT () MQTTPkt IO ()
-> ConduitM MQTTPkt Void IO () -> ConduitT () Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (MQTTPkt -> ByteString) -> ConduitT MQTTPkt ByteString IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (MQTTPkt -> ByteString) -> MQTTPkt -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolLevel -> MQTTPkt -> ByteString
forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol)
          ConduitT MQTTPkt ByteString IO ()
-> ConduitT ByteString Void IO () -> ConduitM MQTTPkt Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT ByteString Void IO ()
sink

        doPing :: IO b
doPing = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
pingPeriod IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c MQTTPkt
PingPkt

        watchdog :: TChan a -> IO b
watchdog TChan a
ch = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ do
          TVar Bool
toch <- Int -> IO (TVar Bool)
registerDelay (Int
pingPeriod Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
3)
          Bool
timedOut <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ((Bool -> STM ()
check (Bool -> STM ()) -> STM Bool -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
toch) STM () -> STM Bool -> STM Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True) STM Bool -> STM Bool -> STM Bool
forall a. STM a -> STM a -> STM a
`orElse` (TChan a -> STM a
forall a. TChan a -> STM a
readTChan TChan a
ch STM a -> STM Bool -> STM Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False)
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
timedOut (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient -> MQTTException -> IO ()
forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout

    waitForLaunch :: MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} Async ()
t = do
      TVar (Maybe (Async ())) -> Maybe (Async ()) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_ct (Async () -> Maybe (Async ())
forall a. a -> Maybe a
Just Async ()
t)
      ConnState
c <- TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
      if ConnState
c ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Starting then STM ConnState
forall a. STM a
retry else ConnState -> STM ConnState
forall (f :: * -> *) a. Applicative f => a -> f a
pure ConnState
c

-- | Wait for a client to terminate its connection.
-- An exception is thrown if the client didn't terminate expectedly.
waitForClient :: MQTTClient -> IO ()
waitForClient :: MQTTClient -> IO ()
waitForClient c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = do
  IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse Async () -> IO ()
forall a. Async a -> IO a
wait (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
  Maybe SomeException
e <- STM (Maybe SomeException) -> IO (Maybe SomeException)
forall a. STM a -> IO a
atomically (STM (Maybe SomeException) -> IO (Maybe SomeException))
-> STM (Maybe SomeException) -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
c ConnState
Stopped
  case Maybe SomeException
e of
    Maybe SomeException
Nothing -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just SomeException
x  -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
E.throwIO SomeException
x

stateX :: MQTTClient -> ConnState -> STM (Maybe E.SomeException)
stateX :: MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} ConnState
want = ConnState -> Maybe SomeException
f (ConnState -> Maybe SomeException)
-> STM ConnState -> STM (Maybe SomeException)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st

  where
    je :: String -> Maybe SomeException
je = SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (SomeException -> Maybe SomeException)
-> (String -> SomeException) -> String -> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTException -> SomeException
forall e. Exception e => e -> SomeException
E.toException (MQTTException -> SomeException)
-> (String -> MQTTException) -> String -> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException

    f :: ConnState -> Maybe E.SomeException
    f :: ConnState -> Maybe SomeException
f ConnState
Connected    = if ConnState
want ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Connected then Maybe SomeException
forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly connected"
    f ConnState
Stopped      = if ConnState
want ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Stopped then Maybe SomeException
forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly stopped"
    f ConnState
Disconnected = String -> Maybe SomeException
je String
"disconnected"
    f ConnState
Starting     = String -> Maybe SomeException
je String
"died while starting"
    f (DiscoErr DisconnectRequest
x) = SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (SomeException -> Maybe SomeException)
-> (DisconnectRequest -> SomeException)
-> DisconnectRequest
-> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTException -> SomeException
forall e. Exception e => e -> SomeException
E.toException (MQTTException -> SomeException)
-> (DisconnectRequest -> MQTTException)
-> DisconnectRequest
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DisconnectRequest -> MQTTException
Discod (DisconnectRequest -> Maybe SomeException)
-> DisconnectRequest -> Maybe SomeException
forall a b. (a -> b) -> a -> b
$ DisconnectRequest
x
    f (ConnErr ConnACKFlags
e)  = String -> Maybe SomeException
je (ConnACKFlags -> String
forall a. Show a => a -> String
show ConnACKFlags
e)

data MQTTException = Timeout | BadData | Discod DisconnectRequest | MQTTException String deriving(MQTTException -> MQTTException -> Bool
(MQTTException -> MQTTException -> Bool)
-> (MQTTException -> MQTTException -> Bool) -> Eq MQTTException
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MQTTException -> MQTTException -> Bool
$c/= :: MQTTException -> MQTTException -> Bool
== :: MQTTException -> MQTTException -> Bool
$c== :: MQTTException -> MQTTException -> Bool
Eq, Int -> MQTTException -> ShowS
[MQTTException] -> ShowS
MQTTException -> String
(Int -> MQTTException -> ShowS)
-> (MQTTException -> String)
-> ([MQTTException] -> ShowS)
-> Show MQTTException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MQTTException] -> ShowS
$cshowList :: [MQTTException] -> ShowS
show :: MQTTException -> String
$cshow :: MQTTException -> String
showsPrec :: Int -> MQTTException -> ShowS
$cshowsPrec :: Int -> MQTTException -> ShowS
Show)

instance E.Exception MQTTException

dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} TChan Bool
pch MQTTPkt
pkt =
  case MQTTPkt
pkt of
    (ConnACKPkt ConnACKFlags
p)                            -> ConnACKFlags -> IO ()
connACKd ConnACKFlags
p
    (PublishPkt PublishRequest
p)                            -> PublishRequest -> IO ()
pub PublishRequest
p
    (SubACKPkt (SubscribeResponse Word16
i [Either SubErr QoS]
_ [Property]
_))     -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DSubACK Word16
i
    (UnsubACKPkt (UnsubscribeResponse Word16
i [Property]
_ [UnsubStatus]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DUnsubACK Word16
i
    (PubACKPkt (PubACK Word16
i Word8
_ [Property]
_))                -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubACK Word16
i
    (PubRELPkt (PubREL Word16
i Word8
_ [Property]
_))                -> Word16 -> IO ()
pubd Word16
i
    (PubRECPkt (PubREC Word16
i Word8
_ [Property]
_))                -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubREC Word16
i
    (PubCOMPPkt (PubCOMP Word16
i Word8
_ [Property]
_))              -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubCOMP Word16
i
    (DisconnectPkt DisconnectRequest
req)                       -> DisconnectRequest -> IO ()
disco DisconnectRequest
req
    MQTTPkt
PongPkt                                   -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Bool -> STM ()) -> Bool -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan Bool -> Bool -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Bool
pch (Bool -> IO ()) -> Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool
True

    -- Not implemented
    (AuthPkt AuthRequest
p)                               -> String -> IO ()
forall a. String -> a
mqttFail (String
"unexpected incoming auth: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> AuthRequest -> String
forall a. Show a => a -> String
show AuthRequest
p)

    -- Things clients shouldn't see
    MQTTPkt
PingPkt                                   -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming ping packet"
    (ConnPkt ConnectRequest
_ ProtocolLevel
_)                             -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming connect"
    (SubscribePkt SubscribeRequest
_)                          -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming subscribe"
    (UnsubscribePkt UnsubscribeRequest
_)                        -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming unsubscribe"

  where connACKd :: ConnACKFlags -> IO ()
connACKd connr :: ConnACKFlags
connr@(ConnACKFlags SessionReuse
_ ConnACKRC
val [Property]
_) = case ConnACKRC
val of
                                                  ConnACKRC
ConnAccepted -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                                    TVar ConnACKFlags -> ConnACKFlags -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnACKFlags
_connACKFlags ConnACKFlags
connr
                                                    TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Connected
                                                  ConnACKRC
_ -> do
                                                    Maybe (Async ())
t <- TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
                                                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (ConnACKFlags -> ConnState
ConnErr ConnACKFlags
connr)
                                                    MQTTException -> Maybe (Async ()) -> IO ()
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (String -> MQTTException
MQTTException (String -> MQTTException) -> String -> MQTTException
forall a b. (a -> b) -> a -> b
$ ConnACKFlags -> String
forall a. Show a => a -> String
show ConnACKFlags
connr) Maybe (Async ())
t

        pub :: PublishRequest -> IO ()
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS0} = STM PublishRequest -> IO PublishRequest
forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p) IO PublishRequest -> (PublishRequest -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe MQTTPkt -> PublishRequest -> IO ()
forall (t :: * -> *).
Traversable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify Maybe MQTTPkt
forall a. Maybe a
Nothing
        pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS1, Word16
_pubPktID :: PublishRequest -> Word16
_pubPktID :: Word16
_pubPktID} =
          Maybe MQTTPkt -> PublishRequest -> IO ()
forall (t :: * -> *).
Traversable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (MQTTPkt -> Maybe MQTTPkt
forall a. a -> Maybe a
Just (PubACK -> MQTTPkt
PubACKPkt (Word16 -> Word8 -> [Property] -> PubACK
PubACK Word16
_pubPktID Word8
0 [Property]
forall a. Monoid a => a
mempty))) (PublishRequest -> IO ()) -> IO PublishRequest -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM PublishRequest -> IO PublishRequest
forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p)
        pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS2} = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          p' :: PublishRequest
p'@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} <- PublishRequest -> STM PublishRequest
resolve PublishRequest
p
          TVar (Map Word16 PublishRequest)
-> (Map Word16 PublishRequest -> Map Word16 PublishRequest)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 PublishRequest)
_inflight (Word16
-> PublishRequest
-> Map Word16 PublishRequest
-> Map Word16 PublishRequest
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word16
_pubPktID PublishRequest
p')
          MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREC -> MQTTPkt
PubRECPkt (Word16 -> Word8 -> [Property] -> PubREC
PubREC Word16
_pubPktID Word8
0 [Property]
forall a. Monoid a => a
mempty))

        pubd :: Word16 -> IO ()
pubd Word16
i = do
          Maybe PublishRequest
mp <- STM (Maybe PublishRequest) -> IO (Maybe PublishRequest)
forall a. STM a -> IO a
atomically (STM (Maybe PublishRequest) -> IO (Maybe PublishRequest))
-> STM (Maybe PublishRequest) -> IO (Maybe PublishRequest)
forall a b. (a -> b) -> a -> b
$ do
            Maybe PublishRequest
r <- Word16 -> Map Word16 PublishRequest -> Maybe PublishRequest
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Word16
i (Map Word16 PublishRequest -> Maybe PublishRequest)
-> STM (Map Word16 PublishRequest) -> STM (Maybe PublishRequest)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Map Word16 PublishRequest) -> STM (Map Word16 PublishRequest)
forall a. TVar a -> STM a
readTVar TVar (Map Word16 PublishRequest)
_inflight
            TVar (Map Word16 PublishRequest)
-> (Map Word16 PublishRequest -> Map Word16 PublishRequest)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 PublishRequest)
_inflight (Word16 -> Map Word16 PublishRequest -> Map Word16 PublishRequest
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Word16
i)
            Maybe PublishRequest -> STM (Maybe PublishRequest)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe PublishRequest
r
          case Maybe PublishRequest
mp of
            Maybe PublishRequest
Nothing -> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0x92 [Property]
forall a. Monoid a => a
mempty))
            Just PublishRequest
p  -> Maybe MQTTPkt -> PublishRequest -> IO ()
forall (t :: * -> *).
Traversable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (MQTTPkt -> Maybe MQTTPkt
forall a. a -> Maybe a
Just (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0 [Property]
forall a. Monoid a => a
mempty))) PublishRequest
p

        notify :: t MQTTPkt -> PublishRequest -> IO ()
notify t MQTTPkt
rpkt p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} = do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map Word16 PublishRequest)
-> (Map Word16 PublishRequest -> Map Word16 PublishRequest)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 PublishRequest)
_inflight (Word16 -> Map Word16 PublishRequest -> Map Word16 PublishRequest
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Word16
_pubPktID)
          Map ByteString MessageCallback
corrs <- TVar (Map ByteString MessageCallback)
-> IO (Map ByteString MessageCallback)
forall a. TVar a -> IO a
readTVarIO TVar (Map ByteString MessageCallback)
_corr
          () -> IO ()
forall a. a -> IO a
E.evaluate (() -> IO ()) -> (() -> ()) -> () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. () -> ()
forall a. NFData a => a -> a
force (() -> IO ()) -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< case MessageCallback
-> (ByteString -> MessageCallback)
-> Maybe ByteString
-> MessageCallback
forall b a. b -> (a -> b) -> Maybe a -> b
maybe MessageCallback
_cb (\ByteString
cd -> MessageCallback
-> ByteString -> Map ByteString MessageCallback -> MessageCallback
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault MessageCallback
_cb ByteString
cd Map ByteString MessageCallback
corrs) Maybe ByteString
cdata of
                                   MessageCallback
NoCallback         -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                                   SimpleCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f   -> IO () -> IO ()
forall a. IO a -> IO ()
call (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToText ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
                                   LowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> IO () -> IO ()
forall a. IO a -> IO ()
call (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)

            where
              call :: IO a -> IO ()
call IO a
a = Async () -> IO ()
forall a. Async a -> IO ()
link (Async () -> IO ()) -> IO (Async ()) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"notifier" (IO a
a IO a -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond)
              respond :: IO ()
respond = IO (t ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (t ()) -> IO ()) -> IO (t ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ (MQTTPkt -> IO ()) -> t MQTTPkt -> IO (t ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c) t MQTTPkt
rpkt
              cdata :: Maybe ByteString
cdata = (Property -> Maybe ByteString -> Maybe ByteString)
-> Maybe ByteString -> [Property] -> Maybe ByteString
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe ByteString -> Maybe ByteString
f Maybe ByteString
forall a. Maybe a
Nothing [Property]
_pubProps
                where f :: Property -> Maybe ByteString -> Maybe ByteString
f (PropCorrelationData ByteString
x) Maybe ByteString
_ = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
x
                      f Property
_ Maybe ByteString
o                       = Maybe ByteString
o

        resolve :: PublishRequest -> STM PublishRequest
resolve p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} = do
          Topic
topic <- Maybe Word16 -> STM Topic
resolveTopic ((Property -> Maybe Word16 -> Maybe Word16)
-> Maybe Word16 -> [Property] -> Maybe Word16
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe Word16 -> Maybe Word16
aliasID Maybe Word16
forall a. Maybe a
Nothing [Property]
_pubProps)
          PublishRequest -> STM PublishRequest
forall (f :: * -> *) a. Applicative f => a -> f a
pure PublishRequest
p{_pubTopic :: ByteString
_pubTopic=Topic -> ByteString
textToBL Topic
topic}

          where
            aliasID :: Property -> Maybe Word16 -> Maybe Word16
aliasID (PropTopicAlias Word16
x) Maybe Word16
_ = Word16 -> Maybe Word16
forall a. a -> Maybe a
Just Word16
x
            aliasID Property
_ Maybe Word16
o                  = Maybe Word16
o

            resolveTopic :: Maybe Word16 -> STM Topic
resolveTopic Maybe Word16
Nothing = Topic -> STM Topic
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Topic
blToText ByteString
_pubTopic)
            resolveTopic (Just Word16
x) = do
              Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
_pubTopic ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"") (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ TVar (Map Word16 Topic)
-> (Map Word16 Topic -> Map Word16 Topic) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 Topic)
_inA (Word16 -> Topic -> Map Word16 Topic -> Map Word16 Topic
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word16
x (ByteString -> Topic
blToText ByteString
_pubTopic))
              Map Word16 Topic
m <- TVar (Map Word16 Topic) -> STM (Map Word16 Topic)
forall a. TVar a -> STM a
readTVar TVar (Map Word16 Topic)
_inA
              case Word16 -> Map Word16 Topic -> Maybe Topic
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Word16
x Map Word16 Topic
m of
                Maybe Topic
Nothing -> String -> STM Topic
forall a. String -> a
mqttFail (String
"failed to lookup topic alias " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word16 -> String
forall a. Show a => a -> String
show Word16
x)
                Just Topic
t  -> Topic -> STM Topic
forall (f :: * -> *) a. Applicative f => a -> f a
pure Topic
t

        delegate :: DispatchType -> Word16 -> IO ()
delegate DispatchType
dt Word16
pid = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          Map (DispatchType, Word16) (TChan MQTTPkt)
m <- TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM (Map (DispatchType, Word16) (TChan MQTTPkt))
forall a. TVar a -> STM a
readTVar TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks
          case (DispatchType, Word16)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> Maybe (TChan MQTTPkt)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (DispatchType
dt, Word16
pid) Map (DispatchType, Word16) (TChan MQTTPkt)
m of
            Maybe (TChan MQTTPkt)
Nothing -> DispatchType -> STM ()
nak DispatchType
dt
            Just TChan MQTTPkt
ch -> TChan MQTTPkt -> MQTTPkt -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan MQTTPkt
ch MQTTPkt
pkt

            where
              nak :: DispatchType -> STM ()
nak DispatchType
DPubREC = MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt  (Word16 -> Word8 -> [Property] -> PubREL
PubREL  Word16
pid Word8
0x92 [Property]
forall a. Monoid a => a
mempty))
              nak DispatchType
_       = () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()


        disco :: DisconnectRequest -> IO ()
disco DisconnectRequest
req = do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (DisconnectRequest -> ConnState
DiscoErr DisconnectRequest
req)
          MQTTException -> Maybe (Async ()) -> IO ()
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (DisconnectRequest -> MQTTException
Discod DisconnectRequest
req) (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct

maybeCancelWith :: E.Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith :: e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e = IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (Async () -> e -> IO ()
forall e a. Exception e => Async a -> e -> IO ()
`cancelWith` e
e)

killConn :: E.Exception e => MQTTClient -> e -> IO ()
killConn :: MQTTClient -> e -> IO ()
killConn MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} e
e = TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct IO (Maybe (Async ())) -> (Maybe (Async ()) -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= e -> Maybe (Async ()) -> IO ()
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e

checkConnected :: MQTTClient -> STM ()
checkConnected :: MQTTClient -> STM ()
checkConnected MQTTClient
mc = STM ()
-> (SomeException -> STM ()) -> Maybe SomeException -> STM ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) SomeException -> STM ()
forall a e. Exception e => e -> a
E.throw (Maybe SomeException -> STM ())
-> STM (Maybe SomeException) -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
mc ConnState
Connected

-- | True if we're currently in a normally connected state (in the IO monad).
isConnected :: MQTTClient -> IO Bool
isConnected :: MQTTClient -> IO Bool
isConnected = STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool)
-> (MQTTClient -> STM Bool) -> MQTTClient -> IO Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM Bool
isConnectedSTM

-- | True if we're currently in a normally connected state (in the STM monad).
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = (ConnState
Connected ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
==) (ConnState -> Bool) -> STM ConnState -> STM Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st

sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} MQTTPkt
p = MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM () -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> MQTTPkt -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan MQTTPkt
_ch MQTTPkt
p

sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (MQTTPkt -> STM ()) -> MQTTPkt -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c

textToBL :: Text -> BL.ByteString
textToBL :: Topic -> ByteString
textToBL = ByteString -> ByteString
BL.fromStrict (ByteString -> ByteString)
-> (Topic -> ByteString) -> Topic -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Topic -> ByteString
TE.encodeUtf8

blToText :: BL.ByteString -> Text
blToText :: ByteString -> Topic
blToText = ByteString -> Topic
TE.decodeUtf8 (ByteString -> Topic)
-> (ByteString -> ByteString) -> ByteString -> Topic
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BL.toStrict

reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [DispatchType]
dts = do
  MQTTClient -> STM ()
checkConnected MQTTClient
c
  TChan MQTTPkt
ch <- STM (TChan MQTTPkt)
forall a. STM (TChan a)
newTChan
  Word16
pid <- TVar Word16 -> STM Word16
forall a. TVar a -> STM a
readTVar TVar Word16
_pktID
  TVar Word16 -> (Word16 -> Word16) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Word16
_pktID ((Word16 -> Word16) -> STM ()) -> (Word16 -> Word16) -> STM ()
forall a b. (a -> b) -> a -> b
$ if Word16
pid Word16 -> Word16 -> Bool
forall a. Eq a => a -> a -> Bool
== Word16
forall a. Bounded a => a
maxBound then Word16 -> Word16 -> Word16
forall a b. a -> b -> a
const Word16
1 else Word16 -> Word16
forall a. Enum a => a -> a
succ
  TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> (Map (DispatchType, Word16) (TChan MQTTPkt)
    -> Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks (Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
forall k a. Ord k => Map k a -> Map k a -> Map k a
Map.union ([((DispatchType, Word16), TChan MQTTPkt)]
-> Map (DispatchType, Word16) (TChan MQTTPkt)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [((DispatchType
t, Word16
pid), TChan MQTTPkt
ch) | DispatchType
t <- [DispatchType]
dts]))
  (TChan MQTTPkt, Word16) -> STM (TChan MQTTPkt, Word16)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TChan MQTTPkt
ch,Word16
pid)

releasePktID :: MQTTClient -> (DispatchType,Word16) -> STM ()
releasePktID :: MQTTClient -> (DispatchType, Word16) -> STM ()
releasePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} (DispatchType, Word16)
k = MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM () -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> (Map (DispatchType, Word16) (TChan MQTTPkt)
    -> Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks ((DispatchType, Word16)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete (DispatchType, Word16)
k)

releasePktIDs :: MQTTClient -> [(DispatchType,Word16)] -> STM ()
releasePktIDs :: MQTTClient -> [(DispatchType, Word16)] -> STM ()
releasePktIDs c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [(DispatchType, Word16)]
ks = MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM () -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> (Map (DispatchType, Word16) (TChan MQTTPkt)
    -> Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
forall a.
Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany
  where deleteMany :: Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany Map (DispatchType, Word16) a
m = ((DispatchType, Word16)
 -> Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a)
-> Map (DispatchType, Word16) a
-> [(DispatchType, Word16)]
-> Map (DispatchType, Word16) a
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (DispatchType, Word16)
-> Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Map (DispatchType, Word16) a
m [(DispatchType, Word16)]
ks

sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} DispatchType
dt Word16 -> MQTTPkt
f = do
  (TChan MQTTPkt
ch,Word16
pid) <- STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a. STM a -> IO a
atomically (STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16))
-> STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a b. (a -> b) -> a -> b
$ do
    (TChan MQTTPkt
ch,Word16
pid) <- MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType
dt]
    MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (Word16 -> MQTTPkt
f Word16
pid)
    (TChan MQTTPkt, Word16) -> STM (TChan MQTTPkt, Word16)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TChan MQTTPkt
ch,Word16
pid)

  -- Wait for the response in a separate transaction.
  STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (STM MQTTPkt -> IO MQTTPkt) -> STM MQTTPkt -> IO MQTTPkt
forall a b. (a -> b) -> a -> b
$ do
    ConnState
st <- TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnState
st ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
/= ConnState
Connected) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ String -> STM ()
forall a. String -> a
mqttFail String
"disconnected waiting for response"
    MQTTClient -> (DispatchType, Word16) -> STM ()
releasePktID MQTTClient
c (DispatchType
dt,Word16
pid)
    TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch

-- | Subscribe to a list of topic filters with their respective 'QoS'es.
-- The accepted 'QoS'es are returned in the same order as requested.
subscribe :: MQTTClient -> [(Filter, SubOptions)] -> [Property] -> IO ([Either SubErr QoS], [Property])
subscribe :: MQTTClient
-> [(Topic, SubOptions)]
-> [Property]
-> IO ([Either SubErr QoS], [Property])
subscribe c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [(Topic, SubOptions)]
ls [Property]
props = do
  MQTTPkt
r <- MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DSubACK (\Word16
pid -> SubscribeRequest -> MQTTPkt
SubscribePkt (SubscribeRequest -> MQTTPkt) -> SubscribeRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ Word16
-> [(ByteString, SubOptions)] -> [Property] -> SubscribeRequest
SubscribeRequest Word16
pid [(ByteString, SubOptions)]
ls' [Property]
props)
  let (SubACKPkt (SubscribeResponse Word16
_ [Either SubErr QoS]
rs [Property]
aprops)) = MQTTPkt
r
  ([Either SubErr QoS], [Property])
-> IO ([Either SubErr QoS], [Property])
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Either SubErr QoS]
rs, [Property]
aprops)

    where ls' :: [(ByteString, SubOptions)]
ls' = ((Topic, SubOptions) -> (ByteString, SubOptions))
-> [(Topic, SubOptions)] -> [(ByteString, SubOptions)]
forall a b. (a -> b) -> [a] -> [b]
map ((Topic -> ByteString)
-> (Topic, SubOptions) -> (ByteString, SubOptions)
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first Topic -> ByteString
textToBL) [(Topic, SubOptions)]
ls

-- | Unsubscribe from a list of topic filters.
--
-- In MQTT 3.1.1, there is no body to an unsubscribe response, so it
-- can be ignored.  If this returns, you were unsubscribed.  In MQTT
-- 5, you'll get a list of unsub status values corresponding to your
-- request filters, and whatever properties the server thought you
-- should know about.
unsubscribe :: MQTTClient -> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe :: MQTTClient
-> [Topic] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [Topic]
ls [Property]
props = do
  (UnsubACKPkt (UnsubscribeResponse Word16
_ [Property]
rsn [UnsubStatus]
rprop)) <- MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DUnsubACK (\Word16
pid -> UnsubscribeRequest -> MQTTPkt
UnsubscribePkt (UnsubscribeRequest -> MQTTPkt) -> UnsubscribeRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ Word16 -> [ByteString] -> [Property] -> UnsubscribeRequest
UnsubscribeRequest Word16
pid ((Topic -> ByteString) -> [Topic] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map Topic -> ByteString
textToBL [Topic]
ls) [Property]
props)
  ([UnsubStatus], [Property]) -> IO ([UnsubStatus], [Property])
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([UnsubStatus]
rprop, [Property]
rsn)

-- | Publish a message (QoS 0).
publish :: MQTTClient
        -> Topic         -- ^ Topic
        -> BL.ByteString -- ^ Message body
        -> Bool          -- ^ Retain flag
        -> IO ()
publish :: MQTTClient -> Topic -> ByteString -> Bool -> IO ()
publish MQTTClient
c Topic
t ByteString
m Bool
r = IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
QoS0 [Property]
forall a. Monoid a => a
mempty

-- | Publish a message with the specified QoS and Properties list.
publishq :: MQTTClient
         -> Topic         -- ^ Topic
         -> BL.ByteString -- ^ Message body
         -> Bool          -- ^ Retain flag
         -> QoS           -- ^ QoS
         -> [Property]    -- ^ Properties
         -> IO ()
publishq :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
  (TChan MQTTPkt
ch,Word16
pid) <- STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a. STM a -> IO a
atomically (STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16))
-> STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a b. (a -> b) -> a -> b
$ MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType]
types
  IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
E.finally (TChan MQTTPkt -> Word16 -> IO ()
publishAndWait TChan MQTTPkt
ch Word16
pid) (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient -> [(DispatchType, Word16)] -> STM ()
releasePktIDs MQTTClient
c [(DispatchType
t',Word16
pid) | DispatchType
t' <- [DispatchType]
types])

    where
      types :: [DispatchType]
types = [DispatchType
DPubACK, DispatchType
DPubREC, DispatchType
DPubCOMP]
      publishAndWait :: TChan MQTTPkt -> Word16 -> IO ()
publishAndWait TChan MQTTPkt
ch Word16
pid = do
        MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (Word16 -> MQTTPkt
pkt Word16
pid)
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (QoS
q QoS -> QoS -> Bool
forall a. Ord a => a -> a -> Bool
> QoS
QoS0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid

      pkt :: Word16 -> MQTTPkt
pkt Word16
pid = PublishRequest -> MQTTPkt
PublishPkt (PublishRequest -> MQTTPkt) -> PublishRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ PublishRequest :: Bool
-> QoS
-> Bool
-> ByteString
-> Word16
-> ByteString
-> [Property]
-> PublishRequest
PublishRequest {_pubDup :: Bool
_pubDup = Bool
False,
                                             _pubQoS :: QoS
_pubQoS = QoS
q,
                                             _pubPktID :: Word16
_pubPktID = Word16
pid,
                                             _pubRetain :: Bool
_pubRetain = Bool
r,
                                             _pubTopic :: ByteString
_pubTopic = Topic -> ByteString
textToBL Topic
t,
                                             _pubBody :: ByteString
_pubBody = ByteString
m,
                                             _pubProps :: [Property]
_pubProps = [Property]
props}

      satisfyQoS :: TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid
        | QoS
q QoS -> QoS -> Bool
forall a. Eq a => a -> a -> Bool
== QoS
QoS0 = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        | QoS
q QoS -> QoS -> Bool
forall a. Eq a => a -> a -> Bool
== QoS
QoS1 = IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            (PubACKPkt (PubACK Word16
_ Word8
st [Property]
pprops)) <- STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (STM MQTTPkt -> IO MQTTPkt) -> STM MQTTPkt -> IO MQTTPkt
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM MQTTPkt -> STM MQTTPkt
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Word8 -> Bool
forall a. (Eq a, Num a) => a -> Bool
isOK Word8
st) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. String -> a
mqttFail (String
"qos 1 publish error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word8 -> String
forall a. Show a => a -> String
show Word8
st String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> [Property] -> String
forall a. Show a => a -> String
show [Property]
pprops)
        | QoS
q QoS -> QoS -> Bool
forall a. Eq a => a -> a -> Bool
== QoS
QoS2 = IO ()
waitRec
        | Bool
otherwise = String -> IO ()
forall a. HasCallStack => String -> a
error String
"invalid QoS"

        where
          isOK :: a -> Bool
isOK a
0  = Bool
True -- success
          isOK a
16 = Bool
True -- It worked, but nobody cares (no matching subscribers)
          isOK a
_  = Bool
False

          waitRec :: IO ()
waitRec = do
            MQTTPkt
rpkt <- STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (STM MQTTPkt -> IO MQTTPkt) -> STM MQTTPkt -> IO MQTTPkt
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM MQTTPkt -> STM MQTTPkt
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch
            case MQTTPkt
rpkt of
              PubRECPkt (PubREC Word16
_ Word8
st [Property]
recprops) -> do
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Word8 -> Bool
forall a. (Eq a, Num a) => a -> Bool
isOK Word8
st) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. String -> a
mqttFail (String
"qos 2 REC publish error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word8 -> String
forall a. Show a => a -> String
show Word8
st String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> [Property] -> String
forall a. Show a => a -> String
show [Property]
recprops)
                MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt (PubREL -> MQTTPkt) -> PubREL -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ Word16 -> Word8 -> [Property] -> PubREL
PubREL Word16
pid Word8
0 [Property]
forall a. Monoid a => a
mempty)
              PubCOMPPkt (PubCOMP Word16
_ Word8
st' [Property]
compprops) ->
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word8
st' Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
/= Word8
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. String -> a
mqttFail (String
"qos 2 COMP publish error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word8 -> String
forall a. Show a => a -> String
show Word8
st' String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> [Property] -> String
forall a. Show a => a -> String
show [Property]
compprops)
              MQTTPkt
wtf -> String -> IO ()
forall a. String -> a
mqttFail (String
"unexpected packet received in QoS2 publish: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> MQTTPkt -> String
forall a. Show a => a -> String
show MQTTPkt
wtf)

-- | Disconnect from the MQTT server.
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} DiscoReason
reason [Property]
props = IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
race_ IO ()
getDisconnected IO ()
orDieTrying
  where
    getDisconnected :: IO ()
getDisconnected = do
      MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (DisconnectRequest -> MQTTPkt
DisconnectPkt (DisconnectRequest -> MQTTPkt) -> DisconnectRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ DiscoReason -> [Property] -> DisconnectRequest
DisconnectRequest DiscoReason
reason [Property]
props)
      IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse Async () -> IO ()
forall a. Async a -> IO a
wait (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
      STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Stopped
    orDieTrying :: IO ()
orDieTrying = Int -> IO ()
threadDelay Int
10000000 IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient -> MQTTException -> IO ()
forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout

-- | Disconnect with 'DiscoNormalDisconnection' and no properties.
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect MQTTClient
c = MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect MQTTClient
c DiscoReason
DiscoNormalDisconnection [Property]
forall a. Monoid a => a
mempty

-- | A convenience method for creating a 'LastWill'.
mkLWT :: Topic -> BL.ByteString -> Bool -> T.LastWill
mkLWT :: Topic -> ByteString -> Bool -> LastWill
mkLWT Topic
t ByteString
m Bool
r = LastWill :: Bool -> QoS -> ByteString -> ByteString -> [Property] -> LastWill
T.LastWill{
  _willRetain :: Bool
T._willRetain=Bool
r,
  _willQoS :: QoS
T._willQoS=QoS
QoS0,
  _willTopic :: ByteString
T._willTopic = Topic -> ByteString
textToBL Topic
t,
  _willMsg :: ByteString
T._willMsg=ByteString
m,
  _willProps :: [Property]
T._willProps=[Property]
forall a. Monoid a => a
mempty
  }

-- | Get the list of properties that were sent from the broker at connect time.
svrProps :: MQTTClient -> IO [Property]
svrProps :: MQTTClient -> IO [Property]
svrProps MQTTClient
mc = ConnACKFlags -> [Property]
p (ConnACKFlags -> [Property]) -> IO ConnACKFlags -> IO [Property]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM ConnACKFlags -> IO ConnACKFlags
forall a. STM a -> IO a
atomically (MQTTClient -> STM ConnACKFlags
connACKSTM MQTTClient
mc)
  where p :: ConnACKFlags -> [Property]
p (ConnACKFlags SessionReuse
_ ConnACKRC
_ [Property]
props) = [Property]
props

-- | Get the complete connection ACK packet from the beginning of this session.
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM MQTTClient{TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_connACKFlags} = TVar ConnACKFlags -> STM ConnACKFlags
forall a. TVar a -> STM a
readTVar TVar ConnACKFlags
_connACKFlags

-- | Get the complete connection aCK packet from the beginning of this session.
connACK :: MQTTClient -> IO ConnACKFlags
connACK :: MQTTClient -> IO ConnACKFlags
connACK = STM ConnACKFlags -> IO ConnACKFlags
forall a. STM a -> IO a
atomically (STM ConnACKFlags -> IO ConnACKFlags)
-> (MQTTClient -> STM ConnACKFlags)
-> MQTTClient
-> IO ConnACKFlags
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM ConnACKFlags
connACKSTM

maxAliases :: MQTTClient -> IO Word16
maxAliases :: MQTTClient -> IO Word16
maxAliases MQTTClient
mc = (Property -> Word16 -> Word16) -> Word16 -> [Property] -> Word16
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Word16 -> Word16
f Word16
0 ([Property] -> Word16) -> IO [Property] -> IO Word16
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MQTTClient -> IO [Property]
svrProps MQTTClient
mc
  where
    f :: Property -> Word16 -> Word16
f (PropTopicAliasMaximum Word16
n) Word16
_ = Word16
n
    f Property
_ Word16
o                         = Word16
o

-- | Publish a message with the specified 'QoS' and 'Property' list.  If
-- possible, use an alias to shorten the message length.  The alias
-- list is managed by the client in a first-come, first-served basis,
-- so if you use this with more properties than the broker allows,
-- only the first N (up to TopicAliasMaximum, as specified by the
-- broker at connect time) will be aliased.
--
-- This is safe to use as a general publish mechanism, as it will
-- default to not aliasing whenver there's not already an alias and we
-- can't create any more.
pubAliased :: MQTTClient
         -> Topic         -- ^ Topic
         -> BL.ByteString -- ^ Message body
         -> Bool          -- ^ Retain flag
         -> QoS           -- ^ QoS
         -> [Property]    -- ^ Properties
         -> IO ()
pubAliased :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
pubAliased c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
MessageCallback
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
  Word16
x <- MQTTClient -> IO Word16
maxAliases MQTTClient
c
  (Topic
t', Word16
n) <- Word16 -> IO (Topic, Word16)
alias Word16
x
  let np :: [Property]
np = [Property]
props [Property] -> [Property] -> [Property]
forall a. Semigroup a => a -> a -> a
<> case Word16
n of
                      Word16
0 -> [Property]
forall a. Monoid a => a
mempty
                      Word16
_ -> [Word16 -> Property
PropTopicAlias Word16
n]
  MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t' ByteString
m Bool
r QoS
q [Property]
np

  where
    alias :: Word16 -> IO (Topic, Word16)
alias Word16
mv = STM (Topic, Word16) -> IO (Topic, Word16)
forall a. STM a -> IO a
atomically (STM (Topic, Word16) -> IO (Topic, Word16))
-> STM (Topic, Word16) -> IO (Topic, Word16)
forall a b. (a -> b) -> a -> b
$ do
      Map Topic Word16
as <- TVar (Map Topic Word16) -> STM (Map Topic Word16)
forall a. TVar a -> STM a
readTVar TVar (Map Topic Word16)
_outA
      let n :: Word16
n = Int -> Word16
forall a. Enum a => Int -> a
toEnum (Map Topic Word16 -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Map Topic Word16
as Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
          cur :: Maybe Word16
cur = Topic -> Map Topic Word16 -> Maybe Word16
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Topic
t Map Topic Word16
as
          v :: Word16
v = Word16 -> Maybe Word16 -> Word16
forall a. a -> Maybe a -> a
fromMaybe (if Word16
n Word16 -> Word16 -> Bool
forall a. Ord a => a -> a -> Bool
> Word16
mv then Word16
0 else Word16
n) Maybe Word16
cur
      Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word16
v Word16 -> Word16 -> Bool
forall a. Ord a => a -> a -> Bool
> Word16
0) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ TVar (Map Topic Word16) -> Map Topic Word16 -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Map Topic Word16)
_outA (Topic -> Word16 -> Map Topic Word16 -> Map Topic Word16
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Topic
t Word16
v Map Topic Word16
as)
      (Topic, Word16) -> STM (Topic, Word16)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Topic -> (Word16 -> Topic) -> Maybe Word16 -> Topic
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Topic
t (Topic -> Word16 -> Topic
forall a b. a -> b -> a
const Topic
"") Maybe Word16
cur, Word16
v)

-- | Register a callback handler for a message with the given correlated data identifier.
--
-- This registration will remain in place until unregisterCorrelated is called to remove it.
registerCorrelated :: MQTTClient -> BL.ByteString -> MessageCallback -> STM ()
registerCorrelated :: MQTTClient -> ByteString -> MessageCallback -> STM ()
registerCorrelated MQTTClient{TVar (Map ByteString MessageCallback)
_corr :: TVar (Map ByteString MessageCallback)
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_corr} ByteString
bs MessageCallback
cb = TVar (Map ByteString MessageCallback)
-> (Map ByteString MessageCallback
    -> Map ByteString MessageCallback)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ByteString MessageCallback)
_corr (ByteString
-> MessageCallback
-> Map ByteString MessageCallback
-> Map ByteString MessageCallback
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ByteString
bs MessageCallback
cb)

-- | Unregister a callback handler for the given correlated data identifier.
unregisterCorrelated :: MQTTClient -> BL.ByteString -> STM ()
unregisterCorrelated :: MQTTClient -> ByteString -> STM ()
unregisterCorrelated MQTTClient{TVar (Map ByteString MessageCallback)
_corr :: TVar (Map ByteString MessageCallback)
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_corr} ByteString
bs = TVar (Map ByteString MessageCallback)
-> (Map ByteString MessageCallback
    -> Map ByteString MessageCallback)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ByteString MessageCallback)
_corr (ByteString
-> Map ByteString MessageCallback -> Map ByteString MessageCallback
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete ByteString
bs)