module Control.Eff.Concurrent.Process.ForkIOScheduler
( schedule
, defaultMain
, defaultMainWithLogChannel
, ProcEff
, InterruptableProcEff
, SchedulerIO
, HasSchedulerIO
, forkIoScheduler
)
where
import Control.Exception.Safe as Safe
import qualified Control.Eff.ExceptionExtra as ExcExtra ()
import Control.Concurrent.STM as STM
import Control.Concurrent (yield)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.Async (Async(..))
import Control.Eff
import Control.Eff.Extend
import Control.Eff.Concurrent.Process
import Control.Eff.Lift
import Control.Eff.Log
import Control.Eff.Reader.Strict as Reader
import Control.Lens
import Control.Monad ( when
, void
, (>=>)
)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Data.Default
import Data.Dynamic
import Data.Foldable
import Data.Kind ( )
import Data.Map ( Map )
import qualified Data.Map as Map
import Data.Set ( Set )
import qualified Data.Set as Set
import Data.Maybe
import Data.Sequence ( Seq(..) )
import qualified Data.Sequence as Seq
import GHC.Stack
import Text.Printf
import System.Timeout
data MessageQ = MessageQ { _incomingMessages :: Seq Dynamic
, _shutdownRequests :: Maybe SomeExitReason
}
instance Default MessageQ where def = MessageQ def def
makeLenses ''MessageQ
tryTakeNextShutdownRequestSTM
:: TVar MessageQ -> STM (Maybe SomeExitReason)
tryTakeNextShutdownRequestSTM mqVar = do
mq <- readTVar mqVar
when (isJust (mq^.shutdownRequests))
(writeTVar mqVar (mq & shutdownRequests .~ Nothing))
return (mq^.shutdownRequests)
data ProcessInfo =
ProcessInfo { _processId :: ProcessId
, _processState :: TVar ProcessState
, _messageQ :: TVar MessageQ
, _processLinks :: TVar (Set ProcessId)
}
makeLenses ''ProcessInfo
data SchedulerState =
SchedulerState { _nextPid :: TVar ProcessId
, _processTable :: TVar (Map ProcessId ProcessInfo)
, _processCancellationTable
:: TVar (Map ProcessId
(Async (ExitReason 'NoRecovery)))
, _processMonitors :: TVar (Set (MonitorReference, ProcessId))
, _nextMonitorIndex :: TVar Int
}
makeLenses ''SchedulerState
addMonitoring :: ProcessId -> ProcessId -> SchedulerState -> STM MonitorReference
addMonitoring target owner schedulerState = do
mi <- readTVar (schedulerState ^. nextMonitorIndex)
modifyTVar' (schedulerState ^. nextMonitorIndex) (+1)
let mref = MonitorReference mi target
when (target /= owner) $ do
pt <- readTVar (schedulerState ^. processTable)
if Map.member target pt then
modifyTVar' (schedulerState ^. processMonitors) (Set.insert (mref, owner))
else
let pdown = (ProcessDown mref (SomeExitReason (ProcessNotRunning target)))
in enqueueMessageOtherProcess owner (toDyn pdown) schedulerState
return mref
removeMonitoring :: MonitorReference -> SchedulerState -> STM ()
removeMonitoring mref schedulerState =
modifyTVar' (schedulerState ^. processMonitors)
(Set.filter (\(ref,_) -> ref /= mref))
triggerAndRemoveMonitor :: ProcessId -> SomeExitReason -> SchedulerState -> STM ()
triggerAndRemoveMonitor downPid reason schedulerState = do
monRefs <- readTVar (schedulerState ^. processMonitors)
traverse_ go monRefs
where
go (mr, owner) =
when (monitoredProcess mr == downPid) (do
let pdown = ProcessDown mr reason
enqueueMessageOtherProcess owner (toDyn pdown) schedulerState
removeMonitoring mr schedulerState)
instance Show ProcessInfo where
show p = "process info: " ++ show (p ^. processId)
newProcessInfo :: HasCallStack => ProcessId -> STM ProcessInfo
newProcessInfo a =
ProcessInfo a
<$> newTVar ProcessBooting
<*> newTVar def
<*> newTVar def
newSchedulerState :: HasCallStack => STM SchedulerState
newSchedulerState =
SchedulerState
<$> newTVar 1
<*> newTVar def
<*> newTVar def
<*> newTVar def
<*> newTVar def
withNewSchedulerState
:: (HasLogging IO SchedulerIO, HasCallStack)
=> Eff SchedulerIO ()
-> Eff LoggingAndIO ()
withNewSchedulerState mainProcessAction =
Safe.bracketWithError
(lift (atomically newSchedulerState))
(\exceptions schedulerState -> do
traverse_
(logError . ("scheduler setup crashed with: " ++) . Safe.displayException)
exceptions
logDebug "scheduler cleanup begin"
runReader schedulerState tearDownScheduler
)
(\schedulerState -> do
logDebug "scheduler loop entered"
x <- runReader schedulerState mainProcessAction
logDebug "scheduler loop returned"
return x
)
where
tearDownScheduler = do
schedulerState <- getSchedulerState
let cancelTableVar = schedulerState ^. processCancellationTable
allProcesses <- lift (atomically (readTVar cancelTableVar <* writeTVar cancelTableVar def))
logNotice ("cancelling processes: " ++ show (toListOf (ifolded.asIndex) allProcesses))
void
(liftBaseWith
(\runS -> timeout 5000000
(Async.mapConcurrently
(\a -> do
Async.cancel a
runS (logNotice ("process cancelled: "++ show (asyncThreadId a)))
)
allProcesses
>> runS (logNotice "all processes cancelled"))))
type ProcEff = ConsProcess SchedulerIO
type InterruptableProcEff = InterruptableProcess SchedulerIO
type HasSchedulerIO r = ( HasCallStack
, Lifted IO r
, SchedulerIO <:: r
)
type SchedulerIO = ( Reader SchedulerState : LoggingAndIO)
type LoggingAndIO =
'[ Logs LogMessage
, LogWriterReader LogMessage IO
, Lift IO
]
defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO ()
defaultMain c =
withAsyncLogChannel (1024 :: Int)
(multiMessageLogWriter ($ printLogMessage))
(handleLoggingAndIO_ (schedule c))
defaultMainWithLogChannel
:: HasCallStack => Eff InterruptableProcEff () -> LogChannel LogMessage -> IO ()
defaultMainWithLogChannel = handleLoggingAndIO_ . schedule
forkIoScheduler :: SchedulerProxy SchedulerIO
forkIoScheduler = SchedulerProxy
handleProcess
:: (HasLogging IO SchedulerIO, HasCallStack)
=> ProcessInfo
-> Eff ProcEff (ExitReason 'NoRecovery)
-> Eff SchedulerIO (ExitReason 'NoRecovery)
handleProcess myProcessInfo =
handle_relay_s
0
(const return)
(\ !nextRef !request k ->
stepProcessInterpreter nextRef request k return
)
where
myPid = myProcessInfo ^. processId
myProcessStateVar = myProcessInfo ^. processState
setMyProcessState = lift . atomically . setMyProcessStateSTM
setMyProcessStateSTM = writeTVar myProcessStateVar
myMessageQVar = myProcessInfo ^. messageQ
kontinueWith
:: forall s v a
. HasCallStack
=> (s -> Arr SchedulerIO v a)
-> s
-> Arr SchedulerIO v a
kontinueWith kontinue !nextRef !result = do
setMyProcessState ProcessIdle
lift yield
kontinue nextRef result
diskontinueWith
:: forall a
. HasCallStack
=> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
diskontinueWith diskontinue !reason = do
setMyProcessState ProcessShuttingDown
diskontinue reason
stepProcessInterpreter
:: forall v a
. HasCallStack
=> Int
-> Process SchedulerIO v
-> (Int -> Arr SchedulerIO v a)
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> Eff SchedulerIO a
stepProcessInterpreter !nextRef !request kontinue diskontinue =
tryTakeNextShutdownRequest
>>= maybe noShutdownRequested
(either onShutdownRequested onInterruptRequested
. fromSomeExitReason)
where
tryTakeNextShutdownRequest =
lift (atomically (tryTakeNextShutdownRequestSTM myMessageQVar))
onShutdownRequested shutdownRequest = do
logDebug ("shutdown requested: " ++ show shutdownRequest)
setMyProcessState ProcessShuttingDown
interpretRequestAfterShutdownRequest
(diskontinueWith diskontinue)
shutdownRequest
request
onInterruptRequested interruptRequest = do
logDebug ("interrupt requested: " ++ show interruptRequest)
setMyProcessState ProcessShuttingDown
interpretRequestAfterInterruptRequest
(kontinueWith kontinue nextRef)
(diskontinueWith diskontinue)
interruptRequest
request
noShutdownRequested = do
setMyProcessState ProcessBusy
interpretRequest
(kontinueWith kontinue)
(diskontinueWith diskontinue)
nextRef
request
interpretRequestAfterShutdownRequest
:: forall v a
. HasCallStack
=> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> (ExitReason 'NoRecovery)
-> Process SchedulerIO v
-> Eff SchedulerIO a
interpretRequestAfterShutdownRequest diskontinue shutdownRequest =
\case
SendMessage _ _ -> diskontinue shutdownRequest
SendInterrupt _ _ -> diskontinue shutdownRequest
SendShutdown toPid r ->
if toPid == myPid
then diskontinue r
else diskontinue shutdownRequest
Spawn _ -> diskontinue shutdownRequest
SpawnLink _ -> diskontinue shutdownRequest
ReceiveSelectedMessage _ -> diskontinue shutdownRequest
FlushMessages -> diskontinue shutdownRequest
SelfPid -> diskontinue shutdownRequest
MakeReference -> diskontinue shutdownRequest
YieldProcess -> diskontinue shutdownRequest
Shutdown r -> diskontinue r
GetProcessState _ -> diskontinue shutdownRequest
Monitor _ -> diskontinue shutdownRequest
Demonitor _ -> diskontinue shutdownRequest
Link _ -> diskontinue shutdownRequest
Unlink _ -> diskontinue shutdownRequest
interpretRequestAfterInterruptRequest
:: forall v a
. HasCallStack
=> Arr SchedulerIO v a
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> (ExitReason 'Recoverable)
-> Process SchedulerIO v
-> Eff SchedulerIO a
interpretRequestAfterInterruptRequest kontinue diskontinue interruptRequest =
\case
SendMessage _ _ -> kontinue (Interrupted interruptRequest)
SendInterrupt _ _ -> kontinue (Interrupted interruptRequest)
SendShutdown toPid r ->
if toPid == myPid
then diskontinue r
else kontinue (Interrupted interruptRequest)
Spawn _ -> kontinue (Interrupted interruptRequest)
SpawnLink _ -> kontinue (Interrupted interruptRequest)
ReceiveSelectedMessage _ -> kontinue (Interrupted interruptRequest)
FlushMessages -> kontinue (Interrupted interruptRequest)
SelfPid -> kontinue (Interrupted interruptRequest)
MakeReference -> kontinue (Interrupted interruptRequest)
YieldProcess -> kontinue (Interrupted interruptRequest)
Shutdown r -> diskontinue r
GetProcessState _ -> kontinue (Interrupted interruptRequest)
Monitor _ -> kontinue (Interrupted interruptRequest)
Demonitor _ -> kontinue (Interrupted interruptRequest)
Link _ -> kontinue (Interrupted interruptRequest)
Unlink _ -> kontinue (Interrupted interruptRequest)
interpretRequest
:: forall v a
. HasCallStack
=> (Int -> Arr SchedulerIO v a)
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> Int
-> Process SchedulerIO v
-> Eff SchedulerIO a
interpretRequest kontinue diskontinue nextRef =
\case
SendMessage toPid msg -> interpretSend toPid msg >>= kontinue nextRef . ResumeWith
SendInterrupt toPid msg ->
if toPid == myPid
then kontinue nextRef (Interrupted msg)
else interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
>>= kontinue nextRef . ResumeWith
SendShutdown toPid msg ->
if toPid == myPid
then diskontinue msg
else interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
>>= kontinue nextRef . ResumeWith
Spawn child -> spawnNewProcess Nothing child
>>= kontinue nextRef . ResumeWith . fst
SpawnLink child -> spawnNewProcess (Just myProcessInfo) child
>>= kontinue nextRef . ResumeWith . fst
ReceiveSelectedMessage f -> interpretReceive f >>= either diskontinue (kontinue nextRef)
FlushMessages -> interpretFlush >>= kontinue nextRef
SelfPid -> kontinue nextRef (ResumeWith myPid)
MakeReference -> kontinue (nextRef + 1) (ResumeWith nextRef)
YieldProcess -> kontinue nextRef (ResumeWith ())
Shutdown r -> diskontinue r
GetProcessState toPid -> interpretGetProcessState toPid >>= kontinue nextRef . ResumeWith
Monitor target -> interpretMonitor target >>= kontinue nextRef . ResumeWith
Demonitor ref -> interpretDemonitor ref >>= kontinue nextRef . ResumeWith
Link toPid -> interpretLink toPid
>>= kontinue nextRef . either Interrupted ResumeWith
Unlink toPid -> interpretUnlink toPid >>= kontinue nextRef . ResumeWith
where
interpretMonitor !target = do
setMyProcessState ProcessBusyMonitoring
schedulerState <- getSchedulerState
lift (atomically (addMonitoring target myPid schedulerState))
interpretDemonitor !ref = do
setMyProcessState ProcessBusyMonitoring
schedulerState <- getSchedulerState
lift (atomically (removeMonitoring ref schedulerState))
interpretUnlink !toPid = do
setMyProcessState ProcessBusyUnlinking
schedulerState <- getSchedulerState
let procInfosVar = schedulerState^.processTable
lift $ atomically $ do
procInfos <- readTVar procInfosVar
traverse_
(\toProcInfo ->
modifyTVar' (toProcInfo ^. processLinks) (Set.delete myPid))
(procInfos ^. at toPid)
modifyTVar' (myProcessInfo ^. processLinks) (Set.delete toPid)
interpretGetProcessState !toPid = do
setMyProcessState ProcessBusy
schedulerState <- getSchedulerState
let procInfosVar = schedulerState^.processTable
lift $ atomically $ do
procInfos <- readTVar procInfosVar
traverse (\toProcInfo -> readTVar (toProcInfo ^. processState))
(procInfos ^. at toPid)
interpretLink !toPid = do
setMyProcessState ProcessBusyLinking
schedulerState <- getSchedulerState
let procInfosVar = schedulerState^.processTable
lift $ atomically $ do
procInfos <- readTVar procInfosVar
case procInfos ^. at toPid of
Just toProcInfo -> do
modifyTVar' (toProcInfo ^. processLinks) (Set.insert myPid)
modifyTVar' (myProcessInfo ^. processLinks) (Set.insert toPid)
return (Right ())
Nothing ->
return (Left (LinkedProcessCrashed toPid))
interpretSend !toPid msg =
setMyProcessState ProcessBusySending *>
getSchedulerState
>>= lift
. atomically
. enqueueMessageOtherProcess toPid msg
interpretSendShutdownOrInterrupt !toPid !msg =
setMyProcessState
(either
(const ProcessBusySendingShutdown)
(const ProcessBusySendingInterrupt)
(fromSomeExitReason msg))
*> getSchedulerState
>>= lift
. atomically
. enqueueShutdownRequest toPid msg
interpretFlush :: Eff SchedulerIO (ResumeProcess [Dynamic])
interpretFlush = do
setMyProcessState ProcessBusyReceiving
lift $ atomically $ do
myMessageQ <- readTVar myMessageQVar
modifyTVar' myMessageQVar (incomingMessages .~ Seq.Empty)
return (ResumeWith (toList (myMessageQ ^. incomingMessages)))
interpretReceive
:: MessageSelector b
-> Eff SchedulerIO (Either (ExitReason 'NoRecovery) (ResumeProcess b))
interpretReceive f = do
setMyProcessState ProcessBusyReceiving
lift $ atomically $ do
myMessageQ <- readTVar myMessageQVar
case myMessageQ ^. shutdownRequests of
Nothing ->
case partitionMessages (myMessageQ ^. incomingMessages) Seq.Empty of
Nothing -> retry
Just (selectedMessage, otherMessages) -> do
modifyTVar' myMessageQVar (incomingMessages .~ otherMessages)
return (Right (ResumeWith selectedMessage))
Just shutdownRequest -> do
modifyTVar' myMessageQVar (shutdownRequests .~ Nothing)
case fromSomeExitReason shutdownRequest of
Left sr ->
return (Left sr)
Right ir ->
return (Right (Interrupted ir))
where
partitionMessages Seq.Empty _acc = Nothing
partitionMessages (m :<| msgRest) acc = maybe
(partitionMessages msgRest (acc :|> m))
(\res -> Just (res, acc Seq.>< msgRest))
(runMessageSelector f m)
schedule
:: (HasLogging IO SchedulerIO, HasCallStack)
=> Eff InterruptableProcEff ()
-> Eff LoggingAndIO ()
schedule procEff =
liftBaseWith (\runS ->
Async.withAsync (runS $ withNewSchedulerState $ do
(_, mainProcAsync) <- spawnNewProcess Nothing $ do
logNotice "++++++++ main process started ++++++++"
provideInterruptsShutdown procEff
logNotice "++++++++ main process returned ++++++++"
lift (void (Async.wait mainProcAsync))
)
(\ast -> runS $ do
a <- restoreM ast
void $ lift (Async.wait a)
)
) >>= restoreM
spawnNewProcess
:: (HasLogging IO SchedulerIO, HasCallStack)
=> Maybe ProcessInfo
-> Eff ProcEff ()
-> Eff SchedulerIO (ProcessId, Async (ExitReason 'NoRecovery))
spawnNewProcess mlinkedParent mfa = do
schedulerState <- getSchedulerState
procInfo <- allocateProcInfo schedulerState
traverse_ (linkToParent procInfo) mlinkedParent
procAsync <- doForkProc procInfo schedulerState
return (procInfo ^. processId, procAsync)
where
linkToParent toProcInfo parent = do
let toPid = toProcInfo ^. processId
parentPid = parent ^. processId
lift $ atomically $ do
modifyTVar' (toProcInfo ^. processLinks) (Set.insert parentPid)
modifyTVar' (parent ^. processLinks) (Set.insert toPid)
allocateProcInfo schedulerState =
lift (atomically (do
let nextPidVar = schedulerState ^. nextPid
processInfosVar = schedulerState ^. processTable
pid <- readTVar nextPidVar
modifyTVar' nextPidVar (+1)
procInfo <- newProcessInfo pid
modifyTVar' processInfosVar (at pid ?~ procInfo)
return procInfo
))
logAppendProcInfo pid =
let addProcessId = over lmProcessId
(maybe (Just (printf "% 9s" (show pid))) Just)
in traverseLogMessages
(traverse setLogMessageThreadId
>=> traverse (return . addProcessId))
triggerProcessLinksAndMonitors !pid !reason !linkSetVar = do
schedulerState <- getSchedulerState
lift $ atomically $
triggerAndRemoveMonitor pid (SomeExitReason reason) schedulerState
let exitSeverity = toExitSeverity reason
sendIt !linkedPid = do
let msg = SomeExitReason (LinkedProcessCrashed pid)
lift $ atomically $ do
procInfos <- readTVar (schedulerState ^. processTable)
let mLinkedProcInfo = procInfos ^? at linkedPid . _Just
case mLinkedProcInfo of
Nothing ->
return (Left linkedPid)
Just linkedProcInfo ->
let linkedMsgQVar = linkedProcInfo ^. messageQ
linkedLinkSetVar = linkedProcInfo ^. processLinks
in do linkedLinkSet <- readTVar linkedLinkSetVar
if Set.member pid linkedLinkSet
then do
writeTVar linkedLinkSetVar
(Set.delete pid linkedLinkSet)
when (exitSeverity == Crash)
(modifyTVar' linkedMsgQVar
(shutdownRequests ?~ msg))
return (Right linkedPid)
else
return (Left linkedPid)
linkedPids <- lift (atomically (do linkSet <- readTVar linkSetVar
writeTVar linkSetVar def
return linkSet))
res <- traverse sendIt (toList linkedPids)
traverse_ (logDebug . either (("linked process no found: " ++) . show)
(("sent shutdown to linked process: " ++) . show))
res
doForkProc :: ProcessInfo -> SchedulerState -> Eff SchedulerIO (Async (ExitReason 'NoRecovery))
doForkProc procInfo schedulerState =
restoreM =<< liftBaseWith
(\inScheduler -> do
let cancellationsVar = schedulerState ^. processCancellationTable
processInfosVar = schedulerState ^. processTable
pid = procInfo ^. processId
procAsync <- Async.async (
inScheduler (logAppendProcInfo pid
(Safe.bracketWithError
(logDebug "enter process")
(\mExc () -> do
lift (atomically
(do modifyTVar' processInfosVar (at pid .~ Nothing)
modifyTVar' cancellationsVar (at pid .~ Nothing)))
traverse_
(\exc -> logExitAndTriggerLinksAndMonitors
(exitReasonFromException exc)
pid)
mExc
)
(const
(do res <- handleProcess procInfo (mfa >> return ExitNormally)
logExitAndTriggerLinksAndMonitors res pid))
)))
atomically (modifyTVar' cancellationsVar (at pid ?~ procAsync))
return procAsync)
where
exitReasonFromException exc =
case Safe.fromException exc of
Just Async.AsyncCancelled ->
Killed
Nothing ->
UnexpectedException
(prettyCallStack callStack)
(Safe.displayException exc)
logExitAndTriggerLinksAndMonitors reason pid =
do triggerProcessLinksAndMonitors pid reason (procInfo ^. processLinks)
logProcessExit reason
return reason
getSchedulerState :: HasSchedulerIO r => Eff r SchedulerState
getSchedulerState = ask
enqueueMessageOtherProcess ::
HasCallStack => ProcessId -> Dynamic -> SchedulerState -> STM ()
enqueueMessageOtherProcess toPid msg schedulerState =
view (at toPid) <$> readTVar (schedulerState ^. processTable)
>>= maybe (return ())
(\toProcessTable -> do
modifyTVar' (toProcessTable ^. messageQ ) (incomingMessages %~ (:|> msg))
return ())
enqueueShutdownRequest ::
HasCallStack => ProcessId -> SomeExitReason -> SchedulerState -> STM ()
enqueueShutdownRequest toPid msg schedulerState =
view (at toPid) <$> readTVar (schedulerState ^. processTable)
>>= maybe (return ())
(\toProcessTable -> do
modifyTVar' (toProcessTable ^. messageQ ) (shutdownRequests ?~ msg)
return ())