module Control.Eff.Concurrent.Process.ForkIOScheduler
( schedule
, defaultMain
, defaultMainWithLogWriter
, ProcEff
, InterruptableProcEff
, SchedulerIO
, HasSchedulerIO
)
where
import Control.Concurrent ( yield )
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.Async ( Async(..) )
import Control.Concurrent.STM as STM
import Control.Eff
import Control.Eff.Concurrent.Process
import qualified Control.Eff.ExceptionExtra as ExcExtra
( )
import Control.Eff.Extend
import Control.Eff.Log
import Control.Eff.LogWriter.IO
import Control.Eff.LogWriter.Console
import Control.Eff.LogWriter.Async
import Control.Eff.Reader.Strict as Reader
import Control.Exception.Safe as Safe
import Control.Lens
import Control.Monad ( void
, when
)
import Control.Monad.Trans.Control ( MonadBaseControl(..)
, control
)
import Data.Default
import Data.Dynamic
import Data.Foldable
import Data.Function ( fix )
import Data.Kind ( )
import Data.Map ( Map )
import qualified Data.Map as Map
import Data.Maybe
import Data.Sequence ( Seq(..) )
import qualified Data.Sequence as Seq
import Data.Set ( Set )
import qualified Data.Set as Set
import qualified Data.Text as T
import GHC.Stack
import System.Timeout
import Text.Printf
-- | The mailbox of a single process. It is kept in a 'TVar' inside the
-- 'ProcessInfo' of the process.
data MessageQ = MessageQ
-- Messages that were sent to the process and have not been received yet.
-- 'enqueueMessageOtherProcess' appends new messages at the right end.
{ _incomingMessages :: Seq Dynamic
-- At most one pending shutdown or interrupt request; 'Nothing' when no
-- request is pending.
, _shutdownRequests :: Maybe SomeExitReason
}
-- | The default message queue is built from the defaults of both fields.
instance Default MessageQ where
def = MessageQ def def
-- Template Haskell: generate the lenses 'incomingMessages' and
-- 'shutdownRequests' that are used throughout this module.
makeLenses ''MessageQ
-- | Take the pending shutdown or interrupt request out of a message queue.
--
-- Returns the request that was stored in the queue (or 'Nothing' when there
-- was none). If a request was present, the queue is written back with the
-- request slot cleared; otherwise the 'TVar' is not written at all.
tryTakeNextShutdownRequestSTM :: TVar MessageQ -> STM (Maybe SomeExitReason)
tryTakeNextShutdownRequestSTM mqVar = do
  queue <- readTVar mqVar
  let pending = _shutdownRequests queue
  case pending of
    Nothing -> return ()
    Just _  -> writeTVar mqVar queue { _shutdownRequests = Nothing }
  return pending
-- | The bookkeeping data that the scheduler keeps for every process. The
-- values are stored in the process table of the 'SchedulerState'.
data ProcessInfo = ProcessInfo
-- The id under which the process is stored in the process table.
{ _processId :: ProcessId
-- The current state of the process; 'newProcessInfo' initialises it with
-- 'ProcessBooting'.
, _processState :: TVar ProcessState
-- The mailbox of the process: incoming messages and a pending
-- shutdown/interrupt request.
, _messageQ :: TVar MessageQ
-- The ids of the processes that this process is linked with.
, _processLinks :: TVar (Set ProcessId)
}
-- Template Haskell: generate the lenses 'processId', 'processState',
-- 'messageQ' and 'processLinks'.
makeLenses ''ProcessInfo
-- | The shared, mutable state of the scheduler. Every field is a 'TVar', so
-- the state is read and modified inside 'STM' transactions.
data SchedulerState = SchedulerState
-- The id that will be given to the next spawned process; 'newSchedulerState'
-- starts it at 1.
{ _nextPid :: TVar ProcessId
-- All processes that are currently registered, indexed by their id.
, _processTable :: TVar (Map ProcessId ProcessInfo)
-- The 'Async' handle of every forked process thread; read and cleared by the
-- scheduler tear-down in order to cancel all processes.
, _processCancellationTable :: TVar (Map ProcessId (Async (ExitReason 'NoRecovery)))
-- The active monitors: pairs of a monitor reference and the id of the process
-- that owns the monitor (and will receive the 'ProcessDown' message).
, _processMonitors :: TVar (Set (MonitorReference, ProcessId))
-- The index used for the next 'MonitorReference'.
, _nextMonitorIndex :: TVar Int
}
-- Template Haskell: generate a lens for every field (the field name without
-- the leading underscore).
makeLenses ''SchedulerState
-- | Create a new 'MonitorReference' for @target@ on behalf of @owner@.
--
-- A fresh monitor index is always consumed. Nothing else happens when a
-- process monitors itself. Otherwise, if @target@ is in the process table the
-- monitor is registered; if it is not, a 'ProcessDown' message carrying
-- 'ProcessNotRunning' is enqueued to @owner@ right away.
addMonitoring
  :: ProcessId -> ProcessId -> SchedulerState -> STM MonitorReference
addMonitoring target owner schedulerState = do
  monitorIndex <- readTVar indexVar
  -- Same effect as a strict modify: store the evaluated successor.
  writeTVar indexVar $! monitorIndex + 1
  let monitorRef = MonitorReference monitorIndex target
  when (target /= owner) $ do
    targetIsKnown <- Map.member target
      <$> readTVar (schedulerState ^. processTable)
    if targetIsKnown
      then registerMonitor monitorRef
      else reportNotRunning monitorRef
  return monitorRef
 where
  indexVar = schedulerState ^. nextMonitorIndex
  -- Remember that 'owner' wants to be told when the target goes down.
  registerMonitor ref = modifyTVar' (schedulerState ^. processMonitors)
                                    (Set.insert (ref, owner))
  -- The target is not in the process table: tell the owner immediately.
  reportNotRunning ref = enqueueMessageOtherProcess
    owner
    (toDyn (ProcessDown ref (SomeExitReason (ProcessNotRunning target))))
    schedulerState
-- | Remove every entry with the given 'MonitorReference' from the set of
-- active monitors. Does nothing if the reference is not registered.
removeMonitoring :: MonitorReference -> SchedulerState -> STM ()
removeMonitoring monitorRef schedulerState = modifyTVar' monitorsVar
                                                         (Set.filter isOther)
 where
  monitorsVar = schedulerState ^. processMonitors
  -- Keep only the entries that belong to a different monitor reference.
  isOther (ref, _owner) = ref /= monitorRef
-- | Notify the owners of all monitors of @downPid@ and remove those monitors.
--
-- For every registered monitor whose monitored process is @downPid@ a
-- 'ProcessDown' message with the given reason is enqueued to the owner of the
-- monitor, and the entry is removed from the set of active monitors.
--
-- The monitor set is split once with 'Set.partition' instead of being
-- re-filtered for every triggered monitor; the removed entries and the order
-- of the notifications are the same as with per-entry removal.
triggerAndRemoveMonitor
  :: ProcessId -> SomeExitReason -> SchedulerState -> STM ()
triggerAndRemoveMonitor downPid reason schedulerState = do
  (triggered, remaining) <- Set.partition isTriggered <$> readTVar monitorsVar
  writeTVar monitorsVar $! remaining
  traverse_ notifyOwner triggered
 where
  monitorsVar = schedulerState ^. processMonitors
  -- A monitor fires when it watches the process that went down.
  isTriggered (mr, _owner) = monitoredProcess mr == downPid
  notifyOwner (mr, owner) = enqueueMessageOtherProcess
    owner
    (toDyn (ProcessDown mr reason))
    schedulerState
-- | Only the process id is shown; the 'TVar' fields are not rendered.
instance Show ProcessInfo where
  show info = showString "process info: " (show (_processId info))
-- | Create the bookkeeping record for a new process with the given id: the
-- state is 'ProcessBooting', the message queue is the default queue and the
-- set of links is empty.
newProcessInfo :: HasCallStack => ProcessId -> STM ProcessInfo
newProcessInfo a = do
  stateVar   <- newTVar ProcessBooting
  mailboxVar <- newTVar def
  linksVar   <- newTVar def
  return ProcessInfo { _processId    = a
                     , _processState = stateVar
                     , _messageQ     = mailboxVar
                     , _processLinks = linksVar
                     }
-- | Create an empty scheduler state: the next process id is 1 and all other
-- fields hold their 'Default' value.
newSchedulerState :: HasCallStack => STM SchedulerState
newSchedulerState = do
  pidCounterVar     <- newTVar 1
  processesVar      <- newTVar def
  cancellationsVar  <- newTVar def
  monitorsVar       <- newTVar def
  monitorCounterVar <- newTVar def
  return SchedulerState { _nextPid                  = pidCounterVar
                        , _processTable             = processesVar
                        , _processCancellationTable = cancellationsVar
                        , _processMonitors          = monitorsVar
                        , _nextMonitorIndex         = monitorCounterVar
                        }
-- | Run an action with a freshly created 'SchedulerState' and tear the
-- scheduler down afterwards.
--
-- The state is created, the action is run with the state provided through the
-- reader effect, and finally - whether or not the action threw an exception -
-- all processes that are still in the cancellation table are cancelled. An
-- exception passed to the clean-up handler is logged as an error before the
-- tear-down starts.
--
-- Cancelling the processes is limited to 5 seconds. A tear-down that runs
-- into this limit is reported with an error log message instead of being
-- silently ignored.
withNewSchedulerState
  :: (HasCallStack) => Eff SchedulerIO () -> Eff LoggingAndIo ()
withNewSchedulerState mainProcessAction = Safe.bracketWithError
  (lift (atomically newSchedulerState))
  (\exceptions schedulerState -> do
    -- 'exceptions' is a 'Maybe': nothing is logged after a normal return.
    traverse_
      ( logError
      . ("scheduler setup crashed with: " <>)
      . T.pack
      . Safe.displayException
      )
      exceptions
    logDebug "scheduler cleanup begin"
    runReader schedulerState tearDownScheduler
  )
  (\schedulerState -> do
    logDebug "scheduler loop entered"
    x <- runReader schedulerState mainProcessAction
    logDebug "scheduler loop returned"
    return x
  )
 where
  -- Cancel every process thread that is still registered.
  tearDownScheduler :: Eff SchedulerIO ()
  tearDownScheduler = do
    schedulerState <- getSchedulerState
    let cancelTableVar = schedulerState ^. processCancellationTable
    -- Take the whole cancellation table and leave an empty one behind, in a
    -- single transaction.
    allProcesses <- lift
      (atomically (readTVar cancelTableVar <* writeTVar cancelTableVar def))
    logNotice
      (  "cancelling processes: "
      <> T.pack (show (toListOf (ifolded . asIndex) allProcesses))
      )
    -- 'timeout' yields 'Nothing' when the limit (in microseconds) elapsed
    -- before all cancellations were done.
    cancellationResult <- liftBaseWith
      (\runS -> timeout
        5000000
        (  Async.mapConcurrently
            (\a -> do
              Async.cancel a
              runS
                (logNotice
                  ("process cancelled: " <> T.pack (show (asyncThreadId a)))
                )
            )
            allProcesses
        >> runS (logNotice "all processes cancelled")
        )
      )
    when (isNothing cancellationResult)
         (logError "cancelling processes timed out after 5 seconds")
-- | The effect list that 'handleProcess' interprets: 'ConsProcess' applied to
-- 'SchedulerIO'.
type ProcEff = ConsProcess SchedulerIO
-- | The effect list of the process actions accepted by 'schedule' and
-- 'defaultMain': 'InterruptableProcess' applied to 'SchedulerIO'.
type InterruptableProcEff = InterruptableProcess SchedulerIO
-- | Constraint synonym for effect lists that have a call stack, can lift 'IO'
-- and contain all effects of 'SchedulerIO'.
type HasSchedulerIO r = (HasCallStack, Lifted IO r, SchedulerIO <:: r)
-- | The effects available to the scheduler itself: read access to the shared
-- 'SchedulerState' on top of 'LoggingAndIo'.
type SchedulerIO = (Reader SchedulerState : LoggingAndIo)
-- | Run a main process with 'defaultMainWithLogWriter', using
-- 'consoleLogWriter' as the log writer.
defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO ()
defaultMain mainProcess =
  defaultMainWithLogWriter consoleLogWriter mainProcess
-- | Run a main process in 'IO': the process is passed to 'schedule', wrapped
-- in 'withAsyncLogWriter' with the argument 1024, then in 'withLogging' with
-- the given log writer, and finally run with 'runLift'.
defaultMainWithLogWriter
  :: HasCallStack => LogWriter IO -> Eff InterruptableProcEff () -> IO ()
defaultMainWithLogWriter lw mainProcess = runLift
  (withLogging lw (withAsyncLogWriter (1024 :: Int) (schedule mainProcess)))
-- | Interpret the 'Process' requests of a single process in 'SchedulerIO'.
--
-- The action is traversed with 'handle_relay'' (tied into a loop with 'fix').
-- An 'Int' counter, starting at @0@, is threaded through all steps; it is the
-- source of the values returned for 'MakeReference'.
--
-- Before a request is executed, a pending shutdown or interrupt request is
-- taken from the message queue of the process. If there is one, the request
-- of the process is not executed; the process is shut down or resumed with
-- 'Interrupted' instead.
--
-- The result is the reason with which the process exited.
handleProcess
:: (HasCallStack)
=> ProcessInfo
-> Eff ProcEff (ExitReason 'NoRecovery)
-> Eff SchedulerIO (ExitReason 'NoRecovery)
handleProcess myProcessInfo actionToRun = fix
-- The second argument ignores both the value it is given and the counter and
-- yields 'ExitNormally' (presumably the handler for an action that finished
-- without further requests - confirm against 'handle_relay'').
(handle_relay' singleStep (const . const (return ExitNormally)))
actionToRun
0
where
-- Handle one request: on success the continuation @q@ is applied to the
-- result and the loop @k@ is re-entered with the updated counter; on exit the
-- exit reason is returned as the result of the whole process.
singleStep
:: (Eff ProcEff xx -> (Int -> Eff SchedulerIO (ExitReason 'NoRecovery)))
-> Arrs ProcEff x xx
-> Process SchedulerIO x
-> (Int -> Eff SchedulerIO (ExitReason 'NoRecovery))
singleStep k q p !nextRef = stepProcessInterpreter
nextRef
p
(\nextNextRef x -> k (qApp q x) nextNextRef)
return
myPid = myProcessInfo ^. processId
myProcessStateVar = myProcessInfo ^. processState
-- Publish the current state of this process in its 'ProcessInfo'.
setMyProcessState = lift . atomically . setMyProcessStateSTM
setMyProcessStateSTM = writeTVar myProcessStateVar
myMessageQVar = myProcessInfo ^. messageQ
-- Wrap a continuation: mark the process as idle and yield the Haskell thread
-- before the process is resumed.
kontinueWith
:: forall s v a
. HasCallStack
=> (s -> Arr SchedulerIO v a)
-> (s -> Arr SchedulerIO v a)
kontinueWith kontinue !nextRef !result = do
setMyProcessState ProcessIdle
lift yield
kontinue nextRef result
-- Wrap the exit continuation: mark the process as shutting down first.
diskontinueWith
:: forall a
. HasCallStack
=> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
diskontinueWith diskontinue !reason = do
setMyProcessState ProcessShuttingDown
diskontinue reason
-- Execute a single request, unless a shutdown or interrupt request is pending
-- for this process.
stepProcessInterpreter
:: forall v a
. HasCallStack
=> Int
-> Process SchedulerIO v
-> (Int -> Arr SchedulerIO v a)
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> Eff SchedulerIO a
stepProcessInterpreter !nextRef !request kontinue diskontinue =
-- 'fromSomeExitReason' yields 'Left' for a non-recoverable reason (a
-- shutdown) and 'Right' for a recoverable one (an interrupt).
tryTakeNextShutdownRequest >>= maybe
noShutdownRequested
(either onShutdownRequested onInterruptRequested . fromSomeExitReason)
where
tryTakeNextShutdownRequest =
lift (atomically (tryTakeNextShutdownRequestSTM myMessageQVar))
onShutdownRequested shutdownRequest = do
logDebug ("shutdown requested: " <> T.pack (show shutdownRequest))
setMyProcessState ProcessShuttingDown
interpretRequestAfterShutdownRequest (diskontinueWith diskontinue)
shutdownRequest
request
onInterruptRequested interruptRequest = do
logDebug ("interrupt requested: " <> T.pack (show interruptRequest))
-- NOTE(review): the state is set to 'ProcessShuttingDown' although an
-- interrupt usually lets the process continue - confirm this is intended.
setMyProcessState ProcessShuttingDown
interpretRequestAfterInterruptRequest (kontinueWith kontinue nextRef)
(diskontinueWith diskontinue)
interruptRequest
request
noShutdownRequested = do
setMyProcessState ProcessBusy
interpretRequest (kontinueWith kontinue)
(diskontinueWith diskontinue)
nextRef
request
-- A shutdown was requested: no request is executed any more and the process
-- always exits. The exit reason is the pending shutdown request, except when
-- the process itself asks to exit ('Shutdown', or 'SendShutdown' to its own
-- pid); then the reason given by the process is used.
interpretRequestAfterShutdownRequest
:: forall v a
. HasCallStack
=> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> ExitReason 'NoRecovery
-> Process SchedulerIO v
-> Eff SchedulerIO a
interpretRequestAfterShutdownRequest diskontinue shutdownRequest = \case
SendMessage _ _ -> diskontinue shutdownRequest
SendInterrupt _ _ -> diskontinue shutdownRequest
SendShutdown toPid r ->
if toPid == myPid then diskontinue r else diskontinue shutdownRequest
Spawn _ -> diskontinue shutdownRequest
SpawnLink _ -> diskontinue shutdownRequest
ReceiveSelectedMessage _ -> diskontinue shutdownRequest
FlushMessages -> diskontinue shutdownRequest
SelfPid -> diskontinue shutdownRequest
MakeReference -> diskontinue shutdownRequest
YieldProcess -> diskontinue shutdownRequest
Shutdown r -> diskontinue r
GetProcessState _ -> diskontinue shutdownRequest
Monitor _ -> diskontinue shutdownRequest
Demonitor _ -> diskontinue shutdownRequest
Link _ -> diskontinue shutdownRequest
Unlink _ -> diskontinue shutdownRequest
-- An interrupt was requested: the request is not executed and the process is
-- resumed with 'Interrupted'. The exceptions are again 'Shutdown' and
-- 'SendShutdown' to its own pid, which make the process exit.
interpretRequestAfterInterruptRequest
:: forall v a
. HasCallStack
=> Arr SchedulerIO v a
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> ExitReason 'Recoverable
-> Process SchedulerIO v
-> Eff SchedulerIO a
interpretRequestAfterInterruptRequest kontinue diskontinue interruptRequest =
\case
SendMessage _ _ -> kontinue (Interrupted interruptRequest)
SendInterrupt _ _ -> kontinue (Interrupted interruptRequest)
SendShutdown toPid r -> if toPid == myPid
then diskontinue r
else kontinue (Interrupted interruptRequest)
Spawn _ -> kontinue (Interrupted interruptRequest)
SpawnLink _ -> kontinue (Interrupted interruptRequest)
ReceiveSelectedMessage _ -> kontinue (Interrupted interruptRequest)
FlushMessages -> kontinue (Interrupted interruptRequest)
SelfPid -> kontinue (Interrupted interruptRequest)
MakeReference -> kontinue (Interrupted interruptRequest)
YieldProcess -> kontinue (Interrupted interruptRequest)
Shutdown r -> diskontinue r
GetProcessState _ -> kontinue (Interrupted interruptRequest)
Monitor _ -> kontinue (Interrupted interruptRequest)
Demonitor _ -> kontinue (Interrupted interruptRequest)
Link _ -> kontinue (Interrupted interruptRequest)
Unlink _ -> kontinue (Interrupted interruptRequest)
-- The normal case: execute the request and resume the process with the
-- result.
interpretRequest
:: forall v a
. HasCallStack
=> (Int -> Arr SchedulerIO v a)
-> Arr SchedulerIO (ExitReason 'NoRecovery) a
-> Int
-> Process SchedulerIO v
-> Eff SchedulerIO a
interpretRequest kontinue diskontinue nextRef = \case
SendMessage toPid msg ->
interpretSend toPid msg >>= kontinue nextRef . ResumeWith
-- An interrupt sent to oneself is delivered directly, without going through
-- the message queue.
SendInterrupt toPid msg -> if toPid == myPid
then kontinue nextRef (Interrupted msg)
else
interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
>>= kontinue nextRef
. ResumeWith
-- A shutdown sent to oneself makes the process exit right away.
SendShutdown toPid msg -> if toPid == myPid
then diskontinue msg
else
interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
>>= kontinue nextRef
. ResumeWith
Spawn child ->
spawnNewProcess Nothing child >>= kontinue nextRef . ResumeWith . fst
-- Like 'Spawn', but the child is linked to this process.
SpawnLink child ->
spawnNewProcess (Just myProcessInfo) child
>>= kontinue nextRef
. ResumeWith
. fst
ReceiveSelectedMessage f ->
interpretReceive f >>= either diskontinue (kontinue nextRef)
FlushMessages -> interpretFlush >>= kontinue nextRef
SelfPid -> kontinue nextRef (ResumeWith myPid)
-- Hand out the current counter value and continue with the incremented one.
MakeReference -> kontinue (nextRef + 1) (ResumeWith nextRef)
-- Nothing to do here: 'kontinueWith' yields before every continuation.
YieldProcess -> kontinue nextRef (ResumeWith ())
Shutdown r -> diskontinue r
GetProcessState toPid ->
interpretGetProcessState toPid >>= kontinue nextRef . ResumeWith
Monitor target ->
interpretMonitor target >>= kontinue nextRef . ResumeWith
Demonitor ref -> interpretDemonitor ref >>= kontinue nextRef . ResumeWith
Link toPid ->
interpretLink toPid >>= kontinue nextRef . either Interrupted ResumeWith
Unlink toPid -> interpretUnlink toPid >>= kontinue nextRef . ResumeWith
where
interpretMonitor !target = do
setMyProcessState ProcessBusyMonitoring
schedulerState <- getSchedulerState
lift (atomically (addMonitoring target myPid schedulerState))
interpretDemonitor !ref = do
setMyProcessState ProcessBusyMonitoring
schedulerState <- getSchedulerState
lift (atomically (removeMonitoring ref schedulerState))
-- Remove the link on both sides in one transaction. The own side is removed
-- even if the other process is not in the process table any more.
interpretUnlink !toPid = do
setMyProcessState ProcessBusyUnlinking
schedulerState <- getSchedulerState
let procInfoVar = schedulerState ^. processTable
lift $ atomically $ do
procInfo <- readTVar procInfoVar
traverse_
(\toProcInfo ->
modifyTVar' (toProcInfo ^. processLinks) (Set.delete myPid)
)
(procInfo ^. at toPid)
modifyTVar' (myProcessInfo ^. processLinks) (Set.delete toPid)
-- Yields 'Nothing' if the process is not in the process table.
interpretGetProcessState !toPid = do
setMyProcessState ProcessBusy
schedulerState <- getSchedulerState
let procInfoVar = schedulerState ^. processTable
lift $ atomically $ do
procInfoTable <- readTVar procInfoVar
traverse (\toProcInfo -> readTVar (toProcInfo ^. processState))
(procInfoTable ^. at toPid)
-- Add the link on both sides in one transaction. If the other process is not
-- in the process table, the result is 'Left' and the caller is resumed with
-- 'Interrupted' ('LinkedProcessCrashed').
interpretLink !toPid = do
setMyProcessState ProcessBusyLinking
schedulerState <- getSchedulerState
let procInfoVar = schedulerState ^. processTable
lift $ atomically $ do
procInfoTable <- readTVar procInfoVar
case procInfoTable ^. at toPid of
Just toProcInfo -> do
modifyTVar' (toProcInfo ^. processLinks) (Set.insert myPid)
modifyTVar' (myProcessInfo ^. processLinks) (Set.insert toPid)
return (Right ())
Nothing -> return (Left (LinkedProcessCrashed toPid))
interpretSend !toPid msg =
setMyProcessState ProcessBusySending
*> getSchedulerState
>>= lift
. atomically
. enqueueMessageOtherProcess toPid msg
-- The published state depends on whether the reason is a shutdown ('Left') or
-- an interrupt ('Right').
interpretSendShutdownOrInterrupt !toPid !msg =
setMyProcessState
(either (const ProcessBusySendingShutdown)
(const ProcessBusySendingInterrupt)
(fromSomeExitReason msg)
)
*> getSchedulerState
>>= lift
. atomically
. enqueueShutdownRequest toPid msg
-- Return all queued messages and leave an empty message queue behind.
interpretFlush :: Eff SchedulerIO (ResumeProcess [Dynamic])
interpretFlush = do
setMyProcessState ProcessBusyReceiving
lift $ atomically $ do
myMessageQ <- readTVar myMessageQVar
modifyTVar' myMessageQVar (incomingMessages .~ Seq.Empty)
return (ResumeWith (toList (myMessageQ ^. incomingMessages)))
-- Take the first queued message that is accepted by the selector. If there is
-- none, the transaction is retried, i.e. the process blocks until its message
-- queue changes. A pending shutdown or interrupt request takes precedence
-- over messages and ends the waiting.
interpretReceive
:: MessageSelector b
-> Eff SchedulerIO (Either (ExitReason 'NoRecovery) (ResumeProcess b))
interpretReceive f = do
setMyProcessState ProcessBusyReceiving
lift $ atomically $ do
myMessageQ <- readTVar myMessageQVar
case myMessageQ ^. shutdownRequests of
Nothing ->
case
partitionMessages (myMessageQ ^. incomingMessages) Seq.Empty
of
Nothing -> retry
Just (selectedMessage, otherMessages) -> do
modifyTVar' myMessageQVar (incomingMessages .~ otherMessages)
return (Right (ResumeWith selectedMessage))
Just shutdownRequest -> do
modifyTVar' myMessageQVar (shutdownRequests .~ Nothing)
case fromSomeExitReason shutdownRequest of
Left sr -> return (Left sr)
Right ir -> return (Right (Interrupted ir))
where
-- Find the first message accepted by the selector; the result also contains
-- all other messages in their original order.
partitionMessages Seq.Empty _acc = Nothing
partitionMessages (m :<| msgRest) acc = maybe
(partitionMessages msgRest (acc :|> m))
(\res -> Just (res, acc Seq.>< msgRest))
(runMessageSelector f m)
-- | Run a process action as the main process of a new scheduler.
--
-- A new scheduler state is set up with 'withNewSchedulerState', the action is
-- spawned (unlinked) as a process wrapped in 'provideInterruptsShutdown', and
-- the scheduler waits until that main process is done. The scheduler itself
-- runs in an 'Async' that this function waits for.
schedule :: (HasCallStack) => Eff InterruptableProcEff () -> Eff LoggingAndIo ()
schedule procEff = do
  finalState <- liftBaseWith
    (\runInBase -> Async.withAsync
      (runInBase
        (withNewSchedulerState
          (do
            (_mainPid, mainProcAsync) <- spawnNewProcess
              Nothing
              (do
                logNotice "++++++++ main process started ++++++++"
                provideInterruptsShutdown procEff
                logNotice "++++++++ main process returned ++++++++"
              )
            -- Block until the main process is done; its exit reason is not
            -- used.
            _ <- lift (Async.wait mainProcAsync)
            return ()
          )
        )
      )
      (\schedulerAsync -> runInBase
        (do
          restoredAsync <- restoreM schedulerAsync
          _             <- lift (Async.wait restoredAsync)
          return ()
        )
      )
    )
  restoreM finalState
-- | Start a new process.
--
-- A new 'ProcessInfo' with a fresh process id is registered in the process
-- table, the process is optionally linked with a parent process, and the
-- given action is run by 'handleProcess' in a new 'Async' thread.
--
-- Returns the id of the new process and the 'Async' of its thread, whose
-- result is the exit reason of the process.
spawnNewProcess
:: (HasCallStack)
=> Maybe ProcessInfo
-> Eff ProcEff ()
-> Eff SchedulerIO (ProcessId, Async (ExitReason 'NoRecovery))
spawnNewProcess mLinkedParent mfa = do
schedulerState <- getSchedulerState
procInfo <- allocateProcInfo schedulerState
-- The link is set up before the thread is forked.
traverse_ (linkToParent procInfo) mLinkedParent
procAsync <- doForkProc procInfo schedulerState
return (procInfo ^. processId, procAsync)
where
-- Insert each process into the link set of the other, in one transaction.
linkToParent toProcInfo parent = do
let toPid = toProcInfo ^. processId
parentPid = parent ^. processId
lift $ atomically $ do
modifyTVar' (toProcInfo ^. processLinks) (Set.insert parentPid)
modifyTVar' (parent ^. processLinks) (Set.insert toPid)
-- Take the next free process id, advance the id counter and register a new
-- 'ProcessInfo' in the process table, all in one transaction.
allocateProcInfo schedulerState = lift
(atomically
(do
let nextPidVar = schedulerState ^. nextPid
processInfoVar = schedulerState ^. processTable
pid <- readTVar nextPidVar
modifyTVar' nextPidVar (+ 1)
procInfo <- newProcessInfo pid
modifyTVar' processInfoVar (at pid ?~ procInfo)
return procInfo
)
)
-- Set the process id field of log messages to the padded, shown pid, unless
-- the field is already set.
logAppendProcInfo pid =
let addProcessId = over
lmProcessId
(maybe (Just (T.pack (printf "% 9s" (show pid)))) Just)
in censorLogs @IO addProcessId
-- Called when the process @pid@ exits: trigger all monitors of the process,
-- then take (and clear) its link set and notify every linked process.
triggerProcessLinksAndMonitors
:: ProcessId -> ExitReason e -> TVar (Set ProcessId) -> Eff SchedulerIO ()
triggerProcessLinksAndMonitors !pid !reason !linkSetVar = do
schedulerState <- getSchedulerState
lift $ atomically $ triggerAndRemoveMonitor pid
(SomeExitReason reason)
schedulerState
let exitSeverity = toExitSeverity reason
-- Handle one linked process. The result is 'Right' if the process exists and
-- still has a link to @pid@, 'Left' otherwise. The link is removed from the
-- other side in any case; a shutdown request ('LinkedProcessCrashed') is only
-- stored when the exit severity is 'Crash'. A request that is already pending
-- for the linked process is overwritten.
sendIt !linkedPid = do
let msg = SomeExitReason (LinkedProcessCrashed pid)
lift $ atomically $ do
procInfoTable <- readTVar (schedulerState ^. processTable)
let mLinkedProcInfo = procInfoTable ^? ix linkedPid
case mLinkedProcInfo of
Nothing -> return (Left linkedPid)
Just linkedProcInfo ->
let linkedMsgQVar = linkedProcInfo ^. messageQ
linkedLinkSetVar = linkedProcInfo ^. processLinks
in do
linkedLinkSet <- readTVar linkedLinkSetVar
if Set.member pid linkedLinkSet
then do
writeTVar linkedLinkSetVar
(Set.delete pid linkedLinkSet)
when
(exitSeverity == Crash)
(modifyTVar' linkedMsgQVar
(shutdownRequests ?~ msg)
)
return (Right linkedPid)
else return (Left linkedPid)
-- Read the own link set and replace it with the empty set.
linkedPids <- lift
(atomically
(do
linkSet <- readTVar linkSetVar
writeTVar linkSetVar def
return linkSet
)
)
res <- traverse sendIt (toList linkedPids)
-- NOTE(review): the second message is logged for every 'Right' result, also
-- when no shutdown request was stored because the exit was not a crash.
traverse_
(logDebug . either
(("linked process no found: " <>) . T.pack . show)
(("sent shutdown to linked process: " <>) . T.pack . show)
)
res
-- Fork the thread of the process and register its 'Async' in the
-- cancellation table.
doForkProc
:: ProcessInfo
-> SchedulerState
-> Eff SchedulerIO (Async (ExitReason 'NoRecovery))
doForkProc procInfo schedulerState = control
(\inScheduler -> do
let cancellationsVar = schedulerState ^. processCancellationTable
processInfoVar = schedulerState ^. processTable
pid = procInfo ^. processId
procAsync <- Async.async
(inScheduler
(logAppendProcInfo
pid
(Safe.bracketWithError
(logDebug "enter process")
-- Clean-up, run for both normal and exceptional exits: remove the process
-- from the process table and from the cancellation table. Links and monitors
-- are triggered here only when an exception occurred; for a regular exit
-- that is done by the main action below.
(\mExc () -> do
lift
(atomically
(do
modifyTVar' processInfoVar (at pid .~ Nothing)
modifyTVar' cancellationsVar (at pid .~ Nothing)
)
)
traverse_
(\exc -> logExitAndTriggerLinksAndMonitors
(exitReasonFromException exc)
pid
)
mExc
)
(const
(do
res <- handleProcess procInfo (mfa >> return ExitNormally)
logExitAndTriggerLinksAndMonitors res pid
)
)
)
)
)
-- NOTE(review): the 'Async' is registered after the thread was started. If
-- the process finishes before this line runs, the clean-up above has already
-- removed the (not yet existing) entry and a stale entry stays in the
-- cancellation table - confirm whether this can happen in practice.
atomically (modifyTVar' cancellationsVar (at pid ?~ procAsync))
return procAsync
)
where
-- A cancelled thread is reported as 'Killed'; every other exception becomes
-- an 'UnexpectedException' with the current call stack and the rendered
-- exception.
exitReasonFromException exc = case Safe.fromException exc of
Just Async.AsyncCancelled -> Killed
Nothing -> UnexpectedException (prettyCallStack callStack)
(Safe.displayException exc)
-- Notify linked and monitoring processes, log the exit and return the reason.
logExitAndTriggerLinksAndMonitors reason pid = do
triggerProcessLinksAndMonitors pid reason (procInfo ^. processLinks)
logProcessExit reason
return reason
-- | Read the shared 'SchedulerState' from the reader effect.
getSchedulerState :: HasSchedulerIO r => Eff r SchedulerState
getSchedulerState = Reader.ask
-- | Append a message to the end of the message queue of the process with the
-- given id. Nothing happens if the process is not in the process table.
enqueueMessageOtherProcess
  :: HasCallStack => ProcessId -> Dynamic -> SchedulerState -> STM ()
enqueueMessageOtherProcess toPid msg schedulerState = do
  table <- readTVar (schedulerState ^. processTable)
  traverse_ deliver (Map.lookup toPid table)
 where
  deliver receiver =
    modifyTVar' (receiver ^. messageQ) (over incomingMessages (Seq.|> msg))
-- | Store a shutdown or interrupt request in the message queue of the process
-- with the given id. A request that is already pending for that process is
-- overwritten. Nothing happens if the process is not in the process table.
enqueueShutdownRequest
  :: HasCallStack => ProcessId -> SomeExitReason -> SchedulerState -> STM ()
enqueueShutdownRequest toPid msg schedulerState = do
  table <- readTVar (schedulerState ^. processTable)
  traverse_ request (Map.lookup toPid table)
 where
  request receiver =
    modifyTVar' (receiver ^. messageQ) (shutdownRequests .~ Just msg)