module Control.Eff.Concurrent.Process
(
ProcessId(..)
, fromProcessId
, Process(..)
, ConsProcess
, ResumeProcess(..)
, SchedulerProxy(..)
, thisSchedulerProxy
, executeAndCatch
, executeAndResume
, yieldProcess
, sendMessage
, sendMessageAs
, sendMessageChecked
, spawn
, spawn_
, receiveMessage
, receiveMessageAs
, receiveLoop
, self
, sendShutdown
, sendShutdownChecked
, exitWithError
, exitNormally
, raiseError
, catchRaisedError
, ignoreProcessError
)
where
import GHC.Generics ( Generic
, Generic1
)
import GHC.Stack
import Control.DeepSeq
import Control.Eff
import Control.Eff.Extend
import Control.Lens
import Control.Monad ( void )
import Data.Dynamic
import Data.Kind
import Text.Printf
-- | The requests a process can send to whatever interprets the 'Process'
-- effect (the scheduler, which is not defined in this module).
--
-- The parameter @r@ is the list of effects below the 'Process' effect and the
-- second parameter is the type of the reply. Requests after which the process
-- may continue reply with a 'ResumeProcess'; 'Shutdown', 'ExitWithError' and
-- 'RaiseError' leave the reply type completely open, so they can be used at
-- any result type.
data Process (r :: [Type -> Type]) b where
  -- | Request sent by 'yieldProcess'.
  YieldProcess :: Process r (ResumeProcess ())
  -- | Request sent by 'self'; the reply carries a 'ProcessId'.
  SelfPid :: Process r (ResumeProcess ProcessId)
  -- | Request sent by 'spawn'; carries the computation of the new process and
  -- is answered with a 'ProcessId'.
  Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
  -- | Request sent by 'exitNormally', and by 'executeAndResume' and
  -- 'executeAndCatch' when the reply was 'ShutdownRequested'.
  Shutdown :: Process r b
  -- | Request sent by 'exitWithError', and by 'executeAndResume' when the
  -- reply was 'OnError'. Carries the error message.
  ExitWithError :: String -> Process r b
  -- | Request sent by 'raiseError'; 'catchRaisedError' intercepts it.
  RaiseError :: String -> Process r b
  -- | Request sent by 'sendShutdownChecked' for the given process.
  SendShutdown :: ProcessId -> Process r (ResumeProcess Bool)
  -- | Request sent by 'sendMessageChecked': a 'Dynamic' message for the given
  -- process.
  SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)
  -- | Request sent by 'receiveMessage' and 'receiveLoop'; the reply carries
  -- the received 'Dynamic'.
  ReceiveMessage :: Process r (ResumeProcess Dynamic)
-- | The reply to a 'Process' request, telling the process how to go on.
--
-- 'executeAndResume' and 'executeAndCatch' show how each case is meant to be
-- treated by the requesting process.
data ResumeProcess v where
  -- | The process is asked to shut down instead of getting a result.
  ShutdownRequested :: ResumeProcess v
  -- | The request failed; the 'String' is the error message.
  OnError :: String -> ResumeProcess v
  -- | The request succeeded with the given result.
  ResumeWith :: v -> ResumeProcess v
  -- | The request should simply be issued again.
  RetryLastAction :: ResumeProcess v
  deriving
    ( Typeable
    , Foldable
    , Functor
    , Show
    , Eq
    , Ord
    , Traversable
    , Generic
    , Generic1
    )
-- | No methods are given, so the class default for 'rnf' is used; it is driven
-- by the derived 'Generic' instance of 'ResumeProcess'.
instance NFData a => NFData (ResumeProcess a)
-- | No methods are given, so the class default for 'liftRnf' is used; it is
-- driven by the derived 'Generic1' instance of 'ResumeProcess'.
instance NFData1 ResumeProcess
-- | The effect list @r@ with the matching 'Process' effect, @'Process' r@,
-- put in front of it.
type ConsProcess r = Process r ': r
-- | Send a 'Process' request and act on the 'ResumeProcess' reply.
--
-- * 'ResumeWith': the result is forced to weak head normal form and returned.
-- * 'RetryLastAction': the same request is sent again.
-- * 'ShutdownRequested': a 'Shutdown' request is sent.
-- * 'OnError': an 'ExitWithError' request with the same message is sent.
executeAndResume
  :: forall r q v
   . (SetMember Process (Process q) r, HasCallStack)
  => Process q (ResumeProcess v)
  -> Eff r v
executeAndResume request = send request >>= continueWith
 where
  continueWith (ResumeWith !outcome) = return outcome
  continueWith RetryLastAction       = executeAndResume request
  continueWith ShutdownRequested     = send (Shutdown @q)
  continueWith (OnError reason)      = send (ExitWithError @q reason)
-- | Run a computation that yields a 'ResumeProcess' and turn the reply into an
-- 'Either'.
--
-- * 'ResumeWith': the result is forced to weak head normal form and returned
--   as 'Right'.
-- * 'OnError': the message is returned as 'Left'; nothing is sent.
-- * 'RetryLastAction': the whole computation is run again.
-- * 'ShutdownRequested': a 'Shutdown' request is sent.
--
-- Unlike 'executeAndResume' the argument is an 'Eff' computation, not a bare
-- 'Process' request.
executeAndCatch
  :: forall q r v
   . (SetMember Process (Process q) r, HasCallStack)
  => SchedulerProxy q
  -> Eff r (ResumeProcess v)
  -> Eff r (Either String v)
executeAndCatch px attempt = attempt >>= classify
 where
  classify (ResumeWith !outcome) = return (Right outcome)
  classify RetryLastAction       = executeAndCatch px attempt
  classify ShutdownRequested     = send (Shutdown @q)
  classify (OnError reason)      = return (Left reason)
-- | A value-level stand-in for the effect list @q@ of a scheduler.
--
-- It carries no data. The functions of this module take it as an argument
-- only so that @q@ is fixed by the type of that argument.
data SchedulerProxy (q :: [Type -> Type]) where
  -- | The proxy value; 'thisSchedulerProxy' returns this constructor.
  SchedulerProxy :: SchedulerProxy q
  -- | A second, shorter-named constructor that also carries no data.
  SP :: SchedulerProxy q
-- | Return a 'SchedulerProxy' for @r@, the effect list directly beneath the
-- 'Process' effect of the computation this is used in.
thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r)
thisSchedulerProxy = pure SchedulerProxy
-- | Send a 'YieldProcess' request and handle the reply with
-- 'executeAndResume'.
--
-- The proxy argument is used only to fix the type @q@.
yieldProcess
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r ()
yieldProcess _px = executeAndResume (YieldProcess @q)
-- | Send a 'Dynamic' message to a process, discarding the 'Bool' that
-- 'sendMessageChecked' returns.
sendMessage
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> ProcessId
  -> Dynamic
  -> Eff r ()
sendMessage px receiver payload = () <$ sendMessageChecked px receiver payload
-- | Send a 'Dynamic' message to a process and return the 'Bool' the scheduler
-- replies with (presumably whether the message was accepted; this module does
-- not define the meaning).
--
-- The message is forced to weak head normal form when the request is built.
-- The proxy argument is used only to fix the type @q@.
sendMessageChecked
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> ProcessId
  -> Dynamic
  -> Eff r Bool
sendMessageChecked _px receiver payload = executeAndResume request
 where
  -- '$!' keeps the strictness in the payload.
  request = SendMessage @q receiver $! payload
-- | Wrap any 'Typeable' value with 'toDyn' and send it with 'sendMessage'.
sendMessageAs
  :: forall o r q
   . (HasCallStack, SetMember Process (Process q) r, Typeable o)
  => SchedulerProxy q
  -> ProcessId
  -> o
  -> Eff r ()
sendMessageAs px receiver payload = sendMessage px receiver (toDyn payload)
-- | Send a shutdown request to a process, discarding the 'Bool' that
-- 'sendShutdownChecked' returns.
sendShutdown
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> ProcessId
  -> Eff r ()
sendShutdown px target = () <$ sendShutdownChecked px target
-- | Send a 'SendShutdown' request for a process and return the 'Bool' the
-- scheduler replies with (presumably whether the request could be delivered;
-- this module does not define the meaning).
--
-- The proxy argument is used only to fix the type @q@.
sendShutdownChecked
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> ProcessId
  -> Eff r Bool
sendShutdownChecked _px target = executeAndResume (SendShutdown @q target)
-- | Send a 'Spawn' request carrying the given computation and return the
-- 'ProcessId' from the reply.
spawn
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => Eff (Process q ': q) ()
  -> Eff r ProcessId
spawn = executeAndResume . Spawn @q
-- | Like 'spawn', but the resulting 'ProcessId' is discarded.
spawn_
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => Eff (Process q ': q) ()
  -> Eff r ()
spawn_ child = () <$ spawn child
-- | Send a 'ReceiveMessage' request and return the 'Dynamic' from the reply.
--
-- The reply is handled by 'executeAndResume'. The proxy argument is used only
-- to fix the type @q@.
receiveMessage
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r Dynamic
receiveMessage _px = executeAndResume (ReceiveMessage @q)
-- | Receive one message with 'receiveMessage' and convert it to the expected
-- type with 'fromDynamic'.
--
-- If the message has a different type, 'raiseError' is called with a
-- description of the message that was received.
receiveMessageAs
  :: forall a r q
   . (HasCallStack, Typeable a, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r a
receiveMessageAs px = receiveMessage px >>= castOrRaise
 where
  castOrRaise received = maybe
    (raiseError px ("Invalid message type received: " ++ show received))
    return
    (fromDynamic received)
-- | Receive messages forever, passing every reply to a callback.
--
-- Each round sends a 'ReceiveMessage' request. The callback gets
--
-- * @'Right' message@ for a received message ('ResumeWith'),
-- * @'Left' 'Nothing'@ when the reply was 'ShutdownRequested',
-- * @'Left' ('Just' reason)@ when the reply was 'OnError'.
--
-- 'RetryLastAction' starts the next round without calling the callback. The
-- loop never finishes by itself, because every case is followed by another
-- round.
receiveLoop
  :: forall r q
   . (SetMember Process (Process q) r, HasCallStack)
  => SchedulerProxy q
  -> (Either (Maybe String) Dynamic -> Eff r ())
  -> Eff r ()
receiveLoop px handlers =
  send (ReceiveMessage @q) >>= dispatch >> receiveLoop px handlers
 where
  dispatch RetryLastAction      = return ()
  dispatch ShutdownRequested    = handlers (Left Nothing)
  dispatch (OnError reason)     = handlers (Left (Just reason))
  dispatch (ResumeWith message) = handlers (Right message)
-- | Send a 'SelfPid' request and return the 'ProcessId' from the reply (going
-- by the name, the id of the calling process).
--
-- The proxy argument is used only to fix the type @q@.
self
  :: forall q r
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r ProcessId
self _ = executeAndResume (SelfPid @q)
-- | Send a 'Shutdown' request.
--
-- The result type is unconstrained, so this can be used wherever a value of
-- any type is expected. The proxy argument is used only to fix the type @q@.
exitNormally
  :: forall r q a
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r a
exitNormally _px = send (Shutdown :: Process q a)
-- | Send an 'ExitWithError' request carrying the given message.
--
-- The message is forced to weak head normal form when the request is built.
-- The result type is unconstrained. The proxy argument is used only to fix
-- the type @q@.
exitWithError
  :: forall r q a
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> String
  -> Eff r a
exitWithError _px reason = send (ExitWithError @q $! reason)
-- | Send a 'RaiseError' request carrying the given message.
--
-- 'catchRaisedError' intercepts this request. The message is forced to weak
-- head normal form when the request is built. The result type is
-- unconstrained. The proxy argument is used only to fix the type @q@.
raiseError
  :: forall r q b
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> String
  -> Eff r b
raiseError _px reason = send (RaiseError @q $! reason)
-- | Run a computation and handle the 'RaiseError' requests it makes.
--
-- A 'RaiseError' request is not forwarded; the handler is called with its
-- message, and the handler's result becomes the result of the whole call. The
-- rest of the interrupted computation is dropped. All other 'Process'
-- requests are sent on unchanged and the computation continues with their
-- reply.
--
-- The proxy argument is used only to fix the type @q@.
catchRaisedError
  :: forall r q w
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> (String -> Eff r w)
  -> Eff r w
  -> Eff r w
catchRaisedError _px onErr action = interpose return intercept action
 where
  intercept :: forall b . Process q b -> (b -> Eff r w) -> Eff r w
  intercept request continue = case request of
    RaiseError reason -> onErr reason
    _                 -> send request >>= continue
-- | Run a computation and report a raised error as a value.
--
-- The result is 'Right' if the computation finished, and 'Left' with the
-- message if 'catchRaisedError' intercepted a 'RaiseError' request.
ignoreProcessError
  :: (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r a
  -> Eff r (Either String a)
ignoreProcessError px action =
  catchRaisedError px (return . Left) (Right <$> action)
-- | The identifier of a process: a wrapper around an 'Int'.
--
-- The numeric classes are derived from the wrapped 'Int', so integer literals
-- and arithmetic can be used on 'ProcessId' directly.
newtype ProcessId = ProcessId { _fromProcessId :: Int }
  deriving (Eq,Ord,Typeable,Bounded,Num, Enum, Integral, Real)

-- | Parses the text produced by the 'Show' instance, @\<0.N.0\>@, where @N@ is
-- a decimal number with an optional leading minus sign.
--
-- The number is parsed by hand. Handing the text after @\<0.@ to 'reads' does
-- not work for this format: the 'Int' reader lexes @N.0@ as one fractional
-- literal and rejects it, so nothing that 'show' prints could be read back.
-- The 'reads' based parse is kept as a second alternative so that the few
-- inputs it did accept are still accepted.
instance Read ProcessId where
  readsPrec _ ('<':'0':'.':rest1) = take
    1
    [ (ProcessId c, rest2)
    | (c, '.':'0':'>':rest2) <- readSignedDecimal rest1 ++ reads rest1
    ]
   where
    -- An optional minus sign followed by at least one decimal digit.
    readSignedDecimal ('-' : unsigned) =
      [ (negate n, leftover) | (n, leftover) <- readDecimal unsigned ]
    readSignedDecimal unsigned = readDecimal unsigned
    -- Split off the leading digits first, so that 'reads' only ever sees a
    -- plain run of digits and cannot swallow the following ".0".
    readDecimal input = case span (`elem` ['0' .. '9']) input of
      ([]    , _       ) -> []
      (digits, leftover) -> [ (n, leftover) | (n, "") <- reads digits ]
  readsPrec _ _ = []

-- | Renders a process id as @\<0.N.0\>@, where @N@ is the wrapped 'Int' in
-- decimal.
instance Show ProcessId where
  show (ProcessId c) =
    printf "<0.%d.0>" c
-- Template Haskell splice: generates the lens for the underscore-prefixed
-- field '_fromProcessId' (exported by this module as 'fromProcessId').
makeLenses ''ProcessId