Safe Haskell | None |
---|---|
Language | Haskell2010 |
Erlang-style processes with message passing concurrency based on
(more) extensible-effects.
Synopsis
- newtype ProcessId = ProcessId {}
- data ProcessDown = ProcessDown {}
- data MonitorReference = MonitorReference {}
- data SomeExitReason where
- SomeExitReason :: ExitReason x -> SomeExitReason
- type InterruptableProcess e = Interrupts ': ConsProcess e
- type Interrupts = Exc InterruptReason
- type InterruptReason = ExitReason Recoverable
- data ExitReason (t :: ExitRecovery) where
- ProcessFinished :: ExitReason Recoverable
- ProcessNotRunning :: ProcessId -> ExitReason Recoverable
- LinkedProcessCrashed :: ProcessId -> ExitReason Recoverable
- ProcessError :: String -> ExitReason Recoverable
- ExitNormally :: ExitReason NoRecovery
- NotRecovered :: ExitReason Recoverable -> ExitReason NoRecovery
- UnexpectedException :: String -> String -> ExitReason NoRecovery
- Killed :: ExitReason NoRecovery
- data ExitSeverity
- = NormalExit
- | Crash
- data ExitRecovery
- data ProcessState
- type ConsProcess r = Process r ': r
- type HasScheduler q = ?_schedulerProxy :: SchedulerProxy q
- data SchedulerProxy :: [Type -> Type] -> Type where
- SchedulerProxy :: SchedulerProxy q
- SP :: SchedulerProxy q
- Scheduler :: SchedulerProxy q
- data MessageSelector a
- data ResumeProcess v where
- Interrupted :: InterruptReason -> ResumeProcess v
- ResumeWith :: a -> ResumeProcess a
- data Process (r :: [Type -> Type]) b where
- FlushMessages :: Process r (ResumeProcess [Dynamic])
- YieldProcess :: Process r (ResumeProcess ())
- SelfPid :: Process r (ResumeProcess ProcessId)
- Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
- SpawnLink :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId)
- GetProcessState :: ProcessId -> Process r (ResumeProcess (Maybe ProcessState))
- Shutdown :: ExitReason NoRecovery -> Process r a
- SendShutdown :: ProcessId -> ExitReason NoRecovery -> Process r (ResumeProcess ())
- SendInterrupt :: ProcessId -> InterruptReason -> Process r (ResumeProcess ())
- SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess ())
- ReceiveSelectedMessage :: forall r a. MessageSelector a -> Process r (ResumeProcess a)
- MakeReference :: Process r (ResumeProcess Int)
- Monitor :: ProcessId -> Process r (ResumeProcess MonitorReference)
- Demonitor :: MonitorReference -> Process r (ResumeProcess ())
- Link :: ProcessId -> Process r (ResumeProcess ())
- Unlink :: ProcessId -> Process r (ResumeProcess ())
- selectMessage :: (NFData t, Typeable t) => MessageSelector t
- selectMessageLazy :: Typeable t => MessageSelector t
- filterMessage :: (Typeable a, NFData a) => (a -> Bool) -> MessageSelector a
- filterMessageLazy :: Typeable a => (a -> Bool) -> MessageSelector a
- selectMessageWith :: (Typeable a, NFData b) => (a -> Maybe b) -> MessageSelector b
- selectMessageWithLazy :: Typeable a => (a -> Maybe b) -> MessageSelector b
- selectDynamicMessage :: NFData a => (Dynamic -> Maybe a) -> MessageSelector a
- selectDynamicMessageLazy :: (Dynamic -> Maybe a) -> MessageSelector a
- selectAnyMessageLazy :: MessageSelector Dynamic
- selectMessageProxy :: forall proxy t. (NFData t, Typeable t) => proxy t -> MessageSelector t
- selectMessageProxyLazy :: forall proxy t. Typeable t => proxy t -> MessageSelector t
- getSchedulerProxy :: HasScheduler q => SchedulerProxy q
- withSchedulerProxy :: SchedulerProxy q -> (HasScheduler q => a) -> a
- thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r)
- toExitRecovery :: ExitReason r -> ExitRecovery
- toExitSeverity :: ExitReason e -> ExitSeverity
- isBecauseDown :: Maybe ProcessId -> ExitReason r -> Bool
- provideInterruptsShutdown :: forall e a. Eff (InterruptableProcess e) a -> Eff (ConsProcess e) a
- handleInterrupts :: (HasCallStack, Member Interrupts r) => (InterruptReason -> Eff r a) -> Eff r a -> Eff r a
- tryUninterrupted :: (HasCallStack, Member Interrupts r) => Eff r a -> Eff r (Either InterruptReason a)
- logInterrupts :: (HasCallStack, '[Interrupts, Logs LogMessage] <:: r) => Eff r () -> Eff r ()
- exitOnInterrupt :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r a
- provideInterrupts :: HasCallStack => Eff (Interrupts ': r) a -> Eff r (Either InterruptReason a)
- mergeEitherInterruptAndExitReason :: Either InterruptReason (ExitReason NoRecovery) -> ExitReason NoRecovery
- interrupt :: (HasCallStack, Member Interrupts r) => InterruptReason -> Eff r a
- isCrash :: ExitReason x -> Bool
- isRecoverable :: ExitReason x -> Bool
- fromSomeExitReason :: SomeExitReason -> Either (ExitReason NoRecovery) InterruptReason
- toCrashReason :: ExitReason x -> Maybe String
- logProcessExit :: (HasCallStack, Member (Logs LogMessage) e) => ExitReason x -> Eff e ()
- executeAndResume :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r (Either (ExitReason Recoverable) v)
- executeAndResumeOrExit :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v
- executeAndResumeOrThrow :: forall q r v. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => Process q (ResumeProcess v) -> Eff r v
- yieldProcess :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Eff r ()
- sendMessage :: forall r q o. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r ()
- sendAnyMessage :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r ()
- sendShutdown :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> ExitReason NoRecovery -> Eff r ()
- sendInterrupt :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> InterruptReason -> Eff r ()
- spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r ProcessId
- spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r ()
- spawnLink :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r ProcessId
- spawnRaw :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (ConsProcess q) () -> Eff r ProcessId
- spawnRaw_ :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (ConsProcess q) () -> Eff r ()
- isProcessAlive :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r Bool
- receiveAnyMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r Dynamic
- receiveSelectedMessage :: forall r q a. (HasCallStack, Show a, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> MessageSelector a -> Eff r a
- receiveMessage :: forall a r q. (HasCallStack, Typeable a, Show a, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r a
- flushMessages :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r, HasScheduler q) => Eff r [Dynamic]
- receiveSelectedLoop :: forall r q a endOfLoopResult. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> MessageSelector a -> (Either InterruptReason a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult
- receiveAnyLoop :: forall r q endOfLoopResult. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either InterruptReason Dynamic -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult
- receiveLoop :: forall r q a endOfLoopResult. (SetMember Process (Process q) r, HasCallStack, Typeable a) => SchedulerProxy q -> (Either InterruptReason a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult
- self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId
- makeReference :: (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r Int
- monitor :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r MonitorReference
- demonitor :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> MonitorReference -> Eff r ()
- withMonitor :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> (MonitorReference -> Eff r a) -> Eff r a
- receiveWithMonitor :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r, Member Interrupts r, Typeable a, Show a) => SchedulerProxy q -> ProcessId -> MessageSelector a -> Eff r (Either ProcessDown a)
- becauseProcessIsDown :: ProcessDown -> InterruptReason
- selectProcessDown :: MonitorReference -> MessageSelector ProcessDown
- linkProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r ()
- unlinkProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r ()
- exitBecause :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ExitReason NoRecovery -> Eff r a
- exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
- exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a
- fromProcessId :: Iso' ProcessId Int
- data TimerElapsed
- data TimerReference
- data Timeout
- receiveAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable a, NFData a, Show a) => SchedulerProxy q -> Timeout -> Eff r (Maybe a)
- receiveSelectedAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Show a) => SchedulerProxy q -> MessageSelector a -> Timeout -> Eff r (Either TimerElapsed a)
- selectTimerElapsed :: TimerReference -> MessageSelector TimerElapsed
- sendAfter :: forall r q message. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable message, NFData message) => SchedulerProxy q -> ProcessId -> Timeout -> (TimerReference -> message) -> Eff r TimerReference
- startTimer :: forall r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Timeout -> Eff r TimerReference
- newtype Server api = Server {}
- data Synchronicity
- data family Api (api :: Type) (reply :: Synchronicity)
- fromServer :: forall api api. Iso (Server api) (Server api) ProcessId ProcessId
- proxyAsServer :: proxy api -> ProcessId -> Server api
- asServer :: forall api. ProcessId -> Server api
- type ServerReader o = Reader (Server o)
- type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (ServerReader o) r)
- cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r ()
- call :: forall result api r q. (SetMember Process (Process q) r, Member Interrupts r, Typeable api, Typeable (Api api (Synchronous result)), Typeable result, HasCallStack, NFData result, Show result) => SchedulerProxy q -> Server api -> Api api (Synchronous result) -> Eff r result
- registerServer :: HasCallStack => Server o -> Eff (ServerReader o ': r) a -> Eff r a
- whereIsServer :: Member (ServerReader o) e => Eff e (Server o)
- callRegistered :: (Typeable reply, ServesApi o r q, HasCallStack, NFData reply, Show reply, Member Interrupts r) => SchedulerProxy q -> Api o (Synchronous reply) -> Eff r reply
- castRegistered :: (Typeable o, ServesApi o r q, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Api o Asynchronous -> Eff r ()
- data ApiServerCmd where
- data ApiHandler api eff where
- ApiHandler :: {..} -> ApiHandler api eff
- apiHandler :: (Api api Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> (ExitReason Recoverable -> Eff e ()) -> ApiHandler api e
- apiHandlerForever :: (Api api Asynchronous -> Eff e ()) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> (ExitReason Recoverable -> Eff e ()) -> ApiHandler api e
- castHandler :: (Api api Asynchronous -> Eff eff ApiServerCmd) -> ApiHandler api eff
- castHandlerForever :: (Api api Asynchronous -> Eff eff ()) -> ApiHandler api eff
- callHandler :: (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e
- callHandlerForever :: (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e
- castAndCallHandler :: (Api api Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e
- castAndCallHandlerForever :: (Api api Asynchronous -> Eff e ()) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e
- data ServerCallback eff = ServerCallback {
- _requestHandlerSelector :: MessageSelector (Eff eff ApiServerCmd)
- _terminationHandler :: ExitReason Recoverable -> Eff eff ()
- callCallback :: forall api eff reply. Getter (ApiHandler api eff) (Maybe (Api api (Synchronous reply) -> (reply -> Eff eff ()) -> Eff eff ApiServerCmd))
- castCallback :: forall api eff. Lens' (ApiHandler api eff) (Maybe (Api api Asynchronous -> Eff eff ApiServerCmd))
- terminateCallback :: forall api eff. Lens' (ApiHandler api eff) (Maybe (ExitReason Recoverable -> Eff eff ()))
- requestHandlerSelector :: forall eff. Lens' (ServerCallback eff) (MessageSelector (Eff eff ApiServerCmd))
- terminationHandler :: forall eff. Lens' (ServerCallback eff) (ExitReason Recoverable -> Eff eff ())
- serve :: forall a effScheduler. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> Eff (ServerEff a) ()
- spawnServer :: forall a effScheduler eff. (Servable a, ServerEff a ~ InterruptableProcess effScheduler, SetMember Process (Process effScheduler) eff, Member Interrupts eff, HasCallStack) => SchedulerProxy effScheduler -> a -> Eff eff (ServerPids a)
- spawnServerWithEffects :: forall a effScheduler eff. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), SetMember Process (Process effScheduler) eff, Member Interrupts eff, Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> (Eff (ServerEff a) () -> Eff (InterruptableProcess effScheduler) ()) -> Eff eff (ServerPids a)
- unhandledCallError :: forall p x r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p (Synchronous x) -> (x -> Eff r ()) -> Eff r ApiServerCmd
- unhandledCastError :: forall p r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p Asynchronous -> Eff r ApiServerCmd
- defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r, Member (Logs LogMessage) r) => SchedulerProxy q -> ExitReason Recoverable -> Eff r ()
- data Observing o
- data InterruptCallback eff where
- InterruptCallback :: (InterruptReason -> Eff eff CallbackResult) -> InterruptCallback eff
- class ToServerPids (t :: k) where
- type ServerPids t
- toServerPids :: proxy t -> ProcessId -> ServerPids t
- data MessageCallback api eff where
- MessageCallback :: MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback api eff
- data CallbackResult where
- spawnApiServer :: forall api eff. (ToServerPids api, HasCallStack) => MessageCallback api (InterruptableProcess eff) -> InterruptCallback (ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api)
- spawnApiServerStateful :: forall api eff state. (HasCallStack, ToServerPids api) => Eff (InterruptableProcess eff) state -> MessageCallback api (State state ': InterruptableProcess eff) -> InterruptCallback (State state ': ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api)
- spawnApiServerEffectful :: forall api eff serverEff. (HasCallStack, ToServerPids api, Member Interrupts serverEff, SetMember Process (Process eff) serverEff) => (forall b. Eff serverEff b -> Eff (InterruptableProcess eff) b) -> MessageCallback api serverEff -> InterruptCallback serverEff -> Eff (InterruptableProcess eff) (ServerPids api)
- handleMessages :: forall eff a. (HasCallStack, NFData a, Typeable a) => (a -> Eff eff CallbackResult) -> MessageCallback '[] eff
- handleSelectedMessages :: forall eff a. HasCallStack => MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback '[] eff
- handleAnyMessages :: forall eff. HasCallStack => (Dynamic -> Eff eff CallbackResult) -> MessageCallback '[] eff
- handleCasts :: forall api eff. (HasCallStack, Typeable api, Typeable (Api api Asynchronous)) => (Api api Asynchronous -> Eff eff CallbackResult) -> MessageCallback api eff
- handleCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (forall secret reply. (Typeable reply, Typeable (Api api (Synchronous reply))) => Api api (Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff
- handleCastsAndCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, Typeable (Api api Asynchronous), SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (Api api Asynchronous -> Eff eff CallbackResult) -> (forall secret reply. (Typeable reply, Typeable (Api api (Synchronous reply))) => Api api (Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff
- handleProcessDowns :: forall eff. HasCallStack => (MonitorReference -> Eff eff CallbackResult) -> MessageCallback '[] eff
- (^:) :: forall (api1 :: Type) (apis2 :: [Type]) eff. HasCallStack => MessageCallback api1 eff -> MessageCallback apis2 eff -> MessageCallback (api1 ': apis2) eff
- fallbackHandler :: forall api eff. HasCallStack => MessageCallback api eff -> MessageCallback '[] eff
- dropUnhandledMessages :: forall eff. HasCallStack => MessageCallback '[] eff
- exitOnUnhandled :: forall eff. HasCallStack => MessageCallback '[] eff
- logUnhandledMessages :: forall eff. (Member (Logs LogMessage) eff, HasCallStack) => MessageCallback '[] eff
- stopServerOnInterrupt :: forall eff. HasCallStack => InterruptCallback eff
- handleObservations :: Typeable o => (Server o -> Observation o -> Eff e CallbackResult) -> MessageCallback (Observing o) e
- data CallbackObserver o
- type ObserverState o = State (Observers o)
- data Observers o
- data SomeObserver o where
- SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o
- class (Typeable o, Typeable (Observation o)) => Observable o where
- data Observation o
- registerObserverMessage :: SomeObserver o -> Api o Asynchronous
- forgetObserverMessage :: SomeObserver o -> Api o Asynchronous
- class (Typeable p, Observable o) => Observer p o where
- observationMessage :: Server o -> Observation o -> Api p Asynchronous
- notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r ()
- registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
- forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
- notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r ()
- manageObservers :: Eff (ObserverState o ': r) a -> Eff r a
- addObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o) => SomeObserver o -> Eff r ()
- removeObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o, Member Interrupts r) => SomeObserver o -> Eff r ()
- notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (ObserverState o) r, Member Interrupts r) => SchedulerProxy q -> Observation o -> Eff r ()
- spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member Interrupts r, HasCallStack) => SchedulerProxy q -> (Server o -> Observation o -> Eff (InterruptableProcess q) ApiServerCmd) -> Eff r (Server (CallbackObserver o))
- spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member (Logs LogMessage) r, Member Interrupts r, HasCallStack) => SchedulerProxy q -> Eff r (Server (CallbackObserver o))
- type ObservationQueueReader a = Reader (ObservationQueue a)
- data ObservationQueue a
- readObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Observation o)
- tryReadObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Maybe (Observation o))
- flushObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r [Observation o]
- enqueueObservationsRegistered :: forall o r q a. (ServesApi o r q, SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO q, HasLogging IO r, Member Interrupts r, Lifted IO r, HasCallStack) => SchedulerProxy q -> Int -> Eff (ObservationQueueReader o ': r) a -> Eff r a
- enqueueObservations :: forall o r q a. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO r, HasLogging IO q, Member Interrupts r, Lifted IO q, HasCallStack) => SchedulerProxy q -> Server o -> Int -> Eff (ObservationQueueReader o ': r) a -> Eff r a
- type SchedulerIO = Reader SchedulerState ': LoggingAndIO
- type HasSchedulerIO r = (HasCallStack, Lifted IO r, SchedulerIO <:: r)
- type InterruptableProcEff = InterruptableProcess SchedulerIO
- type ProcEff = ConsProcess SchedulerIO
- defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO ()
- defaultMainWithLogChannel :: HasCallStack => Eff InterruptableProcEff () -> LogChannel LogMessage -> IO ()
- forkIoScheduler :: SchedulerProxy SchedulerIO
- schedule :: (HasLogging IO SchedulerIO, HasCallStack) => Eff InterruptableProcEff () -> Eff LoggingAndIO ()
- schedulePure :: Eff (InterruptableProcess '[Logs LogMessage]) a -> Either (ExitReason NoRecovery) a
- singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
- defaultMainSingleThreaded :: HasCallStack => Eff (InterruptableProcess '[Logs LogMessage, LogWriterReader LogMessage IO, Lift IO]) () -> IO ()
- module Control.Eff.Log
- module Control.Eff.Loop
Concurrent Processes with Message Passing Concurrency
Each process is identified by a single process id, that stays constant throughout the life cycle of a process. Also, message sending relies on these values to address messages to processes.
Instances
Bounded ProcessId Source # | |
Enum ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process succ :: ProcessId -> ProcessId # pred :: ProcessId -> ProcessId # fromEnum :: ProcessId -> Int # enumFrom :: ProcessId -> [ProcessId] # enumFromThen :: ProcessId -> ProcessId -> [ProcessId] # enumFromTo :: ProcessId -> ProcessId -> [ProcessId] # enumFromThenTo :: ProcessId -> ProcessId -> ProcessId -> [ProcessId] # | |
Eq ProcessId Source # | |
Integral ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Num ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Ord ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process | |
Read ProcessId Source # | |
Real ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process toRational :: ProcessId -> Rational # | |
Show ProcessId Source # | |
NFData ProcessId Source # | |
Defined in Control.Eff.Concurrent.Process |
data ProcessDown Source #
A monitored process exited.
This message is sent to a process by the scheduler, when
a process that was monitored via a SchedulerCommand
died.
Since: 0.12.0
Instances
data MonitorReference Source #
A value that contains a unique reference of a process monitoring.
Since: 0.12.0
Instances
data SomeExitReason where Source #
An existential wrapper around ExitReason
SomeExitReason :: ExitReason x -> SomeExitReason |
Instances
Eq SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process (==) :: SomeExitReason -> SomeExitReason -> Bool # (/=) :: SomeExitReason -> SomeExitReason -> Bool # | |
Ord SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process compare :: SomeExitReason -> SomeExitReason -> Ordering # (<) :: SomeExitReason -> SomeExitReason -> Bool # (<=) :: SomeExitReason -> SomeExitReason -> Bool # (>) :: SomeExitReason -> SomeExitReason -> Bool # (>=) :: SomeExitReason -> SomeExitReason -> Bool # max :: SomeExitReason -> SomeExitReason -> SomeExitReason # min :: SomeExitReason -> SomeExitReason -> SomeExitReason # | |
Show SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process showsPrec :: Int -> SomeExitReason -> ShowS # show :: SomeExitReason -> String # showList :: [SomeExitReason] -> ShowS # | |
NFData SomeExitReason Source # | |
Defined in Control.Eff.Concurrent.Process rnf :: SomeExitReason -> () # |
type InterruptableProcess e = Interrupts ': ConsProcess e Source #
This adds a layer of the Interrupts
effect on top of ConsProcess
type Interrupts = Exc InterruptReason Source #
Exceptions containing InterruptReasons.
See handleInterrupts
, exitOnInterrupt
or provideInterrupts
type InterruptReason = ExitReason Recoverable Source #
ExitReasons which are recoverable are interrupts.
data ExitReason (t :: ExitRecovery) where Source #
A sum-type with reasons for why a process exits the scheduling loop; this includes errors that can occur when scheduling messages.
ProcessFinished :: ExitReason Recoverable | A process has finished a unit of work and might exit or work on
something else. This is primarily used for interrupting infinite
server loops, allowing for additional cleanup work before
exiting (e.g. with Since: 0.13.2 |
ProcessNotRunning :: ProcessId -> ExitReason Recoverable | A process that should be running was not running. |
LinkedProcessCrashed :: ProcessId -> ExitReason Recoverable | A linked process is down |
ProcessError :: String -> ExitReason Recoverable | An exit reason that has an error message but isn't |
ExitNormally :: ExitReason NoRecovery | A process function returned or exited without any error. |
NotRecovered :: ExitReason Recoverable -> ExitReason NoRecovery | An unhandled |
UnexpectedException :: String -> String -> ExitReason NoRecovery | An unexpected runtime exception was thrown, i.e. an exception
derived from |
Killed :: ExitReason NoRecovery | A process was cancelled (e.g. killed, in |
Instances
data ExitSeverity Source #
This value indicates whether a process exited in a way consistent with the planned behaviour or not.
Instances
data ExitRecovery Source #
This kind is used to indicate if an ExitReason
can be treated like
a short interrupt which can be handled or ignored.
Instances
data ProcessState Source #
The state that a Process
is currently in.
ProcessBooting | The process has just been started but not
called |
ProcessIdle | The process yielded its timeslice |
ProcessBusy | The process is busy with non-blocking |
ProcessBusySending | The process is busy with sending a message |
ProcessBusySendingShutdown | The process is busy with killing |
ProcessBusySendingInterrupt | The process is busy with killing |
ProcessBusyReceiving | The process blocked by a |
ProcessBusyLinking | The process blocked by a |
ProcessBusyUnlinking | The process blocked by a |
ProcessBusyMonitoring | The process blocked by a |
ProcessBusyDemonitoring | The process blocked by a |
ProcessInterrupted | The process was interrupted |
ProcessShuttingDown | The process was shutdown or crashed |
Instances
type ConsProcess r = Process r ': r Source #
Cons Process
onto a list of effects.
type HasScheduler q = ?_schedulerProxy :: SchedulerProxy q Source #
A constraint for the implicit SchedulerProxy
parameter.
Use getSchedulerProxy
to query it. _EXPERIMENTAL_
Since: 0.12.0
data SchedulerProxy :: [Type -> Type] -> Type where Source #
Every function for Process
things needs such a proxy value
for the low-level effect list, i.e. the effects identified by
r
in
Process r : r
; this might be dependent on the
scheduler implementation.
SchedulerProxy :: SchedulerProxy q | Tell the typechecker what effects we have below |
SP :: SchedulerProxy q | Like |
Scheduler :: SchedulerProxy q | Like |
data MessageSelector a Source #
A function that decides whether the next message will be received by
ReceiveSelectedMessage
. It conveniently is an instance of
Alternative
so the message selector can be combined:
>
> selectInt :: MessageSelector Int
> selectInt = selectMessage
>
> selectString :: MessageSelector String
> selectString = selectMessage
>
> selectIntOrString :: MessageSelector (Either Int String)
> selectIntOrString =
>   Left <$> selectInt <|> Right <$> selectString
Instances
data ResumeProcess v where Source #
Every Process
action returns its actual result wrapped in this type. It
will allow to signal errors as well as pass on normal results such as
incoming messages.
Interrupted :: InterruptReason -> ResumeProcess v | The current operation of the process was interrupted with a
|
ResumeWith :: a -> ResumeProcess a | The process may resume to do work, using the given result. |
Instances
data Process (r :: [Type -> Type]) b where Source #
The process effect is the basis for message passing concurrency. This effect describes an interface for concurrent, communicating isolated processes identified uniquely by a process-id.
Processes can raise exceptions that can be caught, exit gracefully or with an error, or be killed by other processes, with the option of ignoring the shutdown request.
Process Scheduling is implemented in different modules. All scheduler implementations should follow some basic rules:
- fair scheduling
- sending a message does not block
- receiving a message does block
- spawning a child blocks only for a very short moment
- a newly spawned process shall be scheduled before the parent process after
the spawnRaw
- when the first process exits, all processes should be killed immediately
FlushMessages :: Process r (ResumeProcess [Dynamic]) | Remove all messages from the process' message queue |
YieldProcess :: Process r (ResumeProcess ()) | In cooperative schedulers, this will give processing time to the scheduler. Every other operation implicitly serves the same purpose. Since: 0.12.0 |
SelfPid :: Process r (ResumeProcess ProcessId) | Return the current |
Spawn :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId) | Start a new process, the new process will execute an effect, the function
will return immediately with a |
SpawnLink :: Eff (Process r ': r) () -> Process r (ResumeProcess ProcessId) | Start a new process, and Since: 0.12.0 |
GetProcessState :: ProcessId -> Process r (ResumeProcess (Maybe ProcessState)) | Get the process state (or |
Shutdown :: ExitReason NoRecovery -> Process r a | Shutdown the process; regardless of the exit reason, this function never returns, |
SendShutdown :: ProcessId -> ExitReason NoRecovery -> Process r (ResumeProcess ()) | Raise an error, that can be handled. |
SendInterrupt :: ProcessId -> InterruptReason -> Process r (ResumeProcess ()) | Request that another process be interrupted. The targeted process is interrupted
and gets an |
SendMessage :: ProcessId -> Dynamic -> Process r (ResumeProcess ()) | Send a message to a process addressed by the |
ReceiveSelectedMessage :: forall r a. MessageSelector a -> Process r (ResumeProcess a) | Receive a message that matches a criterion.
This should block until a message was received. The message is returned
as a |
MakeReference :: Process r (ResumeProcess Int) | Generate a unique |
Monitor :: ProcessId -> Process r (ResumeProcess MonitorReference) | Monitor another process. When the monitored process exits a
Since: 0.12.0 |
Demonitor :: MonitorReference -> Process r (ResumeProcess ()) | Remove a monitor. Since: 0.12.0 |
Link :: ProcessId -> Process r (ResumeProcess ()) | Connect the calling process to another process, such that
if one of the processes crashes (i.e. Since: 0.12.0 |
Unlink :: ProcessId -> Process r (ResumeProcess ()) | Unlink the calling process from the other process. Since: 0.12.0 |
selectMessage :: (NFData t, Typeable t) => MessageSelector t Source #
Create a message selector for a value that can be obtained by fromDynamic
.
It will also force
the result.
Since: 0.9.1
selectMessageLazy :: Typeable t => MessageSelector t Source #
Create a message selector for a value that can be obtained by fromDynamic
.
It will not force
the result.
Since: 0.9.1
filterMessage :: (Typeable a, NFData a) => (a -> Bool) -> MessageSelector a Source #
Create a message selector from a predicate. It will force
the result.
Since: 0.9.1
filterMessageLazy :: Typeable a => (a -> Bool) -> MessageSelector a Source #
Create a message selector from a predicate. It will not force
the result.
Since: 0.9.1
selectMessageWith :: (Typeable a, NFData b) => (a -> Maybe b) -> MessageSelector b Source #
Select a message of type a
and apply the given function to it.
If the function returns Just,
the ReceiveSelectedMessage
function will
return the result (sans Maybe
). It will force
the result.
Since: 0.9.1
selectMessageWithLazy :: Typeable a => (a -> Maybe b) -> MessageSelector b Source #
Select a message of type a
and apply the given function to it.
If the function returns Just
The ReceiveSelectedMessage
function will
return the result (sans Maybe
). It will not force
the result.
Since: 0.9.1
selectDynamicMessage :: NFData a => (Dynamic -> Maybe a) -> MessageSelector a Source #
Create a message selector. It will force
the result.
Since: 0.9.1
selectDynamicMessageLazy :: (Dynamic -> Maybe a) -> MessageSelector a Source #
Create a message selector.
Since: 0.9.1
selectAnyMessageLazy :: MessageSelector Dynamic Source #
Create a message selector that will match every message. This is lazy
because the result is not force
ed.
Since: 0.9.1
selectMessageProxy :: forall proxy t. (NFData t, Typeable t) => proxy t -> MessageSelector t Source #
Create a message selector for a value that can be obtained by fromDynamic
with a proxy argument. It will also force
the result.
Since: 0.9.1
selectMessageProxyLazy :: forall proxy t. Typeable t => proxy t -> MessageSelector t Source #
Create a message selector for a value that can be obtained by fromDynamic
with a proxy argument. It will not force
the result.
Since: 0.9.1
getSchedulerProxy :: HasScheduler q => SchedulerProxy q Source #
Get access to the SchedulerProxy
for the current scheduler effects.
_EXPERIMENTAL_
Since: 0.12.0
withSchedulerProxy :: SchedulerProxy q -> (HasScheduler q => a) -> a Source #
Set the SchedulerProxy
to use, this satisfies HasScheduler
.
_EXPERIMENTAL_
Since: 0.12.0
thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r) Source #
Return a SchedulerProxy
for a Process
effect.
toExitRecovery :: ExitReason r -> ExitRecovery Source #
Get the ExitRecovery
of an ExitReason.
toExitSeverity :: ExitReason e -> ExitSeverity Source #
Get the ExitSeverity
of an ExitReason
.
isBecauseDown :: Maybe ProcessId -> ExitReason r -> Bool Source #
A predicate for linked process crashes.
provideInterruptsShutdown :: forall e a. Eff (InterruptableProcess e) a -> Eff (ConsProcess e) a Source #
Handle all InterruptReason
s of an InterruptableProcess
by
wrapping them up in NotRecovered
and then do a process Shutdown
.
handleInterrupts :: (HasCallStack, Member Interrupts r) => (InterruptReason -> Eff r a) -> Eff r a -> Eff r a Source #
Handle InterruptReason
s arising during process operations, e.g.
when a linked process crashes while we wait in a receiveSelectedMessage
via a call to interrupt
.
tryUninterrupted :: (HasCallStack, Member Interrupts r) => Eff r a -> Eff r (Either InterruptReason a) Source #
Like handleInterrupts
, but instead of passing the InterruptReason
to a handler function, Either
is returned.
Since: 0.13.2
logInterrupts :: (HasCallStack, '[Interrupts, Logs LogMessage] <:: r) => Eff r () -> Eff r () Source #
Handle interrupts by logging them with logProcessExit
and otherwise
ignoring them.
exitOnInterrupt :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r a Source #
Handle InterruptReason
s arising during process operations, e.g.
when a linked process crashes while we wait in a receiveSelectedMessage
via a call to interrupt
.
provideInterrupts :: HasCallStack => Eff (Interrupts ': r) a -> Eff r (Either InterruptReason a) Source #
Handle InterruptReason
s arising during process operations, e.g.
when a linked process crashes while we wait in a receiveSelectedMessage
via a call to interrupt
.
mergeEitherInterruptAndExitReason :: Either InterruptReason (ExitReason NoRecovery) -> ExitReason NoRecovery Source #
Wrap all (left) InterruptReason
s into NotRecovered
and
return the (right) NoRecovery
ExitReason
s as is.
interrupt :: (HasCallStack, Member Interrupts r) => InterruptReason -> Eff r a Source #
Throw an InterruptReason
, can be handled by handleInterrupts
or
exitOnInterrupt
or provideInterrupts
.
isCrash :: ExitReason x -> Bool Source #
A predicate for crashes. A crash happens when a process exits
with an ExitReason
other than ExitNormally
isRecoverable :: ExitReason x -> Bool Source #
A predicate for recoverable exit reasons. This predicate defines the
exit reasons which functions such as executeAndResume
return to the caller instead of exiting the process.
fromSomeExitReason :: SomeExitReason -> Either (ExitReason NoRecovery) InterruptReason Source #
Partition a SomeExitReason
back into either a NoRecovery
or a Recoverable
ExitReason
toCrashReason :: ExitReason x -> Maybe String Source #
Print an ExitReason
to Just
a formatted String
when isCrash
is True
.
This can be useful in combination with view patterns, e.g.:
logCrash :: ExitReason -> Eff e ()
logCrash (toCrashReason -> Just reason) = logError reason
logCrash _ = return ()
Though this can be improved to:
logCrash = traverse_ logError . toCrashReason
logProcessExit :: (HasCallStack, Member (Logs LogMessage) e) => ExitReason x -> Eff e () Source #
Log the ExitReason
executeAndResume :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r (Either (ExitReason Recoverable) v) Source #
Execute a Process action and return the result;
if the process is interrupted by an error or exception, or an explicit
shutdown from another process, or through a crash of a linked process, i.e.
whenever the exit reason satisfies isRecoverable
, return the exit reason.
executeAndResumeOrExit :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v Source #
Execute a Process
action and resume the process, exit
the process when an Interrupts
was raised. Use executeAndResume
to catch
interrupts.
executeAndResumeOrThrow :: forall q r v. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => Process q (ResumeProcess v) -> Eff r v Source #
Execute a Process
action and resume the process; when the process was
interrupted, re-throw the InterruptReason via the Interrupts
effect instead of exiting. Use executeAndResume
to catch
interrupts.
yieldProcess :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Eff r () Source #
Use executeAndResumeOrExit
to execute YieldProcess
. Refer to YieldProcess
for more information.
sendMessage :: forall r q o. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () Source #
Send a message to a process addressed by the ProcessId
.
See SendMessage
.
sendAnyMessage :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () Source #
Send a Dynamic
value to a process addressed by the ProcessId
.
See SendMessage
.
sendShutdown :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> ExitReason NoRecovery -> Eff r () Source #
Exit a process addressed by the ProcessId
. The process will exit,
it might do some cleanup, but is ultimately unrecoverable.
See SendShutdown
.
sendInterrupt :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> InterruptReason -> Eff r () Source #
Interrupts a process addressed by the ProcessId
. The process might exit,
or it may continue.
| Like sendInterrupt
, but also return True
iff the process to exit exists.
spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r ProcessId Source #
Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId
. If the new process is
interrupted, the process will Shutdown
with the InterruptReason
wrapped in NotRecovered
. For specific use cases it might be better to use
spawnRaw
.
spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r () Source #
Like spawn
but return ()
.
spawnLink :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r ProcessId Source #
Start a new process, and immediately link to it.
Since: 0.12.0
spawnRaw :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (ConsProcess q) () -> Eff r ProcessId Source #
Start a new process, the new process will execute an effect, the function
will return immediately with a ProcessId
. The spawned process has only the
raw ConsProcess
effects. For non-library code spawn
might be better
suited.
spawnRaw_ :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (ConsProcess q) () -> Eff r () Source #
Like spawnRaw
but return ()
.
isProcessAlive :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r Bool Source #
Return True
if the process is alive.
Since: 0.12.0
receiveAnyMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r Dynamic Source #
Block until a message was received.
See ReceiveSelectedMessage
for more documentation.
receiveSelectedMessage :: forall r q a. (HasCallStack, Show a, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> MessageSelector a -> Eff r a Source #
Block until a message was received, that is not Nothing
after applying
a callback to it.
See ReceiveSelectedMessage
for more documentation.
receiveMessage :: forall a r q. (HasCallStack, Typeable a, Show a, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r a Source #
Receive and cast the message to some Typeable
instance.
See ReceiveSelectedMessage
for more documentation.
This will wait for a message of the return type using receiveSelectedMessage
flushMessages :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r, HasScheduler q) => Eff r [Dynamic] Source #
Remove and return all messages currently enqueued in the process message queue.
Since: 0.12.0
receiveSelectedLoop :: forall r q a endOfLoopResult. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> MessageSelector a -> (Either InterruptReason a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult Source #
Enter a loop to receive messages and pass them to a callback, until the
function returns Just
a result.
Only the messages of the given type will be received.
If the process is interrupted by an exception or by a SendShutdown
from
another process, with an exit reason that satisfies isRecoverable
, then
the callback will be invoked with Left
InterruptReason
, otherwise the
process will be exited with the same reason using exitBecause
.
See also ReceiveSelectedMessage
for more documentation.
receiveAnyLoop :: forall r q endOfLoopResult. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either InterruptReason Dynamic -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult Source #
Like receiveSelectedLoop
but not selective.
See also selectAnyMessageLazy
, receiveSelectedLoop
.
receiveLoop :: forall r q a endOfLoopResult. (SetMember Process (Process q) r, HasCallStack, Typeable a) => SchedulerProxy q -> (Either InterruptReason a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult Source #
Like receiveSelectedLoop
but refined to casting to a specific Typeable
using selectMessageLazy
.
self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId Source #
Returns the ProcessId
of the current process.
makeReference :: (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r Int Source #
Generate a unique Int
for the current process.
monitor :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r MonitorReference Source #
Monitor another process. When the monitored process exits a
ProcessDown
is sent to the calling process.
The return value is a unique identifier for that monitor.
There can be multiple monitors on the same process,
and a message for each will be sent.
If the process is already dead, the ProcessDown
message
will be sent immediately, without exit reason.
Since: 0.12.0
demonitor :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> MonitorReference -> Eff r () Source #
Remove a monitor created with monitor
.
Since: 0.12.0
withMonitor :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> (MonitorReference -> Eff r a) -> Eff r a Source #
receiveWithMonitor :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r, Member Interrupts r, Typeable a, Show a) => SchedulerProxy q -> ProcessId -> MessageSelector a -> Eff r (Either ProcessDown a) Source #
A MessageSelector
for receiving either a monitor of the
given process or another message.
Since: 0.12.0
becauseProcessIsDown :: ProcessDown -> InterruptReason Source #
Trigger an Interrupt
for a ProcessDown
message.
The reason will be ProcessNotRunning
Since: 0.12.0
selectProcessDown :: MonitorReference -> MessageSelector ProcessDown Source #
A MessageSelector
for the ProcessDown
message of a specific
process.
Since: 0.12.0
linkProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r () Source #
Connect the calling process to another process, such that
if one of the processes crashes (i.e. isCrash
returns True
), the other
is shutdown with the ExitReason
LinkedProcessCrashed
.
Since: 0.12.0
unlinkProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r () Source #
Unlink the calling process from the other process.
Since: 0.12.0
exitBecause :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ExitReason NoRecovery -> Eff r a Source #
Exit the process with an ExitReason
.
exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a Source #
Exit the process.
exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a Source #
Exit the process with an error.
Timers and Timeouts
data TimerElapsed Source #
A value to be sent when a timer started with startTimer
has elapsed.
Since: 0.12.0
Instances
Eq TimerElapsed Source # | |
Defined in Control.Eff.Concurrent.Process.Timer (==) :: TimerElapsed -> TimerElapsed -> Bool # (/=) :: TimerElapsed -> TimerElapsed -> Bool # | |
Ord TimerElapsed Source # | |
Defined in Control.Eff.Concurrent.Process.Timer compare :: TimerElapsed -> TimerElapsed -> Ordering # (<) :: TimerElapsed -> TimerElapsed -> Bool # (<=) :: TimerElapsed -> TimerElapsed -> Bool # (>) :: TimerElapsed -> TimerElapsed -> Bool # (>=) :: TimerElapsed -> TimerElapsed -> Bool # max :: TimerElapsed -> TimerElapsed -> TimerElapsed # min :: TimerElapsed -> TimerElapsed -> TimerElapsed # | |
Show TimerElapsed Source # | |
Defined in Control.Eff.Concurrent.Process.Timer showsPrec :: Int -> TimerElapsed -> ShowS # show :: TimerElapsed -> String # showList :: [TimerElapsed] -> ShowS # | |
NFData TimerElapsed Source # | |
Defined in Control.Eff.Concurrent.Process.Timer rnf :: TimerElapsed -> () # |
data TimerReference Source #
The reference to a timer started by startTimer
, required to stop
a timer via cancelTimer
.
Since: 0.12.0
Instances
A number of micro seconds.
Since: 0.12.0
Instances
Enum Timeout Source # | |
Eq Timeout Source # | |
Integral Timeout Source # | |
Defined in Control.Eff.Concurrent.Process.Timer | |
Num Timeout Source # | |
Ord Timeout Source # | |
Defined in Control.Eff.Concurrent.Process.Timer | |
Real Timeout Source # | |
Defined in Control.Eff.Concurrent.Process.Timer toRational :: Timeout -> Rational # | |
Show Timeout Source # | |
NFData Timeout Source # | |
Defined in Control.Eff.Concurrent.Process.Timer |
receiveAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable a, NFData a, Show a) => SchedulerProxy q -> Timeout -> Eff r (Maybe a) Source #
Wait for a message of the given type for the given time. When no message
arrives in time, return Nothing
. This is based on
receiveSelectedAfter
.
Since: 0.12.0
receiveSelectedAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Show a) => SchedulerProxy q -> MessageSelector a -> Timeout -> Eff r (Either TimerElapsed a) Source #
Wait for a message of the given type for the given time. When no message
arrives in time, return Left
TimerElapsed
. This is based on
selectTimerElapsed
and startTimer
.
Since: 0.12.0
selectTimerElapsed :: TimerReference -> MessageSelector TimerElapsed Source #
A MessageSelector
matching TimerElapsed
messages created by
startTimer
.
Since: 0.12.0
sendAfter :: forall r q message. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable message, NFData message) => SchedulerProxy q -> ProcessId -> Timeout -> (TimerReference -> message) -> Eff r TimerReference Source #
Send a message to a given process after waiting. The message is created by
applying the function parameter to the TimerReference
, such that the
message can directly refer to the timer.
Since: 0.12.0
startTimer :: forall r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Timeout -> Eff r TimerReference Source #
Start a new timer, after the time has elapsed, TimerElapsed
is sent to
calling process. The message also contains the TimerReference
returned by
this function. Use cancelTimer
to cancel the timer. Use
selectTimerElapsed
to receive the message using receiveSelectedMessage
.
To receive messages guarded with a timeout see receiveAfter
.
Since: 0.12.0
Data Types and Functions for APIs (aka Protocols)
Instances
Eq (Server api) Source # | |
Ord (Server api) Source # | |
Defined in Control.Eff.Concurrent.Api | |
Typeable api => Show (Server api) Source # | |
data Synchronicity Source #
The (promoted) constructors of this type specify (at the type level) the
reply behavior of a specific constructor of an Api
instance.
Synchronous Type | Specify that handling a request is a blocking operation
with a specific return type, e.g. |
Asynchronous | Non-blocking, asynchronous, request handling |
data family Api (api :: Type) (reply :: Synchronicity) Source #
This data family defines an API, a communication interface description between at least two processes. The processes act as servers or client(s) regarding a specific instance of this type.
The first parameter is usually a user defined phantom type that identifies
the Api
instance.
The second parameter specifies if a specific constructor of a (GADT-like)
Api
instance is Synchronous
, i.e. returns a result and blocks the caller
or if it is Asynchronous
Example:
data BookShop deriving Typeable

data instance Api BookShop r where
  RentBook  :: BookId   -> Api BookShop ('Synchronous (Either RentalError RentalId))
  BringBack :: RentalId -> Api BookShop 'Asynchronous

type BookId = Int
type RentalId = Int
type RentalError = String
Instances
Show (Observation o) => Show (Api (CallbackObserver o) r) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
data Api (CallbackObserver o) r Source # | |
Defined in Control.Eff.Concurrent.Api.Observer data Api (CallbackObserver o) r where
| |
data Api (Observing o) Asynchronous Source # | An Since: 0.14.1 |
Defined in Control.Eff.Concurrent.Api.Server2 data Api (Observing o) Asynchronous where
|
proxyAsServer :: proxy api -> ProcessId -> Server api Source #
Client Functions for Consuming APIs
type ServerReader o = Reader (Server o) Source #
The reader effect for ProcessId
s for Api
s, see registerServer
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (ServerReader o) r) Source #
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable o, Typeable (Api o Asynchronous)) => SchedulerProxy q -> Server o -> Api o Asynchronous -> Eff r () Source #
Send an Api
request that has no return value and return as fast as
possible. The type signature enforces that the corresponding Api
clause is
Asynchronous
. The operation never fails, if it is important to know if the
message was delivered, use call
instead.
call :: forall result api r q. (SetMember Process (Process q) r, Member Interrupts r, Typeable api, Typeable (Api api (Synchronous result)), Typeable result, HasCallStack, NFData result, Show result) => SchedulerProxy q -> Server api -> Api api (Synchronous result) -> Eff r result Source #
Send an Api
request and wait for the server to return a result value.
The type signature enforces that the corresponding Api
clause is
Synchronous
.
registerServer :: HasCallStack => Server o -> Eff (ServerReader o ': r) a -> Eff r a Source #
Run a reader effect that contains the one server handling a specific
Api
instance.
whereIsServer :: Member (ServerReader o) e => Eff e (Server o) Source #
Get the Server
registered with registerServer
.
callRegistered :: (Typeable reply, ServesApi o r q, HasCallStack, NFData reply, Show reply, Member Interrupts r) => SchedulerProxy q -> Api o (Synchronous reply) -> Eff r reply Source #
Like call
but take the Server
from the reader provided by
registerServer
.
castRegistered :: (Typeable o, ServesApi o r q, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Api o Asynchronous -> Eff r () Source #
Like cast
but take the Server
from the reader provided by
registerServer
.
Server Functions for Providing APIs
data ApiServerCmd where Source #
A command to the server loop started e.g. by server
or spawnServerWithEffects
.
Typically returned by an ApiHandler
member to indicate if the server
should continue or stop.
HandleNextRequest :: ApiServerCmd | Tell the server to keep the server loop running |
StopApiServer :: ExitReason Recoverable -> ApiServerCmd | Tell the server to exit, this will make |
Instances
Show ApiServerCmd Source # | |
Defined in Control.Eff.Concurrent.Api.Server showsPrec :: Int -> ApiServerCmd -> ShowS # show :: ApiServerCmd -> String # showList :: [ApiServerCmd] -> ShowS # | |
Generic ApiServerCmd Source # | |
Defined in Control.Eff.Concurrent.Api.Server type Rep ApiServerCmd :: Type -> Type # from :: ApiServerCmd -> Rep ApiServerCmd x # to :: Rep ApiServerCmd x -> ApiServerCmd # | |
NFData ApiServerCmd Source # | |
Defined in Control.Eff.Concurrent.Api.Server rnf :: ApiServerCmd -> () # | |
type Rep ApiServerCmd Source # | |
Defined in Control.Eff.Concurrent.Api.Server type Rep ApiServerCmd = D1 (MetaData "ApiServerCmd" "Control.Eff.Concurrent.Api.Server" "extensible-effects-concurrent-0.14.1-1mJ0mKIgvEZ1rhKLhsVjSQ" False) (C1 (MetaCons "HandleNextRequest" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "StopApiServer" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 (ExitReason Recoverable)))) |
data ApiHandler api eff where Source #
A record of callbacks, handling requests sent to a server Process
, all
belonging to a specific Api
family instance.
The values of this type can be serve
ed or combined via Servable
or
ServerCallback
s.
ApiHandler | |
|
Instances
Default (ApiHandler api eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server def :: ApiHandler api eff # | |
Typeable a => Servable (ApiHandler a eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server type ServerEff (ApiHandler a eff) :: [Type -> Type] Source # type ServerPids (ApiHandler a eff) :: Type Source # toServerPids :: proxy (ApiHandler a eff) -> ProcessId -> ServerPids (ApiHandler a eff) Source # toServerCallback :: (Member Interrupts (ServerEff (ApiHandler a eff)), SetMember Process (Process effScheduler) (ServerEff (ApiHandler a eff))) => SchedulerProxy effScheduler -> ApiHandler a eff -> ServerCallback (ServerEff (ApiHandler a eff)) Source # | |
type ServerEff (ApiHandler a eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server | |
type ServerPids (ApiHandler a eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server |
apiHandler :: (Api api Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> (ExitReason Recoverable -> Eff e ()) -> ApiHandler api e Source #
Create an ApiHandler
with a _castCallback
, a _callCallback
and
a _terminateCallback
implementation.
apiHandlerForever :: (Api api Asynchronous -> Eff e ()) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> (ExitReason Recoverable -> Eff e ()) -> ApiHandler api e Source #
Like apiHandler
but the server will loop until an error is raised or
the process exits.
The callback actions won't decide whether to stop the
server or not, instead the ApiServerCmd
HandleNextRequest
is used.
castHandler :: (Api api Asynchronous -> Eff eff ApiServerCmd) -> ApiHandler api eff Source #
Create an ApiHandler
with only a _castCallback
implementation.
castHandlerForever :: (Api api Asynchronous -> Eff eff ()) -> ApiHandler api eff Source #
Like castHandler
but the server will loop until an error is raised or
the process exits. See apiHandlerForever
.
callHandler :: (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e Source #
Create an ApiHandler
with only a _callCallback
implementation.
callHandlerForever :: (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e Source #
Like callHandler
but the server will loop until an error is raised or
the process exits. See apiHandlerForever
.
castAndCallHandler :: (Api api Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e Source #
Create an ApiHandler
with only a _castCallback
and _callCallback
implementation.
castAndCallHandlerForever :: (Api api Asynchronous -> Eff e ()) -> (forall r. Api api (Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e Source #
Like castAndCallHandler
but the server will loop until an error is raised or
the process exits. See apiHandlerForever
.
data ServerCallback eff Source #
Building block for composition of ApiHandler
.
A wrapper for ApiHandler
. Use this to combine ApiHandler
, allowing a
process to implement several Api
instances. The termination will be evenly
propagated.
Create this via e.g. Servable
instances
To serve multiple apis use <>
to combine server callbacks, e.g.
@@
let f = apiHandlerServerCallback px $ ApiHandler ...
g = apiHandlerServerCallback px $ ApiHandler ...
h = f <> g
in serve px h
@@
ServerCallback | |
|
Instances
callCallback :: forall api eff reply. Getter (ApiHandler api eff) (Maybe (Api api (Synchronous reply) -> (reply -> Eff eff ()) -> Eff eff ApiServerCmd)) Source #
castCallback :: forall api eff. Lens' (ApiHandler api eff) (Maybe (Api api Asynchronous -> Eff eff ApiServerCmd)) Source #
terminateCallback :: forall api eff. Lens' (ApiHandler api eff) (Maybe (ExitReason Recoverable -> Eff eff ())) Source #
requestHandlerSelector :: forall eff. Lens' (ServerCallback eff) (MessageSelector (Eff eff ApiServerCmd)) Source #
terminationHandler :: forall eff. Lens' (ServerCallback eff) (ExitReason Recoverable -> Eff eff ()) Source #
serve :: forall a effScheduler. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> Eff (ServerEff a) () Source #
Receive and process incoming requests until the process exits.
spawnServer :: forall a effScheduler eff. (Servable a, ServerEff a ~ InterruptableProcess effScheduler, SetMember Process (Process effScheduler) eff, Member Interrupts eff, HasCallStack) => SchedulerProxy effScheduler -> a -> Eff eff (ServerPids a) Source #
Spawn a new process, that will receive and process incoming requests until the process exits.
spawnServerWithEffects :: forall a effScheduler eff. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), SetMember Process (Process effScheduler) eff, Member Interrupts eff, Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> (Eff (ServerEff a) () -> Eff (InterruptableProcess effScheduler) ()) -> Eff eff (ServerPids a) Source #
Spawn a new process, that will receive and process incoming requests until the process exits. Also handle all internal effects.
unhandledCallError :: forall p x r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p (Synchronous x) -> (x -> Eff r ()) -> Eff r ApiServerCmd Source #
A default handler to use in _callCallback
in ApiHandler
. It will call
raiseError
with a nice error message.
unhandledCastError :: forall p r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p Asynchronous -> Eff r ApiServerCmd Source #
A default handler to use in _castCallback
in ApiHandler
. It will call
raiseError
with a nice error message.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r, Member (Logs LogMessage) r) => SchedulerProxy q -> ExitReason Recoverable -> Eff r () Source #
Either do nothing, if the error message is Nothing
,
or call exitWithError
with the error message.
Server Functions for Providing APIs (new experimental)
An Api
type for generic Observer
s, see handleObservations
.
Since: 0.14.1
Instances
(Typeable o, Observable o) => Observer (Observing o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 observationMessage :: Server o -> Observation o -> Api (Observing o) Asynchronous Source # | |
data Api (Observing o) Asynchronous Source # | An Since: 0.14.1 |
Defined in Control.Eff.Concurrent.Api.Server2 data Api (Observing o) Asynchronous where
|
data InterruptCallback eff where Source #
Just a wrapper around a function that will be applied to the result of
a MessageCallback
s StopServer
clause, or an InterruptReason
caught during
the execution of receive
or a MessageCallback
Since: 0.13.2
InterruptCallback :: (InterruptReason -> Eff eff CallbackResult) -> InterruptCallback eff |
Instances
Default (InterruptCallback eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 def :: InterruptCallback eff # |
class ToServerPids (t :: k) where Source #
Helper type class for the return values of spawnApiServer
et al.
Since: 0.13.2
type ServerPids t Source #
toServerPids :: proxy t -> ProcessId -> ServerPids t Source #
Instances
ToServerPids api1 => ToServerPids (api1 :: Type) Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 type ServerPids api1 :: Type Source # toServerPids :: proxy api1 -> ProcessId -> ServerPids api1 Source # | |
ToServerPids ([] :: [k]) Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 type ServerPids [] :: Type Source # toServerPids :: proxy [] -> ProcessId -> ServerPids [] Source # | |
(ToServerPids api1, ToServerPids api2) => ToServerPids (api1 ': api2 :: [Type]) Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 type ServerPids (api1 ': api2) :: Type Source # toServerPids :: proxy (api1 ': api2) -> ProcessId -> ServerPids (api1 ': api2) Source # |
data MessageCallback api eff where Source #
An existential wrapper around a MessageSelector
and a function that
handles the selected message. The api
type parameter is a phantom type.
The return value of the handler function is a CallbackResult
.
Since: 0.13.2
MessageCallback :: MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback api eff |
Instances
Semigroup (MessageCallback api eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 (<>) :: MessageCallback api eff -> MessageCallback api eff -> MessageCallback api eff # sconcat :: NonEmpty (MessageCallback api eff) -> MessageCallback api eff # stimes :: Integral b => b -> MessageCallback api eff -> MessageCallback api eff # | |
Monoid (MessageCallback api eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 mempty :: MessageCallback api eff # mappend :: MessageCallback api eff -> MessageCallback api eff -> MessageCallback api eff # mconcat :: [MessageCallback api eff] -> MessageCallback api eff # | |
Default (MessageCallback api eff) Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 def :: MessageCallback api eff # |
data CallbackResult where Source #
A command to the server loop started e.g. by server
or spawnServerWithEffects
.
Typically returned by an ApiHandler
member to indicate if the server
should continue or stop.
Since: 0.13.2
AwaitNext :: CallbackResult | Tell the server to keep the server loop running |
StopServer :: InterruptReason -> CallbackResult | Tell the server to exit, this will make |
spawnApiServer :: forall api eff. (ToServerPids api, HasCallStack) => MessageCallback api (InterruptableProcess eff) -> InterruptCallback (ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api) Source #
Serve an Api
in a newly spawned process.
Since: 0.13.2
spawnApiServerStateful :: forall api eff state. (HasCallStack, ToServerPids api) => Eff (InterruptableProcess eff) state -> MessageCallback api (State state ': InterruptableProcess eff) -> InterruptCallback (State state ': ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api) Source #
Serve an Api
in a newly spawned process; the callbacks have access
to some state initialized by the function in the first parameter.
Since: 0.13.2
spawnApiServerEffectful :: forall api eff serverEff. (HasCallStack, ToServerPids api, Member Interrupts serverEff, SetMember Process (Process eff) serverEff) => (forall b. Eff serverEff b -> Eff (InterruptableProcess eff) b) -> MessageCallback api serverEff -> InterruptCallback serverEff -> Eff (InterruptableProcess eff) (ServerPids api) Source #
Serve an Api
in a newly spawned process; the caller provides an
effect handler for arbitrary effects used by the server callbacks.
Since: 0.13.2
handleMessages :: forall eff a. (HasCallStack, NFData a, Typeable a) => (a -> Eff eff CallbackResult) -> MessageCallback '[] eff Source #
A smart constructor for MessageCallback
s
Since: 0.13.2
handleSelectedMessages :: forall eff a. HasCallStack => MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback '[] eff Source #
A smart constructor for MessageCallback
s
Since: 0.13.2
handleAnyMessages :: forall eff. HasCallStack => (Dynamic -> Eff eff CallbackResult) -> MessageCallback '[] eff Source #
A smart constructor for MessageCallback
s
Since: 0.13.2
handleCasts :: forall api eff. (HasCallStack, Typeable api, Typeable (Api api Asynchronous)) => (Api api Asynchronous -> Eff eff CallbackResult) -> MessageCallback api eff Source #
A smart constructor for MessageCallback
s
Since: 0.13.2
handleCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (forall secret reply. (Typeable reply, Typeable (Api api (Synchronous reply))) => Api api (Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff Source #
A smart constructor for MessageCallback
s
handleCalls SP (\ (RentBook bookId customerId) runCall -> runCall $ do rentalIdE <- rentBook bookId customerId case rentalIdE of -- on fail we just don't send a reply, let the caller run into -- timeout Left err -> return (Nothing, AwaitNext) Right rentalId -> return (Just rentalId, AwaitNext))
Since: 0.13.2
handleCastsAndCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, Typeable (Api api Asynchronous), SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (Api api Asynchronous -> Eff eff CallbackResult) -> (forall secret reply. (Typeable reply, Typeable (Api api (Synchronous reply))) => Api api (Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff Source #
A smart constructor for MessageCallback
s
Since: 0.13.2
handleProcessDowns :: forall eff. HasCallStack => (MonitorReference -> Eff eff CallbackResult) -> MessageCallback '[] eff Source #
A smart constructor for MessageCallback
s
Since: 0.13.2
(^:) :: forall (api1 :: Type) (apis2 :: [Type]) eff. HasCallStack => MessageCallback api1 eff -> MessageCallback apis2 eff -> MessageCallback (api1 ': apis2) eff infixr 5 Source #
Compose two Api
s to a type-level pair of them.
handleCalls api1calls ^: handleCalls api2calls ^:
Since: 0.13.2
fallbackHandler :: forall api eff. HasCallStack => MessageCallback api eff -> MessageCallback '[] eff Source #
Make a fallback handler, i.e. a handler to which no other handler can be composed from the right.
Since: 0.13.2
dropUnhandledMessages :: forall eff. HasCallStack => MessageCallback '[] eff Source #
A fallbackHandler
that drops the left-over messages.
Since: 0.13.2
exitOnUnhandled :: forall eff. HasCallStack => MessageCallback '[] eff Source #
A fallbackHandler
that terminates if there are unhandled messages.
Since: 0.13.2
logUnhandledMessages :: forall eff. (Member (Logs LogMessage) eff, HasCallStack) => MessageCallback '[] eff Source #
A fallbackHandler
that logs and then drops the left-over messages.
Since: 0.13.2
stopServerOnInterrupt :: forall eff. HasCallStack => InterruptCallback eff Source #
A smart constructor for InterruptCallback
s
Since: 0.13.2
handleObservations :: Typeable o => (Server o -> Observation o -> Eff e CallbackResult) -> MessageCallback (Observing o) e Source #
Apply a given callback function to incoming Observation
s.
Since: 0.14.1
Observer Functions for Events and Event Listener
data CallbackObserver o Source #
An Observer
that schedules the observations to an effectful callback.
Instances
Show (Observation o) => Show (Api (CallbackObserver o) r) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer | |
Observable o => Observer (CallbackObserver o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Observer observationMessage :: Server o -> Observation o -> Api (CallbackObserver o) Asynchronous Source # | |
data Api (CallbackObserver o) r Source # | |
Defined in Control.Eff.Concurrent.Api.Observer data Api (CallbackObserver o) r where
|
type ObserverState o = State (Observers o) Source #
Alias for the effect that contains the observers managed by manageObservers
Internal state for manageObservers
data SomeObserver o where Source #
An existential wrapper around a Server
of an Observer
.
Needed to support different types of observers to observe the
same Observable
in a general fashion.
SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o |
Instances
Eq (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer (==) :: SomeObserver o -> SomeObserver o -> Bool # (/=) :: SomeObserver o -> SomeObserver o -> Bool # | |
Ord (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer compare :: SomeObserver o -> SomeObserver o -> Ordering # (<) :: SomeObserver o -> SomeObserver o -> Bool # (<=) :: SomeObserver o -> SomeObserver o -> Bool # (>) :: SomeObserver o -> SomeObserver o -> Bool # (>=) :: SomeObserver o -> SomeObserver o -> Bool # max :: SomeObserver o -> SomeObserver o -> SomeObserver o # min :: SomeObserver o -> SomeObserver o -> SomeObserver o # | |
Show (SomeObserver o) Source # | |
Defined in Control.Eff.Concurrent.Api.Observer showsPrec :: Int -> SomeObserver o -> ShowS # show :: SomeObserver o -> String # showList :: [SomeObserver o] -> ShowS # |
class (Typeable o, Typeable (Observation o)) => Observable o where Source #
data Observation o Source #
Type of observations visible on this observable
registerObserverMessage :: SomeObserver o -> Api o Asynchronous Source #
Return the Api
value for the cast_
that registers an observer
forgetObserverMessage :: SomeObserver o -> Api o Asynchronous Source #
Return the Api
value for the cast_
that de-registers an observer
class (Typeable p, Observable o) => Observer p o where Source #
An Api
index that supports observation of
another Api
that is Observable
.
observationMessage :: Server o -> Observation o -> Api p Asynchronous Source #
Wrap the Observation
and the ProcessId
(i.e. the Server
)
that caused the observation into an Api
value that the
Observable
understands.
Instances
Observable o => Observer (CallbackObserver o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Observer observationMessage :: Server o -> Observation o -> Api (CallbackObserver o) Asynchronous Source # | |
(Typeable o, Observable o) => Observer (Observing o) o Source # | |
Defined in Control.Eff.Concurrent.Api.Server2 observationMessage :: Server o -> Observation o -> Api (Observing o) Asynchronous Source # |
notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () Source #
Send an Observation
to an Observer
registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r () Source #
Send the registerObserverMessage
forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r () Source #
Send the forgetObserverMessage
notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () Source #
Send an Observation
to SomeObserver
.
manageObservers :: Eff (ObserverState o ': r) a -> Eff r a Source #
Keep track of registered Observer
s. Observers can be added and removed,
and an Observation
can be sent to all registered observers at once.
addObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o) => SomeObserver o -> Eff r () Source #
Add an Observer
to the Observers
managed by manageObservers
.
removeObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o, Member Interrupts r) => SomeObserver o -> Eff r () Source #
Delete an Observer
from the Observers
managed by manageObservers
.
notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (ObserverState o) r, Member Interrupts r) => SchedulerProxy q -> Observation o -> Eff r () Source #
Send an Observation
to all SomeObserver
s in the Observers
state.
spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member Interrupts r, HasCallStack) => SchedulerProxy q -> (Server o -> Observation o -> Eff (InterruptableProcess q) ApiServerCmd) -> Eff r (Server (CallbackObserver o)) Source #
Start a new process for an Observer
that schedules
all observations to an effectful callback.
spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member (Logs LogMessage) r, Member Interrupts r, HasCallStack) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) Source #
Start a new process for an Observer
that schedules
all observations to an effectful callback.
Since: 0.3.0.0
Capture Observation in a FIFO Queue
type ObservationQueueReader a = Reader (ObservationQueue a) Source #
A Reader
for an ObservationQueue
.
data ObservationQueue a Source #
Contains a TBQueue
capturing observations received by enqueueObservationsRegistered
or enqueueObservations
.
readObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Observation o) Source #
Read queued observations captured by observing a Server
that implements
an Observable
Api
using enqueueObservationsRegistered
or enqueueObservations
.
This blocks until the next Observation
is received. For a non-blocking
variant use tryReadObservationQueue
or flushObservationQueue
.
tryReadObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Maybe (Observation o)) Source #
Read queued observations captured by observing a Server
that implements
an Observable
Api
using enqueueObservationsRegistered
or enqueueObservations
.
Return the next Observation
immediately or Nothing
if the queue is empty.
Use readObservationQueue
to block until an observation is observed.
flushObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r [Observation o] Source #
Read all currently queued Observation
s captured by enqueueObservations
.
This returns immediately all currently enqueued Observation
s. For a blocking
variant use readObservationQueue
.
enqueueObservationsRegistered :: forall o r q a. (ServesApi o r q, SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO q, HasLogging IO r, Member Interrupts r, Lifted IO r, HasCallStack) => SchedulerProxy q -> Int -> Eff (ObservationQueueReader o ': r) a -> Eff r a Source #
Observe the registered Server
that implements an Observable
Api
.
Based on enqueueObservations
.
enqueueObservations :: forall o r q a. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO r, HasLogging IO q, Member Interrupts r, Lifted IO q, HasCallStack) => SchedulerProxy q -> Server o -> Int -> Eff (ObservationQueueReader o ': r) a -> Eff r a Source #
Observe a Server
that implements an Observable
Api
, the Observation
s
can be obtained by readObservationQueue
. All observations are captured up to
the queue size limit, such that the first message received will be the first message
returned by readObservationQueue
.
This function captures runtime exceptions and cleans up accordingly.
Scheduler Process Effect Handler
Concurrent Scheduler
type SchedulerIO = Reader SchedulerState ': LoggingAndIO Source #
The concrete list of Eff
ects for this scheduler implementation.
type HasSchedulerIO r = (HasCallStack, Lifted IO r, SchedulerIO <:: r) Source #
Type class constraint to indicate that an effect union contains the effects required by every process and the scheduler implementation itself.
type InterruptableProcEff = InterruptableProcess SchedulerIO Source #
The concrete list of the effects, that the Process
uses
type ProcEff = ConsProcess SchedulerIO Source #
The concrete list of Eff
ects of processes compatible with this scheduler.
This builds upon SchedulerIO
.
defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO () Source #
Start the message passing concurrency system then execute a Process
on
top of SchedulerIO
effect. All logging is sent to standard output.
defaultMainWithLogChannel :: HasCallStack => Eff InterruptableProcEff () -> LogChannel LogMessage -> IO () Source #
Start the message passing concurrency system then execute a Process
on
top of SchedulerIO
effect. All logging is sent to the given LogChannel.
schedule :: (HasLogging IO SchedulerIO, HasCallStack) => Eff InterruptableProcEff () -> Eff LoggingAndIO () Source #
This is the main entry point to running a message passing concurrency
application. This function takes a Process
on top of the SchedulerIO
effect and a LogChannel
for concurrent logging.
Single Threaded Scheduler
schedulePure :: Eff (InterruptableProcess '[Logs LogMessage]) a -> Either (ExitReason NoRecovery) a Source #
Like schedule
but pure. The yield
effect is just return ()
.
schedulePure == runIdentity .
scheduleM
(Identity . run) (return ())
Since: 0.3.0.2
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo Source #
A SchedulerProxy
for LoggingAndIo
.
defaultMainSingleThreaded :: HasCallStack => Eff (InterruptableProcess '[Logs LogMessage, LogWriterReader LogMessage IO, Lift IO]) () -> IO () Source #
Utilities
Logging Effect
module Control.Eff.Log
Preventing Space Leaks
module Control.Eff.Loop