concurrent-rpc: An abstraction for inter-thread RPC based on MVars

Copyright: (c) Lars Petersen 2016
Safe Haskell: Safe



This module offers a single function, newRPC, which creates a typed communication channel. Give one end of the channel to client threads and the other end to worker threads.

Also see for an example.



type RPC request response = request -> IO response

The client interface for threads invoking remote procedure calls.

  • The operation blocks until another thread has processed the request.
  • If the other thread throws an exception during processing, the exception is re-thrown in the thread waiting for the response.

import Control.Exception (catch)

type Rocket  = String
type Liftoff = Bool

main = do
  (launchRocket, withRocket) <- newRPC :: IO (RPC Rocket Liftoff, WithRPC Rocket Liftoff)
  catch
    ( launchRocket "Apollo 11" >>= \liftoff-> if liftoff
        then print "Houston, we have a liftoff!"
        else print "Launch cancelled!"
    )
    ( \e-> print $ "Houston, we have a problem: " ++ show (e :: IOError) )
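
The two properties above (the caller blocks, and a failure on the other thread surfaces in the caller) can be illustrated without the library. The following is a minimal sketch using only base; callOnOtherThread, trySome and rethrow are hypothetical helpers invented for this illustration and are not part of concurrent-rpc.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, catch, throwIO, try)

-- Hypothetical helpers that pin the exception type to SomeException.
trySome :: IO a -> IO (Either SomeException a)
trySome = try

rethrow :: SomeException -> IO a
rethrow = throwIO

-- Hypothetical helper (not part of concurrent-rpc): run an action on a
-- fresh thread and block the caller until the outcome is available.
-- An exception raised on the other thread is re-thrown in the caller.
callOnOtherThread :: IO a -> IO a
callOnOtherThread action = do
  reply <- newEmptyMVar
  _ <- forkIO (trySome action >>= putMVar reply)
  takeMVar reply >>= either rethrow return

main :: IO ()
main = do
  answer <- callOnOtherThread (return (6 * 7 :: Int))
  print answer                                   -- prints 42
  message <- (callOnOtherThread (ioError (userError "engine failure"))
                >> return "no exception")
               `catch` \e -> return ("caught: " ++ show (e :: IOError))
  putStrLn message                               -- prints caught: user error (engine failure)
```

Because the original exception value is carried across inside SomeException, the caller can still catch it at its concrete type, here IOError.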

type WithRPC request response = (request -> IO response) -> IO ()

The interface for threads that serve and process remote procedure calls.

  • More than one thread may process requests through the same interface object. All processing threads block taking from the same MVar, and only one of them is served at a time. The fairness properties of MVar apply.
  • Exceptions thrown by the handler operation are re-thrown in both the processing thread and the requesting thread.

import Control.Concurrent (forkIO)
import Control.Exception (catch)
import Control.Monad (forever, when)
import System.IO

type Rocket  = String
type Liftoff = Bool

-- getWeatherCondition and isGood are left undefined in this example.
main = do
  (launchRocket, withRocket) <- newRPC :: IO (RPC Rocket Liftoff, WithRPC Rocket Liftoff)
  -- This is the rocket launch site thread. It forever waits for rockets and fires them into space one after the other.
  forkIO $ withFile "/dev/null" WriteMode $ \space->
    forever $ catch
      ( withRocket $ \rocket-> do
          weather <- getWeatherCondition
          when (isGood weather) $
            hPutStrLn space rocket -- The actual launch may throw an IOError!
          return (isGood weather) -- The response must be a Liftoff, i.e. a Bool.
      )
      ( \e-> print $ "A rocket exploded during launch phase: " ++ show (e :: IOError) )
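
To make the first point more concrete, here is a small sketch of several workers blocking on one shared MVar, written against base only. It does not use concurrent-rpc; squaresViaWorkers and the request layout (a number paired with a reply MVar) are assumptions made for this illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forever, replicateM_)

-- Hypothetical demo: three workers share one request MVar. Every request
-- is taken by exactly one worker, which answers on the reply MVar that
-- travels with the request.
squaresViaWorkers :: IO [Int]
squaresViaWorkers = do
  requests <- newEmptyMVar
  -- All three workers block in takeMVar on the same MVar.
  replicateM_ 3 $ forkIO $ forever $ do
    (n, reply) <- takeMVar requests
    putMVar reply (n * n)
  -- Issue six requests one after the other and collect the responses.
  forM [1 .. 6] $ \n -> do
    reply <- newEmptyMVar
    putMVar requests (n, reply)
    takeMVar reply

main :: IO ()
main = squaresViaWorkers >>= print   -- prints [1,4,9,16,25,36]
```

Which worker answers a given request is decided by the runtime's MVar wake-up order, so the sketch deliberately reports only the responses and not the worker that produced them.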

newRPC :: IO (RPC request response, WithRPC request response)

Creates a new request-response communication channel that may be used by arbitrarily many requesting and/or processing threads.

-- request and doSomethingWith stand for application-specific values.
main :: IO ()
main = do
  (rpc, withRpc) <- newRPC
  forkIO $ forever $ withRpc $ \request-> do
    response <- doSomethingWith request
    return response
  response <- rpc request
  -- Use the response here.
  return ()

  • newRPC initially creates one empty MVar for queueing requests.
  • Each call of rpc creates a temporary MVar for the response.
  • If the handler given to withRpc throws an exception, the exception is re-thrown in the withRpc thread as well as in the rpc thread that issued the call.
  • If an rpc thread that issued a call dies before request processing has started, the request is discarded. If processing has already started, it runs to completion and the response is discarded.
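
The first three points can be sketched as follows. This is a hypothetical re-implementation for illustration, not the library's source: newRPCSketch, trySome, rethrow and doubleViaRpc are invented names, and the sketch makes no attempt to discard requests or responses when the calling thread dies.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, throwIO, try)
import Control.Monad (forever)

type RPC request response = request -> IO response
type WithRPC request response = (request -> IO response) -> IO ()

-- Hypothetical helpers that pin the exception type to SomeException.
trySome :: IO a -> IO (Either SomeException a)
trySome = try

rethrow :: SomeException -> IO a
rethrow = throwIO

-- Illustrative sketch only; asynchronous exceptions are ignored.
newRPCSketch :: IO (RPC request response, WithRPC request response)
newRPCSketch = do
  requests <- newEmptyMVar                  -- one shared MVar for requests
  let rpc request = do
        reply <- newEmptyMVar               -- temporary MVar for this call
        putMVar requests (request, reply)
        takeMVar reply >>= either rethrow return
      withRpc handler = do
        (request, reply) <- takeMVar requests
        outcome <- trySome (handler request)
        putMVar reply outcome               -- hand result or exception to the caller
        either rethrow (const (return ())) outcome  -- re-throw in the worker as well
  return (rpc, withRpc)

-- Hypothetical usage: one worker thread that doubles every request.
doubleViaRpc :: Int -> IO Int
doubleViaRpc n = do
  (rpc, withRpc) <- newRPCSketch
  _ <- forkIO $ forever $ withRpc $ \x -> return (x * 2)
  rpc n

main :: IO ()
main = doubleViaRpc 21 >>= print   -- prints 42
```

Passing the reply MVar along with each request is what lets any number of callers share the single request MVar without their responses getting mixed up.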