-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Message passing concurrency as extensible-effect -- -- Please see the README on GitHub at -- https://github.com/sheyll/extensible-effects-concurrent#readme @package extensible-effects-concurrent @version 0.6.3 -- | The message passing effect. -- -- This module describes an abstract message passing effect, and a -- process effect, mimicking Erlang's process and message semantics. -- -- Two scheduler implementations for the Process effect are -- provided: -- -- module Control.Eff.Concurrent.Process -- | Each process is identified by a single process id, that stays constant -- throughout the life cycle of a process. Also, message sending relies -- on these values to address messages to processes. newtype ProcessId ProcessId :: Int -> ProcessId [_fromProcessId] :: ProcessId -> Int fromProcessId :: Iso' ProcessId Int -- | The process effect is the basis for message passing concurrency. This -- effect describes an interface for concurrent, communicating isolated -- processes identified uniquely by a process-id. -- -- Processes can raise exceptions that can be caught, exit gracefully or -- with an error, or be killed by other processes, with the option of -- ignoring the shutdown request. -- -- Process Scheduling is implemented in different modules. All scheduler -- implementations should follow some basic rules: -- -- data Process (r :: [Type -> Type]) b -- | In cooperative schedulers, this will give processing time to the -- scheduler. Every other operation implicitly serves the same purpose. [YieldProcess] :: Process r (ResumeProcess ()) -- | Return the current ProcessId [SelfPid] :: Process r (ResumeProcess ProcessId) -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. [Spawn] :: Eff (Process r : r) () -> Process r (ResumeProcess ProcessId) -- | Process exit, this is the same as if the function that was applied to -- a spawn function returned. [Shutdown] :: Process r a -- | Exit the process due to an error, this cannot be caught. [ExitWithError] :: String -> Process r b -- | Raise an error, that can be handled. [RaiseError] :: String -> Process r b -- | Request that another a process exits. The targeted process is -- interrupted and gets a ShutdownRequested, the target process -- may decide to ignore the shutdown requests. [SendShutdown] :: ProcessId -> Process r (ResumeProcess Bool) -- | Send a message to a process addressed by the ProcessId. Sending -- a message should **always succeed** and return **immediately**, even -- if the destination process does not exist, or does not accept messages -- of the given type. [SendMessage] :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool) -- | Receive a message. This should block until an a message was received. -- The pure function may convert the incoming message into something, and -- the result is returned as ProcessMessage value. Another -- reason why this function returns, is if a process control message was -- sent to the process. This can only occur from inside the runtime -- system, aka the effect handler implementation. (Currently there is one -- in ForkIOScheduler.) [ReceiveMessage] :: Process r (ResumeProcess Dynamic) -- | Cons Process onto a list of effects. type ConsProcess r = Process r : r -- | Every Process action returns it's actual result wrapped in this -- type. It will allow to signal errors as well as pass on normal results -- such as incoming messages. data ResumeProcess v -- | The process is required to exit. [ShutdownRequested] :: ResumeProcess v -- | The process is required to exit from an error condition, that cannot -- be recovered from. [OnError] :: String -> ResumeProcess v -- | The process may resume to do work, using the given result. [ResumeWith] :: a -> ResumeProcess a -- | This indicates that the action did not complete, and maybe retried [RetryLastAction] :: ResumeProcess v -- | Every function for Process things needs such a proxy value for -- the low-level effect list, i.e. the effects identified by -- r in Process r : r, this might be -- dependent on the scheduler implementation. data SchedulerProxy :: [Type -> Type] -> Type -- | Tell the typechecker what effects we have below Process [SchedulerProxy] :: SchedulerProxy q -- | Like SchedulerProxy but shorter [SP] :: SchedulerProxy q -- | Return a SchedulerProxy for a Process effect. thisSchedulerProxy :: Eff (Process r : r) (SchedulerProxy r) -- | Execute a and action and resume the process, retry the action, -- shutdown the process or return an error. executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) -- | Execute a Process action and resume the process, retry the -- action or exit the process when a shutdown was requested. executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v -- | Use executeAndResume to execute YieldProcess. Refer to -- YieldProcess for more information. yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. Return True if the process existed. I you -- don't care, just sendMessage instead. sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r ProcessId -- | Like spawn but return (). spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r () -- | Block until a message was received. receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic -- | Receive and cast the message to some Typeable instance. receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -- | Enter a loop to receive messages and pass them to a callback, until -- the function returns False. receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () -- | Returns the ProcessId of the current process. self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId -- | Exit a process addressed by the ProcessId. See -- SendShutdown. sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () -- | Like sendShutdown, but also return True iff the -- process to exit exists. sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool -- | Exit the process with an error. exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a -- | Exit the process. exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -- | Thrown an error, can be caught by catchRaisedError. raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b -- | Catch and handle an error raised by raiseError. Works -- independent of the handler implementation. catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w -- | Like catchRaisedError it catches raiseError, but instead -- of invoking a user provided handler, the result is wrapped into an -- Either. ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) instance GHC.Real.Real Control.Eff.Concurrent.Process.ProcessId instance GHC.Real.Integral Control.Eff.Concurrent.Process.ProcessId instance GHC.Enum.Enum Control.Eff.Concurrent.Process.ProcessId instance GHC.Num.Num Control.Eff.Concurrent.Process.ProcessId instance GHC.Enum.Bounded Control.Eff.Concurrent.Process.ProcessId instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ProcessId instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ProcessId instance GHC.Generics.Generic1 Control.Eff.Concurrent.Process.ResumeProcess instance GHC.Generics.Generic (Control.Eff.Concurrent.Process.ResumeProcess v) instance Data.Traversable.Traversable Control.Eff.Concurrent.Process.ResumeProcess instance GHC.Classes.Ord v => GHC.Classes.Ord (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Classes.Eq v => GHC.Classes.Eq (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Show.Show v => GHC.Show.Show (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Base.Functor Control.Eff.Concurrent.Process.ResumeProcess instance Data.Foldable.Foldable Control.Eff.Concurrent.Process.ResumeProcess instance GHC.Read.Read Control.Eff.Concurrent.Process.ProcessId instance GHC.Show.Show Control.Eff.Concurrent.Process.ProcessId instance Control.DeepSeq.NFData a => Control.DeepSeq.NFData (Control.Eff.Concurrent.Process.ResumeProcess a) instance Control.DeepSeq.NFData1 Control.Eff.Concurrent.Process.ResumeProcess -- | This module contains a mechanism to specify what kind of messages (aka -- requests) a Server (Process) can handle, and if -- the caller blocks and waits for an answer, which the server process -- provides. -- -- The type magic in the Api type familiy allows to define a -- related set of requests along with the corresponding responses. -- -- Request handling can be either blocking, if a response is requred, or -- non-blocking. -- -- A process can serve a specific Api instance by using the -- functions provided by the Control.Eff.Concurrent.Api.Server -- module. -- -- To enable a process to use such a service, the functions -- provided by the Control.Eff.Concurrent.Api.Client should be -- used. module Control.Eff.Concurrent.Api -- | This data family defines an API, a communication interface description -- between at least two processes. The processes act as servers or -- client(s) regarding a specific instance of this type. -- -- The first parameter is usually a user defined phantom type that -- identifies the Api instance. -- -- The second parameter specifies if a specific constructor of an -- (GADT-like) Api instance is Synchronous, i.e. returns -- a result and blocks the caller or if it is Asynchronous -- -- Example: -- --
--   data BookShop deriving Typeable
--   
--   data instance Api BookShop r where
--     RentBook  :: BookId   -> Api BookShop ('Synchronous (Either RentalError RentalId))
--     BringBack :: RentalId -> Api BookShop 'Asynchronous
--   
--   type BookId = Int
--   type RentalId = Int
--   type RentalError = String
--   
-- | The (promoted) constructors of this type specify (at the type level) -- the reply behavior of a specific constructor of an Api -- instance. data Synchronicity -- | Specify that handling a request is a blocking operation with a -- specific return type, e.g. ('Synchronous (Either RentalError -- RentalId)) Synchronous :: Type -> Synchronicity -- | Non-blocking, asynchronous, request handling Asynchronous :: Synchronicity -- | This is a tag-type that wraps around a ProcessId and holds an -- Api index type. newtype Server api Server :: ProcessId -> Server api [_fromServer] :: Server api -> ProcessId fromServer :: forall api_aoVK api_ap5G. Iso (Server api_aoVK) (Server api_ap5G) ProcessId ProcessId -- | Tag a ProcessId with an Api type index to mark it a -- Server process handling that API proxyAsServer :: proxy api -> ProcessId -> Server api -- | Tag a ProcessId with an Api type index to mark it a -- Server process handling that API asServer :: forall api. ProcessId -> Server api instance forall k (api :: k). GHC.Classes.Ord (Control.Eff.Concurrent.Api.Server api) instance forall k (api :: k). GHC.Classes.Eq (Control.Eff.Concurrent.Api.Server api) instance forall k (api :: k). GHC.Read.Read (Control.Eff.Concurrent.Api.Server api) instance forall k (api :: k). Data.Typeable.Internal.Typeable api => GHC.Show.Show (Control.Eff.Concurrent.Api.Server api) -- | Functions to implement Api servers. module Control.Eff.Concurrent.Api.Server -- | Receive and process incoming requests until the process exits, using -- an ApiHandler. serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r () -- | A record of callbacks, handling requests sent to a server -- Process, all belonging to a specific Api family -- instance. data ApiHandler p r [ApiHandler] :: {_handleCast :: HasCallStack => Api p 'Asynchronous -> Eff r () A cast will not return a result directly. This is used for async methods., _handleCall :: forall x. HasCallStack => Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () A call is a blocking operation, the caller is blocked until this handler calls the reply continuation., _handleTerminate :: HasCallStack => Maybe String -> Eff r () This callback is called with @Nothing@ if the process exits peacefully, or @Just "error message..."@ if the process exits with an error. This function is responsible to exit the process if necessary. The default behavior is defined in 'defaultTermination'.} -> ApiHandler p r -- | A default handler to use in _handleCall in ApiHandler. -- It will call raiseError with a nice error message. unhandledCallError :: forall p x r q. (Show (Api p ( 'Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () -- | A default handler to use in _handleCast in ApiHandler. -- It will call raiseError with a nice error message. unhandledCastError :: forall p r q. (Show (Api p 'Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r () -- | Exit the process either normally of the error message is -- Nothing or with exitWithError otherwise. defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r () -- | serve two ApiHandlers at once. The first handler is used -- for termination handling. serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r () -- | serve three ApiHandlers at once. The first handler is -- used for termination handling. serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r () -- | The basic building block of the combination of ApiHandlers is -- this function, which can not only be passed to receiveLoop, but -- also throws an UnhandledRequest exception if -- requestFromDynamic failed, such that multiple invokation of -- this function for different ApiHandlers can be tried, by using -- catchUnhandled. -- --
--   tryApiHandler px handlers1 message
--     `catchUnhandled`
--   tryApiHandler px handlers2
--     `catchUnhandled`
--   tryApiHandler px handlers3
--   
tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest : r) () -- | An exception that is used by the mechanism that chains together -- multiple different ApiHandler allowing a single process to -- implement multiple Apis. This exception is thrown by -- requestFromDynamic. This exception can be caught with -- catchUnhandled, this way, several distinct ApiHandler -- can be tried until one fits or until the exitUnhandled -- is invoked. data UnhandledRequest -- | If requestFromDynamic failes to cast the message to a -- Request for a certain ApiHandler it throws an -- UnhandledRequest exception. That exception is caught by this -- function and the raw message is given to the handler function. This is -- the basis for chaining ApiHandlers. catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a -- | Catch UnhandledRequests and terminate the process with an -- error, if necessary. ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest : r) () -> Eff r () -- | Cast a Dynamic value, and if that fails, throw an -- UnhandledRequest error. requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a -- | If an incoming message could not be casted to a Request -- corresponding to an ApiHandler (e.g. with -- requestFromDynamic) one should use this function to exit the -- process with a corresponding error. exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () -- | Functions for Api clients. -- -- This modules is required to write clients that consume an Api. module Control.Eff.Concurrent.Api.Client -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r () -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. Return True if the -- message was sent to the process. Note that this is totally not the -- same as that the request was successfully handled. If that is -- important, use call instead. castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r Bool -- | Send an Api request and wait for the server to return a result -- value. -- -- The type signature enforces that the corresponding Api clause -- is Synchronous. call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result -- | Like cast but take the Server from the reader provided -- by registerServer. castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r () -- | Like call but take the Server from the reader provided -- by registerServer. callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply -- | Like callRegistered but also catch errors raised if e.g. the -- server crashed. By allowing Alternative instances to contain -- the reply, application level errors can be combined with errors rising -- from inter process communication. callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous (f reply)) -> Eff r (f reply) -- | Instead of passing around a Server value and passing to -- functions like cast or call, a Server can -- provided by a Reader effect, if there is only a single -- server for a given Api instance. This type alias is -- convenience to express that an effect has Process and a reader -- for a Server. type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (ServerReader o) r) -- | The reader effect for ProcessIds for Apis, see -- registerServer type ServerReader o = Reader (Server o) -- | Run a reader effect that contains the one server handling a -- specific Api instance. registerServer :: Server o -> Eff (ServerReader o : r) a -> Eff r a -- | This module provides support for executing Process actions from -- IO. -- -- One use case is interacting with processes from the REPL, e.g.: -- --
--   >>> import Control.Eff.Concurrent.Process.SingleThreadedScheduler (defaultMain)
--   
-- --
--   >>> import Control.Eff.Loop
--   
-- --
--   >>> import Data.Dynamic
--   
-- --
--   >>> import Data.Maybe
--   
-- --
--   >>> s <- forkInteractiveScheduler Control.Eff.Concurrent.Process.SingleThreadedScheduler.defaultMain
--   
-- --
--   >>> fooPid <- submit s (spawn (foreverCheap (receiveMessage SP >>= (logMsg . fromMaybe "Huh!??" . fromDynamic))))
--   
-- --
--   >>> fooPid
--   <0.1.0>
--   
-- --
--   >>> submit s (sendMessageAs SP fooPid "test")
--   test
--   
-- --
--   >>> submit s (sendShutdown SP fooPid)
--   
module Control.Eff.Concurrent.Process.Interactive -- | Contains the communication channels to interact with a scheduler -- running in its' own thread. data SchedulerSession r -- | Fork a scheduler with a process that communicates with it via -- MVar, which is also the reason for the Lift IO -- constraint. forkInteractiveScheduler :: forall r. (SetMember Lift (Lift IO) r) => (Eff (Process r : r) () -> IO ()) -> IO (SchedulerSession r) -- | Exit the schedulder immediately using an asynchronous exception. killInteractiveScheduler :: SchedulerSession r -> IO () -- | Send a Process effect to the main process of a scheduler, this -- blocks until the effect is executed. submit :: forall r a. (SetMember Lift (Lift IO) r) => SchedulerSession r -> Eff (Process r : r) a -> IO a -- | Combination of submit and cast. submitCast :: forall o r. (SetMember Lift (Lift IO) r, Typeable o) => SchedulerSession r -> Server o -> Api o 'Asynchronous -> IO () -- | Combination of submit and cast. submitCall :: forall o q r. (SetMember Lift (Lift IO) r, Typeable o, Typeable q) => SchedulerSession r -> Server o -> Api o ( 'Synchronous q) -> IO q -- | Add-ons to Exception module Control.Eff.ExceptionExtra -- | Catch Exception thrown by an effect. liftTry :: forall e r a. (HasCallStack, Exception e, SetMember Lift (Lift IO) r) => Eff r a -> Eff r (Either e a) -- | A logging effect based on MonadLog. module Control.Eff.Log.Handler -- | Logging effect type, parameterized by a log message type. data Logs message a [LogMsg] :: message -> Logs message () -- | Log a message. logMsg :: Member (Logs m) r => m -> Eff r () -- | Change, add or remove log messages and perform arbitrary actions upon -- intercepting a log message. -- -- Requirements: -- -- -- -- Approach: Install a callback that sneaks into to log message -- sending/receiving, to intercept the messages and execute some code and -- then return a new message. interceptLogging :: forall r m a. Member (Logs m) r => (m -> Eff r ()) -> Eff r a -> Eff r a -- | Intercept logging to change, add or remove log messages. -- -- This is without side effects, hence faster than -- interceptLogging. foldLogMessages :: forall r m a f. (Foldable f, Member (Logs m) r) => (m -> f m) -> Eff r a -> Eff r a -- | Handle a Logs effect with a message that has a Show -- instance by **re-logging** each message applied to show. relogAsString :: forall m e a. (Show m, Member (Logs String) e) => Eff (Logs m : e) a -> Eff e a -- | Capture all log messages in a Seq (strict). captureLogs :: NFData message => Eff (Logs message : r) a -> Eff r (a, Seq message) -- | Throw away all log messages. ignoreLogs :: forall message r a. Eff (Logs message : r) a -> Eff r a -- | Apply a function that returns an effect to each log message. handleLogsWith :: forall message e a. (message -> Eff e ()) -> Eff (Logs message : e) a -> Eff e a -- | Handle the Logs effect with a monadic call back function -- (strict). handleLogsLifted :: forall m r message a. (NFData message, Monad m, SetMember Lift (Lift m) r) => (message -> m ()) -> Eff (Logs message : r) a -> Eff r a -- | Handle the Logs effect using LoggingT Handlers. handleLogsWithLoggingTHandler :: forall m r message a. (Monad m, SetMember Lift (Lift m) r) => (forall b. (Handler m message -> m b) -> m b) -> Eff (Logs message : r) a -> Eff r a -- | Concurrent Logging module Control.Eff.Log.Channel -- | A log channel processes logs from the Logs effect by en-queuing -- them in a shared queue read from a seperate processes. A channel can -- contain log message filters. data LogChannel message -- | Send the log messages to a LogChannel. logToChannel :: forall r message a. (SetMember Lift (Lift IO) r) => LogChannel message -> Eff (Logs message : r) a -> Eff r a -- | Create a LogChannel that will discard all messages sent via -- forwardLogstochannel or logChannelPutIO. noLogger :: LogChannel message -- | Fork a new process, that applies a monadic action to all log messages -- sent via logToChannel or logChannelPutIO. forkLogger :: forall message. (Typeable message) => Int -> (message -> IO ()) -> Maybe message -> IO (LogChannel message) -- | Filter logs sent to a LogChannel using a predicate. filterLogChannel :: (message -> Bool) -> LogChannel message -> LogChannel message -- | Close a log channel created by e.g. forkLogger. Message already -- enqueue are handled. Subsequent log message will not be handled -- anymore. If the log channel must be closed immediately, use -- killLogChannel instead. joinLogChannel :: (Typeable message) => LogChannel message -> IO () -- | Close a log channel quickly, without logging messages already in the -- queue. Subsequent logging requests will not be handled anymore. If the -- log channel must be closed without loosing any messages, use -- joinLogChannel instead. killLogChannel :: (Typeable message) => LogChannel message -> IO () -- | Run an action and close a LogChannel created by -- noLogger, forkLogger or filterLogChannel -- afterwards using joinLogChannel. If a SomeException was -- thrown, the log channel is killed with killLogChannel, and the -- exception is re-thrown. closeLogChannelAfter :: (Typeable message, IsString message) => Maybe message -> LogChannel message -> IO a -> IO a -- | Wrap LogChannel creation and destruction around a monad action -- in brackety manner. This function uses joinLogChannel, -- so en-queued messages are flushed on exit. The resulting action is a -- LoggingT action, which is essentially a reader for a log -- handler function in IO. logChannelBracket :: (Typeable message) => Int -> Maybe message -> (LogChannel message -> IO a) -> LoggingT message IO a -- | Enqueue a log message into a log channel logChannelPutIO :: LogChannel message -> message -> IO () -- | Internal exception to shutdown a LogChannel process created by -- forkLogger. This exception is handled such that all message -- already en-queued are handled and then an optional final message is -- written. data JoinLogChannelException -- | Internal exception to **immediately** shutdown a LogChannel -- process created by forkLogger, other than -- JoinLogChannelException the message queue will not be flushed, -- not further messages will be logged, except for the optional final -- message. data KillLogChannelException instance GHC.Show.Show Control.Eff.Log.Channel.KillLogChannelException instance GHC.Show.Show Control.Eff.Log.Channel.JoinLogChannelException instance GHC.Exception.Exception Control.Eff.Log.Channel.KillLogChannelException instance GHC.Exception.Exception Control.Eff.Log.Channel.JoinLogChannelException -- | An RFC 5434 inspired log message and convenience functions for logging -- them. module Control.Eff.Log.Message -- | A message data type inspired by the RFC-5424 Syslog Protocol data LogMessage LogMessage :: Facility -> Severity -> Maybe UTCTime -> Maybe String -> Maybe String -> Maybe String -> Maybe String -> [StructuredDataElement] -> Maybe ThreadId -> Maybe SrcLoc -> String -> LogMessage [_lmFacility] :: LogMessage -> Facility [_lmSeverity] :: LogMessage -> Severity [_lmTimestamp] :: LogMessage -> Maybe UTCTime [_lmHostname] :: LogMessage -> Maybe String [_lmAppname] :: LogMessage -> Maybe String [_lmProcessId] :: LogMessage -> Maybe String [_lmMessageId] :: LogMessage -> Maybe String [_lmStructuredData] :: LogMessage -> [StructuredDataElement] [_lmThreadId] :: LogMessage -> Maybe ThreadId [_lmSrcLoc] :: LogMessage -> Maybe SrcLoc [_lmMessage] :: LogMessage -> String -- | Render a LogMessage according to the rules in the given RFC, -- except for the rules concerning unicode and ascii renderRFC5424 :: LogMessage -> String -- | Render a LogMessage but set the timestamp and thread id fields. printLogMessage :: LogMessage -> IO () -- | Handle a Logs effect for String messages by re-logging -- the messages as LogMessages with debugSeverity. relogAsDebugMessages :: Member (Logs LogMessage) e => Eff (Logs String : e) a -> Eff e a -- | Log a String as LogMessage with a given Severity. logWithSeverity :: Member (Logs LogMessage) e => Severity -> String -> Eff e () -- | Log a String as emergencySeverity. logEmergency :: Member (Logs LogMessage) e => String -> Eff e () -- | Log a message with alertSeverity. logAlert :: Member (Logs LogMessage) e => String -> Eff e () -- | Log a criticalSeverity message. logCritical :: Member (Logs LogMessage) e => String -> Eff e () -- | Log a errorSeverity message. logError :: Member (Logs LogMessage) e => String -> Eff e () -- | Log a warningSeverity message. logWarning :: Member (Logs LogMessage) e => String -> Eff e () -- | Log a noticeSeverity message. logNotice :: Member (Logs LogMessage) e => String -> Eff e () -- | Log a informationalSeverity message. logInfo :: Member (Logs LogMessage) e => String -> Eff e () -- | Log a debugSeverity message. logDebug :: Member (Logs LogMessage) e => String -> Eff e () -- | Construct a LogMessage with errorSeverity errorMessage :: String -> LogMessage -- | Construct a LogMessage with informationalSeverity infoMessage :: String -> LogMessage -- | Construct a LogMessage with debugSeverity debugMessage :: String -> LogMessage -- | Construct a LogMessage with errorSeverity errorMessageIO :: MonadIO m => String -> m LogMessage -- | Construct a LogMessage with informationalSeverity infoMessageIO :: MonadIO m => String -> m LogMessage -- | Construct a LogMessage with debugSeverity debugMessageIO :: MonadIO m => String -> m LogMessage -- | An rfc 5424 severity data Severity emergencySeverity :: Severity alertSeverity :: Severity criticalSeverity :: Severity errorSeverity :: Severity warningSeverity :: Severity noticeSeverity :: Severity informationalSeverity :: Severity debugSeverity :: Severity -- | An rfc 5424 facility data Facility kernelMessages :: Facility userLevelMessages :: Facility mailSystem :: Facility systemDaemons :: Facility securityAuthorizationMessages4 :: Facility linePrinterSubsystem :: Facility networkNewsSubsystem :: Facility uucpSubsystem :: Facility clockDaemon :: Facility securityAuthorizationMessages10 :: Facility ftpDaemon :: Facility ntpSubsystem :: Facility logAuditFacility :: Facility logAlertFacility :: Facility clockDaemon2 :: Facility local0 :: Facility local1 :: Facility local2 :: Facility local3 :: Facility local4 :: Facility local5 :: Facility local6 :: Facility local7 :: Facility lmFacility :: Lens' LogMessage Facility lmSeverity :: Lens' LogMessage Severity lmTimestamp :: Lens' LogMessage (Maybe UTCTime) lmHostname :: Lens' LogMessage (Maybe String) lmAppname :: Lens' LogMessage (Maybe String) lmProcessId :: Lens' LogMessage (Maybe String) lmMessageId :: Lens' LogMessage (Maybe String) lmStructuredData :: Lens' LogMessage [StructuredDataElement] lmSrcLoc :: Lens' LogMessage (Maybe SrcLoc) lmThreadId :: Lens' LogMessage (Maybe ThreadId) lmMessage :: Lens' LogMessage String -- | Put the source location of the given callstack in lmSrcLoc setCallStack :: CallStack -> LogMessage -> LogMessage -- | RFC-5424 defines how structured data can be included in a log message. data StructuredDataElement SdElement :: String -> [SdParameter] -> StructuredDataElement [_sdElementId] :: StructuredDataElement -> String [_sdElementParameters] :: StructuredDataElement -> [SdParameter] sdElementId :: Lens' StructuredDataElement String sdElementParameters :: Lens' StructuredDataElement [SdParameter] instance Data.Default.Class.Default Control.Eff.Log.Message.LogMessage instance Data.String.IsString Control.Eff.Log.Message.LogMessage instance Data.Default.Class.Default Control.Eff.Log.Message.Severity instance Data.Default.Class.Default Control.Eff.Log.Message.Facility instance GHC.Generics.Generic Control.Eff.Log.Message.LogMessage instance GHC.Classes.Eq Control.Eff.Log.Message.LogMessage instance Control.DeepSeq.NFData Control.Eff.Log.Message.Facility instance GHC.Generics.Generic Control.Eff.Log.Message.Facility instance GHC.Show.Show Control.Eff.Log.Message.Facility instance GHC.Classes.Ord Control.Eff.Log.Message.Facility instance GHC.Classes.Eq Control.Eff.Log.Message.Facility instance Control.DeepSeq.NFData Control.Eff.Log.Message.Severity instance GHC.Generics.Generic Control.Eff.Log.Message.Severity instance GHC.Classes.Ord Control.Eff.Log.Message.Severity instance GHC.Classes.Eq Control.Eff.Log.Message.Severity instance GHC.Generics.Generic Control.Eff.Log.Message.StructuredDataElement instance GHC.Classes.Ord Control.Eff.Log.Message.StructuredDataElement instance GHC.Classes.Eq Control.Eff.Log.Message.StructuredDataElement instance GHC.Generics.Generic Control.Eff.Log.Message.SdParameter instance GHC.Classes.Ord Control.Eff.Log.Message.SdParameter instance GHC.Classes.Eq Control.Eff.Log.Message.SdParameter instance Control.DeepSeq.NFData Control.Eff.Log.Message.LogMessage instance GHC.Show.Show Control.Eff.Log.Message.Severity instance GHC.Show.Show Control.Eff.Log.Message.StructuredDataElement instance Control.DeepSeq.NFData Control.Eff.Log.Message.StructuredDataElement instance GHC.Show.Show Control.Eff.Log.Message.SdParameter instance Control.DeepSeq.NFData Control.Eff.Log.Message.SdParameter -- | A logging effect based on MonadLog. module Control.Eff.Log -- | A coroutine based, single threaded scheduler for Processes. module Control.Eff.Concurrent.Process.SingleThreadedScheduler -- | Handle the Process effect, as well as all lower effects using -- an effect handler function. -- -- Execute the main Process and all the other processes -- spawned by it in the current thread concurrently, using a -- co-routine based, round-robin scheduler. If a process exits with -- exitNormally, exitWithError, raiseError or is -- killed by another process Left ... is returned. Otherwise, -- the result will be wrapped in a Right. -- -- Every time a process _yields_ the effects are evaluated down to the a -- value of type m (Either String a). -- -- If the evaluator function runs the action down e.g. IO this -- might improve memory consumption, for long running services, with -- processes that loop endlessly. scheduleM :: Monad m => (forall b. Eff r b -> m b) -> m () -> Eff (ConsProcess r) a -> m (Either String a) -- | Like schedule but pure. The yield effect is -- just return (). schedulePure == runIdentity . -- scheduleM (Identity . run) (return ()) schedulePure :: Eff (ConsProcess '[]) a -> Either String a -- | Invoke schedule with lift yield as yield -- effect. scheduleIO runEff == scheduleM (runLift . runEff) -- (liftIO yield) scheduleIO :: MonadIO m => (forall b. Eff r b -> Eff '[Lift m] b) -> Eff (ConsProcess r) a -> m (Either String a) -- | Invoke schedule with lift yield as yield -- effect. scheduleMonadIOEff == scheduleM id (liftIO -- yield) scheduleMonadIOEff :: MonadIO (Eff r) => Eff (ConsProcess r) a -> Eff r (Either String a) -- | Run processes that have the Logs and the Lift effects. -- The user must provide a log handler function. -- -- Log messages are evaluated strict. -- --
--   scheduleIOWithLogging == run . captureLogs . schedule (return ())
--   
scheduleIOWithLogging :: (NFData l, MonadIO m) => (l -> m ()) -> Eff (ConsProcess '[Logs l, Lift m]) a -> m (Either String a) -- | Execute a Process using schedule on top of Lift -- IO and Logs String effects. defaultMain :: HasCallStack => Eff '[Process '[Logs LogMessage, Lift IO], Logs LogMessage, Lift IO] () -> IO () -- | A SchedulerProxy for LoggingAndIo. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo -- | The concrete list of Effects for running this pure scheduler on -- IO and with string logging. type LoggingAndIo = '[Logs LogMessage, Lift IO] -- | Implement Erlang style message passing concurrency. -- -- This handles the MessagePassing and Process effects, -- using TQueues and forkIO. -- -- This aims to be a pragmatic implementation, so even logging is -- supported. -- -- At the core is a main process that enters schedule and -- creates all of the internal state stored in TVars to manage -- processes with message queues. -- -- The Eff handler for Process and MessagePassing -- use are implemented and available through spawn. -- -- spawn uses forkFinally and TQueues and tries to -- catch most exceptions. module Control.Eff.Concurrent.Process.ForkIOScheduler -- | This is the main entry point to running a message passing concurrency -- application. This function takes a Process on top of the -- SchedulerIO effect and a LogChannel for concurrent -- logging. schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel LogMessage -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMainWithLogChannel :: LogChannel LogMessage -> Eff (ConsProcess SchedulerIO) () -> IO () -- | A sum-type with errors that can occur when scheduleing messages. data SchedulerError -- | No process info was found for a ProcessId during internal -- processing. NOTE: This is **ONLY** caused by internal errors, probably -- by an incorrect MessagePassing handler in this module. -- **Sending a message to a process ALWAYS succeeds!** Even if the -- process does not exist. ProcessNotFound :: ProcessId -> SchedulerError -- | A process called raiseError. ProcessRaisedError :: String -> SchedulerError -- | A process called exitWithError. ProcessExitError :: String -> SchedulerError -- | A process exits. ProcessShuttingDown :: SchedulerError -- | An action was not performed while the scheduler was exiting. SchedulerShuttingDown :: SchedulerError -- | The concrete list of Effects for this scheduler implementation. -- See HasSchedulerIO type SchedulerIO = '[Reader SchedulerVar, Logs LogMessage, Lift IO] -- | A SchedulerProxy for SchedulerIO forkIoScheduler :: SchedulerProxy SchedulerIO -- | An alias for the constraints for the effects essential to this -- scheduler implementation, i.e. these effects allow spawning new -- Processes. See SchedulerIO type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs LogMessage) r, Member (Reader SchedulerVar) r) instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError instance GHC.Exception.Exception Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.ProcessInfo -- | Observer Effects -- -- This module supports the implementation of observers and observables. -- One use case is event propagation. The tools in this module are -- tailored towards Api servers/clients. module Control.Eff.Concurrent.Api.Observer -- | An Api index that support observation of the another Api -- that is Observable. class (Typeable p, Observable o) => Observer p o -- | Wrap the Observation and the ProcessId (i.e. the -- Server) that caused the observation into a Api value -- that the Observable understands. observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous -- | An Api index that supports registration and de-registration of -- Observers. class (Typeable o, Typeable (Observation o)) => Observable o where { data family Observation o; } -- | Return the Api value for the cast_ that registeres an -- observer registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Return the Api value for the cast_ that de-registeres -- an observer forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Send an Observation to an Observer notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () -- | Send the registerObserverMessage registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send the forgetObserverMessage forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | An existential wrapper around a Server of an Observer. -- Needed to support different types of observers to observe the same -- Observable in a general fashion. data SomeObserver o [SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o -- | Send an Observation to SomeObserver. notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () -- | Internal state for manageobservers data Observers o -- | Keep track of registered Observers Observers can be added and -- removed, and an Observation can be sent to all registerd -- observers at once. manageObservers :: Eff (State (Observers o) : r) a -> Eff r a -- | Add an Observer to the Observers managed by -- manageObservers. addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Delete an Observer from the Observers managed by -- manageObservers. removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Send an Observation to all SomeObservers in the -- Observers state. notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () -- | An Observer that schedules the observations to an effectful -- callback. data CallbackObserver o -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q : q) Bool) -> Eff r (Server (CallbackObserver o)) -- | Use spawnCallbackObserver to create a universal logging -- observer, using the Show instance of the Observation. | -- Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.Observation o) => GHC.Show.Show (Control.Eff.Concurrent.Api.Api (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) r) instance Control.Eff.Concurrent.Api.Observer.Observable o => Control.Eff.Concurrent.Api.Observer.Observer (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) o instance GHC.Classes.Ord (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Classes.Eq (Control.Eff.Concurrent.Api.Observer.SomeObserver o) -- | In rare occasions GHC optimizes innocent looking loops into -- space-leaking monsters. See the discussion here: GHC issue -- 13080 for more details, or this blog post about space leaks in -- nested loops -- -- These functions in this module might help, at least in -- conjunction with the -fno-full-laziness GHC option. -- -- There is a unit test in the sources of this module, which can be used -- to do a comperative heap profiling of these function vs. their -- counterparts in the base package. -- -- Here are the images of the profiling results, the images show that the -- functions in this module do not leak space, compared to the original -- functions (forever and replicateM_): -- module Control.Eff.Loop -- | A version of forever that hopefully tricks GHC into -- not creating a space leak. The intuition is, that we -- want to do something that is cheap, and hence should be -- recomputed instead of shared. foreverCheap :: Monad m => m a -> m () -- | A version of replicateM_ that hopefully tricks GHC into -- not creating a space leak. The intuition is, that we -- want to do something that is cheap, and hence should be -- recomputed instead of shared. replicateCheapM_ :: Monad m => Int -> m a -> m () -- | Erlang style processes with message passing concurrency based on -- (more) extensible-effects. module Control.Eff.Concurrent -- | Each process is identified by a single process id, that stays constant -- throughout the life cycle of a process. Also, message sending relies -- on these values to address messages to processes. newtype ProcessId ProcessId :: Int -> ProcessId [_fromProcessId] :: ProcessId -> Int -- | Every function for Process things needs such a proxy value for -- the low-level effect list, i.e. the effects identified by -- r in Process r : r, this might be -- dependent on the scheduler implementation. data SchedulerProxy :: [Type -> Type] -> Type -- | Tell the typechecker what effects we have below Process [SchedulerProxy] :: SchedulerProxy q -- | Like SchedulerProxy but shorter [SP] :: SchedulerProxy q -- | Cons Process onto a list of effects. type ConsProcess r = Process r : r -- | Every Process action returns it's actual result wrapped in this -- type. It will allow to signal errors as well as pass on normal results -- such as incoming messages. data ResumeProcess v -- | The process is required to exit. [ShutdownRequested] :: ResumeProcess v -- | The process is required to exit from an error condition, that cannot -- be recovered from. [OnError] :: String -> ResumeProcess v -- | The process may resume to do work, using the given result. [ResumeWith] :: a -> ResumeProcess a -- | This indicates that the action did not complete, and maybe retried [RetryLastAction] :: ResumeProcess v -- | The process effect is the basis for message passing concurrency. This -- effect describes an interface for concurrent, communicating isolated -- processes identified uniquely by a process-id. -- -- Processes can raise exceptions that can be caught, exit gracefully or -- with an error, or be killed by other processes, with the option of -- ignoring the shutdown request. -- -- Process Scheduling is implemented in different modules. All scheduler -- implementations should follow some basic rules: -- -- data Process (r :: [Type -> Type]) b -- | In cooperative schedulers, this will give processing time to the -- scheduler. Every other operation implicitly serves the same purpose. [YieldProcess] :: Process r (ResumeProcess ()) -- | Return the current ProcessId [SelfPid] :: Process r (ResumeProcess ProcessId) -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. [Spawn] :: Eff (Process r : r) () -> Process r (ResumeProcess ProcessId) -- | Process exit, this is the same as if the function that was applied to -- a spawn function returned. [Shutdown] :: Process r a -- | Exit the process due to an error, this cannot be caught. [ExitWithError] :: String -> Process r b -- | Raise an error, that can be handled. [RaiseError] :: String -> Process r b -- | Request that another a process exits. The targeted process is -- interrupted and gets a ShutdownRequested, the target process -- may decide to ignore the shutdown requests. [SendShutdown] :: ProcessId -> Process r (ResumeProcess Bool) -- | Send a message to a process addressed by the ProcessId. Sending -- a message should **always succeed** and return **immediately**, even -- if the destination process does not exist, or does not accept messages -- of the given type. [SendMessage] :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool) -- | Receive a message. This should block until an a message was received. -- The pure function may convert the incoming message into something, and -- the result is returned as ProcessMessage value. Another -- reason why this function returns, is if a process control message was -- sent to the process. This can only occur from inside the runtime -- system, aka the effect handler implementation. (Currently there is one -- in ForkIOScheduler.) [ReceiveMessage] :: Process r (ResumeProcess Dynamic) -- | Execute a Process action and resume the process, retry the -- action or exit the process when a shutdown was requested. executeAndResume :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v -- | Execute a and action and resume the process, retry the action, -- shutdown the process or return an error. executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) -- | Return a SchedulerProxy for a Process effect. thisSchedulerProxy :: Eff (Process r : r) (SchedulerProxy r) -- | Use executeAndResume to execute YieldProcess. Refer to -- YieldProcess for more information. yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. Return True if the process existed. I you -- don't care, just sendMessage instead. sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () -- | Exit a process addressed by the ProcessId. See -- SendShutdown. sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () -- | Like sendShutdown, but also return True iff the -- process to exit exists. sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r ProcessId -- | Like spawn but return (). spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r () -- | Block until a message was received. receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic -- | Receive and cast the message to some Typeable instance. receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -- | Enter a loop to receive messages and pass them to a callback, until -- the function returns False. receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () -- | Returns the ProcessId of the current process. self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId -- | Exit the process. exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -- | Exit the process with an error. exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a -- | Thrown an error, can be caught by catchRaisedError. raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b -- | Catch and handle an error raised by raiseError. Works -- independent of the handler implementation. catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w -- | Like catchRaisedError it catches raiseError, but instead -- of invoking a user provided handler, the result is wrapped into an -- Either. ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) fromProcessId :: Iso' ProcessId Int -- | This is a tag-type that wraps around a ProcessId and holds an -- Api index type. newtype Server api Server :: ProcessId -> Server api [_fromServer] :: Server api -> ProcessId -- | The (promoted) constructors of this type specify (at the type level) -- the reply behavior of a specific constructor of an Api -- instance. data Synchronicity -- | Specify that handling a request is a blocking operation with a -- specific return type, e.g. ('Synchronous (Either RentalError -- RentalId)) Synchronous :: Type -> Synchronicity -- | Non-blocking, asynchronous, request handling Asynchronous :: Synchronicity -- | This data family defines an API, a communication interface description -- between at least two processes. The processes act as servers or -- client(s) regarding a specific instance of this type. -- -- The first parameter is usually a user defined phantom type that -- identifies the Api instance. -- -- The second parameter specifies if a specific constructor of an -- (GADT-like) Api instance is Synchronous, i.e. returns -- a result and blocks the caller or if it is Asynchronous -- -- Example: -- --
--   data BookShop deriving Typeable
--   
--   data instance Api BookShop r where
--     RentBook  :: BookId   -> Api BookShop ('Synchronous (Either RentalError RentalId))
--     BringBack :: RentalId -> Api BookShop 'Asynchronous
--   
--   type BookId = Int
--   type RentalId = Int
--   type RentalError = String
--   
fromServer :: forall api_aoVK api_ap5G. Iso (Server api_aoVK) (Server api_ap5G) ProcessId ProcessId -- | Tag a ProcessId with an Api type index to mark it a -- Server process handling that API proxyAsServer :: proxy api -> ProcessId -> Server api -- | Tag a ProcessId with an Api type index to mark it a -- Server process handling that API asServer :: forall api. ProcessId -> Server api -- | Instead of passing around a Server value and passing to -- functions like cast or call, a Server can -- provided by a Reader effect, if there is only a single -- server for a given Api instance. This type alias is -- convenience to express that an effect has Process and a reader -- for a Server. type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (ServerReader o) r) -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. Return True if the -- message was sent to the process. Note that this is totally not the -- same as that the request was successfully handled. If that is -- important, use call instead. castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r Bool -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r () -- | Send an Api request and wait for the server to return a result -- value. -- -- The type signature enforces that the corresponding Api clause -- is Synchronous. call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result -- | Run a reader effect that contains the one server handling a -- specific Api instance. registerServer :: Server o -> Eff (ServerReader o : r) a -> Eff r a -- | Like call but take the Server from the reader provided -- by registerServer. callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply -- | Like callRegistered but also catch errors raised if e.g. the -- server crashed. By allowing Alternative instances to contain -- the reply, application level errors can be combined with errors rising -- from inter process communication. callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous (f reply)) -> Eff r (f reply) -- | Like cast but take the Server from the reader provided -- by registerServer. castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r () -- | An exception that is used by the mechanism that chains together -- multiple different ApiHandler allowing a single process to -- implement multiple Apis. This exception is thrown by -- requestFromDynamic. This exception can be caught with -- catchUnhandled, this way, several distinct ApiHandler -- can be tried until one fits or until the exitUnhandled -- is invoked. data UnhandledRequest -- | A record of callbacks, handling requests sent to a server -- Process, all belonging to a specific Api family -- instance. data ApiHandler p r [ApiHandler] :: {_handleCast :: HasCallStack => Api p 'Asynchronous -> Eff r () A cast will not return a result directly. This is used for async methods., _handleCall :: forall x. HasCallStack => Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () A call is a blocking operation, the caller is blocked until this handler calls the reply continuation., _handleTerminate :: HasCallStack => Maybe String -> Eff r () This callback is called with @Nothing@ if the process exits peacefully, or @Just "error message..."@ if the process exits with an error. This function is responsible to exit the process if necessary. The default behavior is defined in 'defaultTermination'.} -> ApiHandler p r -- | Receive and process incoming requests until the process exits, using -- an ApiHandler. serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r () -- | A default handler to use in _handleCall in ApiHandler. -- It will call raiseError with a nice error message. unhandledCallError :: forall p x r q. (Show (Api p ( 'Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () -- | A default handler to use in _handleCast in ApiHandler. -- It will call raiseError with a nice error message. unhandledCastError :: forall p r q. (Show (Api p 'Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r () -- | Exit the process either normally of the error message is -- Nothing or with exitWithError otherwise. defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r () -- | serve two ApiHandlers at once. The first handler is used -- for termination handling. serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r () -- | serve three ApiHandlers at once. The first handler is -- used for termination handling. serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r () -- | The basic building block of the combination of ApiHandlers is -- this function, which can not only be passed to receiveLoop, but -- also throws an UnhandledRequest exception if -- requestFromDynamic failed, such that multiple invokation of -- this function for different ApiHandlers can be tried, by using -- catchUnhandled. -- --
--   tryApiHandler px handlers1 message
--     `catchUnhandled`
--   tryApiHandler px handlers2
--     `catchUnhandled`
--   tryApiHandler px handlers3
--   
tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest : r) () -- | If requestFromDynamic failes to cast the message to a -- Request for a certain ApiHandler it throws an -- UnhandledRequest exception. That exception is caught by this -- function and the raw message is given to the handler function. This is -- the basis for chaining ApiHandlers. catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a -- | Catch UnhandledRequests and terminate the process with an -- error, if necessary. ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest : r) () -> Eff r () -- | Cast a Dynamic value, and if that fails, throw an -- UnhandledRequest error. requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a -- | If an incoming message could not be casted to a Request -- corresponding to an ApiHandler (e.g. with -- requestFromDynamic) one should use this function to exit the -- process with a corresponding error. exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () -- | An Observer that schedules the observations to an effectful -- callback. data CallbackObserver o -- | Internal state for manageobservers data Observers o -- | An existential wrapper around a Server of an Observer. -- Needed to support different types of observers to observe the same -- Observable in a general fashion. data SomeObserver o [SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o -- | An Api index that supports registration and de-registration of -- Observers. class (Typeable o, Typeable (Observation o)) => Observable o where { data family Observation o; } -- | Return the Api value for the cast_ that registeres an -- observer registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Return the Api value for the cast_ that de-registeres -- an observer forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | An Api index that support observation of the another Api -- that is Observable. class (Typeable p, Observable o) => Observer p o -- | Wrap the Observation and the ProcessId (i.e. the -- Server) that caused the observation into a Api value -- that the Observable understands. observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous -- | Send an Observation to an Observer notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () -- | Send the registerObserverMessage registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send the forgetObserverMessage forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send an Observation to SomeObserver. notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () -- | Keep track of registered Observers Observers can be added and -- removed, and an Observation can be sent to all registerd -- observers at once. manageObservers :: Eff (State (Observers o) : r) a -> Eff r a -- | Add an Observer to the Observers managed by -- manageObservers. addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Delete an Observer from the Observers managed by -- manageObservers. removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Send an Observation to all SomeObservers in the -- Observers state. notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q : q) Bool) -> Eff r (Server (CallbackObserver o)) -- | Use spawnCallbackObserver to create a universal logging -- observer, using the Show instance of the Observation. | -- Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) -- | The concrete list of Effects for this scheduler implementation. -- See HasSchedulerIO type SchedulerIO = '[Reader SchedulerVar, Logs LogMessage, Lift IO] -- | An alias for the constraints for the effects essential to this -- scheduler implementation, i.e. these effects allow spawning new -- Processes. See SchedulerIO type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs LogMessage) r, Member (Reader SchedulerVar) r) -- | A sum-type with errors that can occur when scheduleing messages. data SchedulerError -- | No process info was found for a ProcessId during internal -- processing. NOTE: This is **ONLY** caused by internal errors, probably -- by an incorrect MessagePassing handler in this module. -- **Sending a message to a process ALWAYS succeeds!** Even if the -- process does not exist. ProcessNotFound :: ProcessId -> SchedulerError -- | A process called raiseError. ProcessRaisedError :: String -> SchedulerError -- | A process called exitWithError. ProcessExitError :: String -> SchedulerError -- | A process exits. ProcessShuttingDown :: SchedulerError -- | An action was not performed while the scheduler was exiting. SchedulerShuttingDown :: SchedulerError -- | A SchedulerProxy for SchedulerIO forkIoScheduler :: SchedulerProxy SchedulerIO -- | This is the main entry point to running a message passing concurrency -- application. This function takes a Process on top of the -- SchedulerIO effect and a LogChannel for concurrent -- logging. schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel LogMessage -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMainWithLogChannel :: LogChannel LogMessage -> Eff (ConsProcess SchedulerIO) () -> IO () -- | Like schedule but pure. The yield effect is -- just return (). schedulePure == runIdentity . -- scheduleM (Identity . run) (return ()) schedulePure :: Eff (ConsProcess '[]) a -> Either String a