module Control.Eff.Concurrent.Process.SingleThreadedScheduler
( scheduleM
, schedulePure
, scheduleIO
, scheduleMonadIOEff
, scheduleIOWithLogging
, defaultMain
, singleThreadedIoScheduler
, LoggingAndIo
)
where
import Control.Concurrent ( yield )
import Control.DeepSeq
import Control.Eff
import Control.Eff.Lift
import Control.Eff.Extend
import Control.Eff.Concurrent.Process
import Control.Eff.Log
import Control.Lens hiding ( (|>)
, Empty
)
import Control.Monad ( void )
import Control.Monad.IO.Class
import qualified Data.Sequence as Seq
import Data.Sequence ( Seq(..) )
import qualified Data.Map.Strict as Map
import GHC.Stack
import Data.Kind ( )
import Data.Dynamic
import Data.Maybe
schedulePure :: Eff (ConsProcess '[]) a -> Either String a
schedulePure = runIdentity . scheduleM (Identity . run) (return ())
scheduleIO
:: MonadIO m
=> (forall b . Eff r b -> Eff '[Lift m] b)
-> Eff (ConsProcess r) a
-> m (Either String a)
scheduleIO runEff = scheduleM (runLift . runEff) (liftIO yield)
scheduleMonadIOEff
:: MonadIO (Eff r) => Eff (ConsProcess r) a -> Eff r (Either String a)
scheduleMonadIOEff =
scheduleM id (liftIO yield)
scheduleIOWithLogging
:: (NFData l, MonadIO m)
=> (l -> m ())
-> Eff (ConsProcess '[Logs l, Lift m]) a
-> m (Either String a)
scheduleIOWithLogging handleLog = scheduleIO (handleLogsWith handleLog)
scheduleM
:: Monad m
=> (forall b . Eff r b -> m b)
-> m ()
-> Eff (ConsProcess r) a
-> m (Either String a)
scheduleM runEff yieldEff e = do
y <- runAsCoroutinePure runEff e
handleProcess runEff
yieldEff
1
(Map.singleton 0 Seq.empty)
(Seq.singleton (y, 0))
data OnYield r a where
OnYield :: (ResumeProcess () -> Eff r (OnYield r a))
-> OnYield r a
OnSelf :: (ResumeProcess ProcessId -> Eff r (OnYield r a))
-> OnYield r a
OnSpawn :: Eff (Process r ': r) ()
-> (ResumeProcess ProcessId -> Eff r (OnYield r a))
-> OnYield r a
OnDone :: !a -> OnYield r a
OnShutdown :: OnYield r a
OnExitError :: !String -> OnYield r a
OnRaiseError :: !String -> OnYield r a
OnSend :: !ProcessId -> !Dynamic
-> (ResumeProcess Bool -> Eff r (OnYield r a))
-> OnYield r a
OnRecv :: (ResumeProcess Dynamic -> Eff r (OnYield r a))
-> OnYield r a
OnSendShutdown :: !ProcessId -> (ResumeProcess Bool -> Eff r (OnYield r a)) -> OnYield r a
handleProcess
:: Monad m
=> (forall a . Eff r a -> m a)
-> m ()
-> ProcessId
-> Map.Map ProcessId (Seq Dynamic)
-> Seq (OnYield r finalResult, ProcessId)
-> m (Either String finalResult)
handleProcess _runEff _yieldEff _newPid _msgQs Empty =
return $ Left "no main process"
handleProcess runEff yieldEff !newPid !msgQs allProcs@((!processState, !pid) :<| rest)
= let handleExit res = if pid == 0
then return res
else handleProcess runEff
yieldEff
newPid
(msgQs & at pid .~ Nothing)
rest
in
case processState of
OnDone r -> handleExit (Right r)
OnShutdown -> handleExit (Left "process exited normally")
OnRaiseError errM -> handleExit (Left errM)
OnExitError errM -> handleExit (Left errM)
OnSendShutdown targetPid k -> do
let allButTarget =
Seq.filter (\(_, e) -> e /= pid && e /= targetPid) allProcs
targets = Seq.filter (\(_, e) -> e == targetPid) allProcs
suicide = targetPid == pid
targetFound = suicide || not (Seq.null targets)
if suicide
then do
nextK <- runEff $ k ShutdownRequested
handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid))
else do
let deliverTheGoodNews (targetState, tPid) = do
nextTargetState <- case targetState of
OnSendShutdown _ tk -> tk ShutdownRequested
OnYield tk -> tk ShutdownRequested
OnSelf tk -> tk ShutdownRequested
OnSend _ _ tk -> tk ShutdownRequested
OnRecv tk -> tk ShutdownRequested
OnSpawn _ tk -> tk ShutdownRequested
OnDone x -> return (OnDone x)
OnShutdown -> return OnShutdown
OnExitError er -> return (OnExitError er)
OnRaiseError er -> return (OnExitError er)
return (nextTargetState, tPid)
nextTargets <- runEff $ traverse deliverTheGoodNews targets
nextK <- runEff $ k (ResumeWith targetFound)
handleProcess
runEff
yieldEff
newPid
msgQs
(allButTarget Seq.>< (nextTargets :|> (nextK, pid)))
OnSelf k -> do
nextK <- runEff $ k (ResumeWith pid)
handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid))
OnYield k -> do
yieldEff
nextK <- runEff $ k (ResumeWith ())
handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid))
OnSend toPid msg k -> do
nextK <- runEff $ k (ResumeWith (msgQs ^. at toPid . to isJust))
handleProcess runEff
yieldEff
newPid
(msgQs & at toPid . _Just %~ (:|> msg))
(rest :|> (nextK, pid))
recv@(OnRecv k) -> case msgQs ^. at pid of
Nothing -> do
nextK <- runEff $ k (OnError (show pid ++ " has no message queue!"))
handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid))
Just Empty -> if Seq.length rest == 0
then do
nextK <- runEff
$ k (OnError ("Process " ++ show pid ++ " deadlocked!"))
handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid))
else handleProcess runEff
yieldEff
newPid
msgQs
(rest :|> (recv, pid))
Just (nextMessage :<| restMessages) -> do
nextK <- runEff $ k (ResumeWith nextMessage)
handleProcess runEff
yieldEff
newPid
(msgQs & at pid . _Just .~ restMessages)
(rest :|> (nextK, pid))
OnSpawn f k -> do
nextK <- runEff $ k (ResumeWith newPid)
fk <- runAsCoroutinePure runEff (f >> exitNormally SP)
handleProcess runEff
yieldEff
(newPid + 1)
(msgQs & at newPid ?~ Seq.empty)
(rest :|> (nextK, pid) :|> (fk, newPid))
runAsCoroutinePure
:: forall v r m
. Monad m
=> (forall a . Eff r a -> m a)
-> Eff (ConsProcess r) v
-> m (OnYield r v)
runAsCoroutinePure runEff = runEff . handle_relay (return . OnDone) cont
where
cont :: Process r x -> (x -> Eff r (OnYield r v)) -> Eff r (OnYield r v)
cont YieldProcess k = return (OnYield k)
cont SelfPid k = return (OnSelf k)
cont (Spawn e) k = return (OnSpawn e k)
cont Shutdown _k = return OnShutdown
cont (ExitWithError !e ) _k = return (OnExitError e)
cont (RaiseError !e ) _k = return (OnRaiseError e)
cont (SendMessage !tp !msg) k = return (OnSend tp msg k)
cont ReceiveMessage k = return (OnRecv k)
cont (SendShutdown !pid) k = return (OnSendShutdown pid k)
type LoggingAndIo =
'[ Logs String
, Lift IO
]
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
singleThreadedIoScheduler = SchedulerProxy
defaultMain
:: HasCallStack
=> Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] ()
-> IO ()
defaultMain e = void $ runLift $ handleLogsWithLoggingTHandler
(scheduleMonadIOEff e)
($! putStrLn)