-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Message passing concurrency as extensible-effect -- -- Please see the README on GitHub at -- https://github.com/sheyll/extensible-effects-concurrent#readme @package extensible-effects-concurrent @version 0.6.0 -- | The message passing effect. -- -- This module describes an abstract message passing effect, and a -- process effect, mimicking Erlang's process and message semantics. -- -- Two scheduler implementations for the Process effect are -- provided: -- --
-- data BookShop deriving Typeable
--
-- data instance Api BookShop r where
-- RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId))
-- BringBack :: RentalId -> Api BookShop 'Asynchronous
--
-- type BookId = Int
-- type RentalId = Int
-- type RentalError = String
--
-- | The (promoted) constructors of this type specify (at the type level)
-- the reply behavior of a specific constructor of an Api
-- instance.
data Synchronicity
-- | Specify that handling a request is a blocking operation with a
-- specific return type, e.g. ('Synchronous (Either RentalError
-- RentalId))
Synchronous :: Type -> Synchronicity
-- | Non-blocking, asynchronous, request handling
Asynchronous :: Synchronicity
-- | This is a tag-type that wraps around a ProcessId and holds an
-- Api index type.
newtype Server api
Server :: ProcessId -> Server api
[_fromServer] :: Server api -> ProcessId
fromServer :: forall api_aoVK api_ap5G. Iso (Server api_aoVK) (Server api_ap5G) ProcessId ProcessId
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
proxyAsServer :: proxy api -> ProcessId -> Server api
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
asServer :: forall api. ProcessId -> Server api
instance forall k (api :: k). GHC.Classes.Ord (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). GHC.Classes.Eq (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). GHC.Read.Read (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). Data.Typeable.Internal.Typeable api => GHC.Show.Show (Control.Eff.Concurrent.Api.Server api)
-- | Functions to implement Api servers.
module Control.Eff.Concurrent.Api.Server
-- | Receive and process incoming requests until the process exits, using
-- an ApiHandler.
serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r ()
-- | A record of callbacks, handling requests sent to a server
-- Process, all belonging to a specific Api family
-- instance.
data ApiHandler p r
[ApiHandler] :: {_handleCast :: HasCallStack => Api p 'Asynchronous -> Eff r () A cast will not return a result directly. This is used for async methods., _handleCall :: forall x. HasCallStack => Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () A call is a blocking operation, the caller is blocked until this handler calls the reply continuation., _handleTerminate :: HasCallStack => Maybe String -> Eff r () This callback is called with @Nothing@ if the process exits peacefully, or @Just "error message..."@ if the process exits with an error. This function is responsible to exit the process if necessary. The default behavior is defined in 'defaultTermination'.} -> ApiHandler p r
-- | A default handler to use in _handleCall in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCallError :: forall p x r q. (Show (Api p ( 'Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r ()
-- | A default handler to use in _handleCast in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCastError :: forall p r q. (Show (Api p 'Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r ()
-- | Exit the process either normally of the error message is
-- Nothing or with exitWithError otherwise.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r ()
-- | serve two ApiHandlers at once. The first handler is used
-- for termination handling.
serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r ()
-- | serve three ApiHandlers at once. The first handler is
-- used for termination handling.
serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r ()
-- | The basic building block of the combination of ApiHandlers is
-- this function, which can not only be passed to receiveLoop, but
-- also throws an UnhandledRequest exception if
-- requestFromDynamic failed, such that multiple invokation of
-- this function for different ApiHandlers can be tried, by using
-- catchUnhandled.
--
-- -- tryApiHandler px handlers1 message -- `catchUnhandled` -- tryApiHandler px handlers2 -- `catchUnhandled` -- tryApiHandler px handlers3 --tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest : r) () -- | An exception that is used by the mechanism that chains together -- multiple different ApiHandler allowing a single process to -- implement multiple Apis. This exception is thrown by -- requestFromDynamic. This exception can be caught with -- catchUnhandled, this way, several distinct ApiHandler -- can be tried until one fits or until the exitUnhandled -- is invoked. data UnhandledRequest -- | If requestFromDynamic failes to cast the message to a -- Request for a certain ApiHandler it throws an -- UnhandledRequest exception. That exception is caught by this -- function and the raw message is given to the handler function. This is -- the basis for chaining ApiHandlers. catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a -- | Catch UnhandledRequests and terminate the process with an -- error, if necessary. ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest : r) () -> Eff r () -- | Cast a Dynamic value, and if that fails, throw an -- UnhandledRequest error. requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a -- | If an incoming message could not be casted to a Request -- corresponding to an ApiHandler (e.g. with -- requestFromDynamic) one should use this function to exit the -- process with a corresponding error. exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () -- | Functions for Api clients. -- -- This modules is required to write clients that consume an Api. module Control.Eff.Concurrent.Api.Client -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r () -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. Return True if the -- message was sent to the process. Note that this is totally not the -- same as that the request was successfully handled. If that is -- important, use call instead. castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r Bool -- | Send an Api request and wait for the server to return a result -- value. -- -- The type signature enforces that the corresponding Api clause -- is Synchronous. call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result -- | Like cast but take the Server from the reader provided -- by registerServer. castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r () -- | Like call but take the Server from the reader provided -- by registerServer. callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply -- | Like callRegistered but also catch errors raised if e.g. the -- server crashed. By allowing Alternative instances to contain -- the reply, application level errors can be combined with errors rising -- from inter process communication. callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous (f reply)) -> Eff r (f reply) -- | Instead of passing around a Server value and passing to -- functions like cast or call, a Server can -- provided by a Reader effect, if there is only a single -- server for a given Api instance. This type alias is -- convenience to express that an effect has Process and a reader -- for a Server. type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r) -- | Run a reader effect that contains the one server handling a -- specific Api instance. registerServer :: Server o -> Eff (Reader (Server o) : r) a -> Eff r a -- | This module provides support for executing Process actions from -- IO. -- -- One use case is interacting with processes from the REPL, e.g.: -- --
-- >>> import Control.Eff.Concurrent.Process.SingleThreadedScheduler (defaultMain) ---- --
-- >>> import Control.Eff.Loop ---- --
-- >>> import Data.Dynamic ---- --
-- >>> import Data.Maybe ---- --
-- >>> s <- forkInteractiveScheduler Control.Eff.Concurrent.Process.SingleThreadedScheduler.defaultMain ---- --
-- >>> fooPid <- submit s (spawn (foreverCheap (receiveMessage SP >>= (logMsg . fromMaybe "Huh!??" . fromDynamic)))) ---- --
-- >>> fooPid -- <0.1.0> ---- --
-- >>> submit s (sendMessageAs SP fooPid "test") -- test ---- --
-- >>> submit s (sendShutdown SP fooPid) --module Control.Eff.Concurrent.Process.Interactive -- | Contains the communication channels to interact with a scheduler -- running in its' own thread. data SchedulerSession r -- | Fork a scheduler with a process that communicates with it via -- MVar, which is also the reason for the Lift IO -- constraint. forkInteractiveScheduler :: forall r. (SetMember Lift (Lift IO) r) => (Eff (Process r : r) () -> IO ()) -> IO (SchedulerSession r) -- | Exit the schedulder immediately using an asynchronous exception. killInteractiveScheduler :: SchedulerSession r -> IO () -- | Send a Process effect to the main process of a scheduler, this -- blocks until the effect is executed. submit :: forall r a. (SetMember Lift (Lift IO) r) => SchedulerSession r -> Eff (Process r : r) a -> IO a -- | Combination of submit and cast. submitCast :: forall o r. (SetMember Lift (Lift IO) r, Typeable o) => SchedulerSession r -> Server o -> Api o 'Asynchronous -> IO () -- | Combination of submit and cast. submitCall :: forall o q r. (SetMember Lift (Lift IO) r, Typeable o, Typeable q) => SchedulerSession r -> Server o -> Api o ( 'Synchronous q) -> IO q -- | Add-ons to Exception module Control.Eff.ExceptionExtra -- | Catch Exception thrown by an effect. liftTry :: forall e r a. (HasCallStack, Exception e, SetMember Lift (Lift IO) r) => Eff r a -> Eff r (Either e a) -- | A logging effect based on MonadLog. module Control.Eff.Log.Handler -- | Logging effect type, parameterized by a log message type. data Logs message a [LogMsg] :: message -> Logs message () -- | Log a message. logMsg :: Member (Logs m) r => m -> Eff r () -- | Change, add or remove log messages and perform arbitrary actions upon -- intercepting a log message. -- -- Requirements: -- --
-- scheduleIOWithLogging == run . captureLogs . schedule (return ()) --scheduleIOWithLogging :: (NFData l, MonadIO m) => (l -> m ()) -> Eff (ConsProcess '[Logs l, Lift m]) a -> m (Either String a) -- | Execute a Process using schedule on top of Lift -- IO and Logs String effects. defaultMain :: HasCallStack => Eff '[Process '[Logs LogMessage, Lift IO], Logs LogMessage, Lift IO] () -> IO () -- | A SchedulerProxy for LoggingAndIo. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo -- | The concrete list of Effects for running this pure scheduler on -- IO and with string logging. type LoggingAndIo = '[Logs LogMessage, Lift IO] -- | Implement Erlang style message passing concurrency. -- -- This handles the MessagePassing and Process effects, -- using TQueues and forkIO. -- -- This aims to be a pragmatic implementation, so even logging is -- supported. -- -- At the core is a main process that enters schedule and -- creates all of the internal state stored in TVars to manage -- processes with message queues. -- -- The Eff handler for Process and MessagePassing -- use are implemented and available through spawn. -- -- spawn uses forkFinally and TQueues and tries to -- catch most exceptions. module Control.Eff.Concurrent.Process.ForkIOScheduler -- | This is the main entry point to running a message passing concurrency -- application. This function takes a Process on top of the -- SchedulerIO effect and a LogChannel for concurrent -- logging. schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel LogMessage -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMainWithLogChannel :: LogChannel LogMessage -> Eff (ConsProcess SchedulerIO) () -> IO () -- | A sum-type with errors that can occur when scheduleing messages. data SchedulerError -- | No process info was found for a ProcessId during internal -- processing. NOTE: This is **ONLY** caused by internal errors, probably -- by an incorrect MessagePassing handler in this module. -- **Sending a message to a process ALWAYS succeeds!** Even if the -- process does not exist. ProcessNotFound :: ProcessId -> SchedulerError -- | A process called raiseError. ProcessRaisedError :: String -> SchedulerError -- | A process called exitWithError. ProcessExitError :: String -> SchedulerError -- | A process exits. ProcessShuttingDown :: SchedulerError -- | An action was not performed while the scheduler was exiting. SchedulerShuttingDown :: SchedulerError -- | The concrete list of Effects for this scheduler implementation. -- See HasSchedulerIO type SchedulerIO = '[Reader SchedulerVar, Logs LogMessage, Lift IO] -- | A SchedulerProxy for SchedulerIO forkIoScheduler :: SchedulerProxy SchedulerIO -- | An alias for the constraints for the effects essential to this -- scheduler implementation, i.e. these effects allow spawning new -- Processes. See SchedulerIO type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs LogMessage) r, Member (Reader SchedulerVar) r) instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError instance GHC.Exception.Exception Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.ProcessInfo -- | Observer Effects -- -- This module supports the implementation of observers and observables. -- One use case is event propagation. The tools in this module are -- tailored towards Api servers/clients. module Control.Eff.Concurrent.Api.Observer -- | An Api index that support observation of the another Api -- that is Observable. class (Typeable p, Observable o) => Observer p o -- | Wrap the Observation and the ProcessId (i.e. the -- Server) that caused the observation into a Api value -- that the Observable understands. observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous -- | An Api index that supports registration and de-registration of -- Observers. class (Typeable o, Typeable (Observation o)) => Observable o where { data family Observation o; } -- | Return the Api value for the cast_ that registeres an -- observer registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Return the Api value for the cast_ that de-registeres -- an observer forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Send an Observation to an Observer notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () -- | Send the registerObserverMessage registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send the forgetObserverMessage forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | An existential wrapper around a Server of an Observer. -- Needed to support different types of observers to observe the same -- Observable in a general fashion. data SomeObserver o [SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o -- | Send an Observation to SomeObserver. notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () -- | Internal state for manageobservers data Observers o -- | Keep track of registered Observers Observers can be added and -- removed, and an Observation can be sent to all registerd -- observers at once. manageObservers :: Eff (State (Observers o) : r) a -> Eff r a -- | Add an Observer to the Observers managed by -- manageObservers. addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Delete an Observer from the Observers managed by -- manageObservers. removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Send an Observation to all SomeObservers in the -- Observers state. notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () -- | An Observer that schedules the observations to an effectful -- callback. data CallbackObserver o -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q : q) Bool) -> Eff r (Server (CallbackObserver o)) -- | Use spawnCallbackObserver to create a universal logging -- observer, using the Show instance of the Observation. | -- Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.Observation o) => GHC.Show.Show (Control.Eff.Concurrent.Api.Api (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) r) instance Control.Eff.Concurrent.Api.Observer.Observable o => Control.Eff.Concurrent.Api.Observer.Observer (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) o instance GHC.Classes.Ord (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Classes.Eq (Control.Eff.Concurrent.Api.Observer.SomeObserver o) -- | In rare occasions GHC optimizes innocent looking loops into -- space-leaking monsters. See the discussion here: GHC issue -- 13080 for more details, or this blog post about space leaks in -- nested loops -- -- These functions in this module might help, at least in -- conjunction with the -fno-full-laziness GHC option. -- -- There is a unit test in the sources of this module, which can be used -- to do a comperative heap profiling of these function vs. their -- counterparts in the base package. -- -- Here are the images of the profiling results, the images show that the -- functions in this module do not leak space, compared to the original -- functions (forever and replicateM_): -- module Control.Eff.Loop -- | A version of forever that hopefully tricks GHC into -- not creating a space leak. The intuition is, that we -- want to do something that is cheap, and hence should be -- recomputed instead of shared. foreverCheap :: Monad m => m a -> m () -- | A version of replicateM_ that hopefully tricks GHC into -- not creating a space leak. The intuition is, that we -- want to do something that is cheap, and hence should be -- recomputed instead of shared. replicateCheapM_ :: Monad m => Int -> m a -> m () -- | Erlang style processes with message passing concurrency based on -- (more) extensible-effects. module Control.Eff.Concurrent -- | Each process is identified by a single process id, that stays constant -- throughout the life cycle of a process. Also, message sending relies -- on these values to address messages to processes. newtype ProcessId ProcessId :: Int -> ProcessId [_fromProcessId] :: ProcessId -> Int -- | Every function for Process things needs such a proxy value for -- the low-level effect list, i.e. the effects identified by -- r in Process r : r, this might be -- dependent on the scheduler implementation. data SchedulerProxy :: [Type -> Type] -> Type -- | Tell the typechecker what effects we have below Process [SchedulerProxy] :: SchedulerProxy q -- | Like SchedulerProxy but shorter [SP] :: SchedulerProxy q -- | Cons Process onto a list of effects. type ConsProcess r = Process r : r -- | Every Process action returns it's actual result wrapped in this -- type. It will allow to signal errors as well as pass on normal results -- such as incoming messages. data ResumeProcess v -- | The process is required to exit. [ShutdownRequested] :: ResumeProcess v -- | The process is required to exit from an error condition, that cannot -- be recovered from. [OnError] :: String -> ResumeProcess v -- | The process may resume to do work, using the given result. [ResumeWith] :: a -> ResumeProcess a -- | This indicates that the action did not complete, and maybe retried [RetryLastAction] :: ResumeProcess v -- | The process effect is the basis for message passing concurrency. This -- effect describes an interface for concurrent, communicating isolated -- processes identified uniquely by a process-id. -- -- Processes can raise exceptions that can be caught, exit gracefully or -- with an error, or be killed by other processes, with the option of -- ignoring the shutdown request. -- -- Process Scheduling is implemented in different modules. All scheduler -- implementations should follow some basic rules: -- --
-- data BookShop deriving Typeable
--
-- data instance Api BookShop r where
-- RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId))
-- BringBack :: RentalId -> Api BookShop 'Asynchronous
--
-- type BookId = Int
-- type RentalId = Int
-- type RentalError = String
--
fromServer :: forall api_aoVK api_ap5G. Iso (Server api_aoVK) (Server api_ap5G) ProcessId ProcessId
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
proxyAsServer :: proxy api -> ProcessId -> Server api
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
asServer :: forall api. ProcessId -> Server api
-- | Instead of passing around a Server value and passing to
-- functions like cast or call, a Server can
-- provided by a Reader effect, if there is only a single
-- server for a given Api instance. This type alias is
-- convenience to express that an effect has Process and a reader
-- for a Server.
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r)
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous. Return True if the
-- message was sent to the process. Note that this is totally not the
-- same as that the request was successfully handled. If that is
-- important, use call instead.
castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r Bool
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous.
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r ()
-- | Send an Api request and wait for the server to return a result
-- value.
--
-- The type signature enforces that the corresponding Api clause
-- is Synchronous.
call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result
-- | Run a reader effect that contains the one server handling a
-- specific Api instance.
registerServer :: Server o -> Eff (Reader (Server o) : r) a -> Eff r a
-- | Like call but take the Server from the reader provided
-- by registerServer.
callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply
-- | Like callRegistered but also catch errors raised if e.g. the
-- server crashed. By allowing Alternative instances to contain
-- the reply, application level errors can be combined with errors rising
-- from inter process communication.
callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous (f reply)) -> Eff r (f reply)
-- | Like cast but take the Server from the reader provided
-- by registerServer.
castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r ()
-- | An exception that is used by the mechanism that chains together
-- multiple different ApiHandler allowing a single process to
-- implement multiple Apis. This exception is thrown by
-- requestFromDynamic. This exception can be caught with
-- catchUnhandled, this way, several distinct ApiHandler
-- can be tried until one fits or until the exitUnhandled
-- is invoked.
data UnhandledRequest
-- | A record of callbacks, handling requests sent to a server
-- Process, all belonging to a specific Api family
-- instance.
data ApiHandler p r
[ApiHandler] :: {_handleCast :: HasCallStack => Api p 'Asynchronous -> Eff r () A cast will not return a result directly. This is used for async methods., _handleCall :: forall x. HasCallStack => Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () A call is a blocking operation, the caller is blocked until this handler calls the reply continuation., _handleTerminate :: HasCallStack => Maybe String -> Eff r () This callback is called with @Nothing@ if the process exits peacefully, or @Just "error message..."@ if the process exits with an error. This function is responsible to exit the process if necessary. The default behavior is defined in 'defaultTermination'.} -> ApiHandler p r
-- | Receive and process incoming requests until the process exits, using
-- an ApiHandler.
serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r ()
-- | A default handler to use in _handleCall in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCallError :: forall p x r q. (Show (Api p ( 'Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r ()
-- | A default handler to use in _handleCast in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCastError :: forall p r q. (Show (Api p 'Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r ()
-- | Exit the process either normally of the error message is
-- Nothing or with exitWithError otherwise.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r ()
-- | serve two ApiHandlers at once. The first handler is used
-- for termination handling.
serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r ()
-- | serve three ApiHandlers at once. The first handler is
-- used for termination handling.
serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r ()
-- | The basic building block of the combination of ApiHandlers is
-- this function, which can not only be passed to receiveLoop, but
-- also throws an UnhandledRequest exception if
-- requestFromDynamic failed, such that multiple invokation of
-- this function for different ApiHandlers can be tried, by using
-- catchUnhandled.
--
-- -- tryApiHandler px handlers1 message -- `catchUnhandled` -- tryApiHandler px handlers2 -- `catchUnhandled` -- tryApiHandler px handlers3 --tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest : r) () -- | If requestFromDynamic failes to cast the message to a -- Request for a certain ApiHandler it throws an -- UnhandledRequest exception. That exception is caught by this -- function and the raw message is given to the handler function. This is -- the basis for chaining ApiHandlers. catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a -- | Catch UnhandledRequests and terminate the process with an -- error, if necessary. ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest : r) () -> Eff r () -- | Cast a Dynamic value, and if that fails, throw an -- UnhandledRequest error. requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a -- | If an incoming message could not be casted to a Request -- corresponding to an ApiHandler (e.g. with -- requestFromDynamic) one should use this function to exit the -- process with a corresponding error. exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () -- | An Observer that schedules the observations to an effectful -- callback. data CallbackObserver o -- | Internal state for manageobservers data Observers o -- | An existential wrapper around a Server of an Observer. -- Needed to support different types of observers to observe the same -- Observable in a general fashion. data SomeObserver o [SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o -- | An Api index that supports registration and de-registration of -- Observers. class (Typeable o, Typeable (Observation o)) => Observable o where { data family Observation o; } -- | Return the Api value for the cast_ that registeres an -- observer registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Return the Api value for the cast_ that de-registeres -- an observer forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | An Api index that support observation of the another Api -- that is Observable. class (Typeable p, Observable o) => Observer p o -- | Wrap the Observation and the ProcessId (i.e. the -- Server) that caused the observation into a Api value -- that the Observable understands. observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous -- | Send an Observation to an Observer notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () -- | Send the registerObserverMessage registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send the forgetObserverMessage forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send an Observation to SomeObserver. notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () -- | Keep track of registered Observers Observers can be added and -- removed, and an Observation can be sent to all registerd -- observers at once. manageObservers :: Eff (State (Observers o) : r) a -> Eff r a -- | Add an Observer to the Observers managed by -- manageObservers. addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Delete an Observer from the Observers managed by -- manageObservers. removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Send an Observation to all SomeObservers in the -- Observers state. notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q : q) Bool) -> Eff r (Server (CallbackObserver o)) -- | Use spawnCallbackObserver to create a universal logging -- observer, using the Show instance of the Observation. | -- Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) -- | The concrete list of Effects for this scheduler implementation. -- See HasSchedulerIO type SchedulerIO = '[Reader SchedulerVar, Logs LogMessage, Lift IO] -- | An alias for the constraints for the effects essential to this -- scheduler implementation, i.e. these effects allow spawning new -- Processes. See SchedulerIO type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs LogMessage) r, Member (Reader SchedulerVar) r) -- | A sum-type with errors that can occur when scheduleing messages. data SchedulerError -- | No process info was found for a ProcessId during internal -- processing. NOTE: This is **ONLY** caused by internal errors, probably -- by an incorrect MessagePassing handler in this module. -- **Sending a message to a process ALWAYS succeeds!** Even if the -- process does not exist. ProcessNotFound :: ProcessId -> SchedulerError -- | A process called raiseError. ProcessRaisedError :: String -> SchedulerError -- | A process called exitWithError. ProcessExitError :: String -> SchedulerError -- | A process exits. ProcessShuttingDown :: SchedulerError -- | An action was not performed while the scheduler was exiting. SchedulerShuttingDown :: SchedulerError -- | A SchedulerProxy for SchedulerIO forkIoScheduler :: SchedulerProxy SchedulerIO -- | This is the main entry point to running a message passing concurrency -- application. This function takes a Process on top of the -- SchedulerIO effect and a LogChannel for concurrent -- logging. schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel LogMessage -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMainWithLogChannel :: LogChannel LogMessage -> Eff (ConsProcess SchedulerIO) () -> IO () -- | Like schedule but pure. The yield effect is -- just return (). schedulePure == runIdentity . -- scheduleM (Identity . run) (return ()) schedulePure :: Eff (ConsProcess '[]) a -> Either String a