{-# LANGUAGE DataKinds #-}
{-# LANGUAGE QuasiQuotes #-}
module Neptune.AbortHandler where
import Control.Lens (_Right, (^?!))
import Control.Retry (constantDelay, recoverAll)
import Data.Aeson (FromJSON (..), Value (..),
eitherDecode', (.:))
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.Text.Encoding (encodeUtf8)
import qualified Network.WebSockets as WS
import RIO
import RIO.Text as T (pack, unpack)
import Text.URI (mkURI)
import Text.URI.Lens
import qualified Wuss
import Neptune.Backend.Model (ExperimentId (..))
import Neptune.OAuth (oas_access_token)
import Neptune.Session (Experiment, NeptuneSession, ct_api_url,
exp_experiment_id, neptune_client_token,
neptune_oauth2)
abortListener :: NeptuneSession -> Experiment -> ThreadId -> IO ()
abortListener :: NeptuneSession -> Experiment -> ThreadId -> IO ()
abortListener NeptuneSession
sess Experiment
exp ThreadId
main_thread = do
URI
base_url <- Text -> IO URI
forall (m :: * -> *). MonadThrow m => Text -> m URI
mkURI (NeptuneSession
sess NeptuneSession -> Getting Text NeptuneSession Text -> Text
forall s a. s -> Getting a s a -> a
^. (ClientToken -> Const Text ClientToken)
-> NeptuneSession -> Const Text NeptuneSession
Lens' NeptuneSession ClientToken
neptune_client_token ((ClientToken -> Const Text ClientToken)
-> NeptuneSession -> Const Text NeptuneSession)
-> ((Text -> Const Text Text)
-> ClientToken -> Const Text ClientToken)
-> Getting Text NeptuneSession Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> Const Text Text) -> ClientToken -> Const Text ClientToken
Lens' ClientToken Text
ct_api_url)
let host :: Text
host = URI
base_url URI -> Getting (Endo Text) URI Text -> Text
forall s a. HasCallStack => s -> Getting (Endo a) s a -> a
^?! (Either Bool Authority
-> Const (Endo Text) (Either Bool Authority))
-> URI -> Const (Endo Text) URI
Lens' URI (Either Bool Authority)
uriAuthority ((Either Bool Authority
-> Const (Endo Text) (Either Bool Authority))
-> URI -> Const (Endo Text) URI)
-> ((Text -> Const (Endo Text) Text)
-> Either Bool Authority
-> Const (Endo Text) (Either Bool Authority))
-> Getting (Endo Text) URI Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Authority -> Const (Endo Text) Authority)
-> Either Bool Authority
-> Const (Endo Text) (Either Bool Authority)
forall c a b. Prism (Either c a) (Either c b) a b
_Right ((Authority -> Const (Endo Text) Authority)
-> Either Bool Authority
-> Const (Endo Text) (Either Bool Authority))
-> ((Text -> Const (Endo Text) Text)
-> Authority -> Const (Endo Text) Authority)
-> (Text -> Const (Endo Text) Text)
-> Either Bool Authority
-> Const (Endo Text) (Either Bool Authority)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (RText 'Host -> Const (Endo Text) (RText 'Host))
-> Authority -> Const (Endo Text) Authority
Lens' Authority (RText 'Host)
authHost ((RText 'Host -> Const (Endo Text) (RText 'Host))
-> Authority -> Const (Endo Text) Authority)
-> ((Text -> Const (Endo Text) Text)
-> RText 'Host -> Const (Endo Text) (RText 'Host))
-> (Text -> Const (Endo Text) Text)
-> Authority
-> Const (Endo Text) Authority
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> Const (Endo Text) Text)
-> RText 'Host -> Const (Endo Text) (RText 'Host)
forall (l :: RTextLabel). Getter (RText l) Text
unRText
path :: String
path = String
notif_path String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
exp_id String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"/operations" :: String
oauth_ref :: MVar OAuth2Session
oauth_ref = NeptuneSession
sess NeptuneSession
-> Getting (MVar OAuth2Session) NeptuneSession (MVar OAuth2Session)
-> MVar OAuth2Session
forall s a. s -> Getting a s a -> a
^. Getting (MVar OAuth2Session) NeptuneSession (MVar OAuth2Session)
Lens' NeptuneSession (MVar OAuth2Session)
neptune_oauth2
run_listener :: IO ()
run_listener = do
OAuth2Session
oauth_current <- MVar OAuth2Session -> IO OAuth2Session
forall (m :: * -> *) a. MonadIO m => MVar a -> m a
readMVar MVar OAuth2Session
oauth_ref
let oauth_token :: Text
oauth_token = OAuth2Session
oauth_current OAuth2Session -> Getting Text OAuth2Session Text -> Text
forall s a. s -> Getting a s a -> a
^. Getting Text OAuth2Session Text
Lens' OAuth2Session Text
oas_access_token
String
-> PortNumber
-> String
-> ConnectionOptions
-> Headers
-> ClientApp ()
-> IO ()
forall a.
String
-> PortNumber
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
Wuss.runSecureClientWith
(Text -> String
T.unpack Text
host) PortNumber
port String
path
ConnectionOptions
WS.defaultConnectionOptions
[(CI ByteString
"Authorization", ByteString
"Bearer " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
encodeUtf8 Text
oauth_token)]
(ThreadId -> ClientApp ()
listener ThreadId
main_thread)
RetryPolicyM IO -> (RetryStatus -> IO ()) -> IO ()
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m -> (RetryStatus -> m a) -> m a
recoverAll (Int -> RetryPolicy
constantDelay Int
500000) ((RetryStatus -> IO ()) -> IO ())
-> (RetryStatus -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RetryStatus
_ -> IO ()
run_listener
where
port :: PortNumber
port = PortNumber
443
notif_path :: String
notif_path = String
"/api/notifications/v1/experiments/"
exp_id :: String
exp_id = Text -> String
T.unpack (Text -> String) -> Text -> String
forall a b. (a -> b) -> a -> b
$ ExperimentId -> Text
unExperimentId (ExperimentId -> Text) -> ExperimentId -> Text
forall a b. (a -> b) -> a -> b
$ Experiment
exp Experiment
-> Getting ExperimentId Experiment ExperimentId -> ExperimentId
forall s a. s -> Getting a s a -> a
^. Getting ExperimentId Experiment ExperimentId
Lens' Experiment ExperimentId
exp_experiment_id
data Message = MessageAbort { Message -> Text
_msg_experiment_id :: Text }
instance FromJSON Message where
parseJSON :: Value -> Parser Message
parseJSON (Object Object
v) = do
Text
typ <- Object
v Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"messageType"
case Text
typ of
Text
"Abort" -> do
Object
body <- Object
v Object -> Text -> Parser Object
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"messageBody"
Text -> Message
MessageAbort (Text -> Message) -> Parser Text -> Parser Message
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
body Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"experimentId"
Text
_ -> String -> Parser Message
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Parser Message) -> String -> Parser Message
forall a b. (a -> b) -> a -> b
$ String
"Unsupported message type" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (Text -> String
T.unpack Text
typ)
parseJSON Value
invalid =
String -> Parser Message -> Parser Message
forall a. String -> Parser a -> Parser a
prependFailure String
"parsing Message failed, "
(String -> Value -> Parser Message
forall a. String -> Value -> Parser a
typeMismatch String
"Object" Value
invalid)
data AbortException = AbortException
deriving Int -> AbortException -> String -> String
[AbortException] -> String -> String
AbortException -> String
(Int -> AbortException -> String -> String)
-> (AbortException -> String)
-> ([AbortException] -> String -> String)
-> Show AbortException
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
showList :: [AbortException] -> String -> String
$cshowList :: [AbortException] -> String -> String
show :: AbortException -> String
$cshow :: AbortException -> String
showsPrec :: Int -> AbortException -> String -> String
$cshowsPrec :: Int -> AbortException -> String -> String
Show
instance Exception AbortException
listener :: ThreadId -> WS.ClientApp ()
listener :: ThreadId -> ClientApp ()
listener ThreadId
main_thread Connection
conn = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
ByteString
msg <- Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
WS.receiveData Connection
conn
case ByteString -> Either String Message
forall a. FromJSON a => ByteString -> Either String a
eitherDecode' ByteString
msg of
Right (MessageAbort Text
_) -> ThreadId -> AbortException -> IO ()
forall e (m :: * -> *).
(Exception e, MonadIO m) =>
ThreadId -> e -> m ()
throwTo ThreadId
main_thread AbortException
AbortException
Left String
msg -> Text -> IO ()
forall (f :: * -> *). Applicative f => Text -> f ()
traceM (String -> Text
T.pack String
msg) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()