Copyright | (c) Tim Watson 2012 - 2017 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <watson.timothy@gmail.com> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
This module provides a high(er) level API for building complex Process
implementations by abstracting out the management of the process' mailbox,
reply/response handling, timeouts, process hiberation, error handling
and shutdown/stop procedures. It is modelled along similar lines to OTP's
gen_server API - http://www.erlang.org/doc/man/gen_server.html.
In particular, a managed process will interoperate cleanly with the supervisor API in distributed-process-supervision.
- API Overview For The Impatient
Once started, a managed process will consume messages from its mailbox and
pass them on to user defined handlers based on the types received (mapped
to those accepted by the handlers) and optionally by also evaluating user
supplied predicates to determine which handler(s) should run.
Each handler returns a ProcessAction
which specifies how we should proceed.
If none of the handlers is able to process a message (because their types are
incompatible), then the unhandledMessagePolicy
will be applied.
The ProcessAction
type defines the ways in which our process can respond
to its inputs, whether by continuing to read incoming messages, setting an
optional timeout, sleeping for a while, or stopping. The optional timeout
behaves a little differently to the other process actions: If no messages
are received within the specified time span, a user defined timeoutHandler
will be called in order to determine the next action.
The ProcessDefinition
type also defines a shutdownHandler
,
which is called whenever the process exits, whether because a callback has
returned stop
as the next action, or as the result of unhandled exit signal
or similar asynchronous exceptions thrown in (or to) the process itself.
The handlers are split into groups: apiHandlers, infoHandlers, and extHandlers.
- Seriously, TL;DR
Use serve
for a process that sits reading its mailbox and generally behaves
as you'd expect. Use pserve
and PrioritisedProcessDefinition
for a server
that manages its mailbox more comprehensively and handles errors a bit differently.
Both use the same client API.
DO NOT mask in handler code, unless you can guarantee it won't be long running and absolutely won't block kill signals from a supervisor.
Do look at the various API offerings, as there are several, at different levels of abstraction.
- Managed Process Mailboxes
Managed processes come in two flavours, with different runtime characteristics and (to some extent) semantics. These flavours are differentiated by the way in which they handle the server process mailbox - all client interactions remain the same.
The vanilla managed process mailbox, provided by the serve
API, is roughly
akin to a tail recursive listen function that calls a list of passed in
matchers. We might naively implement it roughly like this:
loop :: stateT -> [(stateT -> Message -> Maybe stateT)] -> Process () loop state handlers = do st2 <- receiveWait $ map (\d -> handleMessage (d state)) handlers case st2 of Nothing -> {- we're done serving -} return () Just s2 -> loop s2 handlers
Obviously all the details have been ellided, but this is the essential premise
behind a managed process loop. The process keeps reading from its mailbox
indefinitely, until either a handler instructs it to stop, or an asynchronous
exception (or exit signal - in the form of an async ProcessExitException
)
terminates it. This kind of mailbox has fairly intuitive runtime characteristics
compared to a plain server process (i.e. one implemented without the use of
this library): messages will pile up in its mailbox whilst handlers are
running, and each handler will be checked against the mailbox based on the
type of messages it recognises. We can potentially end up scanning a very
large mailbox trying to match each handler, which can be a performance
bottleneck depending on expected traffic patterns.
For most simple server processes, this technique works well and is easy to
reason about a use. See the sections on error and exit handling later on for
more details about serve
based managed processes.
- Prioritised Mailboxes
A prioritised mailbox serves two purposes. The first of these is to allow a managed process author to specify that certain classes of message should be prioritised by the server loop. This is achieved by draining the real process mailbox into an internal priority queue, and running the server's handlers repeatedly over its contents, which are dequeued in priority order. The obvious consequence of this approach leads to the second purpose (or the accidental side effect, depending on your point of view) of a prioritised mailbox, which is that we avoid scanning a large mailbox when searching for messages that match the handlers we anticipate running most frequently (or those messages that we deem most important).
There are several consequences to this approach. One is that we do quite a bit
more work to manage the process mailbox behind the scenes, therefore we have
additional space overhead to consider (although we are also reducing the size
of the mailbox, so there is some counter balance here). The other is that if
we do not see the anticipated traffic patterns at runtime, then we might
spend more time attempting to prioritise infrequent messages than we would
have done simply receiving them! We do however, gain a degree of safety with
regards message loss that the serve
based vanilla mailbox cannot offer.
See the sections on error and exit handling later on for more details about
these.
A Prioritised pserve
loop maintains its internal state - including the user
defined server state - in an IORef
, ensuring it is held consistently
between executions, even in the face of unhandled exceptions.
- Defining Prioritised Process Definitions
A PrioritisedProcessDefintion
combines the usual ProcessDefintion
-
containing the cast/call API, error, termination and info handlers - with a
list of Priority
entries, which are used at runtime to prioritise the
server's inputs. Note that it is only messages which are prioritised; The
server's various handlers are still evaluated in the order in which they
are specified in the ProcessDefinition
.
Prioritisation does not guarantee that a prioritised message/type will be processed before other traffic - indeed doing so in a multi-threaded runtime would be very hard - but in the absence of races between multiple processes, if two messages are both present in the process' own mailbox, they will be applied to the ProcessDefinition's handlers in priority order.
A prioritised process should probably be configured with a Priority
list to
be useful. Creating a prioritised process without any priorities could be a
potential waste of computational resources, and it is worth thinking carefully
about whether or not prioritisation is truly necessary in your design before
choosing to use it.
Using a prioritised process is as simple as calling pserve
instead of
serve
, and passing an initialised PrioritisedProcessDefinition
.
- The Cast and Call Protocols
Deliberate interactions with a managed process usually falls into one of
two categories. A cast
interaction involves a client sending a message
asynchronously and the server handling this input. No reply is sent to
the client. On the other hand, a call
is a remote procedure call,
where the client sends a message and waits for a reply from the server.
All expressions given to apiHandlers
have to conform to the cast or call
protocol. The protocol (messaging) implementation is hidden from the user;
API functions for creating user defined apiHandlers
are given instead,
which take expressions (i.e., a function or lambda expression) and create the
appropriate Dispatcher
for handling the cast (or call).
These cast and call protocols are for dealing with expected inputs. They will usually form the explicit public API for the process, and be exposed by providing module level functions that defer to the cast or call client API, giving the process author an opportunity to enforce the correct input and response types. For example:
{- Ask the server to add two numbers -} add :: ProcessId -> Double -> Double -> Double add pid x y = call pid (Add x y)
Note here that the return type from the call is inferred and will not be enforced by the type system. If the server sent a different type back in the reply, then the caller might be blocked indefinitely! In fact, the result of mis-matching the expected return type (in the client facing API) with the actual type returned by the server is more severe in practise. The underlying types that implement the call protocol carry information about the expected return type. If there is a mismatch between the input and output types that the client API uses and those which the server declares it can handle, then the message will be considered unroutable - no handler will be executed against it and the unhandled message policy will be applied. You should, therefore, take great care to align these types since the default unhandled message policy is to terminate the server! That might seem pretty extreme, but you can alter the unhandled message policy and/or use the various overloaded versions of the call API in order to detect errors on the server such as this.
The cost of potential type mismatches between the client and server is the main disadvantage of this looser coupling between them. This mechanism does however, allow servers to handle a variety of messages without specifying the entire protocol to be supported in excruciating detail. For that, we would want session types, which are beyond the scope of this library.
- Handling Unexpected/Info Messages
An explicit protocol for communicating with the process can be
configured using cast
and call
, but it is not possible to prevent
other kinds of messages from being sent to the process mailbox. When
any message arrives for which there are no handlers able to process
its content, the UnhandledMessagePolicy
will be applied. Sometimes
it is desirable to process incoming messages which aren't part of the
protocol, rather than let the policy deal with them. This is particularly
true when incoming messages are important to the process, but their point
of origin is outside the author's control. Handling signals such as
ProcessMonitorNotification
is a typical example of this:
handleInfo_ (\(ProcessMonitorNotification _ _ r) -> say $ show r >> continue_)
- Handling Process State
The ProcessDefinition
is parameterised by the type of state it maintains.
A process that has no state will have the type ProcessDefinition ()
and can
be bootstrapped by evaluating statelessProcess
.
All call/cast handlers come in two flavours, those which take the process state as an input and those which do not. Handlers that ignore the process state have to return a function that takes the state and returns the required action. Versions of the various action generating functions ending in an underscore are provided to simplify this:
statelessProcess { apiHandlers = [ handleCall_ (\(n :: Int) -> return (n * 2)) , handleCastIf_ (\(c :: String, _ :: Delay) -> c == "timeout") (\("timeout", (d :: Delay)) -> timeoutAfter_ d) ] , timeoutHandler = \_ _ -> stop $ ExitOther "timeout" }
- Avoiding Side Effects
If you wish to only write side-effect free code in your server definition, then there is an explicit API for doing so. Instead of using the handler definition functions in this module, import the pure server module instead, which provides a StateT based monad for building referentially transparent callbacks.
See Control.Distributed.Process.ManagedProcess.Server.Restricted for details and API documentation.
- Handling Errors
Error handling appears in several contexts and process definitions can hook into these with relative ease. Catching exceptions inside handle functions is no different to ordinary exception handling in monadic code.
handleCall (\x y -> catch (hereBeDragons x y) (\(e :: SmaugTheTerribleException) -> return (Left (show e))))
The caveats mentioned in Control.Distributed.Process.Extras about
exit signal handling are very important here - it is strongly advised that
you do not catch exceptions of type ProcessExitException
unless you plan
to re-throw them again.
- Structured Exit Handling
Because Control.Distributed.Process.ProcessExitException is a ubiquitous
signalling mechanism in Cloud Haskell, it is treated unlike other
asynchronous exceptions. The ProcessDefinition
exitHandlers
field
accepts a list of handlers that, for a specific exit reason, can decide
how the process should respond. If none of these handlers matches the
type of reason
then the process will exit. with DiedException why
. In
addition, a private exit handler is installed for exit signals where
(reason :: ExitReason) == ExitShutdown
, which is an of exit signal used
explicitly by supervision APIs. This behaviour, which cannot be overriden, is
to gracefully shut down the process, calling the shutdownHandler
as usual,
before stopping with reason
given as the final outcome.
Example: handling custom data is ProcessExitException
handleExit (\state from (sigExit :: SomeExitData) -> continue s)
Under some circumstances, handling exit signals is perfectly legitimate. Handling of other forms of asynchronous exception (e.g., exceptions not generated by an exit signal) is not supported by this API. Cloud Haskell's primitives for exception handling will work normally in managed process callbacks, but you are strongly advised against swallowing exceptions in general, or masking, unless you have carefully considered the consequences.
- Different Mailbox Types and Exceptions: Message Loss
Neither the vanilla nor the prioritised mailbox implementations will allow you to handle arbitrary asynchronous exceptions outside of your handler code. The way in which the two mailboxes handle unexpected asynchronous exceptions differs significantly however. The first consideration pertains to potential message loss.
Consider a plain Cloud Haskell expression such as the following:
catch (receiveWait [ match ((m :: SomeType) -> doSomething m) ]) ((e :: SomeCustomAsyncException) -> handleExFrom e pid)
It is entirely possible that receiveWait
will succeed in matching a message
of type SomeType
from the mailbox and removing it, to be handed to the
supplied expression doSomething
. Should an asynchronous exception arrive
at this moment in time, though the handler might run and allow the server to
recover, the message will be permanently lost.
The mailbox exposed by serve
operates in exactly this way, and as such it
is advisible to avoid swallowing asynchronous exceptions, since doing so can
introduce the possibility of unexpected message loss.
The prioritised mailbox exposed by pserve
on the other hand, does not suffer
this scenario. Whilst the mailbox is drained into the internal priority queue,
asynchronous exceptions are masked, and only once the queue has been updated
are they removed. In addition, it is possible to peek
at the priority queue
without removing a message, thereby ensuring that should the handler fail or
an asynchronous exception arrive whilst processing the message, we can resume
handling our message immediately upon recovering from the exception. This
behaviour allows the process to guarantee against message loss, whilst avoiding
masking within handlers, which is generally bad form (and can potentially lead
to zombie processes, when supervised servers refuse to respond to kill
signals whilst stuck in a long running handler).
Also note that a process' internal state is subject to the same semantics,
such that the arrival of an asynchronous exception (including exit signals!)
can lead to handlers (especially exit and shutdown handlers) running with
a stale version of their state. For this reason - since we cannot guarantee
an up to date state in the presence of these semantics - a shutdown handler
for a serve
loop will always have its state passed as LastKnown stateT
.
- Different Mailbox Types and Exceptions: Error Recovery And Shutdown
If any asynchronous exception goes unhandled by a vanilla process, the
server will immediately exit without running the user supplied shutdownHandler
.
It is very important to note that in Cloud Haskell, link failures generate
asynchronous exceptions in the target and these will NOT be caught by the serve
API and will therefore cause the process to exit /without running the
termination handler/ callback. If your termination handler is set up to do
important work (such as resource cleanup) then you should avoid linking you
process and use monitors instead. If your code absolutely must run its
termination handlers in the face of any unhandled (async) exception, consider
using a prioritised mailbox, which handles this. Alternatively, consider
arranging your processes in a supervision tree, and using a shutdown strategy
to ensure that siblings terminate cleanly (based off a supervisor's ordered
shutdown signal) in order to ensure cleanup code can run reliably.
As mentioned above, a prioritised mailbox behaves differently in the face
of unhandled asynchronous exceptions. Whilst pserve
still offers no means
for handling arbitrary async exceptions outside your handlers - and you should
avoid handling them within, to the maximum extent possible - it does execute
its receiving process in such a way that any unhandled exception will be
caught and rethrown. Because of this, and the fact that a prioritised process
manages its internal state in an IORef
, shutdown handlers are guaranteed
to run even in the face of async exceptions. These are run with the latest
version of the server state available, given as CleanShutdown stateT
when
the process is terminating normally (i.e. for reasons ExitNormal
or
ExitShutdown
), and LastKnown stateT
when an exception terminated the
server process abruptly. The latter acknowledges that we cannot guarantee
the exception did not interrupt us after the last handler ran and returned an
updated state, but prior to storing the update.
Although shutdown handlers are run even in the face of unhandled exceptions (and prior to re-throwing, when there is one present), they are not run in a masked state. In fact, exceptions are explicitly unmasked prior to executing a handler, therefore it is possible for a shutdown handler to terminate abruptly. Once again, supervision hierarchies are a better way to ensure consistent cleanup occurs when valued resources are held by a process.
- Filters, pre-processing, and safe handlers
A prioritised process can take advantage of filters, which enable the server to pre-process messages, reject them (based on the message itself, or the server's state), and mark classes of message as requiring safe handling.
Assuming a PrioritisedProcessDefinition
that holds its state as an Int
,
here are some simple applications of filters:
let rejectUnchecked = rejectApi Foo :: Int -> P.Message String String -> Process (Filter Int) filters = [ store (+1) , ensure (>0) , check $ api_ (\(s :: String) -> return $ "checked-" `isInfixOf` s) rejectUnchecked , check $ info (\_ (_ :: MonitorRef, _ :: ProcessId) -> return False) $ reject Foo , refuse ((> 10) :: Int -> Bool) ]
We can store/update our state, ensure our state is in a valid condition, check api and info messages, and refuse messages using simple predicates. Messages cannot be modified by filters, not can reply data.
A safe
filter is a means to instruct the prioritised managed process loop
not to dequeue the current message from the internal priority queue until a
handler has successfully matched and run against it (without an exception,
either synchronous or asynchronous) to completion. Messages marked thus, will
remain in the priority queue even in the face of exit signals, which means that
if the server process code handles and swallows them, it will begin re-processing
the last message a second time.
It is important to recognise that the safe
filter does not act like a
transaction. There are no checkpoints, nor facilities for rolling back actions
on failure. If an exit signal terminates a handler for a message marked as
safe
and an exit handler catches and swallows it, the handler (and all prior
filters too) will be re-run in its entireity.
- Special Clients: Control Channels
For advanced users and those requiring very low latency, a prioritised
process definition might not be suitable, since it performs considerable
work behind the scenes. There are also designs that need to segregate a
process' control plane from other kinds of traffic it is expected to
receive. For such use cases, a control channel may prove a better choice,
since typed channels are already prioritised during the mailbox scans that
the base receiveWait
and receiveTimeout
primitives from
distribute-process provides.
In order to utilise a control channel in a server, it must be passed to the
corresponding handleControlChan
function (or its stateless variant). The
control channel is created by evaluating newControlChan
, in the same way
that we create regular typed channels.
In order for clients to communicate with a server via its control channel
however, they must pass a handle to a ControlPort
, which can be obtained by
evaluating channelControlPort
on the ControlChannel
. A ControlPort
is
Serializable
, so they can alternatively be sent to other processes.
Control channel traffic will only be prioritised over other traffic if the
handlers using it are present before others (e.g., handleInfo, handleCast
,
etc) in the process definition. It is not possible to combine prioritised
processes with control channels. Attempting to do so will satisfy the
compiler, but crash with a runtime error once you attempt to evaluate the
prioritised server loop (i.e., pserve
).
Since the primary purpose of control channels is to simplify and optimise
client-server communication over a single channel, this module provides an
alternate server loop in the form of chanServe
. Instead of passing an
initialised ProcessDefinition
, this API takes an expression from a
ControlChannel
to ProcessDefinition
, operating in the Process
monad.
Providing the opaque reference in this fashion is useful, since the type of
messages the control channel carries will not correlate directly to the
inter-process traffic we use internally.
Although control channels are intended for use as a single control plane
(via chanServe
), it is possible to use them as a more strictly typed
communications backbone, since they do enforce absolute type safety in client
code, being bound to a particular type on creation. For rpc (i.e., call
)
interaction however, it is not possible to have the server reply to a control
channel, since they're a one way pipe. It is possible to alleviate this
situation by passing a request type than contains a typed channel bound to
the expected reply type, enabling client and server to match on both the input
and output types as specifically as possible. Note that this still does not
guarantee an agreement on types between all parties at runtime however.
An example of how to do this follows:
data Request = Request String (SendPort String) deriving (Typeable, Generic) instance Binary Request where -- note that our initial caller needs an mvar to obtain the control port... echoServer :: MVar (ControlPort Request) -> Process () echoServer mv = do cc <- newControlChan :: Process (ControlChannel Request) liftIO $ putMVar mv $ channelControlPort cc let s = statelessProcess { apiHandlers = [ handleControlChan_ cc (\(Request m sp) -> sendChan sp m >> continue_) ] } serve () (statelessInit Infinity) s echoClient :: String -> ControlPort Request -> Process String echoClient str cp = do (sp, rp) <- newChan sendControlMessage cp $ Request str sp receiveChan rp
- Communicating with the outside world: External (STM) Input Channels
Both client and server APIs provide a mechanism for interacting with a running server process via STM. This is primarily intended for code that runs outside of Cloud Haskell's Process monad, but can also be used as a channel for sending and/or receiving non-serializable data to or from a managed process. Obviously if you attempt to do this across a remote boundary, things will go spectacularly wrong. The APIs provided do not attempt to restrain this, or to impose any particular scheme on the programmer, therefore you're on your own when it comes to writing the STM code for reading and writing data between client and server.
For code running inside the Process monad and passing Serializable thunks, there is no real advantage to this approach, and indeed there are several serious disadvantages - none of Cloud Haskell's ordering guarantees will hold when passing data to and from server processes in this fashion, nor are there any guarantees the runtime system can make with regards interleaving between messages passed across Cloud Haskell's communication fabric vs. data shared via STM. This is true even when client(s) and server(s) reside on the same local node.
A server wishing to receive data via STM can do so using the handleExternal
API. By way of example, here is a simple echo server implemented using STM:
demoExternal = do inChan <- liftIO newTQueueIO replyQ <- liftIO newTQueueIO let procDef = statelessProcess { apiHandlers = [ handleExternal (readTQueue inChan) (\s (m :: String) -> do liftIO $ atomically $ writeTQueue replyQ m continue s) ] } let txt = "hello 2-way stm foo" pid <- spawnLocal $ serve () (statelessInit Infinity) procDef echoTxt <- liftIO $ do -- firstly we write something that the server can receive atomically $ writeTQueue inChan txt -- then sit and wait for it to write something back to us atomically $ readTQueue replyQ say (show $ echoTxt == txt)
For request/reply channels such as this, a convenience based on the call API
is also provided, which allows the server author to write an ordinary call
handler, and the client author to utilise an API that monitors the server and
does the usual stuff you'd expect an RPC style client to do. Here is another
example of this in use, demonstrating the callSTM
and handleCallExternal
APIs in practise.
data StmServer = StmServer { serverPid :: ProcessId , writerChan :: TQueue String , readerChan :: TQueue String } instance Resolvable StmServer where resolve = return . Just . serverPid echoStm :: StmServer -> String -> Process (Either ExitReason String) echoStm StmServer{..} = callSTM serverPid (writeTQueue writerChan) (readTQueue readerChan) launchEchoServer :: CallHandler () String String -> Process StmServer launchEchoServer handler = do (inQ, replyQ) <- liftIO $ do cIn <- newTQueueIO cOut <- newTQueueIO return (cIn, cOut) let procDef = statelessProcess { apiHandlers = [ handleCallExternal (readTQueue inQ) (writeTQueue replyQ) handler ] } pid <- spawnLocal $ serve () (statelessInit Infinity) procDef return $ StmServer pid inQ replyQ testExternalCall :: TestResult Bool -> Process () testExternalCall result = do let txt = "hello stm-call foo" srv <- launchEchoServer (\st (msg :: String) -> reply msg st) echoStm srv txt >>= stash result . (== Right txt)
- Performance Considerations
The various server loops are fairly optimised, but there is a definite cost associated with scanning the mailbox to match on protocol messages, plus additional costs in space and time due to mapping over all available info handlers for non-protocol (i.e., neither call nor cast) messages. These are exacerbated significantly when using prioritisation, whilst using a single control channel is very fast and carries little overhead.
From the client perspective, it's important to remember that the call
protocol will wait for a reply in most cases, triggering a full O(n) scan of
the caller's mailbox. If the mailbox is extremely full and calls are
regularly made, this may have a significant impact on the caller. The
callChan
family of client API functions can alleviate this, by using (and
matching on) a private typed channel instead, but the server must be written
to accomodate this. Similar gains can be had using a control channel and
providing a typed reply channel in the request data, however the call
mechanism does not support this notion, so not only are we unable
to use the various reply functions, client code should also consider
monitoring the server's pid and handling server failures whilst waiting on
Synopsis
- data InitResult s
- = InitOk s Delay
- | InitStop String
- | InitIgnore
- type InitHandler a s = a -> Process (InitResult s)
- serve :: a -> InitHandler a s -> ProcessDefinition s -> Process ()
- pserve :: a -> InitHandler a s -> PrioritisedProcessDefinition s -> Process ()
- chanServe :: Serializable b => a -> InitHandler a s -> (ControlChannel b -> Process (ProcessDefinition s)) -> Process ()
- runProcess :: (s -> Delay -> Process ExitReason) -> a -> InitHandler a s -> Process ()
- prioritised :: ProcessDefinition s -> [DispatchPriority s] -> PrioritisedProcessDefinition s
- module Control.Distributed.Process.ManagedProcess.Client
- data ProcessDefinition s = ProcessDefinition {}
- data PrioritisedProcessDefinition s = PrioritisedProcessDefinition {
- processDef :: ProcessDefinition s
- priorities :: [DispatchPriority s]
- filters :: [DispatchFilter s]
- recvTimeout :: RecvTimeoutPolicy
- data RecvTimeoutPolicy
- data Priority a
- data DispatchPriority s
- type ShutdownHandler s = ExitState s -> ExitReason -> Process ()
- type TimeoutHandler s = ActionHandler s Delay
- data Condition s m
- type Action s = Process (ProcessAction s)
- data ProcessAction s
- type Reply b s = Process (ProcessReply b s)
- data ProcessReply r s
- type ActionHandler s a = s -> a -> Action s
- type CallHandler s a b = s -> a -> Reply b s
- type CastHandler s a = ActionHandler s a
- type StatelessHandler s a = a -> s -> Action s
- type DeferredCallHandler s a b = CallRef b -> CallHandler s a b
- type StatelessCallHandler s a b = CallRef b -> a -> Reply b s
- type InfoHandler s a = ActionHandler s a
- type ChannelHandler s a b = SendPort b -> ActionHandler s a
- type StatelessChannelHandler s a b = SendPort b -> StatelessHandler s a
- data UnhandledMessagePolicy
- = Terminate
- | DeadLetter ProcessId
- | Log
- | Drop
- data CallRef a
- data ExitState s
- = CleanShutdown s
- | LastKnown s
- isCleanShutdown :: ExitState s -> Bool
- exitState :: ExitState s -> s
- defaultProcess :: ProcessDefinition s
- defaultProcessWithPriorities :: [DispatchPriority s] -> PrioritisedProcessDefinition s
- statelessProcess :: ProcessDefinition ()
- statelessInit :: Delay -> InitHandler () ()
- data ControlChannel m
- data ControlPort m
- newControlChan :: Serializable m => Process (ControlChannel m)
- channelControlPort :: ControlChannel m -> ControlPort m
- module Control.Distributed.Process.ManagedProcess.Server
- data Message a b
- data GenProcess s a
- data Filter s
- data DispatchFilter s
- peek :: GenProcess s (Maybe Message)
- processState :: GenProcess s s
- runAfter :: forall s m. Serializable m => TimeInterval -> m -> GenProcess s ()
- evalAfter :: forall s m. Serializable m => TimeInterval -> m -> s -> Action s
- currentTimeout :: GenProcess s Delay
- processDefinition :: GenProcess s (ProcessDefinition s)
- processFilters :: GenProcess s [DispatchFilter s]
- processUnhandledMsgPolicy :: GenProcess s UnhandledMessagePolicy
- setUserTimeout :: Delay -> GenProcess s ()
- setProcessState :: s -> GenProcess s ()
- push :: forall s. Message -> GenProcess s ()
- addUserTimer :: Timer -> Message -> GenProcess s TimerKey
- act :: forall s. GenProcess s () -> Action s
- module Control.Distributed.Process.ManagedProcess.Internal.GenProcess
Starting/Running server processes
data InitResult s Source #
Return type for and InitHandler
expression.
type InitHandler a s = a -> Process (InitResult s) Source #
An expression used to initialise a process with its state
serve :: a -> InitHandler a s -> ProcessDefinition s -> Process () Source #
Starts the message handling loop for a managed process configured with the supplied process definition, after calling the init handler with its initial arguments. Note that this function does not return until the server exits.
pserve :: a -> InitHandler a s -> PrioritisedProcessDefinition s -> Process () Source #
Starts the message handling loop for a prioritised managed process, configured with the supplied process definition, after calling the init handler with its initial arguments. Note that this function does not return until the server exits.
chanServe :: Serializable b => a -> InitHandler a s -> (ControlChannel b -> Process (ProcessDefinition s)) -> Process () Source #
Starts the message handling loop for a managed process, configured with
a typed control channel. The caller supplied expression is evaluated with
an opaque reference to the channel, which must be passed when calling
handleControlChan
. The meaning and behaviour of the init handler and
initial arguments are the same as those given to serve
. Note that this
function does not return until the server exits.
runProcess :: (s -> Delay -> Process ExitReason) -> a -> InitHandler a s -> Process () Source #
Wraps any process loop and ensures that it adheres to the
managed process start/stop semantics, i.e., evaluating the
InitHandler
with an initial state and delay will either
die
due to InitStop
, exit silently (due to InitIgnore
)
or evaluate the process' loop
. The supplied loop
must evaluate
to ExitNormal
, otherwise the calling processing will die
with
whatever ExitReason
is given.
prioritised :: ProcessDefinition s -> [DispatchPriority s] -> PrioritisedProcessDefinition s Source #
Turns a standard ProcessDefinition
into a PrioritisedProcessDefinition
,
by virtue of the supplied list of DispatchPriority
expressions.
Client interactions
Defining server processes
data ProcessDefinition s Source #
Stores the functions that determine runtime behaviour in response to incoming messages and a policy for responding to unhandled messages.
ProcessDefinition | |
|
data PrioritisedProcessDefinition s Source #
A ProcessDefinition
decorated with DispatchPriority
for certain
input domains.
data RecvTimeoutPolicy Source #
For a PrioritisedProcessDefinition
, this policy determines for how long
the receive loop should continue draining the process' mailbox before
processing its received mail (in priority order).
If a prioritised managed process is receiving a lot of messages (into its real mailbox), the server might never get around to actually processing its inputs. This (mandatory) policy provides a guarantee that eventually (i.e., after a specified number of received messages or time interval), the server will stop removing messages from its mailbox and process those it has already received.
data DispatchPriority s Source #
Dispatcher for prioritised handlers
type ShutdownHandler s = ExitState s -> ExitReason -> Process () Source #
An expression used to handle process termination
type TimeoutHandler s = ActionHandler s Delay Source #
An expression used to handle process timeouts
Wraps a predicate that is used to determine whether or not a handler is valid based on some combination of the current process state, the type and/or value of the input message or both.
type Action s = Process (ProcessAction s) Source #
An action (server state transition) in the Process
monad
data ProcessAction s Source #
The action taken by a process after a handler has run and its updated state. See "Control.Distributed.Process.ManagedProcess.Server.continue" "Control.Distributed.Process.ManagedProcess.Server.timeoutAfter" "Control.Distributed.Process.ManagedProcess.Server.hibernate" "Control.Distributed.Process.ManagedProcess.Server.stop" "Control.Distributed.Process.ManagedProcess.Server.stopWith"
Also see "Control.Distributed.Process.Management.Priority.act" and "Control.Distributed.Process.ManagedProcess.Priority.runAfter".
And other actions. This type should not be used directly.
type Reply b s = Process (ProcessReply b s) Source #
An action (server state transition) causing a reply to a caller, in the
Process
monad
data ProcessReply r s Source #
Returned from handlers for the synchronous call
protocol, encapsulates
the reply data and the action to take after sending the reply. A handler
can return NoReply
if they wish to ignore the call.
type ActionHandler s a = s -> a -> Action s Source #
An expression used to handle a message
type CallHandler s a b = s -> a -> Reply b s Source #
An expression used to handle a message and providing a reply
type CastHandler s a = ActionHandler s a Source #
An expression used to handle a cast message
type StatelessHandler s a = a -> s -> Action s Source #
An expression used to ignore server state during handling
type DeferredCallHandler s a b = CallRef b -> CallHandler s a b Source #
An expression used to handle a call message where the reply is deferred
via the CallRef
type StatelessCallHandler s a b = CallRef b -> a -> Reply b s Source #
An expression used to handle a call message ignoring server state
type InfoHandler s a = ActionHandler s a Source #
An expression used to handle an info message
type ChannelHandler s a b = SendPort b -> ActionHandler s a Source #
An expression used to handle a channel message
type StatelessChannelHandler s a b = SendPort b -> StatelessHandler s a Source #
An expression used to handle a channel message in a stateless process
data UnhandledMessagePolicy Source #
Policy for handling unexpected messages, i.e., messages which are not
sent using the call
or cast
APIs, and which are not handled by any of the
handleInfo
handlers.
Terminate | stop immediately, giving |
DeadLetter ProcessId | forward the message to the given recipient |
Log | log messages, then behave identically to |
Drop | dequeue and then drop/ignore the message |
Instances
Wraps a consumer of the call API
Instances
Generic (CallRef a) Source # | |
Show (CallRef a) Source # | |
Binary (CallRef a) Source # | |
NFData (CallRef a) Source # | |
Resolvable (CallRef a) Source # | |
Routable (CallRef a) Source # | |
Defined in Control.Distributed.Process.ManagedProcess.Internal.Types sendTo :: (Serializable m, Resolvable (CallRef a)) => CallRef a -> m -> Process () # unsafeSendTo :: (NFSerializable m, Resolvable (CallRef a)) => CallRef a -> m -> Process () # | |
Eq (CallRef a) Source # | |
type Rep (CallRef a) Source # | |
Defined in Control.Distributed.Process.ManagedProcess.Internal.Types type Rep (CallRef a) = D1 ('MetaData "CallRef" "Control.Distributed.Process.ManagedProcess.Internal.Types" "distributed-process-client-server-0.2.7.0-J8jFpPKNrsjKtWieo0rAq3" 'True) (C1 ('MetaCons "CallRef" 'PrefixI 'True) (S1 ('MetaSel ('Just "unCaller") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Recipient, CallId)))) |
Informs a shutdown handler of whether it is running due to a clean shutdown, or in response to an unhandled exception.
CleanShutdown s | given when an ordered shutdown is underway |
LastKnown s |
isCleanShutdown :: ExitState s -> Bool Source #
True
if the ExitState
is CleanShutdown
, otherwise False
.
defaultProcess :: ProcessDefinition s Source #
A default ProcessDefinition
, with no api, info or exit handler.
The default timeoutHandler
simply continues, the shutdownHandler
is a no-op and the unhandledMessagePolicy
is Terminate
.
defaultProcessWithPriorities :: [DispatchPriority s] -> PrioritisedProcessDefinition s Source #
Creates a default PrioritisedProcessDefinition
from a list of
DispatchPriority
. See defaultProcess
for the underlying definition.
statelessProcess :: ProcessDefinition () Source #
A basic, stateless ProcessDefinition
. See defaultProcess
for the
default field values.
statelessInit :: Delay -> InitHandler () () Source #
A default, state unaware InitHandler
that can be used with
statelessProcess
. This simply returns InitOk
with the empty
state (i.e., unit) and the given Delay
.
Control channels
data ControlChannel m Source #
Provides a means for servers to listen on a separate, typed control channel, thereby segregating the channel from their regular (and potentially busy) mailbox.
data ControlPort m Source #
The writable end of a ControlChannel
.
Instances
Show (ControlPort m) Source # | |
Defined in Control.Distributed.Process.ManagedProcess.Internal.Types showsPrec :: Int -> ControlPort m -> ShowS # show :: ControlPort m -> String # showList :: [ControlPort m] -> ShowS # | |
Serializable m => Binary (ControlPort m) Source # | |
Eq (ControlPort m) Source # | |
Defined in Control.Distributed.Process.ManagedProcess.Internal.Types (==) :: ControlPort m -> ControlPort m -> Bool # (/=) :: ControlPort m -> ControlPort m -> Bool # |
newControlChan :: Serializable m => Process (ControlChannel m) Source #
Creates a new ControlChannel
.
channelControlPort :: ControlChannel m -> ControlPort m Source #
Obtain an opaque expression for communicating with a ControlChannel
.
Server side callbacks
Prioritised mailboxes
Message
type used internally by the call, cast, and rpcChan APIs.
Instances
data GenProcess s a Source #
StateT based monad for prioritised process loops.
Instances
Given as the result of evaluating a DispatchFilter. This type is intended for internal use. For an API for working with filters, see Control.Distributed.Process.ManagedProcess.Priority.
data DispatchFilter s Source #
Provides dispatch from a variety of inputs to a typed filter handler.
peek :: GenProcess s (Maybe Message) Source #
Peek at the next available message in the internal priority queue, without removing it.
processState :: GenProcess s s Source #
Evaluates to the user defined state for the currently executing server loop.
runAfter :: forall s m. Serializable m => TimeInterval -> m -> GenProcess s () Source #
Warning: This interface is intended for internal use only
Starts a timer and adds it as a user timeout.
evalAfter :: forall s m. Serializable m => TimeInterval -> m -> s -> Action s Source #
Evaluate any matching info handler with the supplied datum after waiting
for at least TimeInterval
. The process state (for the resulting Action s
)
is also given and the process loop will go on as per Server.continue
.
Informally, evaluating this expression (such that the Action
is given as the
result of a handler or filter) will ensure that the supplied message (datum)
is availble for processing no sooner than TimeInterval
.
Currently, this expression creates an Action
that triggers immediate
evaluation in the process loop before continuing with the given state. The
process loop stores a user timeout for the given time interval, which is
trigerred like a wait/drain timeout. This implementation is subject to change.
currentTimeout :: GenProcess s Delay Source #
The current (user supplied) timeout.
processDefinition :: GenProcess s (ProcessDefinition s) Source #
The ProcessDefinition
for the current loop.
processFilters :: GenProcess s [DispatchFilter s] Source #
The list of filters for the current loop.
processUnhandledMsgPolicy :: GenProcess s UnhandledMessagePolicy Source #
Evaluates to the UnhandledMessagePolicy
for the current loop.
setUserTimeout :: Delay -> GenProcess s () Source #
Set the user timeout applied whilst a prioritised process loop is in a blocking receive.
setProcessState :: s -> GenProcess s () Source #
Set the current process state.
push :: forall s. Message -> GenProcess s () Source #
Push a message to the head of the internal priority queue.
addUserTimer :: Timer -> Message -> GenProcess s TimerKey Source #
Add a user timer, bound to the given datum.
act :: forall s. GenProcess s () -> Action s Source #
Warning: This interface is intended for internal use only
Produce an Action s
that, if it is the result of a handler, will cause the
server loop to evaluate the supplied expression. This is given in the GenProcess
monad, which is intended for internal use only.