-- | A coroutine based, single threaded scheduler for 'Process'es. module Control.Eff.Concurrent.Process.SingleThreadedScheduler ( schedule , defaultMain , singleThreadedIoScheduler , LoggingAndIo) where import Control.Eff import Control.Eff.Lift import Control.Eff.Log import Control.Eff.Concurrent.Process import Control.Lens hiding ((|>), Empty) import qualified Data.Sequence as Seq import Data.Sequence (Seq (..)) import qualified Data.Map.Strict as Map import GHC.Stack import Data.Kind () import Data.Dynamic import Data.Maybe import Control.Monad -- | Execute a 'Process' in the current thread, all child processes spawned by -- 'spawn' will be executed concurrently using a co-routine based, round-robin -- scheduler. schedule :: forall r . Eff (Process r ': r) () -> Eff r () schedule mainProcessAction = do y <- runAsCoroutine mainProcessAction go 1 (Map.singleton 0 Seq.empty) (Seq.singleton (y, 0)) where go :: ProcessId -> Map.Map ProcessId (Seq Dynamic) -> Seq (OnYield r, ProcessId) -> Eff r () go _newPid _msgQs Empty = return () go newPid msgQs allProcs@((processState, pid) :<| rest) = let handleExit = if pid == 0 then return () else go newPid (msgQs & at pid .~ Nothing) rest in case processState of OnDone -> handleExit OnRaiseError _ -> handleExit OnExitError _ -> handleExit OnSendShutdown targetPid k -> do let allButTarget = Seq.filter (\(_,e) -> e /= pid && e /= targetPid) allProcs targets = Seq.filter (\(_,e) -> e == targetPid) allProcs suicide = targetPid == pid targetFound = suicide || not (Seq.null targets) if suicide then do nextK <- k ShutdownRequested go newPid msgQs (rest :|> (nextK, pid)) else do let deliverTheGoodNews (targetState, tPid) = do nextTargetState <- case targetState of OnSendShutdown _ tk -> tk ShutdownRequested OnSelf tk -> tk ShutdownRequested OnSend _ _ tk -> tk ShutdownRequested OnRecv tk -> tk ShutdownRequested OnSpawn _ tk -> tk ShutdownRequested OnDone -> return OnDone OnExitError er -> return (OnExitError er) OnRaiseError er -> return (OnExitError er) -- return (error ("TODO write test "++er)) return (nextTargetState, tPid) nextTargets <- traverse deliverTheGoodNews targets nextK <- k (ResumeWith targetFound) go newPid msgQs (allButTarget Seq.>< (nextTargets :|> (nextK, pid))) OnSelf k -> do nextK <- k (ResumeWith pid) go newPid msgQs (rest :|> (nextK, pid)) OnSend toPid msg k -> do nextK <- k (ResumeWith (msgQs ^. at toPid . to isJust)) go newPid (msgQs & at toPid . _Just %~ (:|> msg)) (rest :|> (nextK, pid)) recv@(OnRecv k) -> case msgQs ^. at pid of Nothing -> do nextK <- k (OnError (show pid ++ " has no message queue!")) go newPid msgQs (rest :|> (nextK, pid)) Just Empty -> if Seq.length rest == 0 then do nextK <- k (OnError ("Process " ++ show pid ++ " deadlocked!")) go newPid msgQs (rest :|> (nextK, pid)) else go newPid msgQs (rest :|> (recv, pid)) Just (nextMessage :<| restMessages) -> do nextK <- k (ResumeWith nextMessage) go newPid (msgQs & at pid . _Just .~ restMessages) (rest :|> (nextK, pid)) OnSpawn f k -> do nextK <- k (ResumeWith newPid) fk <- runAsCoroutine f go (newPid + 1) (msgQs & at newPid .~ Just Seq.empty) ( rest :|> (nextK, pid) :|> (fk, newPid)) data OnYield r where OnSelf :: (ResumeProcess ProcessId -> Eff r (OnYield r)) -> OnYield r OnSpawn :: Eff (Process r ': r) () -> (ResumeProcess ProcessId -> Eff r (OnYield r)) -> OnYield r OnDone :: OnYield r OnExitError :: String -> OnYield r OnRaiseError :: String -> OnYield r OnSend :: ProcessId -> Dynamic -> (ResumeProcess Bool -> Eff r (OnYield r)) -> OnYield r OnRecv :: (ResumeProcess Dynamic -> Eff r (OnYield r)) -> OnYield r OnSendShutdown :: ProcessId -> (ResumeProcess Bool -> Eff r (OnYield r)) -> OnYield r runAsCoroutine :: forall r v . Eff (Process r ': r) v -> Eff r (OnYield r) runAsCoroutine m = handle_relay (const $ return OnDone) cont m where cont :: Process r x -> (x -> Eff r (OnYield r)) -> Eff r (OnYield r) cont SelfPid k = return (OnSelf k) cont (Spawn e) k = return (OnSpawn e k) cont Shutdown _k = return OnDone cont (ExitWithError e) _k = return (OnExitError e) cont (RaiseError e) _k = return (OnRaiseError e) cont (SendMessage tp msg) k = return (OnSend tp msg k) cont ReceiveMessage k = return (OnRecv k) cont (SendShutdown pid) k = return (OnSendShutdown pid k) -- | The concrete list of 'Eff'ects for running this pure scheduler on @IO@ and -- with string logging. type LoggingAndIo = '[ Logs String , Lift IO ] -- | A 'SchedulerProxy' for 'LoggingAndIo'. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo singleThreadedIoScheduler = SchedulerProxy -- | Execute a 'Process' using 'schedule' on top of 'Lift' @IO@ and 'Logs' -- @String@ effects. defaultMain :: HasCallStack => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] () -> IO () defaultMain go = runLift $ handleLogsWith (schedule go) ($ putStrLn) _example :: IO () _example = defaultMain go where px :: SchedulerProxy '[Logs String, Lift IO] px = SchedulerProxy go :: Eff (ConsProcess '[Logs String, Lift IO]) () go = do p1 <- spawn $ do logMsg "Hello World!" me <- self px logMsg ("I am: " ++ show me) d <- receiveMessage px let m = fromDyn @String d "" logMsg ("Got: " ++ m) void $ sendMessage px p1 (toDyn "huhu") return ()