{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE RecursiveDo #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE ViewPatterns #-} -- | -- Module: Capnp.Rpc.Untyped -- Description: Core of the RPC subsystem. -- -- This module does not deal with schema-level concepts; all capabilities, -- methods etc. as used here are untyped. module Capnp.Rpc.Untyped ( -- * Connections to other vats ConnConfig(..) , handleConn -- * Clients for capabilities , Client , call , nullClient , newPromiseClient , IsClient(..) -- * Promise pipelining , Pipeline , walkPipelinePtr , pipelineClient , waitPipeline -- * Exporting local objects , export , clientMethodHandler -- * Unwrapping local clients , unwrapServer -- * Waiting for resolution , waitClient -- * Errors , RpcError(..) , R.Exception(..) , R.Exception'Type(..) 
-- * Shutting down the connection ) where import Control.Concurrent.STM import Control.Monad.STM.Class import Control.Monad.Trans.Class import Data.Word import Control.Concurrent (threadDelay) import Control.Concurrent.Async (concurrently_, race_) import Control.Concurrent.MVar (MVar, newEmptyMVar) import Control.Exception.Safe ( Exception , MonadThrow , SomeException , bracket , finally , fromException , throwIO , throwM , try ) import Control.Monad (forever, void, when) import Data.Default (Default(def)) import Data.Foldable (for_, toList, traverse_) import Data.Hashable (Hashable, hash, hashWithSalt) import Data.Maybe (catMaybes, fromMaybe) import Data.String (fromString) import Data.Text (Text) import Data.Typeable (Typeable) import GHC.Generics (Generic) import Supervisors (Supervisor, superviseSTM, withSupervisor) import System.Mem.StableName (StableName, hashStableName, makeStableName) import System.Timeout (timeout) import qualified Data.Vector as V import qualified Focus import qualified ListT import qualified StmContainers.Map as M import Capnp.Classes (cerialize, decerialize, fromStruct, new, toStruct) import Capnp.Convert (valueToMsg) import Capnp.Message (Message, Mutability(..)) import Capnp.Rpc.Errors ( eDisconnected , eFailed , eMethodUnimplemented , eUnimplemented , wrapException ) import Capnp.Rpc.Promise (Fulfiller, breakOrFulfill, breakPromise, fulfill, newCallback) import Capnp.Rpc.Transport (Transport(recvMsg, sendMsg)) import Capnp.TraversalLimit (LimitT, defaultLimit, evalLimitT) import Data.Mutable (thaw) import Internal.BuildPure (createPure) import Internal.Rc (Rc) import Internal.SnocList (SnocList) import qualified Capnp.Gen.Capnp.Rpc as RawRpc import qualified Capnp.Gen.Capnp.Rpc.Pure as R import qualified Capnp.Message as Message import qualified Capnp.Rpc.Server as Server import qualified Capnp.Untyped as UntypedRaw import qualified Capnp.Untyped.Pure as Untyped import qualified Internal.Rc as Rc import qualified Internal.SnocList as 
SnocList
import qualified Internal.TCloseQ as TCloseQ
import qualified Lifetimes.Gc as Fin

-- Note [Organization]
-- ===================
--
-- As much as possible, the logic in this module is centralized according to
-- the types of objects it concerns.
--
-- As an example, consider how we handle embargos: The 'Conn' type's 'embargos'
-- table has values that are just 'Fulfiller's. This allows the code which triggers
-- sending embargoes to have full control over what happens when they return,
-- while the code that routes incoming messages (in 'coordinator') doesn't need
-- to concern itself with the details of embargos -- it just needs to route them
-- to the right place.
--
-- This approach generally results in better separation of concerns.

-- Note [Level 3]
--
-- This is currently a level 1 implementation, so use of most level 3 features
-- results in sending abort messages. However, to make adding this support
-- easier later, we mark such places with a cross-reference back to this note.
--
-- In addition to filling in those spots, the following will need to be dealt
-- with:
--
-- * The "Tribble 4-way Race Condition" as documented in rpc.capnp. This
--   doesn't affect level 1 implementations, but right now we shorten N-hop
--   paths of promises to 1-hop, (calls on Ready PromiseClients just
--   immediately call the target), which is unsafe in a level 3
--   implementation. See the protocol documentation for more info.

-- | We use this type often enough that the types get noisy without a shorthand:
type RawMPtr = Maybe (UntypedRaw.Ptr 'Const)

-- | Errors which can be thrown by the rpc system.
data RpcError
    = ReceivedAbort R.Exception
    -- ^ The remote vat sent us an abort message.
    | SentAbort R.Exception
    -- ^ We sent an abort to the remote vat.
    deriving(Show, Eq, Generic)

-- | Convert an arbitrary exception into an 'RpcError'. If the exception
-- already is an 'RpcError' it is returned unchanged; otherwise it is wrapped
-- (via 'wrapException', honoring the debug mode flag) in a 'SentAbort'.
makeAbortExn :: Bool -> SomeException -> RpcError
makeAbortExn debugMode e =
    fromMaybe
        (SentAbort (wrapException debugMode e))
        (fromException e)

instance Exception RpcError

-- | Identifier for an outstanding embargo.
newtype EmbargoId = EmbargoId { embargoWord :: Word32 } deriving(Eq, Hashable)
-- | Identifier for an entry in the questions or answers table.
newtype QAId = QAId { qaWord :: Word32 } deriving(Eq, Hashable)
-- | Identifier for an entry in the imports or exports table.
newtype IEId = IEId { ieWord :: Word32 } deriving(Eq, Hashable)

-- We define these to just show the number; the derived instances would include
-- data constructors, which is a bit weird since these show up in output that
-- is sometimes shown to users.
instance Show QAId where
    show = show . qaWord
instance Show IEId where
    show = show . ieWord

-- | A connection to a remote vat
data Conn = Conn
    { stableName :: StableName (MVar ())
    -- So we can use the connection as a map key. The MVar used to create
    -- this is just an arbitrary value; the only property we care about
    -- is that it is distinct for each 'Conn', so we use something with
    -- reference semantics to guarantee this.

    , debugMode :: !Bool
    -- whether to include extra (possibly sensitive) info in error messages.

    , liveState :: TVar LiveState
    -- Either the live connection state ('Live'), or 'Dead' once the
    -- connection has been shut down.
    }

-- | Whether a connection is still usable. The mutable state of a connection
-- is only reachable while it is 'Live'.
data LiveState
    = Live Conn'
    | Dead

-- | The state of a live connection.
data Conn' = Conn'
    { sendQ :: TBQueue (Message 'Const)
    , recvQ :: TBQueue (Message 'Const)
    -- queues of messages to send and receive; each of these has a dedicated
    -- thread doing the IO (see 'sendLoop' and 'recvLoop'):

    , supervisor :: Supervisor
    -- Supervisor managing the lifetimes of threads bound to this connection.

    , questionIdPool :: IdPool
    , exportIdPool :: IdPool
    -- Pools of identifiers for new questions and exports

    , questions :: M.Map QAId EntryQA
    , answers :: M.Map QAId EntryQA
    , exports :: M.Map IEId EntryE
    , imports :: M.Map IEId EntryI

    , embargos :: M.Map EmbargoId (Fulfiller ())
    -- Outstanding embargos. When we receive a 'Disembargo' message with its
    -- context field set to receiverLoopback, we look up the embargo id in
    -- this table, and fulfill the promise.

    , pendingCallbacks :: TQueue (IO ())
    -- See Note [callbacks]

    , bootstrap :: Maybe Client
    -- The capability which should be served as this connection's bootstrap
    -- interface (if any).
    }

-- Connections are compared (and hashed, below) by their stable name only.
instance Eq Conn where
    x == y = stableName x == stableName y

instance Hashable Conn where
    hash Conn{stableName} = hashStableName stableName
    -- Note that the salt argument is ignored.
    hashWithSalt _ = hash

-- | Configuration information for a connection.
data ConnConfig = ConnConfig
    { maxQuestions :: !Word32
    -- ^ The maximum number of simultaneous outstanding requests to the peer
    -- vat. Once this limit is reached, further questions will block until
    -- some of the existing questions have been answered.
    --
    -- Defaults to 128.

    , maxExports :: !Word32
    -- ^ The maximum number of objects which may be exported on this connection.
    --
    -- Defaults to 8192.

    , debugMode :: !Bool
    -- ^ In debug mode, errors reported by the RPC system to its peers will
    -- contain extra information. This should not be used in production, as
    -- it is possible for these messages to contain sensitive information,
    -- but it can be useful for debugging.
    --
    -- Defaults to 'False'.

    , getBootstrap :: Supervisor -> STM (Maybe Client)
    -- ^ Get the bootstrap interface we should serve for this connection.
    -- the argument is a supervisor whose lifetime is bound to the
    -- connection. If 'getBootstrap' returns 'Nothing', we will respond
    -- to bootstrap messages with an exception.
    --
    -- The default always returns 'Nothing'.
    --
    -- 'getBootstrap' MUST NOT block; the connection will not be serviced
    -- and 'withBootstrap' will not be run until this returns. If you need
    -- to supply the bootstrap interface later, use 'newPromiseClient'.

    , withBootstrap :: Maybe (Supervisor -> Client -> IO ())
    -- ^ An action to perform with access to the remote vat's bootstrap
    -- interface. The supervisor argument is bound to the lifetime of the
    -- connection. If this is 'Nothing' (the default), the bootstrap
    -- interface will not be requested.
    }

instance Default ConnConfig where
    def = ConnConfig
        { maxQuestions = 128
        , maxExports = 8192
        , debugMode = False
        , getBootstrap = \_ -> pure Nothing
        , withBootstrap = Nothing
        }

-- | Queue an IO action to be run some time after this transaction commits.
-- See Note [callbacks].
queueIO :: Conn' -> IO () -> STM ()
queueIO Conn'{pendingCallbacks} = writeTQueue pendingCallbacks

-- | Queue another transaction to be run some time after this transaction
-- commits, in a thread bound to the lifetime of the connection. If this is
-- called multiple times within the same transaction, each of the
-- transactions will be run separately, in the order they were queued.
--
-- See Note [callbacks]
queueSTM :: Conn' -> STM () -> STM ()
queueSTM conn = queueIO conn . atomically

-- | @'mapQueueSTM' conn fs val@ queues the list of transactions obtained
-- by applying each element of @fs@ to @val@.
mapQueueSTM :: Conn' -> SnocList (a -> STM ()) -> a -> STM ()
mapQueueSTM conn fs x = traverse_ (\f -> queueSTM conn (f x)) fs

-- Note [callbacks]
-- ================
--
-- There are many places where we want to register some code to run after
-- some later event has happened -- for example:
--
-- * We send a Call to the remote vat, and when a corresponding Return message
--   is received, we want to fulfill (or break) the local promise for the
--   result.
-- * We send a Disembargo (with senderLoopback set), and want to actually lift
--   the embargo when the corresponding (receiverLoopback) message arrives.
--
-- Keeping the two parts of these patterns together tends to result in better
-- separation of concerns, and is easier to maintain.
--
-- To achieve this, the four tables and other connection state have fields in
-- which callbacks can be registered -- for example, an outstanding question has
-- fields containing transactions to run when the return and/or finish messages
-- arrive.
--
-- When it is time to actually run these, we want to make sure that each of them
-- runs as their own transaction. If, for example, when registering a callback to
-- run when a return message is received, we find that the return message is
-- already available, it might be tempting to just run the transaction immediately.
-- But this means that the synchronization semantics are totally different from the
-- case where the callback really does get run later!
--
-- In addition, we sometimes want to register a finalizer inside a transaction,
-- but this can only be done in IO.
--
-- To solve these issues, the connection maintains a queue of all callback actions
-- that are ready to run, and when the event a callback is waiting for occurs, we
-- simply move the callback to the queue, using 'queueIO' or 'queueSTM'. When the
-- connection starts up, it creates a thread running 'callbacksLoop', which just
-- continually flushes the queue, running the actions in the queue.

-- | Get a new question id. Retries if we are out of available question ids.
newQuestion :: Conn' -> STM QAId
newQuestion = fmap QAId . newId . questionIdPool

-- | Return a question id to the pool of available ids.
freeQuestion :: Conn' -> QAId -> STM ()
freeQuestion conn = freeId (questionIdPool conn) . qaWord

-- | Get a new export id. Retries if we are out of available export ids.
newExport :: Conn' -> STM IEId
newExport = fmap IEId . newId . exportIdPool

-- | Return an export id to the pool of available ids.
freeExport :: Conn' -> IEId -> STM ()
freeExport conn = freeId (exportIdPool conn) . ieWord

-- | Get a new embargo id. This shares the same pool as questions, so it
-- retries if we are out of available question ids.
newEmbargo :: Conn' -> STM EmbargoId
newEmbargo = fmap EmbargoId . newId . questionIdPool

-- | Return an embargo id to the pool of available ids.
--
-- Embargo ids are allocated from the question id pool (see 'newEmbargo'), so
-- they must be returned to that same pool. Returning them to the export pool
-- would permanently shrink the question pool and put ids into the export pool
-- that may already be in use there.
freeEmbargo :: Conn' -> EmbargoId -> STM ()
freeEmbargo conn = freeId (questionIdPool conn) . embargoWord

-- | Handle a connection to another vat. Returns when the connection is closed.
handleConn :: Transport -> ConnConfig -> IO ()
handleConn
    transport
    cfg@ConnConfig
        { maxQuestions
        , maxExports
        , withBootstrap
        , debugMode
        }
    = withSupervisor $ \sup ->
        bracket
            (newConn sup)
            stopConn
            runConn
  where
    -- Allocate all of the connection's state in a single transaction, and
    -- return both the public handle ('Conn') and the live state ('Conn'').
    newConn sup = do
        stableName <- makeStableName =<< newEmptyMVar
        atomically $ do
            bootstrap <- getBootstrap cfg sup
            questionIdPool <- newIdPool maxQuestions
            exportIdPool <- newIdPool maxExports

            -- Both queues are bounded by the question limit:
            sendQ <- newTBQueue $ fromIntegral maxQuestions
            recvQ <- newTBQueue $ fromIntegral maxQuestions

            questions <- M.new
            answers <- M.new
            exports <- M.new
            imports <- M.new

            embargos <- M.new
            pendingCallbacks <- newTQueue

            let conn' = Conn'
                    { supervisor = sup
                    , questionIdPool
                    , exportIdPool
                    , recvQ
                    , sendQ
                    , questions
                    , answers
                    , exports
                    , imports
                    , embargos
                    , pendingCallbacks
                    , bootstrap
                    }
            liveState <- newTVar (Live conn')
            let conn = Conn
                    { stableName
                    , debugMode
                    , liveState
                    }
            pure (conn, conn')
    -- Run the connection's worker threads concurrently, racing them against
    -- the user's bootstrap action; whichever side finishes first ends the
    -- connection.
    runConn (conn, conn') = do
        result <- try $
            ( coordinator conn
                `concurrently_` sendLoop transport conn'
                `concurrently_` recvLoop transport conn'
                `concurrently_` callbacksLoop conn'
            ) `race_`
                useBootstrap conn conn'
        case result of
            Left (SentAbort e) -> do
                -- We need to actually send it:
                rawMsg <- createPure maxBound $ valueToMsg $ R.Message'abort e
                -- Give up on sending the abort after one second (the
                -- argument to 'timeout' is in microseconds):
                void $ timeout 1000000 $ sendMsg transport rawMsg
                throwIO $ SentAbort e
            Left e ->
                throwIO e
            Right _ ->
                pure ()
    stopConn
            ( conn@Conn{liveState}
            , conn'@Conn'{questions, exports, embargos}
            ) = do
        atomically $ do
            let walk table = flip ListT.traverse_ (M.listT table)
            -- drop the bootstrap interface:
            case bootstrap conn' of
                Just (Client (Just client')) -> dropConnExport conn client'
                _                            -> pure ()
            -- Remove everything from the exports table:
            walk exports $ \(_, EntryE{client}) ->
                dropConnExport conn client
            -- Outstanding questions should all throw disconnected:
            walk questions $ \(qid, entry) ->
                let raiseDisconnected onReturn =
                        mapQueueSTM conn' onReturn $ Return
                            { answerId = qid
                            , releaseParamCaps = False
                            , union' = Return'exception eDisconnected
                            }
                in case entry of
                    NewQA{onReturn}      -> raiseDisconnected onReturn
                    HaveFinish{onReturn} -> raiseDisconnected onReturn
                    _                    -> pure ()
            -- same thing with embargos:
            walk embargos $ \(_, fulfiller) ->
                breakPromise fulfiller eDisconnected
            -- mark the connection as dead, making the live state inaccessible:
            writeTVar liveState Dead
        -- Make sure any pending callbacks get run. This is important, since
        -- some of these do things like raise disconnected exceptions.
        --
        -- FIXME: there's a race condition that we're not dealing with:
        -- if the callbacks loop is killed between dequeuing an action and
        -- performing it that action will be lost.
        flushCallbacks conn'
    -- With no 'withBootstrap' action configured, just sleep forever so that
    -- this side of the race never finishes first.
    useBootstrap conn conn' = case withBootstrap of
        Nothing ->
            forever $ threadDelay maxBound
        Just f  ->
            atomically (requestBootstrap conn) >>= f (supervisor conn')

-- | A pool of ids; used when choosing identifiers for questions and exports.
newtype IdPool = IdPool (TVar [Word32])

-- | @'newIdPool' size@ creates a new pool of ids, with @size@ available ids
-- (@0@ through @size - 1@).
--
-- A @size@ of zero yields an empty pool. This case is handled explicitly,
-- since @size - 1@ would otherwise wrap around to 'maxBound' for 'Word32',
-- producing a pool of 2^32 ids rather than none.
newIdPool :: Word32 -> STM IdPool
newIdPool size = IdPool <$> newTVar ids
  where
    ids
        | size == 0 = []
        | otherwise = [0..size-1]

-- | Get a new id from the pool. Retries if the pool is empty.
newId :: IdPool -> STM Word32
newId (IdPool pool) = readTVar pool >>= \case
    [] -> retry
    (next:rest) -> do
        -- Force the remaining list (to WHNF) so thunks don't pile up in
        -- the TVar:
        writeTVar pool $! rest
        pure next

-- | Return an id to the pool.
freeId :: IdPool -> Word32 -> STM ()
freeId (IdPool pool) freed = modifyTVar' pool (freed:)

-- | An entry in our questions or answers table.
data EntryQA
    -- | An entry for which we have neither sent/received a finish, nor
    -- a return. Contains two sets of callbacks, to invoke on each type
    -- of message.
    = NewQA
        { onFinish :: SnocList (R.Finish -> STM ())
        , onReturn :: SnocList (Return -> STM ())
        }
    -- | An entry for which we've sent/received a return, but not a finish.
    -- Contains the return message, and a set of callbacks to invoke on the
    -- finish.
    | HaveReturn
        { returnMsg :: Return
        , onFinish  :: SnocList (R.Finish -> STM ())
        }
    -- | An entry for which we've sent/received a finish, but not a return.
    -- Contains the finish message, and a set of callbacks to invoke on the
    -- return.
    | HaveFinish
        { finishMsg :: R.Finish
        , onReturn  :: SnocList (Return -> STM ())
        }

-- | An entry in our imports table.
data EntryI = EntryI
    { localRc      :: Rc ()
    -- ^ A refcount cell with a finalizer attached to it; when the finalizer
    -- runs it will remove this entry from the table and send a release
    -- message to the remote vat.
    , remoteRc     :: !Word32
    -- ^ The reference count for this object as understood by the remote
    -- vat. This tells us what to send in the release message's count field.
    , proxies      :: ExportMap
    -- ^ See Note [proxies]
    --
    , promiseState :: Maybe
        ( TVar PromiseState
        , TmpDest -- origTarget field. TODO(cleanup): clean this up a bit.
        )
    -- ^ If this entry is a promise, this will contain the state of that
    -- promise, so that it may be used to create PromiseClients and
    -- update the promise when it resolves.
    }

-- | An entry in our exports table.
data EntryE = EntryE
    { client   :: Client'
    -- ^ The client. We cache it in the table so there's only one object
    -- floating around, which lets us attach a finalizer without worrying
    -- about it being run more than once.
    , refCount :: !Word32
    -- ^ The refcount for this entry. This lets us know when we can drop
    -- the entry from the table.
    }

-- | Types which may be converted to and from 'Client's. Typically these
-- will be simple type wrappers for capabilities.
class IsClient a where
    -- | Convert a value to a client.
    toClient :: a -> Client
    -- | Convert a client to a value.
    fromClient :: Client -> a

instance IsClient Client where
    toClient = id
    fromClient = id

-- Only the null client has a meaningful textual representation.
instance Show Client where
    show (Client Nothing) = "nullClient"
    show _                = "({- capability; not statically representable -})"

-- | A reference to a capability, which may be live either in the current vat
-- or elsewhere.
-- Holding a client affords making method calls on a capability
-- or modifying the local vat's reference count to it.
newtype Client =
    -- We wrap the real client in a Maybe, with Nothing representing a 'null'
    -- capability.
    Client (Maybe Client')
    deriving(Eq)

-- | A non-null client.
data Client'
    -- | A client pointing at a capability local to our own vat.
    = LocalClient
        { exportMap    :: ExportMap
        -- ^ Record of what export IDs this client has on different remote
        -- connections.
        , qCall        :: Rc (Server.CallInfo -> STM ())
        -- ^ Queue a call for the local capability to handle. This is wrapped
        -- in a reference counted cell, whose finalizer stops the server.
        , finalizerKey :: Fin.Cell ()
        -- ^ Finalizer key; when this is collected, qCall will be released.
        , unwrapper    :: forall a. Typeable a => Maybe a
        -- ^ Attempts to recover an underlying value of the requested type
        -- from the server; this is what 'unwrapServer' returns.
        }
    -- | A client which will resolve to some other capability at
    -- some point.
    | PromiseClient
        { pState     :: TVar PromiseState
        -- ^ The current state of the promise; the indirection allows
        -- the promise to be updated.
        , exportMap  :: ExportMap
        -- ^ Same role as the 'exportMap' field of 'LocalClient'.
        , origTarget :: TmpDest
        -- ^ The original target of this promise, before it was resolved.
        -- (if it is still in the pending state, it will match the TmpDest
        -- stored there).
        --
        -- FIXME: if this is an ImportDest, by holding on to this we actually
        -- leak the cap.
        }
    -- | A client which points to a (resolved) capability in a remote vat.
    | ImportClient (Fin.Cell ImportRef)

-- | A 'Pipeline' is a reference to a value within a message that has not yet arrived.
data Pipeline = Pipeline
    { state :: TVar PipelineState
    -- ^ The state of the underlying result; shared by every 'Pipeline'
    -- derived from this one via 'walkPipelinePtr'.
    , steps :: SnocList Word16
    -- ^ The path of pointer indices to follow from the root of the result.
    }

-- | The state of the result that a 'Pipeline' refers into.
data PipelineState
    -- | The result will come from an outstanding question on a remote
    -- connection.
    = PendingRemotePipeline
        { answerId  :: !QAId
        , clientMap :: M.Map (SnocList Word16) Client
        -- ^ Clients already created for this pipeline, keyed by path
        -- (see 'pipelineClient').
        , conn      :: Conn
        }
    -- | The result will be supplied locally; holds fulfillers to notify
    -- once it is available.
    | PendingLocalPipeline (SnocList (Fulfiller RawMPtr))
    -- | The result (or an exception) is available.
    | ReadyPipeline (Either R.Exception RawMPtr)

-- | 'walkPipelinePtr' follows a pointer starting from the object referred to by the
-- 'Pipeline'.
-- The 'Pipeline' must refer to a struct, and the pointer is referred to
-- by its index into the struct's pointer section.
walkPipelinePtr :: Pipeline -> Word16 -> Pipeline
walkPipelinePtr pipeline@Pipeline{steps = path} idx =
    pipeline { steps = SnocList.snoc path idx }

-- | Obtain a 'Client' for the capability that a 'Pipeline' points at. Calls
-- made on the returned client are pipelined: they may be issued before the
-- value the 'Pipeline' refers to has actually arrived.
pipelineClient :: MonadSTM m => Pipeline -> m Client
pipelineClient Pipeline{state, steps} = liftSTM $ readTVar state >>= \case
    PendingRemotePipeline{answerId, clientMap, conn} ->
        -- Re-use the client previously created for this path, if any;
        -- otherwise make one and remember it.
        M.lookup steps clientMap >>= \case
            Just cached -> pure cached
            Nothing -> do
                fresh <- promisedAnswerClient conn PromisedAnswer
                    { answerId
                    , transform = steps
                    }
                M.insert fresh steps clientMap
                pure fresh

    PendingLocalPipeline waiters -> do
        (promise, fulfiller) <- newPromiseClient
        -- When the result arrives, record it and settle the promise client
        -- according to what lives at the end of our path.
        onResult <- newCallback $ \result -> do
            writeTVar state (ReadyPipeline result)
            either
                (breakPromise fulfiller)
                (\ptr ->
                    (ptrPathClient path ptr >>= fulfill fulfiller)
                    `catchSTM`
                    (breakPromise fulfiller . wrapException False)
                )
                result
        writeTVar state $ PendingLocalPipeline (SnocList.snoc waiters onResult)
        pure promise

    ReadyPipeline result -> do
        -- TODO(cleanup): factor out the commonalities between this and the above case.
        (promise, fulfiller) <- newPromiseClient
        case result of
            Right ptr ->
                ptrPathClient path ptr
                `catchSTM`
                (\e -> promise <$ breakPromise fulfiller (wrapException False e))
            Left exn ->
                promise <$ breakPromise fulfiller exn
  where
    path = toList steps

-- | Wait for the pipeline's target to resolve, and return the corresponding
-- pointer.
waitPipeline :: MonadSTM m => Pipeline -> m RawMPtr
waitPipeline Pipeline{state, steps} = liftSTM $ do
    s <- readTVar state
    case s of
        -- The result was an exception; rethrow it.
        ReadyPipeline (Left e) ->
            throwM e
        -- The result is available; follow our path from its root.
        ReadyPipeline (Right v) ->
            evalLimitT defaultLimit $ followPtrs (toList steps) v
        -- Not resolved yet; block (via STM retry) until the state changes.
        _ ->
            retry

-- | Create a promise 'Client' for the capability at the given path within the
-- result of an outstanding question.
--
-- If the connection is already dead the promise is resolved with
-- 'eDisconnected' right away; otherwise a callback is subscribed to the
-- question's return, which resolves the promise.
promisedAnswerClient :: Conn -> PromisedAnswer -> STM Client
promisedAnswerClient conn answer@PromisedAnswer{answerId, transform} = do
    let tmpDest = RemoteDest AnswerDest { conn, answer }
    pState <- newTVar Pending { tmpDest }
    exportMap <- ExportMap <$> M.new
    let client = Client $ Just PromiseClient
            { pState
            , exportMap
            , origTarget = tmpDest
            }
    readTVar (liveState conn) >>= \case
        Dead ->
            resolveClientExn tmpDest (writeTVar pState) eDisconnected
        Live conn'@Conn'{questions} ->
            subscribeReturn "questions" conn' questions answerId $
                resolveClientReturn tmpDest (writeTVar pState) conn' (toList transform)
    pure client

-- | The current state of a 'PromiseClient'.
data PromiseState
    -- | The promise is fully resolved.
    = Ready
        { target :: Client
        -- ^ Capability to which the promise resolved.
        }
    -- | The promise has resolved, but is waiting on a Disembargo message
    -- before it is safe to send it messages.
    | Embargo
        { callBuffer :: TQueue Server.CallInfo
        -- ^ A queue in which to buffer calls while waiting for the
        -- disembargo.
        }
    -- | The promise has not yet resolved.
    | Pending
        { tmpDest :: TmpDest
        -- ^ A temporary destination to send calls, while we wait for the
        -- promise to resolve.
        }
    -- | The promise resolved to an exception.
    | Error R.Exception

-- | A temporary destination for calls on an unresolved promise.
data TmpDest
    -- | A destination that is local to this vat.
    = LocalDest LocalDest
    -- | A destination in another vat.
    | RemoteDest RemoteDest

newtype LocalDest
    -- | Queue the calls in a buffer.
    = LocalBuffer { callBuffer :: TQueue Server.CallInfo }

data RemoteDest
    -- | Send call messages to a remote vat, targeting the results
    -- of an outstanding question.
    = AnswerDest
        { conn   :: Conn
        -- ^ The connection to the remote vat.
        , answer :: PromisedAnswer
        -- ^ The answer to target.
        }
    -- | Send call messages to a remote vat, targeting an entry in our
    -- imports table.
    | ImportDest (Fin.Cell ImportRef)

-- | A reference to a capability in our import table/a remote vat's export
-- table.
data ImportRef = ImportRef
    { conn     :: Conn
    -- ^ The connection to the remote vat.
    , importId :: !IEId
    -- ^ The import id for this capability.
    , proxies  :: ExportMap
    -- ^ Export ids to use when this client is passed to a vat other than
    -- the one identified by 'conn'. See Note [proxies]
    }

-- Ideally we could just derive these, but stm-containers doesn't have Eq
-- instances, so neither does ExportMap. not all of the fields are actually
-- necessary to check equality though. See also
-- https://github.com/nikita-volkov/stm-hamt/pull/1

-- Two import references are equal when they name the same import id on the
-- same connection; the 'proxies' field is not compared.
instance Eq ImportRef where
    ImportRef { conn=cx, importId=ix } == ImportRef { conn=cy, importId=iy } =
        cx == cy && ix == iy

-- Clients are compared by a single identifying field each ('qCall', 'pState'
-- or the import cell); clients built with different constructors are never
-- equal.
instance Eq Client' where
    LocalClient { qCall = x } == LocalClient { qCall = y } =
        x == y
    PromiseClient { pState = x } == PromiseClient { pState = y } =
        x == y
    ImportClient x == ImportClient y =
        x == y
    _ == _ =
        False

-- | an 'ExportMap' tracks a mapping from connections to export IDs; it is
-- used to ensure that we re-use export IDs for capabilities when passing
-- them to remote vats. This used for locally hosted capabilities, but also
-- by proxied imports (see Note [proxies]).
newtype ExportMap = ExportMap (M.Map Conn IEId)

-- The below correspond to the similarly named types in
-- rpc.capnp, except:
--
-- * They use our newtype wrappers for ids
-- * They don't have unknown variants
-- * AnyPointers are left un-parsed
-- * PromisedAnswer's transform field is just a list of pointer offsets,
--   rather than a union with no other actually-useful variants.
-- * PromisedAnswer's transform field is a SnocList, for efficient appending.
-- | The target of a call message: either an entry in the import table, or a
-- pointer within the result of an outstanding question.
data MsgTarget
    = ImportTgt !IEId
    | AnswerTgt PromisedAnswer

-- | A reference to a pointer within the (possibly not yet received) result of
-- a question.
data PromisedAnswer = PromisedAnswer
    { answerId  :: !QAId
    , transform :: SnocList Word16
    -- ^ Path of pointer indices to follow, starting from the result.
    }

-- | A call message.
data Call = Call
    { questionId  :: !QAId
    , target      :: !MsgTarget
    , interfaceId :: !Word64
    , methodId    :: !Word16
    , params      :: !Payload
    }

-- | A return message.
data Return = Return
    { answerId         :: !QAId
    , releaseParamCaps :: !Bool
    , union'           :: Return'
    }

-- | The possible variants of a return message.
data Return'
    = Return'results Payload
    | Return'exception R.Exception
    | Return'canceled
    | Return'resultsSentElsewhere
    | Return'takeFromOtherQuestion QAId
    | Return'acceptFromThirdParty RawMPtr

-- | The payload of a call or return: an (un-parsed) content pointer together
-- with its capability table.
data Payload = Payload
    { content  :: RawMPtr
    , capTable :: V.Vector R.CapDescriptor
    }

-- Note [proxies]
-- ==============
--
-- It is possible to have multiple connections open at once, and pass around
-- clients freely between them. Without level 3 support, this means that when
-- we pass a capability pointing into Vat A to another Vat B, we must proxy it.
--
-- To achieve this, capabilities pointing into a remote vat hold an 'ExportMap',
-- which tracks which export IDs we should be using to proxy the client on each
-- connection.

-- | Queue a call on a client.
call :: MonadSTM m => Server.CallInfo -> Client -> m Pipeline
call info@Server.CallInfo{response} (Client mClient) = liftSTM $ case mClient of
    -- Calls on the null client always fail as unimplemented.
    Nothing -> do
        breakPromise response eMethodUnimplemented
        state <- newTVar $ ReadyPipeline (Left eMethodUnimplemented)
        pure Pipeline{state, steps = mempty}

    Just client' -> do
        (localPipeline, response') <- makeLocalPipeline response
        let info' = info { Server.response = response' }
            -- Park the call in a buffer, to be delivered later:
            buffer queue = localPipeline <$ writeTQueue queue info'
            -- Fail the call immediately:
            reject exn = localPipeline <$ breakPromise response' exn
            -- Forward the call to an entry in a remote vat's export table:
            viaImport cell = do
                ImportRef{conn, importId} <- Fin.readCell cell
                callRemote conn info (ImportTgt importId)
        case client' of
            LocalClient{qCall} ->
                -- If the cell has already been released, the server is gone.
                Rc.get qCall >>= \case
                    Nothing -> reject eDisconnected
                    Just q  -> localPipeline <$ q info'

            ImportClient cell ->
                viaImport cell

            PromiseClient{pState} -> readTVar pState >>= \case
                Ready{target} ->
                    call info target
                Embargo{callBuffer} ->
                    buffer callBuffer
                Error exn ->
                    reject exn
                Pending{tmpDest} -> case tmpDest of
                    LocalDest LocalBuffer{callBuffer} ->
                        buffer callBuffer
                    RemoteDest AnswerDest{conn, answer} ->
                        callRemote conn info (AnswerTgt answer)
                    RemoteDest (ImportDest cell) ->
                        viaImport cell

-- | Wrap a fulfiller for a call's result so that a local 'Pipeline' tracks
-- it. Returns the pipeline, along with a new fulfiller that must be used in
-- place of the original: it settles the pipeline's state, the original
-- fulfiller, and anything that subscribed to the pipeline in the meantime.
makeLocalPipeline :: Fulfiller RawMPtr -> STM (Pipeline, Fulfiller RawMPtr)
makeLocalPipeline f = do
    state <- newTVar (PendingLocalPipeline mempty)
    let settle result = readTVar state >>= \case
            PendingLocalPipeline subscribers -> do
                writeTVar state (ReadyPipeline result)
                breakOrFulfill f result
                for_ subscribers $ \subscriber ->
                    breakOrFulfill subscriber result
            _ ->
                -- TODO(cleanup): refactor so we don't need this case.
                error "impossible"
    f' <- newCallback settle
    pure (Pipeline{state, steps = mempty}, f')

-- | Send a call to a remote capability.
callRemote :: Conn -> Server.CallInfo -> MsgTarget -> STM Pipeline
callRemote
        conn
        Server.CallInfo{interfaceId, methodId, arguments, response}
        target = do
    conn'@Conn'{questions} <- getLive conn
    qid <- newQuestion conn'
    payload@Payload{capTable} <- makeOutgoingPayload conn arguments
    sendCall conn' Call
        { questionId = qid
        , target = target
        , params = payload
        , interfaceId
        , methodId
        }
    -- Remember which exports we handed out as parameters; the callee may ask
    -- us to release them by setting releaseParamCaps = True in the return
    -- message.
    let exportedId R.CapDescriptor{union'} = case union' of
            R.CapDescriptor'senderHosted eid  -> Just (IEId eid)
            R.CapDescriptor'senderPromise eid -> Just (IEId eid)
            _                                 -> Nothing
        paramCaps = [ eid | Just eid <- map exportedId (V.toList capTable) ]

    clientMap <- M.new
    pipelineState <- newTVar PendingRemotePipeline
        { answerId = qid
        , clientMap
        , conn
        }
    -- Settling the call settles both the caller's fulfiller and the pipeline.
    response' <- newCallback $ \result -> do
        breakOrFulfill response result
        writeTVar pipelineState (ReadyPipeline result)

    let entry = NewQA
            { onReturn = SnocList.singleton $ cbCallReturn paramCaps conn response'
            , onFinish = SnocList.empty
            }
    M.insert entry qid questions
    pure Pipeline { state = pipelineState, steps = mempty }

-- | Callback to run when a return comes in that corresponds to a call
-- we sent. Registered in callRemote. The first argument is a list of
-- export IDs to release if the return message has releaseParamCaps = true.
cbCallReturn :: [IEId] -> Conn -> Fulfiller RawMPtr -> Return -> STM ()
cbCallReturn
        paramCaps
        conn
        response
        Return{answerId, union', releaseParamCaps} = do
    conn'@Conn'{answers} <- getLive conn
    when releaseParamCaps $
        for_ paramCaps (releaseExport conn 1)
    case union' of
        Return'results Payload{content} ->
            fulfill response content
        Return'exception exn ->
            breakPromise response exn
        Return'canceled ->
            breakPromise response (eFailed "Canceled")
        Return'takeFromOtherQuestion qid ->
            -- TODO(cleanup): we should be a little stricter; the protocol
            -- requires that (1) each answer is only used this way once, and
            -- (2) The question was sent with sendResultsTo set to 'yourself',
            -- but we don't enforce either of these requirements.
            subscribeReturn "answer" conn' answers qid
                (cbCallReturn [] conn response)
        Return'resultsSentElsewhere ->
            -- This should never happen, since we always set
            -- sendResultsTo = caller
            abortConn conn' $ eFailed $ mconcat
                [ "Received Return.resultsSentElswhere for a call "
                , "with sendResultsTo = caller."
                ]
        Return'acceptFromThirdParty _ ->
            -- Note [Level 3]
            abortConn conn' $
                eUnimplemented "This vat does not support level 3."
    -- The finish is queued rather than sent right away, so that it goes out
    -- after any other callbacks have run; promise resolutions triggered above
    -- may need to send disembargos first.
    queueSTM conn' $ finishQuestion conn' def
        { R.questionId = qaWord answerId
        , R.releaseResultCaps = False
        }

-- | Convert our representation of a message target to the wire-level one.
marshalMsgTarget :: MsgTarget -> R.MessageTarget
marshalMsgTarget (ImportTgt iid) =
    R.MessageTarget'importedCap (ieWord iid)
marshalMsgTarget (AnswerTgt answer) =
    R.MessageTarget'promisedAnswer (marshalPromisedAnswer answer)

-- | Convert our representation of a promised answer to the wire-level one;
-- every step of the path becomes a getPointerField op.
marshalPromisedAnswer :: PromisedAnswer -> R.PromisedAnswer
marshalPromisedAnswer PromisedAnswer{answerId, transform} =
    R.PromisedAnswer
        { R.questionId = qaWord answerId
        , R.transform = V.fromList
            [ R.PromisedAnswer'Op'getPointerField idx | idx <- toList transform ]
        }

-- | Inverse of 'marshalPromisedAnswer'. Throws if the transform contains an
-- op we don't recognize (see 'unmarshalOps').
unmarshalPromisedAnswer :: MonadThrow m => R.PromisedAnswer -> m PromisedAnswer
unmarshalPromisedAnswer R.PromisedAnswer{questionId, transform} =
    fromIndexes <$> unmarshalOps (toList transform)
  where
    fromIndexes idxes = PromisedAnswer
        { answerId = QAId questionId
        , transform = SnocList.fromList idxes
        }

-- | Reduce a list of wire-level ops to a list of pointer indices. No-ops are
-- dropped; an unknown op results in an exception.
unmarshalOps :: MonadThrow m => [R.PromisedAnswer'Op] -> m [Word16]
unmarshalOps = \case
    [] ->
        pure []
    R.PromisedAnswer'Op'noop : rest ->
        unmarshalOps rest
    R.PromisedAnswer'Op'getPointerField idx : rest ->
        (idx :) <$> unmarshalOps rest
    R.PromisedAnswer'Op'unknown' tag : _ ->
        throwM $ eFailed $ "Unknown PromisedAnswer.Op: " <> fromString (show tag)

-- | The null client: the one client value which can be written down
-- statically. Every method call on it is answered with an exception.
nullClient :: Client
nullClient = Client Nothing

-- | Create a new client based on a promise. The fulfiller can be used to
-- supply the final client.
newPromiseClient :: (MonadSTM m, IsClient c) => m (c, Fulfiller c)
newPromiseClient = liftSTM $ do
    callBuffer <- newTQueue
    let tmpDest = LocalDest LocalBuffer { callBuffer }
    pState <- newTVar Pending { tmpDest }
    exportMap <- ExportMap <$> M.new
    -- Both outcomes of the promise update the same state variable.
    let setState = writeTVar pState
    fulfiller <- newCallback $ either
        (resolveClientExn tmpDest setState)
        (resolveClientClient tmpDest setState . toClient)
    let promise = PromiseClient
            { pState
            , exportMap
            , origTarget = tmpDest
            }
    pure (fromClient (Client (Just promise)), fulfiller)

-- | Try to recover a value of type @a@ from the server behind a client.
--
-- The result comes from the @unwrapper@ stored in a local client, which
-- 'export' fills in from the server's 'Server.handleCast'. 'Nothing' is
-- returned when:
--
-- * The client is not a local client (for instance it is null, a promise,
--   or an import from a remote vat).
-- * The local client's unwrapper itself yields 'Nothing' for type @a@.
unwrapServer :: (IsClient c, Typeable a) => c -> Maybe a
unwrapServer (toClient -> Client (Just LocalClient { unwrapper })) = unwrapper
unwrapServer _ = Nothing

-- | Wait for the client to be fully resolved, and then return a client
-- pointing directly to the destination.
--
-- If the argument is null, a local client, or a (permanent) remote client,
-- this returns the argument immediately. If the argument is a promise client,
-- then this waits for the promise to resolve and returns the result of
-- the resolution. If the promise resolves to *another* promise, then this waits
-- for that promise to also resolve.
--
-- If the promise is rejected, then this throws the corresponding exception.
waitClient :: (IsClient c, MonadSTM m) => c -> m c
waitClient client = liftSTM $ case toClient client of
    Client (Just PromiseClient{pState}) ->
        readTVar pState >>= \case
            -- Not settled yet; block the transaction until the state changes.
            Pending{}     -> retry
            Embargo{}     -> retry
            Error e       -> throwSTM e
            -- The target may itself be a promise, so keep waiting on it.
            Ready{target} -> fromClient <$> waitClient target
    Client (Just LocalClient{})  -> pure client
    Client (Just ImportClient{}) -> pure client
    Client Nothing               -> pure client

-- | Start a local server under the given supervisor and hand back a client
-- for it.
--
-- A finalizer is attached to the client's @finalizerKey@ that releases the
-- client's reference to the server's call queue, and the server's
-- 'Server.handleStop' is run when the server thread finishes.
export :: MonadSTM m => Supervisor -> Server.ServerOps IO -> m Client
export sup ops = liftSTM $ do
    q <- TCloseQ.new
    qCall <- Rc.new (TCloseQ.write q) (TCloseQ.close q)
    exportMap <- ExportMap <$> M.new
    finalizerKey <- Fin.newCell ()
    let releaseOnGc =
            Fin.addFinalizer finalizerKey $ atomically $ Rc.release qCall
        serve = releaseOnGc >> Server.runServer q ops
    superviseSTM sup (serve `finally` Server.handleStop ops)
    pure $ Client $ Just LocalClient
        { qCall
        , exportMap
        , finalizerKey
        , unwrapper = Server.handleCast ops
        }

-- | Build a method handler that forwards every invocation to the given
-- method (identified by interface and method id) on the given client.
clientMethodHandler :: Word64 -> Word16 -> Client -> Server.MethodHandler IO p r
clientMethodHandler interfaceId methodId client =
    Server.fromUntypedHandler (Server.untypedHandler forward)
  where
    forward arguments response =
        atomically $ void $ call Server.CallInfo{..} client

-- | See Note [callbacks]
callbacksLoop :: Conn' -> IO ()
callbacksLoop Conn'{pendingCallbacks} = forever $ do
    pending <- atomically $ do
        cbs <- flushTQueue pendingCallbacks
        -- We need to make sure to block if there weren't any jobs, since
        -- otherwise we'll busy loop, pegging the CPU.
        when (null cbs) retry
        pure cbs
    sequence_ pending

-- Run the one iteration of the callbacks loop, without blocking.
flushCallbacks :: Conn' -> IO ()
flushCallbacks Conn'{pendingCallbacks} = do
    cbs <- atomically $ flushTQueue pendingCallbacks
    sequence_ cbs

-- | 'sendLoop' shunts messages from the send queue into the transport.
sendLoop :: Transport -> Conn' -> IO ()
sendLoop transport Conn'{sendQ} = forever $ do
    msg <- atomically $ readTBQueue sendQ
    sendMsg transport msg

-- | 'recvLoop' shunts messages from the transport into the receive queue.
recvLoop :: Transport -> Conn' -> IO ()
recvLoop transport Conn'{recvQ} = forever $ do
    msg <- recvMsg transport
    atomically $ writeTBQueue recvQ msg

-- | The coordinator processes incoming messages.
coordinator :: Conn -> IO ()
-- The logic here mostly routes messages to other parts of the code that know
-- more about the objects in question; See Note [Organization] for more info.
--
-- Each iteration handles exactly one message inside a single transaction;
-- anything thrown while doing so is passed through 'makeAbortExn' and
-- re-thrown.
coordinator conn@Conn{debugMode} = forever $ atomically $ do
    conn'@Conn'{recvQ} <- getLive conn
    flip catchSTM (throwSTM . makeAbortExn debugMode) $ do
        capnpMsg <- readTBQueue recvQ
        evalLimitT defaultLimit $ do
            root <- fromStruct =<< UntypedRaw.rootPtr capnpMsg
            RawRpc.get_Message' root >>= \case
                RawRpc.Message'abort exn ->
                    lift . handleAbortMsg conn =<< decerialize exn
                RawRpc.Message'unimplemented oldMsg ->
                    lift . handleUnimplementedMsg conn =<< decerialize oldMsg
                RawRpc.Message'bootstrap bs ->
                    lift . handleBootstrapMsg conn =<< decerialize bs
                RawRpc.Message'call callMsg ->
                    handleCallMsg conn callMsg
                RawRpc.Message'return ret ->
                    acceptReturn conn ret >>= lift . handleReturnMsg conn
                RawRpc.Message'finish finish ->
                    lift . handleFinishMsg conn =<< decerialize finish
                RawRpc.Message'resolve res ->
                    lift . handleResolveMsg conn =<< decerialize res
                RawRpc.Message'release release ->
                    lift . handleReleaseMsg conn =<< decerialize release
                RawRpc.Message'disembargo disembargo ->
                    lift . handleDisembargoMsg conn =<< decerialize disembargo
                -- Anything else is echoed back as 'unimplemented'.
                _ -> do
                    msg <- decerialize root
                    lift $ sendPureMsg conn' $ R.Message'unimplemented msg

-- Each function handle*Msg handles a message of a particular type;
-- 'coordinator' dispatches to these.
-- | Handle an abort message from the remote vat by throwing
-- 'ReceivedAbort' with the exception it carried.
handleAbortMsg :: Conn -> R.Exception -> STM ()
handleAbortMsg _ exn =
    throwSTM (ReceivedAbort exn)

-- | Handle an 'unimplemented' message, whose payload is the message the
-- remote vat says it did not understand. Only an echoed 'unimplemented'
-- is tolerated; everything else aborts the connection.
handleUnimplementedMsg :: Conn -> R.Message -> STM ()
handleUnimplementedMsg conn msg = getLive conn >>= \conn' -> case msg of
    R.Message'unimplemented _ ->
        -- If the client itself doesn't handle unimplemented messages, that's
        -- weird, but ultimately their problem.
        pure ()
    R.Message'abort _ ->
        abortConn conn' $ eFailed $
            "Your vat sent an 'unimplemented' message for an abort message " <>
            "that its remote peer never sent. This is likely a bug in your " <>
            "capnproto library."
    _ ->
        abortConn conn' $
            eFailed "Received unimplemented response for required message."

-- | Handle a bootstrap request: build the return (an exception if this
-- connection has no bootstrap interface, otherwise a payload containing
-- the bootstrap capability), record it in the answers table, and send it.
-- A question id that is already present in the answers table aborts the
-- connection.
handleBootstrapMsg :: Conn -> R.Bootstrap -> STM ()
handleBootstrapMsg conn R.Bootstrap{ questionId } = getLive conn >>= \conn' -> do
    ret <- case bootstrap conn' of
        Nothing ->
            pure Return
                { answerId = QAId questionId
                , releaseParamCaps = True -- Not really meaningful for bootstrap, but...
                , union' =
                    Return'exception $
                        eFailed "No bootstrap interface for this connection."
                }
        Just client -> do
            capDesc <- emitCap conn client
            content <- createPure defaultLimit $ do
                msg <- Message.newMessage Nothing
                cerialize msg $ Just (Untyped.PtrCap client)
            pure Return
                { answerId = QAId questionId
                , releaseParamCaps = True -- Not really meaningful for bootstrap, but...
                , union' = Return'results Payload
                    { content
                    , capTable = V.singleton
                        (def :: R.CapDescriptor) { R.union' = capDesc }
                    }
                }
    M.focus
        (Focus.alterM $ insertBootstrap conn' ret)
        (QAId questionId)
        (answers conn')
    sendReturn conn' ret
  where
    -- No existing entry: store the return, along with a finish callback
    -- that releases the single result capability when asked to.
    --
    -- NOTE(review): this only matches a 'receiverHosted' descriptor, while
    -- 'emitCap' produces 'senderHosted' for local clients -- confirm which
    -- variant is intended here.
    insertBootstrap _ ret Nothing =
        pure $ Just HaveReturn
            { returnMsg = ret
            , onFinish = SnocList.fromList
                [ \R.Finish{releaseResultCaps} ->
                    case ret of
                        Return
                            { union' = Return'results Payload
                                { capTable = (V.toList -> [ R.CapDescriptor { union' = R.CapDescriptor'receiverHosted (IEId -> eid) } ])
                                }
                            } ->
                                when releaseResultCaps $
                                    releaseExport conn 1 eid
                        _ ->
                            pure ()
                ]
            }
    insertBootstrap conn' _ (Just _) =
        abortConn conn' $ eFailed "Duplicate question ID"

-- | Handle an incoming call: register a new answer, arrange for the
-- return to be sent once the call's response is fulfilled or broken, and
-- route the call to its target (an export, or the eventual result of an
-- earlier answer).
handleCallMsg :: Conn -> RawRpc.Call 'Const -> LimitT STM ()
handleCallMsg conn callMsg = do
    conn'@Conn'{exports, answers} <- lift $ getLive conn
    questionId <- RawRpc.get_Call'questionId callMsg
    target <- RawRpc.get_Call'target callMsg >>= decerialize
    interfaceId <- RawRpc.get_Call'interfaceId callMsg
    methodId <- RawRpc.get_Call'methodId callMsg
    payload <- RawRpc.get_Call'params callMsg

    Payload{content = callParams, capTable} <- acceptPayload conn payload

    lift $ do
        -- First, add an entry in our answers table:
        --
        -- NOTE(review): the finish callback below walks the cap table of
        -- the call's *parameters* (bound above), not of the results --
        -- confirm that this is intended.
        insertNewAbort
            "answer"
            conn'
            (QAId questionId)
            NewQA
                { onReturn = SnocList.empty
                , onFinish = SnocList.fromList
                    [ \R.Finish{releaseResultCaps} ->
                        when releaseResultCaps $
                            for_ capTable $ \R.CapDescriptor{union'} -> case union' of
                                R.CapDescriptor'receiverHosted (IEId -> importId) ->
                                    releaseExport conn 1 importId
                                _ ->
                                    pure ()
                    ]
                }
            answers

        -- Set up a callback for when the call is finished, to
        -- send the return message:
        fulfiller <- newCallback $ \case
            Left e ->
                returnAnswer conn' Return
                    { answerId = QAId questionId
                    , releaseParamCaps = False
                    , union' = Return'exception e
                    }
            Right content -> do
                capTable <- genSendableCapTableRaw conn content
                returnAnswer conn' Return
                    { answerId = QAId questionId
                    , releaseParamCaps = False
                    , union' = Return'results Payload
                        { content = content
                        , capTable = capTable
                        }
                    }
        -- Package up the info for the call:
        let callInfo = Server.CallInfo
                { interfaceId
                , methodId
                , arguments = callParams
                , response = fulfiller
                }
        -- Finally, figure out where to send it:
        case target of
            R.MessageTarget'importedCap exportId ->
                lookupAbort "export" conn' exports (IEId exportId) $
                    \EntryE{client} -> void $ call callInfo $ Client $ Just client
            R.MessageTarget'promisedAnswer R.PromisedAnswer { questionId = targetQid, transform } ->
                let onReturn ret@Return{union'} =
                        case union' of
                            -- The targeted answer failed or was canceled, so
                            -- this call gets the same outcome under its own id.
                            Return'exception _ ->
                                returnAnswer conn' ret { answerId = QAId questionId }
                            Return'canceled ->
                                returnAnswer conn' ret { answerId = QAId questionId }
                            Return'results Payload{content} ->
                                void $ transformClient transform content conn' >>= call callInfo
                            Return'resultsSentElsewhere ->
                                -- our implementation should never actually do this, but
                                -- this way we don't have to change this if/when we
                                -- support the feature:
                                abortConn conn' $ eFailed $
                                    "Tried to call a method on a promised answer that " <>
                                    "returned resultsSentElsewhere"
                            Return'takeFromOtherQuestion otherQid ->
                                subscribeReturn "answer" conn' answers otherQid onReturn
                            Return'acceptFromThirdParty _ ->
                                -- Note [Level 3]
                                error "BUG: our implementation unexpectedly used a level 3 feature"
                in
                subscribeReturn "answer" conn' answers (QAId targetQid) onReturn
            R.MessageTarget'unknown' ordinal ->
                abortConn conn' $ eUnimplemented $
                    "Unknown MessageTarget ordinal #" <> fromString (show ordinal)

-- | Follow the given pointer indexes from the pointer (see 'followPtrs')
-- and interpret what is found there as a client (see 'ptrClient').
ptrPathClient :: MonadThrow m => [Word16] -> RawMPtr -> m Client
ptrPathClient is ptr =
    evalLimitT defaultLimit $ followPtrs is ptr >>= ptrClient

-- | Like 'ptrPathClient', but takes the path as wire-level ops, and aborts
-- the connection if an exception is thrown while resolving it.
transformClient :: V.Vector R.PromisedAnswer'Op -> RawMPtr -> Conn' -> STM Client
transformClient transform ptr conn =
    (unmarshalOps (V.toList transform) >>= flip ptrPathClient ptr)
        `catchSTM` abortConn conn

-- | Interpret a pointer as a client: a null pointer is 'nullClient', a
-- capability pointer yields its client, and anything else throws.
ptrClient :: UntypedRaw.ReadCtx m 'Const => RawMPtr -> m Client
ptrClient Nothing = pure nullClient
ptrClient (Just (UntypedRaw.PtrCap cap)) = UntypedRaw.getClient cap
ptrClient (Just _) = throwM $ eFailed "Tried to call method on non-capability."

-- | Follow a series of pointer indices, returning the final value. A null
-- pointer anywhere along the way yields a null result. Throws an 'eFailed'
-- exception if an index has to be applied to a non-null pointer that is
-- not a struct.
followPtrs :: UntypedRaw.ReadCtx m 'Const => [Word16] -> RawMPtr -> m RawMPtr
followPtrs [] ptr =
    pure ptr
followPtrs (_:_) Nothing =
    pure Nothing
followPtrs (i:is) (Just (UntypedRaw.PtrStruct struct)) =
    UntypedRaw.getPtr (fromIntegral i) struct >>= followPtrs is
followPtrs (_:_) (Just _) =
    throwM $ eFailed "Tried to access pointer field of non-struct."

-- | Put an already-built message on the connection's send queue.
sendRawMsg :: Conn' -> Message 'Const -> STM ()
sendRawMsg conn' = writeTBQueue (sendQ conn')

-- | Build a call message and queue it for sending. If the parameters are
-- non-null, the call is built in the message that already holds them;
-- otherwise a fresh message is allocated.
sendCall :: Conn' -> Call -> STM ()
sendCall
        conn'
        Call{questionId, target, interfaceId, methodId, params=Payload{content, capTable}} =
    sendRawMsg conn' =<< createPure defaultLimit (do
        mcontent <- thaw content
        msg <- case mcontent of
            Just v  -> pure $ UntypedRaw.message v
            Nothing -> Message.newMessage Nothing
        mcapTable <- cerialize msg capTable
        payload <- new msg
        RawRpc.set_Payload'content payload mcontent
        RawRpc.set_Payload'capTable payload mcapTable
        call <- new msg
        RawRpc.set_Call'params call payload
        RawRpc.set_Call'questionId call (qaWord questionId)
        RawRpc.set_Call'interfaceId call interfaceId
        RawRpc.set_Call'methodId call methodId
        tgt <- cerialize msg $ marshalMsgTarget target
        RawRpc.set_Call'target call tgt
        rpcMsg <- new msg
        RawRpc.set_Message'call rpcMsg call
        UntypedRaw.setRoot (toStruct rpcMsg)
        pure msg
    )

-- | Build a return message and queue it for sending. Variants that carry
-- a raw pointer (results, acceptFromThirdParty) are built in the message
-- that already holds that pointer when it is non-null; the remaining
-- variants go through 'sendPureMsg'.
sendReturn :: Conn' -> Return -> STM ()
sendReturn conn' Return{answerId, releaseParamCaps, union'} = case union' of
    Return'results Payload{content, capTable} ->
        sendRawMsg conn' =<< createPure defaultLimit (do
            mcontent <- thaw content
            msg <- case mcontent of
                Just v  -> pure $ UntypedRaw.message v
                Nothing -> Message.newMessage Nothing
            mcapTable <- cerialize msg capTable
            payload <- new msg
            RawRpc.set_Payload'content payload mcontent
            RawRpc.set_Payload'capTable payload mcapTable
            ret <- new msg
            RawRpc.set_Return'results ret payload
            RawRpc.set_Return'answerId ret (qaWord answerId)
            RawRpc.set_Return'releaseParamCaps ret releaseParamCaps
            rpcMsg <- new msg
            RawRpc.set_Message'return rpcMsg ret
            UntypedRaw.setRoot (toStruct rpcMsg)
            pure msg
        )
    Return'exception exn ->
        sendPureMsg conn' $ R.Message'return R.Return
            { answerId = qaWord answerId
            , releaseParamCaps
            , union' = R.Return'exception exn
            }
    Return'canceled ->
        sendPureMsg conn' $ R.Message'return R.Return
            { answerId = qaWord answerId
            , releaseParamCaps
            , union' = R.Return'canceled
            }
    Return'resultsSentElsewhere ->
        sendPureMsg conn' $ R.Message'return R.Return
            { answerId = qaWord answerId
            , releaseParamCaps
            , union' = R.Return'resultsSentElsewhere
            }
    Return'takeFromOtherQuestion (QAId qid) ->
        sendPureMsg conn' $ R.Message'return R.Return
            { answerId = qaWord answerId
            , releaseParamCaps
            , union' = R.Return'takeFromOtherQuestion qid
            }
    Return'acceptFromThirdParty ptr ->
        sendRawMsg conn' =<< createPure defaultLimit (do
            mptr <- thaw ptr
            msg <- case mptr of
                Just v  -> pure $ UntypedRaw.message v
                Nothing -> Message.newMessage Nothing
            ret <- new msg
            RawRpc.set_Return'answerId ret (qaWord answerId)
            RawRpc.set_Return'releaseParamCaps ret releaseParamCaps
            RawRpc.set_Return'acceptFromThirdParty ret mptr
            rpcMsg <- new msg
            RawRpc.set_Message'return rpcMsg ret
            UntypedRaw.setRoot (toStruct rpcMsg)
            pure msg
        )

-- | Convert a raw, wire-level return into our internal 'Return'. A
-- results payload goes through 'acceptPayload'; an unknown variant throws
-- an 'eFailed' exception.
acceptReturn :: Conn -> RawRpc.Return 'Const -> LimitT STM Return
acceptReturn conn ret = do
    answerId <- QAId <$> RawRpc.get_Return'answerId ret
    releaseParamCaps <- RawRpc.get_Return'releaseParamCaps ret
    ret' <- RawRpc.get_Return' ret
    union' <- case ret' of
        RawRpc.Return'results payload ->
            Return'results <$> acceptPayload conn payload
        RawRpc.Return'exception exn ->
            Return'exception <$> decerialize exn
        RawRpc.Return'canceled ->
            pure Return'canceled
        RawRpc.Return'resultsSentElsewhere ->
            pure Return'resultsSentElsewhere
        RawRpc.Return'takeFromOtherQuestion id ->
            pure $ Return'takeFromOtherQuestion (QAId id)
        RawRpc.Return'acceptFromThirdParty ptr ->
            pure $ Return'acceptFromThirdParty ptr
        RawRpc.Return'unknown' ordinal ->
            lift $ throwSTM $ eFailed $ "Unknown return variant #" <> fromString (show ordinal)
    pure Return { answerId, releaseParamCaps, union' }

-- | Handle a return message by recording it against the matching entry
-- in the questions table (see 'updateQAReturn').
handleReturnMsg :: Conn -> Return -> STM ()
handleReturnMsg conn ret = getLive conn >>= \conn'@Conn'{questions} ->
    updateQAReturn conn' questions "question" ret

-- | Handle a finish message by recording it against the matching entry
-- in the answers table (see 'updateQAFinish').
handleFinishMsg :: Conn -> R.Finish -> STM ()
handleFinishMsg conn finish = getLive conn >>= \conn'@Conn'{answers} ->
    updateQAFinish conn' answers "answer" finish

-- | Handle a resolve message for one of our imports. The import must be a
-- promise; it is resolved to the capability or exception carried by the
-- message.
handleResolveMsg :: Conn -> R.Resolve -> STM ()
handleResolveMsg conn R.Resolve{promiseId, union'} =
    getLive conn >>= \conn'@Conn'{imports} -> do
        entry <- M.lookup (IEId promiseId) imports
        case entry of
            Nothing ->
                -- This can happen if we dropped the promise, but the release
                -- message is still in flight when the resolve message is sent.
                --
                -- NOTE(review): a release is only sent for 'receiverHosted'
                -- descriptors here -- confirm against the protocol that
                -- this is the variant that needs releasing.
                case union' of
                    R.Resolve'cap R.CapDescriptor{union' = R.CapDescriptor'receiverHosted importId} ->
                        -- Send a release message for the resolved cap, since
                        -- we're not going to use it:
                        sendPureMsg conn' $ R.Message'release def
                            { R.id = importId
                            , R.referenceCount = 1
                            }
                    -- Note [Level 3]: do we need to do something with
                    -- thirdPartyHosted here?
                    _ -> pure ()
            Just EntryI{ promiseState = Nothing } ->
                -- This wasn't a promise! The remote vat has done something wrong;
                -- abort the connection.
                abortConn conn' $ eFailed $ mconcat
                    [ "Received a resolve message for export id #", fromString (show promiseId)
                    , ", but that capability is not a promise!"
                    ]
            Just EntryI { promiseState = Just (tvar, tmpDest) } ->
                case union' of
                    R.Resolve'cap R.CapDescriptor{union' = cap} -> do
                        client <- acceptCap conn cap
                        resolveClientClient tmpDest (writeTVar tvar) client
                    R.Resolve'exception exn ->
                        resolveClientExn tmpDest (writeTVar tvar) exn
                    R.Resolve'unknown' tag ->
                        abortConn conn' $ eUnimplemented $ mconcat
                            [ "Resolve variant #"
                            , fromString (show tag)
                            , " not understood"
                            ]

-- | Handle a release message by dropping the given number of references
-- from the named export (see 'releaseExport').
handleReleaseMsg :: Conn -> R.Release -> STM ()
handleReleaseMsg
        conn
        R.Release
            { id=(IEId -> eid)
            , referenceCount=refCountDiff
            } =
    releaseExport conn refCountDiff eid

-- | Drop @refCountDiff@ references from the export with the given id.
-- If that brings the count to zero the export is removed altogether (see
-- 'dropConnExport'). The connection is aborted if the export does not
-- exist, or if more references are released than we have recorded.
releaseExport :: Conn -> Word32 -> IEId -> STM ()
releaseExport conn refCountDiff eid = getLive conn >>= \conn'@Conn'{exports} ->
    lookupAbort "export" conn' exports eid $
        \EntryE{client, refCount=oldRefCount} ->
            case compare oldRefCount refCountDiff of
                LT ->
                    abortConn conn' $ eFailed $
                        "Received release for export with referenceCount " <>
                        "greater than our recorded total ref count."
                EQ ->
                    dropConnExport conn client
                GT ->
                    M.insert
                        EntryE
                            { client
                            , refCount = oldRefCount - refCountDiff
                            }
                        eid
                        exports

-- | Handle a disembargo message.
--
-- * @receiverLoopback@: this is the echo of a disembargo we sent. The
--   fulfiller registered under the embargo id is queued to be fulfilled,
--   and the embargo id is freed.
-- * @senderLoopback@: echo the disembargo back to the remote vat as a
--   @receiverLoopback@, provided that the target has resolved to an import
--   on this same connection; otherwise abort the connection.
-- * Any other context is answered with 'unimplemented'.
handleDisembargoMsg :: Conn -> R.Disembargo -> STM ()
handleDisembargoMsg conn d = getLive conn >>= go d
  where
    go
        R.Disembargo { context=R.Disembargo'context'receiverLoopback (EmbargoId -> eid) }
        conn'@Conn'{embargos}
        = do
            result <- M.lookup eid embargos
            case result of
                Nothing ->
                    abortConn conn' $ eFailed $
                        "No such embargo: " <> fromString (show $ embargoWord eid)
                Just fulfiller -> do
                    queueSTM conn' (fulfill fulfiller ())
                    M.delete eid embargos
                    freeEmbargo conn' eid
    go
        R.Disembargo{ target, context=R.Disembargo'context'senderLoopback embargoId }
        conn'@Conn'{exports, answers}
        = case target of
            R.MessageTarget'importedCap exportId ->
                lookupAbort "export" conn' exports (IEId exportId) $ \EntryE{ client } ->
                    disembargoPromise client
            R.MessageTarget'promisedAnswer R.PromisedAnswer{ questionId, transform } ->
                lookupAbort "answer" conn' answers (QAId questionId) $ \case
                    HaveReturn { returnMsg=Return{union'=Return'results Payload{content} } } ->
                        transformClient transform content conn' >>= \case
                            Client (Just client') -> disembargoClient client'
                            Client Nothing -> abortDisembargo "targets a null capability"
                    _ ->
                        abortDisembargo $
                            "does not target an answer which has resolved to a value hosted by"
                            <> " the sender."
            R.MessageTarget'unknown' ordinal ->
                abortConn conn' $ eUnimplemented $
                    "Unknown MessageTarget ordinal #" <> fromString (show ordinal)
      where
        -- The targeted export must be a promise that has already resolved
        -- to a non-null client.
        disembargoPromise PromiseClient{ pState } = readTVar pState >>= \case
            Ready (Client (Just client)) ->
                disembargoClient client
            Ready (Client Nothing) ->
                abortDisembargo "targets a promise which resolved to null."
            _ ->
                abortDisembargo "targets a promise which has not resolved."
        disembargoPromise _ =
            abortDisembargo "targets something that is not a promise."

        -- Only an import living on this same connection may be echoed back.
        disembargoClient (ImportClient cell) = do
            client <- Fin.readCell cell
            case client of
                ImportRef {conn=targetConn, importId}
                    | conn == targetConn ->
                        sendPureMsg conn' $ R.Message'disembargo R.Disembargo
                            { context = R.Disembargo'context'receiverLoopback embargoId
                            , target = R.MessageTarget'importedCap (ieWord importId)
                            }
                _ ->
                    abortDisembargoClient
        disembargoClient _ = abortDisembargoClient

        abortDisembargoClient =
                abortDisembargo $
                    "targets a promise which has not resolved to a capability"
                    <> " hosted by the sender."

        abortDisembargo info =
            abortConn conn' $ eFailed $ mconcat
                [ "Disembargo #"
                , fromString (show embargoId)
                , " with context = senderLoopback "
                , info
                ]
    -- Note [Level 3]
    go d conn' =
        sendPureMsg conn' $
            R.Message'unimplemented $ R.Message'disembargo d

-- | @'lookupAbort' keyTypeName conn m key f@ looks up @key@ in @m@ and
-- applies @f@ to the value found. If the key is absent, the connection is
-- aborted instead; @keyTypeName@ is used in the error message.
lookupAbort
    :: (Eq k, Hashable k, Show k)
    => Text -> Conn' -> M.Map k v -> k -> (v -> STM a) -> STM a
lookupAbort keyTypeName conn m key f = do
    result <- M.lookup key m
    case result of
        Just val ->
            f val
        Nothing ->
            abortConn conn $ eFailed $ mconcat
                [ "No such "
                , keyTypeName
                ,  ": "
                , fromString (show key)
                ]

-- | @'insertNewAbort' keyTypeName conn key value stmMap@ inserts a key into a
-- map, aborting the connection if it is already present. @keyTypeName@ will be
-- used in the error message sent to the remote vat.
insertNewAbort :: (Eq k, Hashable k) => Text -> Conn' -> k -> v -> M.Map k v -> STM ()
insertNewAbort keyTypeName conn key value =
    M.focus
        (Focus.alterM $ \case
            Just _ ->
                abortConn conn $ eFailed $
                    "duplicate entry in " <> keyTypeName <> " table."
            Nothing ->
                pure (Just value)
        )
        key

-- | Generate a cap table describing the capabilities reachable from the given
-- pointer. The capability table will be correct for any message where all of
-- the capabilities are within the subtree under the pointer.
--
-- One descriptor is produced (via 'emitCap') for every entry in the cap
-- table of the message that holds the pointer; a null pointer yields an
-- empty table.
genSendableCapTableRaw
    :: Conn
    -> Maybe (UntypedRaw.Ptr 'Const)
    -> STM (V.Vector R.CapDescriptor)
genSendableCapTableRaw _ Nothing = pure V.empty
genSendableCapTableRaw conn (Just ptr) =
    traverse
        (\c -> do
            union' <- emitCap conn c
            pure (def :: R.CapDescriptor) { R.union' = union' }
        )
        (Message.getCapTable (UntypedRaw.message ptr))

-- | Convert the pointer into a Payload, including a capability table for
-- the clients in the pointer's cap table.
makeOutgoingPayload :: Conn -> RawMPtr -> STM Payload
makeOutgoingPayload conn content =
    toPayload <$> genSendableCapTableRaw conn content
  where
    toPayload capTable = Payload { content, capTable }

-- | Serialize a high-level message and put it on the connection's send
-- queue.
sendPureMsg :: Conn' -> R.Message -> STM ()
sendPureMsg Conn'{sendQ} msg = do
    rawMsg <- createPure maxBound (valueToMsg msg)
    writeTBQueue sendQ rawMsg

-- | Send a finish message for one of our questions, and update the
-- questions table to match, running any callbacks that this triggers.
finishQuestion :: Conn' -> R.Finish -> STM ()
finishQuestion conn@Conn'{questions} finish@R.Finish{questionId} = do
    -- The question id can only go back to the pool once the return has
    -- been received as well, so hook its release onto the return:
    subscribeReturn "question" conn questions qid (const (freeQuestion conn qid))
    sendPureMsg conn (R.Message'finish finish)
    updateQAFinish conn questions "question" finish
  where
    qid = QAId questionId

-- | Send a return message, then record it against the matching entry in
-- our answers table, running any callbacks registered for it. The
-- connection is aborted if the answer id is not in the table, or if a
-- return was already recorded for it (see 'updateQAReturn').
returnAnswer :: Conn' -> Return -> STM ()
returnAnswer conn@Conn'{answers} ret =
    sendReturn conn ret >> updateQAReturn conn answers "answer" ret

-- TODO(cleanup): updateQAReturn/Finish have a lot in common; can we refactor?
-- | Record that the return for an entry in the given question/answer table
-- has happened, and run the entry's return callbacks with it.
--
-- If the finish was already seen, the entry is removed from the table;
-- otherwise the return is stored in it. A second return for the same
-- entry (or a missing entry) aborts the connection. @tableName@ is only
-- used in error messages.
updateQAReturn :: Conn' -> M.Map QAId EntryQA -> Text -> Return -> STM ()
updateQAReturn conn table tableName ret@Return{answerId} =
    lookupAbort tableName conn table answerId applyReturn
  where
    runCallbacks callbacks = for_ callbacks ($ ret)

    applyReturn NewQA{onFinish, onReturn} = do
        M.insert HaveReturn{ returnMsg = ret, onFinish } answerId table
        runCallbacks onReturn
    applyReturn HaveFinish{onReturn} = do
        M.delete answerId table
        runCallbacks onReturn
    applyReturn HaveReturn{} =
        abortConn conn $ eFailed $ mconcat
            [ "Duplicate return message for "
            , tableName
            , " #"
            , fromString (show answerId)
            ]

-- | Record that the finish for an entry in the given question/answer table
-- has happened, and run the entry's finish callbacks with it.
--
-- If the return was already seen, the entry is removed from the table;
-- otherwise the finish is stored in it. A second finish for the same
-- entry (or a missing entry) aborts the connection. @tableName@ is only
-- used in error messages.
updateQAFinish :: Conn' -> M.Map QAId EntryQA -> Text -> R.Finish -> STM ()
updateQAFinish conn table tableName finish@R.Finish{questionId} =
    lookupAbort tableName conn table qaId applyFinish
  where
    qaId = QAId questionId
    runCallbacks callbacks = for_ callbacks ($ finish)

    applyFinish NewQA{onFinish, onReturn} = do
        runCallbacks onFinish
        M.insert HaveFinish{ finishMsg = finish, onReturn } qaId table
    applyFinish HaveReturn{onFinish} = do
        runCallbacks onFinish
        M.delete qaId table
    applyFinish HaveFinish{} =
        abortConn conn $ eFailed $ mconcat
            [ "Duplicate finish message for "
            , tableName
            , " #"
            , fromString (show questionId)
            ]

-- | Update an entry in the questions or answers table to queue the given
-- callback when the return message for that answer comes in. If the return
-- has already arrived, the callback is queued immediately.
--
-- If the entry already has other callbacks registered, this callback is
-- run *after* the others (see Note [callbacks]). Note that this is an
-- important property, as it is necessary to preserve E-order if the
-- callbacks are successive method calls on the returned object.
subscribeReturn :: Text -> Conn' -> M.Map QAId EntryQA -> QAId -> (Return -> STM ()) -> STM ()
subscribeReturn tableName conn table qaId onRet =
    lookupAbort tableName conn table qaId $ \entry -> do
        entry' <- addSubscriber entry
        M.insert entry' qaId table
  where
    addSubscriber qa = case qa of
        -- The return is already here: queue the callback right away and
        -- leave the entry as it is.
        HaveReturn{returnMsg} -> do
            queueSTM conn (onRet returnMsg)
            pure qa
        -- Otherwise append the callback after the ones already registered.
        NewQA{onFinish, onReturn} ->
            pure NewQA
                { onFinish
                , onReturn = SnocList.snoc onReturn onRet
                }
        HaveFinish{finishMsg, onReturn} ->
            pure HaveFinish
                { finishMsg
                , onReturn = SnocList.snoc onReturn onRet
                }

-- | Abort the connection by throwing 'SentAbort' with the given exception.
-- This is only safe to call from within either the thread running the
-- coordinator or the callback loop.
abortConn :: Conn' -> R.Exception -> STM a
abortConn _ = throwSTM . SentAbort

-- | Fetch the live connection state; throws 'eDisconnected' if the
-- connection is dead.
getLive :: Conn -> STM Conn'
getLive Conn{liveState} = do
    state <- readTVar liveState
    case state of
        Dead       -> throwSTM eDisconnected
        Live conn' -> pure conn'

-- | Run an action on the live connection state; a no-op if the connection
-- is dead.
whenLive :: Conn -> (Conn' -> STM ()) -> STM ()
whenLive Conn{liveState} f = do
    state <- readTVar liveState
    case state of
        Dead       -> pure ()
        Live conn' -> f conn'

-- | Request the remote vat's bootstrap interface.
requestBootstrap :: Conn -> STM Client
requestBootstrap conn@Conn{liveState} = readTVar liveState >>= \case
    Dead ->
        pure nullClient
    Live conn'@Conn'{questions} -> do
        qid <- newQuestion conn'
        -- Until the return arrives, the client targets the promised
        -- answer for this question.
        let tmpDest = RemoteDest AnswerDest
                { conn
                , answer = PromisedAnswer
                    { answerId = qid
                    , transform = SnocList.empty
                    }
                }
        pState <- newTVar Pending { tmpDest }
        sendPureMsg conn' $
            R.Message'bootstrap def { R.questionId = qaWord qid }
        -- When the return comes in: first resolve the promise, then send
        -- the finish for the question.
        M.insert
            NewQA
                { onReturn = SnocList.fromList
                    [ resolveClientReturn tmpDest (writeTVar pState) conn' []
                    , \_ -> finishQuestion conn' R.Finish
                        { questionId = qaWord qid
                        , releaseResultCaps = False
                        }
                    ]
                , onFinish = SnocList.empty
                }
            qid
            questions
        exportMap <- ExportMap <$> M.new
        pure $ Client $ Just PromiseClient
            { pState
            , exportMap
            , origTarget = tmpDest
            }

-- Note [resolveClient]
-- ====================
--
-- There are several functions resolveClient*, each of which resolves a
-- 'PromiseClient', which will previously have been in the 'Pending' state.
-- Each function accepts three parameters: the 'TmpDest' that the
-- pending promise had been targeting, a function for setting the new state,
-- and a thing to resolve the promise to. The type of the latter is specific
-- to each function.

-- | Resolve a promised client to an exception. See Note [resolveClient]
--
-- If the promise was buffering calls locally, every buffered call is
-- broken with the same exception first.
resolveClientExn :: TmpDest -> (PromiseState -> STM ()) -> R.Exception -> STM ()
resolveClientExn tmpDest resolve exn = do
    case tmpDest of
        LocalDest LocalBuffer { callBuffer } -> do
            calls <- flushTQueue callBuffer
            traverse_
                (\Server.CallInfo{response} ->
                    breakPromise response exn)
                calls
        RemoteDest AnswerDest {} ->
            pure ()
        RemoteDest (ImportDest _) ->
            pure ()
    resolve $ Error exn

-- | Resolve a promised client to a pointer. If it is a non-null non-capability
-- pointer, it resolves to an exception. See Note [resolveClient]
resolveClientPtr :: TmpDest -> (PromiseState -> STM ()) -> RawMPtr -> STM ()
resolveClientPtr tmpDest resolve ptr = case ptr of
    Nothing ->
        resolveClientClient tmpDest resolve nullClient
    Just (UntypedRaw.PtrCap c) -> do
        c' <- evalLimitT defaultLimit $ UntypedRaw.getClient c
        resolveClientClient tmpDest resolve c'
    Just _ ->
        resolveClientExn tmpDest resolve $
            eFailed "Promise resolved to non-capability pointer"

-- | Resolve a promised client to another client. See Note [resolveClient]
--
-- Depending on where the promise had been pointing and where the new
-- client points, the promise is either resolved right away, or placed in
-- the 'Embargo' state until a disembargo has been echoed back by the
-- remote vat.
resolveClientClient :: TmpDest -> (PromiseState -> STM ()) -> Client -> STM ()
resolveClientClient tmpDest resolve (Client client) =
    case (client, tmpDest) of
        -- Remote resolved to local; we need to embargo:
        ( Just LocalClient{}, RemoteDest dest ) ->
            disembargoAndResolve dest
        ( Just PromiseClient { origTarget=LocalDest _ }, RemoteDest dest) ->
            disembargoAndResolve dest
        ( Nothing, RemoteDest dest ) ->
            -- It's not clear to me what we should actually do if the promise
            -- resolves to nullClient, but this can be encoded at the protocol
            -- level, so we have to deal with it. Possible options:
            --
            -- 1. Perhaps this is simply illegal, and we should send an abort?
            -- 2. Treat it as resolving to a local promise, in which case we
            --    need to send a disembargo as above.
            -- 3. Treat it as resolving to a remote promise, in which case we
            --    can't send an embargo.
            --
            -- (3) doesn't seem possible to implement quite correctly, since
            -- if we just resolve to nullClient right away, further calls will
            -- start returning exceptions before outstanding calls return -- we
            -- really do want to send a disembargo, but we can't because the
            -- protocol insists that we don't if the promise resolves to a
            -- remote cap.
            --
            -- What we currently do is (2); I(zenhack) intend to ask for
            -- clarification on the mailing list.
            disembargoAndResolve dest

        -- Local promises never need embargos; we can just forward:
        ( _, LocalDest LocalBuffer { callBuffer } ) ->
            flushAndResolve callBuffer

        -- These cases are slightly subtle; despite resolving to a
        -- client that points at a "remote" target, if it points into a
        -- _different_ connection, we must be proxying it, so we treat
        -- it as local and do a disembargo like above. We may need to
        -- change this when we implement level 3, since third-party
        -- handoff is a possibility; see Note [Level 3].
        --
        -- If it's pointing into the same connection, we don't need to
        -- do a disembargo.
        ( Just PromiseClient { origTarget=RemoteDest newDest }, RemoteDest oldDest ) -> do
            newConn <- destConn newDest
            oldConn <- destConn oldDest
            if newConn == oldConn
                then resolveNow
                else disembargoAndResolve oldDest
        ( Just (ImportClient cell), RemoteDest oldDest ) -> do
            ImportRef { conn=newConn } <- Fin.readCell cell
            oldConn <- destConn oldDest
            if newConn == oldConn
                then resolveNow
                else disembargoAndResolve oldDest
  where
    -- The connection that a remote destination lives on.
    destConn AnswerDest { conn } = pure conn
    destConn (ImportDest cell) = do
        ImportRef { conn } <- Fin.readCell cell
        pure conn
    -- The message target that addresses a remote destination.
    destTarget AnswerDest { answer } = pure $ AnswerTgt answer
    destTarget (ImportDest cell) = do
        ImportRef { importId } <- Fin.readCell cell
        pure $ ImportTgt importId

    resolveNow = do
        resolve $ Ready (Client client)

    -- Flush the call buffer into the client's queue, and then pass the client
    -- to resolve.
    flushAndResolve callBuffer = do
        flushTQueue callBuffer >>= traverse_ (`call` Client client)
        resolve $ Ready (Client client)
    -- Break every buffered call with the given exception.
    --
    -- NOTE(review): unlike 'flushAndResolve', this never calls @resolve@,
    -- so the promise appears to be left in the 'Embargo' state -- confirm
    -- whether that is intended.
    flushAndRaise callBuffer e =
        flushTQueue callBuffer >>=
            traverse_ (\Server.CallInfo{response} -> breakPromise response e)
    -- Send a disembargo towards the old destination, and park the promise
    -- in the 'Embargo' state (with a fresh call buffer) until it is echoed
    -- back. If the connection is already dead, resolve to a disconnected
    -- exception instead.
    disembargoAndResolve dest = do
        Conn{liveState} <- destConn dest
        readTVar liveState >>= \case
            Live conn' -> do
                callBuffer <- newTQueue
                target <- destTarget dest
                disembargo conn' target $ \case
                    Right () ->
                        flushAndResolve callBuffer
                    Left e ->
                        flushAndRaise callBuffer e
                resolve $ Embargo { callBuffer }
            Dead ->
                resolveClientExn tmpDest resolve eDisconnected

-- | Send a (senderLoopback) disembargo to the given message target, and
-- register the transaction to run when the corresponding receiverLoopback
-- message is received.
--
-- The callback may be handed a 'Left' with a disconnected exception if
-- the connection is dropped before the disembargo is echoed.
disembargo :: Conn' -> MsgTarget -> (Either R.Exception () -> STM ()) -> STM ()
disembargo conn@Conn'{embargos} tgt onEcho = do
    callback <- newCallback onEcho
    eid <- newEmbargo conn
    M.insert callback eid embargos
    sendPureMsg conn $ R.Message'disembargo R.Disembargo
        { target = marshalMsgTarget tgt
        , context = R.Disembargo'context'senderLoopback (embargoWord eid)
        }

-- | Resolve a promised client to the result of a return. See Note [resolveClient]
--
-- The [Word16] is a list of pointer indexes to follow from the result.
resolveClientReturn :: TmpDest -> (PromiseState -> STM ()) -> Conn' -> [Word16] -> Return -> STM ()
resolveClientReturn tmpDest resolve conn@Conn'{answers} transform Return { union' } = case union' of
    -- TODO(cleanup) there is a lot of redundancy between this and cbCallReturn; can
    -- we refactor?
    Return'exception exn ->
        resolveClientExn tmpDest resolve exn
    Return'results Payload{ content } -> do
        -- An exception thrown while following the pointers resolves the
        -- promise to that exception.
        res <- try $ evalLimitT defaultLimit $ followPtrs transform content
        case res of
            Right v ->
                resolveClientPtr tmpDest resolve v
            Left e ->
                resolveClientExn tmpDest resolve e

    Return'canceled ->
        resolveClientExn tmpDest resolve $ eFailed "Canceled"

    Return'resultsSentElsewhere ->
        -- Should never happen; we don't set sendResultsTo to anything other than
        -- caller.
        abortConn conn $ eFailed $ mconcat
            [ "Received Return.resultsSentElsewhere for a call "
            , "with sendResultsTo = caller."
            ]

    Return'takeFromOtherQuestion qid ->
        subscribeReturn "answer" conn answers qid $
            resolveClientReturn tmpDest resolve conn transform

    Return'acceptFromThirdParty _ ->
        -- Note [Level 3]
        abortConn conn $ eUnimplemented
            "This vat does not support level 3."

-- | Get the client's export ID for this connection, or allocate a new one if needed.
-- Either way, the export's reference count is bumped (see 'addBumpExport').
-- A newly allocated ID is also recorded in the client's 'ExportMap'.
getConnExport :: Conn -> Client' -> STM IEId
getConnExport conn client = getLive conn >>= \conn'@Conn'{exports} -> do
    ExportMap m <- clientExportMap client
    val <- M.lookup conn m
    case val of
        Just eid -> do
            addBumpExport eid client exports
            pure eid

        Nothing -> do
            eid <- newExport conn'
            addBumpExport eid client exports
            M.insert eid conn m
            pure eid

-- | Remove export of the client on the connection. This entails removing
-- the connection from the client's ExportMap and, if the connection is
-- still live, removing the entry from the exports table and freeing the
-- export id. Calls 'error' if the client was not exported on the
-- connection.
dropConnExport :: Conn -> Client' -> STM ()
dropConnExport conn client' = do
    ExportMap eMap <- clientExportMap client'
    val <- M.lookup conn eMap
    case val of
        Just eid -> do
            M.delete conn eMap
            whenLive conn $ \conn'@Conn'{exports} -> do
                M.delete eid exports
                freeExport conn' eid
        Nothing ->
            error "BUG: tried to drop an export that doesn't exist."
-- | Fetch the 'ExportMap' associated with a client: the @exportMap@ field of
-- a local or promise client, or the @proxies@ field of the 'ImportRef' stored
-- in an import client's cell.
clientExportMap :: Client' -> STM ExportMap
clientExportMap LocalClient{exportMap} = pure exportMap
clientExportMap PromiseClient{exportMap} = pure exportMap
clientExportMap (ImportClient cell) = do
    ImportRef{proxies} <- Fin.readCell cell
    pure proxies

-- | Insert the client into the exports table, bumping the refcount if it is
-- already there. If a different client is already in the table at the same
-- id, call 'error'.
addBumpExport :: IEId -> Client' -> M.Map IEId EntryE -> STM ()
addBumpExport exportId client =
    M.focus (Focus.alter go) exportId
  where
    -- No entry yet: start counting at one.
    go Nothing = Just EntryE { client, refCount = 1 }
    go (Just EntryE{ client = oldClient, refCount } )
        | client /= oldClient =
            -- The message names this function so that the report points at
            -- the code that actually raised it.
            error $
                "BUG: addBumpExport called with a client that is different " ++
                "from what is already in our exports table."
        | otherwise =
            Just EntryE { client, refCount = refCount + 1 }

-- | Generate a CapDescriptor', which we can send to the connection's remote
-- vat to identify client. In the process, this may allocate export ids, update
-- reference counts, and so forth.
--
-- * A null client yields @none@.
-- * A local client is exported on the target connection as @senderHosted@.
-- * A pending promise whose temporary destination lives on the target
--   connection is described as @receiverAnswer@ / @receiverHosted@; any
--   other promise is exported as @senderPromise@.
-- * An import hosted by the target connection is described as
--   @receiverHosted@; an import from a different connection is exported as
--   @senderHosted@.
emitCap :: Conn -> Client -> STM R.CapDescriptor'
emitCap _targetConn (Client Nothing) =
    pure R.CapDescriptor'none
emitCap targetConn (Client (Just client')) = case client' of
    LocalClient{} ->
        R.CapDescriptor'senderHosted . ieWord <$> getConnExport targetConn client'
    PromiseClient{ pState } -> readTVar pState >>= \case
        -- When the guard fails we fall through to the catch-all below.
        Pending { tmpDest = RemoteDest AnswerDest { conn, answer } }
            | conn == targetConn ->
                pure $ R.CapDescriptor'receiverAnswer (marshalPromisedAnswer answer)
        Pending { tmpDest = RemoteDest (ImportDest cell) } -> do
            ImportRef { conn, importId = IEId iid } <- Fin.readCell cell
            if conn == targetConn
                then pure (R.CapDescriptor'receiverHosted iid)
                else newSenderPromise
        _ ->
            newSenderPromise
    ImportClient cell -> do
        ImportRef { conn=hostConn, importId } <- Fin.readCell cell
        if hostConn == targetConn
            then pure (R.CapDescriptor'receiverHosted (ieWord importId))
            else R.CapDescriptor'senderHosted . ieWord <$> getConnExport targetConn client'
  where
    newSenderPromise =
        R.CapDescriptor'senderPromise . ieWord <$> getConnExport targetConn client'

-- | Convert a raw payload received on the connection into a 'Payload': the
-- cap table is decoded, each descriptor is turned into a client with
-- 'acceptCap', and those clients are attached to the content's message as
-- its cap table.
acceptPayload :: Conn -> RawRpc.Payload 'Const -> LimitT STM Payload
acceptPayload conn payload = do
    capTable <- RawRpc.get_Payload'capTable payload >>= decerialize
    clients <- lift $ traverse (\R.CapDescriptor{union'} -> acceptCap conn union') capTable
    content <- RawRpc.get_Payload'content payload
        >>= traverse (UntypedRaw.tMsg (pure . Message.withCapTable clients))
    pure Payload {content, capTable}

-- | 'acceptCap' is a dual of 'emitCap'; it derives a Client from a CapDescriptor'
-- received via the connection. May update connection state as necessary.
--
-- The connection is aborted if a descriptor disagrees with the imports table
-- about whether an id is a promise, or if the descriptor variant is not
-- supported.
acceptCap :: Conn -> R.CapDescriptor' -> STM Client
acceptCap conn cap = getLive conn >>= \conn' -> go conn' cap
  where
    go _ R.CapDescriptor'none = pure (Client Nothing)
    go conn'@Conn'{imports} (R.CapDescriptor'senderHosted (IEId -> importId)) = do
        entry <- M.lookup importId imports
        case entry of
            Just EntryI{ promiseState=Just _ } ->
                let imp = fromString (show importId)
                in abortConn conn' $ eFailed $
                    "received senderHosted capability #" <> imp <>
                    ", but the imports table says #" <> imp <> " is senderPromise."
            Just ent@EntryI{ localRc, remoteRc, proxies } -> do
                -- Known import: take another local reference, count one more
                -- remote reference, and hand out a fresh cell whose finalizer
                -- gives the local reference back.
                Rc.incr localRc
                M.insert ent { localRc, remoteRc = remoteRc + 1 } importId imports
                cell <- Fin.newCell ImportRef
                    { conn
                    , importId
                    , proxies
                    }
                queueIO conn' $ Fin.addFinalizer cell $ atomically $ Rc.decr localRc
                pure $ Client $ Just $ ImportClient cell
            Nothing ->
                Client . Just . ImportClient <$> newImport importId conn Nothing
    go conn'@Conn'{imports} (R.CapDescriptor'senderPromise (IEId -> importId)) = do
        entry <- M.lookup importId imports
        case entry of
            Just EntryI { promiseState=Nothing } ->
                let imp = fromString (show importId)
                in abortConn conn' $ eFailed $
                    "received senderPromise capability #" <> imp <>
                    ", but the imports table says #" <> imp <> " is senderHosted."
            Just ent@EntryI { remoteRc, proxies, promiseState=Just (pState, origTarget) } -> do
                -- Known promise: count one more remote reference and reuse
                -- the promise state stored in the table.
                M.insert ent { remoteRc = remoteRc + 1 } importId imports
                pure $ Client $ Just PromiseClient
                    { pState
                    , exportMap = proxies
                    , origTarget
                    }
            Nothing -> do
                -- The import entry needs the promise state and destination,
                -- while the destination needs the import cell, so the
                -- bindings are tied together with @rec@.
                rec imp <- newImport importId conn (Just (pState, tmpDest))
                    ImportRef{proxies} <- Fin.readCell imp
                    let tmpDest = RemoteDest (ImportDest imp)
                    pState <- newTVar Pending { tmpDest }
                pure $ Client $ Just PromiseClient
                    { pState
                    , exportMap = proxies
                    , origTarget = tmpDest
                    }
    go conn'@Conn'{exports} (R.CapDescriptor'receiverHosted exportId) =
        lookupAbort "export" conn' exports (IEId exportId) $
            \EntryE{client} ->
                pure $ Client $ Just client
    go conn' (R.CapDescriptor'receiverAnswer rawAnswer) = do
        answer <- unmarshalPromisedAnswer rawAnswer `catchSTM` abortConn conn'
        newLocalAnswerClient conn' answer
    go conn' (R.CapDescriptor'thirdPartyHosted _) =
        -- Note [Level 3]
        abortConn conn' $ eUnimplemented
            "thirdPartyHosted unimplemented; level 3 is not supported."
    go conn' (R.CapDescriptor'unknown' tag) =
        abortConn conn' $ eUnimplemented $
            "Unimplemented CapDescriptor variant #" <> fromString (show tag)

-- | Create a new entry in the imports table, with the given import id and
-- 'promiseState', and return a cell holding the corresponding ImportRef. A
-- finalizer is attached to the cell which decrements the entry's local
-- refcount; the refcount itself is created with 'releaseImport' as its
-- release action.
newImport :: IEId -> Conn -> Maybe (TVar PromiseState, TmpDest) -> STM (Fin.Cell ImportRef)
newImport importId conn promiseState = getLive conn >>= \conn'@Conn'{imports} -> do
    localRc <- Rc.new () $ releaseImport importId conn'
    proxies <- ExportMap <$> M.new
    let importRef = ImportRef
            { conn
            , importId
            , proxies
            }
    -- A new entry starts with a remote refcount of one.
    M.insert EntryI
        { localRc
        , remoteRc = 1
        , proxies
        , promiseState
        }
        importId
        imports
    cell <- Fin.newCell importRef
    queueIO conn' $ Fin.addFinalizer cell $ atomically $ Rc.decr localRc
    pure cell

-- | Release the identified import. Removes it from the table and sends a release
-- message with the correct count.
releaseImport :: IEId -> Conn' -> STM ()
releaseImport importId conn'@Conn'{imports} = do
    -- The release message carries the remote refcount recorded in the table.
    lookupAbort "imports" conn' imports importId $ \EntryI { remoteRc } ->
        sendPureMsg conn' $ R.Message'release
            R.Release
                { id = ieWord importId
                , referenceCount = remoteRc
                }
    M.delete importId imports

-- | Create a new client targeting an object in our answers table.
-- Important: in this case the 'PromisedAnswer' refers to a question we
-- have received, not sent.
--
-- The client starts out as a pending promise backed by a local call buffer,
-- and is resolved through 'resolveClientReturn' once the answer's return is
-- delivered to the subscription.
newLocalAnswerClient :: Conn' -> PromisedAnswer -> STM Client
newLocalAnswerClient conn@Conn'{answers} PromisedAnswer{ answerId, transform } = do
    callBuffer <- newTQueue
    let tmpDest = LocalDest $ LocalBuffer { callBuffer }
    pState <- newTVar Pending { tmpDest }
    subscribeReturn "answer" conn answers answerId $
        resolveClientReturn
            tmpDest
            (writeTVar pState)
            conn
            (toList transform)
    exportMap <- ExportMap <$> M.new
    pure $ Client $ Just PromiseClient
        { pState
        , exportMap
        , origTarget = tmpDest
        }

-- Note [Limiting resource usage]
-- ==============================
--
-- N.B. much of this Note is future tense; the library is not yet robust against
-- resource usage attacks.
--
-- We employ various strategies to prevent remote vats from causing excessive
-- resource usage. In particular:
--
-- * We set a maximum size for incoming messages; this is in keeping with how
--   we mitigate these concerns when dealing with plain capnp data (i.e. not
--   rpc).
-- * We set a limit on the total *size* of all messages from the remote vat that
--   are currently being serviced. For example, if a Call message comes in,
--   we note its size, and deduct it from the quota. Once we have sent a return
--   and received a finish for this call, and thus can safely forget about it,
--   we remove it from our answers table, and add its size back to the available
--   quota.
--
-- Still TBD:
--
-- * We should come up with some way of guarding against too many intra-vat calls;
--   depending on the object graph, it may be possible for an attacker to get us
--   to "eat our own tail" so to speak.
--
--   Ideas:
--     * Per-object bounded queues for messages
--     * Global limit on intra-vat calls.
--
--   Right now I (zenhack) am more fond of the former.
--
-- * What should we actually do when limits are exceeded?
--
--   Possible strategies:
--     * Block
--     * Throw an 'overloaded' exception
--     * Some combination of the two; block with a timeout, then throw.
--
--   If we just block, we need to make sure this doesn't hang the vat;
--   we probably need a timeout at some level.