module Control.Eff.Concurrent.Process.SingleThreadedScheduler
( schedule
, scheduleWithYield
, defaultMain
, singleThreadedIoScheduler
, LoggingAndIo
)
where
import Control.Concurrent ( yield )
import Control.Eff
import Control.Eff.Lift
import Control.Eff.Log
import Control.Eff.Concurrent.Process
import Control.Lens hiding ( (|>)
, Empty
)
import qualified Data.Sequence as Seq
import Data.Sequence ( Seq(..) )
import qualified Data.Map.Strict as Map
import GHC.Stack
import Data.Kind ( )
import Data.Dynamic
import Data.Maybe
schedule :: forall r . Eff (Process r ': r) () -> Eff r ()
schedule = scheduleWithYield (return ())
scheduleWithYield :: forall r . Eff r () -> Eff (Process r ': r) () -> Eff r ()
scheduleWithYield yieldEff mainProcessAction = do
y <- runAsCoroutine mainProcessAction
go 1 (Map.singleton 0 Seq.empty) (Seq.singleton (y, 0))
where
go
:: ProcessId
-> Map.Map ProcessId (Seq Dynamic)
-> Seq (OnYield r, ProcessId)
-> Eff r ()
go _newPid _msgQs Empty = return ()
go newPid msgQs allProcs@((processState, pid) :<| rest)
= let
handleExit = if pid == 0
then return ()
else go newPid (msgQs & at pid .~ Nothing) rest
maybeYield = if pid == 0 then yieldEff else return ()
in
case processState of
OnDone -> handleExit
OnRaiseError _ -> handleExit
OnExitError _ -> handleExit
OnSendShutdown targetPid k -> do
let allButTarget =
Seq.filter (\(_, e) -> e /= pid && e /= targetPid) allProcs
targets = Seq.filter (\(_, e) -> e == targetPid) allProcs
suicide = targetPid == pid
targetFound = suicide || not (Seq.null targets)
if suicide
then do
nextK <- k ShutdownRequested
go newPid msgQs (rest :|> (nextK, pid))
else do
let deliverTheGoodNews (targetState, tPid) = do
nextTargetState <- case targetState of
OnSendShutdown _ tk -> tk ShutdownRequested
OnYield tk -> tk ShutdownRequested
OnSelf tk -> tk ShutdownRequested
OnSend _ _ tk -> tk ShutdownRequested
OnRecv tk -> tk ShutdownRequested
OnSpawn _ tk -> tk ShutdownRequested
OnDone -> return OnDone
OnExitError er -> return (OnExitError er)
OnRaiseError er -> return (OnExitError er)
return (nextTargetState, tPid)
nextTargets <- traverse deliverTheGoodNews targets
nextK <- k (ResumeWith targetFound)
maybeYield
go newPid
msgQs
(allButTarget Seq.>< (nextTargets :|> (nextK, pid)))
OnSelf k -> do
nextK <- k (ResumeWith pid)
maybeYield
go newPid msgQs (rest :|> (nextK, pid))
OnYield k -> do
yieldEff
nextK <- k (ResumeWith ())
go newPid msgQs (rest :|> (nextK, pid))
OnSend toPid msg k -> do
nextK <- k (ResumeWith (msgQs ^. at toPid . to isJust))
maybeYield
go newPid
(msgQs & at toPid . _Just %~ (:|> msg))
(rest :|> (nextK, pid))
recv@(OnRecv k) -> case msgQs ^. at pid of
Nothing -> do
nextK <- k (OnError (show pid ++ " has no message queue!"))
maybeYield
go newPid msgQs (rest :|> (nextK, pid))
Just Empty -> if Seq.length rest == 0
then do
nextK <- k (OnError ("Process " ++ show pid ++ " deadlocked!"))
maybeYield
go newPid msgQs (rest :|> (nextK, pid))
else go newPid msgQs (rest :|> (recv, pid))
Just (nextMessage :<| restMessages) -> do
nextK <- k (ResumeWith nextMessage)
maybeYield
go newPid
(msgQs & at pid . _Just .~ restMessages)
(rest :|> (nextK, pid))
OnSpawn f k -> do
nextK <- k (ResumeWith newPid)
fk <- runAsCoroutine f
maybeYield
go (newPid + 1)
(msgQs & at newPid .~ Just Seq.empty)
(rest :|> (nextK, pid) :|> (fk, newPid))
data OnYield r where
OnYield :: (ResumeProcess () -> Eff r (OnYield r))
-> OnYield r
OnSelf :: (ResumeProcess ProcessId -> Eff r (OnYield r))
-> OnYield r
OnSpawn :: Eff (Process r ': r) ()
-> (ResumeProcess ProcessId -> Eff r (OnYield r))
-> OnYield r
OnDone :: OnYield r
OnExitError :: String -> OnYield r
OnRaiseError :: String -> OnYield r
OnSend :: ProcessId -> Dynamic
-> (ResumeProcess Bool -> Eff r (OnYield r))
-> OnYield r
OnRecv :: (ResumeProcess Dynamic -> Eff r (OnYield r))
-> OnYield r
OnSendShutdown :: ProcessId -> (ResumeProcess Bool -> Eff r (OnYield r)) -> OnYield r
runAsCoroutine :: forall r v . Eff (Process r ': r) v -> Eff r (OnYield r)
runAsCoroutine m = handle_relay (const $ return OnDone) cont m
where
cont :: Process r x -> (x -> Eff r (OnYield r)) -> Eff r (OnYield r)
cont YieldProcess k = return (OnYield k)
cont SelfPid k = return (OnSelf k)
cont (Spawn e) k = return (OnSpawn e k)
cont Shutdown _k = return OnDone
cont (ExitWithError e ) _k = return (OnExitError e)
cont (RaiseError e ) _k = return (OnRaiseError e)
cont (SendMessage tp msg) k = return (OnSend tp msg k)
cont ReceiveMessage k = return (OnRecv k)
cont (SendShutdown pid) k = return (OnSendShutdown pid k)
type LoggingAndIo =
'[ Logs String
, Lift IO
]
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
singleThreadedIoScheduler = SchedulerProxy
defaultMain
:: HasCallStack
=> Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] ()
-> IO ()
defaultMain go =
runLift $ handleLogsWith (scheduleWithYield (lift yield) go) ($ putStrLn)