module Remotion.Client (
Client,
run,
request,
Settings(..),
P.UserProtocolSignature,
URL(..),
P.Credentials(..),
Failure(..),
)
where
import Remotion.Util.Prelude hiding (traceIO, traceIOWithTime, State, listen, interact)
import qualified Remotion.Util.Prelude as Prelude
import qualified Remotion.Session as S
import qualified Remotion.Protocol as P
import qualified Control.Concurrent.Async.Lifted as A
import qualified Control.Concurrent.Lock as Lock
import qualified Network
import qualified Remotion.Util.FileSystem as FS
debugging = False
prefix = ("Client: " <>)
traceIO = if debugging
then Prelude.traceIO . prefix
else const $ return ()
traceIOWithTime = if debugging
then Prelude.traceIOWithTime . prefix
else const $ return ()
newtype Client i o m r =
Client { unClient :: ReaderT Env (EitherT Failure (S.Session m)) r }
deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure)
type Env = (KeepaliveState, KeepaliveTimeout, Lock)
type KeepaliveState = MVar (Maybe UTCTime)
type KeepaliveTimeout = Int
type Lock = Lock.Lock
type Settings = (P.UserProtocolSignature, URL)
data URL =
Socket FilePath |
Host Text Int P.Credentials
instance MonadTrans (Client i o) where
lift = Client . lift . lift . lift
instance (MonadBase IO m) => MonadBase IO (Client i o m) where
liftBase = Client . liftBase
instance MonadTransControl (Client i o) where
newtype StT (Client i o) a = StT (StT S.Session (Either Failure a))
liftWith runInM = do
env <- Client $ ask
Client $ lift $ lift $ liftWith $ \runSession -> runInM $
liftM StT . runSession . runEitherT . flip runReaderT env . unClient
restoreT m = do
r <- Client $ lift $ lift $ do
StT r <- lift m
restoreT $ return $ r
either throwError return r
instance (MonadBaseControl IO m) => MonadBaseControl IO (Client i o m) where
newtype StM (Client i o m) a = StMT { unStMT :: ComposeSt (Client i o) m a }
liftBaseWith = defaultLiftBaseWith StMT
restoreM = defaultRestoreM unStMT
liftSession :: (Monad m) => S.Session m a -> Client i o m a
liftSession s = Client $ lift $ do
r <- lift $ catchError (liftM Right $ s) (return . Left . adaptSessionFailure)
hoistEither r
run ::
forall i o m r.
(Serializable IO i, Serializable IO o, MonadIO m, Applicative m,
MonadBaseControl IO m) =>
Settings -> Client i o m r -> m (Either Failure r)
run (userProtocolVersion, url) t =
runEitherT $ bracketME openSocket closeSocket $ \socket -> do
timeout <- runHandshake socket
lock <- liftIO $ Lock.new
runInteraction socket timeout lock
where
openSocket = do
traceIOWithTime "Opening socket"
openURLSocketIO url |> try |> liftIO >>= \case
Right r -> return r
Left e -> case ioeGetErrorType e of
NoSuchThing -> left $ UnreachableURL
_ -> $bug $ "Unexpected IOException: " <> (packText . show) e
closeSocket socket = do
traceIOWithTime $ "Closing socket " <> show socket
liftIO $ handle (const $ return () :: SomeException -> IO ()) $ hClose socket
runHandshake socket =
traceIOWithTime "Handshaking" >>
S.run session settings >>=
hoistEither . fmapL adaptSessionFailure >>=
hoistEither . fmapL adaptHandshakeFailure
where
session = runEitherT $ do
do
receiveFailure
do
send P.version
receiveFailure
do
send userProtocolVersion
receiveFailure
do
send credentials
receiveFailure
do
send (0::Int)
receive
where
send = lift . S.send
receive = lift S.receive
receiveFailure = receive >>= maybe (return ()) left
credentials = case url of
Socket _ -> Nothing
Host _ _ x -> x
settings = (socket, 10^6*3)
runInteraction socket timeout lock = do
traceIOWithTime $ "Interacting"
keepaliveState <- liftIO $ newMVar Nothing
join $ fmap hoistEither $ lift $ runStack socket keepaliveState timeout lock $ do
A.withAsync (finallyME (resetKeepalive *> t <* closeSession) stopKeepalive) $ \ta ->
A.withAsync (keepaliveLoop) $ \ka -> do
A.waitBoth ta ka >>= \(tr, kr) -> return tr
runStack ::
(MonadIO m) =>
S.Socket -> KeepaliveState -> KeepaliveTimeout -> Lock -> Client i o m r -> m (Either Failure r)
runStack socket keepaliveState keepaliveTimeout lock t =
if keepaliveTimeout < 10^3*100
then error $ "Too small keepalive timeout setting: " <> show keepaliveTimeout
else
unClient t |>
flip runReaderT (keepaliveState, keepaliveTimeout, lock) |>
runEitherT |>
flip S.run (socket, 10^6*30) |>
liftM (join . fmapL adaptSessionFailure)
openURLSocketIO :: URL -> IO Handle
openURLSocketIO = \case
Socket file ->
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
Network.connectTo "" (Network.UnixSocket $ FS.encodeString file)
#else
error "Socket used on Windows"
#endif
Host name port _ ->
Network.connectTo (unpackText name) (Network.PortNumber $ fromIntegral port)
stopKeepalive :: (MonadIO m) => Client i o m ()
stopKeepalive = do
traceIOWithTime "Stopping keepalive"
(state, _, _) <- Client $ ask
liftIO $ modifyMVar_ state $ const $ return Nothing
keepaliveLoop ::
(Applicative m, MonadIO m, Serializable IO o, Serializable IO i) =>
Client i o m ()
keepaliveLoop = do
(state, _, _) <- Client $ ask
(liftIO $ readMVar state) >>= \case
Nothing -> return ()
Just nextTime -> do
currentTime <- liftIO $ getCurrentTime
when (currentTime >= nextTime) $ checkIn
liftIO $ threadDelay $ 10^3 * 10
keepaliveLoop
reduceTimeout :: Int -> Int
reduceTimeout = floor . (*10^6) . curve 1.2 1.3 . (/(10^6)) . fromIntegral
where
curve bending startingStraightness x = x / exp (bending / (x + startingStraightness))
resetKeepalive :: (MonadIO m) => Client i o m ()
resetKeepalive = do
(state, timeout, _) <- Client $ ask
liftIO $ do
time <- getCurrentTime
let nextTime = (microsToDiff $ toInteger $ reduceTimeout timeout) `addUTCTime` time
modifyMVar_ state $ const $ return $ Just $ nextTime
interact ::
(Serializable IO o, Serializable IO i, MonadIO m, Applicative m) =>
P.Request i -> Client i o m (Maybe o)
interact = \request -> do
withLock $ send request >> receive >>= either (\f -> throwError $! adaptInteractionFailure f) return
where
withLock action = do
(_, _, l) <- Client ask
lock l
finallyME action (unlock l)
where
lock = Client . liftIO . Lock.acquire
unlock = Client . liftIO . Lock.release
send r =
traceIOWithTime "Sending" *>
(liftSession $ S.send r)
receive =
traceIOWithTime "Receiving" *>
liftSession S.receive
checkIn ::
(Serializable IO i, Serializable IO o, MonadIO m, Applicative m) =>
Client i o m ()
checkIn = do
traceIOWithTime "Performing keepalive request"
resetKeepalive
interact P.Keepalive >>= maybe (return ()) ($bug "Unexpected response")
closeSession ::
(Serializable IO i, Serializable IO o, MonadIO m, Applicative m) =>
Client i o m ()
closeSession =
traceIOWithTime "Closing session" >>
interact P.CloseSession >>=
maybe (return ()) ($bug "Unexpected response")
request ::
(Serializable IO i, Serializable IO o, MonadIO m, Applicative m) =>
i -> Client i o m o
request a = do
resetKeepalive
interact (P.UserRequest a) >>= maybe ($bug "Unexpected response") return
data Failure =
UnreachableURL |
ServerIsBusy |
Unauthenticated |
ConnectionInterrupted |
TimeoutReached Int |
ProtocolVersionMismatch Int Int |
UserProtocolSignatureMismatch ByteString ByteString |
CorruptRequest Text
deriving (Show, Read, Ord, Eq, Generic, Data, Typeable)
adaptHandshakeFailure :: P.HandshakeFailure -> Failure
adaptHandshakeFailure = \case
P.ServerIsBusy -> ServerIsBusy
P.ProtocolVersionMismatch c s -> ProtocolVersionMismatch c s
P.UserProtocolSignatureMismatch c s -> UserProtocolSignatureMismatch c s
P.Unauthenticated -> Unauthenticated
adaptInteractionFailure :: P.InteractionFailure -> Failure
adaptInteractionFailure = \case
P.CorruptRequest t -> CorruptRequest t
P.TimeoutReached t -> $bug $ "A connection keepalive timeout reached: " <> (packText . show) t
adaptSessionFailure :: S.Failure -> Failure
adaptSessionFailure = \case
S.ConnectionInterrupted -> ConnectionInterrupted
S.SendTimeoutReached t -> TimeoutReached t
S.ReceiveTimeoutReached t -> TimeoutReached t
S.CorruptData t -> $bug $ "Corrupt server response: " <> t