module Control.Eff.Concurrent.Process.SingleThreadedScheduler
( schedule
, schedulePure
, scheduleIO
, scheduleWithLogging
, defaultMain
, singleThreadedIoScheduler
, LoggingAndIo
)
where
import Control.Concurrent ( yield )
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Lift
import Control.Eff.Log
import Control.Lens hiding ( (|>)
, Empty
)
import Control.Monad ( void )
import qualified Data.Sequence as Seq
import Data.Sequence ( Seq(..) )
import qualified Data.Map.Strict as Map
import GHC.Stack
import Data.Kind ( )
import Data.Dynamic
import Data.Maybe
-- | Run a main process with 'schedule' inside an effect stack that contains
-- @'Lift' 'IO'@.
--
-- The yield effect handed to 'schedule' is 'yield' from "Control.Concurrent",
-- lifted into the effect stack.
scheduleIO
  :: SetMember Lift (Lift IO) r
  => Eff (ConsProcess r) a
  -> Eff r (Either String a)
scheduleIO mainProcessAction = schedule (lift yield) mainProcessAction
-- | Run a main process with 'schedule' on top of an empty effect list and
-- extract the outcome with 'run'.
--
-- The yield effect handed to 'schedule' does nothing.
schedulePure :: Eff (ConsProcess '[]) a -> Either String a
schedulePure mainProcessAction = run (schedule (return ()) mainProcessAction)
-- | Run a main process with 'schedule' on top of a 'Logs' effect only.
--
-- The log effect is discharged with 'captureLogs', so the outcome of the main
-- process is returned together with a 'Seq' of log messages. The yield effect
-- handed to 'schedule' does nothing.
scheduleWithLogging :: Eff (ConsProcess '[Logs m]) a -> (Either String a, Seq m)
scheduleWithLogging mainProcessAction =
  run (captureLogs (schedule (return ()) mainProcessAction))
-- | Run a main process, and every process it spawns, on the calling thread
-- using a round-robin run queue.
--
-- The first argument is the /yield effect/: it is executed every time any
-- process yields explicitly, and additionally after most scheduling steps of
-- the main process (the process with 'ProcessId' @0@).
--
-- The second argument is the main process. Scheduling stops as soon as the
-- main process terminates, no matter which other processes are still queued.
-- The result is 'Right' with the value of the main process if it ran to
-- completion, or 'Left' with a description of why it ended otherwise.
--
-- A process waiting in a receive while every other queued process is also
-- waiting in a receive on an empty mailbox can never be woken up again. In
-- that situation the waiting process is resumed with a deadlock error instead
-- of being re-queued.
schedule
  :: forall r finalResult
   . Eff r ()
  -> Eff (Process r ': r) finalResult
  -> Eff r (Either String finalResult)
schedule yieldEff mainProcessAction = do
  y <- runAsCoroutine mainProcessAction
  -- The main process gets pid 0 and an empty mailbox; 1 is the next pid that
  -- a spawn hands out.
  go 1 (Map.singleton 0 Seq.empty) (Seq.singleton (y, 0))
 where
  -- Scheduler loop. Arguments: the pid for the next spawned process, the
  -- mailbox of every live process, and the run queue of suspended processes
  -- paired with their pids. The process at the front of the run queue is
  -- served and, unless it terminated, appended to the back.
  go
    :: ProcessId
    -> Map.Map ProcessId (Seq Dynamic)
    -> Seq (OnYield r finalResult, ProcessId)
    -> Eff r (Either String finalResult)
  go _newPid _msgQs Empty = return (Left "no main process")
  go newPid msgQs allProcs@((processState, pid) :<| rest)
    = let
        -- The main process ending ends the scheduler; any other process is
        -- dropped from the run queue together with its mailbox.
        handleExit res = if pid == 0
          then return res
          else go newPid (msgQs & at pid .~ Nothing) rest
        -- Only scheduling steps of the main process trigger the yield effect.
        maybeYield = if pid == 0 then yieldEff else return ()
        -- True for a queued process that waits in a receive on an existing
        -- but empty mailbox, i.e. one that cannot progress on its own.
        isStuckInReceive (OnRecv _, otherPid) =
          maybe False Seq.null (msgQs ^. at otherPid)
        isStuckInReceive _ = False
      in
        case processState of
          OnDone r          -> handleExit (Right r)
          OnShutdown        -> handleExit (Left "process exited normally")
          OnRaiseError errM -> handleExit (Left errM)
          OnExitError  errM -> handleExit (Left errM)
          OnSendShutdown targetPid k -> do
            let allButTarget =
                  Seq.filter (\(_, e) -> e /= pid && e /= targetPid) allProcs
                targets     = Seq.filter (\(_, e) -> e == targetPid) allProcs
                suicide     = targetPid == pid
                targetFound = suicide || not (Seq.null targets)
            if suicide
              then do
                nextK <- k ShutdownRequested
                go newPid msgQs (rest :|> (nextK, pid))
              else do
                -- Resume a suspended target with the shutdown request;
                -- a target that already terminated is left as it is.
                let deliverTheGoodNews (targetState, tPid) = do
                      nextTargetState <- case targetState of
                        OnSendShutdown _ tk -> tk ShutdownRequested
                        OnYield tk          -> tk ShutdownRequested
                        OnSelf  tk          -> tk ShutdownRequested
                        OnSend _ _ tk       -> tk ShutdownRequested
                        OnRecv  tk          -> tk ShutdownRequested
                        OnSpawn _ tk        -> tk ShutdownRequested
                        OnDone x            -> return (OnDone x)
                        OnShutdown          -> return OnShutdown
                        OnExitError  er     -> return (OnExitError er)
                        -- A pending raised error is passed on as an exit
                        -- error; 'handleExit' reports both the same way.
                        OnRaiseError er     -> return (OnExitError er)
                      return (nextTargetState, tPid)
                nextTargets <- traverse deliverTheGoodNews targets
                nextK       <- k (ResumeWith targetFound)
                maybeYield
                go newPid
                   msgQs
                   (allButTarget Seq.>< (nextTargets :|> (nextK, pid)))
          OnSelf k -> do
            nextK <- k (ResumeWith pid)
            maybeYield
            go newPid msgQs (rest :|> (nextK, pid))
          OnYield k -> do
            yieldEff
            nextK <- k (ResumeWith ())
            go newPid msgQs (rest :|> (nextK, pid))
          OnSend toPid msg k -> do
            -- The sender learns whether the receiver has a mailbox; the
            -- message is appended only if that mailbox exists.
            nextK <- k (ResumeWith (msgQs ^. at toPid . to isJust))
            maybeYield
            go newPid
               (msgQs & at toPid . _Just %~ (:|> msg))
               (rest :|> (nextK, pid))
          recv@(OnRecv k) -> case msgQs ^. at pid of
            Nothing -> do
              nextK <- k (OnError (show pid ++ " has no message queue!"))
              maybeYield
              go newPid msgQs (rest :|> (nextK, pid))
            -- Messages are only ever enqueued by a running process. Once
            -- every other queued process (if any) is stuck in a receive as
            -- well, nobody can send anymore, so report the deadlock instead
            -- of rotating the run queue forever.
            Just Empty -> if all isStuckInReceive rest
              then do
                nextK <- k (OnError ("Process " ++ show pid ++ " deadlocked!"))
                maybeYield
                go newPid msgQs (rest :|> (nextK, pid))
              -- Some other process can still run: retry the receive later.
              else go newPid msgQs (rest :|> (recv, pid))
            Just (nextMessage :<| restMessages) -> do
              nextK <- k (ResumeWith nextMessage)
              maybeYield
              go newPid
                 (msgQs & at pid . _Just .~ restMessages)
                 (rest :|> (nextK, pid))
          OnSpawn f k -> do
            nextK <- k (ResumeWith newPid)
            -- The child exits normally once its action has finished.
            fk    <- runAsCoroutine (f >> exitNormally SP)
            maybeYield
            go (newPid + 1)
               (msgQs & at newPid .~ Just Seq.empty)
               (rest :|> (nextK, pid) :|> (fk, newPid))
-- | The state of a process after it was run by 'runAsCoroutine': either it
-- has terminated, or it is suspended at a 'Process' request and carries the
-- continuation that resumes it with the answer to that request.
--
-- @r@ is the effect list below the 'Process' effect, @a@ the result type of
-- the process.
data OnYield r a where
  -- | The process ran to completion with the given result.
  OnDone :: a -> OnYield r a
  -- | The process requested its own shutdown.
  OnShutdown :: OnYield r a
  -- | The process exited with the given error message.
  OnExitError :: String -> OnYield r a
  -- | The process raised the given error message.
  OnRaiseError :: String -> OnYield r a
  -- | The process yielded.
  OnYield
    :: (ResumeProcess () -> Eff r (OnYield r a))
    -> OnYield r a
  -- | The process asked for its own 'ProcessId'.
  OnSelf
    :: (ResumeProcess ProcessId -> Eff r (OnYield r a))
    -> OnYield r a
  -- | The process wants to spawn the given action as a new process; the
  -- continuation expects the 'ProcessId' of the new process.
  OnSpawn
    :: Eff (Process r ': r) ()
    -> (ResumeProcess ProcessId -> Eff r (OnYield r a))
    -> OnYield r a
  -- | The process wants to send a message to the given process; the
  -- continuation expects a 'Bool' answer.
  OnSend
    :: ProcessId
    -> Dynamic
    -> (ResumeProcess Bool -> Eff r (OnYield r a))
    -> OnYield r a
  -- | The process wants to receive a message.
  OnRecv
    :: (ResumeProcess Dynamic -> Eff r (OnYield r a))
    -> OnYield r a
  -- | The process wants to shut down the given process; the continuation
  -- expects a 'Bool' answer.
  OnSendShutdown
    :: ProcessId
    -> (ResumeProcess Bool -> Eff r (OnYield r a))
    -> OnYield r a
-- | Interpret the 'Process' effect of an action with 'handle_relay', turning
-- the action into an 'OnYield' value.
--
-- A plain result becomes 'OnDone'. Each 'Process' request becomes the
-- matching 'OnYield' constructor, which stores the continuation where the
-- request expects an answer and drops it where the request ends the process.
runAsCoroutine :: forall r v . Eff (Process r ': r) v -> Eff r (OnYield r v)
runAsCoroutine = handle_relay finished suspend
 where
  finished :: v -> Eff r (OnYield r v)
  finished result = return (OnDone result)

  suspend :: Process r x -> (x -> Eff r (OnYield r v)) -> Eff r (OnYield r v)
  -- Requests that end the process: the continuation is never used.
  suspend Shutdown              _resume = return OnShutdown
  suspend (ExitWithError reason) _resume = return (OnExitError reason)
  suspend (RaiseError    reason) _resume = return (OnRaiseError reason)
  -- Requests that expect an answer: keep the continuation for the scheduler.
  suspend YieldProcess              resume = return (OnYield resume)
  suspend SelfPid                   resume = return (OnSelf resume)
  suspend (Spawn child)             resume = return (OnSpawn child resume)
  suspend (SendMessage target msg)  resume = return (OnSend target msg resume)
  suspend ReceiveMessage            resume = return (OnRecv resume)
  suspend (SendShutdown target)     resume = return (OnSendShutdown target resume)
-- | The effect list below the 'Process' effect that
-- 'singleThreadedIoScheduler' stands for: 'String' logging on top of 'IO'.
type LoggingAndIo = '[Logs String, Lift IO]
-- | A 'SchedulerProxy' value fixed to the 'LoggingAndIo' effect list.
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
singleThreadedIoScheduler = SchedulerProxy
-- | Run a main process in 'IO' and discard its outcome.
--
-- The process is scheduled with 'scheduleIO', the 'Logs' effect is handled by
-- 'handleLogsWith' using 'putStrLn', and the remaining @'Lift' 'IO'@ layer is
-- executed with 'runLift'.
defaultMain
  :: HasCallStack
  => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] ()
  -> IO ()
defaultMain mainProcess =
  void (runLift (handleLogsWith (scheduleIO mainProcess) ($! putStrLn)))