{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.MQTT.Client (
MQTTConfig(..), MQTTClient, QoS(..), Topic, mqttConfig, mkLWT, LastWill(..),
ProtocolLevel(..), Property(..), SubOptions(..), subOptions, MessageCallback(..),
waitForClient,
connectURI, isConnected,
disconnect, normalDisconnect, stopClient,
subscribe, unsubscribe, publish, publishq, pubAliased,
svrProps, connACK, MQTTException(..),
runMQTTConduit, MQTTConduit, isConnectedSTM, connACKSTM,
registerCorrelated, unregisterCorrelated
) where
import Control.Concurrent (myThreadId, threadDelay)
import Control.Concurrent.Async (Async, async, asyncThreadId, cancel, cancelWith, link, race_, wait,
waitAnyCancel)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (STM, TChan, TVar, atomically, check, modifyTVar', newTChan, newTChanIO,
newTVarIO, orElse, readTChan, readTVar, readTVarIO, registerDelay, retry,
writeTChan, writeTVar)
import Control.DeepSeq (force)
import qualified Control.Exception as E
import Control.Monad (forever, guard, join, unless, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as BCS
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BC
import Data.Conduit (ConduitT, Void, await, runConduit, yield, (.|))
import Data.Conduit.Attoparsec (conduitParser)
import qualified Data.Conduit.Combinators as C
import Data.Conduit.Network (AppData, appSink, appSource, clientSettings, runTCPClient)
import Data.Conduit.Network.TLS (runTLSClient, tlsClientConfig, tlsClientTLSSettings)
import Data.Foldable (traverse_)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Decaying as Decaying
import Data.Maybe (fromJust, fromMaybe)
import Data.Text (Text)
import qualified Data.Text.Encoding as TE
import Data.Word (Word16)
import GHC.Conc (labelThread)
import Network.Connection (ConnectionParams (..), TLSSettings (..), connectTo, connectionClose,
connectionGetChunk, connectionPut, initConnectionContext)
import Network.URI (URI (..), nullURIAuth, unEscapeString, uriPort, uriRegName, uriUserInfo)
import qualified Network.WebSockets as WS
import Network.WebSockets.Stream (makeStream)
import System.IO.Error (catchIOError, isEOFError)
import System.Timeout (timeout)
import qualified Data.Set as Set
import Network.MQTT.Topic (Filter, Topic, mkTopic, unFilter, unTopic)
import Network.MQTT.Types as T
data ConnState = Starting
| Connected
| Stopped
| Disconnected
| DiscoErr DisconnectRequest
| ConnErr ConnACKFlags deriving (ConnState -> ConnState -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnState -> ConnState -> Bool
$c/= :: ConnState -> ConnState -> Bool
== :: ConnState -> ConnState -> Bool
$c== :: ConnState -> ConnState -> Bool
Eq, Int -> ConnState -> ShowS
[ConnState] -> ShowS
ConnState -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnState] -> ShowS
$cshowList :: [ConnState] -> ShowS
show :: ConnState -> String
$cshow :: ConnState -> String
showsPrec :: Int -> ConnState -> ShowS
$cshowsPrec :: Int -> ConnState -> ShowS
Show)
data DispatchType = DSubACK | DUnsubACK | DPubACK | DPubREC | DPubREL | DPubCOMP
deriving (DispatchType -> DispatchType -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: DispatchType -> DispatchType -> Bool
$c/= :: DispatchType -> DispatchType -> Bool
== :: DispatchType -> DispatchType -> Bool
$c== :: DispatchType -> DispatchType -> Bool
Eq, Int -> DispatchType -> ShowS
[DispatchType] -> ShowS
DispatchType -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DispatchType] -> ShowS
$cshowList :: [DispatchType] -> ShowS
show :: DispatchType -> String
$cshow :: DispatchType -> String
showsPrec :: Int -> DispatchType -> ShowS
$cshowsPrec :: Int -> DispatchType -> ShowS
Show, Eq DispatchType
DispatchType -> DispatchType -> Bool
DispatchType -> DispatchType -> Ordering
DispatchType -> DispatchType -> DispatchType
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: DispatchType -> DispatchType -> DispatchType
$cmin :: DispatchType -> DispatchType -> DispatchType
max :: DispatchType -> DispatchType -> DispatchType
$cmax :: DispatchType -> DispatchType -> DispatchType
>= :: DispatchType -> DispatchType -> Bool
$c>= :: DispatchType -> DispatchType -> Bool
> :: DispatchType -> DispatchType -> Bool
$c> :: DispatchType -> DispatchType -> Bool
<= :: DispatchType -> DispatchType -> Bool
$c<= :: DispatchType -> DispatchType -> Bool
< :: DispatchType -> DispatchType -> Bool
$c< :: DispatchType -> DispatchType -> Bool
compare :: DispatchType -> DispatchType -> Ordering
$ccompare :: DispatchType -> DispatchType -> Ordering
Ord, Int -> DispatchType
DispatchType -> Int
DispatchType -> [DispatchType]
DispatchType -> DispatchType
DispatchType -> DispatchType -> [DispatchType]
DispatchType -> DispatchType -> DispatchType -> [DispatchType]
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
$cenumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
enumFromTo :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromTo :: DispatchType -> DispatchType -> [DispatchType]
enumFromThen :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromThen :: DispatchType -> DispatchType -> [DispatchType]
enumFrom :: DispatchType -> [DispatchType]
$cenumFrom :: DispatchType -> [DispatchType]
fromEnum :: DispatchType -> Int
$cfromEnum :: DispatchType -> Int
toEnum :: Int -> DispatchType
$ctoEnum :: Int -> DispatchType
pred :: DispatchType -> DispatchType
$cpred :: DispatchType -> DispatchType
succ :: DispatchType -> DispatchType
$csucc :: DispatchType -> DispatchType
Enum, DispatchType
forall a. a -> a -> Bounded a
maxBound :: DispatchType
$cmaxBound :: DispatchType
minBound :: DispatchType
$cminBound :: DispatchType
Bounded)
data MessageCallback = NoCallback
| SimpleCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
| OrderedCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
| LowLevelCallback (MQTTClient -> PublishRequest -> IO ())
| OrderedLowLevelCallback (MQTTClient -> PublishRequest -> IO ())
data MQTTClient = MQTTClient {
MQTTClient -> TChan MQTTPkt
_ch :: TChan MQTTPkt
, MQTTClient -> TVar Word16
_pktID :: TVar Word16
, MQTTClient -> MessageCallback
_cb :: MessageCallback
, MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks :: TVar (Map (DispatchType,Word16) (TChan MQTTPkt))
, MQTTClient -> Map Word16 PublishRequest
_inflight :: Decaying.Map Word16 PublishRequest
, MQTTClient -> TVar ConnState
_st :: TVar ConnState
, MQTTClient -> TVar (Maybe (Async ()))
_ct :: TVar (Maybe (Async ()))
, MQTTClient -> TVar (Map Topic Word16)
_outA :: TVar (Map Topic Word16)
, MQTTClient -> TVar (Map Word16 Topic)
_inA :: TVar (Map Word16 Topic)
, MQTTClient -> TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
, MQTTClient -> Map ByteString MessageCallback
_corr :: Decaying.Map BL.ByteString MessageCallback
, MQTTClient -> MVar (IO ())
_cbM :: MVar (IO ())
, MQTTClient -> TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
}
data MQTTConfig = MQTTConfig{
MQTTConfig -> Bool
_cleanSession :: Bool
, MQTTConfig -> Maybe LastWill
_lwt :: Maybe LastWill
, MQTTConfig -> MessageCallback
_msgCB :: MessageCallback
, MQTTConfig -> ProtocolLevel
_protocol :: ProtocolLevel
, MQTTConfig -> [Property]
_connProps :: [Property]
, MQTTConfig -> String
_hostname :: String
, MQTTConfig -> Int
_port :: Int
, MQTTConfig -> String
_connID :: String
, MQTTConfig -> Maybe String
_username :: Maybe String
, MQTTConfig -> Maybe String
_password :: Maybe String
, MQTTConfig -> Int
_connectTimeout :: Int
, MQTTConfig -> TLSSettings
_tlsSettings :: TLSSettings
, MQTTConfig -> Int
_pingPeriod :: Int
, MQTTConfig -> Int
_pingPatience :: Int
}
mqttConfig :: MQTTConfig
mqttConfig :: MQTTConfig
mqttConfig = MQTTConfig{_hostname :: String
_hostname=String
"", _port :: Int
_port=Int
1883, _connID :: String
_connID=String
"",
_username :: Maybe String
_username=forall a. Maybe a
Nothing, _password :: Maybe String
_password=forall a. Maybe a
Nothing,
_cleanSession :: Bool
_cleanSession=Bool
True, _lwt :: Maybe LastWill
_lwt=forall a. Maybe a
Nothing,
_msgCB :: MessageCallback
_msgCB=MessageCallback
NoCallback,
_protocol :: ProtocolLevel
_protocol=ProtocolLevel
Protocol311, _connProps :: [Property]
_connProps=forall a. Monoid a => a
mempty,
_connectTimeout :: Int
_connectTimeout=Int
180000000,
_tlsSettings :: TLSSettings
_tlsSettings=Bool -> Bool -> Bool -> TLSSettings
TLSSettingsSimple Bool
False Bool
False Bool
False,
_pingPeriod :: Int
_pingPeriod=Int
30000000,
_pingPatience :: Int
_pingPatience=Int
90000000}
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} URI
uri = do
let cf :: MQTTConfig -> IO MQTTClient
cf = case URI -> String
uriScheme URI
uri of
String
"mqtt:" -> MQTTConfig -> IO MQTTClient
runClient
String
"mqtts:" -> MQTTConfig -> IO MQTTClient
runClientTLS
String
"ws:" -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
False
String
"wss:" -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
True
String
us -> forall a. String -> a
mqttFail forall a b. (a -> b) -> a -> b
$ String
"invalid URI scheme: " forall a. Semigroup a => a -> a -> a
<> String
us
a :: URIAuth
a = forall a. a -> Maybe a -> a
fromMaybe URIAuth
nullURIAuth forall a b. (a -> b) -> a -> b
$ URI -> Maybe URIAuth
uriAuthority URI
uri
(Maybe String
u,Maybe String
p) = String -> (Maybe String, Maybe String)
up (URIAuth -> String
uriUserInfo URIAuth
a)
Maybe MQTTClient
v <- forall a. String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
"MQTT connect" Int
_connectTimeout forall a b. (a -> b) -> a -> b
$
MQTTConfig -> IO MQTTClient
cf MQTTConfig
cfg{_connID :: String
Network.MQTT.Client._connID=forall {p}. p -> ShowS
cid ProtocolLevel
_protocol (URI -> String
uriFragment URI
uri),
_hostname :: String
_hostname=URIAuth -> String
uriRegName URIAuth
a, _port :: Int
_port=forall {a} {a}.
(Eq a, IsString a, Num a, Read a) =>
String -> a -> a
port (URIAuth -> String
uriPort URIAuth
a) (URI -> String
uriScheme URI
uri),
_username :: Maybe String
Network.MQTT.Client._username=Maybe String
u, _password :: Maybe String
Network.MQTT.Client._password=Maybe String
p}
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a. String -> a
mqttFail forall a b. (a -> b) -> a -> b
$ String
"connection to " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show URI
uri forall a. Semigroup a => a -> a -> a
<> String
" timed out") forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe MQTTClient
v
where
port :: String -> a -> a
port String
"" a
"mqtt:" = a
1883
port String
"" a
"mqtts:" = a
8883
port String
"" a
"ws:" = a
80
port String
"" a
"wss:" = a
443
port String
x a
_ = (forall a. Read a => String -> a
read forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [a] -> [a]
tail) String
x
cid :: p -> ShowS
cid p
_ [Char
'#'] = String
""
cid p
_ (Char
'#':String
xs) = String
xs
cid p
_ String
_ = String
""
up :: String -> (Maybe String, Maybe String)
up String
"" = (forall a. Maybe a
Nothing, forall a. Maybe a
Nothing)
up String
x = let (String
u,String
r) = forall a. (a -> Bool) -> [a] -> ([a], [a])
break (forall a. Eq a => a -> a -> Bool
== Char
':') (forall a. [a] -> [a]
init String
x) in
(forall a. a -> Maybe a
Just (ShowS
unEscapeString String
u), if String
r forall a. Eq a => a -> a -> Bool
== String
"" then forall a. Maybe a
Nothing else forall a. a -> Maybe a
Just (ShowS
unEscapeString forall a b. (a -> b) -> a -> b
$ forall a. [a] -> [a]
tail String
r))
runClient :: MQTTConfig -> IO MQTTClient
runClient :: MQTTConfig -> IO MQTTClient
runClient cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (forall a. ClientSettings -> (AppData -> IO a) -> IO a
runTCPClient (Int -> ByteString -> ClientSettings
clientSettings Int
_port (String -> ByteString
BCS.pack String
_hostname))) MQTTConfig
cfg
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (forall (m :: * -> *) a.
MonadUnliftIO m =>
TLSClientConfig -> (AppData -> m a) -> m a
runTLSClient TLSClientConfig
tlsConf) MQTTConfig
cfg
where tlsConf :: TLSClientConfig
tlsConf = (Int -> ByteString -> TLSClientConfig
tlsClientConfig Int
_port (String -> ByteString
BCS.pack String
_hostname)) {tlsClientTLSSettings :: TLSSettings
tlsClientTLSSettings=TLSSettings
_tlsSettings}
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (AppData -> IO ()) -> IO ()
mkconn = ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (forall {a} {m :: * -> *} {m :: * -> *} {c} {t} {i} {o}.
(HasReadWrite a, MonadIO m, MonadIO m) =>
((a -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (AppData -> IO ()) -> IO ()
mkconn)
where adapt :: ((a -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (a -> c) -> t
mk (ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f = (a -> c) -> t
mk ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall {ad} {m :: * -> *} {m :: * -> *} {i} {o}.
(HasReadWrite ad, MonadIO m, MonadIO m) =>
ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor)
adaptor :: ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor ad
ad = (forall ad (m :: * -> *) i.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT i ByteString m ()
appSource ad
ad, forall ad (m :: * -> *) o.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT ByteString o m ()
appSink ad
ad)
runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI{String
uriPath :: URI -> String
uriPath :: String
uriPath, String
uriQuery :: URI -> String
uriQuery :: String
uriQuery} Bool
secure cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} =
((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (forall {c} {t}. ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt forall a b. (a -> b) -> a -> b
$ Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
cf Bool
secure String
_hostname Int
_port String
endpoint ConnectionOptions
WS.defaultConnectionOptions Headers
hdrs) MQTTConfig
cfg
where
hdrs :: Headers
hdrs = [(CI ByteString
"Sec-WebSocket-Protocol", ByteString
"mqtt")]
adapt :: ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt (Connection -> c) -> t
mk MQTTConduit -> c
f = (Connection -> c) -> t
mk (MQTTConduit -> c
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> MQTTConduit
adaptor)
adaptor :: Connection -> MQTTConduit
adaptor Connection
s = (Connection -> ConduitT () ByteString IO ()
wsSource Connection
s, Connection -> ConduitT ByteString Void IO ()
wsSink Connection
s)
endpoint :: String
endpoint = String
uriPath forall a. Semigroup a => a -> a -> a
<> String
uriQuery
cf :: Bool -> String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
cf :: Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
cf Bool
False = forall a.
String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
WS.runClientWith
cf Bool
True = String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
runWSS
wsSource :: WS.Connection -> ConduitT () BCS.ByteString IO ()
wsSource :: Connection -> ConduitT () ByteString IO ()
wsSource Connection
ws = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
ByteString
bs <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. WebSocketsData a => Connection -> IO a
WS.receiveData Connection
ws
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ByteString -> Bool
BCS.null ByteString
bs) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield ByteString
bs
wsSink :: WS.Connection -> ConduitT BCS.ByteString Void IO ()
wsSink :: Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (\ByteString
bs -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendBinaryData Connection
ws ByteString
bs) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await
runWSS :: String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
runWSS :: String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
runWSS String
host Int
port String
path ConnectionOptions
options Headers
hdrs' ClientApp ()
app = do
let connectionParams :: ConnectionParams
connectionParams = ConnectionParams
{ connectionHostname :: String
connectionHostname = String
host
, connectionPort :: PortNumber
connectionPort = forall a. Enum a => Int -> a
toEnum Int
port
, connectionUseSecure :: Maybe TLSSettings
connectionUseSecure = forall a. a -> Maybe a
Just TLSSettings
_tlsSettings
, connectionUseSocks :: Maybe ProxySettings
connectionUseSocks = forall a. Maybe a
Nothing
}
ConnectionContext
context <- IO ConnectionContext
initConnectionContext
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
E.bracket (ConnectionContext -> ConnectionParams -> IO Connection
connectTo ConnectionContext
context ConnectionParams
connectionParams) Connection -> IO ()
connectionClose
(\Connection
conn -> do
Stream
stream <- IO (Maybe ByteString) -> (Maybe ByteString -> IO ()) -> IO Stream
makeStream (Connection -> IO (Maybe ByteString)
reader Connection
conn) (Connection -> Maybe ByteString -> IO ()
writer Connection
conn)
forall a.
Stream
-> String
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
WS.runClientWithStream Stream
stream String
host String
path ConnectionOptions
options Headers
hdrs' ClientApp ()
app)
where
reader :: Connection -> IO (Maybe ByteString)
reader Connection
conn =
forall a. IO a -> (IOError -> IO a) -> IO a
catchIOError (forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
connectionGetChunk Connection
conn)
(\IOError
e -> if IOError -> Bool
isEOFError IOError
e then forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing else forall e a. Exception e => e -> IO a
E.throwIO IOError
e)
writer :: Connection -> Maybe ByteString -> IO ()
writer Connection
conn = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (Connection -> ByteString -> IO ()
connectionPut Connection
conn forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BC.toStrict)
mqttFail :: String -> a
mqttFail :: forall a. String -> a
mqttFail = forall a e. Exception e => e -> a
E.throw forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException
namedAsync :: String -> IO a -> IO (Async a)
namedAsync :: forall a. String -> IO a -> IO (Async a)
namedAsync String
s IO a
a = forall a. IO a -> IO (Async a)
async IO a
a forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Async a
p -> ThreadId -> String -> IO ()
labelThread (forall a. Async a -> ThreadId
asyncThreadId Async a
p) String
s forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure Async a
p
namedTimeout :: String -> Int -> IO a -> IO (Maybe a)
namedTimeout :: forall a. String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
n Int
to IO a
a = forall a. Int -> IO a -> IO (Maybe a)
timeout Int
to (IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ThreadId
tid -> ThreadId -> String -> IO ()
labelThread ThreadId
tid String
n forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a
a)
type MQTTConduit = (ConduitT () BCS.ByteString IO (), ConduitT BCS.ByteString Void IO ())
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ())
-> MQTTConfig
-> IO MQTTClient
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (MQTTConduit -> IO ()) -> IO ()
mkconn MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
TLSSettings
ProtocolLevel
MessageCallback
_pingPatience :: Int
_pingPeriod :: Int
_tlsSettings :: TLSSettings
_connectTimeout :: Int
_password :: Maybe String
_username :: Maybe String
_connID :: String
_port :: Int
_hostname :: String
_connProps :: [Property]
_protocol :: ProtocolLevel
_msgCB :: MessageCallback
_lwt :: Maybe LastWill
_cleanSession :: Bool
_pingPatience :: MQTTConfig -> Int
_pingPeriod :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_connectTimeout :: MQTTConfig -> Int
_password :: MQTTConfig -> Maybe String
_username :: MQTTConfig -> Maybe String
_connID :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_hostname :: MQTTConfig -> String
_connProps :: MQTTConfig -> [Property]
_protocol :: MQTTConfig -> ProtocolLevel
_msgCB :: MQTTConfig -> MessageCallback
_lwt :: MQTTConfig -> Maybe LastWill
_cleanSession :: MQTTConfig -> Bool
..} = do
TChan MQTTPkt
_ch <- forall a. IO (TChan a)
newTChanIO
TVar Word16
_pktID <- forall a. a -> IO (TVar a)
newTVarIO Word16
1
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
Map Word16 PublishRequest
_inflight <- forall k a. Ord k => NominalDiffTime -> IO (Map k a)
Decaying.new NominalDiffTime
60
TVar ConnState
_st <- forall a. a -> IO (TVar a)
newTVarIO ConnState
Starting
TVar (Maybe (Async ()))
_ct <- forall a. a -> IO (TVar a)
newTVarIO forall a. Maybe a
Nothing
TVar (Map Topic Word16)
_outA <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
TVar (Map Word16 Topic)
_inA <- forall a. a -> IO (TVar a)
newTVarIO forall a. Monoid a => a
mempty
TVar ConnACKFlags
_connACKFlags <- forall a. a -> IO (TVar a)
newTVarIO (SessionReuse -> ConnACKRC -> [Property] -> ConnACKFlags
ConnACKFlags SessionReuse
NewSession ConnACKRC
ConnUnspecifiedError forall a. Monoid a => a
mempty)
Map ByteString MessageCallback
_corr <- forall k a. Ord k => NominalDiffTime -> IO (Map k a)
Decaying.new NominalDiffTime
600
MVar (IO ())
_cbM <- forall a. IO (MVar a)
newEmptyMVar
TVar (Maybe (Async ()))
_cbHandle <- forall a. a -> IO (TVar a)
newTVarIO forall a. Maybe a
Nothing
let _cb :: MessageCallback
_cb = MessageCallback
_msgCB
cli :: MQTTClient
cli = MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cb :: MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
..}
Async ()
t <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT clientThread" forall a b. (a -> b) -> a -> b
$ MQTTClient -> IO ()
clientThread MQTTClient
cli
ConnState
s <- forall a. STM a -> IO a
atomically (MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient
cli Async ()
t)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnState
s forall a. Eq a => a -> a -> Bool
== ConnState
Disconnected) forall a b. (a -> b) -> a -> b
$ forall a. Async a -> IO a
wait Async ()
t
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
cli
forall (f :: * -> *) a. Applicative f => a -> f a
pure MQTTClient
cli
where
clientThread :: MQTTClient -> IO ()
clientThread MQTTClient
cli = forall a b. IO a -> IO b -> IO a
E.finally IO ()
connectAndRun IO ()
markDisco
where
connectAndRun :: IO ()
connectAndRun = (MQTTConduit -> IO ()) -> IO ()
mkconn forall a b. (a -> b) -> a -> b
$ \MQTTConduit
ad -> forall {m :: * -> *} {b} {a} {a}.
Monad m =>
b -> (a, ConduitT ByteString Void m a) -> m b
start MQTTClient
cli MQTTConduit
ad forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MQTTConduit -> MQTTClient -> IO ()
run MQTTConduit
ad
markDisco :: IO ()
markDisco = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
ConnState
st <- forall a. TVar a -> STM a
readTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli)
forall (f :: * -> *). Alternative f => Bool -> f ()
guard forall a b. (a -> b) -> a -> b
$ ConnState
st forall a. Eq a => a -> a -> Bool
== ConnState
Starting Bool -> Bool -> Bool
|| ConnState
st forall a. Eq a => a -> a -> Bool
== ConnState
Connected
forall a. TVar a -> a -> STM ()
writeTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli) ConnState
Disconnected
start :: b -> (a, ConduitT ByteString Void m a) -> m b
start b
c (a
_,ConduitT ByteString Void m a
sink) = do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$ do
let req :: ConnectRequest
req = ConnectRequest
connectRequest{_connID :: ByteString
T._connID=String -> ByteString
BC.pack String
_connID,
_lastWill :: Maybe LastWill
T._lastWill=Maybe LastWill
_lwt,
_username :: Maybe ByteString
T._username=String -> ByteString
BC.pack forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe String
_username,
_password :: Maybe ByteString
T._password=String -> ByteString
BC.pack forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe String
_password,
_cleanSession :: Bool
T._cleanSession=Bool
_cleanSession,
_connProperties :: [Property]
T._connProperties=[Property]
_connProps}
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (ByteString -> ByteString
BL.toStrict forall a b. (a -> b) -> a -> b
$ forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol ConnectRequest
req) forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m a
sink
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
c
run :: MQTTConduit -> MQTTClient -> IO ()
run (ConduitT () ByteString IO ()
src,ConduitT ByteString Void IO ()
sink) c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = do
TChan Bool
pch <- forall a. IO (TChan a)
newTChanIO
Async ()
o <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT out" forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
processOut
Async ()
p <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT ping" forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall {b}. IO b
doPing
Async ()
w <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT watchdog" forall a b. (a -> b) -> a -> b
$ forall {a} {b}. TChan a -> IO b
watchdog TChan Bool
pch
Async ()
s <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT in" forall a b. (a -> b) -> a -> b
$ TChan Bool -> IO ()
doSrc TChan Bool
pch
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. [Async a] -> IO (Async a, a)
waitAnyCancel [Async ()
o, Async ()
p, Async ()
w, Async ()
s]
where
doSrc :: TChan Bool -> IO ()
doSrc TChan Bool
pch = forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString IO ()
src
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall a (m :: * -> *) b.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a (PositionRange, b) m ()
conduitParser (ProtocolLevel -> Parser MQTTPkt
parsePacket ProtocolLevel
_protocol)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_ (\(PositionRange
_,MQTTPkt
x) -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch MQTTClient
c TChan Bool
pch forall a b. (a -> b) -> a -> b
$! MQTTPkt
x))
onceConnected :: IO ()
onceConnected = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ Bool -> STM ()
check forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a. Eq a => a -> a -> Bool
== ConnState
Connected) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar ConnState
_st
processOut :: IO ()
processOut = forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a i. Monad m => m a -> ConduitT i a m ()
C.repeatM (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> STM a
readTChan TChan MQTTPkt
_ch))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void IO ()
sink
doPing :: IO b
doPing = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
_pingPeriod forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c MQTTPkt
PingPkt
watchdog :: TChan a -> IO b
watchdog TChan a
ch = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
TVar Bool
toch <- Int -> IO (TVar Bool)
registerDelay Int
_pingPatience
Bool
timedOut <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ ((Bool -> STM ()
check forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar Bool
toch) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True) forall a. STM a -> STM a -> STM a
`orElse` (forall a. TChan a -> STM a
readTChan TChan a
ch forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
timedOut forall a b. (a -> b) -> a -> b
$ forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout
waitForLaunch :: MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} Async ()
t = do
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_ct (forall a. a -> Maybe a
Just Async ()
t)
ConnState
c <- forall a. TVar a -> STM a
readTVar TVar ConnState
_st
if ConnState
c forall a. Eq a => a -> a -> Bool
== ConnState
Starting then forall a. STM a
retry else forall (f :: * -> *) a. Applicative f => a -> f a
pure ConnState
c
waitForClient :: MQTTClient -> IO ()
waitForClient :: MQTTClient -> IO ()
waitForClient c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall a b. IO a -> IO b -> IO a
E.finally (MQTTClient -> IO ()
stopCallbackThread MQTTClient
c) forall a b. (a -> b) -> a -> b
$ do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO a
wait forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall e a. Exception e => e -> IO a
E.throwIO forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. STM a -> IO a
atomically (MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
c ConnState
Stopped)
stopClient :: MQTTClient -> IO ()
stopClient :: MQTTClient -> IO ()
stopClient MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO ()
cancel forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO ()
cancel forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_cbHandle
forall a. STM a -> IO a
atomically (forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Stopped)
stateX :: MQTTClient -> ConnState -> STM (Maybe E.SomeException)
stateX :: MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} ConnState
want = ConnState -> Maybe SomeException
f forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TVar a -> STM a
readTVar TVar ConnState
_st
where
je :: String -> Maybe SomeException
je = forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e. Exception e => e -> SomeException
E.toException forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException
f :: ConnState -> Maybe E.SomeException
f :: ConnState -> Maybe SomeException
f ConnState
Connected = if ConnState
want forall a. Eq a => a -> a -> Bool
== ConnState
Connected then forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly connected"
f ConnState
Stopped = if ConnState
want forall a. Eq a => a -> a -> Bool
== ConnState
Stopped then forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly stopped"
f ConnState
Disconnected = String -> Maybe SomeException
je String
"disconnected"
f ConnState
Starting = String -> Maybe SomeException
je String
"died while starting"
f (DiscoErr DisconnectRequest
x) = forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e. Exception e => e -> SomeException
E.toException forall b c a. (b -> c) -> (a -> b) -> a -> c
. DisconnectRequest -> MQTTException
Discod forall a b. (a -> b) -> a -> b
$ DisconnectRequest
x
f (ConnErr ConnACKFlags
e) = String -> Maybe SomeException
je (forall a. Show a => a -> String
show ConnACKFlags
e)
data MQTTException = Timeout | BadData | Discod DisconnectRequest | MQTTException String deriving(MQTTException -> MQTTException -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MQTTException -> MQTTException -> Bool
$c/= :: MQTTException -> MQTTException -> Bool
== :: MQTTException -> MQTTException -> Bool
$c== :: MQTTException -> MQTTException -> Bool
Eq, Int -> MQTTException -> ShowS
[MQTTException] -> ShowS
MQTTException -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MQTTException] -> ShowS
$cshowList :: [MQTTException] -> ShowS
show :: MQTTException -> String
$cshow :: MQTTException -> String
showsPrec :: Int -> MQTTException -> ShowS
$cshowsPrec :: Int -> MQTTException -> ShowS
Show)
instance E.Exception MQTTException
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} TChan Bool
pch MQTTPkt
pkt =
case MQTTPkt
pkt of
(ConnACKPkt ConnACKFlags
p) -> ConnACKFlags -> IO ()
connACKd ConnACKFlags
p
(PublishPkt PublishRequest
p) -> PublishRequest -> IO ()
pub PublishRequest
p
(SubACKPkt (SubscribeResponse Word16
i [Either SubErr QoS]
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DSubACK Word16
i
(UnsubACKPkt (UnsubscribeResponse Word16
i [Property]
_ [UnsubStatus]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DUnsubACK Word16
i
(PubACKPkt (PubACK Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubACK Word16
i
(PubRELPkt (PubREL Word16
i Word8
_ [Property]
_)) -> Word16 -> IO ()
pubd Word16
i
(PubRECPkt (PubREC Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubREC Word16
i
(PubCOMPPkt (PubCOMP Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubCOMP Word16
i
(DisconnectPkt DisconnectRequest
req) -> DisconnectRequest -> IO ()
disco DisconnectRequest
req
MQTTPkt
PongPkt -> forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TChan a -> a -> STM ()
writeTChan TChan Bool
pch forall a b. (a -> b) -> a -> b
$ Bool
True
(AuthPkt AuthRequest
p) -> forall a. String -> a
mqttFail (String
"unexpected incoming auth: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show AuthRequest
p)
MQTTPkt
PingPkt -> forall a. String -> a
mqttFail String
"unexpected incoming ping packet"
(ConnPkt ConnectRequest
_ ProtocolLevel
_) -> forall a. String -> a
mqttFail String
"unexpected incoming connect"
(SubscribePkt SubscribeRequest
_) -> forall a. String -> a
mqttFail String
"unexpected incoming subscribe"
(UnsubscribePkt UnsubscribeRequest
_) -> forall a. String -> a
mqttFail String
"unexpected incoming unsubscribe"
where connACKd :: ConnACKFlags -> IO ()
connACKd connr :: ConnACKFlags
connr@(ConnACKFlags SessionReuse
_ ConnACKRC
val [Property]
_) = case ConnACKRC
val of
ConnACKRC
ConnAccepted -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnACKFlags
_connACKFlags ConnACKFlags
connr
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Connected
ConnACKRC
_ -> do
Maybe (Async ())
t <- forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (ConnACKFlags -> ConnState
ConnErr ConnACKFlags
connr)
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (String -> MQTTException
MQTTException forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show ConnACKFlags
connr) Maybe (Async ())
t
pub :: PublishRequest -> IO ()
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS0} = forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify forall a. Maybe a
Nothing
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS1, Word16
_pubPktID :: PublishRequest -> Word16
_pubPktID :: Word16
_pubPktID} =
forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (forall a. a -> Maybe a
Just (PubACK -> MQTTPkt
PubACKPkt (Word16 -> Word8 -> [Property] -> PubACK
PubACK Word16
_pubPktID Word8
0 forall a. Monoid a => a
mempty))) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p)
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS2} = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
p' :: PublishRequest
p'@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} <- PublishRequest -> STM PublishRequest
resolve PublishRequest
p
forall k v. Ord k => k -> v -> Map k v -> STM ()
Decaying.insert Word16
_pubPktID PublishRequest
p' Map Word16 PublishRequest
_inflight
MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREC -> MQTTPkt
PubRECPkt (Word16 -> Word8 -> [Property] -> PubREC
PubREC Word16
_pubPktID Word8
0 forall a. Monoid a => a
mempty))
pubd :: Word16 -> IO ()
pubd Word16
i = forall a. STM a -> IO a
atomically (forall k a.
Ord k =>
(k -> a -> Maybe a) -> k -> Map k a -> STM (Maybe a)
Decaying.updateLookupWithKey (\Word16
_ PublishRequest
_ -> forall a. Maybe a
Nothing) Word16
i Map Word16 PublishRequest
_inflight) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe PublishRequest
Nothing -> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0x92 forall a. Monoid a => a
mempty))
Just PublishRequest
p -> forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (forall a. a -> Maybe a
Just (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0 forall a. Monoid a => a
mempty))) PublishRequest
p
notify :: t MQTTPkt -> PublishRequest -> IO ()
notify t MQTTPkt
rpkt p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} = do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall k v. Ord k => k -> Map k v -> STM ()
Decaying.delete Word16
_pubPktID Map Word16 PublishRequest
_inflight
MessageCallback
cb <- forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure MessageCallback
_cb) (\ByteString
cd -> forall a. STM a -> IO a
atomically (forall k v. Ord k => v -> k -> Map k v -> STM v
Decaying.findWithDefault MessageCallback
_cb ByteString
cd Map ByteString MessageCallback
_corr)) Maybe ByteString
cdata
forall a. a -> IO a
E.evaluate forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. NFData a => a -> a
force forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< case MessageCallback
cb of
MessageCallback
NoCallback -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
SimpleCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f -> forall {a}. IO a -> IO ()
call (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
OrderedCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f -> forall {a}. IO a -> IO ()
callOrd (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
LowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> forall {a}. IO a -> IO ()
call (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)
OrderedLowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> forall {a}. IO a -> IO ()
callOrd (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)
where
call :: IO a -> IO ()
call IO a
a = forall a. Async a -> IO ()
link forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. String -> IO a -> IO (Async a)
namedAsync String
"notifier" (IO a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond)
callOrd :: IO a -> IO ()
callOrd IO a
a = forall a. MVar a -> a -> IO ()
putMVar MVar (IO ())
_cbM forall a b. (a -> b) -> a -> b
$ IO a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond
respond :: IO ()
respond = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c) t MQTTPkt
rpkt
cdata :: Maybe ByteString
cdata = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe ByteString -> Maybe ByteString
f forall a. Maybe a
Nothing [Property]
_pubProps
where f :: Property -> Maybe ByteString -> Maybe ByteString
f (PropCorrelationData ByteString
x) Maybe ByteString
_ = forall a. a -> Maybe a
Just ByteString
x
f Property
_ Maybe ByteString
o = Maybe ByteString
o
resolve :: PublishRequest -> STM PublishRequest
resolve p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubProps :: [Property]
_pubBody :: ByteString
_pubPktID :: Word16
_pubTopic :: ByteString
_pubRetain :: Bool
_pubQoS :: QoS
_pubDup :: Bool
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubPktID :: PublishRequest -> Word16
_pubQoS :: PublishRequest -> QoS
..} = do
Topic
topic <- Maybe Word16 -> STM Topic
resolveTopic (forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe Word16 -> Maybe Word16
aliasID forall a. Maybe a
Nothing [Property]
_pubProps)
forall (f :: * -> *) a. Applicative f => a -> f a
pure PublishRequest
p{_pubTopic :: ByteString
_pubTopic=Topic -> ByteString
topicToBL Topic
topic}
where
aliasID :: Property -> Maybe Word16 -> Maybe Word16
aliasID (PropTopicAlias Word16
x) Maybe Word16
_ = forall a. a -> Maybe a
Just Word16
x
aliasID Property
_ Maybe Word16
o = Maybe Word16
o
resolveTopic :: Maybe Word16 -> STM Topic
resolveTopic Maybe Word16
Nothing = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Topic
blToTopic ByteString
_pubTopic)
resolveTopic (Just Word16
x) = do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
_pubTopic forall a. Eq a => a -> a -> Bool
/= ByteString
"") forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 Topic)
_inA (forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word16
x (ByteString -> Topic
blToTopic ByteString
_pubTopic))
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a. String -> a
mqttFail (String
"failed to lookup topic alias " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word16
x)) forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Word16
x forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar (Map Word16 Topic)
_inA
delegate :: DispatchType -> Word16 -> IO ()
delegate DispatchType
dt Word16
pid = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (DispatchType -> STM ()
nak DispatchType
dt) (forall a. TChan a -> a -> STM ()
`writeTChan` MQTTPkt
pkt) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (DispatchType
dt, Word16
pid) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks
where
nak :: DispatchType -> STM ()
nak DispatchType
DPubREC = MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt (Word16 -> Word8 -> [Property] -> PubREL
PubREL Word16
pid Word8
0x92 forall a. Monoid a => a
mempty))
nak DispatchType
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
disco :: DisconnectRequest -> IO ()
disco DisconnectRequest
req = do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (DisconnectRequest -> ConnState
DiscoErr DisconnectRequest
req)
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (DisconnectRequest -> MQTTException
Discod DisconnectRequest
req) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
runCallbackThread :: MQTTClient -> IO ()
runCallbackThread :: MQTTClient -> IO ()
runCallbackThread MQTTClient{MessageCallback
_cb :: MessageCallback
_cb :: MQTTClient -> MessageCallback
_cb, MVar (IO ())
_cbM :: MVar (IO ())
_cbM :: MQTTClient -> MVar (IO ())
_cbM, TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbHandle}
| MessageCallback -> Bool
isOrdered MessageCallback
_cb = do
TVar Bool
latch <- forall a. a -> IO (TVar a)
newTVarIO Bool
False
Async ()
handle <- forall a. String -> IO a -> IO (Async a)
namedAsync String
"ordered callbacks" (TVar Bool -> IO ()
waitFor TVar Bool
latch forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall {b}. IO b
runOrderedCallbacks)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
Maybe (Async ())
cbThread <- forall a. TVar a -> STM a
readTVar TVar (Maybe (Async ()))
_cbHandle
case Maybe (Async ())
cbThread of
Maybe (Async ())
Nothing -> do
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
latch Bool
True
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_cbHandle (forall a. a -> Maybe a
Just Async ()
handle)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
Just Async ()
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Async a -> IO ()
cancel Async ()
handle)
| Bool
otherwise = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
where isOrdered :: MessageCallback -> Bool
isOrdered (OrderedCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
_) = Bool
True
isOrdered (OrderedLowLevelCallback MQTTClient -> PublishRequest -> IO ()
_) = Bool
True
isOrdered MessageCallback
_ = Bool
False
waitFor :: TVar Bool -> IO ()
waitFor TVar Bool
latch = forall a. STM a -> IO a
atomically (Bool -> STM ()
check forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> STM a
readTVar TVar Bool
latch)
runOrderedCallbacks :: IO b
runOrderedCallbacks = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. MVar a -> IO a
takeMVar forall a b. (a -> b) -> a -> b
$ MVar (IO ())
_cbM
stopCallbackThread :: MQTTClient -> IO ()
stopCallbackThread :: MQTTClient -> IO ()
stopCallbackThread MQTTClient{TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbHandle} = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall a. Async a -> IO ()
cancel forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_cbHandle
maybeCancelWith :: E.Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith :: forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall e a. Exception e => Async a -> e -> IO ()
`cancelWith` e
e)
killConn :: E.Exception e => MQTTClient -> e -> IO ()
killConn :: forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} e
e = forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e
checkConnected :: MQTTClient -> STM ()
checkConnected :: MQTTClient -> STM ()
checkConnected MQTTClient
mc = forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall a e. Exception e => e -> a
E.throw forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
mc ConnState
Connected
isConnected :: MQTTClient -> IO Bool
isConnected :: MQTTClient -> IO Bool
isConnected = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM Bool
isConnectedSTM
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} = (ConnState
Connected forall a. Eq a => a -> a -> Bool
==) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TVar a -> STM a
readTVar TVar ConnState
_st
sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} MQTTPkt
p = MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> a -> STM ()
writeTChan TChan MQTTPkt
_ch MQTTPkt
p
sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c
textToBL :: Text -> BL.ByteString
textToBL :: Text -> ByteString
textToBL = ByteString -> ByteString
BL.fromStrict forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8
blToText :: BL.ByteString -> Text
blToText :: ByteString -> Text
blToText = ByteString -> Text
TE.decodeUtf8 forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BL.toStrict
topicToBL :: Topic -> BL.ByteString
topicToBL :: Topic -> ByteString
topicToBL = Text -> ByteString
textToBL forall b c a. (b -> c) -> (a -> b) -> a -> c
. Topic -> Text
unTopic
filterToBL :: Filter -> BL.ByteString
filterToBL :: Filter -> ByteString
filterToBL = Text -> ByteString
textToBL forall b c a. (b -> c) -> (a -> b) -> a -> c
. Filter -> Text
unFilter
blToTopic :: BL.ByteString -> Topic
blToTopic :: ByteString -> Topic
blToTopic = forall a. HasCallStack => Maybe a -> a
fromJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Maybe Topic
mkTopic forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
blToText
reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [DispatchType]
dts = do
MQTTClient -> STM ()
checkConnected MQTTClient
c
TChan MQTTPkt
ch <- forall a. STM (TChan a)
newTChan
Word16
pid <- forall a. TVar a -> STM a
readTVar TVar Word16
_pktID
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Word16
_pktID forall a b. (a -> b) -> a -> b
$ if Word16
pid forall a. Eq a => a -> a -> Bool
== forall a. Bounded a => a
maxBound then forall a b. a -> b -> a
const Word16
1 else forall a. Enum a => a -> a
succ
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks (forall k a. Ord k => Map k a -> Map k a -> Map k a
Map.union (forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [((DispatchType
t, Word16
pid), TChan MQTTPkt
ch) | DispatchType
t <- [DispatchType]
dts]))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TChan MQTTPkt
ch,Word16
pid)
releasePktID :: MQTTClient -> (DispatchType,Word16) -> STM ()
releasePktID :: MQTTClient -> (DispatchType, Word16) -> STM ()
releasePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} (DispatchType, Word16)
k = MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks (forall k a. Ord k => k -> Map k a -> Map k a
Map.delete (DispatchType, Word16)
k)
releasePktIDs :: MQTTClient -> [(DispatchType,Word16)] -> STM ()
releasePktIDs :: MQTTClient -> [(DispatchType, Word16)] -> STM ()
releasePktIDs c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} [(DispatchType, Word16)]
ks = MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks forall {a}.
Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany
where deleteMany :: Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany Map (DispatchType, Word16) a
m = forall k a. Ord k => Map k a -> Set k -> Map k a
Map.withoutKeys Map (DispatchType, Word16) a
m (forall a. Ord a => [a] -> Set a
Set.fromList [(DispatchType, Word16)]
ks)
sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} DispatchType
dt Word16 -> MQTTPkt
f = do
(TChan MQTTPkt
ch,Word16
pid) <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
(TChan MQTTPkt
ch,Word16
pid) <- MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType
dt]
MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (Word16 -> MQTTPkt
f Word16
pid)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TChan MQTTPkt
ch,Word16
pid)
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
ConnState
st <- forall a. TVar a -> STM a
readTVar TVar ConnState
_st
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnState
st forall a. Eq a => a -> a -> Bool
/= ConnState
Connected) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail String
"disconnected waiting for response"
MQTTClient -> (DispatchType, Word16) -> STM ()
releasePktID MQTTClient
c (DispatchType
dt,Word16
pid)
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch
subscribe :: MQTTClient -> [(Filter, SubOptions)] -> [Property] -> IO ([Either SubErr QoS], [Property])
subscribe :: MQTTClient
-> [(Filter, SubOptions)]
-> [Property]
-> IO ([Either SubErr QoS], [Property])
subscribe MQTTClient
c [(Filter, SubOptions)]
ls [Property]
props = do
MQTTClient -> IO ()
runCallbackThread MQTTClient
c
MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DSubACK (\Word16
pid -> SubscribeRequest -> MQTTPkt
SubscribePkt forall a b. (a -> b) -> a -> b
$ Word16
-> [(ByteString, SubOptions)] -> [Property] -> SubscribeRequest
SubscribeRequest Word16
pid [(ByteString, SubOptions)]
ls' [Property]
props) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
SubACKPkt (SubscribeResponse Word16
_ [Either SubErr QoS]
rs [Property]
aprops) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Either SubErr QoS]
rs, [Property]
aprops)
MQTTPkt
pkt -> forall a. String -> a
mqttFail forall a b. (a -> b) -> a -> b
$ String
"unexpected response to subscribe: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show MQTTPkt
pkt
where ls' :: [(ByteString, SubOptions)]
ls' = forall a b. (a -> b) -> [a] -> [b]
map (forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first Filter -> ByteString
filterToBL) [(Filter, SubOptions)]
ls
unsubscribe :: MQTTClient -> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe :: MQTTClient
-> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe MQTTClient
c [Filter]
ls [Property]
props = do
(UnsubACKPkt (UnsubscribeResponse Word16
_ [Property]
rsn [UnsubStatus]
rprop)) <- MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DUnsubACK (\Word16
pid -> UnsubscribeRequest -> MQTTPkt
UnsubscribePkt forall a b. (a -> b) -> a -> b
$ Word16 -> [ByteString] -> [Property] -> UnsubscribeRequest
UnsubscribeRequest Word16
pid (forall a b. (a -> b) -> [a] -> [b]
map Filter -> ByteString
filterToBL [Filter]
ls) [Property]
props)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([UnsubStatus]
rprop, [Property]
rsn)
publish :: MQTTClient
-> Topic
-> BL.ByteString
-> Bool
-> IO ()
publish :: MQTTClient -> Topic -> ByteString -> Bool -> IO ()
publish MQTTClient
c Topic
t ByteString
m Bool
r = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
QoS0 forall a. Monoid a => a
mempty
publishq :: MQTTClient
-> Topic
-> BL.ByteString
-> Bool
-> QoS
-> [Property]
-> IO ()
publishq :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
(TChan MQTTPkt
ch,Word16
pid) <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType]
types
forall a b. IO a -> IO b -> IO a
E.finally (TChan MQTTPkt -> Word16 -> IO ()
publishAndWait TChan MQTTPkt
ch Word16
pid) (forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> [(DispatchType, Word16)] -> STM ()
releasePktIDs MQTTClient
c [(DispatchType
t',Word16
pid) | DispatchType
t' <- [DispatchType]
types])
where
types :: [DispatchType]
types = [DispatchType
DPubACK, DispatchType
DPubREC, DispatchType
DPubCOMP]
publishAndWait :: TChan MQTTPkt -> Word16 -> IO ()
publishAndWait TChan MQTTPkt
ch Word16
pid = do
MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (Word16 -> MQTTPkt
pkt Word16
pid)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (QoS
q forall a. Ord a => a -> a -> Bool
> QoS
QoS0) forall a b. (a -> b) -> a -> b
$ TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid
pkt :: Word16 -> MQTTPkt
pkt Word16
pid = PublishRequest -> MQTTPkt
PublishPkt forall a b. (a -> b) -> a -> b
$ PublishRequest {_pubDup :: Bool
_pubDup = Bool
False,
_pubQoS :: QoS
_pubQoS = QoS
q,
_pubPktID :: Word16
_pubPktID = Word16
pid,
_pubRetain :: Bool
_pubRetain = Bool
r,
_pubTopic :: ByteString
_pubTopic = Topic -> ByteString
topicToBL Topic
t,
_pubBody :: ByteString
_pubBody = ByteString
m,
_pubProps :: [Property]
_pubProps = [Property]
props}
satisfyQoS :: TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid
| QoS
q forall a. Eq a => a -> a -> Bool
== QoS
QoS0 = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
| QoS
q forall a. Eq a => a -> a -> Bool
== QoS
QoS1 = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ do
(PubACKPkt (PubACK Word16
_ Word8
st [Property]
pprops)) <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall {a}. (Eq a, Num a) => a -> Bool
isOK Word8
st) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail (String
"qos 1 publish error: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word8
st forall a. Semigroup a => a -> a -> a
<> String
" " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show [Property]
pprops)
| QoS
q forall a. Eq a => a -> a -> Bool
== QoS
QoS2 = IO ()
waitRec
| Bool
otherwise = forall a. HasCallStack => String -> a
error String
"invalid QoS"
where
isOK :: a -> Bool
isOK a
0 = Bool
True
isOK a
16 = Bool
True
isOK a
_ = Bool
False
waitRec :: IO ()
waitRec = forall a. STM a -> IO a
atomically (MQTTClient -> STM ()
checkConnected MQTTClient
c forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
PubRECPkt (PubREC Word16
_ Word8
st [Property]
recprops) -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall {a}. (Eq a, Num a) => a -> Bool
isOK Word8
st) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail (String
"qos 2 REC publish error: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word8
st forall a. Semigroup a => a -> a -> a
<> String
" " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show [Property]
recprops)
MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt forall a b. (a -> b) -> a -> b
$ Word16 -> Word8 -> [Property] -> PubREL
PubREL Word16
pid Word8
0 forall a. Monoid a => a
mempty)
PubCOMPPkt (PubCOMP Word16
_ Word8
st' [Property]
compprops) ->
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word8
st' forall a. Eq a => a -> a -> Bool
/= Word8
0) forall a b. (a -> b) -> a -> b
$ forall a. String -> a
mqttFail (String
"qos 2 COMP publish error: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Word8
st' forall a. Semigroup a => a -> a -> a
<> String
" " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show [Property]
compprops)
MQTTPkt
wtf -> forall a. String -> a
mqttFail (String
"unexpected packet received in QoS2 publish: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show MQTTPkt
wtf)
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} DiscoReason
reason [Property]
props = forall a b. IO a -> IO b -> IO ()
race_ IO ()
getDisconnected IO ()
orDieTrying
where
getDisconnected :: IO ()
getDisconnected = do
MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (DisconnectRequest -> MQTTPkt
DisconnectPkt forall a b. (a -> b) -> a -> b
$ DiscoReason -> [Property] -> DisconnectRequest
DisconnectRequest DiscoReason
reason [Property]
props)
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall a. Async a -> IO a
wait forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Stopped
orDieTrying :: IO ()
orDieTrying = Int -> IO ()
threadDelay Int
10000000 forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect MQTTClient
c = MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect MQTTClient
c DiscoReason
DiscoNormalDisconnection forall a. Monoid a => a
mempty
mkLWT :: Topic -> BL.ByteString -> Bool -> T.LastWill
mkLWT :: Topic -> ByteString -> Bool -> LastWill
mkLWT Topic
t ByteString
m Bool
r = T.LastWill{
_willRetain :: Bool
T._willRetain=Bool
r,
_willQoS :: QoS
T._willQoS=QoS
QoS0,
_willTopic :: ByteString
T._willTopic = Topic -> ByteString
topicToBL Topic
t,
_willMsg :: ByteString
T._willMsg=ByteString
m,
_willProps :: [Property]
T._willProps=forall a. Monoid a => a
mempty
}
svrProps :: MQTTClient -> IO [Property]
svrProps :: MQTTClient -> IO [Property]
svrProps = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ConnACKFlags -> [Property]
p forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM ConnACKFlags
connACKSTM
where p :: ConnACKFlags -> [Property]
p (ConnACKFlags SessionReuse
_ ConnACKRC
_ [Property]
props) = [Property]
props
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM MQTTClient{TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_connACKFlags} = forall a. TVar a -> STM a
readTVar TVar ConnACKFlags
_connACKFlags
connACK :: MQTTClient -> IO ConnACKFlags
connACK :: MQTTClient -> IO ConnACKFlags
connACK = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM ConnACKFlags
connACKSTM
maxAliases :: MQTTClient -> IO Word16
maxAliases :: MQTTClient -> IO Word16
maxAliases = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Word16 -> Word16
f Word16
0) forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> IO [Property]
svrProps
where
f :: Property -> Word16 -> Word16
f (PropTopicAliasMaximum Word16
n) Word16
_ = Word16
n
f Property
_ Word16
o = Word16
o
pubAliased :: MQTTClient
-> Topic
-> BL.ByteString
-> Bool
-> QoS
-> [Property]
-> IO ()
pubAliased :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
pubAliased c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_cbHandle :: TVar (Maybe (Async ()))
_cbM :: MVar (IO ())
_corr :: Map ByteString MessageCallback
_connACKFlags :: TVar ConnACKFlags
_inA :: TVar (Map Word16 Topic)
_outA :: TVar (Map Topic Word16)
_ct :: TVar (Maybe (Async ()))
_st :: TVar ConnState
_inflight :: Map Word16 PublishRequest
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MessageCallback
_pktID :: TVar Word16
_ch :: TChan MQTTPkt
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbM :: MQTTClient -> MVar (IO ())
_corr :: MQTTClient -> Map ByteString MessageCallback
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_outA :: MQTTClient -> TVar (Map Topic Word16)
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_st :: MQTTClient -> TVar ConnState
_inflight :: MQTTClient -> Map Word16 PublishRequest
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_cb :: MQTTClient -> MessageCallback
_pktID :: MQTTClient -> TVar Word16
_ch :: MQTTClient -> TChan MQTTPkt
..} Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
Word16
x <- MQTTClient -> IO Word16
maxAliases MQTTClient
c
(Topic
t', Word16
n) <- Word16 -> IO (Topic, Word16)
alias Word16
x
let np :: [Property]
np = [Property]
props forall a. Semigroup a => a -> a -> a
<> case Word16
n of
Word16
0 -> forall a. Monoid a => a
mempty
Word16
_ -> [Word16 -> Property
PropTopicAlias Word16
n]
MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t' ByteString
m Bool
r QoS
q [Property]
np
where
alias :: Word16 -> IO (Topic, Word16)
alias Word16
mv = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
Map Topic Word16
as <- forall a. TVar a -> STM a
readTVar TVar (Map Topic Word16)
_outA
let n :: Word16
n = forall a. Enum a => Int -> a
toEnum (forall (t :: * -> *) a. Foldable t => t a -> Int
length Map Topic Word16
as forall a. Num a => a -> a -> a
+ Int
1)
cur :: Maybe Word16
cur = forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Topic
t Map Topic Word16
as
v :: Word16
v = forall a. a -> Maybe a -> a
fromMaybe (if Word16
n forall a. Ord a => a -> a -> Bool
> Word16
mv then Word16
0 else Word16
n) Maybe Word16
cur
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word16
v forall a. Ord a => a -> a -> Bool
> Word16
0) forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> a -> STM ()
writeTVar TVar (Map Topic Word16)
_outA (forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Topic
t Word16
v Map Topic Word16
as)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall b a. b -> (a -> b) -> Maybe a -> b
maybe Topic
t (forall a b. a -> b -> a
const Topic
"") Maybe Word16
cur, Word16
v)
registerCorrelated :: MQTTClient -> BL.ByteString -> MessageCallback -> STM ()
registerCorrelated :: MQTTClient -> ByteString -> MessageCallback -> STM ()
registerCorrelated MQTTClient{Map ByteString MessageCallback
_corr :: Map ByteString MessageCallback
_corr :: MQTTClient -> Map ByteString MessageCallback
_corr} ByteString
bs MessageCallback
cb = forall k v. Ord k => k -> v -> Map k v -> STM ()
Decaying.insert ByteString
bs MessageCallback
cb Map ByteString MessageCallback
_corr
unregisterCorrelated :: MQTTClient -> BL.ByteString -> STM ()
unregisterCorrelated :: MQTTClient -> ByteString -> STM ()
unregisterCorrelated MQTTClient{Map ByteString MessageCallback
_corr :: Map ByteString MessageCallback
_corr :: MQTTClient -> Map ByteString MessageCallback
_corr} ByteString
bs = forall k v. Ord k => k -> Map k v -> STM ()
Decaying.delete ByteString
bs Map ByteString MessageCallback
_corr