{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE TemplateHaskell #-}
module Calamity.Gateway.Shard
( Shard(..)
, newShard ) where
import Calamity.Gateway.DispatchEvents
import Calamity.Gateway.Intents
import Calamity.Gateway.Types
import Calamity.Internal.Utils
import Calamity.Metrics.Eff
import Calamity.Types.LogEff
import Calamity.Types.Token
import Control.Concurrent
import Control.Concurrent.Async
import qualified Control.Concurrent.Chan.Unagi as UC
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Exception
import qualified Control.Exception.Safe as Ex
import Control.Lens
import Control.Monad
import Control.Monad.State.Lazy
import qualified Data.Aeson as A
import Data.Functor
import Data.IORef
import Data.Maybe
import Data.Text.Lazy ( Text, stripPrefix )
import Data.Text.Lazy.Lens
import Data.Void
import DiPolysemy hiding ( debug, error, info )
import Fmt
import Network.WebSockets ( Connection, ConnectionException(..), receiveData, sendCloseCode
, sendTextData )
import Polysemy ( Sem )
import qualified Polysemy as P
import qualified Polysemy.Async as P
import qualified Polysemy.AtomicState as P
import qualified Polysemy.Error as P
import qualified Polysemy.Resource as P
import Prelude hiding ( error )
import Wuss
-- | Effect for running an action against a websocket connection.
--
-- 'RunWebsocket' takes two 'Text' arguments (the interpreter in this module
-- binds them as @host@ and @path@) and a continuation that is handed the
-- opened 'Connection'.
data Websocket m a where
  RunWebsocket
    :: Text
    -> Text
    -> (Connection -> m a)
    -> Websocket m a

-- Template Haskell splice that generates the @runWebsocket@ smart constructor.
P.makeSem ''Websocket
-- | Interpret the 'Websocket' effect in 'IO'.
--
-- For 'RunWebsocket' the continuation is lowered to 'IO' with
-- 'P.withLowerToIO' and executed inside 'runSecureClient' against port 443,
-- using the unpacked host and path. The @finish@ callback supplied by
-- 'P.withLowerToIO' is invoked once the continuation has produced its result.
websocketToIO :: forall r a. P.Member (P.Embed IO) r => Sem (Websocket ': r) a -> Sem r a
websocketToIO = P.interpretH
  (\case
     RunWebsocket host path handler -> do
       initialState <- P.getInitialStateT
       boundHandler <- P.bindT handler

       P.withLowerToIO $ \lowerToIO finalize -> do
         -- Interpret any nested 'Websocket' usage recursively, then lower the
         -- remaining effects to 'IO'.
         let runInIO :: Sem (Websocket ': r) x -> IO x
             runInIO act = lowerToIO (P.raise (websocketToIO act))

         runSecureClient (host ^. unpacked) 443 (path ^. unpacked) $ \conn -> do
           outcome <- runInIO (boundHandler (initialState $> conn))
           finalize
           pure outcome)
-- | Build the initial 'ShardState' for a shard.
--
-- The shard itself is stored in the first field; every 'Maybe' field starts
-- as 'Nothing' and the single 'Bool' field starts as 'False'.
newShardState :: Shard -> ShardState
newShardState shard =
  ShardState
    shard
    Nothing -- Maybe Int
    Nothing -- Maybe (Async (Maybe ()))
    False   -- Bool
    Nothing -- Maybe Text
    Nothing -- Maybe Text
    Nothing -- Maybe Connection
-- | Create a shard and start running it asynchronously.
--
-- The arguments are, in order: the gateway host, the shard id, the shard
-- count, the token (stored in the 'Shard' as 'rawToken'), an optional initial
-- presence, optional intents, and the channel the shard is given for events.
--
-- Returns the write end of the shard's control channel together with the
-- 'Async' handle of the thread running 'shardLoop'.
newShard :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO, P.Async] r
         => Text
         -> Int
         -> Int
         -> Token
         -> Maybe StatusUpdateData
         -> Maybe Intents
         -> UC.InChan CalamityEvent
         -> Sem r (UC.InChan ControlMessage, Async (Maybe ()))
newShard gateway id count token presence intents evtIn = do
  -- 'mdo' is required: the state reference holds the 'Shard', and the 'Shard'
  -- in turn holds the state reference.
  (controlIn, stateRef) <- P.embed $ mdo
    (chanIn, chanOut) <- UC.newChan
    ref <- newIORef (newShardState shard)
    let shard =
          Shard id count gateway evtIn chanOut ref (rawToken token) presence intents
    pure (chanIn, ref)

  -- Run the shard loop against the IORef-backed state, with the logging
  -- context tagged by shard id.
  worker <-
    P.async
      . push "calamity-shard"
      . attr "shard-id" id
      $ P.runAtomicStateIORef stateRef shardLoop

  pure (controlIn, worker)
-- | Send a message to the gateway over the shard's current websocket.
--
-- The message is JSON-encoded with 'A.encode' and sent with 'sendTextData'.
-- When the state holds no connection, nothing is sent and a debug line is
-- logged instead.
sendToWs :: ShardC r => SentDiscordMessage -> Sem r ()
sendToWs data' =
  P.atomicGets wsConn >>= \case
    Nothing -> debug "tried to send to closed WS"
    Just conn -> do
      let payload = A.encode data'
      debug $ "sending " +|| data' ||+ " encoded to " +|| payload ||+ " to gateway"
      P.embed (sendTextData conn payload)
-- | Extract the 'Left' payload of an 'Either' whose 'Right' side is
-- uninhabited; the 'Right' case is discharged with 'absurd'.
fromEitherVoid :: Either a Void -> a
fromEitherVoid = either id absurd
-- | Write to a 'TBMQueue', retrying the transaction until the write is
-- accepted.
--
-- Built on 'tryWriteTBMQueue': a @Just True@ result yields 'True', a
-- @Just False@ result makes the STM transaction 'retry', and a 'Nothing'
-- result yields 'False'.
-- (Per the stm-chans documentation these correspond to "written", "queue
-- full" and "queue closed" respectively.)
tryWriteTBMQueue' :: TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' q v =
  tryWriteTBMQueue q v
    >>= maybe (pure False) (\written -> if written then pure True else retry)
shardLoop :: ShardC r => Sem r ()
shardLoop :: Sem r ()
shardLoop = do
Gauge
activeShards <- Text -> [(Text, Text)] -> Sem r Gauge
forall (r :: [(* -> *) -> * -> *]).
MemberWithError MetricEff r =>
Text -> [(Text, Text)] -> Sem r Gauge
registerGauge "active_shards" [(Text, Text)]
forall a. Monoid a => a
mempty
Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: [(* -> *) -> * -> *]).
MemberWithError MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge Double -> Double
forall a. Enum a => a -> a
succ Gauge
activeShards
Sem r (Either ShardFlowControl ()) -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void Sem r (Either ShardFlowControl ())
forall (r :: [(* -> *) -> * -> *]).
ShardC r =>
Sem r (Either ShardFlowControl ())
outerloop
Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: [(* -> *) -> * -> *]).
MemberWithError MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge Double -> Double
forall a. Enum a => a -> a
pred Gauge
activeShards
Text -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
Member LogEff r =>
Text -> Sem r ()
debug "Shard shut down"
where
-- | Forward control messages from the shard's @cmdOut@ channel into the
-- shard's message queue, wrapped in 'Control'.
--
-- The loop ends as soon as 'tryWriteTBMQueue'' reports 'False'. Note that the
-- channel is read before the write is attempted, so the message read on that
-- final iteration is not delivered.
controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
controlStream shard outqueue = do
  command <- UC.readChan (shard ^. #cmdOut)
  stillOpen <- atomically (tryWriteTBMQueue' outqueue (Control command))
  when stillOpen (controlStream shard outqueue)
-- | Translate an exception raised while reading from the websocket into the
-- control message the shard should act on.
--
-- A 'CloseRequest' carrying a close code after which reconnecting cannot
-- succeed produces 'ShutDownShard'; every other exception produces
-- 'RestartShard'. The result is always a 'Left'.
--
-- Close codes treated as fatal:
--
-- * 1000 - normal closure
-- * 4004 - authentication failed
-- * 4010 - invalid shard
-- * 4011 - sharding required
-- * 4012 - invalid API version
-- * 4013 - invalid intents
-- * 4014 - disallowed intents
--
-- 4012-4014 are listed because Discord documents them as non-reconnectable;
-- restarting on them would loop forever with the same bad configuration.
handleWSException :: SomeException -> IO (Either ControlMessage a)
handleWSException e = pure $ case fromException e of
  Just (CloseRequest code _)
    | code `elem` [1000, 4004, 4010, 4011, 4012, 4013, 4014] ->
      Left ShutDownShard
  _ -> Left RestartShard
-- | Read messages from the websocket and push them onto the shard's queue.
--
-- Each iteration receives one frame. Exceptions from 'receiveData' are turned
-- into a control message by 'handleWSException'; that message is written to
-- the queue and the loop stops. A frame that decodes successfully is queued
-- as a 'Discord' message and the loop continues only while the queue accepts
-- writes. A frame that fails to decode is logged as an error and the loop
-- continues.
discordStream :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO] r => Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream ws outqueue = loop
  where
    loop =
      P.embed (Ex.catchAny (Right <$> receiveData ws) handleWSException) >>= \case
        Left control ->
          P.embed . atomically $ writeTBMQueue outqueue (Control control)
        Right raw -> do
          keepGoing <- case A.eitherDecode raw of
            Left err -> do
              error $ "Failed to decode: " +| err |+ ""
              pure True
            Right event ->
              P.embed . atomically $ tryWriteTBMQueue' outqueue (Discord event)
          when keepGoing loop
-- | Connection loop of the shard.
--
-- Repeatedly opens a websocket to the shard's gateway host (with a leading
-- @wss://@ stripped, if present) and runs 'innerloop' on it. When 'innerloop'
-- returns 'ShardFlowRestart' the loop goes around again; when it returns
-- 'ShardFlowShutDown' the value is thrown through the 'P.Error' effect, which
-- ends the 'forever' and surfaces as a 'Left' from 'P.runError'.
outerloop :: ShardC r => Sem r (Either ShardFlowControl ())
outerloop = P.runError . forever $ do
  shard :: Shard <- P.atomicGets (^. #shardS)
  let host = shard ^. #gateway
  let host' = fromMaybe host $ stripPrefix "wss://" host
  info $ "starting up shard "+| (shard ^. #shardID) |+" of "+| (shard ^. #shardCount) |+""

  innerLoopVal <- websocketToIO $ runWebsocket host' "/?v=7&encoding=json" innerloop

  case innerLoopVal of
    ShardFlowShutDown -> do
      info "Shutting down shard"
      P.throw ShardFlowShutDown
    ShardFlowRestart ->
      info "Restarting shard"
-- | Run one websocket session of the shard and report how it ended.
--
-- Steps:
--
-- 1. Record the connection in the shard state (@wsConn@).
-- 2. If both a sequence number and a session id are stored, send a 'Resume';
--    otherwise send an 'Identify'.
-- 3. Create a 'TBMQueue' of capacity 1, start 'controlStream' and
--    'discordStream' feeding it, and pass every queued message to
--    'handleMsg' until a 'ShardFlowControl' is thrown.
-- 4. Clear @wsConn@, call 'haltHeartBeat' and return the thrown value.
--
-- The queue is closed by the 'P.bracket' release action once step 3 ends.
innerloop :: ShardC r => Connection -> Sem r ShardFlowControl
innerloop ws = do
  debug "Entering inner loop of shard"

  shard <- P.atomicGets (^. #shardS)
  P.atomicModify (#wsConn ?~ ws)

  seqNum'    <- P.atomicGets (^. #seqNum)
  sessionID' <- P.atomicGets (^. #sessionID)

  case (seqNum', sessionID') of
    (Just n, Just s) -> do
      debug $ "Resuming shard (sessionID: "+|s|+", seq: "+|n|+")"
      sendToWs (Resume ResumeData
                { token = shard ^. #token
                , sessionID = s
                , seq = n
                })
    _ -> do
      debug "Identifying shard"
      sendToWs (Identify IdentifyData
                { token = shard ^. #token
                , properties = IdentifyProps
                               { browser = "Calamity: https://github.com/nitros12/calamity"
                               , device = "Calamity: https://github.com/nitros12/calamity"
                               }
                , compress = False
                , largeThreshold = 250
                , shard = (shard ^. #shardID,
                           shard ^. #shardCount)
                , presence = shard ^. #initialStatus
                , intents = shard ^. #intents
                })

  result <- P.resourceToIOFinal $ P.bracket (P.embed $ newTBMQueueIO 1)
    (P.embed . atomically . closeTBMQueue)
    (\q -> do
       debug "handling events now"
       -- NOTE(review): neither worker is cancelled when this session ends;
       -- they stop on their own once they observe the closed queue or a
       -- websocket exception - confirm this is intended.
       _controlThread <- P.async . P.embed $ controlStream shard q
       _discordThread <- P.async $ discordStream ws q
       (fromEitherVoid <$>) . P.raise . P.runError . forever $ do
         msg <- P.embed . atomically $ readTBMQueue q
         -- 'readTBMQueue' yields 'Nothing' only for a closed, drained queue.
         -- Only the bracket release above closes it, so this should not
         -- happen; if it ever does, end the session with a restart rather
         -- than crashing on a partial 'fromJust'.
         maybe (P.throw ShardFlowRestart) handleMsg msg)

  debug "Exiting inner loop of shard"

  P.atomicModify (#wsConn .~ Nothing)
  haltHeartBeat
  pure result
-- | Process a single message taken from the shard's queue.
--
-- 'Discord' messages are gateway payloads received from the websocket:
--
-- * 'EvtDispatch': record the sequence number, remember the session ID when the
--   payload is 'Ready', then forward the event to the shard's event channel
--   tagged with the shard ID.
-- * 'HeartBeatReq': send a heartbeat immediately.
-- * 'Reconnect': throw 'ShardFlowRestart'.
-- * 'InvalidSession': if the session is not resumable, forget the session ID and
--   sequence number and sleep for 15 seconds; in both cases throw
--   'ShardFlowRestart' afterwards.
-- * 'Hello': (re)start the heartbeat loop with the given interval.
-- * 'HeartBeatAck': flag the outstanding heartbeat as acknowledged.
--
-- 'Control' messages are requests made of the shard: 'SendPresence' sends a
-- status update over the websocket, while 'RestartShard' and 'ShutDownShard'
-- throw 'ShardFlowRestart' and 'ShardFlowShutDown' respectively.
handleMsg :: (ShardC r, P.Member (P.Error ShardFlowControl) r) => ShardMsg -> Sem r ()
handleMsg (Discord msg) = case msg of
  EvtDispatch sn data' -> do
    P.atomicModify (#seqNum ?~ sn)

    case data' of
      Ready rdata' ->
        P.atomicModify (#sessionID ?~ (rdata' ^. #sessionID))

      _ -> pure ()

    shard <- P.atomicGets (^. #shardS)
    P.embed $ UC.writeChan (shard ^. #evtIn) (Dispatch (shard ^. #shardID) data')

  HeartBeatReq -> do
    debug "Received heartbeat request"
    sendHeartBeat

  Reconnect -> do
    debug "Being asked to restart by Discord"
    P.throw ShardFlowRestart

  InvalidSession resumable -> do
    -- The stored session is only discarded when it cannot be resumed; the
    -- branches were previously the wrong way round, throwing away resumable
    -- sessions and keeping dead ones.
    if resumable
      then
        info "Received resumable invalid session"
      else do
        info "Received non-resumable invalid session, sleeping for 15 seconds then retrying"
        P.atomicModify (#sessionID .~ Nothing)
        P.atomicModify (#seqNum .~ Nothing)
        -- threadDelay takes microseconds
        P.embed $ threadDelay (15 * 1000 * 1000)
    P.throw ShardFlowRestart

  Hello interval -> do
    info $ "Received hello, beginning to heartbeat at an interval of "+|interval|+"ms"
    startHeartBeatLoop interval

  HeartBeatAck -> do
    debug "Received heartbeat ack"
    P.atomicModify (#hbResponse .~ True)

handleMsg (Control msg) = case msg of
  SendPresence data' -> do
    debug $ "Sending presence: ("+||data'||+")"
    sendToWs $ StatusUpdate data'

  RestartShard -> P.throw ShardFlowRestart

  ShutDownShard -> P.throw ShardFlowShutDown
-- | Start heartbeating at the given interval.
--
-- Any heartbeat thread already recorded in the shard state is halted first, then
-- 'heartBeatLoop' is spawned asynchronously and its handle is stored in
-- @hbThread@ so that 'haltHeartBeat' can cancel it later.
startHeartBeatLoop :: ShardC r => Int -> Sem r ()
startHeartBeatLoop interval = do
  haltHeartBeat
  hb <- P.async (heartBeatLoop interval)
  P.atomicModify (#hbThread .~ Just hb)
-- | Stop the heartbeat thread, if one is currently recorded.
--
-- The handle stored in @hbThread@ is read and replaced with 'Nothing' in a
-- single 'P.atomicState' call; if a handle was present it is then cancelled.
haltHeartBeat :: ShardC r => Sem r ()
haltHeartBeat = do
  -- Take the handle out of the state and clear the slot in one step.
  running <- P.atomicState @ShardState $ \st ->
    (st & #hbThread .~ Nothing, st ^. #hbThread)
  forM_ running $ \hb -> do
    debug "Stopping heartbeat thread"
    P.embed $ cancel hb
-- | Send a heartbeat carrying the last recorded sequence number (if any) over
-- the websocket, then mark the heartbeat as not yet acknowledged.
sendHeartBeat :: ShardC r => Sem r ()
sendHeartBeat = do
  lastSeq <- P.atomicGets (^. #seqNum)
  debug $ "Sending heartbeat (seq: " +|| lastSeq ||+ ")"
  sendToWs (HeartBeat lastSeq)
  -- Cleared after the send; 'handleMsg' sets it back to True on 'HeartBeatAck'.
  P.atomicModify (#hbResponse .~ False)
-- | Body of the heartbeat thread.
--
-- Repeatedly sends a heartbeat and then sleeps for @interval@ milliseconds. If
-- the heartbeat has not been acknowledged by the time the sleep ends, the
-- websocket (when one is still recorded in the shard state) is closed with code
-- 4000 and the loop exits.
--
-- The loop is exited by throwing @()@ into a local 'P.Error' effect, whose
-- result is discarded.
heartBeatLoop :: ShardC r => Int -> Sem r ()
heartBeatLoop interval = void . P.runError . forever $ do
  sendHeartBeat
  -- threadDelay takes microseconds; the interval is in milliseconds
  P.embed . threadDelay $ interval * 1000
  unlessM (P.atomicGets (^. #hbResponse)) $ do
    debug "No heartbeat response, restarting shard"
    -- The connection slot can already be empty if the shard's receive loop has
    -- cleared it but not yet cancelled this thread, so don't assume it is set.
    mconn <- P.atomicGets (^. #wsConn)
    case mconn of
      Just conn ->
        P.embed $ sendCloseCode conn 4000 ("No heartbeat in time" :: Text)
      Nothing ->
        debug "No websocket connection to close"
    P.throw ()