| Copyright | (c) Dong Han 2019 |
|---|---|
| License | BSD |
| Maintainer | winterland1989@gmail.com |
| Stability | experimental |
| Portability | non-portable |
| Safe Haskell | None |
| Language | Haskell2010 |
Z.IO.RPC.MessagePack
Description
This module provides a MessagePack-RPC implementation.
-- server
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO
import qualified Z.Data.Text as T
serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $
[ ("foo", CallHandler $ \ (req :: Int) -> do
return (req + 1))
, ("bar", NotifyHandler $ \ (req :: T.Text) -> do
printStd (req <> "world"))
, ("qux", StreamHandler $ \ (_ :: ()) -> do
withMVar stdinBuf (pure . sourceFromBuffered))
]
-- client
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V
withResource (initTCPClient defaultTCPClientConfig) $ \ uvs -> do
c <- rpcClient uvs
-- single call
call @Int @Int c "foo" 1
-- notify without result
notify @T.Text c "bar" "hello"
-- streaming result
(_, src) <- callStream c "qux" ()
runBIO $ src >|> sinkToIO (\ b -> withMVar stdoutBuf (\ bo -> do
writeBuffer bo b
flushBuffer bo))
Synopsis
- data Client = Client {}
- rpcClient :: (Input dev, Output dev) => dev -> IO Client
- rpcClient' :: (Input i, Output o) => i -> o -> Int -> Int -> IO Client
- call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res
- notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO ()
- type PipelineId = Int
- type PipelineResult = FlatIntMap Value
- callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId
- notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO ()
- data RPCException
- execPipeline :: HasCallStack => Client -> IO PipelineResult
- fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res
- callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res)
- type ServerLoop = (UVStream -> IO ()) -> IO ()
- type ServerService = Text -> Maybe ServerHandler
- data ServerHandler where
- CallHandler :: (MessagePack req, MessagePack res) => (req -> IO res) -> ServerHandler
- NotifyHandler :: MessagePack req => (req -> IO ()) -> ServerHandler
- StreamHandler :: (MessagePack req, MessagePack res) => (req -> IO (Source res)) -> ServerHandler
- simpleRouter :: [(Text, ServerHandler)] -> ServerService
- serveRPC :: ServerLoop -> ServerService -> IO ()
- data Request a
- serveRPC' :: ServerLoop -> Int -> Int -> ServerService -> IO ()
Documentation
Constructors
| Client | |
rpcClient :: (Input dev, Output dev) => dev -> IO Client Source #
Open an RPC client from an input/output device.
Open an RPC client with more control.
call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res Source #
Send a single RPC call and get result.
notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO () Source #
Send a single notification RPC call without getting a result.
type PipelineId = Int Source #
type PipelineResult = FlatIntMap Value Source #
callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId Source #
Make a call inside a pipeline, which will be sent in a batch when execPipeline is called.
...
fooId <- callPipeline client "foo" $ ...
barId <- callPipeline client "bar" $ ...
notifyPipeline client "qux" $ ...
r <- execPipeline client
fooResult <- fetchPipeline fooId r
barResult <- fetchPipeline barId r
notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO () Source #
Make a notify call inside a pipeline, which will be sent in a batch when execPipeline is called.
Notify calls don't affect the execution's result.
data RPCException Source #
Exception thrown when the remote endpoint returns errors.
Constructors
| RPCStreamUnconsumed CallStack | |
| RPCException Value CallStack |
Instances
| Show RPCException Source # | |
Defined in Z.IO.RPC.MessagePack Methods showsPrec :: Int -> RPCException -> ShowS # show :: RPCException -> String # showList :: [RPCException] -> ShowS # | |
| Exception RPCException Source # | |
Defined in Z.IO.RPC.MessagePack Methods toException :: RPCException -> SomeException # fromException :: SomeException -> Maybe RPCException # displayException :: RPCException -> String # | |
execPipeline :: HasCallStack => Client -> IO PipelineResult Source #
Send the requests in a batch and get the results in a map keyed by PipelineId.
fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res Source #
Use the PipelineId returned by callPipeline to fetch the call's result.
callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res) Source #
Call a stream method. No other call or notify should be sent until
the returned stream is consumed completely.
This is implemented by extending the MessagePack-RPC protocol with the following new message types:
-- start stream request
[typ 0x04, name, param]
-- stop stream request
[typ 0x05]
-- each stream response
[typ 0x06, err, value]
-- stream response end
[typ 0x07]
The returned tuple is a pair of a stop action and a Source. To terminate the stream early, call the
stop action. Please continue consuming until EOF is reached;
otherwise the state of the Client will be incorrect.
type ServerService = Text -> Maybe ServerHandler Source #
data ServerHandler where Source #
Constructors
| CallHandler :: (MessagePack req, MessagePack res) => (req -> IO res) -> ServerHandler | |
| NotifyHandler :: MessagePack req => (req -> IO ()) -> ServerHandler | |
| StreamHandler :: (MessagePack req, MessagePack res) => (req -> IO (Source res)) -> ServerHandler |
simpleRouter :: [(Text, ServerHandler)] -> ServerService Source #
Simple router using FlatMap; looks up the name in O(log(N)).
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO
serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $
[ ("foo", CallHandler $ \ req -> do
... )
, ("bar", CallHandler $ \ req -> do
... )
]
serveRPC :: ServerLoop -> ServerService -> IO () Source #
Serve an RPC service.
Arguments
| :: ServerLoop | |
| -> Int | recv buffer size |
| -> Int | send buffer size |
| -> ServerService | |
| -> IO () |
Serve an RPC service with more control.