{-# LANGUAGE ImplicitParams #-}
module Control.Eff.Concurrent.Process
(
Process(..)
, SafeProcesses
, Processes
, HasProcesses
, HasSafeProcesses
, ProcessTitle(..)
, fromProcessTitle
, ProcessDetails(..)
, fromProcessDetails
, Timeout(TimeoutMicros, fromTimeoutMicros)
,
StrictDynamic()
, toStrictDynamic
, fromStrictDynamic
, unwrapStrictDynamic
, Serializer(..)
, ProcessId(..)
, fromProcessId
, ProcessState(..)
, yieldProcess
, delay
, sendMessage
, sendAnyMessage
, makeReference
, receiveMessage
, receiveSelectedMessage
, flushMessages
, receiveAnyMessage
, receiveLoop
, receiveSelectedLoop
, receiveAnyLoop
, MessageSelector(runMessageSelector)
, selectMessage
, filterMessage
, selectMessageWith
, selectDynamicMessage
, selectAnyMessage
, self
, isProcessAlive
, getProcessState
, updateProcessDetails
, spawn
, spawn_
, spawnLink
, spawnRaw
, spawnRaw_
, ResumeProcess(..)
, executeAndResume
, executeAndResumeOrExit
, executeAndResumeOrThrow
, interrupt
, sendInterrupt
, exitBecause
, exitNormally
, exitWithError
, sendShutdown
, linkProcess
, unlinkProcess
, monitor
, demonitor
, ProcessDown(..)
, selectProcessDown
, selectProcessDownByProcessId
, becauseProcessIsDown
, MonitorReference(..)
, withMonitor
, receiveWithMonitor
, Interrupt(..)
, Interrupts
, interruptToExit
, ExitRecovery(..)
, RecoverableInterrupt
, ExitSeverity(..)
, SomeExitReason(SomeExitReason)
, toExitRecovery
, isRecoverable
, toExitSeverity
, isProcessDownInterrupt
, isCrash
, toCrashReason
, fromSomeExitReason
, logProcessExit
, provideInterruptsShutdown
, handleInterrupts
, tryUninterrupted
, exitOnInterrupt
, logInterrupts
, provideInterrupts
, mergeEitherInterruptAndExitReason
, sendToReceiver
, Receiver(..)
, receiverPid
)
where
import Control.Applicative
import Control.Eff.Concurrent.Misc
import Control.DeepSeq
import Control.Eff
import Control.Eff.Exception
import Control.Eff.Extend
import Control.Eff.Log.Handler
import qualified Control.Exception as Exc
import Control.Lens
import Control.Monad ( void
, (>=>)
)
import Data.Default
import Data.Dynamic
import Data.Functor.Contravariant ()
import Data.Kind
import Data.Function
import Data.Maybe
import Data.String ( IsString, fromString )
import Data.Text ( Text, pack, unpack)
import qualified Data.Text as T
import Type.Reflection ( SomeTypeRep(..), typeRep )
import GHC.Stack
import GHC.Generics ( Generic
, Generic1
)
-- | The process effect: the set of requests a process can issue.
--
-- @r@ is the effect list below the process effect, @b@ is the type of the
-- answer to a request. Every request except 'Shutdown' is answered with a
-- 'ResumeProcess', i.e. either a value or a recoverable 'Interrupt'.
data Process (r :: [Type -> Type]) b where
  -- | Flush messages; answered with a list of 'StrictDynamic' values.
  FlushMessages
    :: Process r (ResumeProcess [StrictDynamic])
  -- | Yield the process.
  YieldProcess
    :: Process r (ResumeProcess ())
  -- | Delay the process, parameterised by a 'Timeout'.
  Delay
    :: Timeout -> Process r (ResumeProcess ())
  -- | Look up the current process id.
  SelfPid
    :: Process r (ResumeProcess ProcessId)
  -- | Spawn a new process with the given title running the given action.
  Spawn
    :: ProcessTitle -> Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
  -- | Spawn a new process and link to it.
  SpawnLink
    :: ProcessTitle -> Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
  -- | Shut down with a non-recoverable reason. The result type is fully
  -- polymorphic and not wrapped in 'ResumeProcess'.
  Shutdown
    :: Interrupt 'NoRecovery -> Process r a
  -- | Send a non-recoverable exit reason to another process.
  SendShutdown
    :: ProcessId -> Interrupt 'NoRecovery -> Process r (ResumeProcess ())
  -- | Send a recoverable interrupt to another process.
  SendInterrupt
    :: ProcessId -> Interrupt 'Recoverable -> Process r (ResumeProcess ())
  -- | Send a 'StrictDynamic' message to another process.
  SendMessage
    :: ProcessId -> StrictDynamic -> Process r (ResumeProcess ())
  -- | Receive a message accepted by the given 'MessageSelector'.
  ReceiveSelectedMessage
    :: forall r a . MessageSelector a -> Process r (ResumeProcess a)
  -- | Generate a unique reference.
  MakeReference
    :: Process r (ResumeProcess Int)
  -- | Monitor another process; answered with a 'MonitorReference'.
  Monitor
    :: ProcessId -> Process r (ResumeProcess MonitorReference)
  -- | Remove a monitor identified by its 'MonitorReference'.
  Demonitor
    :: MonitorReference -> Process r (ResumeProcess ())
  -- | Link to another process.
  Link
    :: ProcessId -> Process r (ResumeProcess ())
  -- | Unlink from another process.
  Unlink
    :: ProcessId -> Process r (ResumeProcess ())
  -- | Update the process details.
  UpdateProcessDetails
    :: ProcessDetails -> Process r (ResumeProcess ())
  -- | Get title, details and state of a process, if there is any.
  GetProcessState
    :: ProcessId -> Process r (ResumeProcess (Maybe (ProcessTitle, ProcessDetails, ProcessState)))
-- | Human readable rendering of a 'Process' request.
--
-- Requests that carry a target process and a payload are wrapped in
-- parentheses when the surrounding precedence is @>= 10@. Spawned child
-- actions and message selectors are not rendered.
instance Show (Process r b) where
  showsPrec d = \case
    FlushMessages            -> showString "flush messages"
    YieldProcess             -> showString "yield process"
    Delay t                  -> showString "delay process until: " . shows t
    SelfPid                  -> showString "lookup the current process id"
    Spawn t _                -> showString "spawn a new process: " . shows t
    -- The ": " separator was missing here, which glued the title directly
    -- onto the sentence; it now matches the 'Spawn' rendering.
    SpawnLink t _            -> showString "spawn a new process and link to it: " . shows t
    Shutdown sr              ->
      showParen (d >= 10) (showString "shutdown " . showsPrec 10 sr)
    SendShutdown toPid sr    -> targeted "shutting down " toPid sr
    SendInterrupt toPid sr   -> targeted "interrupting " toPid sr
    SendMessage toPid sr     -> targeted "sending to " toPid sr
    ReceiveSelectedMessage _ -> showString "receive a message"
    MakeReference            -> showString "generate a unique reference"
    Monitor pid              -> showString "monitor " . shows pid
    Demonitor i              -> showString "demonitor " . shows i
    Link l                   -> showString "link " . shows l
    Unlink l                 -> showString "unlink " . shows l
    GetProcessState pid      -> showString "get the process state of " . shows pid
    UpdateProcessDetails l   -> showString "update the process details to: " . shows l
   where
    -- Render "<label><target> <payload>", parenthesised at precedence >= 10.
    targeted :: (Show p, Show s) => String -> p -> s -> ShowS
    targeted label target payload = showParen
      (d >= 10)
      ( showString label
      . showsPrec 10 target
      . showChar ' '
      . showsPrec 10 payload
      )
-- | A 'Text' wrapper holding the title of a process.
newtype ProcessTitle = MkProcessTitle
  { _fromProcessTitle :: Text
  }
  deriving (Eq, Ord, NFData, Generic, IsString, Typeable, Semigroup, Monoid)

-- | A lens onto the 'Text' wrapped by a 'ProcessTitle'.
fromProcessTitle :: Lens' ProcessTitle Text
fromProcessTitle = lens _fromProcessTitle (\_ newTitle -> MkProcessTitle newTitle)

-- | Shows the bare title text, without quotes or constructor name.
instance Show ProcessTitle where
  showsPrec _ title = showString (T.unpack (_fromProcessTitle title))
-- | A 'Text' wrapper holding the details of a process.
newtype ProcessDetails = MkProcessDetails
  { _fromProcessDetails :: Text
  }
  deriving (Eq, Ord, NFData, Generic, IsString, Typeable, Semigroup, Monoid)

-- | A lens onto the 'Text' wrapped by a 'ProcessDetails'.
fromProcessDetails :: Lens' ProcessDetails Text
fromProcessDetails =
  lens _fromProcessDetails (\_ newDetails -> MkProcessDetails newDetails)

-- | Shows the bare details text, without quotes or constructor name.
instance Show ProcessDetails where
  showsPrec _ details = showString (T.unpack (_fromProcessDetails details))
-- | A number of micro seconds, see the @µs@ unit in the 'Show' instance.
newtype Timeout = TimeoutMicros
  { fromTimeoutMicros :: Int
  }
  deriving (NFData, Ord, Eq, Num, Integral, Real, Enum, Typeable)

-- | Renders as @timeout: \<n\> µs@, parenthesised at precedence @>= 10@.
instance Show Timeout where
  showsPrec d (TimeoutMicros t) = showParen (d >= 10) rendered
   where
    rendered = showString "timeout: " . shows t . showString " µs"
-- | A wrapper around 'Dynamic'. Values are created with 'toStrictDynamic'.
newtype StrictDynamic where
  MkDynamicMessage :: Dynamic -> StrictDynamic
  deriving Typeable

-- | Only evaluates the wrapped 'Dynamic' to weak head normal form.
instance NFData StrictDynamic where
  rnf (MkDynamicMessage d) = seq d ()

-- | Delegates to the 'Show' instance of the wrapped 'Dynamic'.
instance Show StrictDynamic where
  show = \case
    MkDynamicMessage d -> show d
-- | A function that turns a @message@ into a 'StrictDynamic'.
newtype Serializer message = MkSerializer
  { runSerializer :: message -> StrictDynamic
  }
  deriving (Typeable)

-- | Only evaluates the wrapped function to weak head normal form.
instance NFData (Serializer message) where
  rnf (MkSerializer !s) = seq s ()

-- | Shows the message type followed by @-serializer@; the function itself
-- is not inspected.
instance Typeable message => Show (Serializer message) where
  showsPrec d _x = showParen (d >= 10) label
   where
    label = showSTypeable @message . showString "-serializer"

-- | Pre-compose a function in front of the serializer.
instance Contravariant Serializer where
  contramap f (MkSerializer b) = MkSerializer (\input -> b (f input))
-- | Deeply evaluate a value and wrap it into a 'StrictDynamic'.
--
-- The value is reduced to normal form (via its 'NFData' instance) as soon
-- as the result is demanded, and only then stored in the 'Dynamic'.
--
-- The previous definition spelled out @force x@ three times; the copies
-- were not shared and the stored payload was yet another @force x@ thunk.
-- A single 'deepseq' has the same semantics without the redundant
-- traversals.
toStrictDynamic :: (Typeable a, NFData a) => a -> StrictDynamic
toStrictDynamic x = x `deepseq` MkDynamicMessage (toDyn x)
-- | Try to extract a value of the requested type from a 'StrictDynamic';
-- 'Nothing' when 'fromDynamic' rejects the wrapped 'Dynamic'.
fromStrictDynamic :: Typeable a => StrictDynamic -> Maybe a
fromStrictDynamic = \case
  MkDynamicMessage wrapped -> fromDynamic wrapped
-- | Return the 'Dynamic' wrapped by a 'StrictDynamic'.
unwrapStrictDynamic :: StrictDynamic -> Dynamic
unwrapStrictDynamic = \case
  MkDynamicMessage wrapped -> wrapped
-- | The answer to a 'Process' request: either the process was interrupted
-- with a recoverable 'Interrupt', or it is resumed with a value.
data ResumeProcess v where
  -- | The request was answered with a recoverable interrupt.
  Interrupted :: Interrupt 'Recoverable -> ResumeProcess v
  -- | The request was answered with a value.
  ResumeWith  :: a -> ResumeProcess a
  deriving (Typeable, Generic, Generic1, Show)

-- Both instances use the generic default implementations.
instance NFData a => NFData (ResumeProcess a)

instance NFData1 ResumeProcess
-- | A function that inspects a 'StrictDynamic' message and either accepts
-- it, yielding an @a@, or rejects it with 'Nothing'.
newtype MessageSelector a = MessageSelector
  { runMessageSelector :: StrictDynamic -> Maybe a
  }
  deriving (Semigroup, Monoid, Functor)

-- | 'pure' accepts every message with a constant result; '<*>' runs both
-- selectors on the same message and combines the results in 'Maybe'.
instance Applicative MessageSelector where
  pure result = MessageSelector (\_ -> Just result)
  selectF <*> selectX = MessageSelector $ \msg ->
    runMessageSelector selectF msg <*> runMessageSelector selectX msg

-- | 'empty' rejects every message; '<|>' tries the left selector first and
-- falls back to the right one on the same message.
instance Alternative MessageSelector where
  empty = MessageSelector (\_ -> Nothing)
  selectL <|> selectR = MessageSelector $ \msg ->
    runMessageSelector selectL msg <|> runMessageSelector selectR msg
-- | Select messages that 'fromStrictDynamic' can convert to type @t@.
selectMessage :: Typeable t => MessageSelector t
selectMessage = selectDynamicMessage (\msg -> fromStrictDynamic msg)
-- | Select messages of type @a@ that additionally satisfy a predicate.
-- Messages of another type, or failing the predicate, are rejected.
filterMessage :: Typeable a => (a -> Bool) -> MessageSelector a
filterMessage predicate = selectDynamicMessage $ \msg -> do
  candidate <- fromStrictDynamic msg
  if predicate candidate then Just candidate else Nothing
-- | Select messages of type @a@ and pass them through a partial mapping;
-- the message is rejected when either the type does not fit or the mapping
-- returns 'Nothing'.
selectMessageWith :: Typeable a => (a -> Maybe b) -> MessageSelector b
selectMessageWith f = selectDynamicMessage (\msg -> fromStrictDynamic msg >>= f)
-- | Build a 'MessageSelector' directly from a function on 'StrictDynamic'.
selectDynamicMessage :: (StrictDynamic -> Maybe a) -> MessageSelector a
selectDynamicMessage accept = MessageSelector accept
-- | A selector that accepts every message and returns it unchanged.
selectAnyMessage :: MessageSelector StrictDynamic
selectAnyMessage = MessageSelector (\msg -> Just msg)
-- | An enumeration of the states a process can be reported in, see
-- 'GetProcessState'.
data ProcessState
  = ProcessBooting
  | ProcessIdle
  | ProcessBusy
  | ProcessBusySleeping
  | ProcessBusyUpdatingDetails
  | ProcessBusySending
  | ProcessBusySendingShutdown
  | ProcessBusySendingInterrupt
  | ProcessBusyReceiving
  | ProcessBusyLinking
  | ProcessBusyUnlinking
  | ProcessBusyMonitoring
  | ProcessBusyDemonitoring
  | ProcessInterrupted
  | ProcessShuttingDown
  deriving (Read, Show, Ord, Eq, Enum, Generic)

-- Uses the generic default implementation.
instance NFData ProcessState

-- | The default state is 'ProcessBooting'.
instance Default ProcessState where
  def = ProcessBooting
-- | Whether an 'Interrupt' can be recovered from. Used (promoted to the
-- kind level) as the index of 'Interrupt'.
data ExitRecovery
  = Recoverable
  | NoRecovery
  deriving (Typeable, Ord, Eq, Generic)

-- Uses the generic default implementation.
instance NFData ExitRecovery

-- | Renders as @recoverable@ or @not recoverable@, parenthesised at
-- precedence @>= 10@.
instance Show ExitRecovery where
  showsPrec d recovery = showParen (d >= 10) (describe recovery)
   where
    describe Recoverable = showString "recoverable"
    describe NoRecovery  = showString "not recoverable"
-- | Reflect the type level recoverability of an 'Interrupt' to the value
-- level: constructors indexed by @'Recoverable@ map to 'Recoverable', those
-- indexed by @'NoRecovery@ map to 'NoRecovery'.
toExitRecovery :: Interrupt r -> ExitRecovery
toExitRecovery NormalExitRequested            = Recoverable
toExitRecovery (NormalExitRequestedWith _)    = Recoverable
toExitRecovery (OtherProcessNotRunning _)     = Recoverable
toExitRecovery (TimeoutInterrupt _)           = Recoverable
toExitRecovery (LinkedProcessCrashed _)       = Recoverable
toExitRecovery (InterruptedBy _)              = Recoverable
toExitRecovery (ErrorInterrupt _)             = Recoverable
toExitRecovery ExitNormally                   = NoRecovery
toExitRecovery (ExitNormallyWith _)           = NoRecovery
toExitRecovery (ExitUnhandledError _)         = NoRecovery
toExitRecovery (ExitProcessCancelled _)       = NoRecovery
toExitRecovery (ExitOtherProcessNotRunning _) = NoRecovery
-- | Classifies an exit as either normal or a crash, see 'toExitSeverity'.
data ExitSeverity
  = NormalExit
  | Crash
  deriving (Typeable, Ord, Eq, Generic)

-- | Renders as @exit success@ or @crash@, parenthesised at precedence
-- @>= 10@.
instance Show ExitSeverity where
  showsPrec d severity = showParen (d >= 10) (describe severity)
   where
    describe NormalExit = showString "exit success"
    describe Crash      = showString "crash"

-- Uses the generic default implementation.
instance NFData ExitSeverity
-- | Classify an 'Interrupt': only 'ExitNormally' and 'NormalExitRequested'
-- count as 'NormalExit', every other constructor is a 'Crash'.
--
-- NOTE(review): 'ExitNormallyWith' and 'NormalExitRequestedWith' end up in
-- the 'Crash' catch-all although their 'Show' output talks about a normal /
-- successful exit - confirm that this is intended.
toExitSeverity :: Interrupt e -> ExitSeverity
toExitSeverity ExitNormally        = NormalExit
toExitSeverity NormalExitRequested = NormalExit
toExitSeverity _                   = Crash
-- | Reasons for interrupting or terminating a process, indexed by whether
-- the reason is recoverable ('ExitRecovery').
data Interrupt (t :: ExitRecovery) where
  -- | A normal exit was requested.
  NormalExitRequested
    :: Interrupt 'Recoverable
  -- | A normal exit was requested, with an arbitrary showable payload.
  NormalExitRequestedWith
    :: forall a . (Typeable a, Show a, NFData a) => a -> Interrupt 'Recoverable
  -- | Another process is not running.
  OtherProcessNotRunning
    :: ProcessId -> Interrupt 'Recoverable
  -- | A timeout occurred; the 'String' is a free form reason.
  TimeoutInterrupt
    :: String -> Interrupt 'Recoverable
  -- | A linked process crashed.
  LinkedProcessCrashed
    :: ProcessId -> Interrupt 'Recoverable
  -- | An error occurred; the 'String' is a free form reason.
  ErrorInterrupt
    :: String -> Interrupt 'Recoverable
  -- | An interrupt carrying an arbitrary showable payload.
  InterruptedBy
    :: forall a . (Typeable a, Show a, NFData a) => a -> Interrupt 'Recoverable
  -- | The process finished successfully.
  ExitNormally
    :: Interrupt 'NoRecovery
  -- | The process finished successfully, with an arbitrary showable payload.
  ExitNormallyWith
    :: forall a . (Typeable a, Show a, NFData a) => a -> Interrupt 'NoRecovery
  -- | The process exits because of an unhandled condition described by the
  -- 'Text', see 'interruptToExit'.
  ExitUnhandledError
    :: Text -> Interrupt 'NoRecovery
  -- | The process was cancelled, either by the given process or, for
  -- 'Nothing', by a runtime exception (wording of the 'Show' instance).
  ExitProcessCancelled
    :: Maybe ProcessId -> Interrupt 'NoRecovery
  -- | Another process is not running (non-recoverable variant).
  ExitOtherProcessNotRunning
    :: ProcessId -> Interrupt 'NoRecovery
  deriving Typeable
-- | Convert a recoverable interrupt into a non-recoverable exit reason.
--
-- The two "normal exit requested" variants map to their "exit normally"
-- counterparts; every other interrupt becomes an 'ExitUnhandledError'
-- carrying the 'show'n interrupt.
interruptToExit :: Interrupt 'Recoverable -> Interrupt 'NoRecovery
interruptToExit = \case
  NormalExitRequested             -> ExitNormally
  NormalExitRequestedWith payload -> ExitNormallyWith payload
  unhandled                       -> ExitUnhandledError (pack (show unhandled))
-- | Human readable rendering. Recoverable reasons are prefixed with
-- @interrupt: @, non-recoverable ones with @exit: @. The whole output is
-- parenthesised at precedence @>= 10@.
instance Show (Interrupt x) where
  showsPrec d interruptReason = showParen (d >= 10) (describe interruptReason)
   where
    describe :: Interrupt y -> ShowS
    describe = \case
      NormalExitRequested ->
        showString "interrupt: A normal exit was requested"
      NormalExitRequestedWith p ->
        showString "interrupt: A normal exit was requested: " . showsPrec 10 p
      OtherProcessNotRunning p ->
        showString "interrupt: Another process is not running: " . showsPrec 10 p
      TimeoutInterrupt reason ->
        showString "interrupt: A timeout occured: " . showString reason
      LinkedProcessCrashed m ->
        showString "interrupt: A linked process " . showsPrec 10 m . showString " crashed"
      InterruptedBy reason ->
        showString "interrupt: " . showsPrec 10 reason
      ErrorInterrupt reason ->
        showString "interrupt: An error occured: " . showString reason
      ExitNormally ->
        showString "exit: Process finished successfully"
      ExitNormallyWith reason ->
        showString "exit: Process finished successfully: " . showsPrec 10 reason
      ExitUnhandledError w ->
        showString "exit: Unhandled " . showString (unpack w)
      ExitProcessCancelled Nothing ->
        showString "exit: The process was cancelled by a runtime exception"
      ExitProcessCancelled (Just origin) ->
        showString "exit: The process was cancelled by: " . shows origin
      ExitOtherProcessNotRunning p ->
        showString "exit: Another process is not running: " . showsPrec 10 p
-- | Recoverable interrupts can be thrown as Haskell exceptions.
instance Exc.Exception (Interrupt 'Recoverable)

-- | Non-recoverable exit reasons can be thrown as Haskell exceptions.
instance Exc.Exception (Interrupt 'NoRecovery)
-- | Deeply evaluates the payload of every constructor; constructors
-- without payload evaluate to @()@ right away.
instance NFData (Interrupt x) where
  rnf = \case
    NormalExitRequested           -> rnf ()
    NormalExitRequestedWith !l    -> rnf l
    OtherProcessNotRunning !l     -> rnf l
    TimeoutInterrupt !l           -> rnf l
    LinkedProcessCrashed !l       -> rnf l
    ErrorInterrupt !l             -> rnf l
    InterruptedBy !l              -> rnf l
    ExitNormally                  -> rnf ()
    ExitNormallyWith !l           -> rnf l
    ExitUnhandledError !l         -> rnf l
    ExitProcessCancelled !o       -> rnf o
    ExitOtherProcessNotRunning !l -> rnf l
-- | Total order on interrupts that share the same recoverability index.
--
-- The clauses are order dependent: every constructor gets an "equal
-- constructor" clause followed by two catch-all clauses that rank it below
-- everything not yet handled. The resulting constructor order is
--
-- * recoverable: 'NormalExitRequested' < 'NormalExitRequestedWith'
--   < 'OtherProcessNotRunning' < 'TimeoutInterrupt' < 'LinkedProcessCrashed'
--   < 'InterruptedBy' < 'ErrorInterrupt'
-- * non-recoverable: 'ExitNormally' < 'ExitNormallyWith'
--   < 'ExitUnhandledError' < 'ExitOtherProcessNotRunning'
--   < 'ExitProcessCancelled'
--
-- Both arguments have the same index @x@, so a recoverable constructor is
-- never compared with a non-recoverable one; that is why the last
-- constructor of each group ('ErrorInterrupt', 'ExitProcessCancelled') has
-- no catch-all clauses.
--
-- NOTE(review): for 'NormalExitRequestedWith', 'InterruptedBy' and
-- 'ExitNormallyWith' the payload is ignored and the result is 'EQ', whereas
-- the 'Eq' instance in this file returns 'False' for these constructors -
-- confirm whether that mismatch is acceptable.
instance Ord (Interrupt x) where
-- Recoverable constructors.
compare NormalExitRequested NormalExitRequested = EQ
compare NormalExitRequested _ = LT
compare _ NormalExitRequested = GT
-- Payload has no 'Ord' instance available, so it is ignored.
compare (NormalExitRequestedWith _) (NormalExitRequestedWith _) = EQ
compare (NormalExitRequestedWith _) _ = LT
compare _ (NormalExitRequestedWith _) = GT
compare (OtherProcessNotRunning l) (OtherProcessNotRunning r) = compare l r
compare (OtherProcessNotRunning _) _ = LT
compare _ (OtherProcessNotRunning _) = GT
compare (TimeoutInterrupt l) (TimeoutInterrupt r) = compare l r
compare (TimeoutInterrupt _) _ = LT
compare _ (TimeoutInterrupt _) = GT
compare (LinkedProcessCrashed l) (LinkedProcessCrashed r) = compare l r
compare (LinkedProcessCrashed _) _ = LT
compare _ (LinkedProcessCrashed _) = GT
-- Payload has no 'Ord' instance available, so it is ignored.
compare (InterruptedBy _) (InterruptedBy _) = EQ
compare (InterruptedBy _) _ = LT
compare _ (InterruptedBy _) = GT
-- Last recoverable constructor: only the same-constructor case remains.
compare (ErrorInterrupt l) (ErrorInterrupt r) = compare l r
-- Non-recoverable constructors.
compare ExitNormally ExitNormally = EQ
compare ExitNormally _ = LT
compare _ ExitNormally = GT
-- Payload has no 'Ord' instance available, so it is ignored.
compare (ExitNormallyWith _) (ExitNormallyWith _) = EQ
compare (ExitNormallyWith _ ) _ = LT
compare _ (ExitNormallyWith _) = GT
compare (ExitUnhandledError l) (ExitUnhandledError r) = compare l r
compare (ExitUnhandledError _ ) _ = LT
compare _ (ExitUnhandledError _) = GT
compare (ExitOtherProcessNotRunning l) (ExitOtherProcessNotRunning r) = compare l r
compare (ExitOtherProcessNotRunning _ ) _ = LT
compare _ (ExitOtherProcessNotRunning _) = GT
-- Last non-recoverable constructor: only the same-constructor case remains.
compare (ExitProcessCancelled l) (ExitProcessCancelled r) = compare l r
-- | Structural equality for the constructors whose payload supports it.
--
-- 'NormalExitRequestedWith', 'InterruptedBy' and 'ExitNormallyWith' carry an
-- existential payload without 'Eq'; they fall through to the final clause
-- and therefore never compare equal, not even to themselves.
instance Eq (Interrupt x) where
  -- Constructors without payload.
  NormalExitRequested == NormalExitRequested = True
  ExitNormally        == ExitNormally        = True
  -- Constructors whose payload is compared.
  OtherProcessNotRunning a     == OtherProcessNotRunning b     = a == b
  TimeoutInterrupt a           == TimeoutInterrupt b           = a == b
  LinkedProcessCrashed a       == LinkedProcessCrashed b       = a == b
  ErrorInterrupt a             == ErrorInterrupt b             = a == b
  ExitUnhandledError a         == ExitUnhandledError b         = a == b
  ExitOtherProcessNotRunning a == ExitOtherProcessNotRunning b = a == b
  ExitProcessCancelled a       == ExitProcessCancelled b       = a == b
  -- Different constructors, or a payload that cannot be compared.
  _ == _ = False
-- | Test whether an interrupt is a 'LinkedProcessCrashed'.
--
-- With @'Just' pid@ the crashed process must additionally be @pid@; with
-- 'Nothing' any 'LinkedProcessCrashed' matches. All other constructors
-- yield 'False'.
isProcessDownInterrupt :: Maybe ProcessId -> Interrupt r -> Bool
isProcessDownInterrupt mOtherProcess = \case
  LinkedProcessCrashed crashed -> case mOtherProcess of
    Nothing       -> True
    Just expected -> expected == crashed
  _ -> False
-- | Shorthand for a recoverable 'Interrupt'.
type RecoverableInterrupt = Interrupt 'Recoverable

-- | The effect list of a process that can also throw 'Interrupts':
-- 'Interrupts' on top of 'SafeProcesses'.
type Processes e = Interrupts ': SafeProcesses e

-- | Constraint: @e@ contains the 'Process' effect over @inner@ and the
-- 'Interrupts' effect.
type HasProcesses e inner = (HasSafeProcesses e inner, Member Interrupts e)

-- | The effect list consisting of the 'Process' effect on top of @r@.
type SafeProcesses r = Process r ': r

-- | Constraint: @e@ contains the 'Process' effect over @inner@.
type HasSafeProcesses e inner = (SetMember Process (Process inner) e)

-- | The exception effect that carries recoverable 'Interrupt's.
type Interrupts = Exc (Interrupt 'Recoverable)
-- | Handle the 'Interrupts' effect of an action: a result is returned as
-- is, an uncaught interrupt is converted with 'interruptToExit' and sent as
-- a 'Shutdown' request.
provideInterruptsShutdown
  :: forall e a . Eff (Processes e) a -> Eff (SafeProcesses e) a
provideInterruptsShutdown action = provideInterrupts action >>= either shutdown return
 where
  shutdown reason = send (Shutdown @e (interruptToExit reason))
-- | Run an action and, if it throws a recoverable 'Interrupt', continue
-- with the given handler. This is 'catchError' with the handler first.
handleInterrupts
  :: (HasCallStack, Member Interrupts r)
  => (Interrupt 'Recoverable -> Eff r a)
  -> Eff r a
  -> Eff r a
handleInterrupts handler action = catchError action handler
-- | Run an action and reify its outcome: 'Right' for a result, 'Left' for
-- a recoverable 'Interrupt' thrown by the action.
tryUninterrupted
  :: (HasCallStack, Member Interrupts r)
  => Eff r a
  -> Eff r (Either (Interrupt 'Recoverable) a)
tryUninterrupted action =
  handleInterrupts (\reason -> pure (Left reason)) (fmap Right action)
-- | Run an action; an 'Interrupt' thrown by it is passed to
-- 'logProcessExit' instead of being propagated.
logInterrupts
  :: forall r
   . (Member Logs r, HasCallStack, Member Interrupts r)
  => Eff r ()
  -> Eff r ()
logInterrupts action = handleInterrupts logProcessExit action
-- | Run an action; an 'Interrupt' thrown by it is converted with
-- 'interruptToExit' and passed to 'exitBecause'.
exitOnInterrupt
  :: (HasCallStack, HasProcesses r q)
  => Eff r a
  -> Eff r a
exitOnInterrupt = handleInterrupts (\reason -> exitBecause (interruptToExit reason))
-- | Handle the 'Interrupts' effect with 'runError': the result is 'Left'
-- the interrupt if one was thrown, otherwise 'Right' the value.
provideInterrupts
  :: HasCallStack
  => Eff (Interrupts ': r) a
  -> Eff r (Either (Interrupt 'Recoverable) a)
provideInterrupts action = runError action
-- | Collapse an 'Either' of a recoverable interrupt and an exit reason into
-- an exit reason, converting the 'Left' case with 'interruptToExit'.
mergeEitherInterruptAndExitReason
  :: Either (Interrupt 'Recoverable) (Interrupt 'NoRecovery)
  -> Interrupt 'NoRecovery
mergeEitherInterruptAndExitReason = \case
  Left recoverable -> interruptToExit recoverable
  Right exitReason -> exitReason
-- | Throw a recoverable 'Interrupt' in the 'Interrupts' effect.
interrupt
  :: (HasCallStack, Member Interrupts r)
  => Interrupt 'Recoverable
  -> Eff r a
interrupt reason = throwError reason
-- | 'False' for 'NormalExitRequested' and 'ExitNormally', 'True' for every
-- other constructor.
--
-- NOTE(review): 'NormalExitRequestedWith' and 'ExitNormallyWith' count as
-- crashes here although their 'Show' output describes a normal exit -
-- confirm that this is intended.
isCrash :: Interrupt x -> Bool
isCrash = \case
  NormalExitRequested -> False
  ExitNormally        -> False
  _                   -> True
-- | 'True' iff 'toExitRecovery' classifies the interrupt as 'Recoverable'.
isRecoverable :: Interrupt x -> Bool
isRecoverable reason = case toExitRecovery reason of
  Recoverable -> True
  NoRecovery  -> False
-- | An 'Interrupt' with its recoverability index hidden.
data SomeExitReason where
  SomeExitReason :: Interrupt x -> SomeExitReason

-- All instances below delegate to the instance for the 'Either' returned
-- by 'fromSomeExitReason'.

instance Ord SomeExitReason where
  compare lhs rhs = compare (fromSomeExitReason lhs) (fromSomeExitReason rhs)

instance Eq SomeExitReason where
  lhs == rhs = fromSomeExitReason lhs == fromSomeExitReason rhs

instance Show SomeExitReason where
  show reason = show (fromSomeExitReason reason)

instance NFData SomeExitReason where
  rnf reason = rnf (fromSomeExitReason reason)
-- | Recover the recoverability index hidden by 'SomeExitReason':
-- non-recoverable reasons are returned as 'Left', recoverable ones as
-- 'Right'.
fromSomeExitReason
  :: SomeExitReason
  -> Either (Interrupt 'NoRecovery) (Interrupt 'Recoverable)
fromSomeExitReason (SomeExitReason e) =
  -- Matching on the constructor reveals the index of @e@, which allows
  -- putting @e@ itself into the proper side of the 'Either'.
  case e of
    NormalExitRequested          -> Right e
    NormalExitRequestedWith _    -> Right e
    OtherProcessNotRunning _     -> Right e
    TimeoutInterrupt _           -> Right e
    LinkedProcessCrashed _       -> Right e
    InterruptedBy _              -> Right e
    ErrorInterrupt _             -> Right e
    ExitNormally                 -> Left e
    ExitNormallyWith _           -> Left e
    ExitUnhandledError _         -> Left e
    ExitProcessCancelled _       -> Left e
    ExitOtherProcessNotRunning _ -> Left e
-- | The 'show'n interrupt as 'T.Text' if 'isCrash' holds for it, otherwise
-- 'Nothing'.
toCrashReason :: Interrupt x -> Maybe T.Text
toCrashReason e =
  if isCrash e
    then Just (T.pack (show e))
    else Nothing
-- | Log an 'Interrupt': reasons for which 'toCrashReason' returns a text
-- are logged with 'logWarning', all others with 'logDebug'. The call stack
-- is frozen for the logging call.
logProcessExit
  :: forall e x . (Member Logs e, HasCallStack) => Interrupt x -> Eff e ()
logProcessExit ex = case toCrashReason ex of
  Just crashText -> withFrozenCallStack (logWarning crashText)
  Nothing        -> withFrozenCallStack (logDebug (fromString (show ex)))
-- | Send a 'Process' request and return the answer as an 'Either':
-- 'Right' the value (evaluated to weak head normal form) or 'Left' the
-- recoverable interrupt.
executeAndResume
  :: forall q r v
   . (HasSafeProcesses r q, HasCallStack)
  => Process q (ResumeProcess v)
  -> Eff r (Either (Interrupt 'Recoverable) v)
executeAndResume processAction = send processAction >>= \case
  ResumeWith !value  -> return (Right value)
  Interrupted reason -> return (Left reason)
-- | Send a 'Process' request and return the value it is answered with
-- (evaluated to weak head normal form). If the answer is an interrupt, it
-- is converted with 'interruptToExit' and sent as a 'Shutdown' request.
executeAndResumeOrExit
  :: forall r q v
   . (HasSafeProcesses r q, HasCallStack)
  => Process q (ResumeProcess v)
  -> Eff r v
executeAndResumeOrExit processAction = send processAction >>= \case
  ResumeWith !value  -> return value
  Interrupted reason -> send (Shutdown @q (interruptToExit reason))
-- | Send a 'Process' request and return the value it is answered with
-- (evaluated to weak head normal form). If the answer is an interrupt, it
-- is re-thrown with 'interrupt'.
executeAndResumeOrThrow
  :: forall q r v
   . (HasProcesses r q, HasCallStack)
  => Process q (ResumeProcess v)
  -> Eff r v
executeAndResumeOrThrow processAction = send processAction >>= \case
  ResumeWith !value  -> return value
  Interrupted reason -> interrupt reason
-- | Issue a 'YieldProcess' request via 'executeAndResumeOrThrow'.
yieldProcess :: forall r q . (HasProcesses r q, HasCallStack) => Eff r ()
yieldProcess = executeAndResumeOrThrow YieldProcess
-- | Issue a 'Delay' request with the given 'Timeout' via
-- 'executeAndResumeOrThrow'.
delay :: forall r q . (HasProcesses r q, HasCallStack) => Timeout -> Eff r ()
delay timeout = executeAndResumeOrThrow (Delay timeout)
-- | Send a message to a process.
--
-- The process id is deeply evaluated and the message is converted with
-- 'toStrictDynamic' (which deeply evaluates it) before the 'SendMessage'
-- request is issued. An interrupt received as answer is re-thrown, see
-- 'executeAndResumeOrThrow'.
--
-- The converted message is bound once and used both for forcing and as
-- payload; previously @toStrictDynamic message@ was written out twice, so
-- the conversion (including the deep evaluation) was done twice.
sendMessage
  :: forall o r q
   . ( HasProcesses r q
     , HasCallStack
     , Typeable o
     , NFData o
     )
  => ProcessId
  -> o
  -> Eff r ()
sendMessage pid message =
  let payload = toStrictDynamic message
  in  rnf pid `seq` payload `seq` executeAndResumeOrThrow (SendMessage pid payload)
-- | Send an already wrapped 'StrictDynamic' to a process, without any
-- further evaluation or wrapping.
sendAnyMessage
  :: forall r q . (HasCallStack, HasProcesses r q) => ProcessId -> StrictDynamic -> Eff r ()
sendAnyMessage pid message = executeAndResumeOrThrow request
 where
  request = SendMessage pid message
-- | Send a non-recoverable exit reason to a process. Both arguments are
-- deeply evaluated before the 'SendShutdown' request is issued.
sendShutdown
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessId
  -> Interrupt 'NoRecovery
  -> Eff r ()
sendShutdown pid s =
  rnf pid `seq` rnf s `seq` executeAndResumeOrThrow (SendShutdown pid s)
-- | Send a recoverable interrupt to a process. Both arguments are deeply
-- evaluated before the 'SendInterrupt' request is issued.
sendInterrupt
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessId
  -> Interrupt 'Recoverable
  -> Eff r ()
sendInterrupt pid s =
  rnf pid `seq` rnf s `seq` executeAndResumeOrThrow (SendInterrupt pid s)
-- | Issue a 'Spawn' request for a child action that may throw
-- 'Interrupts'; the child is wrapped in 'provideInterruptsShutdown'.
-- Returns the 'ProcessId' the request is answered with.
spawn
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessTitle
  -> Eff (Processes q) ()
  -> Eff r ProcessId
spawn t child = executeAndResumeOrThrow (Spawn @q t guardedChild)
 where
  guardedChild = provideInterruptsShutdown child
-- | Like 'spawn', but discards the 'ProcessId'.
spawn_
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessTitle
  -> Eff (Processes q) ()
  -> Eff r ()
spawn_ t child = () <$ spawn t child
-- | Like 'spawn', but issues a 'SpawnLink' request instead of 'Spawn'.
spawnLink
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessTitle
  -> Eff (Processes q) ()
  -> Eff r ProcessId
spawnLink t child = executeAndResumeOrThrow (SpawnLink @q t guardedChild)
 where
  guardedChild = provideInterruptsShutdown child
-- | Issue a 'Spawn' request for a child action that does not use the
-- 'Interrupts' effect; the child is passed on unchanged.
spawnRaw
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessTitle
  -> Eff (SafeProcesses q) ()
  -> Eff r ProcessId
spawnRaw t child = executeAndResumeOrThrow request
 where
  request = Spawn @q t child
-- | Like 'spawnRaw', but discards the 'ProcessId'.
spawnRaw_
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessTitle
  -> Eff (SafeProcesses q) ()
  -> Eff r ()
spawnRaw_ t child = void (spawnRaw t child)
-- | 'True' iff a 'GetProcessState' request for the process is answered
-- with a 'Just'.
isProcessAlive
  :: forall r q . (HasCallStack, HasProcesses r q) => ProcessId -> Eff r Bool
isProcessAlive pid = fmap isJust (executeAndResumeOrThrow (GetProcessState pid))
-- | Issue a 'GetProcessState' request: title, details and state of the
-- process, or 'Nothing'.
getProcessState
  :: forall r q
   . (HasCallStack, HasProcesses r q)
  => ProcessId
  -> Eff r (Maybe (ProcessTitle, ProcessDetails, ProcessState))
getProcessState pid = executeAndResumeOrThrow request
 where
  request = GetProcessState pid
-- | Issue an 'UpdateProcessDetails' request with the given details.
updateProcessDetails
  :: forall r q . (HasCallStack, HasProcesses r q) => ProcessDetails -> Eff r ()
updateProcessDetails pd = executeAndResumeOrThrow request
 where
  request = UpdateProcessDetails pd
-- | Receive the next message, whatever its type, using 'selectAnyMessage'.
receiveAnyMessage
  :: forall r q . (HasCallStack, HasProcesses r q) => Eff r StrictDynamic
receiveAnyMessage = executeAndResumeOrThrow request
 where
  request = ReceiveSelectedMessage selectAnyMessage
-- | Issue a 'ReceiveSelectedMessage' request with the given selector and
-- return the selected value.
receiveSelectedMessage
  :: forall r q a
   . (HasCallStack, Show a, HasProcesses r q)
  => MessageSelector a
  -> Eff r a
receiveSelectedMessage f = executeAndResumeOrThrow request
 where
  request = ReceiveSelectedMessage f
-- | Receive a message of type @a@; the selector accepts exactly the
-- messages that 'fromStrictDynamic' can convert to @a@.
receiveMessage
  :: forall a r q
   . (HasCallStack, Typeable a, NFData a, Show a, HasProcesses r q)
  => Eff r a
receiveMessage = receiveSelectedMessage typedSelector
 where
  typedSelector :: MessageSelector a
  typedSelector = MessageSelector fromStrictDynamic
-- | Issue a 'FlushMessages' request and return the list of messages it is
-- answered with.
flushMessages
  :: forall r q . (HasCallStack, HasProcesses r q) => Eff r [StrictDynamic]
flushMessages = executeAndResumeOrThrow @q request
 where
  request = FlushMessages
-- | Receive selected messages in a loop.
--
-- Each iteration issues a 'ReceiveSelectedMessage' request and passes the
-- answer to the handler: 'Right' a selected message or 'Left' a recoverable
-- interrupt. The loop continues while the handler returns 'Nothing' and
-- ends with @x@ as soon as the handler returns @'Just' x@.
receiveSelectedLoop
  :: forall r q a endOfLoopResult
   . (HasSafeProcesses r q, HasCallStack)
  => MessageSelector a
  -> (Either (Interrupt 'Recoverable) a -> Eff r (Maybe endOfLoopResult))
  -> Eff r endOfLoopResult
receiveSelectedLoop selector handlers = do
  answer  <- send (ReceiveSelectedMessage @q @a selector)
  outcome <- case answer of
    Interrupted reason -> handlers (Left reason)
    ResumeWith message -> handlers (Right message)
  case outcome of
    Nothing       -> receiveSelectedLoop selector handlers
    Just finished -> return finished
-- | 'receiveSelectedLoop' with 'selectAnyMessage': the handler sees every
-- message as a 'StrictDynamic'.
receiveAnyLoop
  :: forall r q endOfLoopResult
   . (HasSafeProcesses r q, HasCallStack)
  => (Either (Interrupt 'Recoverable) StrictDynamic -> Eff r (Maybe endOfLoopResult))
  -> Eff r endOfLoopResult
receiveAnyLoop handlers = receiveSelectedLoop selectAnyMessage handlers
-- | 'receiveSelectedLoop' with 'selectMessage': the handler sees the
-- messages of type @a@.
receiveLoop
  :: forall r q a endOfLoopResult
   . (HasSafeProcesses r q, HasCallStack, NFData a, Typeable a)
  => (Either (Interrupt 'Recoverable) a -> Eff r (Maybe endOfLoopResult))
  -> Eff r endOfLoopResult
receiveLoop handlers = receiveSelectedLoop selectMessage handlers
-- | The id of the current process, obtained with a 'SelfPid' request. An
-- interrupt received instead shuts the process down, see
-- 'executeAndResumeOrExit'.
self
  :: (HasCallStack, HasSafeProcesses r q)
  => Eff r ProcessId
self = executeAndResumeOrExit SelfPid
-- | Issue a 'MakeReference' request and return the 'Int' it is answered
-- with.
makeReference :: (HasCallStack, HasProcesses r q) => Eff r Int
makeReference = executeAndResumeOrThrow MakeReference
-- | Identifies a monitor: an index together with the monitored process.
data MonitorReference = MonitorReference
  { monitorIndex     :: Int
  , monitoredProcess :: ProcessId
  }
  deriving (Read, Eq, Ord, Generic, Typeable)

-- Uses the generic default implementation.
instance NFData MonitorReference

-- | Renders as @monitor: \<index\> \<process id\>@, parenthesised at
-- precedence @>= 10@.
instance Show MonitorReference where
  showsPrec d m = showParen (d >= 10) rendered
   where
    rendered =
      showString "monitor: "
        . shows (monitorIndex m)
        . showChar ' '
        . shows (monitoredProcess m)
-- | Issue a 'Monitor' request for the (deeply evaluated) process id and
-- return the resulting 'MonitorReference'.
monitor
  :: forall r q . (HasCallStack, HasProcesses r q) => ProcessId -> Eff r MonitorReference
monitor pid = executeAndResumeOrThrow (Monitor (force pid))
-- | Issue a 'Demonitor' request for the (deeply evaluated) reference.
demonitor
  :: forall r q . (HasCallStack, HasProcesses r q) => MonitorReference -> Eff r ()
demonitor ref = executeAndResumeOrThrow (Demonitor (force ref))
-- | Monitor a process while running an action and remove the monitor
-- afterwards.
--
-- The action receives the 'MonitorReference' created by 'monitor'.
-- 'demonitor' is executed when the action returns /and/ when it throws a
-- recoverable 'Interrupt'; in the latter case the interrupt is re-thrown
-- after the monitor was removed. Previously an interrupt thrown by the
-- action skipped 'demonitor', leaving the monitor in place.
--
-- If 'demonitor' itself throws an interrupt, that interrupt propagates.
withMonitor
  :: (HasCallStack, HasProcesses r q)
  => ProcessId
  -> (MonitorReference -> Eff r a)
  -> Eff r a
withMonitor pid e = do
  ref     <- monitor pid
  -- Capture an interrupt instead of letting it bypass the cleanup.
  outcome <- tryUninterrupted (e ref)
  demonitor ref
  either interrupt return outcome
-- | Monitor a process (via 'withMonitor') and receive either a message
-- accepted by the given selector ('Right') or the 'ProcessDown' message
-- belonging to that monitor ('Left'). For each incoming message the
-- 'ProcessDown' selector is tried first.
receiveWithMonitor
  :: (HasCallStack, HasProcesses r q, Typeable a, Show a)
  => ProcessId
  -> MessageSelector a
  -> Eff r (Either ProcessDown a)
receiveWithMonitor pid sel = withMonitor pid $ \ref ->
  let downSelector    = fmap Left (selectProcessDown ref)
      messageSelector = fmap Right sel
  in  receiveSelectedMessage (downSelector <|> messageSelector)
-- | A message describing that a monitored process is down: the monitor
-- reference, the exit reason and the process id. All fields are strict.
data ProcessDown = ProcessDown
  { downReference :: !MonitorReference
  , downReason    :: !(Interrupt 'NoRecovery)
  , downProcess   :: !ProcessId
  }
  deriving (Typeable, Generic, Eq, Ord)

-- | Turn a 'ProcessDown' into an 'OtherProcessNotRunning' interrupt for the
-- process stored in the monitor reference.
becauseProcessIsDown :: ProcessDown -> Interrupt 'Recoverable
becauseProcessIsDown down =
  OtherProcessNotRunning (monitoredProcess (downReference down))

-- Uses the generic default implementation.
instance NFData ProcessDown

-- | Renders as @down: \<process id\> \<reference\> \<reason\>@,
-- parenthesised at precedence @>= 10@.
instance Show ProcessDown where
  showsPrec d down = showParen (d >= 10) (render down)
   where
    render (ProcessDown ref reason pid) =
      showString "down: "
        . shows pid
        . showChar ' '
        . shows ref
        . showChar ' '
        . showsPrec 11 reason
-- | Select 'ProcessDown' messages whose 'downReference' equals the given
-- reference.
selectProcessDown :: MonitorReference -> MessageSelector ProcessDown
selectProcessDown ref0 = filterMessage matchesReference
 where
  matchesReference down = ref0 == downReference down
-- | Select 'ProcessDown' messages whose 'downProcess' equals the given
-- process id.
selectProcessDownByProcessId :: ProcessId -> MessageSelector ProcessDown
selectProcessDownByProcessId pid0 = filterMessage matchesProcess
 where
  matchesProcess down = pid0 == downProcess down
-- | Issue a 'Link' request for the (deeply evaluated) process id.
linkProcess
  :: forall r q . (HasCallStack, HasProcesses r q) => ProcessId -> Eff r ()
linkProcess pid = executeAndResumeOrThrow (Link (force pid))
-- | Issue an 'Unlink' request for the (deeply evaluated) process id.
unlinkProcess
  :: forall r q . (HasCallStack, HasProcesses r q) => ProcessId -> Eff r ()
unlinkProcess pid = executeAndResumeOrThrow (Unlink (force pid))
-- | Send a 'Shutdown' request with the given (deeply evaluated) exit
-- reason. The result type is fully polymorphic.
exitBecause
  :: forall r q a
   . (HasCallStack, HasSafeProcesses r q)
  => Interrupt 'NoRecovery
  -> Eff r a
exitBecause reason = send (Shutdown @q (force reason))
-- | 'exitBecause' with the reason 'ExitNormally'.
exitNormally
  :: forall r q a
   . (HasCallStack, HasSafeProcesses r q)
  => Eff r a
exitNormally = exitBecause ExitNormally
-- | Exit with an error message: the message is wrapped in an
-- 'ErrorInterrupt', converted with 'interruptToExit' and passed to
-- 'exitBecause'.
exitWithError
  :: forall r q a
   . (HasCallStack, HasSafeProcesses r q)
  => String
  -> Eff r a
exitWithError message = exitBecause (interruptToExit (ErrorInterrupt message))
-- | An 'Int' wrapper identifying a process.
newtype ProcessId = ProcessId
  { _fromProcessId :: Int
  }
  deriving (Eq, Ord, Typeable, Bounded, Num, Enum, Integral, Real, NFData)

-- | Parses the format produced by the 'Show' instance: a @!@ directly
-- followed by something 'reads' parses unambiguously as an 'Int'.
instance Read ProcessId where
  readsPrec _ input = case input of
    '!' : rest1 | [(c, rest2)] <- reads rest1 -> [(ProcessId c, rest2)]
    _ -> []

-- | Renders as @!@ followed by the number.
instance Show ProcessId where
  showsPrec _ (ProcessId !c) = showChar '!' . shows c

makeLenses ''ProcessId
-- | Serialize a message with the function stored in a 'Receiver' and send
-- the result to the receiver's process.
--
-- The input message is deeply evaluated first. The serialized value is
-- handed to 'sendMessage' as is; 'sendMessage' takes care of deeply
-- evaluating and wrapping it into a 'StrictDynamic'.
--
-- Previously the serialized value was wrapped with 'toStrictDynamic' before
-- calling 'sendMessage', which wraps its argument again. The receiving
-- process therefore got a 'StrictDynamic' nested inside a 'StrictDynamic',
-- which a selector for the serialized type (e.g. 'selectMessage') does not
-- accept.
sendToReceiver :: (NFData o, HasProcesses r q) => Receiver o -> o -> Eff r ()
sendToReceiver (Receiver pid serializer) message =
  rnf message `seq` sendMessage pid (serializer message)
-- | A process id together with a function that converts values of type @a@
-- into some message type (hidden existentially) that can be sent to that
-- process, see 'sendToReceiver'.
data Receiver a =
  forall out . (NFData out, Typeable out, Show out) =>
    Receiver
      { _receiverPid        :: ProcessId
      , _receiverSerializer :: a -> out
      }
  deriving (Typeable)

-- | Evaluates the conversion function to weak head normal form and the
-- process id to normal form.
instance NFData (Receiver o) where
  rnf (Receiver pid serializer) = seq serializer (rnf pid)

-- | Receivers are equal iff their process ids are equal; the conversion
-- function is ignored.
instance Eq (Receiver o) where
  lhs == rhs = _receiverPid lhs == _receiverPid rhs

-- | Receivers are ordered by their process ids only.
instance Ord (Receiver o) where
  compare lhs rhs = compare (_receiverPid lhs) (_receiverPid rhs)

-- | Pre-compose a function in front of the conversion function.
instance Contravariant Receiver where
  contramap f (Receiver p s) = Receiver p (\input -> s (f input))

-- | Shows the type rep of @protocol@ immediately followed by the process
-- id, parenthesised at precedence @>= 10@.
instance Typeable protocol => Show (Receiver protocol) where
  showsPrec d (Receiver c _) = showParen (d>=10) (protocolName . showsPrec 10 c)
   where
    protocolName = showSTypeRep (SomeTypeRep (Type.Reflection.typeRep @protocol))

makeLenses ''Receiver