module Web.Dependencies.Sparrow.Client where
import Web.Dependencies.Sparrow.Types
( Client, ClientArgs (..), ClientReturn (..)
, Topic (..), WSIncoming (..), WSOutgoing (..), WithTopic (..), WithSessionID (..)
)
import Web.Dependencies.Sparrow.Session (SessionID (..))
import Web.Dependencies.Sparrow.Client.Types
( SparrowClientT (..), Env (..), ask', RegisteredTopicSubscriptions
, callReject, callOnReceive, registerSubscription, removeSubscription
, SparrowClientException (..)
)
import Data.URI (URI, printURI)
import Data.URI.Auth (URIAuth (..))
import Data.URI.Auth.Host (printURIAuthHost)
import Data.Url (packLocation)
import qualified Data.Text as T
import qualified Data.Strict.Maybe as Strict
import Data.Aeson (FromJSON, ToJSON, Value)
import qualified Data.Aeson as Aeson
import Data.Singleton.Class (Extractable (runSingleton))
import Data.UUID.V4 (nextRandom)
import qualified Data.HashMap.Strict as HM
import Data.HashSet (HashSet)
import qualified Data.HashSet as HS
import Control.Monad (forever, void)
import Control.Monad.Trans (MonadTrans (lift))
import Control.Monad.Trans.Control (MonadBaseControl)
import qualified Control.Monad.Trans.Control.Aligned as Aligned
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Catch (MonadCatch, MonadThrow (..))
import Control.Monad.Trans.Reader (ReaderT (runReaderT))
import Control.Concurrent.Async (Async, async, cancel, wait)
import Control.Concurrent.STM ( TMVar, TVar, TChan, atomically, newEmptyTMVarIO, newTChanIO
, readTChan, writeTChan, takeTMVar, putTMVar, tryTakeTMVar
, newTVarIO, readTVar, modifyTVar')
import Control.Concurrent.STM.TMapMVar (newTMapMVar)
import Control.DeepSeq (NFData (rnf))
import Control.Exception (evaluate)
import Path (Path, Abs, Rel, File, toFilePath, parseRelFile, (</>), parent, dirname, absdir)
import Path.Extended (Location, fromPath, (<&>))
import System.IO.Unsafe (unsafePerformIO)
import Network.Wai.Trans (runClientAppT)
import Network.WebSockets (runClient)
import Network.WebSockets.Simple (WebSocketsApp (..), WebSocketsAppParams (..), toClientAppT, expBackoffStrategy)
import Network.WebSockets.Simple.PingPong (pingPong)
import Network.HTTP.Client (parseRequest, newManager, defaultManagerSettings, method, requestBody, responseBody, responseStatus, httpLbs, RequestBody (RequestBodyLBS))
import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Types (status200)
import Wuss (runSecureClient)
unpackClient :: forall m stM initIn initOut deltaIn deltaOut
. MonadIO m
=> MonadThrow m
=> Aligned.MonadBaseControl IO m stM
=> Extractable stM
=> ToJSON initIn
=> FromJSON initOut
=> ToJSON deltaIn
=> FromJSON deltaOut
=> Topic
-> Client m initIn initOut deltaIn deltaOut
-> SparrowClientT m ()
unpackClient topic client = do
env@Env{envSendInit,envSendDelta} <- ask'
lift $ Aligned.liftBaseWith $ \runInBase -> do
let runM :: forall b. m b -> IO b
runM x = runSingleton <$> runInBase x
threadVar <- newEmptyTMVarIO
thread <- async $ runM $ client $ \ClientArgs{clientReceive,clientInitIn,clientOnReject} -> do
mInitOut <- envSendInit topic (Aeson.toJSON clientInitIn)
case mInitOut of
Nothing -> do
_ <- throwM InitOutFailed
pure Nothing
Just v -> case Aeson.fromJSON v of
Aeson.Error e -> do
_ <- throwM (InitOutDecodingError e)
pure Nothing
Aeson.Success (initOut :: initOut) -> do
let clientUnsubscribe = do
envSendDelta (WSUnsubscribe topic)
liftIO $ do
thread <- atomically $ do
removeSubscription env topic
takeTMVar threadVar
cancel thread
clientSendCurrent =
envSendDelta . WSIncoming . WithTopic topic . Aeson.toJSON
clientReturn = ClientReturn
{ clientSendCurrent
, clientUnsubscribe
, clientInitOut = initOut
}
liftIO $ atomically $
let go :: Value -> m ()
go v' = case Aeson.fromJSON v' of
Aeson.Error e ->
throwM (DeltaOutDecodingError e)
Aeson.Success (deltaOut :: deltaOut) ->
clientReceive clientReturn deltaOut
in registerSubscription env topic
go
( do clientOnReject
liftIO $ do
mThread <- atomically $ tryTakeTMVar threadVar
case mThread of
Nothing -> pure ()
Just thread -> cancel thread
)
pure (Just clientReturn)
atomically (putTMVar threadVar thread)
allocateDependencies :: forall m stM a
. MonadIO m
=> MonadBaseControl IO m
=> Aligned.MonadBaseControl IO m stM
=> MonadCatch m
=> Extractable stM
=> Bool
-> URIAuth
-> SparrowClientT m a
-> m ()
allocateDependencies tls auth@(URIAuth _ host port) SparrowClientT{runSparrowClientT} = Aligned.liftBaseWith $ \runInBase -> do
let path = [absdir|/dependencies/|]
runM :: forall b. m b -> IO b
runM x = runSingleton <$> runInBase x
httpURI :: Topic -> URI
httpURI (Topic topic) =
packLocation (Strict.Just (if tls then "https" else "http")) True auth $
fromPath $ path </> unsafePerformIO (parseRelFile $ T.unpack $ T.intercalate "/" topic)
sessionID <- SessionID <$> nextRandom
let q = T.pack $ toFilePath $ dirname path
file <- parseRelFile $ T.unpack $ T.take (T.length q 1) q
let runWS :: WebSocketsApp m (WSOutgoing (WithTopic Value)) (WSIncoming (WithTopic Value)) -> IO ()
runWS x = do
let loc :: Location Abs File
loc = fromPath (parent path </> file)
<&> ("sessionID", Just (show sessionID))
f | tls = runSecureClient (T.unpack $ printURIAuthHost host) (Strict.maybe 80 fromIntegral port) (show loc)
| otherwise = runClient (T.unpack $ printURIAuthHost host) (Strict.maybe 80 fromIntegral port) (show loc)
x' <- runM $ pingPong ((10^6) * 10) x
f $ runClientAppT runM $ toClientAppT x'
( toWSThread :: TMVar (Async ())
) <- newEmptyTMVarIO
( toWS :: TChan (WSIncoming (WithTopic Value))
) <- newTChanIO
( envSubscriptions :: RegisteredTopicSubscriptions m
) <- atomically newTMapMVar
( pendingTopicsAdded :: TVar (HashSet Topic)
) <- newTVarIO HS.empty
( pendingTopicsRemoved :: TVar (HashSet Topic)
) <- newTVarIO HS.empty
httpManager <-
if tls
then newTlsManager
else newManager defaultManagerSettings
let env :: Env m
env = Env
{ envSendInit = \topic initIn -> liftIO $ do
atomically $ modifyTVar' pendingTopicsAdded (HS.insert topic)
req <- parseRequest $ T.unpack $ printURI $ httpURI topic
let req' = req
{ method = "POST"
, requestBody = RequestBodyLBS $ Aeson.encode WithSessionID
{ withSessionIDSessionID = sessionID
, withSessionIDContent = initIn
}
}
response <- httpLbs req' httpManager
if responseStatus response == status200
then case Aeson.eitherDecode (responseBody response) of
Right (initOut :: Value) -> pure (Just initOut)
Left e -> do
_ <- throwM (InitOutDecodingError e)
pure Nothing
else do
_ <- throwM InitOutHTTPError
pure Nothing
, envSendDelta = \x -> do
case x of
WSUnsubscribe topic -> liftIO $ atomically $ modifyTVar' pendingTopicsRemoved (HS.insert topic)
_ -> pure ()
liftIO $ atomically $ writeTChan toWS x
, envSubscriptions
}
( attemptWS :: TChan ()
) <- newTChanIO
atomically $ writeTChan attemptWS ()
backoff <- expBackoffStrategy $ atomically $ writeTChan attemptWS ()
ws <- async $ forever $ do
atomically (readTChan attemptWS)
runWS WebSocketsApp
{ onOpen = \WebSocketsAppParams{send} -> do
liftIO $ do
thread <- async $ forever $ do
outgoing <- atomically (readTChan toWS)
runM (send outgoing)
atomically (putTMVar toWSThread thread)
, onReceive = \_ r -> do
liftIO $ evaluate (rnf r)
case r of
WSTopicsSubscribed topics -> pure ()
WSTopicAdded topic -> do
liftIO $ do
exists <- atomically $ HS.member topic <$> readTVar pendingTopicsAdded
if exists
then atomically $ modifyTVar' pendingTopicsAdded (HS.delete topic)
else throwM (UnexpectedAddedTopic topic)
WSTopicRemoved topic -> do
liftIO $ do
exists <- atomically $ HS.member topic <$> readTVar pendingTopicsRemoved
if exists
then atomically $ modifyTVar' pendingTopicsRemoved (HS.delete topic)
else throwM (UnexpectedRemovedTopic topic)
WSTopicRejected topic -> callReject env topic
WSDecodingError err -> throwM (NetworkingDecodingError err)
WSOutgoing (WithTopic topic v) -> callOnReceive env topic v
, onClose = \_ e ->
liftIO $ do
thread <- atomically (takeTMVar toWSThread)
cancel thread
backoff e
}
z <- runM (runReaderT runSparrowClientT env)
evaluate z
wait ws