thread-supervisor-0.1.0.0: A simplified implementation of Erlang/OTP like supervisor over thread

Copyright: (c) Naoto Shimazaki 2018-2020
License: MIT (see the file LICENSE)
Maintainer: https://github.com/nshimaza
Stability: experimental
Safe Haskell: None
Language: Haskell2010

Control.Concurrent.SupervisorInternal

Description

A simplified implementation of an Erlang/OTP-like supervisor over threads. This is the internal module where all the real implementations reside.

Documentation

data Inbox a Source #

Message queue abstraction.

Constructors

Inbox 

Fields

newtype InboxLength Source #

Maximum length of Inbox.

Constructors

InboxLength Word 
Instances
Default InboxLength Source # 
Instance details

Defined in Control.Concurrent.SupervisorInternal

Methods

def :: InboxLength #

newtype ActorQ a Source #

Write end of Inbox exposed to outside of actor.

Constructors

ActorQ (Inbox a) 

newInbox :: InboxLength -> IO (Inbox a) Source #

Create a new empty Inbox.

send Source #

Arguments

:: ActorQ a

Write-end of target actor's message queue.

-> a

Message to be sent.

-> IO () 

Send a message to the given ActorQ. Blocks while the queue is full.

trySend Source #

Arguments

:: ActorQ a

Write-end of target actor's message queue.

-> a

Message to be sent.

-> IO (Maybe ()) 

Try to send a message to the given ActorQ. Returns Nothing if the queue is already full.
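The difference between a blocking and a non-blocking send on a bounded queue can be sketched without this library. The following is a minimal illustration built on TBQueue from the stm package, not the implementation behind trySend; trySendSketch is a hypothetical name.

```haskell
import Control.Concurrent.STM
  (TBQueue, atomically, isFullTBQueue, newTBQueueIO, writeTBQueue)

-- Hypothetical stand-in for trySend: enqueue only when there is room.
trySendSketch :: TBQueue a -> a -> IO (Maybe ())
trySendSketch q x = atomically $ do
  full <- isFullTBQueue q
  if full
    then pure Nothing               -- queue full: report failure instead of blocking
    else Just <$> writeTBQueue q x  -- room available: enqueue the message

main :: IO ()
main = do
  q <- newTBQueueIO 2               -- capacity of two messages
  r1 <- trySendSketch q 'a'
  r2 <- trySendSketch q 'b'
  r3 <- trySendSketch q 'c'         -- the queue is full at this point
  print [r1, r2, r3]                -- prints [Just (),Just (),Nothing]
```

Calling writeTBQueue directly, without the fullness check, would block the caller until room becomes available, which corresponds to the behaviour described for send.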

length :: ActorQ a -> IO Word Source #

Number of elements currently held by the ActorQ.

receiveSelect Source #

Arguments

:: (a -> Bool)

Predicate to pick an interesting message.

-> Inbox a

Message queue to search for an interesting message.

-> IO a 

Perform a selective receive from the given Inbox.

receiveSelect searches the given queue for the first interesting message, as determined by a user-supplied predicate. It applies the predicate to the messages in the queue, returns the first one that satisfies the predicate, and removes it from the Inbox.

If no interesting message is found in the queue, it blocks until one arrives.

Caution

Use this function with care. It does NOT discard messages that fail the predicate. It keeps them in the queue for future receives, and the function itself blocks until an interesting message arrives. As a result, your queue can fill up with uninteresting messages. There is no escape hatch.

Consider using tryReceiveSelect instead.

Caveat

The current implementation has a performance caveat. It runs in O(n) time, where n is the number of messages queued ahead of the message you are interested in, because the function performs a linear scan from the top of the queue every time it is called. It does not cache the predicates you have supplied before or their results.

Use this function only in limited situations.

tryReceiveSelect Source #

Arguments

:: (a -> Bool)

Predicate to pick an interesting message.

-> Inbox a

Message queue to search for an interesting message.

-> IO (Maybe a) 

Try to perform a selective receive from the given Inbox.

tryReceiveSelect searches the given queue for the first interesting message, as determined by a user-supplied predicate. It applies the predicate to the messages in the queue, returns the first one that satisfies the predicate, and removes it from the Inbox.

It returns Nothing if no interesting message is found in the queue.

Caveat

The current implementation has a performance caveat. It runs in O(n) time, where n is the number of messages queued ahead of the message you are interested in, because the function performs a linear scan from the top of the queue every time it is called. It does not cache the predicates you have supplied before or their results.

Use this function only in limited situations.

pickFromSaveStack :: (a -> Bool) -> [a] -> Maybe (a, [a]) Source #

Find the oldest message in the saveStack that satisfies the predicate, and return it together with the saveStack with that message removed. Returns Nothing if no message satisfies the predicate.
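A function of this shape can be sketched in a few lines. The version below is an illustration only and assumes that the stack stores the newest message at its head, which the text above does not state; pickOldest is a hypothetical name.

```haskell
-- Hypothetical stand-in; assumes the save stack stores the newest message first.
pickOldest :: (a -> Bool) -> [a] -> Maybe (a, [a])
pickOldest p stack = go [] (reverse stack)   -- scan from the oldest message
  where
    -- 'skipped' accumulates rejected messages, newest first.
    go _ [] = Nothing
    go skipped (x : xs)
      | p x       = Just (x, reverse xs ++ skipped)  -- rebuild newest-first order
      | otherwise = go (x : skipped) xs

main :: IO ()
main = do
  print (pickOldest even [3, 2, 1 :: Int])   -- prints Just (2,[3,1])
  print (pickOldest even [5, 3, 1 :: Int])   -- prints Nothing
```

The reversal makes each call linear in the size of the stack, in line with the O(n) caveat given for the selective receive functions.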

oneMessageRemoved Source #

Arguments

:: TVar Word

TVar holding current number of messages in the queue.

-> TVar [a]

TVar to hold given new saveStack.

-> [a]

New saveStack to update given TVar.

-> STM () 

Update the Inbox with a new saveStack from which one message has already been removed.

receive :: Inbox a -> IO a Source #

Receive the first message in the Inbox. Blocks until a message is available.

tryReceive :: Inbox a -> IO (Maybe a) Source #

Try to receive the first message in the Inbox. Returns Nothing if no message is available.

type ActorHandler message result = Inbox message -> IO result Source #

Type synonym for the user-supplied message handler that runs inside an actor.

data Actor message result Source #

Actor representation.

Constructors

Actor 

Fields

newActor Source #

Arguments

:: ActorHandler message result

IO action handling received messages.

-> IO (Actor message result) 

Create a new actor.

Users have to supply a message handler function of type ActorHandler. An ActorHandler accepts an Inbox and may return a value of any type.

newActor creates a new Inbox, applies the user-supplied message handler to it, and returns a reference to the write-end of the queue together with the IO action of the actor. Because newActor only exposes the ActorQ, the caller of newActor can only send messages to the created actor; it cannot receive messages from the queue.

The Inbox, or read-end of the queue, is passed to the user-supplied message handler so that the handler can receive messages sent to the actor. If the handler needs to send a message to itself, wrap the Inbox in the ActorQ constructor and then use send on the resulting ActorQ. Here is an example.

send (ActorQ yourInbox) message
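The split between a read-end kept inside the handler and a write-end handed to callers can be illustrated with base alone. This sketch uses Chan in place of Inbox; WriteEnd, sendTo, mkActor and countdown are hypothetical names, and it is not how newActor is implemented.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical write-only wrapper, playing the role ActorQ plays for Inbox.
newtype WriteEnd a = WriteEnd (Chan a)

sendTo :: WriteEnd a -> a -> IO ()
sendTo (WriteEnd c) = writeChan c

-- Create the queue, hand the read-end to the handler, expose only the write-end.
mkActor :: (Chan a -> IO r) -> IO (WriteEnd a, IO r)
mkActor handler = do
  inbox <- newChan
  pure (WriteEnd inbox, handler inbox)

-- A handler that counts down by sending messages to itself.
countdown :: Chan Int -> IO String
countdown inbox = do
  n <- readChan inbox
  if n <= 0
    then pure "done"
    else sendTo (WriteEnd inbox) (n - 1) >> countdown inbox

main :: IO ()
main = do
  (q, action) <- mkActor countdown
  result <- newEmptyMVar
  _ <- forkIO (action >>= putMVar result)  -- run the actor on its own thread
  sendTo q 3                               -- the caller can only send
  takeMVar result >>= putStrLn             -- prints done
```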

newBoundedActor Source #

Arguments

:: InboxLength

Maximum length of inbox message queue.

-> ActorHandler message result

IO action handling received messages.

-> IO (Actor message result) 

Create a new actor with a bounded inbox queue.

newStateMachine Source #

Arguments

:: state

Initial state of the state machine.

-> (state -> message -> IO (Either result state))

Message handler which processes an event and returns a result or the next state.

-> ActorHandler message result 

Create a new finite state machine.

The state machine waits for a new message at the Inbox, then calls the user-supplied message handler. The message handler must return Right with the new state or Left with the final result. When Right is returned, the state machine waits for the next message. When Left is returned, the state machine terminates and returns the result.

newStateMachine returns an IO action wrapping the state machine described above. The returned IO action can be executed within an Async or a bare thread.

The created IO action is designed to run in a thread separate from the main thread. If you run it on the main thread without a producer for the message queue you supplied, the state machine will deadlock.
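The Left/Right protocol described above can be sketched as a plain loop. This is a minimal illustration over Chan from base, not the code behind newStateMachine; runMachine and sumHandler are hypothetical names.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Wait for a message, call the handler, stop on Left, continue on Right.
runMachine
  :: Chan msg
  -> state
  -> (state -> msg -> IO (Either result state))
  -> IO result
runMachine inbox initial handler = go initial
  where
    go st = do
      msg <- readChan inbox
      next <- handler st msg
      case next of
        Left result -> pure result   -- final result: terminate
        Right st'   -> go st'        -- new state: wait for the next message

-- Hypothetical handler: sum numbers until Nothing arrives.
sumHandler :: Int -> Maybe Int -> IO (Either Int Int)
sumHandler acc (Just n) = pure (Right (acc + n))
sumHandler acc Nothing  = pure (Left acc)

main :: IO ()
main = do
  inbox <- newChan
  -- Messages are queued before the machine starts, so running it on the
  -- main thread cannot deadlock in this example.
  mapM_ (writeChan inbox) [Just 1, Just 2, Just 3, Nothing]
  runMachine inbox 0 sumHandler >>= print   -- prints 6
```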

newStateMachineActor :: state -> (state -> message -> IO (Either result state)) -> IO (Actor message result) Source #

Create an unbounded actor from newStateMachine. Shortcut for the following.

newActor $ newStateMachine initialState messageHandler

type ServerCallback a = a -> IO () Source #

Type synonym for a callback function used to obtain a return value.

cast Source #

Arguments

:: ActorQ cmd

Message queue of the target server.

-> cmd

Request to the server.

-> IO () 

Send an asynchronous request to a server.

newtype CallTimeout Source #

Timeout of the call method for server behavior, in microseconds. The default is 5 seconds.

Constructors

CallTimeout Int 
Instances
Default CallTimeout Source # 
Instance details

Defined in Control.Concurrent.SupervisorInternal

Methods

def :: CallTimeout #

call Source #

Arguments

:: CallTimeout

Timeout.

-> ActorQ cmd

Message queue of the target server.

-> (ServerCallback a -> cmd)

Request to the server without callback supplied.

-> IO (Maybe a) 

Send a synchronous request to a server and wait for a return value until the timeout expires.
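The request argument is a constructor that still lacks its callback; the caller's side fills it in and then waits for the reply. The following standalone sketch shows that pattern with MVar and System.Timeout from base. It is an assumption about the general technique, not the library's code, and Callback, Cmd and callSketch are hypothetical names.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever)
import System.Timeout (timeout)

type Callback a = a -> IO ()

-- Hypothetical request type: the last field is the reply callback.
data Cmd = Add Int Int (Callback Int)

-- Complete the request with a callback that fills an MVar, send it,
-- then wait for the reply for at most the given number of microseconds.
callSketch :: Int -> Chan cmd -> (Callback a -> cmd) -> IO (Maybe a)
callSketch micros server mkRequest = do
  reply <- newEmptyMVar
  writeChan server (mkRequest (putMVar reply))
  timeout micros (takeMVar reply)

main :: IO ()
main = do
  server <- newChan
  -- A tiny server: answer every Add request through its callback.
  _ <- forkIO . forever $ do
    Add x y reply <- readChan server
    reply (x + y)
  callSketch 1000000 server (Add 1 2) >>= print   -- prints Just 3
```

If the server never invokes the callback, callSketch returns Nothing once the timeout expires.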

callAsync Source #

Arguments

:: CallTimeout

Timeout.

-> ActorQ cmd

Message queue.

-> (ServerCallback a -> cmd)

Request to the server without callback supplied.

-> (Maybe a -> IO b)

Callback to process the return value of the call. Nothing is given on timeout.

-> IO (Async b) 

Make an asynchronous call to a server and deliver the result in CPS style. The return value is delivered to the given callback function. The call can also fail by timeout. The calling thread can wait for the return value of the callback.

Use this function with care because there is no guaranteed cancellation of the background worker thread other than the timeout. Giving an infinite timeout (zero) as the CallTimeout argument may leave the background thread running, possibly indefinitely.

callIgnore Source #

Arguments

:: ActorQ cmd

Message queue of the target server.

-> (ServerCallback a -> cmd)

Request to the server without callback supplied.

-> IO () 

Send a request to a server but ignore the return value.

data ExitReason Source #

ExitReason indicates the reason for thread termination.

Constructors

Normal

Thread was normally finished.

UncaughtException SomeException

A synchronous exception was thrown and it was not caught. This indicates some unhandled error happened inside of the thread handler.

Killed

An asynchronous exception was thrown. This also happens when the thread is killed by the supervisor.

type Monitor Source #

Arguments

 = ExitReason

Reason of thread termination.

-> ThreadId

ID of terminated thread.

-> IO () 

Monitor is a user-supplied callback function which is called when the monitored thread terminates.

type MonitoredAction = (IO () -> IO ()) -> IO () Source #

MonitoredAction is a type synonym for a function with a termination callback installed. Its type signature fits the argument of forkIOWithUnmask.

watch :: Monitor -> IO () -> MonitoredAction Source #

Install a Monitor callback function on a simple IO action.
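One way to picture how an action with a termination callback fits forkIOWithUnmask is the sketch below. It is a simplified illustration under stated assumptions, not the library's watch: Reason collapses ExitReason to two cases and does not distinguish asynchronous exceptions, and Reason, Monitored, watchSketch and describe are hypothetical names.

```haskell
import Control.Concurrent (ThreadId, forkIOWithUnmask, myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, mask_, throwIO, try)

-- Simplified, hypothetical exit reason.
data Reason = Finished | Crashed SomeException

-- Same shape as MonitoredAction: it receives the unmask function.
type Monitored = (IO () -> IO ()) -> IO ()

-- Run the action unmasked, then report how it ended to the monitor.
watchSketch :: (Reason -> ThreadId -> IO ()) -> IO () -> Monitored
watchSketch monitor action unmask = do
  outcome <- try (unmask action)
  tid <- myThreadId
  monitor (either Crashed (const Finished) outcome) tid

describe :: Reason -> String
describe Finished    = "normal"
describe (Crashed _) = "exception"

main :: IO ()
main = do
  done <- newEmptyMVar
  let monitor reason _ = putMVar done (describe reason)
      -- Fork with asynchronous exceptions masked so the monitor call is
      -- not interrupted; only the user action runs unmasked.
      spawn action =
        mask_ (forkIOWithUnmask (\unmask -> watchSketch monitor action unmask))
  _ <- spawn (pure ())
  takeMVar done >>= putStrLn                 -- prints normal
  _ <- spawn (throwIO (userError "boom"))
  takeMVar done >>= putStrLn                 -- prints exception
```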

noWatch :: IO () -> MonitoredAction Source #

Convert a simple IO action to a MonitoredAction without installing a Monitor.

nestWatch :: Monitor -> MonitoredAction -> MonitoredAction Source #

Install another Monitor callback function on a MonitoredAction.

data Restart Source #

Restart defines when the termination of a child thread triggers a restart operation by its supervisor. It only defines when a restart operation is triggered; it does not directly determine whether the thread itself will be restarted. That is determined by the restart strategy of the supervisor. For example, a static Temporary child never triggers a restart on its termination, but it will be restarted if another Permanent or Transient thread under the same supervisor triggers a restart operation and the supervisor has the OneForAll strategy.

Constructors

Permanent

Permanent thread always triggers restart.

Transient

Transient thread triggers restart only if it was terminated by exception.

Temporary

Temporary thread never triggers restart.

Instances
Eq Restart Source # 
Instance details

Defined in Control.Concurrent.SupervisorInternal

Methods

(==) :: Restart -> Restart -> Bool #

(/=) :: Restart -> Restart -> Bool #

Show Restart Source # 
Instance details

Defined in Control.Concurrent.SupervisorInternal

data ChildSpec Source #

ChildSpec is a representation of an IO action which can be supervised by a supervisor. The supervisor can run the IO action in a separate thread, watch its termination, and restart it based on its restart type.

newMonitoredChildSpec Source #

Arguments

:: Restart

Restart type of resulting ChildSpec. One of Permanent, Transient or Temporary.

-> MonitoredAction

User supplied monitored IO action which the ChildSpec actually does.

-> ChildSpec 

Create a ChildSpec from MonitoredAction.

newChildSpec Source #

Arguments

:: Restart

Restart type of resulting ChildSpec. One of Permanent, Transient or Temporary.

-> IO ()

User supplied IO action which the ChildSpec actually does.

-> ChildSpec 

Create a ChildSpec from plain IO action.

addMonitor Source #

Arguments

:: Monitor

Callback function called when the IO action of the ChildSpec terminates.

-> ChildSpec

Existing ChildSpec where the Monitor is going to be added.

-> ChildSpec

Newly created ChildSpec with the Monitor added.

Add a Monitor function to existing ChildSpec.

type ThreadMap = IORef (Map ThreadId ChildSpec) Source #

ThreadMap is a mutable variable which holds the pool of living threads and the ChildSpec of each thread. ThreadMap is used only inside the supervisor.

newThread Source #

Arguments

:: ThreadMap

Map of current live threads where the new thread is going to be added.

-> ChildSpec

Specification of newly started thread.

-> IO ThreadId

Thread ID of forked thread.

Start a new thread based on the given ChildSpec, register the thread in the given ThreadMap, then return the ThreadId of the thread.

data RestartSensitivity Source #

RestartSensitivity defines the condition under which a supervisor decides that intensive restart is happening. If more than restartSensitivityIntensity restarts are triggered within restartSensitivityPeriod, the supervisor decides that intensive restart is happening and terminates itself. The default intensity (maximum number of acceptable restarts) is 1. The default period is 5 seconds.

Constructors

RestartSensitivity 

Fields

data IntenseRestartDetector Source #

IntenseRestartDetector keeps the data used for detecting intense restart. It keeps maxR (maximum restart intensity), maxT (period for measuring restart intensity), and the history of restarts with system timestamps in Monotonic form.

Constructors

IntenseRestartDetector 

Fields

newIntenseRestartDetector :: RestartSensitivity -> IntenseRestartDetector Source #

Create new IntenseRestartDetector with given RestartSensitivity parameters.

detectIntenseRestart Source #

Arguments

:: IntenseRestartDetector

Intense restart detector containing the history of past restarts with maxT and maxR.

-> TimeSpec

System timestamp in Monotonic form when the last restart was triggered.

-> (Bool, IntenseRestartDetector)

Returns True if intensive restart is happening. Also returns the new restart history, with the oldest record removed if possible.

Determine whether the last restart results in intensive restart. It pushes the last restart timestamp to the DelayedQueue of restart history held inside the IntenseRestartDetector, then checks whether the oldest restart record was pushed out of the queue. If no record was pushed out, there have been fewer restarts than the limit, so it is not intensive. If a record was pushed out, there has been one more restart than allowed. In that case, if the oldest and newest restarts happened within the allowed time interval, it is intensive.

This function implements the pure part of detectIntenseRestartNow.
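The push-and-check procedure described above can be sketched with a plain list in place of the DelayedQueue. This is an illustration under assumptions, not the library's implementation: timestamps are plain Int seconds rather than TimeSpec, the boundary comparison is assumed to be inclusive, and Detector, allowed, period, history and detect are hypothetical names.

```haskell
data Detector = Detector
  { allowed :: Int     -- maximum number of acceptable restarts (maxR)
  , period  :: Int     -- measuring period in seconds (maxT); unit is an assumption
  , history :: [Int]   -- restart timestamps, newest first
  }

-- Push the newest timestamp, keep at most 'allowed' records, and check
-- whether a pushed-out record falls within the period.
detect :: Detector -> Int -> (Bool, Detector)
detect d now =
  let (kept, pushedOut) = splitAt (allowed d) (now : history d)
      intense = case pushedOut of
        oldest : _ -> now - oldest <= period d  -- one restart too many, in time
        []         -> False                     -- still under the limit
  in (intense, d { history = kept })

main :: IO ()
main = do
  let d0       = Detector 1 5 []   -- the defaults quoted for RestartSensitivity
      (r1, d1) = detect d0 0       -- first restart at t = 0
      (r2, _)  = detect d1 2       -- second restart 2 seconds later
      (r3, _)  = detect d1 10      -- or instead 10 seconds later
  print (r1, r2, r3)               -- prints (False,True,False)
```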

getCurrentTime :: IO TimeSpec Source #

Get current system timestamp in Monotonic form.

detectIntenseRestartNow Source #

Arguments

:: IntenseRestartDetector

Intense restart detector containing history of past restart with maxT and maxR.

-> IO (Bool, IntenseRestartDetector) 

Determine whether intensive restart is happening now. It is called when a restart is triggered by some thread termination.

data SupervisorMessage Source #

SupervisorMessage defines all message types supervisor can receive.

Constructors

Down ExitReason ThreadId

Notification of child thread termination.

StartChild ChildSpec (ServerCallback ThreadId)

Command to start a new supervised thread.

type SupervisorQueue = ActorQ SupervisorMessage Source #

Type synonym for write-end of supervisor's message queue.

type SupervisorInbox = Inbox SupervisorMessage Source #

Type synonym for read-end of supervisor's message queue.

newSupervisedThread Source #

Arguments

:: SupervisorQueue

Inbox message queue of the supervisor.

-> ThreadMap

Map of current live threads which the supervisor monitors.

-> ChildSpec

Specification of the child thread to be started.

-> IO ThreadId 

Start a new thread with supervision.

startAllSupervisedThread Source #

Arguments

:: SupervisorQueue

Inbox message queue of the supervisor.

-> ThreadMap

Map of current live threads which the supervisor monitors.

-> [ChildSpec]

List of child specifications to be started.

-> IO () 

Start each given ChildSpec on its own new thread with supervision.

killAllSupervisedThread :: SupervisorInbox -> ThreadMap -> IO () Source #

Kill all running threads supervised by the supervisor represented by the given SupervisorInbox.

data Strategy Source #

Restart strategy of a supervisor.

Constructors

OneForOne

Restart only exited thread.

OneForAll

Restart all threads supervised by the same supervisor of exited thread.

newSimpleOneForOneSupervisor :: ActorHandler SupervisorMessage () Source #

Create a supervisor with the OneForOne restart strategy and no static ChildSpec. When started, it has no child threads. Only newChild can add new threads supervised by the supervisor. Thus the simple one-for-one supervisor only manages dynamic and Temporary children.

newSupervisor Source #

Arguments

:: Strategy

Restarting strategy of monitored threads. OneForOne or OneForAll.

-> RestartSensitivity

Restart intensity sensitivity in restart count and period.

-> [ChildSpec]

List of supervised child specifications.

-> ActorHandler SupervisorMessage () 

Create a supervisor.

When the created supervisor IO action is started, it automatically creates child threads based on the given ChildSpec list and supervises them. After it has created these static children, it listens on the given SupervisorQueue. Users can have the supervisor create dynamic child threads by calling newChild. Dynamic child threads created by newChild are also supervised.

When the supervisor thread is killed or terminated for some reason, all of its children, both static and dynamic, are killed.

With the OneForOne restart strategy, when a child thread terminates, it is restarted based on the restart type given in its ChildSpec. If the terminated thread has the Permanent restart type, the supervisor restarts it regardless of its exit reason. If the terminated thread has the Transient restart type and the termination reason is other than Normal (meaning UncaughtException or Killed), it is restarted. If the terminated thread has the Temporary restart type, the supervisor does not restart it regardless of its exit reason.
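The OneForOne rules above amount to a small decision table. The sketch below restates them as a pure function over simplified local types; Exit and triggersRestart are hypothetical names, and the local Restart type only mirrors the constructors documented on this page.

```haskell
-- Local copies for illustration only; not imported from the library.
data Restart = Permanent | Transient | Temporary deriving (Show)

-- Simplified exit reason without the exception payload.
data Exit = ExitNormal | ExitException | ExitKilled deriving (Show)

-- Does the termination of a child with this restart type trigger a restart?
triggersRestart :: Restart -> Exit -> Bool
triggersRestart Permanent _          = True    -- always
triggersRestart Transient ExitNormal = False   -- only on abnormal exit
triggersRestart Transient _          = True
triggersRestart Temporary _          = False   -- never

main :: IO ()
main = mapM_ print
  [ (r, e, triggersRestart r e)
  | r <- [Permanent, Transient, Temporary]
  , e <- [ExitNormal, ExitException, ExitKilled]
  ]
```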

The created IO action is designed to run in a thread separate from the main thread. If you run it on the main thread without a producer for the supervisor queue you supplied, the supervisor will deadlock.

newChild Source #

Arguments

:: CallTimeout

Request timeout in microseconds.

-> SupervisorQueue

Inbox message queue of the supervisor that is asked to spawn the new thread.

-> ChildSpec

Child specification of the thread to spawn.

-> IO (Maybe ThreadId) 

Ask the supervisor to spawn a new temporary child thread. Returns the ThreadId of the new child.