{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE TemplateHaskell #-}

-- | The shard logic
module Calamity.Gateway.Shard (
  Shard (..),
  newShard,
) where

import Calamity.Gateway.DispatchEvents (
  CalamityEvent (Dispatch),
  DispatchData (Ready),
 )
import Calamity.Gateway.Intents (Intents)
import Calamity.Gateway.Types (
  ControlMessage (..),
  IdentifyData (
    IdentifyData,
    compress,
    intents,
    largeThreshold,
    presence,
    properties,
    shard,
    token
  ),
  IdentifyProps (IdentifyProps, browser, device),
  ReceivedDiscordMessage (
    EvtDispatch,
    HeartBeatAck,
    HeartBeatReq,
    Hello,
    InvalidSession,
    Reconnect
  ),
  ResumeData (ResumeData, seq, sessionID, token),
  SentDiscordMessage (HeartBeat, Identify, Resume, StatusUpdate),
  Shard (..),
  ShardC,
  ShardFlowControl (..),
  ShardMsg (..),
  ShardState (ShardState, wsConn),
  StatusUpdateData,
 )
import Calamity.Internal.RunIntoIO (bindSemToIO)
import Calamity.Internal.Utils (
  debug,
  error,
  info,
  leftToMaybe,
  swap,
  unlessM,
  untilJustFinalIO,
  whenJust,
  whileMFinalIO,
 )
import Calamity.Metrics.Eff (
  MetricEff,
  modifyGauge,
  registerGauge,
 )
import Calamity.Types.LogEff (LogEff)
import Calamity.Types.Token (Token, rawToken)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, cancel)
import qualified Control.Concurrent.Chan.Unagi as UC
import Control.Concurrent.STM (STM, atomically, retry)
import Control.Concurrent.STM.TBMQueue (
  TBMQueue,
  closeTBMQueue,
  newTBMQueueIO,
  readTBMQueue,
  tryWriteTBMQueue,
  writeTBMQueue,
 )
import Control.Exception (
  Exception (fromException),
  SomeException,
 )
import qualified Control.Exception.Safe as Ex
import Optics
import Optics.State.Operators
import Control.Monad (void, when)
import Control.Monad.State.Lazy (runState)
import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as LBS
import Data.Default.Class (def)
import Data.IORef (newIORef)
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import DiPolysemy (attr, push)
import qualified Network.Connection as NC
import qualified Network.TLS as NT
import qualified Network.TLS.Extra as NT
import Network.WebSockets (
  Connection,
  ConnectionException (..),
  receiveData,
  sendCloseCode,
  sendTextData,
 )
import qualified Network.WebSockets as NW
import qualified Network.WebSockets.Stream as NW
import Polysemy (Sem)
import qualified Polysemy as P
import qualified Polysemy.Async as P
import qualified Polysemy.AtomicState as P
import qualified Polysemy.Error as P
import qualified Polysemy.Resource as P
import PyF (fmt)
import qualified System.X509 as X509
import TextShow (showt)
import Prelude hiding (error)

runWebsocket ::
  P.Members '[LogEff, P.Final IO, P.Embed IO] r =>
  T.Text ->
  T.Text ->
  (Connection -> P.Sem r a) ->
  P.Sem r (Maybe a)
runWebsocket :: forall (r :: EffectRow) a.
Members '[LogEff, Final IO, Embed IO] r =>
Text -> Text -> (Connection -> Sem r a) -> Sem r (Maybe a)
runWebsocket Text
host Text
path Connection -> Sem r a
ma = do
  Connection -> IO (Maybe a)
inner <- forall (r :: EffectRow) p a.
Member (Final IO) r =>
(p -> Sem r a) -> Sem r (p -> IO (Maybe a))
bindSemToIO Connection -> Sem r a
ma

  -- We have to do this all ourself I think?
  -- TODO: see if this isn't needed
  let logExc :: p -> Sem r ()
logExc p
e = forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug forall a b. (a -> b) -> a -> b
$ Text
"runWebsocket raised with " forall a. Semigroup a => a -> a -> a
<> (String -> Text
T.pack forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Show a => a -> String
show forall a b. (a -> b) -> a -> b
$ p
e)
  SomeException -> IO (Maybe ())
logExc' <- forall (r :: EffectRow) p a.
Member (Final IO) r =>
(p -> Sem r a) -> Sem r (p -> IO (Maybe a))
bindSemToIO forall {r :: EffectRow} {p}.
(Member LogEff r, Show p) =>
p -> Sem r ()
logExc
  let handler :: SomeException -> IO (Maybe a)
handler SomeException
e = do
        forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ SomeException -> IO (Maybe ())
logExc' SomeException
e
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing

  forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
MonadCatch m =>
(SomeException -> m a) -> m a -> m a
Ex.handleAny SomeException -> IO (Maybe a)
handler forall a b. (a -> b) -> a -> b
$ do
    ConnectionContext
ctx <- IO ConnectionContext
NC.initConnectionContext
    CertificateStore
certStore <- IO CertificateStore
X509.getSystemCertificateStore
    let clientParams :: ClientParams
clientParams =
          (String -> ByteString -> ClientParams
NT.defaultParamsClient (Text -> String
T.unpack Text
host) ByteString
"443")
            { clientSupported :: Supported
NT.clientSupported = forall a. Default a => a
def {supportedCiphers :: [Cipher]
NT.supportedCiphers = [Cipher]
NT.ciphersuite_default}
            , clientShared :: Shared
NT.clientShared =
                forall a. Default a => a
def
                  { sharedCAStore :: CertificateStore
NT.sharedCAStore = CertificateStore
certStore
                  }
            }
    let tlsSettings :: TLSSettings
tlsSettings = ClientParams -> TLSSettings
NC.TLSSettings ClientParams
clientParams
        connParams :: ConnectionParams
connParams = String
-> PortNumber
-> Maybe TLSSettings
-> Maybe ProxySettings
-> ConnectionParams
NC.ConnectionParams (Text -> String
T.unpack Text
host) PortNumber
443 (forall a. a -> Maybe a
Just TLSSettings
tlsSettings) forall a. Maybe a
Nothing

    forall (m :: * -> *) a b c.
MonadMask m =>
m a -> (a -> m b) -> (a -> m c) -> m c
Ex.bracket
      (ConnectionContext -> ConnectionParams -> IO Connection
NC.connectTo ConnectionContext
ctx ConnectionParams
connParams)
      Connection -> IO ()
NC.connectionClose
      ( \Connection
conn -> do
          Stream
stream <-
            IO (Maybe ByteString) -> (Maybe ByteString -> IO ()) -> IO Stream
NW.makeStream
              (forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
NC.connectionGetChunk Connection
conn)
              (forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (Connection -> ByteString -> IO ()
NC.connectionPut Connection
conn forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
LBS.toStrict))
          forall a.
Stream
-> String
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
NW.runClientWithStream Stream
stream (Text -> String
T.unpack Text
host) (Text -> String
T.unpack Text
path) ConnectionOptions
NW.defaultConnectionOptions [] Connection -> IO (Maybe a)
inner
      )

newShardState :: Shard -> ShardState
newShardState :: Shard -> ShardState
newShardState Shard
shard = Shard
-> Maybe Int
-> Maybe (Async (Maybe ()))
-> Bool
-> Maybe Text
-> Maybe Text
-> Maybe Connection
-> ShardState
ShardState Shard
shard forall a. Maybe a
Nothing forall a. Maybe a
Nothing Bool
False forall a. Maybe a
Nothing forall a. Maybe a
Nothing forall a. Maybe a
Nothing

-- | Creates and launches a shard
newShard ::
  P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO, P.Async] r =>
  T.Text ->
  Int ->
  Int ->
  Token ->
  Maybe StatusUpdateData ->
  Intents ->
  UC.InChan CalamityEvent ->
  Sem r (UC.InChan ControlMessage, Async (Maybe ()))
newShard :: forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO, Async] r =>
Text
-> Int
-> Int
-> Token
-> Maybe StatusUpdateData
-> Intents
-> InChan CalamityEvent
-> Sem r (InChan ControlMessage, Async (Maybe ()))
newShard Text
gateway Int
id Int
count Token
token Maybe StatusUpdateData
presence Intents
intents InChan CalamityEvent
evtIn = do
  (InChan ControlMessage
cmdIn, OutChan ControlMessage
cmdOut) <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall a. IO (InChan a, OutChan a)
UC.newChan
  let shard :: Shard
shard = Int
-> Int
-> Text
-> InChan CalamityEvent
-> OutChan ControlMessage
-> Text
-> Maybe StatusUpdateData
-> Intents
-> Shard
Shard Int
id Int
count Text
gateway InChan CalamityEvent
evtIn OutChan ControlMessage
cmdOut (Token -> Text
rawToken Token
token) Maybe StatusUpdateData
presence Intents
intents
  IORef ShardState
stateVar <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> IO (IORef a)
newIORef forall a b. (a -> b) -> a -> b
$ Shard -> ShardState
newShardState Shard
shard

  let runShard :: Sem r ()
runShard = forall s (r :: EffectRow) a.
Member (Embed IO) r =>
IORef s -> Sem (AtomicState s : r) a -> Sem r a
P.runAtomicStateIORef IORef ShardState
stateVar forall (r :: EffectRow). ShardC r => Sem r ()
shardLoop
  let action :: Sem r ()
action = forall level msg (r :: EffectRow) a.
Member (Di level Path msg) r =>
Segment -> Sem r a -> Sem r a
push Segment
"calamity-shard" forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall value level msg (r :: EffectRow) a.
(ToValue value, Member (Di level Path msg) r) =>
Key -> value -> Sem r a -> Sem r a
attr Key
"shard-id" Int
id forall a b. (a -> b) -> a -> b
$ Sem r ()
runShard

  Async (Maybe ())
thread' <- forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async Sem r ()
action

  forall (f :: * -> *) a. Applicative f => a -> f a
pure (InChan ControlMessage
cmdIn, Async (Maybe ())
thread')

sendToWs :: ShardC r => SentDiscordMessage -> Sem r ()
sendToWs :: forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs SentDiscordMessage
data' = do
  Maybe Connection
wsConn' <- forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets ShardState -> Maybe Connection
wsConn
  case Maybe Connection
wsConn' of
    Just Connection
wsConn -> do
      let encodedData :: ByteString
encodedData = forall a. ToJSON a => a -> ByteString
A.encode SentDiscordMessage
data'
      forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
"sending " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show SentDiscordMessage
data' forall a. Semigroup a => a -> a -> a
<> String
" encoded to " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show ByteString
encodedData forall a. Semigroup a => a -> a -> a
<> String
" to gateway"
      forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. WebSocketsData a => Connection -> a -> IO ()
sendTextData Connection
wsConn forall a b. (a -> b) -> a -> b
$ ByteString
encodedData
    Maybe Connection
Nothing -> forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"tried to send to closed WS"

tryWriteTBMQueue' :: TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' :: forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue a
q a
v = do
  Maybe Bool
v' <- forall a. TBMQueue a -> a -> STM (Maybe Bool)
tryWriteTBMQueue TBMQueue a
q a
v
  case Maybe Bool
v' of
    Just Bool
False -> forall a. STM a
retry
    Just Bool
True -> forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
    Maybe Bool
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False

restartUnless :: P.Members '[LogEff, P.Error ShardFlowControl] r => T.Text -> Maybe a -> P.Sem r a
restartUnless :: forall (r :: EffectRow) a.
Members '[LogEff, Error ShardFlowControl] r =>
Text -> Maybe a -> Sem r a
restartUnless Text
_ (Just a
a) = forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
restartUnless Text
msg Maybe a
Nothing = do
  forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error Text
msg
  forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart

-- | The loop a shard will run on
shardLoop :: ShardC r => Sem r ()
shardLoop :: forall (r :: EffectRow). ShardC r => Sem r ()
shardLoop = do
  Gauge
activeShards <- forall (r :: EffectRow).
Member MetricEff r =>
Text -> [(Text, Text)] -> Sem r Gauge
registerGauge Text
"active_shards" forall a. Monoid a => a
mempty
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (forall a. Num a => a -> a -> a
+ Double
1) Gauge
activeShards
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall (r :: EffectRow). ShardC r => Sem r ()
outerloop
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (forall a. Num a => a -> a -> a
subtract Double
1) Gauge
activeShards
  forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Shard shut down"
  where
    controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
    controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
outqueue = IO ()
inner
      where
        q :: OutChan ControlMessage
q = Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "cmdOut" a => a
#cmdOut
        inner :: IO ()
inner = do
          ControlMessage
v <- forall a. OutChan a -> IO a
UC.readChan OutChan ControlMessage
q
          Bool
r <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
v)
          forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r IO ()
inner

    handleWSException :: SomeException -> IO (Either (ControlMessage, Maybe T.Text) a)
    handleWSException :: forall a.
SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException SomeException
e = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ case forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
      Just (CloseRequest Word16
code ByteString
_)
        | Word16
code forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Word16
4004, Word16
4010, Word16
4011, Word16
4012, Word16
4013, Word16
4014] ->
          forall a b. a -> Either a b
Left (ControlMessage
ShutDownShard, forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TextShow a => a -> Text
showt forall a b. (a -> b) -> a -> b
$ Word16
code)
      Maybe ConnectionException
e -> forall a b. a -> Either a b
Left (ControlMessage
RestartShard, forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Show a => a -> String
show forall a b. (a -> b) -> a -> b
$ Maybe ConnectionException
e)

    discordStream :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO] r => Connection -> TBMQueue ShardMsg -> Sem r ()
    discordStream :: forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
outqueue = Sem r ()
inner
      where
        inner :: Sem r ()
inner = do
          Either (ControlMessage, Maybe Text) ByteString
msg <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
Ex.catchAny (forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. WebSocketsData a => Connection -> IO a
receiveData Connection
ws) forall a.
SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException

          case Either (ControlMessage, Maybe Text) ByteString
msg of
            Left (ControlMessage
c, Maybe Text
reason) -> do
              forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe Text
reason (\Text
r -> forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
"Shard closed with reason: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Text
r)
              forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
c)
            Right ByteString
msg' -> do
              -- debug [fmt|Got msg: {msg'}|]
              let decoded :: Either String ReceivedDiscordMessage
decoded = forall a. FromJSON a => ByteString -> Either String a
A.eitherDecode ByteString
msg'
              Bool
r <- case Either String ReceivedDiscordMessage
decoded of
                Right ReceivedDiscordMessage
a ->
                  forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ReceivedDiscordMessage -> ShardMsg
Discord ReceivedDiscordMessage
a)
                Left String
e -> do
                  forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
"Failed to decode " forall a. Semigroup a => a -> a -> a
<> String
e forall a. Semigroup a => a -> a -> a
<> String
": "forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show ByteString
msg'
                  forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
              forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r Sem r ()
inner
    outerloop :: ShardC r => Sem r ()
    outerloop :: forall (r :: EffectRow). ShardC r => Sem r ()
outerloop = forall (r :: EffectRow).
Member (Final IO) r =>
Sem r Bool -> Sem r ()
whileMFinalIO forall a b. (a -> b) -> a -> b
$ do
      Shard
shard :: Shard <- forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "shardS" a => a
#shardS)
      let host :: Text
host = Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "gateway" a => a
#gateway
      let host' :: Text
host' = forall a. a -> Maybe a -> a
fromMaybe Text
host forall a b. (a -> b) -> a -> b
$ Text -> Text -> Maybe Text
T.stripPrefix Text
"wss://" Text
host
      forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
"starting up shard " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show (Shard -> Int
shardID Shard
shard) forall a. Semigroup a => a -> a -> a
<> String
" of " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show (Shard -> Int
shardCount Shard
shard)

      Maybe ShardFlowControl
innerLoopVal <- forall (r :: EffectRow) a.
Members '[LogEff, Final IO, Embed IO] r =>
Text -> Text -> (Connection -> Sem r a) -> Sem r (Maybe a)
runWebsocket Text
host' Text
"/?v=9&encoding=json" forall (r :: EffectRow).
ShardC r =>
Connection -> Sem r ShardFlowControl
innerloop

      case Maybe ShardFlowControl
innerLoopVal of
        Just ShardFlowControl
ShardFlowShutDown -> do
          forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Shutting down shard"
          forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
        Just ShardFlowControl
ShardFlowRestart -> do
          forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restaring shard"
          forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
        -- we restart normally when we loop

        Maybe ShardFlowControl
Nothing -> do
          -- won't happen unless innerloop starts using a non-deterministic effect or connecting to the ws dies
          forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restarting shard (abnormal reasons?)"
          forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

    innerloop :: ShardC r => Connection -> Sem r ShardFlowControl
    innerloop :: forall (r :: EffectRow).
ShardC r =>
Connection -> Sem r ShardFlowControl
innerloop Connection
ws = do
      forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Entering inner loop of shard"

      Shard
shard <- forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "shardS" a => a
#shardS)
      forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "wsConn" a => a
#wsConn forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ Connection
ws)

      Maybe Int
seqNum' <- forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "seqNum" a => a
#seqNum)
      Maybe Text
sessionID' <- forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "sessionID" a => a
#sessionID)

      case (Maybe Int
seqNum', Maybe Text
sessionID') of
        (Just Int
n, Just Text
s) -> do
          forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug forall a b. (a -> b) -> a -> b
$ Text
"Resuming shard (sessionID: " forall a. Semigroup a => a -> a -> a
<> Text
s forall a. Semigroup a => a -> a -> a
<> Text
", seq: " forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (forall a. Show a => a -> String
show Int
n)
          forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
            ( ResumeData -> SentDiscordMessage
Resume
                ResumeData
                  { $sel:token:ResumeData :: Text
token = Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "token" a => a
#token
                  , $sel:sessionID:ResumeData :: Text
sessionID = Text
s
                  , $sel:seq:ResumeData :: Int
seq = Int
n
                  }
            )
        (Maybe Int, Maybe Text)
_noActiveSession -> do
          forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Identifying shard"
          forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
            ( IdentifyData -> SentDiscordMessage
Identify
                IdentifyData
                  { $sel:token:IdentifyData :: Text
token = Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "token" a => a
#token
                  , $sel:properties:IdentifyData :: IdentifyProps
properties =
                      IdentifyProps
                        { $sel:browser:IdentifyProps :: Text
browser = Text
"Calamity: https://github.com/simmsb/calamity"
                        , $sel:device:IdentifyProps :: Text
device = Text
"Calamity: https://github.com/simmsb/calamity"
                        }
                  , $sel:compress:IdentifyData :: Bool
compress = Bool
False
                  , $sel:largeThreshold:IdentifyData :: Maybe Int
largeThreshold = forall a. Maybe a
Nothing
                  , $sel:shard:IdentifyData :: Maybe (Int, Int)
shard =
                      forall a. a -> Maybe a
Just ( Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "shardID" a => a
#shardID , Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "shardCount" a => a
#shardCount )
                  , $sel:presence:IdentifyData :: Maybe StatusUpdateData
presence = Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "initialStatus" a => a
#initialStatus
                  , $sel:intents:IdentifyData :: Intents
intents = Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "intents" a => a
#intents
                  }
            )

      ShardFlowControl
result <-
        forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem (Resource : r) a -> Sem r a
P.resourceToIOFinal forall a b. (a -> b) -> a -> b
$
          forall (r :: EffectRow) a c b.
Member Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
P.bracket
            (forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall a b. (a -> b) -> a -> b
$ forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
1)
            (forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TBMQueue a -> STM ()
closeTBMQueue)
            ( \TBMQueue ShardMsg
q -> do
                forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"handling events now"
                Async (Maybe ())
_controlThread <- forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall a b. (a -> b) -> a -> b
$ Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
q
                Async (Maybe ())
_discordThread <- forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async forall a b. (a -> b) -> a -> b
$ forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
q
                forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
P.raise forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem r (Maybe a) -> Sem r a
untilJustFinalIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall e a. Either e a -> Maybe e
leftToMaybe forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (r :: EffectRow) a.
Sem (Error e : r) a -> Sem r (Either e a)
P.runError forall a b. (a -> b) -> a -> b
$ do
                  -- only we close the queue
                  Maybe ShardMsg
msg <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue ShardMsg
q
                  forall (r :: EffectRow).
(ShardC r, Member (Error ShardFlowControl) r) =>
ShardMsg -> Sem r ()
handleMsg forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (r :: EffectRow) a.
Members '[LogEff, Error ShardFlowControl] r =>
Text -> Maybe a -> Sem r a
restartUnless Text
"shard message stream closed by someone other than the sink" Maybe ShardMsg
msg
            )

      forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Exiting inner loop of shard"

      forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "wsConn" a => a
#wsConn forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ forall a. Maybe a
Nothing)
      forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat
      forall (f :: * -> *) a. Applicative f => a -> f a
pure ShardFlowControl
result
    handleMsg :: (ShardC r, P.Member (P.Error ShardFlowControl) r) => ShardMsg -> Sem r ()
    handleMsg :: forall (r :: EffectRow).
(ShardC r, Member (Error ShardFlowControl) r) =>
ShardMsg -> Sem r ()
handleMsg (Discord ReceivedDiscordMessage
msg) = case ReceivedDiscordMessage
msg of
      EvtDispatch Int
sn DispatchData
data' -> do
        -- trace $ "Handling event: ("+||data'||+")"
        forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "seqNum" a => a
#seqNum forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ Int
sn)

        case DispatchData
data' of
          Ready ReadyData
rdata' ->
            forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "sessionID" a => a
#sessionID forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ (ReadyData
rdata' forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "sessionID" a => a
#sessionID))
          DispatchData
_NotReady -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

        Shard
shard <- forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "shardS" a => a
#shardS)
        forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall a b. (a -> b) -> a -> b
$ forall a. InChan a -> a -> IO ()
UC.writeChan (Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "evtIn" a => a
#evtIn) (Int -> DispatchData -> CalamityEvent
Dispatch (Shard
shard forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "shardID" a => a
#shardID) DispatchData
data')
      ReceivedDiscordMessage
HeartBeatReq -> do
        forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Received heartbeat request"
        forall (r :: EffectRow). ShardC r => Sem r ()
sendHeartBeat
      ReceivedDiscordMessage
Reconnect -> do
        forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Being asked to restart by Discord"
        forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart
      InvalidSession Bool
resumable -> do
        if Bool
resumable
          then forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Received resumable invalid session"
          else do
            forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Received non-resumable invalid session, sleeping for 15 seconds then retrying"
            forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "sessionID" a => a
#sessionID forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ forall a. Maybe a
Nothing)
            forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "seqNum" a => a
#seqNum forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ forall a. Maybe a
Nothing)
            forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int
15 forall a. Num a => a -> a -> a
* Int
1000 forall a. Num a => a -> a -> a
* Int
1000)
        forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart
      Hello Int
interval -> do
        forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
"Received hello, beginning to heartbeat at an interval of " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Int
interval forall a. Semigroup a => a -> a -> a
<> String
"ms"
        forall (r :: EffectRow). ShardC r => Int -> Sem r ()
startHeartBeatLoop Int
interval
      ReceivedDiscordMessage
HeartBeatAck -> do
        forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Received heartbeat ack"
        forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "hbResponse" a => a
#hbResponse forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Bool
True)
    handleMsg (Control ControlMessage
msg) = case ControlMessage
msg of
      SendPresence StatusUpdateData
data' -> do
        forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
"Sending presence: (" forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show StatusUpdateData
data' forall a. Semigroup a => a -> a -> a
<> String
")"
        forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs forall a b. (a -> b) -> a -> b
$ StatusUpdateData -> SentDiscordMessage
StatusUpdate StatusUpdateData
data'
      ControlMessage
RestartShard -> forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart
      ControlMessage
ShutDownShard -> forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowShutDown

startHeartBeatLoop :: ShardC r => Int -> Sem r ()
startHeartBeatLoop :: forall (r :: EffectRow). ShardC r => Int -> Sem r ()
startHeartBeatLoop Int
interval = do
  forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat -- cancel any currently running hb thread
  Async (Maybe ())
thread <- forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async forall a b. (a -> b) -> a -> b
$ forall (r :: EffectRow). ShardC r => Int -> Sem r ()
heartBeatLoop Int
interval
  forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "hbThread" a => a
#hbThread forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ Async (Maybe ())
thread)

haltHeartBeat :: ShardC r => Sem r ()
haltHeartBeat :: forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat = do
  Maybe (Async (Maybe ()))
thread <- forall s a (r :: EffectRow).
Member (AtomicState s) r =>
(s -> (s, a)) -> Sem r a
P.atomicState @ShardState forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a b. (a, b) -> (b, a)
swap forall b c a. (b -> c) -> (a -> b) -> a -> c
.) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall s a. State s a -> s -> (a, s)
runState forall a b. (a -> b) -> a -> b
$ do
    Maybe (Async (Maybe ()))
thread <- forall k s (m :: * -> *) (is :: IxList) a.
(Is k A_Getter, MonadState s m) =>
Optic' k is s a -> m a
use forall a. IsLabel "hbThread" a => a
#hbThread
    #hbThread .= Nothing
    forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Async (Maybe ()))
thread
  case Maybe (Async (Maybe ()))
thread of
    Just Async (Maybe ())
t -> do
      forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Stopping heartbeat thread"
      forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Async a -> IO ()
cancel Async (Maybe ())
t)
    Maybe (Async (Maybe ()))
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

sendHeartBeat :: ShardC r => Sem r ()
sendHeartBeat :: forall (r :: EffectRow). ShardC r => Sem r ()
sendHeartBeat = do
  Maybe Int
sn <- forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "seqNum" a => a
#seqNum)
  forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
"Sending heartbeat (seq: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Maybe Int
sn forall a. Semigroup a => a -> a -> a
<> String
")"
  forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs forall a b. (a -> b) -> a -> b
$ Maybe Int -> SentDiscordMessage
HeartBeat Maybe Int
sn
  forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (forall a. IsLabel "hbResponse" a => a
#hbResponse forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Bool
False)

heartBeatLoop :: ShardC r => Int -> Sem r ()
heartBeatLoop :: forall (r :: EffectRow). ShardC r => Int -> Sem r ()
heartBeatLoop Int
interval = forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem r (Maybe a) -> Sem r a
untilJustFinalIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall e a. Either e a -> Maybe e
leftToMaybe forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (r :: EffectRow) a.
Sem (Error e : r) a -> Sem r (Either e a)
P.runError forall a b. (a -> b) -> a -> b
$ do
  forall (r :: EffectRow). ShardC r => Sem r ()
sendHeartBeat
  forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ Int
interval forall a. Num a => a -> a -> a
* Int
1000
  forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
unlessM (forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "hbResponse" a => a
#hbResponse)) forall a b. (a -> b) -> a -> b
$ do
    forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"No heartbeat response, restarting shard"
    Connection
wsConn <- forall e (r :: EffectRow) a.
Member (Error e) r =>
e -> Maybe a -> Sem r a
P.note () forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. forall a. IsLabel "wsConn" a => a
#wsConn)
    forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed forall a b. (a -> b) -> a -> b
$ forall a. WebSocketsData a => Connection -> Word16 -> a -> IO ()
sendCloseCode Connection
wsConn Word16
4000 (Text
"No heartbeat in time" :: T.Text)
    forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ()