{-|
Module      : Network.MQTT.Client.
Description : An MQTT client.
Copyright   : (c) Dustin Sallings, 2019
License     : BSD3
Maintainer  : dustin@spy.net
Stability   : experimental

An MQTT protocol client

Both
<http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html MQTT 3.1.1>
and
<https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html MQTT 5.0>
are supported over plain TCP, TLS, WebSockets and Secure WebSockets.
-}

{-# LANGUAGE LambdaCase        #-}
{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}

module Network.MQTT.Client (
  -- * Configuring the client.
  MQTTConfig(..), MQTTClient, QoS(..), Topic, mqttConfig,  mkLWT, LastWill(..),
  ProtocolLevel(..), Property(..), SubOptions(..), subOptions, MessageCallback(..),
  -- * Running and waiting for the client.
  waitForClient,
  connectURI, isConnected,
  disconnect, normalDisconnect, stopClient,
  -- * General client interactions.
  subscribe, unsubscribe, publish, publishq, pubAliased,
  svrProps, connACK, MQTTException(..),
  -- * Low-level bits
  runMQTTConduit, MQTTConduit, isConnectedSTM, connACKSTM,
  registerCorrelated, unregisterCorrelated
  ) where

import           Control.Concurrent         (myThreadId, threadDelay)
import           Control.Concurrent.Async   (Async, async, asyncThreadId, cancel, cancelWith, link, race_, wait,
                                             waitAnyCancel)
import           Control.Concurrent.MVar    (MVar, newEmptyMVar, putMVar, takeMVar)
import           Control.Concurrent.STM     (STM, TChan, TVar, atomically, check, modifyTVar', newTChan, newTChanIO,
                                             newTVarIO, orElse, readTChan, readTVar, readTVarIO, registerDelay, retry,
                                             writeTChan, writeTVar)
import           Control.DeepSeq            (force)
import qualified Control.Exception          as E
import           Control.Monad              (forever, guard, join, unless, void, when)
import           Control.Monad.IO.Class     (liftIO)
import           Data.Bifunctor             (first)
import qualified Data.ByteString.Char8      as BCS
import qualified Data.ByteString.Lazy       as BL
import qualified Data.ByteString.Lazy.Char8 as BC
import           Data.Conduit               (ConduitT, Void, await, runConduit, yield, (.|))
import           Data.Conduit.Attoparsec    (conduitParser)
import qualified Data.Conduit.Combinators   as C
import           Data.Conduit.Network       (AppData, appSink, appSource, clientSettings, runTCPClient)
import           Data.Conduit.Network.TLS   (runTLSClient, tlsClientConfig, tlsClientTLSSettings)
import           Data.Foldable              (traverse_)
import           Data.Map.Strict            (Map)
import qualified Data.Map.Strict            as Map
import           Data.Maybe                 (fromJust, fromMaybe)
import           Data.Text                  (Text)
import qualified Data.Text.Encoding         as TE
import           Data.Word                  (Word16)
import           GHC.Conc                   (labelThread)
import           Network.Connection         (ConnectionParams (..), TLSSettings (..), connectTo, connectionClose,
                                             connectionGetChunk, connectionPut, initConnectionContext)
import           Network.URI                (URI (..), nullURIAuth, unEscapeString, uriPort, uriRegName, uriUserInfo)
import qualified Network.WebSockets         as WS
import           Network.WebSockets.Stream  (makeStream)
import           System.IO.Error            (catchIOError, isEOFError)
import           System.Timeout             (timeout)

import qualified Data.Set                   as Set
import           Network.MQTT.Topic         (Filter, Topic, mkTopic, unFilter, unTopic)
import           Network.MQTT.Types         as T

data ConnState = Starting
               | Connected
               | Stopped
               | Disconnected
               | DiscoErr DisconnectRequest
               | ConnErr ConnACKFlags deriving (ConnState -> ConnState -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnState -> ConnState -> Bool
$c/= :: ConnState -> ConnState -> Bool
== :: ConnState -> ConnState -> Bool
$c== :: ConnState -> ConnState -> Bool
Eq, Int -> ConnState -> ShowS
[ConnState] -> ShowS
ConnState -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnState] -> ShowS
$cshowList :: [ConnState] -> ShowS
show :: ConnState -> String
$cshow :: ConnState -> String
showsPrec :: Int -> ConnState -> ShowS
$cshowsPrec :: Int -> ConnState -> ShowS
Show)

data DispatchType = DSubACK | DUnsubACK | DPubACK | DPubREC | DPubREL | DPubCOMP
  deriving (DispatchType -> DispatchType -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: DispatchType -> DispatchType -> Bool
$c/= :: DispatchType -> DispatchType -> Bool
== :: DispatchType -> DispatchType -> Bool
$c== :: DispatchType -> DispatchType -> Bool
Eq, Int -> DispatchType -> ShowS
[DispatchType] -> ShowS
DispatchType -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DispatchType] -> ShowS
$cshowList :: [DispatchType] -> ShowS
show :: DispatchType -> String
$cshow :: DispatchType -> String
showsPrec :: Int -> DispatchType -> ShowS
$cshowsPrec :: Int -> DispatchType -> ShowS
Show, Eq DispatchType
DispatchType -> DispatchType -> Bool
DispatchType -> DispatchType -> Ordering
DispatchType -> DispatchType -> DispatchType
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: DispatchType -> DispatchType -> DispatchType
$cmin :: DispatchType -> DispatchType -> DispatchType
max :: DispatchType -> DispatchType -> DispatchType
$cmax :: DispatchType -> DispatchType -> DispatchType
>= :: DispatchType -> DispatchType -> Bool
$c>= :: DispatchType -> DispatchType -> Bool
> :: DispatchType -> DispatchType -> Bool
$c> :: DispatchType -> DispatchType -> Bool
<= :: DispatchType -> DispatchType -> Bool
$c<= :: DispatchType -> DispatchType -> Bool
< :: DispatchType -> DispatchType -> Bool
$c< :: DispatchType -> DispatchType -> Bool
compare :: DispatchType -> DispatchType -> Ordering
$ccompare :: DispatchType -> DispatchType -> Ordering
Ord, Int -> DispatchType
DispatchType -> Int
DispatchType -> [DispatchType]
DispatchType -> DispatchType
DispatchType -> DispatchType -> [DispatchType]
DispatchType -> DispatchType -> DispatchType -> [DispatchType]
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
$cenumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
enumFromTo :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromTo :: DispatchType -> DispatchType -> [DispatchType]
enumFromThen :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromThen :: DispatchType -> DispatchType -> [DispatchType]
enumFrom :: DispatchType -> [DispatchType]
$cenumFrom :: DispatchType -> [DispatchType]
fromEnum :: DispatchType -> Int
$cfromEnum :: DispatchType -> Int
toEnum :: Int -> DispatchType
$ctoEnum :: Int -> DispatchType
pred :: DispatchType -> DispatchType
$cpred :: DispatchType -> DispatchType
succ :: DispatchType -> DispatchType
$csucc :: DispatchType -> DispatchType
Enum, DispatchType
forall a. a -> a -> Bounded a
maxBound :: DispatchType
$cmaxBound :: DispatchType
minBound :: DispatchType
$cminBound :: DispatchType
Bounded)


-- | Callback invoked on each incoming subscribed message.
data MessageCallback = NoCallback
  -- | Callbacks will be invoked asynchronously, ordering is likely preserved, but not guaranteed.
  -- In high throughput scenarios, slow callbacks may result in a high number of Haskell threads,
  -- potentially bringing down the entire application when running out of memory.
  -- Typically faster than `OrderedCallback`.
  | SimpleCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
  -- | Callbacks are guaranteed to be invoked in the same order messages are received.
  -- In high throughput scenarios, slow callbacks may cause the underlying TCP connection to block,
  -- potentially being terminated by the broker.
  -- Typically slower than `SimpleCallback`.
  | OrderedCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
  -- | A LowLevelCallback receives the client and the entire publish request, providing
  -- access to all of the fields of the request.  This is slightly harder to use than
  -- SimpleCallback for common cases, but there are cases where you need all the things.
  | LowLevelCallback (MQTTClient -> PublishRequest -> IO ())
  -- | A low level callback that is ordered.
  | OrderedLowLevelCallback (MQTTClient -> PublishRequest -> IO ())

-- | The MQTT client.
--
-- See 'connectURI' for the most straightforward example.
data MQTTClient = MQTTClient {
  MQTTClient -> TChan MQTTPkt
_ch             :: TChan MQTTPkt
  , MQTTClient -> TVar Word16
_pktID        :: TVar Word16
  , MQTTClient -> MessageCallback
_cb           :: MessageCallback
  , MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks         :: TVar (Map (DispatchType,Word16) (TChan MQTTPkt))
  , MQTTClient -> TVar (Map Word16 PublishRequest)
_inflight     :: TVar (Map Word16 PublishRequest)
  , MQTTClient -> TVar ConnState
_st           :: TVar ConnState
  , MQTTClient -> TVar (Maybe (Async ()))
_ct           :: TVar (Maybe (Async ()))
  , MQTTClient -> TVar (Map Topic Word16)
_outA         :: TVar (Map Topic Word16)
  , MQTTClient -> TVar (Map Word16 Topic)
_inA          :: TVar (Map Word16 Topic)
  , MQTTClient -> TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
  , MQTTClient -> TVar (Map ByteString MessageCallback)
_corr         :: TVar (Map BL.ByteString MessageCallback)
  , MQTTClient -> MVar (IO ())
_cbM          :: MVar (IO ())
  , MQTTClient -> TVar (Maybe (Async ()))
_cbHandle     :: TVar (Maybe (Async ()))
  }

-- | Configuration for setting up an MQTT client.
data MQTTConfig = MQTTConfig{
  MQTTConfig -> Bool
_cleanSession     :: Bool -- ^ False if a session should be reused.
  , MQTTConfig -> Maybe LastWill
_lwt            :: Maybe LastWill -- ^ LastWill message to be sent on client disconnect.
  , MQTTConfig -> MessageCallback
_msgCB          :: MessageCallback -- ^ Callback for incoming messages.
  , MQTTConfig -> ProtocolLevel
_protocol       :: ProtocolLevel -- ^ Protocol to use for the connection.
  , MQTTConfig -> [Property]
_connProps      :: [Property] -- ^ Properties to send to the broker in the CONNECT packet.
  , MQTTConfig -> String
_hostname       :: String -- ^ Host to connect to (parsed from the URI)
  , MQTTConfig -> Int
_port           :: Int -- ^ Port number (parsed from the URI)
  , MQTTConfig -> String
_connID         :: String -- ^ Unique connection ID (parsed from the URI)
  , MQTTConfig -> Maybe String
_username       :: Maybe String -- ^ Optional username (parsed from the URI)
  , MQTTConfig -> Maybe String
_password       :: Maybe String -- ^ Optional password (parsed from the URI)
  , MQTTConfig -> Int
_connectTimeout :: Int -- ^ Connection timeout (microseconds)
  , MQTTConfig -> TLSSettings
_tlsSettings    :: TLSSettings -- ^ TLS Settings for secure connections
  , MQTTConfig -> Int
_pingPeriod     :: Int -- ^ Time in seconds between pings
  , MQTTConfig -> Int
_pingPatience   :: Int -- ^ Time in seconds for which there must be no incoming packets before the broker is considered dead. Should be more than the ping period plus the maximum expected ping round trip time.
  }

-- | A default 'MQTTConfig'.  A '_connID' /may/ be required depending on
-- your broker (or if you just want an identifiable/resumable
-- connection).  In MQTTv5, an empty connection ID may be sent and the
-- server may assign an identifier for you and return it in the
-- 'PropAssignedClientIdentifier' 'Property'.
mqttConfig :: MQTTConfig
mqttConfig :: MQTTConfig
mqttConfig = MQTTConfig{_hostname :: String
_hostname=String
"", _port :: Int
_port=Int
1883, _connID :: String
_connID=String
"",
                        _username :: Maybe String
_username=forall a. Maybe a
Nothing, _password :: Maybe String
_password=forall a. Maybe a
Nothing,
                        _cleanSession :: Bool
_cleanSession=Bool
True, _lwt :: Maybe LastWill
_lwt=forall a. Maybe a
Nothing,
                        _msgCB :: MessageCallback
_msgCB=MessageCallback
NoCallback,
                        _protocol :: ProtocolLevel
_protocol=ProtocolLevel
Protocol311, _connProps :: [Property]
_connProps=forall a. Monoid a => a
mempty,
                        _connectTimeout :: Int
_connectTimeout=Int
180000000,
                        _tlsSettings :: TLSSettings
_tlsSettings=Bool -> Bool -> Bool -> TLSSettings
TLSSettingsSimple Bool
False Bool
False Bool
False,
                        _pingPeriod :: Int
_pingPeriod=Int
30000000,
                        _pingPatience :: Int
_pingPatience=Int
90000000}

-- | Connect to an MQTT server by URI.
--
-- @mqtt://@, @mqtts://@, @ws://@, and @wss://@ URLs are supported.
-- The host, port, username, and password will be derived from the URI
-- and the values supplied in the config will be ignored.
--
-- > main :: IO
-- > main = do
-- >   let (Just uri) = parseURI "mqtt://test.mosquitto.org"
-- >   mc <- connectURI mqttConfig{} uri
-- >   publish mc "tmp/topic" "hello!" False
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} URI
uri = do
  let cf :: MQTTConfig -> IO MQTTClient
cf = case URI -> String
uriScheme URI
uri of
             String
"mqtt:"  -> MQTTConfig -> IO MQTTClient
runClient
             String
"mqtts:" -> MQTTConfig -> IO MQTTClient
runClientTLS
             String
"ws:"    -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
False
             String
"wss:"   -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
True
             String
us       -> forall a. String -> a
mqttFail forall a b. (a -> b) -> a -> b
$ String
"invalid URI scheme: " forall a. Semigroup a => a -> a -> a
<> String
us

      a :: URIAuth
a = forall a. a -> Maybe a -> a
fromMaybe URIAuth
nullURIAuth forall a b. (a -> b) -> a -> b
$ URI -> Maybe URIAuth
uriAuthority URI
uri
      (Maybe String
u,Maybe String
p) = String -> (Maybe String, Maybe String)
up (URIAuth -> String
uriUserInfo URIAuth
a)

  Maybe MQTTClient
v <- forall a. String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
"MQTT connect" Int
_connectTimeout forall a b. (a -> b) -> a -> b
$
    MQTTConfig -> IO MQTTClient
cf MQTTConfig
cfg{_connID :: String
Network.MQTT.Client._connID=forall {p}. p -> ShowS
cid ProtocolLevel
_protocol (URI -> String
uriFragment URI
uri),
           _hostname :: String
_hostname=URIAuth -> String
uriRegName URIAuth
a, _port :: Int
_port=forall {a} {a}.
(Eq a, IsString a, Num a, Read a) =>
String -> a -> a
port (URIAuth -> String
uriPort URIAuth
a) (URI -> String
uriScheme URI
uri),
           _username :: Maybe String
Network.MQTT.Client._username=Maybe String
u, _password :: Maybe String
Network.MQTT.Client._password=Maybe String
p}

  forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a. String -> a
mqttFail forall a b. (a -> b) -> a -> b
$ String
"connection to " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show URI
uri forall a. Semigroup a => a -> a -> a
<> String
" timed out") forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe MQTTClient
v

  where
    port :: String -> a -> a
port String
"" a
"mqtt:"  = a
1883
    port String
"" a
"mqtts:" = a
8883
    port String
"" a
"ws:"    = a
80
    port String
"" a
"wss:"   = a
443
    port String
x a
_         = (forall a. Read a => String -> a
read forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [a] -> [a]
tail) String
x

    cid :: p -> ShowS
cid p
_ [Char
'#']    = String
""
    cid p
_ (Char
'#':String
xs) = String
xs
    cid p
_ String
_        = String
""

    up :: String -> (Maybe String, Maybe String)
up String
"" = (forall a. Maybe a
Nothing, forall a. Maybe a
Nothing)
    up String
x = let (String
u,String
r) = forall a. (a -> Bool) -> [a] -> ([a], [a])
break (forall a. Eq a => a -> a -> Bool
== Char
':') (forall a. [a] -> [a]
init String
x) in
             (forall a. a -> Maybe a
Just (ShowS
unEscapeString String
u), if String
r forall a. Eq a => a -> a -> Bool
== String
"" then forall a. Maybe a
Nothing else forall a. a -> Maybe a
Just (ShowS
unEscapeString forall a b. (a -> b) -> a -> b
$ forall a. [a] -> [a]
tail String
r))


-- | Set up and run a client from the given config.
runClient :: MQTTConfig -> IO MQTTClient
runClient :: MQTTConfig -> IO MQTTClient
runClient cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (forall a. ClientSettings -> (AppData -> IO a) -> IO a
runTCPClient (Int -> ByteString -> ClientSettings
clientSettings Int
_port (String -> ByteString
BCS.pack String
_hostname))) MQTTConfig
cfg

-- | Set up and run a client connected via TLS.
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (forall (m :: * -> *) a.
MonadUnliftIO m =>
TLSClientConfig -> (AppData -> m a) -> m a
runTLSClient TLSClientConfig
tlsConf) MQTTConfig
cfg
  where tlsConf :: TLSClientConfig
tlsConf = (Int -> ByteString -> TLSClientConfig
tlsClientConfig Int
_port (String -> ByteString
BCS.pack String
_hostname)) {tlsClientTLSSettings :: TLSSettings
tlsClientTLSSettings=TLSSettings
_tlsSettings}

-- Compatibility mechanisms for TCP Conduit bits.
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (AppData -> IO ()) -> IO ()
mkconn = ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (forall {a} {m :: * -> *} {m :: * -> *} {c} {t} {i} {o}.
(HasReadWrite a, MonadIO m, MonadIO m) =>
((a -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (AppData -> IO ()) -> IO ()
mkconn)
  where adapt :: ((a -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (a -> c) -> t
mk (ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f = (a -> c) -> t
mk ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall {ad} {m :: * -> *} {m :: * -> *} {i} {o}.
(HasReadWrite ad, MonadIO m, MonadIO m) =>
ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor)
        adaptor :: ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor ad
ad = (forall ad (m :: * -> *) i.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT i ByteString m ()
appSource ad
ad, forall ad (m :: * -> *) o.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT ByteString o m ()
appSink ad
ad)

runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI{String
uriPath :: URI -> String
uriPath :: String
uriPath, String
uriQuery :: URI -> String
uriQuery :: String
uriQuery} Bool
secure cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} =
  ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (forall {c} {t}. ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt forall a b. (a -> b) -> a -> b
$ Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
cf Bool
secure String
_hostname Int
_port String
endpoint ConnectionOptions
WS.defaultConnectionOptions Headers
hdrs) MQTTConfig
cfg

  where
    hdrs :: Headers
hdrs = [(CI ByteString
"Sec-WebSocket-Protocol", ByteString
"mqtt")]
    adapt :: ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt (Connection -> c) -> t
mk MQTTConduit -> c
f = (Connection -> c) -> t
mk (MQTTConduit -> c
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> MQTTConduit
adaptor)
    adaptor :: Connection -> MQTTConduit
adaptor Connection
s = (Connection -> ConduitT () ByteString IO ()
wsSource Connection
s, Connection -> ConduitT ByteString Void IO ()
wsSink Connection
s)

    endpoint :: String
endpoint = String
uriPath forall a. Semigroup a => a -> a -> a
<> String
uriQuery

    cf :: Bool -> String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    cf :: Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
cf Bool
False = forall a.
String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
WS.runClientWith
    cf Bool
True  = String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
runWSS

    wsSource :: WS.Connection -> ConduitT () BCS.ByteString IO ()
    wsSource :: Connection -> ConduitT () ByteString IO ()
wsSource Connection
ws = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
      ByteString
bs <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. WebSocketsData a => Connection -> IO a
WS.receiveData Connection
ws
      forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ByteString -> Bool
BCS.null ByteString
bs) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield ByteString
bs

    wsSink :: WS.Connection -> ConduitT BCS.ByteString Void IO ()
    wsSink :: Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (\ByteString
bs -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendBinaryData Connection
ws ByteString
bs) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await

    runWSS :: String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    runWSS :: String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
runWSS String
host Int
port String
path ConnectionOptions
options Headers
hdrs' ClientApp ()
app = do
      let connectionParams :: ConnectionParams
connectionParams = ConnectionParams
            { connectionHostname :: String
connectionHostname = String
host
            , connectionPort :: PortNumber
connectionPort =  forall a. Enum a => Int -> a
toEnum Int
port
            , connectionUseSecure :: Maybe TLSSettings
connectionUseSecure = forall a. a -> Maybe a
Just TLSSettings
_tlsSettings
            , connectionUseSocks :: Maybe ProxySettings
connectionUseSocks = forall a. Maybe a
Nothing
            }

      ConnectionContext
context <- IO ConnectionContext
initConnectionContext
      forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
E.bracket (ConnectionContext -> ConnectionParams -> IO Connection
connectTo ConnectionContext
context ConnectionParams
connectionParams) Connection -> IO ()
connectionClose
        (\Connection
conn -> do
            Stream
stream <- IO (Maybe ByteString) -> (Maybe ByteString -> IO ()) -> IO Stream
makeStream (Connection -> IO (Maybe ByteString)
reader Connection
conn) (Connection -> Maybe ByteString -> IO ()
writer Connection
conn)
            forall a.
Stream
-> String
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
WS.runClientWithStream Stream
stream String
host String
path ConnectionOptions
options Headers
hdrs' ClientApp ()
app)

        where
          reader :: Connection -> IO (Maybe ByteString)
reader Connection
conn =
            forall a. IO a -> (IOError -> IO a) -> IO a
catchIOError (forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
connectionGetChunk Connection
conn)
            (\IOError
e -> if IOError -> Bool
isEOFError IOError
e then forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing else forall e a. Exception e => e -> IO a
E.throwIO IOError
e)

          writer :: Connection -> Maybe ByteString -> IO ()
writer Connection
conn = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (Connection -> ByteString -> IO ()
connectionPut Connection
conn forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BC.toStrict)

mqttFail :: String -> a
mqttFail :: forall a. String -> a
mqttFail = forall a e. Exception e => e -> a
E.throw forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException

-- A couple async utilities to get our stuff named.

namedAsync :: String -> IO a -> IO (Async a)
namedAsync :: forall a. String -> IO a -> IO (Async a)
namedAsync String
s IO a
a = forall a. IO a -> IO (Async a)
async IO a
a forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Async a
p -> ThreadId -> String -> IO ()
labelThread (forall a. Async a -> ThreadId
asyncThreadId Async a
p) String
s forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure Async a
p

namedTimeout :: String -> Int -> IO a -> IO (Maybe a)
namedTimeout :: forall a. String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
n Int
to IO a
a = forall a. Int -> IO a -> IO (Maybe a)
timeout Int
to (IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ThreadId
tid -> ThreadId -> String -> IO ()
labelThread ThreadId
tid String
n forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a
a)

-- | MQTTConduit provides a source and sink for data as used by 'runMQTTConduit'.
type MQTTConduit = (ConduitT () BCS.ByteString IO (), ConduitT BCS.ByteString Void IO ())

-- | Set up and run a client with a conduit context function.
--
-- The provided action calls another IO action with a 'MQTTConduit' as a
-- parameter.  It is expected that this action will manage the
-- lifecycle of the conduit source/sink on behalf of the client.
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -- ^ an action providing an 'MQTTConduit' in an execution context
               -> MQTTConfig -- ^ the 'MQTTConfig'
               -> IO MQTTClient
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (MQTTConduit -> IO ()) -> IO ()
mkconn MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = do
  TChan MQTTPkt
_ch <- forall a. IO (TChan a)
newTChanIO
  TVar Word16
_pktID <- forall a. a -> IO (TVar a)
newTVarIO Word16
1
  TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
  TVar (Map Word16 PublishRequest)
_inflight <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
  TVar ConnState
_st <- forall a. a -> IO (TVar a)
newTVarIO ConnState
Starting
  TVar (Maybe (Async ()))
_ct <- forall a. a -> IO (TVar a)
newTVarIO forall a. Maybe a
Nothing
  TVar (Map Topic Word16)
_outA <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
  TVar (Map Word16 Topic)
_inA <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
  TVar ConnACKFlags
_connACKFlags <- forall a. a -> IO (TVar a)
newTVarIO (SessionReuse -> ConnACKRC -> [Property] -> ConnACKFlags
ConnACKFlags SessionReuse
NewSession ConnACKRC
ConnUnspecifiedError forall a. Monoid a => a
mempty)
  TVar (Map ByteString MessageCallback)
_corr <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
  MVar (IO ())
_cbM <- forall a. IO (MVar a)
newEmptyMVar
  TVar (Maybe (Async ()))
_cbHandle <- forall a. a -> IO (TVar a)
newTVarIO forall a. Maybe a
Nothing
  let _cb :: MessageCallback
_cb = MessageCallback
_msgCB
      cli :: MQTTClient
cli = MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cb :: MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
..}

  Async ()
t <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT clientThread" forall a b. (a -> b) -> a -> b
$ MQTTClient -> IO ()
clientThread MQTTClient
cli
  ConnState
s <- forall a. STM a -> IO a
atomically (MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient
cli Async ()
t)

  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnState
s forall a. Eq a => a -> a -> Bool
== ConnState
Disconnected) forall a b. (a -> b) -> a -> b
$ forall a. Async a -> IO a
wait Async ()
t

  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
cli

  forall (f :: * -> *) a. Applicative f => a -> f a
pure MQTTClient
cli

  where
    clientThread :: MQTTClient -> IO ()
clientThread MQTTClient
cli = forall a b. IO a -> IO b -> IO a
E.finally IO ()
connectAndRun IO ()
markDisco
      where
        connectAndRun :: IO ()
connectAndRun = (MQTTConduit -> IO ()) -> IO ()
mkconn forall a b. (a -> b) -> a -> b
$ \MQTTConduit
ad -> forall {m :: * -> *} {b} {a} {a}.
Monad m =>
b -> (a, ConduitT ByteString Void m a) -> m b
start MQTTClient
cli MQTTConduit
ad forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MQTTConduit -> MQTTClient -> IO ()
run MQTTConduit
ad
        markDisco :: IO ()
markDisco = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
          ConnState
st <- forall a. TVar a -> STM a
readTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli)
          forall (f :: * -> *). Alternative f => Bool -> f ()
guard forall a b. (a -> b) -> a -> b
$ ConnState
st forall a. Eq a => a -> a -> Bool
== ConnState
Starting Bool -> Bool -> Bool
|| ConnState
st forall a. Eq a => a -> a -> Bool
== ConnState
Connected
          forall a. TVar a -> a -> STM ()
writeTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli) ConnState
Disconnected

    start :: b -> (a, ConduitT ByteString Void m a) -> m b
start b
c (a
_,ConduitT ByteString Void m a
sink) = do
      forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$ do
        let req :: ConnectRequest
req = ConnectRequest
connectRequest{_connID :: ByteString
T._connID=String -> ByteString
BC.pack String
_connID,
                                 _lastWill :: Maybe LastWill
T._lastWill=Maybe LastWill
_lwt,
                                 _username :: Maybe ByteString
T._username=String -> ByteString
BC.pack forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe String
_username,
                                 _password :: Maybe ByteString
T._password=String -> ByteString
BC.pack forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe String
_password,
                                 _cleanSession :: Bool
T._cleanSession=Bool
_cleanSession,
                                 _connProperties :: [Property]
T._connProperties=[Property]
_connProps}
        forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (ByteString -> ByteString
BL.toStrict forall a b. (a -> b) -> a -> b
$ forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol ConnectRequest
req) forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m a
sink

      forall (f :: * -> *) a. Applicative f => a -> f a
pure b
c

    run :: MQTTConduit -> MQTTClient -> IO ()
run (ConduitT () ByteString IO ()
src,ConduitT ByteString Void IO ()
sink) c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = do
      TChan Bool
pch <- forall a. IO (TChan a)
newTChanIO
      Async ()
o <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT out" forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
processOut
      Async ()
p <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT ping" forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall {b}. IO b
doPing
      Async ()
w <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT watchdog" forall a b. (a -> b) -> a -> b
$ forall {a} {b}. TChan a -> IO b
watchdog TChan Bool
pch
      Async ()
s <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT in" forall a b. (a -> b) -> a -> b
$ TChan Bool -> IO ()
doSrc TChan Bool
pch

      forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. [Async a] -> IO (Async a, a)
waitAnyCancel [Async ()
o, Async ()
p, Async ()
w, Async ()
s]

      where
        doSrc :: TChan Bool -> IO ()
doSrc TChan Bool
pch = forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString IO ()
src
                    forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall a (m :: * -> *) b.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a (PositionRange, b) m ()
conduitParser (ProtocolLevel -> Parser MQTTPkt
parsePacket ProtocolLevel
_protocol)
                    forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_ (\(PositionRange
_,MQTTPkt
x) -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch MQTTClient
c TChan Bool
pch forall a b. (a -> b) -> a -> b
$! MQTTPkt
x))

        onceConnected :: IO ()
onceConnected = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ Bool -> STM ()
check forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a. Eq a => a -> a -> Bool
== ConnState
Connected) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar ConnState
_st

        processOut :: IO ()
processOut = forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$
          forall (m :: * -> *) a i. Monad m => m a -> ConduitT i a m ()
C.repeatM (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> STM a
readTChan TChan MQTTPkt
_ch))
          forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol)
          forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void IO ()
sink

        doPing :: IO b
doPing = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
_pingPeriod forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c MQTTPkt
PingPkt

        watchdog :: TChan a -> IO b
watchdog TChan a
ch = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
          TVar Bool
toch <- Int -> IO (TVar Bool)
registerDelay Int
_pingPatience
          Bool
timedOut <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ ((Bool -> STM ()
check forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar Bool
toch) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True) forall a. STM a -> STM a -> STM a
`orElse` (forall a. TChan a -> STM a
readTChan TChan a
ch forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False)
          forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
timedOut forall a b. (a -> b) -> a -> b
$ forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout

    waitForLaunch :: MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} Async ()
t = do
      forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_ct (forall a. a -> Maybe a
Just Async ()
t)
      ConnState
c <- forall a. TVar a -> STM a
readTVar TVar ConnState
_st
      if ConnState
c forall a. Eq a => a -> a -> Bool
== ConnState
Starting then forall a. STM a
retry else forall (f :: * -> *) a. Applicative f => a -> f a
pure ConnState
c

-- | Wait for a client to terminate its connection.
-- An exception is thrown if the client didn't terminate expectedly.
waitForClient :: MQTTClient -> IO ()
waitForClient :: MQTTClient -> IO ()
waitForClient c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall a b. IO a -> IO b -> IO a
E.finally (MQTTClient -> IO ()
stopCallbackThread MQTTClient
c) forall a b. (a -> b) -> a -> b
$ do
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO a
wait forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
  forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall e a. Exception e => e -> IO a
E.throwIO forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. STM a -> IO a
atomically (MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
c ConnState
Stopped)

-- | Stops the client and closes the connection without sending a DISCONNECT
-- message to the broker. This will cause the last-will message to be delivered
-- by the broker if it has been defined.
stopClient :: MQTTClient -> IO ()
stopClient :: MQTTClient -> IO ()
stopClient MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = do
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO ()
cancel forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO ()
cancel forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_cbHandle
  forall a. STM a -> IO a
atomically (forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Stopped)

stateX :: MQTTClient -> ConnState -> STM (Maybe E.SomeException)
stateX :: MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} ConnState
want = ConnState -> Maybe SomeException
f forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TVar a -> STM a
readTVar TVar ConnState
_st

  where
    je :: String -> Maybe SomeException
je = forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e. Exception e => e -> SomeException
E.toException forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException

    f :: ConnState -> Maybe E.SomeException
    f :: ConnState -> Maybe SomeException
f ConnState
Connected    = if ConnState
want forall a. Eq a => a -> a -> Bool
== ConnState
Connected then forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly connected"
    f ConnState
Stopped      = if ConnState
want forall a. Eq a => a -> a -> Bool
== ConnState
Stopped then forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly stopped"
    f ConnState
Disconnected = String -> Maybe SomeException
je String
"disconnected"
    f ConnState
Starting     = String -> Maybe SomeException
je String
"died while starting"
    f (DiscoErr DisconnectRequest
x) = forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e. Exception e => e -> SomeException
E.toException forall b c a. (b -> c) -> (a -> b) -> a -> c
. DisconnectRequest -> MQTTException
Discod forall a b. (a -> b) -> a -> b
$ DisconnectRequest
x
    f (ConnErr ConnACKFlags
e)  = String -> Maybe SomeException
je (forall a. Show a => a -> String
show ConnACKFlags
e)

data MQTTException = Timeout | BadData | Discod DisconnectRequest | MQTTException String deriving(MQTTException -> MQTTException -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MQTTException -> MQTTException -> Bool
$c/= :: MQTTException -> MQTTException -> Bool
== :: MQTTException -> MQTTException -> Bool
$c== :: MQTTException -> MQTTException -> Bool
Eq, Int -> MQTTException -> ShowS
[MQTTException] -> ShowS
MQTTException -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MQTTException] -> ShowS
$cshowList :: [MQTTException] -> ShowS
show :: MQTTException -> String
$cshow :: MQTTException -> String
showsPrec :: Int -> MQTTException -> ShowS
$cshowsPrec :: Int -> MQTTException -> ShowS
Show)

instance E.Exception MQTTException

dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} TChan Bool
pch MQTTPkt
pkt =
  case MQTTPkt
pkt of
    (ConnACKPkt ConnACKFlags
p)                            -> ConnACKFlags -> IO ()
connACKd ConnACKFlags
p
    (PublishPkt PublishRequest
p)                            -> PublishRequest -> IO ()
pub PublishRequest
p
    (SubACKPkt (SubscribeResponse Word16
i [Either SubErr QoS]
_ [Property]
_))     -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DSubACK Word16
i
    (UnsubACKPkt (UnsubscribeResponse Word16
i [Property]
_ [UnsubStatus]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DUnsubACK Word16
i
    (PubACKPkt (PubACK Word16
i Word8
_ [Property]
_))                -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubACK Word16
i
    (PubRELPkt (PubREL Word16
i Word8
_ [Property]
_))                -> Word16 -> IO ()
pubd Word16
i
    (PubRECPkt (PubREC Word16
i Word8
_ [Property]
_))                -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubREC Word16
i
    (PubCOMPPkt (PubCOMP Word16
i Word8
_ [Property]
_))              -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubCOMP Word16
i
    (DisconnectPkt DisconnectRequest
req)                       -> DisconnectRequest -> IO ()
disco DisconnectRequest
req
    MQTTPkt
PongPkt                                   -> forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TChan a -> a -> STM ()
writeTChan TChan Bool
pch forall a b. (a -> b) -> a -> b
$ Bool
True

    -- Not implemented
    (AuthPkt AuthRequest
p)                               -> forall a. String -> a
mqttFail (String
"unexpected incoming auth: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show AuthRequest
p)

    -- Things clients shouldn't see
    MQTTPkt
PingPkt                                   -> forall a. String -> a
mqttFail String
"unexpected incoming ping packet"
    (ConnPkt ConnectRequest
_ ProtocolLevel
_)                             -> forall a. String -> a
mqttFail String
"unexpected incoming connect"
    (SubscribePkt SubscribeRequest
_)                          -> forall a. String -> a
mqttFail String
"unexpected incoming subscribe"
    (UnsubscribePkt UnsubscribeRequest
_)                        -> forall a. String -> a
mqttFail String
"unexpected incoming unsubscribe"

  where connACKd :: ConnACKFlags -> IO ()
connACKd connr :: ConnACKFlags
connr@(ConnACKFlags SessionReuse
_ ConnACKRC
val [Property]
_) = case ConnACKRC
val of
                                                  ConnACKRC
ConnAccepted -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
                                                    forall a. TVar a -> a -> STM ()
writeTVar TVar ConnACKFlags
_connACKFlags ConnACKFlags
connr
                                                    forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Connected
                                                  ConnACKRC
_ -> do
                                                    Maybe (Async ())
t <- forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
                                                    forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (ConnACKFlags -> ConnState
ConnErr ConnACKFlags
connr)
                                                    forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (String -> MQTTException
MQTTException forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show ConnACKFlags
connr) Maybe (Async ())
t

        pub :: PublishRequest -> IO ()
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS0} = forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify forall a. Maybe a
Nothing
        pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS1, Word16
_pubPktID :: PublishRequest -> Word16
_pubPktID :: Word16
_pubPktID} =
          forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (forall a. a -> Maybe a
Just (PubACK -> MQTTPkt
PubACKPkt (Word16 -> Word8 -> [Property] -> PubACK
PubACK Word16
_pubPktID Word8
0 forall a. Monoid a => a
mempty))) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p)
        pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS2} = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
          p' :: PublishRequest
p'@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} <- PublishRequest -> STM PublishRequest
resolve PublishRequest
p
          forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 PublishRequest)
_inflight (forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word16
_pubPktID PublishRequest
p')
          MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREC -> MQTTPkt
PubRECPkt (Word16 -> Word8 -> [Property] -> PubREC
PubREC Word16
_pubPktID Word8
0 forall a. Monoid a => a
mempty))

        pubd :: Word16 -> IO ()
pubd Word16
i = do
          Maybe PublishRequest
mp <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
            Maybe PublishRequest
r <- forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Word16
i forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TVar a -> STM a
readTVar TVar (Map Word16 PublishRequest)
_inflight
            forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 PublishRequest)
_inflight (forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Word16
i)
            forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe PublishRequest
r
          case Maybe PublishRequest
mp of
            Maybe PublishRequest
Nothing -> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0x92 forall a. Monoid a => a
mempty))
            Just PublishRequest
p  -> forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (forall a. a -> Maybe a
Just (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0 forall a. Monoid a => a
mempty))) PublishRequest
p

        notify :: t MQTTPkt -> PublishRequest -> IO ()
notify t MQTTPkt
rpkt p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} = do
          forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 PublishRequest)
_inflight (forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Word16
_pubPktID)
          Map ByteString MessageCallback
corrs <- forall a. TVar a -> IO a
readTVarIO TVar (Map ByteString MessageCallback)
_corr
          forall a. a -> IO a
E.evaluate forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. NFData a => a -> a
force forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< case forall b a. b -> (a -> b) -> Maybe a -> b
maybe MessageCallback
_cb (\ByteString
cd -> forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault MessageCallback
_cb ByteString
cd Map ByteString MessageCallback
corrs) Maybe ByteString
cdata of
                                   MessageCallback
NoCallback                -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                                   SimpleCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f          -> forall {a}. IO a -> IO ()
call (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
                                   OrderedCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f         -> forall {a}. IO a -> IO ()
callOrd (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
                                   LowLevelCallback MQTTClient -> PublishRequest -> IO ()
f        -> forall {a}. IO a -> IO ()
call (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)
                                   OrderedLowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> forall {a}. IO a -> IO ()
callOrd (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)

            where
              call :: IO a -> IO ()
call IO a
a = forall a. Async a -> IO ()
link forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. String -> IO a -> IO (Async a)
namedAsync String
"notifier" (IO a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond)
              callOrd :: IO a -> IO ()
callOrd IO a
a = forall a. MVar a -> a -> IO ()
putMVar MVar (IO ())
_cbM forall a b. (a -> b) -> a -> b
$ IO a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond
              respond :: IO ()
respond = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c) t MQTTPkt
rpkt
              cdata :: Maybe ByteString
cdata = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe ByteString -> Maybe ByteString
f forall a. Maybe a
Nothing [Property]
_pubProps
                where f :: Property -> Maybe ByteString -> Maybe ByteString
f (PropCorrelationData ByteString
x) Maybe ByteString
_ = forall a. a -> Maybe a
Just ByteString
x
                      f Property
_ Maybe ByteString
o                       = Maybe ByteString
o

        resolve :: PublishRequest -> STM PublishRequest
resolve p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} = do
          Topic
topic <- Maybe Word16 -> STM Topic
resolveTopic (forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe Word16 -> Maybe Word16
aliasID forall a. Maybe a
Nothing [Property]
_pubProps)
          forall (f :: * -> *) a. Applicative f => a -> f a
pure PublishRequest
p{_pubTopic :: ByteString
_pubTopic=Topic -> ByteString
topicToBL Topic
topic}

          where
            aliasID :: Property -> Maybe Word16 -> Maybe Word16
aliasID (PropTopicAlias Word16
x) Maybe Word16
_ = forall a. a -> Maybe a
Just Word16
x
            aliasID Property
_ Maybe Word16
o                  = Maybe Word16
o

            resolveTopic :: Maybe Word16 -> STM Topic
resolveTopic Maybe Word16
Nothing = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Topic
blToTopic ByteString
_pubTopic)
            resolveTopic (Just Word16
x) = do
              forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
_pubTopic forall a. Eq a => a -> a -> Bool
/= ByteString
"") forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 Topic)
_inA (forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word16
x (ByteString -> Topic
blToTopic ByteString
_pubTopic))
              forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a. String -> a
mqttFail (String
"failed to lookup topic alias " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word16
x)) forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Word16
x forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar (Map Word16 Topic)
_inA

        delegate :: DispatchType -> Word16 -> IO ()
delegate DispatchType
dt Word16
pid = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
          forall b a. b -> (a -> b) -> Maybe a -> b
maybe (DispatchType -> STM ()
nak DispatchType
dt) (forall a. TChan a -> a -> STM ()
`writeTChan` MQTTPkt
pkt) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (DispatchType
dt, Word16
pid) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks

            where
              nak :: DispatchType -> STM ()
nak DispatchType
DPubREC = MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt  (Word16 -> Word8 -> [Property] -> PubREL
PubREL  Word16
pid Word8
0x92 forall a. Monoid a => a
mempty))
              nak DispatchType
_       = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()


        disco :: DisconnectRequest -> IO ()
disco DisconnectRequest
req = do
          forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (DisconnectRequest -> ConnState
DiscoErr DisconnectRequest
req)
          forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (DisconnectRequest -> MQTTException
Discod DisconnectRequest
req) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct

-- Run OrderedCallbacks in a background thread. Does nothing for other callback types.
-- We keep the async handle in a TVar and make sure only one of these threads is running.
runCallbackThread :: MQTTClient -> IO ()
runCallbackThread :: MQTTClient -> IO ()
runCallbackThread MQTTClient{MessageCallback
_cb :: MessageCallback
_cb :: MQTTClient -> MessageCallback
_cb, MVar (IO ())
_cbM :: MVar (IO ())
_cbM :: MQTTClient -> MVar (IO ())
_cbM, TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbHandle}
  | MessageCallback -> Bool
isOrdered MessageCallback
_cb = do
      -- We always spawn a thread, but may kill it if we already have
      -- one.  The new thread won't start until we atomically confirm
      -- it's the only one.
      TVar Bool
latch <- forall a. a -> IO (TVar a)
newTVarIO Bool
False
      Async ()
handle <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"ordered callbacks" (TVar Bool -> IO ()
waitFor TVar Bool
latch forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall {b}. IO b
runOrderedCallbacks)
      forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        Maybe (Async ())
cbThread <- forall a. TVar a -> STM a
readTVar TVar (Maybe (Async ()))
_cbHandle
        case Maybe (Async ())
cbThread of
          Maybe (Async ())
Nothing -> do
            -- This is the first thread.  Flip the latch and put it in place.
            forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
latch Bool
True
            forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_cbHandle (forall a. a -> Maybe a
Just Async ()
handle)
            forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
          Just Async ()
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Async a -> IO ()
cancel Async ()
handle) -- otherwise, cancel the temporary thread.
  | Bool
otherwise = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    where isOrdered :: MessageCallback -> Bool
isOrdered (OrderedCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
_)         = Bool
True
          isOrdered (OrderedLowLevelCallback MQTTClient -> PublishRequest -> IO ()
_) = Bool
True
          isOrdered MessageCallback
_                           = Bool
False
          waitFor :: TVar Bool -> IO ()
waitFor TVar Bool
latch = forall a. STM a -> IO a
atomically (Bool -> STM ()
check forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar Bool
latch)
          -- Keep running callbacks from the MVar
          runOrderedCallbacks :: IO b
runOrderedCallbacks = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. MVar a -> IO a
takeMVar forall a b. (a -> b) -> a -> b
$ MVar (IO ())
_cbM

-- Stop the background thread for OrderedCallbacks. Does nothing for other callback types.
stopCallbackThread :: MQTTClient -> IO ()
stopCallbackThread :: MQTTClient -> IO ()
stopCallbackThread MQTTClient{TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbHandle} = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall a. Async a -> IO ()
cancel forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_cbHandle

maybeCancelWith :: E.Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith :: forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall e a. Exception e => Async a -> e -> IO ()
`cancelWith` e
e)

killConn :: E.Exception e => MQTTClient -> e -> IO ()
killConn :: forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} e
e = forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e

checkConnected :: MQTTClient -> STM ()
checkConnected :: MQTTClient -> STM ()
checkConnected MQTTClient
mc = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall a e. Exception e => e -> a
E.throw forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
mc ConnState
Connected

-- | True if we're currently in a normally connected state (in the IO monad).
isConnected :: MQTTClient -> IO Bool
isConnected :: MQTTClient -> IO Bool
isConnected = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM Bool
isConnectedSTM

-- | True if we're currently in a normally connected state (in the STM monad).
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = (ConnState
Connected forall a. Eq a => a -> a -> Bool
==) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TVar a -> STM a
readTVar TVar ConnState
_st

sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} MQTTPkt
p = MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> a -> STM ()
writeTChan TChan MQTTPkt
_ch MQTTPkt
p

sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c

textToBL :: Text -> BL.ByteString
textToBL :: Text -> ByteString
textToBL = ByteString -> ByteString
BL.fromStrict forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8

blToText :: BL.ByteString -> Text
blToText :: ByteString -> Text
blToText = ByteString -> Text
TE.decodeUtf8 forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BL.toStrict

topicToBL :: Topic -> BL.ByteString
topicToBL :: Topic -> ByteString
topicToBL = Text -> ByteString
textToBL forall b c a. (b -> c) -> (a -> b) -> a -> c
. Topic -> Text
unTopic

filterToBL :: Filter -> BL.ByteString
filterToBL :: Filter -> ByteString
filterToBL = Text -> ByteString
textToBL forall b c a. (b -> c) -> (a -> b) -> a -> c
. Filter -> Text
unFilter

blToTopic :: BL.ByteString -> Topic
blToTopic :: ByteString -> Topic
blToTopic = forall a. HasCallStack => Maybe a -> a
fromJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Maybe Topic
mkTopic forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
blToText

reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [DispatchType]
dts = do
  MQTTClient -> STM ()
checkConnected MQTTClient
c
  TChan MQTTPkt
ch <- forall a. STM (TChan a)
newTChan
  Word16
pid <- forall a. TVar a -> STM a
readTVar TVar Word16
_pktID
  forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Word16
_pktID forall a b. (a -> b) -> a -> b
$ if Word16
pid forall a. Eq a => a -> a -> Bool
== forall a. Bounded a => a
maxBound then forall a b. a -> b -> a
const Word16
1 else forall a. Enum a => a -> a
succ
  forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks (forall k a. Ord k => Map k a -> Map k a -> Map k a
Map.union (forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [((DispatchType
t, Word16
pid), TChan MQTTPkt
ch) | DispatchType
t <- [DispatchType]
dts]))
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (TChan MQTTPkt
ch,Word16
pid)

releasePktID :: MQTTClient -> (DispatchType,Word16) -> STM ()
releasePktID :: MQTTClient -> (DispatchType, Word16) -> STM ()
releasePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} (DispatchType, Word16)
k = MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks (forall k a. Ord k => k -> Map k a -> Map k a
Map.delete (DispatchType, Word16)
k)

releasePktIDs :: MQTTClient -> [(DispatchType,Word16)] -> STM ()
releasePktIDs :: MQTTClient -> [(DispatchType, Word16)] -> STM ()
releasePktIDs c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [(DispatchType, Word16)]
ks = MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks forall {a}.
Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany
  where deleteMany :: Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany Map (DispatchType, Word16) a
m = forall k a. Ord k => Map k a -> Set k -> Map k a
Map.withoutKeys Map (DispatchType, Word16) a
m (forall a. Ord a => [a] -> Set a
Set.fromList [(DispatchType, Word16)]
ks)

sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} DispatchType
dt Word16 -> MQTTPkt
f = do
  (TChan MQTTPkt
ch,Word16
pid) <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    (TChan MQTTPkt
ch,Word16
pid) <- MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType
dt]
    MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (Word16 -> MQTTPkt
f Word16
pid)
    forall (f :: * -> *) a. Applicative f => a -> f a
pure (TChan MQTTPkt
ch,Word16
pid)

  -- Wait for the response in a separate transaction.
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    ConnState
st <- forall a. TVar a -> STM a
readTVar TVar ConnState
_st
    forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnState
st forall a. Eq a => a -> a -> Bool
/= ConnState
Connected) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail String
"disconnected waiting for response"
    MQTTClient -> (DispatchType, Word16) -> STM ()
releasePktID MQTTClient
c (DispatchType
dt,Word16
pid)
    forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch

-- | Subscribe to a list of topic filters with their respective 'QoS'es.
-- The accepted 'QoS'es are returned in the same order as requested.
subscribe :: MQTTClient -> [(Filter, SubOptions)] -> [Property] -> IO ([Either SubErr QoS], [Property])
subscribe :: MQTTClient
-> [(Filter, SubOptions)]
-> [Property]
-> IO ([Either SubErr QoS], [Property])
subscribe MQTTClient
c [(Filter, SubOptions)]
ls [Property]
props = do
  MQTTClient -> IO ()
runCallbackThread MQTTClient
c
  MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DSubACK (\Word16
pid -> SubscribeRequest -> MQTTPkt
SubscribePkt forall a b. (a -> b) -> a -> b
$ Word16
-> [(ByteString, SubOptions)] -> [Property] -> SubscribeRequest
SubscribeRequest Word16
pid [(ByteString, SubOptions)]
ls' [Property]
props) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    SubACKPkt (SubscribeResponse Word16
_ [Either SubErr QoS]
rs [Property]
aprops) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Either SubErr QoS]
rs, [Property]
aprops)
    MQTTPkt
pkt                                       -> forall a. String -> a
mqttFail forall a b. (a -> b) -> a -> b
$ String
"unexpected response to subscribe: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show MQTTPkt
pkt

  where ls' :: [(ByteString, SubOptions)]
ls' = forall a b. (a -> b) -> [a] -> [b]
map (forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first Filter -> ByteString
filterToBL) [(Filter, SubOptions)]
ls

-- | Unsubscribe from a list of topic filters.
--
-- In MQTT 3.1.1, there is no body to an unsubscribe response, so it
-- can be ignored.  If this returns, you were unsubscribed.  In MQTT
-- 5, you'll get a list of unsub status values corresponding to your
-- request filters, and whatever properties the server thought you
-- should know about.
unsubscribe :: MQTTClient -> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe :: MQTTClient
-> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe MQTTClient
c [Filter]
ls [Property]
props = do
  (UnsubACKPkt (UnsubscribeResponse Word16
_ [Property]
rsn [UnsubStatus]
rprop)) <- MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DUnsubACK (\Word16
pid -> UnsubscribeRequest -> MQTTPkt
UnsubscribePkt forall a b. (a -> b) -> a -> b
$ Word16 -> [ByteString] -> [Property] -> UnsubscribeRequest
UnsubscribeRequest Word16
pid (forall a b. (a -> b) -> [a] -> [b]
map Filter -> ByteString
filterToBL [Filter]
ls) [Property]
props)
  forall (f :: * -> *) a. Applicative f => a -> f a
pure ([UnsubStatus]
rprop, [Property]
rsn)

-- | Publish a message (QoS 0).
publish :: MQTTClient
        -> Topic         -- ^ Topic
        -> BL.ByteString -- ^ Message body
        -> Bool          -- ^ Retain flag
        -> IO ()
publish :: MQTTClient -> Topic -> ByteString -> Bool -> IO ()
publish MQTTClient
c Topic
t ByteString
m Bool
r = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
QoS0 forall a. Monoid a => a
mempty

-- | Publish a message with the specified QoS and Properties list.
publishq :: MQTTClient
         -> Topic         -- ^ Topic
         -> BL.ByteString -- ^ Message body
         -> Bool          -- ^ Retain flag
         -> QoS           -- ^ QoS
         -> [Property]    -- ^ Properties
         -> IO ()
publishq :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
  (TChan MQTTPkt
ch,Word16
pid) <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType]
types
  forall a b. IO a -> IO b -> IO a
E.finally (TChan MQTTPkt -> Word16 -> IO ()
publishAndWait TChan MQTTPkt
ch Word16
pid) (forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> [(DispatchType, Word16)] -> STM ()
releasePktIDs MQTTClient
c [(DispatchType
t',Word16
pid) | DispatchType
t' <- [DispatchType]
types])

    where
      types :: [DispatchType]
types = [DispatchType
DPubACK, DispatchType
DPubREC, DispatchType
DPubCOMP]
      publishAndWait :: TChan MQTTPkt -> Word16 -> IO ()
publishAndWait TChan MQTTPkt
ch Word16
pid = do
        MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (Word16 -> MQTTPkt
pkt Word16
pid)
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (QoS
q forall a. Ord a => a -> a -> Bool
> QoS
QoS0) forall a b. (a -> b) -> a -> b
$ TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid

      pkt :: Word16 -> MQTTPkt
pkt Word16
pid = PublishRequest -> MQTTPkt
PublishPkt forall a b. (a -> b) -> a -> b
$ PublishRequest {_pubDup :: Bool
_pubDup = Bool
False,
                                             _pubQoS :: QoS
_pubQoS = QoS
q,
                                             _pubPktID :: Word16
_pubPktID = Word16
pid,
                                             _pubRetain :: Bool
_pubRetain = Bool
r,
                                             _pubTopic :: ByteString
_pubTopic = Topic -> ByteString
topicToBL Topic
t,
                                             _pubBody :: ByteString
_pubBody = ByteString
m,
                                             _pubProps :: [Property]
_pubProps = [Property]
props}

      satisfyQoS :: TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid
        | QoS
q forall a. Eq a => a -> a -> Bool
== QoS
QoS0 = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        | QoS
q forall a. Eq a => a -> a -> Bool
== QoS
QoS1 = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ do
            (PubACKPkt (PubACK Word16
_ Word8
st [Property]
pprops)) <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall {a}. (Eq a, Num a) => a -> Bool
isOK Word8
st) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail (String
"qos 1 publish error: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word8
st forall a. Semigroup a => a -> a -> a
<> String
" " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show [Property]
pprops)
        | QoS
q forall a. Eq a => a -> a -> Bool
== QoS
QoS2 = IO ()
waitRec
        | Bool
otherwise = forall a. HasCallStack => String -> a
error String
"invalid QoS"

        where
          isOK :: a -> Bool
isOK a
0  = Bool
True -- success
          isOK a
16 = Bool
True -- It worked, but nobody cares (no matching subscribers)
          isOK a
_  = Bool
False

          waitRec :: IO ()
waitRec = forall a. STM a -> IO a
atomically (MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              PubRECPkt (PubREC Word16
_ Word8
st [Property]
recprops) -> do
                forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall {a}. (Eq a, Num a) => a -> Bool
isOK Word8
st) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail (String
"qos 2 REC publish error: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word8
st forall a. Semigroup a => a -> a -> a
<> String
" " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show [Property]
recprops)
                MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt forall a b. (a -> b) -> a -> b
$ Word16 -> Word8 -> [Property] -> PubREL
PubREL Word16
pid Word8
0 forall a. Monoid a => a
mempty)
              PubCOMPPkt (PubCOMP Word16
_ Word8
st' [Property]
compprops) ->
                forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word8
st' forall a. Eq a => a -> a -> Bool
/= Word8
0) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail (String
"qos 2 COMP publish error: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word8
st' forall a. Semigroup a => a -> a -> a
<> String
" " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show [Property]
compprops)
              MQTTPkt
wtf -> forall a. String -> a
mqttFail (String
"unexpected packet received in QoS2 publish: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show MQTTPkt
wtf)

-- | Disconnect from the MQTT server.
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} DiscoReason
reason [Property]
props = forall a b. IO a -> IO b -> IO ()
race_ IO ()
getDisconnected IO ()
orDieTrying
  where
    getDisconnected :: IO ()
getDisconnected = do
      MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (DisconnectRequest -> MQTTPkt
DisconnectPkt forall a b. (a -> b) -> a -> b
$ DiscoReason -> [Property] -> DisconnectRequest
DisconnectRequest DiscoReason
reason [Property]
props)
      forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO a
wait forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
      forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Stopped
    orDieTrying :: IO ()
orDieTrying = Int -> IO ()
threadDelay Int
10000000 forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout

-- | Disconnect with 'DiscoNormalDisconnection' and no properties.
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect MQTTClient
c = MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect MQTTClient
c DiscoReason
DiscoNormalDisconnection forall a. Monoid a => a
mempty

-- | A convenience method for creating a 'LastWill'.
mkLWT :: Topic -> BL.ByteString -> Bool -> T.LastWill
mkLWT :: Topic -> ByteString -> Bool -> LastWill
mkLWT Topic
t ByteString
m Bool
r = T.LastWill{
  _willRetain :: Bool
T._willRetain=Bool
r,
  _willQoS :: QoS
T._willQoS=QoS
QoS0,
  _willTopic :: ByteString
T._willTopic = Topic -> ByteString
topicToBL Topic
t,
  _willMsg :: ByteString
T._willMsg=ByteString
m,
  _willProps :: [Property]
T._willProps=forall a. Monoid a => a
mempty
  }

-- | Get the list of properties that were sent from the broker at connect time.
svrProps :: MQTTClient -> IO [Property]
svrProps :: MQTTClient -> IO [Property]
svrProps = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ConnACKFlags -> [Property]
p forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM ConnACKFlags
connACKSTM
  where p :: ConnACKFlags -> [Property]
p (ConnACKFlags SessionReuse
_ ConnACKRC
_ [Property]
props) = [Property]
props

-- | Get the complete connection ACK packet from the beginning of this session.
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM MQTTClient{TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_connACKFlags} = forall a. TVar a -> STM a
readTVar TVar ConnACKFlags
_connACKFlags

-- | Get the complete connection aCK packet from the beginning of this session.
connACK :: MQTTClient -> IO ConnACKFlags
connACK :: MQTTClient -> IO ConnACKFlags
connACK = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM ConnACKFlags
connACKSTM

maxAliases :: MQTTClient -> IO Word16
maxAliases :: MQTTClient -> IO Word16
maxAliases = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Word16 -> Word16
f Word16
0) forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> IO [Property]
svrProps
  where
    f :: Property -> Word16 -> Word16
f (PropTopicAliasMaximum Word16
n) Word16
_ = Word16
n
    f Property
_ Word16
o                         = Word16
o

-- | Publish a message with the specified 'QoS' and 'Property' list.  If
-- possible, use an alias to shorten the message length.  The alias
-- list is managed by the client in a first-come, first-served basis,
-- so if you use this with more properties than the broker allows,
-- only the first N (up to TopicAliasMaximum, as specified by the
-- broker at connect time) will be aliased.
--
-- This is safe to use as a general publish mechanism, as it will
-- default to not aliasing whenver there's not already an alias and we
-- can't create any more.
pubAliased :: MQTTClient
         -> Topic         -- ^ Topic
         -> BL.ByteString -- ^ Message body
         -> Bool          -- ^ Retain flag
         -> QoS           -- ^ QoS
         -> [Property]    -- ^ Properties
         -> IO ()
pubAliased :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
pubAliased c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map Word16 PublishRequest)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map ByteString MessageCallback)
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: TVar (Map ByteString MessageCallback)
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: TVar (Map Word16 PublishRequest)
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> TVar (Map Word16 PublishRequest)
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
  Word16
x <- MQTTClient -> IO Word16
maxAliases MQTTClient
c
  (Topic
t', Word16
n) <- Word16 -> IO (Topic, Word16)
alias Word16
x
  let np :: [Property]
np = [Property]
props forall a. Semigroup a => a -> a -> a
<> case Word16
n of
                      Word16
0 -> forall a. Monoid a => a
mempty
                      Word16
_ -> [Word16 -> Property
PropTopicAlias Word16
n]
  MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t' ByteString
m Bool
r QoS
q [Property]
np

  where
    alias :: Word16 -> IO (Topic, Word16)
alias Word16
mv = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
      Map Topic Word16
as <- forall a. TVar a -> STM a
readTVar TVar (Map Topic Word16)
_outA
      let n :: Word16
n = forall a. Enum a => Int -> a
toEnum (forall (t :: * -> *) a. Foldable t => t a -> Int
length Map Topic Word16
as forall a. Num a => a -> a -> a
+ Int
1)
          cur :: Maybe Word16
cur = forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Topic
t Map Topic Word16
as
          v :: Word16
v = forall a. a -> Maybe a -> a
fromMaybe (if Word16
n forall a. Ord a => a -> a -> Bool
> Word16
mv then Word16
0 else Word16
n) Maybe Word16
cur
      forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word16
v forall a. Ord a => a -> a -> Bool
> Word16
0) forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar (Map Topic Word16)
_outA (forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Topic
t Word16
v Map Topic Word16
as)
      forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall b a. b -> (a -> b) -> Maybe a -> b
maybe Topic
t (forall a b. a -> b -> a
const Topic
"") Maybe Word16
cur, Word16
v)

-- | Register a callback handler for a message with the given correlated data identifier.
--
-- This registration will remain in place until unregisterCorrelated is called to remove it.
registerCorrelated :: MQTTClient -> BL.ByteString -> MessageCallback -> STM ()
registerCorrelated :: MQTTClient -> ByteString -> MessageCallback -> STM ()
registerCorrelated MQTTClient{TVar (Map ByteString MessageCallback)
_corr :: TVar (Map ByteString MessageCallback)
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_corr} ByteString
bs = forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ByteString MessageCallback)
_corr forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ByteString
bs

-- | Unregister a callback handler for the given correlated data identifier.
unregisterCorrelated :: MQTTClient -> BL.ByteString -> STM ()
unregisterCorrelated :: MQTTClient -> ByteString -> STM ()
unregisterCorrelated MQTTClient{TVar (Map ByteString MessageCallback)
_corr :: TVar (Map ByteString MessageCallback)
_corr :: MQTTClient -> TVar (Map ByteString MessageCallback)
_corr} = forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ByteString MessageCallback)
_corr forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> Map k a -> Map k a
Map.delete