{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE TemplateHaskell #-}
module Calamity.Gateway.Shard (
Shard (..),
newShard,
) where
import Calamity.Gateway.DispatchEvents (
CalamityEvent (Dispatch),
DispatchData (Ready),
)
import Calamity.Gateway.Intents (Intents)
import Calamity.Gateway.Types (
ControlMessage (..),
IdentifyData (
IdentifyData,
compress,
intents,
largeThreshold,
presence,
properties,
shard,
token
),
IdentifyProps (IdentifyProps, browser, device),
ReceivedDiscordMessage (
EvtDispatch,
HeartBeatAck,
HeartBeatReq,
Hello,
InvalidSession,
Reconnect
),
ResumeData (ResumeData, seq, sessionID, token),
SentDiscordMessage (HeartBeat, Identify, Resume, StatusUpdate),
Shard (..),
ShardC,
ShardFlowControl (..),
ShardMsg (..),
ShardState (ShardState, wsConn),
StatusUpdateData,
)
import Calamity.Internal.RunIntoIO (bindSemToIO)
import Calamity.Internal.Utils (
debug,
error,
info,
leftToMaybe,
swap,
unlessM,
untilJustFinalIO,
whenJust,
whileMFinalIO,
)
import Calamity.Metrics.Eff (
MetricEff,
modifyGauge,
registerGauge,
)
import Calamity.Types.LogEff (LogEff)
import Calamity.Types.Token (Token, rawToken)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, cancel)
import qualified Control.Concurrent.Chan.Unagi as UC
import Control.Concurrent.STM (STM, atomically, retry)
import Control.Concurrent.STM.TBMQueue (
TBMQueue,
closeTBMQueue,
newTBMQueueIO,
readTBMQueue,
tryWriteTBMQueue,
writeTBMQueue,
)
import Control.Exception (
Exception (fromException),
SomeException,
)
import qualified Control.Exception.Safe as Ex
import Optics
import Optics.State.Operators
import Control.Monad (void, when)
import Control.Monad.State.Lazy (runState)
import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as LBS
import Data.Default.Class (def)
import Data.IORef (newIORef)
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import DiPolysemy (attr, push)
import qualified Network.Connection as NC
import qualified Network.TLS as NT
import qualified Network.TLS.Extra as NT
import Network.WebSockets (
Connection,
ConnectionException (..),
receiveData,
sendCloseCode,
sendTextData,
)
import qualified Network.WebSockets as NW
import qualified Network.WebSockets.Stream as NW
import Polysemy (Sem)
import qualified Polysemy as P
import qualified Polysemy.Async as P
import qualified Polysemy.AtomicState as P
import qualified Polysemy.Error as P
import qualified Polysemy.Resource as P
import PyF (fmt)
import qualified System.X509 as X509
import TextShow (showt)
import Prelude hiding (error)
-- | Connect to @host@ on port 443 over TLS, perform the websocket client
-- handshake for @path@, and run @ma@ with the resulting 'Connection'.
--
-- The underlying network connection is closed when the action finishes,
-- whether normally or by exception. Any exception caught by 'Ex.handleAny'
-- is logged at debug level and reported to the caller as 'Nothing'.
runWebsocket ::
  P.Members '[LogEff, P.Final IO, P.Embed IO] r =>
  T.Text ->
  T.Text ->
  (Connection -> P.Sem r a) ->
  P.Sem r (Maybe a)
runWebsocket host path ma = do
  -- Lower the Sem actions into plain IO callbacks, since both the websocket
  -- client and the exception handler run in IO.
  runClient <- bindSemToIO ma
  reportFailure <- bindSemToIO $ \exc ->
    debug ("runWebsocket raised with " <> T.pack (show exc))

  let hostName = T.unpack host
      onFailure exc = Nothing <$ reportFailure exc

  P.embed . Ex.handleAny onFailure $ do
    ctx <- NC.initConnectionContext
    certStore <- X509.getSystemCertificateStore

    let tlsParams =
          (NT.defaultParamsClient hostName "443")
            { NT.clientSupported = def {NT.supportedCiphers = NT.ciphersuite_default}
            , NT.clientShared = def {NT.sharedCAStore = certStore}
            }
        target =
          NC.ConnectionParams hostName 443 (Just (NC.TLSSettings tlsParams)) Nothing

        -- Adapt the TLS connection to a websockets 'NW.Stream' and run the
        -- client over it. Writing 'Nothing' to the stream is a no-op.
        overTls conn = do
          stream <-
            NW.makeStream
              (Just <$> NC.connectionGetChunk conn)
              (mapM_ (NC.connectionPut conn . LBS.toStrict))
          NW.runClientWithStream
            stream
            hostName
            (T.unpack path)
            NW.defaultConnectionOptions
            []
            runClient

    Ex.bracket (NC.connectTo ctx target) NC.connectionClose overTls
-- | Build the initial 'ShardState' for a shard: every optional slot is empty
-- and the boolean flag is 'False'.
--
-- NOTE(review): the field names in the comments below are inferred from the
-- constructor's argument types and the labels used elsewhere in this module;
-- confirm the order against the 'ShardState' declaration.
newShardState :: Shard -> ShardState
newShardState shard =
  ShardState
    shard
    Nothing -- Maybe Int (presumably the last sequence number)
    Nothing -- Maybe (Async (Maybe ())) (presumably the heartbeat thread)
    False -- Bool (presumably the heartbeat-ack flag)
    Nothing -- Maybe Text
    Nothing -- Maybe Text
    Nothing -- Maybe Connection (the websocket connection)
-- | Create a shard and start running it asynchronously.
--
-- Returns the input side of the shard's control channel together with the
-- 'Async' handle of the thread running 'shardLoop'. The shard runs with its
-- own 'ShardState' held in an 'Data.IORef.IORef', and its log output is
-- tagged with the @calamity-shard@ segment and a @shard-id@ attribute.
newShard ::
  P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO, P.Async] r =>
  T.Text ->
  Int ->
  Int ->
  Token ->
  Maybe StatusUpdateData ->
  Intents ->
  UC.InChan CalamityEvent ->
  Sem r (UC.InChan ControlMessage, Async (Maybe ()))
newShard gateway id count token presence intents evtIn = do
  (cmdIn, cmdOut) <- P.embed UC.newChan

  stateVar <-
    P.embed . newIORef . newShardState $
      Shard id count gateway evtIn cmdOut (rawToken token) presence intents

  worker <-
    P.async
      . push "calamity-shard"
      . attr "shard-id" id
      $ P.runAtomicStateIORef stateVar shardLoop

  pure (cmdIn, worker)
-- | JSON-encode a message and send it as a text frame over the shard's
-- current websocket connection.
--
-- If the shard state holds no connection, nothing is sent and a debug
-- message is logged instead.
--
-- NOTE(review): the debug line logs the complete message and its encoding;
-- for 'Identify' / 'Resume' payloads that presumably includes the bot token
-- unless the 'Show' / 'A.ToJSON' instances redact it — confirm.
sendToWs :: ShardC r => SentDiscordMessage -> Sem r ()
sendToWs data' =
  P.atomicGets wsConn >>= maybe (debug "tried to send to closed WS") transmit
 where
  transmit conn = do
    let payload = A.encode data'
    debug . T.pack . mconcat $
      ["sending ", show data', " encoded to ", show payload, " to gateway"]
    P.embed (sendTextData conn payload)
-- | Write a value to a 'TBMQueue', blocking (via 'retry') while the queue is
-- full.
--
-- Returns 'True' once the value has been written, or 'False' if the queue
-- has been closed.
tryWriteTBMQueue' :: TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' q v = tryWriteTBMQueue q v >>= settle
 where
  settle Nothing = pure False -- queue closed
  settle (Just True) = pure True -- written
  settle (Just False) = retry -- queue full: wait for space
-- | Unwrap a 'Maybe': return the contained value, or, on 'Nothing', log
-- @msg@ at error level and throw 'ShardFlowRestart'.
restartUnless :: P.Members '[LogEff, P.Error ShardFlowControl] r => T.Text -> Maybe a -> P.Sem r a
restartUnless msg = maybe (error msg *> P.throw ShardFlowRestart) pure
-- | The main loop of a shard.
--
-- Increments the @active_shards@ gauge, runs 'outerloop' until the shard is
-- told to shut down, then decrements the gauge again.
shardLoop :: ShardC r => Sem r ()
shardLoop = do
  activeShards <- registerGauge "active_shards" mempty
  void $ modifyGauge (+ 1) activeShards
  void outerloop
  void $ modifyGauge (subtract 1) activeShards
  debug "Shard shut down"
 where
  -- Forward control messages from the shard's command channel into the
  -- shard message queue, stopping once the queue has been closed.
  controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
  controlStream shard outqueue = inner
   where
    q = shard ^. #cmdOut
    inner = do
      v <- UC.readChan q
      r <- atomically $ tryWriteTBMQueue' outqueue (Control v)
      when r inner

  -- Translate an exception raised while reading from the websocket into a
  -- control message plus a human-readable reason.
  --
  -- A close request carrying one of the listed codes shuts the shard down;
  -- per Discord's gateway close event codes these are 4004 (authentication
  -- failed), 4010 (invalid shard), 4011 (sharding required), 4012 (invalid
  -- API version), 4013 (invalid intents) and 4014 (disallowed intents).
  -- Everything else restarts the shard.
  handleWSException :: SomeException -> IO (Either (ControlMessage, Maybe T.Text) a)
  handleWSException e = pure $ case fromException e of
    Just (CloseRequest code _)
      | code `elem` [4004, 4010, 4011, 4012, 4013, 4014] ->
          Left (ShutDownShard, Just . showt $ code)
    -- Any other websocket-level exception: report it as-is.
    Just connExc -> Left (RestartShard, Just . T.pack . show $ connExc)
    -- Not a 'ConnectionException' at all: report the original exception
    -- rather than the uninformative result of the failed downcast.
    Nothing -> Left (RestartShard, Just . T.pack . show $ e)

  -- Read messages from the websocket, decode them and push them onto the
  -- shard message queue. Stops when reading fails (after queueing the
  -- resulting control message) or when the queue has been closed.
  discordStream :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO] r => Connection -> TBMQueue ShardMsg -> Sem r ()
  discordStream ws outqueue = inner
   where
    inner = do
      msg <- P.embed $ Ex.catchAny (Right <$> receiveData ws) handleWSException

      case msg of
        Left (c, reason) -> do
          whenJust reason (\r -> error . T.pack $ "Shard closed with reason: " <> show r)
          P.embed . atomically $ writeTBMQueue outqueue (Control c)
        Right msg' -> do
          let decoded = A.eitherDecode msg'
          r <- case decoded of
            Right a ->
              P.embed . atomically $ tryWriteTBMQueue' outqueue (Discord a)
            Left e -> do
              -- An undecodable message is logged and skipped; keep reading.
              error . T.pack $ "Failed to decode " <> e <> ": " <> show msg'
              pure True
          when r inner

  -- Connect to the gateway and run 'innerloop', reconnecting for as long as
  -- the inner loop asks for a restart (or fails abnormally).
  outerloop :: ShardC r => Sem r ()
  outerloop = whileMFinalIO $ do
    shard :: Shard <- P.atomicGets (^. #shardS)
    let host = shard ^. #gateway
    -- 'runWebsocket' wants a bare host name, so drop any @wss://@ prefix.
    let host' = fromMaybe host $ T.stripPrefix "wss://" host
    info . T.pack $ "starting up shard " <> show (shardID shard) <> " of " <> show (shardCount shard)

    innerLoopVal <- runWebsocket host' "/?v=9&encoding=json" innerloop

    case innerLoopVal of
      Just ShardFlowShutDown -> do
        info "Shutting down shard"
        pure False
      Just ShardFlowRestart -> do
        info "Restarting shard"
        pure True
      -- 'runWebsocket' produced no result, e.g. because it caught an
      -- exception (such as a failed connection attempt).
      Nothing -> do
        info "Restarting shard (abnormal reasons?)"
        pure True

  -- Run one websocket session: resume or identify, then handle incoming
  -- Discord and control messages until a 'ShardFlowControl' is thrown.
  innerloop :: ShardC r => Connection -> Sem r ShardFlowControl
  innerloop ws = do
    debug "Entering inner loop of shard"

    shard <- P.atomicGets (^. #shardS)
    P.atomicModify' (#wsConn ?~ ws)

    seqNum' <- P.atomicGets (^. #seqNum)
    sessionID' <- P.atomicGets (^. #sessionID)

    -- Resume only when both a sequence number and a session ID are known;
    -- otherwise start a fresh session.
    case (seqNum', sessionID') of
      (Just n, Just s) -> do
        debug $ "Resuming shard (sessionID: " <> s <> ", seq: " <> T.pack (show n)
        sendToWs
          ( Resume
              ResumeData
                { token = shard ^. #token
                , sessionID = s
                , seq = n
                }
          )
      _noActiveSession -> do
        debug "Identifying shard"
        sendToWs
          ( Identify
              IdentifyData
                { token = shard ^. #token
                , properties =
                    IdentifyProps
                      { browser = "Calamity: https://github.com/simmsb/calamity"
                      , device = "Calamity: https://github.com/simmsb/calamity"
                      }
                , compress = False
                , largeThreshold = Nothing
                , shard =
                    Just (shard ^. #shardID, shard ^. #shardCount)
                , presence = shard ^. #initialStatus
                , intents = shard ^. #intents
                }
          )

    result <-
      P.resourceToIOFinal $
        P.bracket
          (P.embed $ newTBMQueueIO 1)
          -- Closing the queue on exit is what lets the two producer threads
          -- below notice that the session is over.
          (P.embed . atomically . closeTBMQueue)
          ( \q -> do
              debug "handling events now"
              _controlThread <- P.async . P.embed $ controlStream shard q
              _discordThread <- P.async $ discordStream ws q
              -- Keep handling messages until one of the handlers throws a
              -- 'ShardFlowControl', which becomes the result of the session.
              P.raise . untilJustFinalIO . (leftToMaybe <$>) . P.runError $ do
                msg <- P.embed . atomically $ readTBMQueue q
                handleMsg =<< restartUnless "shard message stream closed by someone other than the sink" msg
          )

    debug "Exiting inner loop of shard"

    P.atomicModify' (#wsConn .~ Nothing)
    haltHeartBeat
    pure result

  -- Handle a single message from either Discord or the control channel.
  -- Flow-control decisions are signalled by throwing 'ShardFlowControl'.
  handleMsg :: (ShardC r, P.Member (P.Error ShardFlowControl) r) => ShardMsg -> Sem r ()
  handleMsg (Discord msg) = case msg of
    EvtDispatch sn data' -> do
      P.atomicModify' (#seqNum ?~ sn)

      -- Remember the session ID from READY so the session can be resumed.
      case data' of
        Ready rdata' ->
          P.atomicModify' (#sessionID ?~ (rdata' ^. #sessionID))
        _NotReady -> pure ()

      shard <- P.atomicGets (^. #shardS)
      P.embed $ UC.writeChan (shard ^. #evtIn) (Dispatch (shard ^. #shardID) data')
    HeartBeatReq -> do
      debug "Received heartbeat request"
      sendHeartBeat
    Reconnect -> do
      debug "Being asked to restart by Discord"
      P.throw ShardFlowRestart
    InvalidSession resumable -> do
      if resumable
        then info "Received resumable invalid session"
        else do
          info "Received non-resumable invalid session, sleeping for 15 seconds then retrying"
          -- Forget the session so the next connection identifies afresh.
          P.atomicModify' (#sessionID .~ Nothing)
          P.atomicModify' (#seqNum .~ Nothing)
          P.embed $ threadDelay (15 * 1000 * 1000)
      P.throw ShardFlowRestart
    Hello interval -> do
      info . T.pack $ "Received hello, beginning to heartbeat at an interval of " <> show interval <> "ms"
      startHeartBeatLoop interval
    HeartBeatAck -> do
      debug "Received heartbeat ack"
      P.atomicModify' (#hbResponse .~ True)
  handleMsg (Control msg) = case msg of
    SendPresence data' -> do
      debug . T.pack $ "Sending presence: (" <> show data' <> ")"
      sendToWs $ StatusUpdate data'
    RestartShard -> P.throw ShardFlowRestart
    ShutDownShard -> P.throw ShardFlowShutDown
-- | (Re)start the heartbeat thread with the given interval.
--
-- Any heartbeat thread already recorded in the shard state is stopped first;
-- the new thread's handle is then stored in the state's @hbThread@ slot.
startHeartBeatLoop :: ShardC r => Int -> Sem r ()
startHeartBeatLoop interval = do
  haltHeartBeat
  P.async (heartBeatLoop interval) >>= P.atomicModify' . (#hbThread ?~)
-- | Stop the heartbeat thread recorded in the shard state, if there is one.
--
-- The @hbThread@ field is read and reset to 'Nothing' in a single
-- 'P.atomicState' step; the handle that was taken out (if any) is then
-- cancelled. When no thread is registered this is a no-op.
haltHeartBeat :: ShardC r => Sem r ()
haltHeartBeat = do
  -- 'runState' yields @(result, state)@ while 'P.atomicState' expects
  -- @(state, result)@, hence the 'swap'.
  thread <- P.atomicState @ShardState . (swap .) . runState $ do
    current <- use #hbThread
    #hbThread .= Nothing
    pure current
  case thread of
    Just t -> do
      debug "Stopping heartbeat thread"
      P.embed (void $ cancel t)
    Nothing -> pure ()
-- | Send a single heartbeat carrying the current @seqNum@ from the shard state.
--
-- The @hbResponse@ flag is reset to 'False' as part of sending; it is set back
-- to 'True' when a heartbeat ack is handled, and 'heartBeatLoop' inspects it
-- after sleeping to decide whether the heartbeat was acknowledged.
sendHeartBeat :: ShardC r => Sem r ()
sendHeartBeat = do
  sn <- P.atomicGets (^. #seqNum)
  debug . T.pack $ "Sending heartbeat (seq: " <> show sn <> ")"
  -- Clear the ack flag *before* sending: if it were cleared afterwards, an ack
  -- handled between the send and the reset would be overwritten and the
  -- heartbeat would later be treated as unanswered.
  P.atomicModify' (#hbResponse .~ False)
  sendToWs $ HeartBeat sn
-- | Body of the heartbeat thread: send a heartbeat, sleep, check for an ack,
-- repeat.
--
-- @interval@ is in milliseconds (it is multiplied by 1000 before being handed
-- to 'threadDelay', which takes microseconds).
--
-- Each iteration runs under 'P.runError' with a @()@ error; the loop keeps
-- repeating while an iteration finishes normally and stops as soon as one
-- throws. An iteration throws when, after the sleep, @hbResponse@ is still
-- 'False': in that case the websocket (if one is present in the state) is sent
-- a close frame with code 4000 and the loop exits. If no connection is present
-- the loop exits without sending anything.
heartBeatLoop :: ShardC r => Int -> Sem r ()
heartBeatLoop interval = untilJustFinalIO . (leftToMaybe <$>) . P.runError $ do
  sendHeartBeat
  -- Milliseconds -> microseconds for 'threadDelay'.
  P.embed . threadDelay $ interval * 1000
  unlessM (P.atomicGets (^. #hbResponse)) $ do
    debug "No heartbeat response, restarting shard"
    -- 'P.note' turns a missing connection into the @()@ error, ending the loop.
    wsConn <- P.note () =<< P.atomicGets (^. #wsConn)
    P.embed $ sendCloseCode wsConn 4000 ("No heartbeat in time" :: T.Text)
    P.throw ()