-- | A coroutine based, single threaded scheduler for 'Process'es. module Control.Eff.Concurrent.Process.SingleThreadedScheduler ( schedule , scheduleWithYield , defaultMain , singleThreadedIoScheduler , LoggingAndIo ) where import Control.Concurrent ( yield ) import Control.Eff import Control.Eff.Lift import Control.Eff.Log import Control.Eff.Concurrent.Process import Control.Lens hiding ( (|>) , Empty ) import qualified Data.Sequence as Seq import Data.Sequence ( Seq(..) ) import qualified Data.Map.Strict as Map import GHC.Stack import Data.Kind ( ) import Data.Dynamic import Data.Maybe -- | Execute a 'Process' in the current thread, all child processes spawned by -- 'spawn' will be executed concurrently using a co-routine based, round-robin -- scheduler. schedule :: forall r . Eff (Process r ': r) () -> Eff r () schedule = scheduleWithYield (return ()) -- | Execute a 'Process' in the current thread, all child processes spawned by -- 'spawn' will be executed concurrently using a co-routine based, round-robin -- scheduler. scheduleWithYield :: forall r . Eff r () -> Eff (Process r ': r) () -> Eff r () scheduleWithYield yieldEff mainProcessAction = do y <- runAsCoroutine mainProcessAction go 1 (Map.singleton 0 Seq.empty) (Seq.singleton (y, 0)) where go :: ProcessId -> Map.Map ProcessId (Seq Dynamic) -> Seq (OnYield r, ProcessId) -> Eff r () go _newPid _msgQs Empty = return () go newPid msgQs allProcs@((processState, pid) :<| rest) = let handleExit = if pid == 0 then return () else go newPid (msgQs & at pid .~ Nothing) rest maybeYield = if pid == 0 then yieldEff else return () in case processState of OnDone -> handleExit OnRaiseError _ -> handleExit OnExitError _ -> handleExit OnSendShutdown targetPid k -> do let allButTarget = Seq.filter (\(_, e) -> e /= pid && e /= targetPid) allProcs targets = Seq.filter (\(_, e) -> e == targetPid) allProcs suicide = targetPid == pid targetFound = suicide || not (Seq.null targets) if suicide then do nextK <- k ShutdownRequested go newPid msgQs (rest :|> (nextK, pid)) else do let deliverTheGoodNews (targetState, tPid) = do nextTargetState <- case targetState of OnSendShutdown _ tk -> tk ShutdownRequested OnYield tk -> tk ShutdownRequested OnSelf tk -> tk ShutdownRequested OnSend _ _ tk -> tk ShutdownRequested OnRecv tk -> tk ShutdownRequested OnSpawn _ tk -> tk ShutdownRequested OnDone -> return OnDone OnExitError er -> return (OnExitError er) OnRaiseError er -> return (OnExitError er) -- return (error ("TODO write test "++er)) return (nextTargetState, tPid) nextTargets <- traverse deliverTheGoodNews targets nextK <- k (ResumeWith targetFound) maybeYield go newPid msgQs (allButTarget Seq.>< (nextTargets :|> (nextK, pid))) OnSelf k -> do nextK <- k (ResumeWith pid) maybeYield go newPid msgQs (rest :|> (nextK, pid)) OnYield k -> do yieldEff nextK <- k (ResumeWith ()) go newPid msgQs (rest :|> (nextK, pid)) OnSend toPid msg k -> do nextK <- k (ResumeWith (msgQs ^. at toPid . to isJust)) maybeYield go newPid (msgQs & at toPid . _Just %~ (:|> msg)) (rest :|> (nextK, pid)) recv@(OnRecv k) -> case msgQs ^. at pid of Nothing -> do nextK <- k (OnError (show pid ++ " has no message queue!")) maybeYield go newPid msgQs (rest :|> (nextK, pid)) Just Empty -> if Seq.length rest == 0 then do nextK <- k (OnError ("Process " ++ show pid ++ " deadlocked!")) maybeYield go newPid msgQs (rest :|> (nextK, pid)) else go newPid msgQs (rest :|> (recv, pid)) Just (nextMessage :<| restMessages) -> do nextK <- k (ResumeWith nextMessage) maybeYield go newPid (msgQs & at pid . _Just .~ restMessages) (rest :|> (nextK, pid)) OnSpawn f k -> do nextK <- k (ResumeWith newPid) fk <- runAsCoroutine f maybeYield go (newPid + 1) (msgQs & at newPid .~ Just Seq.empty) (rest :|> (nextK, pid) :|> (fk, newPid)) data OnYield r where OnYield :: (ResumeProcess () -> Eff r (OnYield r)) -> OnYield r OnSelf :: (ResumeProcess ProcessId -> Eff r (OnYield r)) -> OnYield r OnSpawn :: Eff (Process r ': r) () -> (ResumeProcess ProcessId -> Eff r (OnYield r)) -> OnYield r OnDone :: OnYield r OnExitError :: String -> OnYield r OnRaiseError :: String -> OnYield r OnSend :: ProcessId -> Dynamic -> (ResumeProcess Bool -> Eff r (OnYield r)) -> OnYield r OnRecv :: (ResumeProcess Dynamic -> Eff r (OnYield r)) -> OnYield r OnSendShutdown :: ProcessId -> (ResumeProcess Bool -> Eff r (OnYield r)) -> OnYield r runAsCoroutine :: forall r v . Eff (Process r ': r) v -> Eff r (OnYield r) runAsCoroutine m = handle_relay (const $ return OnDone) cont m where cont :: Process r x -> (x -> Eff r (OnYield r)) -> Eff r (OnYield r) cont YieldProcess k = return (OnYield k) cont SelfPid k = return (OnSelf k) cont (Spawn e) k = return (OnSpawn e k) cont Shutdown _k = return OnDone cont (ExitWithError e ) _k = return (OnExitError e) cont (RaiseError e ) _k = return (OnRaiseError e) cont (SendMessage tp msg) k = return (OnSend tp msg k) cont ReceiveMessage k = return (OnRecv k) cont (SendShutdown pid) k = return (OnSendShutdown pid k) -- | The concrete list of 'Eff'ects for running this pure scheduler on @IO@ and -- with string logging. type LoggingAndIo = '[ Logs String , Lift IO ] -- | A 'SchedulerProxy' for 'LoggingAndIo'. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo singleThreadedIoScheduler = SchedulerProxy -- | Execute a 'Process' using 'schedule' on top of 'Lift' @IO@ and 'Logs' -- @String@ effects. defaultMain :: HasCallStack => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] () -> IO () defaultMain go = runLift $ handleLogsWith (scheduleWithYield (lift yield) go) ($ putStrLn)