| Copyright | (c) Dong Han 2019 |
|---|---|
| License | BSD |
| Maintainer | winterland1989@gmail.com |
| Stability | experimental |
| Portability | non-portable |
| Safe Haskell | None |
| Language | Haskell2010 |
Z.IO.RPC.MessagePack
Description
This module provides MessagePack-RPC implementation.
Synopsis
- type ServerLoop = (UVStream -> IO ()) -> IO ()
- type ServerService a = Text -> Maybe (ServerHandler a)
- data ServerHandler a where
- CallHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> req -> IO res) -> ServerHandler a
- NotifyHandler :: MessagePack req => (SessionCtx a -> req -> IO ()) -> ServerHandler a
- StreamHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> IORef Bool -> req -> IO (Source res)) -> ServerHandler a
- data SessionCtx a
- readSessionCtx :: SessionCtx a -> IO (Maybe a)
- writeSessionCtx :: SessionCtx a -> a -> IO ()
- clearSessionCtx :: SessionCtx a -> IO ()
- modifySessionCtx :: SessionCtx a -> (a -> Maybe a) -> IO ()
- serveRPC :: ServerLoop -> ServerService a -> IO ()
- serveRPC' :: ServerLoop -> Int -> Int -> ServerService a -> IO ()
- simpleRouter :: [(Text, ServerHandler a)] -> ServerService a
- data Client = Client {
- _clientSeqRef :: Counter
- _clientPipelineReqNum :: Counter
- _clientBufferedInput :: BufferedInput
- _clientBufferedOutput :: BufferedOutput
- rpcClient :: (Input dev, Output dev) => dev -> IO Client
- rpcClient' :: (Input i, Output o) => i -> o -> Int -> Int -> IO Client
- call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res
- notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO ()
- type PipelineId = Int
- type PipelineResult = FlatIntMap Value
- callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId
- notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO ()
- execPipeline :: HasCallStack => Client -> IO PipelineResult
- fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res
- callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res)
- data Request a
- = Notify (Text, a)
- | Call (Int64, Text, a)
- | StreamStart (Text, a)
- data RPCException
Example
import Data.Maybe
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Data.IORef
import Z.IO
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V
newtype ServerCtx = ServerCtx { counter :: Int }
main = serveRPC (startTCPServer defaultTCPServerConfig) $ simpleRouter
[ ("hi", CallHandler $ \ctx (req :: T.Text) -> do
writeSessionCtx ctx (ServerCtx 0)
return ("hello, " <> req)
)
, ("foo", CallHandler $ \ctx (req :: Int) -> do
modifySessionCtx ctx (Just . ServerCtx . (+ 1) . counter)
return (req + 1)
)
, ("bar", CallHandler $ \ctx (req :: T.Text) -> do
counter . fromJust <$> readSessionCtx ctx
)
, ("qux", StreamHandler $ \ctx eofRef (_ :: ()) -> do
withMVar stdinBuf (\ stdin -> pure $ \ k _ -> do
eof <- readIORef eofRef
if eof
then k EOF
else do
r <- readBuffer stdin
if V.null r
then k EOF
else k (Just r))
)
]import Data.Maybe
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Data.IORef
import Z.IO
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V
main = withResource (initTCPClient defaultTCPClientConfig) $ \ uvs -> do
c <- rpcClient uvs
-- single call
r <- call @T.Text @T.Text c "hi" "Alice"
print r
_ <- call @Int @Int c "foo" 1
_ <- call @Int @Int c "foo" 1
x <- call @T.Text @Int c "bar" ""
print x
-- streaming result
(_, src) <- callStream c "qux" ()
runBIO_ $ src . sinkToIO (\ b -> withMVar stdoutBuf (\ bo -> do
writeBuffer bo b
flushBuffer bo))Server
type ServerLoop = (UVStream -> IO ()) -> IO () Source #
type ServerService a = Text -> Maybe (ServerHandler a) Source #
data ServerHandler a where Source #
Constructors
| CallHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> req -> IO res) -> ServerHandler a | |
| NotifyHandler :: MessagePack req => (SessionCtx a -> req -> IO ()) -> ServerHandler a | |
| StreamHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> IORef Bool -> req -> IO (Source res)) -> ServerHandler a |
|
data SessionCtx a Source #
readSessionCtx :: SessionCtx a -> IO (Maybe a) Source #
writeSessionCtx :: SessionCtx a -> a -> IO () Source #
clearSessionCtx :: SessionCtx a -> IO () Source #
modifySessionCtx :: SessionCtx a -> (a -> Maybe a) -> IO () Source #
Try to modify SessionCtx if it has.
Note that you can set the modifier function to return Nothing to clear SessionCtx.
serveRPC :: ServerLoop -> ServerService a -> IO () Source #
Serve a RPC service.
Arguments
| :: ServerLoop | |
| -> Int | recv buffer size |
| -> Int | send buffer size |
| -> ServerService a | |
| -> IO () |
Serve a RPC service with more control.
simpleRouter :: [(Text, ServerHandler a)] -> ServerService a Source #
Simple router using FlatMap, lookup name in O(log(N)).
import Z.IO.PRC.MessagePack
import Z.IO.Network
import Z.IO
serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $
[ ("foo", CallHandler $ \ ctx req -> do
... )
, ("bar", CallHandler $ \ ctx req -> do
... )
]
Client
Constructors
| Client | |
Fields
| |
rpcClient :: (Input dev, Output dev) => dev -> IO Client Source #
Open a RPC client from input/output device.
Open a RPC client with more control.
call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res Source #
Send a single RPC call and get result.
notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO () Source #
Send a single notification RPC call without getting result.
Pipeline
type PipelineId = Int Source #
type PipelineResult = FlatIntMap Value Source #
callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId Source #
Make a call inside a pipeline, which will be sent in batch when execPipeline.
... fooId <- callPipeline client "foo" $ ... barId <- callPipeline client "bar" $ ... notifyPipeline client "qux" $ ... r <- execPipeline client fooResult <- fetchPipeline fooId r barResult <- fetchPipeline barId r
notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO () Source #
Make a notify inside a pipeline, which will be sent in batch when execPipeline.
Notify calls doesn't affect execution's result.
execPipeline :: HasCallStack => Client -> IO PipelineResult Source #
Sent request in batch and get result in a map identified by PipelineId.
fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res Source #
Use the PipelineId returned when callPipeline to fetch call's result.
callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res) Source #
Call a stream method, no other call or notify should be sent until
returned stream is consumed completely.
This is implemented by extend MessagePack-RPC protocol by adding following new message types:
-- start stream request [typ 0x04, name, param] -- stop stream request [typ 0x05] -- each stream response [typ 0x06, err, value] -- stream response end [typ 0x07]
The return tuple is a pair of a stop action and a Source, to terminate stream early, call the
stop action. Please continue consuming until EOF reached,
otherwise the state of the Client will be incorrect.
Misc
Constructors
| Notify (Text, a) | |
| Call (Int64, Text, a) | |
| StreamStart (Text, a) |
data RPCException Source #
Exception thrown when remote endpoint return errors.
Constructors
| RPCStreamUnconsumed CallStack | |
| RPCException Value CallStack |
Instances
| Show RPCException Source # | |
Defined in Z.IO.RPC.MessagePack Methods showsPrec :: Int -> RPCException -> ShowS # show :: RPCException -> String # showList :: [RPCException] -> ShowS # | |
| Exception RPCException Source # | |
Defined in Z.IO.RPC.MessagePack Methods toException :: RPCException -> SomeException # fromException :: SomeException -> Maybe RPCException # displayException :: RPCException -> String # | |