-- | Implement Erlang style message passing concurrency.
--
-- This module contains 'spawn' which handles the 'Process' effects, using
-- 'STM.TQueue's and 'Control.Concurrent.Async.withAsync'.
--
-- This aims to be a pragmatic implementation, so even logging is
-- supported.
--
-- At the core is a /main process/ that enters 'schedule'
-- and creates all of the internal state stored in 'STM.TVar's
-- to manage processes with message queues.
--
module Control.Eff.Concurrent.Process.ForkIOScheduler
  ( schedule
  , defaultMain
  , defaultMainWithLogWriter
  , ProcEff
  , InterruptableProcEff
  , SchedulerIO
  , HasSchedulerIO
  )
where

import           Control.Concurrent             ( yield )
import qualified Control.Concurrent.Async      as Async
import           Control.Concurrent.Async       ( Async(..) )
import           Control.Concurrent.STM        as STM
import           Control.Eff
import           Control.Eff.Concurrent.Process
import qualified Control.Eff.ExceptionExtra    as ExcExtra
                                                ( )
import           Control.Eff.Extend
import           Control.Eff.Log
import           Control.Eff.LogWriter.IO
import           Control.Eff.LogWriter.Console
import           Control.Eff.LogWriter.Async
import           Control.Eff.Reader.Strict     as Reader
import           Control.Exception.Safe        as Safe
import           Control.Lens
import           Control.Monad                  ( void
                                                , when
                                                )
import           Control.Monad.Trans.Control    ( MonadBaseControl(..)
                                                , control
                                                )
import           Data.Default
import           Data.Dynamic
import           Data.Foldable
import           Data.Function                  ( fix )
import           Data.Kind                      ( )
import           Data.Map                       ( Map )
import qualified Data.Map                      as Map
import           Data.Maybe
import           Data.Sequence                  ( Seq(..) )
import qualified Data.Sequence                 as Seq
import           Data.Set                       ( Set )
import qualified Data.Set                      as Set
import qualified Data.Text                     as T
import           GHC.Stack
import           System.Timeout
import           Text.Printf

-- * Process Types

-- | A message queue of a process, contains the actual queue and maybe an
-- exit reason. The message queue is backed by a 'Seq' sequence with 'Dynamic' values.
data MessageQ = MessageQ
  { _incomingMessages :: Seq Dynamic
  , _shutdownRequests :: Maybe SomeExitReason
  }

instance Default MessageQ where
  def = MessageQ def def

makeLenses ''MessageQ

-- | Return any '_shutdownRequests' from a 'MessageQ' in a 'TVar' and
-- reset the '_shutdownRequests' field to 'Nothing' in the 'TVar'.
tryTakeNextShutdownRequestSTM :: TVar MessageQ -> STM (Maybe SomeExitReason)
tryTakeNextShutdownRequestSTM mqVar = do
  mq <- readTVar mqVar
  when (isJust (mq ^. shutdownRequests))
       (writeTVar mqVar (mq & shutdownRequests .~ Nothing))
  return (mq ^. shutdownRequests)

-- | Information about a process, needed to implement
-- 'Process' handlers. The message queue is backed by a 'STM.TVar' that contains
-- a 'MessageQ'.
data ProcessInfo = ProcessInfo
  { _processId :: ProcessId
  , _processState :: TVar ProcessState
  , _messageQ :: TVar MessageQ
  , _processLinks :: TVar (Set ProcessId)
  }

makeLenses ''ProcessInfo

-- * Scheduler Types

-- | Contains all process info'elements, as well as the state needed to implement
-- inter-process communication.
data SchedulerState = SchedulerState
  { _nextPid :: TVar ProcessId
  , _processTable :: TVar (Map ProcessId ProcessInfo)
  , _processCancellationTable :: TVar (Map ProcessId (Async (ExitReason 'NoRecovery)))
  , _processMonitors :: TVar (Set (MonitorReference, ProcessId))  -- ^ Set of monitors and monitor owners
  , _nextMonitorIndex :: TVar Int
  }

makeLenses ''SchedulerState

-- | Add monitor: If the process is dead, enqueue a 'ProcessDown' message into the
-- owners message queue
addMonitoring
  :: ProcessId -> ProcessId -> SchedulerState -> STM MonitorReference
addMonitoring target owner schedulerState = do
  aNewMonitorIndex <- readTVar (schedulerState ^. nextMonitorIndex)
  modifyTVar' (schedulerState ^. nextMonitorIndex) (+ 1)
  let monitorRef = MonitorReference aNewMonitorIndex target
  when (target /= owner) $ do
    pt <- readTVar (schedulerState ^. processTable)
    if Map.member target pt
      then modifyTVar' (schedulerState ^. processMonitors)
                       (Set.insert (monitorRef, owner))
      else
        let processDownMessage =
              ProcessDown monitorRef (SomeExitReason (ProcessNotRunning target))
        in  enqueueMessageOtherProcess owner
                                       (toDyn processDownMessage)
                                       schedulerState
  return monitorRef

removeMonitoring :: MonitorReference -> SchedulerState -> STM ()
removeMonitoring monitorRef schedulerState = modifyTVar'
  (schedulerState ^. processMonitors)
  (Set.filter (\(ref, _) -> ref /= monitorRef))

triggerAndRemoveMonitor
  :: ProcessId -> SomeExitReason -> SchedulerState -> STM ()
triggerAndRemoveMonitor downPid reason schedulerState = do
  monRefs <- readTVar (schedulerState ^. processMonitors)
  traverse_ go monRefs
 where
  go (mr, owner) = when
    (monitoredProcess mr == downPid)
    (do
      let processDownMessage = ProcessDown mr reason
      enqueueMessageOtherProcess owner (toDyn processDownMessage) schedulerState
      removeMonitoring mr schedulerState
    )

-- * Process Implementation
instance Show ProcessInfo where
  show p = "process info: " ++ show (p ^. processId)

-- | Create a new 'ProcessInfo'
newProcessInfo :: HasCallStack => ProcessId -> STM ProcessInfo
newProcessInfo a =
  ProcessInfo a <$> newTVar ProcessBooting <*> newTVar def <*> newTVar def

-- * Scheduler Implementation

-- | Create a new 'SchedulerState'
newSchedulerState :: HasCallStack => STM SchedulerState
newSchedulerState =
  SchedulerState
    <$> newTVar 1
    <*> newTVar def
    <*> newTVar def
    <*> newTVar def
    <*> newTVar def

-- | Create a new 'SchedulerState' run an IO action, catching all exceptions,
-- and when the actions returns, clean up and kill all processes.
withNewSchedulerState
  :: (HasCallStack) => Eff SchedulerIO () -> Eff LoggingAndIo ()
withNewSchedulerState mainProcessAction = Safe.bracketWithError
  (lift (atomically newSchedulerState))
  (\exceptions schedulerState -> do
    traverse_
      ( logError
      . ("scheduler setup crashed with: " <>)
      . T.pack
      . Safe.displayException
      )
      exceptions
    logDebug "scheduler cleanup begin"
    runReader schedulerState tearDownScheduler
  )
  (\schedulerState -> do
    logDebug "scheduler loop entered"
    x <- runReader schedulerState mainProcessAction
    logDebug "scheduler loop returned"
    return x
  )
 where
  tearDownScheduler :: Eff SchedulerIO ()
  tearDownScheduler = do
    schedulerState <- getSchedulerState
    let cancelTableVar = schedulerState ^. processCancellationTable
    -- cancel all processes
    allProcesses <- lift
      (atomically (readTVar cancelTableVar <* writeTVar cancelTableVar def))
    logNotice
      (  "cancelling processes: "
      <> T.pack (show (toListOf (ifolded . asIndex) allProcesses))
      )
    void
      (liftBaseWith
        (\runS -> timeout
          5000000
          (  Async.mapConcurrently
              (\a -> do
                Async.cancel a
                runS
                  (logNotice
                    ("process cancelled: " <> T.pack (show (asyncThreadId a)))
                  )
              )
              allProcesses
          >> runS (logNotice "all processes cancelled")
          )
        )
      )

-- | The concrete list of 'Eff'ects of processes compatible with this scheduler.
-- This builds upon 'SchedulerIO'.
type ProcEff = ConsProcess SchedulerIO

-- | The concrete list of the effects, that the 'Process' uses
type InterruptableProcEff = InterruptableProcess SchedulerIO

-- | Type class constraint to indicate that an effect union contains the
-- effects required by every process and the scheduler implementation itself.
type HasSchedulerIO r = (HasCallStack, Lifted IO r, SchedulerIO <:: r)

-- | The concrete list of 'Eff'ects for this scheduler implementation.
type SchedulerIO = (Reader SchedulerState : LoggingAndIo)

-- | Start the message passing concurrency system then execute a 'Process' on
-- top of 'SchedulerIO' effect. All logging is sent to standard output.
defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO ()
defaultMain = defaultMainWithLogWriter consoleLogWriter

-- | Start the message passing concurrency system then execute a 'Process' on
-- top of 'SchedulerIO' effect. All logging is sent to standard output.
defaultMainWithLogWriter
  :: HasCallStack => LogWriter IO -> Eff InterruptableProcEff () -> IO ()
defaultMainWithLogWriter lw =
  runLift . withLogging lw . withAsyncLogWriter (1024 :: Int) . schedule

-- ** Process Execution

handleProcess
  :: (HasCallStack)
  => ProcessInfo
  -> Eff ProcEff (ExitReason 'NoRecovery)
  -> Eff SchedulerIO (ExitReason 'NoRecovery)
handleProcess myProcessInfo actionToRun = fix
  (handle_relay' singleStep (const . const (return ExitNormally)))
  actionToRun
  0
 where
  singleStep
    :: (Eff ProcEff xx -> (Int -> Eff SchedulerIO (ExitReason 'NoRecovery)))
    -> Arrs ProcEff x xx
    -> Process SchedulerIO x
    -> (Int -> Eff SchedulerIO (ExitReason 'NoRecovery))
  singleStep k q p !nextRef = stepProcessInterpreter
    nextRef
    p
    (\nextNextRef x -> k (qApp q x) nextNextRef)
    return
  myPid                = myProcessInfo ^. processId
  myProcessStateVar    = myProcessInfo ^. processState
  setMyProcessState    = lift . atomically . setMyProcessStateSTM
-- DEBUG variant:
-- setMyProcessState st = do
--  oldSt <- lift (atomically (readTVar myProcessStateVar <* setMyProcessStateSTM st))
--  logDebug ("state change: "<> show oldSt <> " -> " <> show st)
  setMyProcessStateSTM = writeTVar myProcessStateVar
  myMessageQVar        = myProcessInfo ^. messageQ
  kontinueWith
    :: forall s v a
     . HasCallStack
    => (s -> Arr SchedulerIO v a)
    -> (s -> Arr SchedulerIO v a)
  kontinueWith kontinue !nextRef !result = do
    setMyProcessState ProcessIdle
    lift yield
    kontinue nextRef result
  diskontinueWith
    :: forall a
     . HasCallStack
    => Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
  diskontinueWith diskontinue !reason = do
    setMyProcessState ProcessShuttingDown
    diskontinue reason
  stepProcessInterpreter
    :: forall v a
     . HasCallStack
    => Int
    -> Process SchedulerIO v
    -> (Int -> Arr SchedulerIO v a)
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> Eff SchedulerIO a
  stepProcessInterpreter !nextRef !request kontinue diskontinue =
    tryTakeNextShutdownRequest >>= maybe
      noShutdownRequested
      (either onShutdownRequested onInterruptRequested . fromSomeExitReason)
    -- handle process shutdown requests:
    --   1. take process exit reason
    --   2. set process state to ProcessShuttingDown
    --   3. apply kontinue to (Right Interrupted)
    --
   where
    tryTakeNextShutdownRequest =
      lift (atomically (tryTakeNextShutdownRequestSTM myMessageQVar))
    onShutdownRequested shutdownRequest = do
      logDebug ("shutdown requested: " <> T.pack (show shutdownRequest))
      setMyProcessState ProcessShuttingDown
      interpretRequestAfterShutdownRequest (diskontinueWith diskontinue)
                                           shutdownRequest
                                           request
    onInterruptRequested interruptRequest = do
      logDebug ("interrupt requested: " <> T.pack (show interruptRequest))
      setMyProcessState ProcessShuttingDown
      interpretRequestAfterInterruptRequest (kontinueWith kontinue nextRef)
                                            (diskontinueWith diskontinue)
                                            interruptRequest
                                            request
    noShutdownRequested = do
      setMyProcessState ProcessBusy
      interpretRequest (kontinueWith kontinue)
                       (diskontinueWith diskontinue)
                       nextRef
                       request
-- This gets no nextRef and may not pass a Left value to the continuation.
-- This forces the caller to defer the process exit to the next request
-- and hence ensures that the scheduler code cannot forget to allow the
-- client code to react to a shutdown request.
  interpretRequestAfterShutdownRequest
    :: forall v a
     . HasCallStack
    => Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> ExitReason 'NoRecovery
    -> Process SchedulerIO v
    -> Eff SchedulerIO a
  interpretRequestAfterShutdownRequest diskontinue shutdownRequest = \case
    SendMessage   _ _ -> diskontinue shutdownRequest
    SendInterrupt _ _ -> diskontinue shutdownRequest
    SendShutdown toPid r ->
      if toPid == myPid then diskontinue r else diskontinue shutdownRequest
    Spawn                  _ -> diskontinue shutdownRequest
    SpawnLink              _ -> diskontinue shutdownRequest
    ReceiveSelectedMessage _ -> diskontinue shutdownRequest
    FlushMessages            -> diskontinue shutdownRequest
    SelfPid                  -> diskontinue shutdownRequest
    MakeReference            -> diskontinue shutdownRequest
    YieldProcess             -> diskontinue shutdownRequest
    Shutdown        r        -> diskontinue r
    GetProcessState _        -> diskontinue shutdownRequest
    Monitor         _        -> diskontinue shutdownRequest
    Demonitor       _        -> diskontinue shutdownRequest
    Link            _        -> diskontinue shutdownRequest
    Unlink          _        -> diskontinue shutdownRequest
  interpretRequestAfterInterruptRequest
    :: forall v a
     . HasCallStack
    => Arr SchedulerIO v a
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> ExitReason 'Recoverable
    -> Process SchedulerIO v
    -> Eff SchedulerIO a
  interpretRequestAfterInterruptRequest kontinue diskontinue interruptRequest =
    \case
      SendMessage   _     _ -> kontinue (Interrupted interruptRequest)
      SendInterrupt _     _ -> kontinue (Interrupted interruptRequest)
      SendShutdown  toPid r -> if toPid == myPid
        then diskontinue r
        else kontinue (Interrupted interruptRequest)
      Spawn                  _ -> kontinue (Interrupted interruptRequest)
      SpawnLink              _ -> kontinue (Interrupted interruptRequest)
      ReceiveSelectedMessage _ -> kontinue (Interrupted interruptRequest)
      FlushMessages            -> kontinue (Interrupted interruptRequest)
      SelfPid                  -> kontinue (Interrupted interruptRequest)
      MakeReference            -> kontinue (Interrupted interruptRequest)
      YieldProcess             -> kontinue (Interrupted interruptRequest)
      Shutdown        r        -> diskontinue r
      GetProcessState _        -> kontinue (Interrupted interruptRequest)
      Monitor         _        -> kontinue (Interrupted interruptRequest)
      Demonitor       _        -> kontinue (Interrupted interruptRequest)
      Link            _        -> kontinue (Interrupted interruptRequest)
      Unlink          _        -> kontinue (Interrupted interruptRequest)
  interpretRequest
    :: forall v a
     . HasCallStack
    => (Int -> Arr SchedulerIO v a)
    -> Arr SchedulerIO (ExitReason 'NoRecovery) a
    -> Int
    -> Process SchedulerIO v
    -> Eff SchedulerIO a
  interpretRequest kontinue diskontinue nextRef = \case
    SendMessage toPid msg ->
      interpretSend toPid msg >>= kontinue nextRef . ResumeWith
    SendInterrupt toPid msg -> if toPid == myPid
      then kontinue nextRef (Interrupted msg)
      else
        interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
        >>= kontinue nextRef
        .   ResumeWith
    SendShutdown toPid msg -> if toPid == myPid
      then diskontinue msg
      else
        interpretSendShutdownOrInterrupt toPid (SomeExitReason msg)
        >>= kontinue nextRef
        .   ResumeWith
    Spawn child ->
      spawnNewProcess Nothing child >>= kontinue nextRef . ResumeWith . fst
    SpawnLink child ->
      spawnNewProcess (Just myProcessInfo) child
        >>= kontinue nextRef
        .   ResumeWith
        .   fst
    ReceiveSelectedMessage f ->
      interpretReceive f >>= either diskontinue (kontinue nextRef)
    FlushMessages -> interpretFlush >>= kontinue nextRef
    SelfPid       -> kontinue nextRef (ResumeWith myPid)
    MakeReference -> kontinue (nextRef + 1) (ResumeWith nextRef)
    YieldProcess  -> kontinue nextRef (ResumeWith ())
    Shutdown r    -> diskontinue r
    GetProcessState toPid ->
      interpretGetProcessState toPid >>= kontinue nextRef . ResumeWith
    Monitor target ->
      interpretMonitor target >>= kontinue nextRef . ResumeWith
    Demonitor ref -> interpretDemonitor ref >>= kontinue nextRef . ResumeWith
    Link toPid ->
      interpretLink toPid >>= kontinue nextRef . either Interrupted ResumeWith
    Unlink toPid -> interpretUnlink toPid >>= kontinue nextRef . ResumeWith
   where
    interpretMonitor !target = do
      setMyProcessState ProcessBusyMonitoring
      schedulerState <- getSchedulerState
      lift (atomically (addMonitoring target myPid schedulerState))
    interpretDemonitor !ref = do
      setMyProcessState ProcessBusyMonitoring
      schedulerState <- getSchedulerState
      lift (atomically (removeMonitoring ref schedulerState))
    interpretUnlink !toPid = do
      setMyProcessState ProcessBusyUnlinking
      schedulerState <- getSchedulerState
      let procInfoVar = schedulerState ^. processTable
      lift $ atomically $ do
        procInfo <- readTVar procInfoVar
        traverse_
          (\toProcInfo ->
            modifyTVar' (toProcInfo ^. processLinks) (Set.delete myPid)
          )
          (procInfo ^. at toPid)
        modifyTVar' (myProcessInfo ^. processLinks) (Set.delete toPid)
    interpretGetProcessState !toPid = do
      setMyProcessState ProcessBusy
      schedulerState <- getSchedulerState
      let procInfoVar = schedulerState ^. processTable
      lift $ atomically $ do
        procInfoTable <- readTVar procInfoVar
        traverse (\toProcInfo -> readTVar (toProcInfo ^. processState))
                 (procInfoTable ^. at toPid)
    interpretLink !toPid = do
      setMyProcessState ProcessBusyLinking
      schedulerState <- getSchedulerState
      let procInfoVar = schedulerState ^. processTable
      lift $ atomically $ do
        procInfoTable <- readTVar procInfoVar
        case procInfoTable ^. at toPid of
          Just toProcInfo -> do
            modifyTVar' (toProcInfo ^. processLinks)    (Set.insert myPid)
            modifyTVar' (myProcessInfo ^. processLinks) (Set.insert toPid)
            return (Right ())
          Nothing -> return (Left (LinkedProcessCrashed toPid))
    interpretSend !toPid msg =
      setMyProcessState ProcessBusySending
        *>  getSchedulerState
        >>= lift
        .   atomically
        .   enqueueMessageOtherProcess toPid msg
    interpretSendShutdownOrInterrupt !toPid !msg =
      setMyProcessState
          (either (const ProcessBusySendingShutdown)
                  (const ProcessBusySendingInterrupt)
                  (fromSomeExitReason msg)
          )
        *>  getSchedulerState
        >>= lift
        .   atomically
        .   enqueueShutdownRequest toPid msg
    interpretFlush :: Eff SchedulerIO (ResumeProcess [Dynamic])
    interpretFlush = do
      setMyProcessState ProcessBusyReceiving
      lift $ atomically $ do
        myMessageQ <- readTVar myMessageQVar
        modifyTVar' myMessageQVar (incomingMessages .~ Seq.Empty)
        return (ResumeWith (toList (myMessageQ ^. incomingMessages)))
    interpretReceive
      :: MessageSelector b
      -> Eff SchedulerIO (Either (ExitReason 'NoRecovery) (ResumeProcess b))
    interpretReceive f = do
      setMyProcessState ProcessBusyReceiving
      lift $ atomically $ do
        myMessageQ <- readTVar myMessageQVar
        case myMessageQ ^. shutdownRequests of
          Nothing ->
            case
                partitionMessages (myMessageQ ^. incomingMessages) Seq.Empty
              of
                Nothing                               -> retry
                Just (selectedMessage, otherMessages) -> do
                  modifyTVar' myMessageQVar (incomingMessages .~ otherMessages)
                  return (Right (ResumeWith selectedMessage))
          Just shutdownRequest -> do
            modifyTVar' myMessageQVar (shutdownRequests .~ Nothing)
            case fromSomeExitReason shutdownRequest of
              Left  sr -> return (Left sr)
              Right ir -> return (Right (Interrupted ir))
     where
      partitionMessages Seq.Empty       _acc = Nothing
      partitionMessages (m :<| msgRest) acc  = maybe
        (partitionMessages msgRest (acc :|> m))
        (\res -> Just (res, acc Seq.>< msgRest))
        (runMessageSelector f m)

-- | This is the main entry point to running a message passing concurrency
-- application. This function takes a 'Process' on top of the 'SchedulerIO'
-- effect for concurrent logging.
schedule :: (HasCallStack) => Eff InterruptableProcEff () -> Eff LoggingAndIo ()
schedule procEff =
  liftBaseWith
      (\runS -> Async.withAsync
        (runS $ withNewSchedulerState $ do
          (_, mainProcAsync) <- spawnNewProcess Nothing $ do
            logNotice "++++++++ main process started ++++++++"
            provideInterruptsShutdown procEff
            logNotice "++++++++ main process returned ++++++++"
          lift (void (Async.wait mainProcAsync))
        )
        (\ast -> runS $ do
          a <- restoreM ast
          void $ lift (Async.wait a)
        )
      )
    >>= restoreM

spawnNewProcess
  :: (HasCallStack)
  => Maybe ProcessInfo
  -> Eff ProcEff ()
  -> Eff SchedulerIO (ProcessId, Async (ExitReason 'NoRecovery))
spawnNewProcess mLinkedParent mfa = do
  schedulerState <- getSchedulerState
  procInfo       <- allocateProcInfo schedulerState
  traverse_ (linkToParent procInfo) mLinkedParent
  procAsync <- doForkProc procInfo schedulerState
  return (procInfo ^. processId, procAsync)
 where
  linkToParent toProcInfo parent = do
    let toPid     = toProcInfo ^. processId
        parentPid = parent ^. processId
    lift $ atomically $ do
      modifyTVar' (toProcInfo ^. processLinks) (Set.insert parentPid)
      modifyTVar' (parent ^. processLinks)     (Set.insert toPid)
  allocateProcInfo schedulerState = lift
    (atomically
      (do
        let nextPidVar     = schedulerState ^. nextPid
            processInfoVar = schedulerState ^. processTable
        pid <- readTVar nextPidVar
        modifyTVar' nextPidVar (+ 1)
        procInfo <- newProcessInfo pid
        modifyTVar' processInfoVar (at pid ?~ procInfo)
        return procInfo
      )
    )
  logAppendProcInfo pid =
    let addProcessId = over
          lmProcessId
          (maybe (Just (T.pack (printf "% 9s" (show pid)))) Just)
    in  censorLogs @IO addProcessId
  triggerProcessLinksAndMonitors
    :: ProcessId -> ExitReason e -> TVar (Set ProcessId) -> Eff SchedulerIO ()
  triggerProcessLinksAndMonitors !pid !reason !linkSetVar = do
    schedulerState <- getSchedulerState
    lift $ atomically $ triggerAndRemoveMonitor pid
                                                (SomeExitReason reason)
                                                schedulerState
    let exitSeverity = toExitSeverity reason
        sendIt !linkedPid = do
          let msg = SomeExitReason (LinkedProcessCrashed pid)
          lift $ atomically $ do
            procInfoTable <- readTVar (schedulerState ^. processTable)
            let mLinkedProcInfo = procInfoTable ^? ix linkedPid
            case mLinkedProcInfo of
              Nothing -> return (Left linkedPid)
              Just linkedProcInfo ->
                let linkedMsgQVar    = linkedProcInfo ^. messageQ
                    linkedLinkSetVar = linkedProcInfo ^. processLinks
                in  do
                      linkedLinkSet <- readTVar linkedLinkSetVar
                      if Set.member pid linkedLinkSet
                        then do
                          writeTVar linkedLinkSetVar
                                    (Set.delete pid linkedLinkSet)
                          when
                            (exitSeverity == Crash)
                            (modifyTVar' linkedMsgQVar
                                         (shutdownRequests ?~ msg)
                            )
                          return (Right linkedPid)
                        else return (Left linkedPid)
    linkedPids <- lift
      (atomically
        (do
          linkSet <- readTVar linkSetVar
          writeTVar linkSetVar def
          return linkSet
        )
      )
    res <- traverse sendIt (toList linkedPids)
    traverse_
      (logDebug . either
        (("linked process no found: " <>) . T.pack . show)
        (("sent shutdown to linked process: " <>) . T.pack . show)
      )
      res
  doForkProc
    :: ProcessInfo
    -> SchedulerState
    -> Eff SchedulerIO (Async (ExitReason 'NoRecovery))
  doForkProc procInfo schedulerState = control
    (\inScheduler -> do
      let cancellationsVar = schedulerState ^. processCancellationTable
          processInfoVar   = schedulerState ^. processTable
          pid              = procInfo ^. processId
      procAsync <- Async.async
        (inScheduler
          (logAppendProcInfo
            pid
            (Safe.bracketWithError
              (logDebug "enter process")
              (\mExc () -> do
                lift
                  (atomically
                    (do
                      modifyTVar' processInfoVar   (at pid .~ Nothing)
                      modifyTVar' cancellationsVar (at pid .~ Nothing)
                    )
                  )
                traverse_
                  (\exc -> logExitAndTriggerLinksAndMonitors
                    (exitReasonFromException exc)
                    pid
                  )
                  mExc
              )
              (const
                (do
                  res <- handleProcess procInfo (mfa >> return ExitNormally)
                  logExitAndTriggerLinksAndMonitors res pid
                )
              )
            )
          )
        )
      atomically (modifyTVar' cancellationsVar (at pid ?~ procAsync))
      return procAsync
    )
   where
    exitReasonFromException exc = case Safe.fromException exc of
      Just Async.AsyncCancelled -> Killed
      Nothing -> UnexpectedException (prettyCallStack callStack)
                                     (Safe.displayException exc)
    logExitAndTriggerLinksAndMonitors reason pid = do
      triggerProcessLinksAndMonitors pid reason (procInfo ^. processLinks)
      logProcessExit reason
      return reason

-- * Scheduler Accessor
getSchedulerState :: HasSchedulerIO r => Eff r SchedulerState
getSchedulerState = ask

enqueueMessageOtherProcess
  :: HasCallStack => ProcessId -> Dynamic -> SchedulerState -> STM ()
enqueueMessageOtherProcess toPid msg schedulerState =
  view (at toPid) <$> readTVar (schedulerState ^. processTable) >>= maybe
    (return ())
    (\toProcessTable -> do
      modifyTVar' (toProcessTable ^. messageQ) (incomingMessages %~ (:|> msg))
      return ()
    )

enqueueShutdownRequest
  :: HasCallStack => ProcessId -> SomeExitReason -> SchedulerState -> STM ()
enqueueShutdownRequest toPid msg schedulerState =
  view (at toPid) <$> readTVar (schedulerState ^. processTable) >>= maybe
    (return ())
    (\toProcessTable -> do
      modifyTVar' (toProcessTable ^. messageQ) (shutdownRequests ?~ msg)
      return ()
    )