Copyright | (c) Naoto Shimazaki 2018-2020 |
---|---|
License | MIT (see the file LICENSE) |
Maintainer | https://github.com/nshimaza |
Stability | experimental |
Safe Haskell | None |
Language | Haskell2010 |
A simplified implementation of Erlang/OTP like supervisor over thread. This is internal module where all real implementations present.
Synopsis
- data Inbox a = Inbox {
- inboxQueue :: TQueue a
- inboxLength :: TVar Word
- inboxSaveStack :: TVar [a]
- inboxMaxBound :: Word
- newtype InboxLength = InboxLength Word
- newtype ActorQ a = ActorQ (Inbox a)
- newInbox :: InboxLength -> IO (Inbox a)
- send :: ActorQ a -> a -> IO ()
- trySend :: ActorQ a -> a -> IO (Maybe ())
- length :: ActorQ a -> IO Word
- receiveSelect :: (a -> Bool) -> Inbox a -> IO a
- tryReceiveSelect :: (a -> Bool) -> Inbox a -> IO (Maybe a)
- pickFromSaveStack :: (a -> Bool) -> [a] -> Maybe (a, [a])
- oneMessageRemoved :: TVar Word -> TVar [a] -> [a] -> STM ()
- receive :: Inbox a -> IO a
- tryReceive :: Inbox a -> IO (Maybe a)
- type ActorHandler message result = Inbox message -> IO result
- data Actor message result = Actor {
- actorQueue :: ActorQ message
- actorAction :: IO result
- newActor :: ActorHandler message result -> IO (Actor message result)
- newBoundedActor :: InboxLength -> ActorHandler message result -> IO (Actor message result)
- newStateMachine :: state -> (state -> message -> IO (Either result state)) -> ActorHandler message result
- newStateMachineActor :: state -> (state -> message -> IO (Either result state)) -> IO (Actor message result)
- type ServerCallback a = a -> IO ()
- cast :: ActorQ cmd -> cmd -> IO ()
- newtype CallTimeout = CallTimeout Int
- call :: CallTimeout -> ActorQ cmd -> (ServerCallback a -> cmd) -> IO (Maybe a)
- callAsync :: CallTimeout -> ActorQ cmd -> (ServerCallback a -> cmd) -> (Maybe a -> IO b) -> IO (Async b)
- callIgnore :: ActorQ cmd -> (ServerCallback a -> cmd) -> IO ()
- data ExitReason
- type Monitor = ExitReason -> ThreadId -> IO ()
- type MonitoredAction = (IO () -> IO ()) -> IO ()
- watch :: Monitor -> IO () -> MonitoredAction
- noWatch :: IO () -> MonitoredAction
- nestWatch :: Monitor -> MonitoredAction -> MonitoredAction
- data Restart
- data ChildSpec = ChildSpec Restart MonitoredAction
- newMonitoredChildSpec :: Restart -> MonitoredAction -> ChildSpec
- newChildSpec :: Restart -> IO () -> ChildSpec
- addMonitor :: Monitor -> ChildSpec -> ChildSpec
- type ThreadMap = IORef (Map ThreadId ChildSpec)
- newThreadMap :: IO ThreadMap
- newThread :: ThreadMap -> ChildSpec -> IO ThreadId
- data RestartSensitivity = RestartSensitivity {}
- data IntenseRestartDetector = IntenseRestartDetector {}
- newIntenseRestartDetector :: RestartSensitivity -> IntenseRestartDetector
- detectIntenseRestart :: IntenseRestartDetector -> TimeSpec -> (Bool, IntenseRestartDetector)
- getCurrentTime :: IO TimeSpec
- detectIntenseRestartNow :: IntenseRestartDetector -> IO (Bool, IntenseRestartDetector)
- data SupervisorMessage
- type SupervisorQueue = ActorQ SupervisorMessage
- type SupervisorInbox = Inbox SupervisorMessage
- newSupervisedThread :: SupervisorQueue -> ThreadMap -> ChildSpec -> IO ThreadId
- startAllSupervisedThread :: SupervisorQueue -> ThreadMap -> [ChildSpec] -> IO ()
- killAllSupervisedThread :: SupervisorInbox -> ThreadMap -> IO ()
- data Strategy
- newSimpleOneForOneSupervisor :: ActorHandler SupervisorMessage ()
- newSupervisor :: Strategy -> RestartSensitivity -> [ChildSpec] -> ActorHandler SupervisorMessage ()
- newChild :: CallTimeout -> SupervisorQueue -> ChildSpec -> IO (Maybe ThreadId)
Documentation
Message queue abstraction.
Inbox | |
|
newtype InboxLength Source #
Maximum length of Inbox
.
Instances
Default InboxLength Source # | |
Defined in Control.Concurrent.SupervisorInternal def :: InboxLength # |
Send a message to given ActorQ
. Block while the queue is full.
Try to send a message to given ActorQ
. Return Nothing if the queue is
already full.
:: (a -> Bool) | Predicate to pick a interesting message. |
-> Inbox a | Message queue where interesting message searched for. |
-> IO a |
Perform selective receive from given Inbox
.
receiveSelect
searches given queue for first interesting message
predicated by user supplied function. It applies the predicate to the
queue, returns the first element that satisfy the predicate, and remove the
element from the Inbox.
It blocks until interesting message arrived if no interesting message was found in the queue.
Caution
Use this function with care. It does NOT discard any message unsatisfying predicate. It keeps them in the queue for future receive and the function itself blocks until interesting message arrived. That causes your queue filled up by non-interesting messages. There is no escape hatch.
Consider using tryReceiveSelect
instead.
Caveat
Current implementation has performance caveat. It has O(n) performance characteristic where n is number of messages existing before your interested message appears. It is because this function performs liner scan from the top of the queue every time it is called. It doesn't cache pair of predicates and results you have given before.
Use this function in limited situation only.
:: (a -> Bool) | Predicate to pick a interesting message. |
-> Inbox a | Message queue where interesting message searched for. |
-> IO (Maybe a) |
Try to perform selective receive from given Inbox
.
tryReceiveSelect
searches given queue for first interesting message
predicated by user supplied function. It applies the predicate to the
queue, returns the first element that satisfy the predicate, and remove the
element from the Inbox.
It return Nothing if there is no interesting message found in the queue.
Caveat
Current implementation has performance caveat. It has O(n) performance characteristic where n is number of messages existing before your interested message appears. It is because this function performs liner scan from the top of the queue every time it is called. It doesn't cache pair of predicates and results you have given before.
Use this function in limited situation only.
pickFromSaveStack :: (a -> Bool) -> [a] -> Maybe (a, [a]) Source #
Find oldest message satisfying predicate from saveStack
, return the
message, and remove it from the saveStack. Returns Nothing
if there is no
satisfying message.
:: TVar Word |
|
-> TVar [a] | TVar to hold given new saveStack. |
-> [a] | New saveStack to update given TVar. |
-> STM () |
Update Inbox
with new saveStack
which already have one message removed.
tryReceive :: Inbox a -> IO (Maybe a) Source #
Try to receive first message in Inbox
. It returns Nothing if there is no
message available.
type ActorHandler message result = Inbox message -> IO result Source #
Type synonym of user supplied message handler working inside actor.
data Actor message result Source #
Actor representation.
Actor | |
|
:: ActorHandler message result | IO action handling received messages. |
-> IO (Actor message result) |
Create a new actor.
Users have to supply a message handler function with ActorHandler
type.
ActorHandler accepts a Inbox
and returns anything.
newActor
creates a new Inbox
, apply user supplied message handler to the
queue, returns reference to write-end of the queue and IO action of the
actor. Because newActor
only returns ActorQ
, caller of newActor
can
only send messages to created actor. Caller cannot receive message from the
queue.
Inbox
, or read-end of the queue, is passed to user supplied message
handler so the handler can receive message to the actor. If the handler
need to send a message to itself, wrap the message queue by ActorQ
constructor then use send
over created ActorQ
. Here is an example.
send (ActorQ yourInbox) message
:: InboxLength | Maximum length of inbox message queue. |
-> ActorHandler message result | IO action handling received messages. |
-> IO (Actor message result) |
Create a new actor with bounded inbox queue.
:: state | Initial state of the state machine. |
-> (state -> message -> IO (Either result state)) | Message handler which processes event and returns result or next state. |
-> ActorHandler message result |
Create a new finite state machine.
The state machine waits for new message at Inbox
then callback user
supplied message handler. The message handler must return Right
with new
state or Left
with final result. When Right
is returned, the state
machine waits for next message. When Left
is returned, the state machine
terminates and returns the result.
newStateMachine
returns an IO action wrapping the state machine described
above. The returned IO action can be executed within an Async
or bare
thread.
Created IO action is designed to run in separate thread from main thread. If you try to run the IO action at main thread without having producer of the message queue you gave, the state machine will dead lock.
newStateMachineActor :: state -> (state -> message -> IO (Either result state)) -> IO (Actor message result) Source #
create an unbound actor of newStateMachine. Short-cut of following.
newActor $ newStateMachine initialState messageHandler
type ServerCallback a = a -> IO () Source #
Type synonym of callback function to obtain return value.
Send an asynchronous request to a server.
newtype CallTimeout Source #
Timeout of call method for server behavior in microseconds. Default is 5 second.
Instances
Default CallTimeout Source # | |
Defined in Control.Concurrent.SupervisorInternal def :: CallTimeout # |
:: CallTimeout | Timeout. |
-> ActorQ cmd | Message queue of the target server. |
-> (ServerCallback a -> cmd) | Request to the server without callback supplied. |
-> IO (Maybe a) |
Send an synchronous request to a server and waits for a return value until timeout.
:: CallTimeout | Timeout. |
-> ActorQ cmd | Message queue. |
-> (ServerCallback a -> cmd) | Request to the server without callback supplied. |
-> (Maybe a -> IO b) | callback to process return value of the call. Nothing is given on timeout. |
-> IO (Async b) |
Make an asynchronous call to a server and give result in CPS style. The
return value is delivered to given callback function. It also can fail by
timeout. Calling thread can wait
for a return value from the
callback.
Use this function with care because there is no guaranteed cancellation of
background worker thread other than timeout. Giving infinite timeout (zero)
to the CallTimeout
argument may cause the background thread left to run,
possibly indefinitely.
:: ActorQ cmd | Message queue of the target server. |
-> (ServerCallback a -> cmd) | Request to the server without callback supplied. |
-> IO () |
Send an request to a server but ignore return value.
data ExitReason Source #
ExitReason
indicates reason of thread termination.
Normal | Thread was normally finished. |
UncaughtException SomeException | A synchronous exception was thrown and it was not caught. This indicates some unhandled error happened inside of the thread handler. |
Killed | An asynchronous exception was thrown. This also happen when the thread was killed by supervisor. |
Instances
Show ExitReason Source # | |
Defined in Control.Concurrent.SupervisorInternal showsPrec :: Int -> ExitReason -> ShowS # show :: ExitReason -> String # showList :: [ExitReason] -> ShowS # |
= ExitReason | Reason of thread termination. |
-> ThreadId | ID of terminated thread. |
-> IO () |
Monitor
is user supplied callback function which is called when monitored thread is terminated.
type MonitoredAction = (IO () -> IO ()) -> IO () Source #
MonitoredAction
is type synonym of function with callback on termination
installed. Its type signature fits to argument for forkIOWithUnmask
.
watch :: Monitor -> IO () -> MonitoredAction Source #
Install Monitor
callback function to simple IO action.
noWatch :: IO () -> MonitoredAction Source #
Convert simple IO action to MonitoredAction
without installing Monitor
.
nestWatch :: Monitor -> MonitoredAction -> MonitoredAction Source #
Install another Monitor
callback function to MonitoredAction
.
Restart
defines when terminated child thread triggers restart operation by
its supervisor. Restart
only defines when it triggers restart operation.
It does not directly means if the thread will be or will not be restarted.
It is determined by restart strategy of supervisor. For example, a static
Temporary
child never triggers restart on its termination but static
Temporary
child will be restarted if another Permanent
or Transient
thread with common supervisor triggered restart operation and the supervisor
has OneForAll
strategy.
Permanent |
|
Transient |
|
Temporary |
|
ChildSpec
is representation of IO action which can be supervised by
supervisor. Supervisor can run the IO action with separate thread, watch
its termination and restart it based on restart type.
newMonitoredChildSpec Source #
:: Restart | Restart type of resulting |
-> MonitoredAction | User supplied monitored IO action which the |
-> ChildSpec |
Create a ChildSpec
from MonitoredAction
.
:: Restart | Restart type of resulting |
-> IO () | User supplied IO action which the |
-> ChildSpec |
Create a ChildSpec
from plain IO action.
data RestartSensitivity Source #
RestartSensitivity
defines condition how supervisor determines intensive restart is happening. If more than
restartSensitivityIntensity
time of restart is triggered within restartSensitivityPeriod
, supervisor decides
intensive restart is happening and it terminates itself. Default intensity (maximum number of acceptable restart)
is 1. Default period is 5 seconds.
RestartSensitivity | |
|
Instances
Default RestartSensitivity Source # | |
Defined in Control.Concurrent.SupervisorInternal |
data IntenseRestartDetector Source #
IntenseRestartDetector
keeps data used for detecting intense restart. It keeps maxR (maximum restart intensity),
maxT (period of majoring restart intensity) and history of restart with system timestamp in Monotonic
form.
IntenseRestartDetector | |
|
newIntenseRestartDetector :: RestartSensitivity -> IntenseRestartDetector Source #
Create new IntenseRestartDetector with given RestartSensitivity
parameters.
:: IntenseRestartDetector | Intense restart detector containing history of past restart with maxT and maxR |
-> TimeSpec | System timestamp in |
-> (Bool, IntenseRestartDetector) | Returns |
Determine if the last restart results intensive restart. It pushes the last restart timestamp to the DelayedQueue
of restart history held inside of the IntenseRestartDetector then check if the oldest restart record is pushed out
from the queue. If no record was pushed out, there are less number of restarts than limit, so it is not intensive.
If a record was pushed out, it means we had one more restarts than allowed. If the oldest restart and newest
restart happened within allowed time interval, it is intensive.
This function implements pure part of detectIntenseRestartNow
.
detectIntenseRestartNow Source #
:: IntenseRestartDetector | Intense restart detector containing history of past restart with maxT and maxR. |
-> IO (Bool, IntenseRestartDetector) |
Determine if intensive restart is happening now. It is called when restart is triggered by some thread termination.
data SupervisorMessage Source #
SupervisorMessage
defines all message types supervisor can receive.
Down ExitReason ThreadId | Notification of child thread termination. |
StartChild ChildSpec (ServerCallback ThreadId) | Command to start a new supervised thread. |
type SupervisorQueue = ActorQ SupervisorMessage Source #
Type synonym for write-end of supervisor's message queue.
type SupervisorInbox = Inbox SupervisorMessage Source #
Type synonym for read-end of supervisor's message queue.
:: SupervisorQueue | Inbox message queue of the supervisor. |
-> ThreadMap | Map of current live threads which the supervisor monitors. |
-> ChildSpec | Specification of the child thread to be started. |
-> IO ThreadId |
Start a new thread with supervision.
startAllSupervisedThread Source #
:: SupervisorQueue | Inbox message queue of the supervisor. |
-> ThreadMap | Map of current live threads which the supervisor monitors. |
-> [ChildSpec] | List of child specifications to be started. |
-> IO () |
Start all given ChildSpec
on new thread each with supervision.
killAllSupervisedThread :: SupervisorInbox -> ThreadMap -> IO () Source #
Kill all running threads supervised by the supervisor represented by SupervisorQueue
.
Restart strategy of supervisor
:: Strategy | Restarting strategy of monitored threads. |
-> RestartSensitivity | Restart intensity sensitivity in restart count and period. |
-> [ChildSpec] | List of supervised child specifications. |
-> ActorHandler SupervisorMessage () |
Create a supervisor.
When created supervisor IO action started, it automatically creates child
threads based on given ChildSpec
list and supervise them. After it
created such static children, it listens given SupervisorQueue
. User can
let the supervisor creates dynamic child thread by calling newChild
.
Dynamic child threads created by newChild
are also supervised.
When the supervisor thread is killed or terminated in some reason, all children including static children and dynamic children are all killed.
With OneForOne
restart strategy, when a child thread terminated, it is
restarted based on its restart type given in ChildSpec
. If the terminated
thread has Permanent
restart type, supervisor restarts it regardless its
exit reason. If the terminated thread has Transient
restart type, and
termination reason is other than Normal
(meaning UncaughtException
or
Killed
), it is restarted. If the terminated thread has Temporary
restart type, supervisor does not restart it regardless its exit reason.
Created IO action is designed to run in separate thread from main thread. If you try to run the IO action at main thread without having producer of the supervisor queue you gave, the supervisor will dead lock.
:: CallTimeout | Request timeout in microsecond. |
-> SupervisorQueue | Inbox message queue of the supervisor to ask new thread. |
-> ChildSpec | Child specification of the thread to spawn. |
-> IO (Maybe ThreadId) |
Ask the supervisor to spawn new temporary child thread. Returns ThreadId
of the new child.