Copyright | (c) Dong Han 2019 |
---|---|
License | BSD |
Maintainer | winterland1989@gmail.com |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell2010 |
This module provides MessagePack-RPC implementation.
Synopsis
- type ServerLoop = (UVStream -> IO ()) -> IO ()
- type ServerService a = Text -> Maybe (ServerHandler a)
- data ServerHandler a where
- CallHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> req -> IO res) -> ServerHandler a
- NotifyHandler :: MessagePack req => (SessionCtx a -> req -> IO ()) -> ServerHandler a
- StreamHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> req -> IO (Source res)) -> ServerHandler a
- data SessionCtx a
- readSessionCtx :: SessionCtx a -> IO (Maybe a)
- writeSessionCtx :: SessionCtx a -> a -> IO ()
- clearSessionCtx :: SessionCtx a -> IO ()
- modifySessionCtx :: SessionCtx a -> (a -> Maybe a) -> IO ()
- serveRPC :: ServerLoop -> ServerService a -> IO ()
- serveRPC' :: ServerLoop -> Int -> Int -> ServerService a -> IO ()
- simpleRouter :: [(Text, ServerHandler a)] -> ServerService a
- data Client = Client {}
- rpcClient :: (Input dev, Output dev) => dev -> IO Client
- rpcClient' :: (Input i, Output o) => i -> o -> Int -> Int -> IO Client
- call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res
- notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO ()
- type PipelineId = Int
- type PipelineResult = FlatIntMap Value
- callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId
- notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO ()
- execPipeline :: HasCallStack => Client -> IO PipelineResult
- fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res
- callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res)
- data Request a
- data RPCException
Example
import Data.Maybe import Z.IO.RPC.MessagePack import Z.IO.Network import Data.IORef import Z.IO import qualified Z.Data.Text as T import qualified Z.Data.Vector as V newtype ServerCtx = ServerCtx { counter :: Int } main = serveRPC (startTCPServer defaultTCPServerConfig) $ simpleRouter [ ("hi", CallHandler $ \ctx (req :: T.Text) -> do writeSessionCtx ctx (ServerCtx 0) return ("hello, " <> req) ) , ("foo", CallHandler $ \ctx (req :: Int) -> do modifySessionCtx ctx (Just . ServerCtx . (+ 1) . counter) return (req + 1) ) , ("bar", CallHandler $ \ctx (req :: T.Text) -> do counter . fromJust <$> readSessionCtx ctx ) , ("qux", StreamHandler $ \ctx (_ :: ()) -> do withMVar stdinBuf (pure . sourceFromBuffered) ) ]
import Data.Maybe import Z.IO.RPC.MessagePack import Z.IO.Network import Data.IORef import Z.IO import qualified Z.Data.Text as T import qualified Z.Data.Vector as V main = withResource (initTCPClient defaultTCPClientConfig) $ \ uvs -> do c <- rpcClient uvs -- single call r <- call @T.Text @T.Text c "hi" "Alice" print r _ <- call @Int @Int c "foo" 1 _ <- call @Int @Int c "foo" 1 x <- call @T.Text @Int c "bar" "" print x -- streaming result (_, src) <- callStream c "qux" () runBIO $ src >|> sinkToIO (\ b -> withMVar stdoutBuf (\ bo -> do writeBuffer bo b flushBuffer bo))
Server
type ServerService a = Text -> Maybe (ServerHandler a) Source #
data ServerHandler a where Source #
CallHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> req -> IO res) -> ServerHandler a | |
NotifyHandler :: MessagePack req => (SessionCtx a -> req -> IO ()) -> ServerHandler a | |
StreamHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> req -> IO (Source res)) -> ServerHandler a |
data SessionCtx a Source #
readSessionCtx :: SessionCtx a -> IO (Maybe a) Source #
writeSessionCtx :: SessionCtx a -> a -> IO () Source #
clearSessionCtx :: SessionCtx a -> IO () Source #
modifySessionCtx :: SessionCtx a -> (a -> Maybe a) -> IO () Source #
Try to modify SessionCtx
if it has.
Note that you can set the modifier function to return Nothing to clear SessionCtx.
serveRPC :: ServerLoop -> ServerService a -> IO () Source #
Serve a RPC service.
:: ServerLoop | |
-> Int | recv buffer size |
-> Int | send buffer size |
-> ServerService a | |
-> IO () |
Serve a RPC service with more control.
simpleRouter :: [(Text, ServerHandler a)] -> ServerService a Source #
Simple router using FlatMap
, lookup name in O(log(N)).
import Z.IO.PRC.MessagePack import Z.IO.Network import Z.IO serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $ [ ("foo", CallHandler $ \ ctx req -> do ... ) , ("bar", CallHandler $ \ ctx req -> do ... ) ]
Client
rpcClient :: (Input dev, Output dev) => dev -> IO Client Source #
Open a RPC client from input/output device.
Open a RPC client with more control.
call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res Source #
Send a single RPC call and get result.
notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO () Source #
Send a single notification RPC call without getting result.
Pipeline
type PipelineId = Int Source #
type PipelineResult = FlatIntMap Value Source #
callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId Source #
Make a call inside a pipeline, which will be sent in batch when execPipeline
.
... fooId <- callPipeline client "foo" $ ... barId <- callPipeline client "bar" $ ... notifyPipeline client "qux" $ ... r <- execPipeline client fooResult <- fetchPipeline fooId r barResult <- fetchPipeline barId r
notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO () Source #
Make a notify inside a pipeline, which will be sent in batch when execPipeline
.
Notify calls doesn't affect execution's result.
execPipeline :: HasCallStack => Client -> IO PipelineResult Source #
Sent request in batch and get result in a map identified by PipelineId
.
fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res Source #
Use the PipelineId
returned when callPipeline
to fetch call's result.
callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res) Source #
Call a stream method, no other call
or notify
should be sent until
returned stream is consumed completely.
This is implemented by extend MessagePack-RPC protocol by adding following new message types:
-- start stream request [typ 0x04, name, param] -- stop stream request [typ 0x05] -- each stream response [typ 0x06, err, value] -- stream response end [typ 0x07]
The return tuple is a pair of a stop action and a Source
, to terminate stream early, call the
stop action. Please continue consuming until EOF reached,
otherwise the state of the Client
will be incorrect.
Misc
data RPCException Source #
Exception thrown when remote endpoint return errors.
Instances
Show RPCException Source # | |
Defined in Z.IO.RPC.MessagePack showsPrec :: Int -> RPCException -> ShowS # show :: RPCException -> String # showList :: [RPCException] -> ShowS # | |
Exception RPCException Source # | |
Defined in Z.IO.RPC.MessagePack |