Safe Haskell | None |
---|---|
Language | Haskell2010 |
Concurrent monads with a fixed scheduler: internal types and functions.
- runFixed :: (Functor n, Monad n) => Fixed n r s -> (forall x. s x -> IdSource -> n (Result x, IdSource, TTrace)) -> Scheduler ThreadId ThreadAction Lookahead g -> MemType -> g -> M n r s a -> n (Either Failure a, g, Trace ThreadId ThreadAction Lookahead)
- runFixed' :: forall n r s g a. (Functor n, Monad n) => Fixed n r s -> (forall x. s x -> IdSource -> n (Result x, IdSource, TTrace)) -> Scheduler ThreadId ThreadAction Lookahead g -> MemType -> g -> IdSource -> M n r s a -> n (Either Failure a, g, IdSource, Trace ThreadId ThreadAction Lookahead)
- newtype M n r s a = M {}
- data MVar r a = MVar {}
- data CRef r a = CRef {}
- data Ticket a = Ticket {
- _ticketCRef :: CRefId
- _ticketWrites :: Integer
- _ticketVal :: a
- type Fixed n r s = Ref n r (M n r s)
- cont :: ((a -> Action n r s) -> Action n r s) -> M n r s a
- runCont :: M n r s a -> (a -> Action n r s) -> Action n r s
- data Action n r s
- = AFork String ((forall b. M n r s b -> M n r s b) -> Action n r s) (ThreadId -> Action n r s)
- | AMyTId (ThreadId -> Action n r s)
- | AGetNumCapabilities (Int -> Action n r s)
- | ASetNumCapabilities Int (Action n r s)
- | forall a . ANewVar String (MVar r a -> Action n r s)
- | forall a . APutVar (MVar r a) a (Action n r s)
- | forall a . ATryPutVar (MVar r a) a (Bool -> Action n r s)
- | forall a . AReadVar (MVar r a) (a -> Action n r s)
- | forall a . ATakeVar (MVar r a) (a -> Action n r s)
- | forall a . ATryTakeVar (MVar r a) (Maybe a -> Action n r s)
- | forall a . ANewRef String a (CRef r a -> Action n r s)
- | forall a . AReadRef (CRef r a) (a -> Action n r s)
- | forall a . AReadRefCas (CRef r a) (Ticket a -> Action n r s)
- | forall a . APeekTicket (Ticket a) (a -> Action n r s)
- | forall a b . AModRef (CRef r a) (a -> (a, b)) (b -> Action n r s)
- | forall a b . AModRefCas (CRef r a) (a -> (a, b)) (b -> Action n r s)
- | forall a . AWriteRef (CRef r a) a (Action n r s)
- | forall a . ACasRef (CRef r a) (Ticket a) a ((Bool, Ticket a) -> Action n r s)
- | forall e . Exception e => AThrow e
- | forall e . Exception e => AThrowTo ThreadId e (Action n r s)
- | forall a e . Exception e => ACatching (e -> M n r s a) (M n r s a) (a -> Action n r s)
- | APopCatching (Action n r s)
- | forall a . AMasking MaskingState ((forall b. M n r s b -> M n r s b) -> M n r s a) (a -> Action n r s)
- | AResetMask Bool Bool MaskingState (Action n r s)
- | AKnowsAbout (Either MVarId TVarId) (Action n r s)
- | AForgets (Either MVarId TVarId) (Action n r s)
- | AAllKnown (Action n r s)
- | AMessage Dynamic (Action n r s)
- | forall a . AAtom (s a) (a -> Action n r s)
- | ALift (n (Action n r s))
- | AYield (Action n r s)
- | AReturn (Action n r s)
- | ACommit ThreadId CRefId
- | AStop
- data ThreadId = ThreadId (Maybe String) Int
- data MVarId = MVarId (Maybe String) Int
- data CRefId = CRefId (Maybe String) Int
- initialThread :: ThreadId
- data MemType
- type Scheduler tid action lookahead s = [(Decision tid, action)] -> Maybe (tid, action) -> NonEmpty (tid, lookahead) -> s -> (Maybe tid, s)
- type Trace tid action lookahead = [(Decision tid, [(tid, NonEmpty lookahead)], action)]
- data Decision tid :: * -> *
- data ThreadAction
- = Fork ThreadId
- | MyThreadId
- | GetNumCapabilities Int
- | SetNumCapabilities Int
- | Yield
- | NewVar MVarId
- | PutVar MVarId [ThreadId]
- | BlockedPutVar MVarId
- | TryPutVar MVarId Bool [ThreadId]
- | ReadVar MVarId
- | BlockedReadVar MVarId
- | TakeVar MVarId [ThreadId]
- | BlockedTakeVar MVarId
- | TryTakeVar MVarId Bool [ThreadId]
- | NewRef CRefId
- | ReadRef CRefId
- | ReadRefCas CRefId
- | PeekTicket CRefId
- | ModRef CRefId
- | ModRefCas CRefId
- | WriteRef CRefId
- | CasRef CRefId Bool
- | CommitRef ThreadId CRefId
- | STM TTrace [ThreadId]
- | BlockedSTM TTrace
- | Catching
- | PopCatching
- | Throw
- | ThrowTo ThreadId
- | BlockedThrowTo ThreadId
- | Killed
- | SetMasking Bool MaskingState
- | ResetMasking Bool MaskingState
- | Lift
- | Return
- | KnowsAbout
- | Forgets
- | AllKnown
- | Message Dynamic
- | Stop
- data Lookahead
- = WillFork
- | WillMyThreadId
- | WillGetNumCapabilities
- | WillSetNumCapabilities Int
- | WillYield
- | WillNewVar
- | WillPutVar MVarId
- | WillTryPutVar MVarId
- | WillReadVar MVarId
- | WillTakeVar MVarId
- | WillTryTakeVar MVarId
- | WillNewRef
- | WillReadRef CRefId
- | WillPeekTicket CRefId
- | WillReadRefCas CRefId
- | WillModRef CRefId
- | WillModRefCas CRefId
- | WillWriteRef CRefId
- | WillCasRef CRefId
- | WillCommitRef ThreadId CRefId
- | WillSTM
- | WillCatching
- | WillPopCatching
- | WillThrow
- | WillThrowTo ThreadId
- | WillSetMasking Bool MaskingState
- | WillResetMasking Bool MaskingState
- | WillLift
- | WillReturn
- | WillKnowsAbout
- | WillForgets
- | WillAllKnown
- | WillMessage Dynamic
- | WillStop
- isBlock :: ThreadAction -> Bool
- lookahead :: Action n r s -> NonEmpty Lookahead
- willRelease :: Lookahead -> Bool
- preEmpCount :: [(Decision ThreadId, ThreadAction)] -> (Decision ThreadId, Lookahead) -> Int
- showTrace :: Trace ThreadId ThreadAction Lookahead -> String
- showFail :: Failure -> String
- data ActionType
- isBarrier :: ActionType -> Bool
- isCommit :: ActionType -> CRefId -> Bool
- synchronises :: ActionType -> CRefId -> Bool
- crefOf :: ActionType -> Maybe CRefId
- cvarOf :: ActionType -> Maybe MVarId
- simplify :: ThreadAction -> ActionType
- simplify' :: Lookahead -> ActionType
- data Failure
Execution
runFixed :: (Functor n, Monad n) => Fixed n r s -> (forall x. s x -> IdSource -> n (Result x, IdSource, TTrace)) -> Scheduler ThreadId ThreadAction Lookahead g -> MemType -> g -> M n r s a -> n (Either Failure a, g, Trace ThreadId ThreadAction Lookahead) Source
runFixed' :: forall n r s g a. (Functor n, Monad n) => Fixed n r s -> (forall x. s x -> IdSource -> n (Result x, IdSource, TTrace)) -> Scheduler ThreadId ThreadAction Lookahead g -> MemType -> g -> IdSource -> M n r s a -> n (Either Failure a, g, IdSource, Trace ThreadId ThreadAction Lookahead) Source
The Conc
Monad
The underlying monad is based on continuations over Action
s.
One might wonder why the return type isn't reflected in Action
,
and a free monad formulation used. This would remove the need for a
Lift
action as the penultimate action of thread 0 used to
communicate back the result, and be more pleasing in a
sense. However, this makes the current expression of threads and
exception handlers very difficult (perhaps even impossible
without significant reworking), so I abandoned the attempt.
The concurrent variable type used with the Conc monad. One notable
difference between these and the standard MVars from base is that
the standard MVars are single-wakeup, and wake up in a FIFO order.
Writing to one of these MVars wakes up all threads blocked on
reading it, and it is up to the scheduler which one runs next.
Taking from one of these MVars behaves analogously.
The mutable non-blocking reference type. These are like IORef
s.
CRef
s are represented as a unique numeric identifier and a
reference containing (a) any thread-local non-synchronised writes
(so each thread sees its latest write), (b) a commit count (used in
compare-and-swaps), and (c) the current value visible to all
threads.
The compare-and-swap proof type.
Ticket
s are represented as just a wrapper around the identifier
of the CRef
it came from, the commit count at the time it was
produced, and an a
value. This doesn't work in the source package
(atomic-primops) because of the need to use pointer equality. Here
we can just pack extra information into CRef
to avoid that need.
Ticket | |
|
cont :: ((a -> Action n r s) -> Action n r s) -> M n r s a Source
Construct a continuation-passing operation from a function.
runCont :: M n r s a -> (a -> Action n r s) -> Action n r s Source
Run a CPS computation with the given final computation.
Primitive Actions
Scheduling is done in terms of a trace of Action
s. Blocking can
only occur as a result of an action, and they cover (most of) the
concurrency primitives. spawn
is absent as it is
implemented in terms of newEmptyMVar
, fork
, and putMVar
.
AFork String ((forall b. M n r s b -> M n r s b) -> Action n r s) (ThreadId -> Action n r s) | |
AMyTId (ThreadId -> Action n r s) | |
AGetNumCapabilities (Int -> Action n r s) | |
ASetNumCapabilities Int (Action n r s) | |
forall a . ANewVar String (MVar r a -> Action n r s) | |
forall a . APutVar (MVar r a) a (Action n r s) | |
forall a . ATryPutVar (MVar r a) a (Bool -> Action n r s) | |
forall a . AReadVar (MVar r a) (a -> Action n r s) | |
forall a . ATakeVar (MVar r a) (a -> Action n r s) | |
forall a . ATryTakeVar (MVar r a) (Maybe a -> Action n r s) | |
forall a . ANewRef String a (CRef r a -> Action n r s) | |
forall a . AReadRef (CRef r a) (a -> Action n r s) | |
forall a . AReadRefCas (CRef r a) (Ticket a -> Action n r s) | |
forall a . APeekTicket (Ticket a) (a -> Action n r s) | |
forall a b . AModRef (CRef r a) (a -> (a, b)) (b -> Action n r s) | |
forall a b . AModRefCas (CRef r a) (a -> (a, b)) (b -> Action n r s) | |
forall a . AWriteRef (CRef r a) a (Action n r s) | |
forall a . ACasRef (CRef r a) (Ticket a) a ((Bool, Ticket a) -> Action n r s) | |
forall e . Exception e => AThrow e | |
forall e . Exception e => AThrowTo ThreadId e (Action n r s) | |
forall a e . Exception e => ACatching (e -> M n r s a) (M n r s a) (a -> Action n r s) | |
APopCatching (Action n r s) | |
forall a . AMasking MaskingState ((forall b. M n r s b -> M n r s b) -> M n r s a) (a -> Action n r s) | |
AResetMask Bool Bool MaskingState (Action n r s) | |
AKnowsAbout (Either MVarId TVarId) (Action n r s) | |
AForgets (Either MVarId TVarId) (Action n r s) | |
AAllKnown (Action n r s) | |
AMessage Dynamic (Action n r s) | |
forall a . AAtom (s a) (a -> Action n r s) | |
ALift (n (Action n r s)) | |
AYield (Action n r s) | |
AReturn (Action n r s) | |
ACommit ThreadId CRefId | |
AStop |
Identifiers
Every live thread has a unique identifier.
Every MVar
has a unique identifier.
Every CRef
has a unique identifier.
initialThread :: ThreadId Source
The ID of the initial thread.
Memory Models
The memory model to use for non-synchronised CRef
operations.
SequentialConsistency | The most intuitive model: a program behaves as a simple
interleaving of the actions in different threads. When a |
TotalStoreOrder | Each thread has a write buffer. A thread sees its writes immediately, but other threads will only see writes when they are committed, which may happen later. Writes are committed in the same order that they are created. |
PartialStoreOrder | Each |
Scheduling & Traces
type Scheduler tid action lookahead s = [(Decision tid, action)] -> Maybe (tid, action) -> NonEmpty (tid, lookahead) -> s -> (Maybe tid, s)
A Scheduler
drives the execution of a concurrent program. The
parameters it takes are:
- The trace so far.
- The last thread executed (if this is the first invocation, this is Nothing).
- The runnable threads at this point.
- The state.
It returns a thread to execute, or Nothing
if execution should
abort here, and also a new state.
type Trace tid action lookahead = [(Decision tid, [(tid, NonEmpty lookahead)], action)]
One of the outputs of the runner is a Trace
, which is a log of
decisions made, all the runnable threads and what they would do,
and the action a thread took in its step.
data Decision tid :: * -> *
Scheduling decisions are based on the state of the running program, and so we can capture some of that state in recording what specific decision we made.
data ThreadAction Source
All the actions that a thread can perform.
Fork ThreadId | Start a new thread. |
MyThreadId | Get the |
GetNumCapabilities Int | Get the number of Haskell threads that can run simultaneously. |
SetNumCapabilities Int | Set the number of Haskell threads that can run simultaneously. |
Yield | Yield the current thread. |
NewVar MVarId | Create a new |
PutVar MVarId [ThreadId] | Put into a |
BlockedPutVar MVarId | Get blocked on a put. |
TryPutVar MVarId Bool [ThreadId] | Try to put into a |
ReadVar MVarId | Read from a |
BlockedReadVar MVarId | Get blocked on a read. |
TakeVar MVarId [ThreadId] | Take from a |
BlockedTakeVar MVarId | Get blocked on a take. |
TryTakeVar MVarId Bool [ThreadId] | Try to take from a |
NewRef CRefId | Create a new |
ReadRef CRefId | Read from a |
ReadRefCas CRefId | Read from a |
PeekTicket CRefId | Extract the value from a |
ModRef CRefId | Modify a |
ModRefCas CRefId | Modify a |
WriteRef CRefId | Write to a |
CasRef CRefId Bool | Attempt to write to a |
CommitRef ThreadId CRefId | Commit the last write to the given |
STM TTrace [ThreadId] | An STM transaction was executed, possibly waking up some threads. |
BlockedSTM TTrace | Got blocked in an STM transaction. |
Catching | Register a new exception handler |
PopCatching | Pop the innermost exception handler from the stack. |
Throw | Throw an exception. |
ThrowTo ThreadId | Throw an exception to a thread. |
BlockedThrowTo ThreadId | Get blocked on a |
Killed | Killed by an uncaught exception. |
SetMasking Bool MaskingState | Set the masking state. If |
ResetMasking Bool MaskingState | Return to an earlier masking state. If |
Lift | Lift an action from the underlying monad. Note that the
penultimate action in a trace will always be a |
Return | |
KnowsAbout | A |
Forgets | A |
AllKnown | A |
Message Dynamic | A |
Stop | Cease execution and terminate. |
A one-step look-ahead at what a thread will do next.
WillFork | Will start a new thread. |
WillMyThreadId | Will get the |
WillGetNumCapabilities | Will get the number of Haskell threads that can run simultaneously. |
WillSetNumCapabilities Int | Will set the number of Haskell threads that can run simultaneously. |
WillYield | Will yield the current thread. |
WillNewVar | Will create a new |
WillPutVar MVarId | Will put into a |
WillTryPutVar MVarId | Will try to put into a |
WillReadVar MVarId | Will read from a |
WillTakeVar MVarId | Will take from a |
WillTryTakeVar MVarId | Will try to take from a |
WillNewRef | Will create a new |
WillReadRef CRefId | Will read from a |
WillPeekTicket CRefId | Will extract the value from a |
WillReadRefCas CRefId | Will read from a |
WillModRef CRefId | Will modify a |
WillModRefCas CRefId | Will modify a |
WillWriteRef CRefId | Will write to a |
WillCasRef CRefId | Will attempt to write to a |
WillCommitRef ThreadId CRefId | Will commit the last write by the given thread to the |
WillSTM | Will execute an STM transaction, possibly waking up some threads. |
WillCatching | Will register a new exception handler |
WillPopCatching | Will pop the innermost exception handler from the stack. |
WillThrow | Will throw an exception. |
WillThrowTo ThreadId | Will throw an exception to a thread. |
WillSetMasking Bool MaskingState | Will set the masking state. If |
WillResetMasking Bool MaskingState | Will return to an earlier masking state. If |
WillLift | Will lift an action from the underlying monad. Note that the
penultimate action in a trace will always be a |
WillReturn | |
WillKnowsAbout | Will process a |
WillForgets | Will process a |
WillAllKnown | Will process a |
WillMessage Dynamic | Will process a _concMessage annotation. |
WillStop | Will cease execution and terminate. |
isBlock :: ThreadAction -> Bool Source
Check if a ThreadAction
immediately blocks.
lookahead :: Action n r s -> NonEmpty Lookahead Source
Look as far ahead in the given continuation as possible.
willRelease :: Lookahead -> Bool Source
Check if an operation could enable another thread.
preEmpCount :: [(Decision ThreadId, ThreadAction)] -> (Decision ThreadId, Lookahead) -> Int Source
showTrace :: Trace ThreadId ThreadAction Lookahead -> String Source
Pretty-print a trace, including a key of the thread IDs. Each line of the key is indented by two spaces.
Synchronised and Unsynchronised Actions
data ActionType Source
A simplified view of the possible actions a thread can perform.
UnsynchronisedRead CRefId | A |
UnsynchronisedWrite CRefId | A |
UnsynchronisedOther | Some other action which doesn't require cross-thread communication. |
PartiallySynchronisedCommit CRefId | A commit. |
PartiallySynchronisedWrite CRefId | A |
PartiallySynchronisedModify CRefId | A |
SynchronisedModify CRefId | An |
SynchronisedRead MVarId | A |
SynchronisedWrite MVarId | A |
SynchronisedOther | Some other action which does require cross-thread communication. |
isBarrier :: ActionType -> Bool Source
Check if an action imposes a write barrier.
synchronises :: ActionType -> CRefId -> Bool Source
Check if an action synchronises a given CRef
.
simplify :: ThreadAction -> ActionType Source
Throw away information from a ThreadAction
and give a
simplified view of what is happening.
This is used in the SCT code to help determine interesting alternative scheduling decisions.
Failures
An indication of how a concurrent computation failed.
InternalError | Will be raised if the scheduler does something bad. This should never arise unless you write your own, faulty, scheduler! If it does, please file a bug report. |
Abort | The scheduler chose to abort execution. This will be produced if, for example, all possible decisions exceed the specified bounds (there have been too many pre-emptions, the computation has executed for too long, or there have been too many yields). |
Deadlock | The computation became blocked indefinitely on |
STMDeadlock | The computation became blocked indefinitely on |
UncaughtException | An uncaught exception bubbled to the top of the computation. |