{-# LANGUAGE DataKinds #-} {-# LANGUAGE QuasiQuotes #-} module Neptune.AbortHandler where import Control.Lens (_Right, (^?!)) import Control.Retry (constantDelay, recoverAll) import Data.Aeson (FromJSON (..), Value (..), eitherDecode', (.:)) import Data.Aeson.Types (prependFailure, typeMismatch) import Data.Text.Encoding (encodeUtf8) import qualified Network.WebSockets as WS import RIO import RIO.Text as T (pack, unpack) import Text.URI (RText, RTextLabel (..), mkURI) import Text.URI.Lens import qualified Wuss import Neptune.Backend.Core (configHost) import Neptune.Backend.Model (ExperimentId (..)) import Neptune.OAuth (oas_access_token) import Neptune.Session (ClientToken (..), Experiment, NeptuneSession (..), exp_experiment_id) abortListener :: NeptuneSession -> Experiment -> ThreadId -> IO () abortListener :: NeptuneSession -> Experiment -> ThreadId -> IO () abortListener NeptuneSession sess Experiment exp ThreadId main_thread = do URI base_url <- Text -> IO URI forall (m :: * -> *). MonadThrow m => Text -> m URI mkURI (NeptuneSession sess NeptuneSession -> (NeptuneSession -> ClientToken) -> ClientToken forall a b. a -> (a -> b) -> b & NeptuneSession -> ClientToken _neptune_client_token ClientToken -> (ClientToken -> Text) -> Text forall a b. a -> (a -> b) -> b & ClientToken -> Text _ct_api_url) let host :: Text host = URI base_url URI -> Getting (Endo Text) URI Text -> Text forall s a. HasCallStack => s -> Getting (Endo a) s a -> a ^?! (Either Bool Authority -> Const (Endo Text) (Either Bool Authority)) -> URI -> Const (Endo Text) URI Lens' URI (Either Bool Authority) uriAuthority ((Either Bool Authority -> Const (Endo Text) (Either Bool Authority)) -> URI -> Const (Endo Text) URI) -> ((Text -> Const (Endo Text) Text) -> Either Bool Authority -> Const (Endo Text) (Either Bool Authority)) -> Getting (Endo Text) URI Text forall b c a. (b -> c) -> (a -> b) -> a -> c . (Authority -> Const (Endo Text) Authority) -> Either Bool Authority -> Const (Endo Text) (Either Bool Authority) forall c a b. Prism (Either c a) (Either c b) a b _Right ((Authority -> Const (Endo Text) Authority) -> Either Bool Authority -> Const (Endo Text) (Either Bool Authority)) -> ((Text -> Const (Endo Text) Text) -> Authority -> Const (Endo Text) Authority) -> (Text -> Const (Endo Text) Text) -> Either Bool Authority -> Const (Endo Text) (Either Bool Authority) forall b c a. (b -> c) -> (a -> b) -> a -> c . (RText 'Host -> Const (Endo Text) (RText 'Host)) -> Authority -> Const (Endo Text) Authority Lens' Authority (RText 'Host) authHost ((RText 'Host -> Const (Endo Text) (RText 'Host)) -> Authority -> Const (Endo Text) Authority) -> ((Text -> Const (Endo Text) Text) -> RText 'Host -> Const (Endo Text) (RText 'Host)) -> (Text -> Const (Endo Text) Text) -> Authority -> Const (Endo Text) Authority forall b c a. (b -> c) -> (a -> b) -> a -> c . (Text -> Const (Endo Text) Text) -> RText 'Host -> Const (Endo Text) (RText 'Host) forall (l :: RTextLabel). Getter (RText l) Text unRText path :: String path = String notif_path String -> String -> String forall a. Semigroup a => a -> a -> a <> String exp_id String -> String -> String forall a. Semigroup a => a -> a -> a <> String "/operations" :: String oauth_ref :: MVar OAuth2Session oauth_ref = NeptuneSession sess NeptuneSession -> (NeptuneSession -> MVar OAuth2Session) -> MVar OAuth2Session forall a b. a -> (a -> b) -> b & NeptuneSession -> MVar OAuth2Session _neptune_oauth2 run_listener :: IO () run_listener = do OAuth2Session oauth_current <- MVar OAuth2Session -> IO OAuth2Session forall (m :: * -> *) a. MonadIO m => MVar a -> m a readMVar MVar OAuth2Session oauth_ref let oauth_token :: Text oauth_token = OAuth2Session oauth_current OAuth2Session -> Getting Text OAuth2Session Text -> Text forall s a. s -> Getting a s a -> a ^. Getting Text OAuth2Session Text Lens' OAuth2Session Text oas_access_token String -> PortNumber -> String -> ConnectionOptions -> Headers -> ClientApp () -> IO () forall a. String -> PortNumber -> String -> ConnectionOptions -> Headers -> ClientApp a -> IO a Wuss.runSecureClientWith (Text -> String T.unpack Text host) PortNumber port String path ConnectionOptions WS.defaultConnectionOptions [(CI ByteString "Authorization", ByteString "Bearer " ByteString -> ByteString -> ByteString forall a. Semigroup a => a -> a -> a <> Text -> ByteString encodeUtf8 Text oauth_token)] (ThreadId -> ClientApp () listener ThreadId main_thread) RetryPolicyM IO -> (RetryStatus -> IO ()) -> IO () forall (m :: * -> *) a. (MonadIO m, MonadMask m) => RetryPolicyM m -> (RetryStatus -> m a) -> m a recoverAll (Int -> RetryPolicy constantDelay Int 500) ((RetryStatus -> IO ()) -> IO ()) -> (RetryStatus -> IO ()) -> IO () forall a b. (a -> b) -> a -> b $ \RetryStatus _ -> IO () run_listener where port :: PortNumber port = PortNumber 443 notif_path :: String notif_path = String "/api/notifications/v1/experiments/" exp_id :: String exp_id = Text -> String T.unpack (Text -> String) -> Text -> String forall a b. (a -> b) -> a -> b $ ExperimentId -> Text unExperimentId (ExperimentId -> Text) -> ExperimentId -> Text forall a b. (a -> b) -> a -> b $ Experiment exp Experiment -> Getting ExperimentId Experiment ExperimentId -> ExperimentId forall s a. s -> Getting a s a -> a ^. Getting ExperimentId Experiment ExperimentId Lens' Experiment ExperimentId exp_experiment_id data Message = MessageAbort { Message -> Text _msg_experiment_id :: Text } instance FromJSON Message where parseJSON :: Value -> Parser Message parseJSON (Object Object v) = do Text typ <- Object v Object -> Text -> Parser Text forall a. FromJSON a => Object -> Text -> Parser a .: Text "messageType" case Text typ of Text "Abort" -> do Object body <- Object v Object -> Text -> Parser Object forall a. FromJSON a => Object -> Text -> Parser a .: Text "messageBody" Text -> Message MessageAbort (Text -> Message) -> Parser Text -> Parser Message forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b <$> Object body Object -> Text -> Parser Text forall a. FromJSON a => Object -> Text -> Parser a .: Text "experimentId" Text _ -> String -> Parser Message forall (m :: * -> *) a. MonadFail m => String -> m a fail (String -> Parser Message) -> String -> Parser Message forall a b. (a -> b) -> a -> b $ String "Unsupported message type" String -> String -> String forall a. Semigroup a => a -> a -> a <> (Text -> String T.unpack Text typ) parseJSON Value invalid = String -> Parser Message -> Parser Message forall a. String -> Parser a -> Parser a prependFailure String "parsing Message failed, " (String -> Value -> Parser Message forall a. String -> Value -> Parser a typeMismatch String "Object" Value invalid) data AbortException = AbortException deriving Int -> AbortException -> String -> String [AbortException] -> String -> String AbortException -> String (Int -> AbortException -> String -> String) -> (AbortException -> String) -> ([AbortException] -> String -> String) -> Show AbortException forall a. (Int -> a -> String -> String) -> (a -> String) -> ([a] -> String -> String) -> Show a showList :: [AbortException] -> String -> String $cshowList :: [AbortException] -> String -> String show :: AbortException -> String $cshow :: AbortException -> String showsPrec :: Int -> AbortException -> String -> String $cshowsPrec :: Int -> AbortException -> String -> String Show instance Exception AbortException listener :: ThreadId -> WS.ClientApp () listener :: ThreadId -> ClientApp () listener ThreadId main_thread Connection conn = IO () -> IO () forall (f :: * -> *) a b. Applicative f => f a -> f b forever (IO () -> IO ()) -> IO () -> IO () forall a b. (a -> b) -> a -> b $ do ByteString msg <- Connection -> IO ByteString forall a. WebSocketsData a => Connection -> IO a WS.receiveData Connection conn case ByteString -> Either String Message forall a. FromJSON a => ByteString -> Either String a eitherDecode' ByteString msg of Right (MessageAbort Text _) -> ThreadId -> AbortException -> IO () forall e (m :: * -> *). (Exception e, MonadIO m) => ThreadId -> e -> m () throwTo ThreadId main_thread AbortException AbortException Left String msg -> Text -> IO () forall (f :: * -> *). Applicative f => Text -> f () traceM (String -> Text T.pack String msg) IO () -> IO () -> IO () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> () -> IO () forall (m :: * -> *) a. Monad m => a -> m a return ()