{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE GADTs #-}
module Control.Eff.Concurrent.Process.ForkIOScheduler
( schedule
, defaultMain
, SchedulerError(..)
, SchedulerIO
, forkIoScheduler
, HasSchedulerIO
)
where
import Data.Foldable
import GHC.Stack
import Data.Bifunctor
import Data.Maybe
import Data.Kind ()
import qualified Control.Exception as Exc
import Control.Concurrent as Concurrent
import Control.Concurrent.STM as STM
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Lift
import Control.Eff.Log
import Control.Eff.Reader.Strict as Reader
import Control.Lens
import Control.Monad ( when
, void
, join
)
import qualified Control.Monad.State as Mtl
import Data.Dynamic
import Data.Map ( Map )
import qualified Data.Map as Map
data ProcessInfo =
ProcessInfo { _processId :: ProcessId
, _messageQ :: STM.TQueue (Maybe Dynamic)
, _shutdownRequested :: STM.TVar Bool
}
makeLenses ''ProcessInfo
instance Show ProcessInfo where
show p = "ProcessInfo: " ++ show (p ^. processId)
data Scheduler =
Scheduler { _nextPid :: ProcessId
, _processTable :: Map ProcessId ProcessInfo
, _threadIdTable :: Map ProcessId ThreadId
, _schedulerShuttingDown :: Bool
, _logChannel :: LogChannel String
}
makeLenses ''Scheduler
newtype SchedulerVar = SchedulerVar { fromSchedulerVar :: STM.TVar Scheduler }
deriving Typeable
data SchedulerError =
ProcessNotFound ProcessId
| ProcessRaisedError String
| ProcessExitError String
| ProcessShuttingDown
| SchedulerShuttingDown
deriving (Typeable, Show)
instance Exc.Exception SchedulerError
type HasSchedulerIO r = ( HasCallStack
, SetMember Lift (Lift IO) r
, Member (Logs String) r
, Member (Reader SchedulerVar) r)
type SchedulerIO =
'[ Reader SchedulerVar
, Logs String
, Lift IO
]
forkIoScheduler :: SchedulerProxy SchedulerIO
forkIoScheduler = SchedulerProxy
schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel String -> IO ()
schedule e logC =
void
$ withNewSchedulerState
$ \schedulerStateVar ->
do pidVar <- newEmptyTMVarIO
scheduleProcessWithShutdownAction schedulerStateVar pidVar
$ do mt <- lift myThreadId
mp <- lift (atomically (readTMVar pidVar))
logMsg (show mp ++ " main process started in thread " ++ show mt)
e
logMsg (show mp ++ " main process returned")
where
withNewSchedulerState :: (SchedulerVar -> IO a) -> IO a
withNewSchedulerState mainProcessAction = do
myTId <- myThreadId
Exc.bracket
(newTVarIO (Scheduler myPid Map.empty Map.empty False logC))
(tearDownScheduler myTId)
(mainProcessAction . SchedulerVar)
where
myPid = 1
tearDownScheduler myTId v = do
logChannelPutIO logC (show myTId ++" begin scheduler tear down")
sch <-
(atomically
(do
sch <- readTVar v
let sch' = sch & schedulerShuttingDown .~ True
writeTVar v sch'
return sch'
)
)
logChannelPutIO logC (show myTId ++ " killing " ++
let ts = (sch ^.. threadIdTable. traversed)
in if length ts > 100
then show (length ts) ++ " threads"
else show ts )
imapM_ (killProcThread myTId) (sch ^. threadIdTable)
Concurrent.yield
atomically
(do
scheduler <- readTVar v
let allThreadsDead = scheduler^.threadIdTable.to Map.null
&& scheduler^.processTable.to Map.null
STM.check allThreadsDead)
logChannelPutIO logC "all threads dead"
killProcThread myTId _pid tid =
when (myTId /= tid) ( killThread tid)
defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO ()
defaultMain c =
runLoggingT
(logChannelBracket
(Just "~~~~~~ main process started")
(Just "====== main process exited")
(schedule c))
(print :: String -> IO ())
scheduleProcessWithShutdownAction
:: SchedulerVar
-> STM.TMVar ProcessId
-> Eff (ConsProcess SchedulerIO) ()
-> IO (Either Exc.SomeException ())
scheduleProcessWithShutdownAction schedulerVar pidVar procAction =
do cleanupVar <- newEmptyTMVarIO
logC <- getLogChannelIO schedulerVar
mTid <- myThreadId
eeres <- Exc.try (runProcEffects cleanupVar logC)
let eres = join eeres
getAndExecCleanup cleanupVar eres logC mTid
logChannelPutIO logC (show mTid ++ " <~< process cleanup finished")
return eres
where
runProcEffects cleanupVar l =
Data.Bifunctor.first Exc.SomeException
<$> runLift
(forwardLogsToChannel l
(runReader
(scheduleProcessWithCleanup
shutdownAction
saveCleanupAndSchedule)
schedulerVar))
where
shutdownAction =
ShutdownAction
(\eres ->
do let ex = either id (const ProcessShuttingDown) eres
Exc.throw ex)
saveCleanupAndSchedule cleanUpAction pid =
do lift (atomically (do STM.putTMVar cleanupVar cleanUpAction
STM.putTMVar pidVar pid))
mTid <- lift myThreadId
logMsg (show mTid ++ " >~> begin process " ++ show pid)
procAction
getAndExecCleanup cleanupVar eres lc mt =
do mcleanup <- atomically (STM.tryTakeTMVar cleanupVar)
traverse_ execCleanup mcleanup
where
execCleanup ca =
do runCleanUpAction ca
logChannelPutIO lc
$ show mt ++
case eres of
Left se ->
case Exc.fromException se of
Nothing ->
" process caught exception: "
++ Exc.displayException se
Just schedulerErr ->
(case schedulerErr of
ProcessShuttingDown ->
" process shutdown"
ProcessExitError m ->
" process exited with error: " ++ show m
ProcessRaisedError m ->
" process raised error: " ++ show m
_ ->
" scheduler error: " ++ show schedulerErr)
++ " - full exception message: "
++ Exc.displayException se
Right _ ->
" process function returned"
getLogChannel :: HasSchedulerIO r => Eff r (LogChannel String)
getLogChannel = do
s <- getSchedulerVar
lift (getLogChannelIO s)
getLogChannelIO :: SchedulerVar -> IO (LogChannel String)
getLogChannelIO s =
(view logChannel) <$> atomically (readTVar (fromSchedulerVar s))
overProcessInfo
:: HasSchedulerIO r
=> ProcessId
-> Mtl.StateT ProcessInfo STM.STM a
-> Eff r (Either SchedulerError a)
overProcessInfo pid stAction =
overScheduler
(do
res <- use (processTable . at pid)
case res of
Nothing -> return (Left (ProcessNotFound pid))
Just pinfo -> do
(x, pinfoOut) <- Mtl.lift (Mtl.runStateT stAction pinfo)
processTable . at pid . _Just .= pinfoOut
return (Right x))
spawnImpl :: HasCallStack
=> Eff (ConsProcess SchedulerIO) ()
-> Eff SchedulerIO ProcessId
spawnImpl mfa = do
schedulerVar <- ask
pidVar <- lift STM.newEmptyTMVarIO
void
$ lift
$ Concurrent.forkIO
$ void
$ scheduleProcessWithShutdownAction
schedulerVar
pidVar
mfa
lift Concurrent.yield
lift (atomically (STM.readTMVar pidVar))
newtype CleanUpAction = CleanUpAction { runCleanUpAction :: IO () }
scheduleProcessWithCleanup
:: HasCallStack
=> ShutdownAction
-> (CleanUpAction -> ProcessId -> Eff (ConsProcess SchedulerIO) ())
-> Eff SchedulerIO (Either SchedulerError ())
scheduleProcessWithCleanup shutdownAction processAction =
withMessageQueue
(\cleanUpAction pinfo ->
handle_relay
(\x -> return x)
(go (pinfo ^. processId))
(processAction cleanUpAction (pinfo ^. processId)))
where
shutdownOrGo :: forall v a
. HasCallStack
=> ProcessId
-> (ResumeProcess v -> Eff SchedulerIO a)
-> Eff SchedulerIO a
-> Eff SchedulerIO a
shutdownOrGo pid k ok =
do psVar <- getSchedulerTVar
eHasShutdowReq <-
lift
(do p <- atomically (readTVar psVar)
let mPinfo = p ^. processTable . at pid
case mPinfo of
Just pinfo ->
atomically
(do wasRequested <- readTVar (pinfo ^. shutdownRequested)
writeTVar (pinfo ^. shutdownRequested) False
return (Right wasRequested))
Nothing ->
return (Left (ProcessNotFound pid)))
case eHasShutdowReq of
Right True ->
k ShutdownRequested
Right False ->
ok
Left e ->
k (OnError (show e))
go :: forall v a
. HasCallStack
=> ProcessId
-> Process SchedulerIO v
-> (v -> Eff SchedulerIO a)
-> Eff SchedulerIO a
go pid (SendMessage toPid reqIn) k =
shutdownOrGo pid k $
do eres <-
do psVar <- getSchedulerTVar
lift
(Right <$>
(do p <- atomically (readTVar psVar)
let mto = p ^. processTable . at toPid
case mto of
Just toProc ->
do atomically
(writeTQueue
(toProc ^. messageQ)
(Just reqIn))
return True
Nothing ->
return False))
lift Concurrent.yield
let kArg = either (OnError . show @SchedulerError) ResumeWith eres
k kArg
go pid (SendShutdown toPid) k =
shutdownOrGo pid k
$ do eres <-
do psVar <- getSchedulerTVar
lift
(Right <$>
(do p <- atomically (readTVar psVar)
let mto = p ^. processTable . at toPid
case mto of
Just toProc ->
atomically $
do writeTVar (toProc ^. shutdownRequested) True
writeTQueue (toProc ^. messageQ) Nothing
return True
Nothing ->
return False))
let kArg = either (OnError . show @SchedulerError) resume eres
resume = if toPid == pid
then const ShutdownRequested
else ResumeWith
lift Concurrent.yield
k kArg
go pid (Spawn child) k =
shutdownOrGo pid k $
do res <- spawnImpl child
k (ResumeWith res)
go pid ReceiveMessage k =
shutdownOrGo pid k $
do emq <- overProcessInfo pid (use messageQ)
case emq of
Left e ->
k (OnError (show @SchedulerError e))
Right mq -> do
emdynMsg <- lift (Right <$> (atomically (readTQueue mq)))
k (either
(OnError . show @SchedulerError)
(maybe RetryLastAction ResumeWith)
emdynMsg)
go pid SelfPid k =
shutdownOrGo pid k $
do lift Concurrent.yield
k (ResumeWith pid)
go _pid Shutdown _k = do
invokeShutdownAction shutdownAction (Right ())
go _pid (ExitWithError msg) _k = do
invokeShutdownAction shutdownAction (Left (ProcessExitError msg))
go _pid (RaiseError msg) _k = do
invokeShutdownAction shutdownAction (Left (ProcessExitError msg))
data ShutdownAction =
ShutdownAction (forall a . Either SchedulerError () -> IO a)
invokeShutdownAction :: ( HasCallStack
, SetMember Lift (Lift IO) r
, Member (Logs String) r)
=> ShutdownAction -> Either SchedulerError () -> Eff r a
invokeShutdownAction (ShutdownAction a) res =
lift (a res)
withMessageQueue
:: HasSchedulerIO r
=> (CleanUpAction -> ProcessInfo -> Eff r a)
-> Eff r (Either SchedulerError a)
withMessageQueue m = do
mpinfo <- createQueue
lc <- getLogChannel
case mpinfo of
Right pinfo -> do
cleanUpAction <-
getSchedulerTVar >>= return . CleanUpAction . destroyQueue
lc
(pinfo ^. processId)
Right <$> m cleanUpAction pinfo
Left e ->
return $ Left e
where
createQueue = do
myTId <- lift myThreadId
overScheduler
(do
abortNow <- use schedulerShuttingDown
if abortNow
then return (Left SchedulerShuttingDown)
else do
pid <- nextPid <<+= 1
channel <- Mtl.lift newTQueue
shutdownIndicator <- Mtl.lift (newTVar False)
let pinfo = ProcessInfo pid channel shutdownIndicator
threadIdTable . at pid .= Just myTId
processTable . at pid .= Just pinfo
return (Right pinfo)
)
destroyQueue lc pid psVar = do
didWork <- Exc.try
(overSchedulerIO
psVar
(do os <- processTable . at pid <<.= Nothing
ot <- threadIdTable . at pid <<.= Nothing
return (os, isJust os || isJust ot)))
let getCause =
Exc.try @Exc.SomeException
(overSchedulerIO psVar (preuse (processTable . at pid)))
>>= either
(return . (show pid ++) . show)
(return . (maybe (show pid) show))
case didWork of
Right (_pinfo, True) ->
return ()
Right (pinfo, False) ->
logChannelPutIO lc ("queue already destroyed: " ++ show pinfo)
Left (e :: Exc.SomeException) ->
getCause
>>= logChannelPutIO lc
. (("failed to destroy queue: " ++ show e ++ " ") ++)
overScheduler
:: HasSchedulerIO r
=> Mtl.StateT Scheduler STM.STM (Either SchedulerError a)
-> Eff r (Either SchedulerError a)
overScheduler stAction = do
psVar <- getSchedulerTVar
lift (overSchedulerIO psVar stAction)
overSchedulerIO
:: STM.TVar Scheduler -> Mtl.StateT Scheduler STM.STM a -> IO a
overSchedulerIO psVar stAction =
do STM.atomically
(do ps <- STM.readTVar psVar
(result, psModified) <- Mtl.runStateT stAction ps
STM.writeTVar psVar psModified
return result)
getSchedulerTVar :: HasSchedulerIO r => Eff r (TVar Scheduler)
getSchedulerTVar = fromSchedulerVar <$> ask
getSchedulerVar :: HasSchedulerIO r => Eff r SchedulerVar
getSchedulerVar = ask