-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Message passing concurrency as extensible-effect -- -- Please see the README on GitHub at -- https://github.com/sheyll/extensible-effects-concurrent#readme @package extensible-effects-concurrent @version 0.3.0.0 -- | The message passing effect. -- -- This module describes an abstract message passing effect, and a -- process effect, mimicking Erlang's process and message semantics. -- -- Two scheduler implementations for the Process effect are -- provided: -- -- module Control.Eff.Concurrent.Process -- | Each process is identified by a single process id, that stays constant -- throughout the life cycle of a process. Also, message sending relies -- on these values to address messages to processes. newtype ProcessId ProcessId :: Int -> ProcessId [_fromProcessId] :: ProcessId -> Int fromProcessId :: Iso' ProcessId Int -- | The process effect is the basis for message passing concurrency. This -- effect describes an interface for concurrent, communicating isolated -- processes identified uniquely by a process-id. -- -- Processes can raise exceptions that can be caught, exit gracefully or -- with an error, or be killed by other processes, with the option of -- ignoring the shutdown request. -- -- Process Scheduling is implemented in different modules. All scheduler -- implementations should follow some basic rules: -- -- data Process (r :: [Type -> Type]) b -- | In cooperative schedulers, this will give processing time to the -- scheduler. Every other operation implicitly serves the same purpose. [YieldProcess] :: Process r (ResumeProcess ()) -- | Return the current ProcessId [SelfPid] :: Process r (ResumeProcess ProcessId) -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. [Spawn] :: Eff (Process r : r) () -> Process r (ResumeProcess ProcessId) -- | Process exit, this is the same as if the function that was applied to -- a spawn function returned. [Shutdown] :: Process r a -- | Exit the process due to an error, this cannot be caught. [ExitWithError] :: String -> Process r b -- | Raise an error, that can be handled. [RaiseError] :: String -> Process r b -- | Request that another a process exits. The targeted process is -- interrupted and gets a ShutdownRequested, the target process -- may decide to ignore the shutdown requests. [SendShutdown] :: ProcessId -> Process r (ResumeProcess Bool) -- | Send a message to a process addressed by the ProcessId. Sending -- a message should **always succeed** and return **immediately**, even -- if the destination process does not exist, or does not accept messages -- of the given type. [SendMessage] :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool) -- | Receive a message. This should block until an a message was received. -- The pure function may convert the incoming message into something, and -- the result is returned as ProcessMessage value. Another -- reason why this function returns, is if a process control message was -- sent to the process. This can only occur from inside the runtime -- system, aka the effect handler implementation. (Currently there is one -- in ForkIOScheduler.) [ReceiveMessage] :: Process r (ResumeProcess Dynamic) -- | Cons Process onto a list of effects. type ConsProcess r = Process r : r -- | Every Process action returns it's actual result wrapped in this -- type. It will allow to signal errors as well as pass on normal results -- such as incoming messages. data ResumeProcess v -- | The process is required to exit. [ShutdownRequested] :: ResumeProcess v -- | The process is required to exit from an error condition, that cannot -- be recovered from. [OnError] :: String -> ResumeProcess v -- | The process may resume to do work, using the given result. [ResumeWith] :: a -> ResumeProcess a -- | This indicates that the action did not complete, and maybe retried [RetryLastAction] :: ResumeProcess v -- | Every function for Process things needs such a proxy value for -- the low-level effect list, i.e. the effects identified by -- r in Process r : r, this might be -- dependent on the scheduler implementation. data SchedulerProxy :: [Type -> Type] -> Type -- | Tell the typechecker what effects we have below Process [SchedulerProxy] :: SchedulerProxy q -- | Like SchedulerProxy but shorter [SP] :: SchedulerProxy q -- | Return a SchedulerProxy for a Process effect. thisSchedulerProxy :: Eff (Process r : r) (SchedulerProxy r) -- | Execute a and action and resume the process, retry the action, -- shutdown the process or return an error. executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v) -- | Use executeAndResume to execute YieldProcess. Refer to -- YieldProcess for more information. yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. Return True if the process existed. I you -- don't care, just sendMessage instead. sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r ProcessId -- | Like spawn but return (). spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r () -- | Block until a message was received. receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic -- | Receive and cast the message to some Typeable instance. receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -- | Enter a loop to receive messages and pass them to a callback, until -- the function returns False. receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r () -- | Returns the ProcessId of the current process. self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId -- | Exit a process addressed by the ProcessId. See -- SendShutdown. sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r () -- | Like sendShutdown, but also return True iff the -- process to exit exists. sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool -- | Exit the process with an error. exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a -- | Exit the process. exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -- | Thrown an error, can be caught by catchRaisedError. raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b -- | Catch and handle an error raised by raiseError. Works -- independent of the handler implementation. catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w -- | Like catchRaisedError it catches raiseError, but instead -- of invoking a user provided handler, the result is wrapped into an -- Either. ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a) instance GHC.Real.Real Control.Eff.Concurrent.Process.ProcessId instance GHC.Real.Integral Control.Eff.Concurrent.Process.ProcessId instance GHC.Enum.Enum Control.Eff.Concurrent.Process.ProcessId instance GHC.Num.Num Control.Eff.Concurrent.Process.ProcessId instance GHC.Enum.Bounded Control.Eff.Concurrent.Process.ProcessId instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ProcessId instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ProcessId instance Data.Traversable.Traversable Control.Eff.Concurrent.Process.ResumeProcess instance GHC.Classes.Ord v => GHC.Classes.Ord (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Classes.Eq v => GHC.Classes.Eq (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Show.Show v => GHC.Show.Show (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Base.Functor Control.Eff.Concurrent.Process.ResumeProcess instance Data.Foldable.Foldable Control.Eff.Concurrent.Process.ResumeProcess instance GHC.Read.Read Control.Eff.Concurrent.Process.ProcessId instance GHC.Show.Show Control.Eff.Concurrent.Process.ProcessId -- | This module contains a mechanism to specify what kind of messages (aka -- requests) a Server (Process) can handle, and if -- the caller blocks and waits for an answer, which the server process -- provides. -- -- The type magic in the Api type familiy allows to define a -- related set of requests along with the corresponding responses. -- -- Request handling can be either blocking, if a response is requred, or -- non-blocking. -- -- A process can serve a specific Api instance by using the -- functions provided by the Control.Eff.Concurrent.Api.Server -- module. -- -- To enable a process to use such a service, the functions -- provided by the Control.Eff.Concurrent.Api.Client should be -- used. module Control.Eff.Concurrent.Api -- | This data family defines an API, a communication interface description -- between at least two processes. The processes act as servers or -- client(s) regarding a specific instance of this type. -- -- The first parameter is usually a user defined phantom type that -- identifies the Api instance. -- -- The second parameter specifies if a specific constructor of an -- (GADT-like) Api instance is Synchronous, i.e. returns -- a result and blocks the caller or if it is Asynchronous -- -- Example: -- --
--   data BookShop deriving Typeable
--   
--   data instance Api BookShop r where
--     RentBook  :: BookId   -> Api BookShop ('Synchronous (Either RentalError RentalId))
--     BringBack :: RentalId -> Api BookShop 'Asynchronous
--   
--   type BookId = Int
--   type RentalId = Int
--   type RentalError = String
--   
-- | The (promoted) constructors of this type specify (at the type level) -- the reply behavior of a specific constructor of an Api -- instance. data Synchronicity -- | Specify that handling a request is a blocking operation with a -- specific return type, e.g. ('Synchronous (Either RentalError -- RentalId)) Synchronous :: Type -> Synchronicity -- | Non-blocking, asynchronous, request handling Asynchronous :: Synchronicity -- | This is a tag-type that wraps around a ProcessId and holds an -- Api index type. newtype Server api Server :: ProcessId -> Server api [_fromServer] :: Server api -> ProcessId fromServer :: forall api_ai3V api_aieo. Iso (Server api_ai3V) (Server api_aieo) ProcessId ProcessId -- | Tag a ProcessId with an Api type index to mark it a -- Server process handling that API proxyAsServer :: proxy api -> ProcessId -> Server api -- | Tag a ProcessId with an Api type index to mark it a -- Server process handling that API asServer :: forall api. ProcessId -> Server api instance forall k (api :: k). GHC.Classes.Ord (Control.Eff.Concurrent.Api.Server api) instance forall k (api :: k). GHC.Classes.Eq (Control.Eff.Concurrent.Api.Server api) instance forall k (api :: k). GHC.Read.Read (Control.Eff.Concurrent.Api.Server api) instance forall k (api :: k). Data.Typeable.Internal.Typeable api => GHC.Show.Show (Control.Eff.Concurrent.Api.Server api) -- | Functions for Api clients. -- -- This modules is required to write clients that consume an Api. module Control.Eff.Concurrent.Api.Client -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r () -- | Send an Api request that has no return value and return as fast -- as possible. The type signature enforces that the corresponding -- Api clause is Asynchronous. Return True if the -- message was sent to the process. Note that this is totally not the -- same as that the request was successfully handled. If that is -- important, use call instead. castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r Bool -- | Send an Api request and wait for the server to return a result -- value. -- -- The type signature enforces that the corresponding Api clause -- is Synchronous. call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result -- | Like cast but take the Server from the reader provided -- by registerServer. castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r () -- | Like call but take the Server from the reader provided -- by registerServer. callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply -- | Like callRegistered but also catch errors raised if e.g. the -- server crashed. By allowing Alternative instances to contain -- the reply, application level errors can be combined with errors rising -- from inter process communication. callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous (f reply)) -> Eff r (f reply) -- | Instead of passing around a Server value and passing to -- functions like cast or call, a Server can -- provided by a Reader effect, if there is only a single -- server for a given Api instance. This type alias is -- convenience to express that an effect has Process and a reader -- for a Server. type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r) -- | Run a reader effect that contains the one server handling a -- specific Api instance. registerServer :: Server o -> Eff (Reader (Server o) : r) a -> Eff r a module Control.Eff.Concurrent.Process.Interactive -- | This module provides support for executing Process actions from -- IO. -- -- One use case is interacting with processes from the REPL, e.g.: -- --
--   >>> import Control.Eff.Concurrent.Process.SingleThreadedScheduler (defaultMain)
--   
-- --
--   >>> import Data.Dynamic
--   
-- --
--   >>> import Data.Maybe
--   
-- --
--   >>> s <- forkInteractiveScheduler Control.Eff.Concurrent.Process.SingleThreadedScheduler.defaultMain
--   
-- --
--   >>> fooPid <- submit s (spawn (forever (receiveMessage SP >>= (logMsg . fromMaybe "Huh!??" . fromDynamic))))
--   
-- --
--   >>> fooPid
--   <0.1.0>
--   
-- --
--   >>> submit s (sendMessageAs SP fooPid "test")
--   test
--   
-- --
--   >>> submit s (sendShutdown SP fooPid)
--   
data SchedulerVar r SchedulerVar :: ThreadId -> MVar (Eff (Process r : r) (Maybe String)) -> SchedulerVar r [_schedulerThreadId] :: SchedulerVar r -> ThreadId [_schedulerInQueue] :: SchedulerVar r -> MVar (Eff (Process r : r) (Maybe String)) -- | Fork a scheduler with a process that communicates with it via -- MVar, which is also the reason for the Lift IO -- constraint. forkInteractiveScheduler :: forall r. (SetMember Lift (Lift IO) r) => (Eff (Process r : r) () -> IO ()) -> IO (SchedulerVar r) -- | Exit the schedulder immediately using an asynchronous exception. killInteractiveScheduler :: SchedulerVar r -> IO () -- | Send a Process effect to the main process of a scheduler, this -- blocks until the effect is executed. submit :: forall r a. (SetMember Lift (Lift IO) r) => SchedulerVar r -> Eff (Process r : r) a -> IO a -- | Send a Process effect to the main process of a scheduler, this -- blocks until the effect is executed, then the result is printed by the -- thread, that runs the process 0 in the scheduler. submitPrint :: forall r a. (Show a, SetMember Lift (Lift IO) r) => SchedulerVar r -> Eff (Process r : r) a -> IO () -- | Combination of submit and cast. submitCast :: forall o r. (SetMember Lift (Lift IO) r, Typeable o) => SchedulerVar r -> Server o -> Api o 'Asynchronous -> IO () -- | Combination of submit and cast. submitCall :: forall o q r. (SetMember Lift (Lift IO) r, Typeable o, Typeable q) => SchedulerVar r -> Server o -> Api o ( 'Synchronous q) -> IO q -- | Add-ons to Exception module Control.Eff.ExceptionExtra -- | Catch Exception thrown by an effect. liftTry :: forall e r a. (HasCallStack, Exception e, SetMember Lift (Lift IO) r) => Eff r a -> Eff r (Either e a) -- | Functions to implement Api servers. module Control.Eff.Concurrent.Api.Server -- | Receive and process incoming requests until the process exits, using -- an ApiHandler. serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r () -- | A record of callbacks, handling requests sent to a server -- Process, all belonging to a specific Api family -- instance. data ApiHandler p r [ApiHandler] :: {_handleCast :: HasCallStack => Api p 'Asynchronous -> Eff r () A cast will not return a result directly. This is used for async methods., _handleCall :: forall x. HasCallStack => Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () A call is a blocking operation, the caller is blocked until this handler calls the reply continuation., _handleTerminate :: HasCallStack => Maybe String -> Eff r () This callback is called with @Nothing@ if the process exits peacefully, or @Just "error message..."@ if the process exits with an error. This function is responsible to exit the process if necessary. The default behavior is defined in 'defaultTermination'.} -> ApiHandler p r -- | A default handler to use in _handleCall in ApiHandler. -- It will call raiseError with a nice error message. unhandledCallError :: forall p x r q. (Show (Api p ( 'Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () -- | A default handler to use in _handleCast in ApiHandler. -- It will call raiseError with a nice error message. unhandledCastError :: forall p r q. (Show (Api p 'Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r () -- | Exit the process either normally of the error message is -- Nothing or with exitWithError otherwise. defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r () -- | serve two ApiHandlers at once. The first handler is used -- for termination handling. serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r () -- | serve three ApiHandlers at once. The first handler is -- used for termination handling. serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r () -- | The basic building block of the combination of ApiHandlers is -- this function, which can not only be passed to receiveLoop, but -- also throws an UnhandledRequest exception if -- requestFromDynamic failed, such that multiple invokation of -- this function for different ApiHandlers can be tried, by using -- catchUnhandled. -- --
--   tryApiHandler px handlers1 message
--     `catchUnhandled`
--   tryApiHandler px handlers2
--     `catchUnhandled`
--   tryApiHandler px handlers3
--   
tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest : r) () -- | An exception that is used by the mechanism that chains together -- multiple different ApiHandler allowing a single process to -- implement multiple Apis. This exception is thrown by -- requestFromDynamic. This exception can be caught with -- catchUnhandled, this way, several distinct ApiHandler -- can be tried until one fits or until the exitUnhandled -- is invoked. data UnhandledRequest -- | If requestFromDynamic failes to cast the message to a -- Request for a certain ApiHandler it throws an -- UnhandledRequest exception. That exception is caught by this -- function and the raw message is given to the handler function. This is -- the basis for chaining ApiHandlers. catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a -- | Catch UnhandledRequests and terminate the process with an -- error, if necessary. ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest : r) () -> Eff r () -- | Cast a Dynamic value, and if that fails, throw an -- UnhandledRequest error. requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a -- | If an incoming message could not be casted to a Request -- corresponding to an ApiHandler (e.g. with -- requestFromDynamic) one should use this function to exit the -- process with a corresponding error. exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r () -- | A logging effect based on MonadLog. module Control.Eff.Log -- | Logging effect type, parameterized by a log message type. data Logs message a [LogMsg] :: message -> Logs message () -- | Log a message. logMsg :: Member (Logs m) r => m -> Eff r () -- | Change, add or remove log messages. -- -- Requirements: -- -- -- -- Approach: Install a callback that sneaks into to log message -- sending/receiving, to intercept the messages and execute some code and -- then return a new message. foldLog :: forall r m a. Member (Logs m) r => (m -> Eff r ()) -> Eff r a -> Eff r a -- | Change, add or remove log messages without side effects, faster than -- foldLog. -- -- Requirements: -- -- -- -- Approach: Install a callback that sneaks into to log message -- sending/receiving, to intercept the messages and execute some code and -- then return a new message. foldLogFast :: forall r m a f. (Foldable f, Member (Logs m) r) => (m -> f m) -> Eff r a -> Eff r a -- | Capture the logs in a Seq. captureLogs :: Eff (Logs message : r) a -> Eff r (a, Seq message) -- | Throw away all log messages. ignoreLogs :: Eff (Logs message : r) a -> Eff r a -- | Handle Logs effects using LoggingT Handlers. handleLogsWith :: forall m r message a. (Monad m, SetMember Lift (Lift m) r) => Eff (Logs message : r) a -> (forall b. (Handler m message -> m b) -> m b) -> Eff r a -- | A log channel processes logs from the Logs effect by en-queuing -- them in a shared queue read from a seperate processes. A channel can -- contain log message filters. data LogChannel message -- | Send the log messages to a LogChannel. logToChannel :: forall r message a. (SetMember Lift (Lift IO) r) => LogChannel message -> Eff (Logs message : r) a -> Eff r a -- | Create a LogChannel that will discard all messages sent via -- forwardLogstochannel or logChannelPutIO. noLogger :: LogChannel message -- | Fork a new process, that applies a monadic action to all log messages -- sent via logToChannel or logChannelPutIO. forkLogger :: forall message. (Typeable message, Show message) => Int -> (message -> IO ()) -> Maybe message -> IO (LogChannel message) -- | Filter logs sent to a LogChannel using a predicate. filterLogChannel :: (message -> Bool) -> LogChannel message -> LogChannel message -- | Close a log channel created by e.g. forkLogger. Message already -- enqueue are handled, as well as an optional final message. Subsequent -- log message will not be handled anymore. If the log channel must be -- closed immediately, use killLogChannel instead. joinLogChannel :: (Show message, Typeable message) => Maybe message -> LogChannel message -> IO () -- | Close a log channel quickly, without logging messages already in the -- queue. Subsequent logging requests will not be handled anymore. If the -- log channel must be closed without loosing any messages, use -- joinLogChannel instead. killLogChannel :: (Show message, Typeable message) => Maybe message -> LogChannel message -> IO () -- | Run an action and close a LogChannel created by -- noLogger, forkLogger or filterLogChannel -- afterwards using joinLogChannel. If a SomeException was -- thrown, the log channel is killed with killLogChannel, and the -- exception is re-thrown. closeLogChannelAfter :: (Show message, Typeable message, IsString message) => Maybe message -> LogChannel message -> IO a -> IO a -- | Wrap LogChannel creation and destruction around a monad action -- in brackety manner. This function uses joinLogChannel, -- so en-queued messages are flushed on exit. The resulting action in in -- the LoggingT monad, which is essentially a reader for the log -- handler function. logChannelBracket :: (Show message, Typeable message) => Int -> Maybe message -> Maybe message -> (LogChannel message -> IO a) -> LoggingT message IO a -- | Enqueue a log message into a log channel logChannelPutIO :: LogChannel message -> message -> IO () instance GHC.Show.Show m => GHC.Show.Show (Control.Eff.Log.KillLogChannelException m) instance GHC.Show.Show m => GHC.Show.Show (Control.Eff.Log.JoinLogChannelException m) instance (Data.Typeable.Internal.Typeable m, GHC.Show.Show m) => GHC.Exception.Exception (Control.Eff.Log.KillLogChannelException m) instance (Data.Typeable.Internal.Typeable m, GHC.Show.Show m) => GHC.Exception.Exception (Control.Eff.Log.JoinLogChannelException m) -- | A coroutine based, single threaded scheduler for Processes. module Control.Eff.Concurrent.Process.SingleThreadedScheduler -- | Execute a Process in the current thread, all child processes -- spawned by spawn will be executed concurrently using a -- co-routine based, round-robin scheduler. schedule :: forall r. Eff (Process r : r) () -> Eff r () -- | Execute a Process in the current thread, all child processes -- spawned by spawn will be executed concurrently using a -- co-routine based, round-robin scheduler. scheduleWithYield :: forall r. Eff r () -> Eff (Process r : r) () -> Eff r () -- | Execute a Process using schedule on top of Lift -- IO and Logs String effects. defaultMain :: HasCallStack => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] () -> IO () -- | A SchedulerProxy for LoggingAndIo. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo -- | The concrete list of Effects for running this pure scheduler on -- IO and with string logging. type LoggingAndIo = '[Logs String, Lift IO] -- | Implement Erlang style message passing concurrency. -- -- This handles the MessagePassing and Process effects, -- using TQueues and forkIO. -- -- This aims to be a pragmatic implementation, so even logging is -- supported. -- -- At the core is a main process that enters schedule and -- creates all of the internal state stored in TVars to manage -- processes with message queues. -- -- The Eff handler for Process and MessagePassing -- use are implemented and available through spawn. -- -- spawn uses forkFinally and TQueues and tries to -- catch most exceptions. module Control.Eff.Concurrent.Process.ForkIOScheduler -- | This is the main entry point to running a message passing concurrency -- application. This function takes a Process on top of the -- SchedulerIO effect and a LogChannel for concurrent -- logging. schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel String -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMainWithLogChannel :: LogChannel String -> Eff (ConsProcess SchedulerIO) () -> IO () -- | A sum-type with errors that can occur when scheduleing messages. data SchedulerError -- | No ProcessInfo was found for a ProcessId during internal -- processing. NOTE: This is **ONLY** caused by internal errors, probably -- by an incorrect MessagePassing handler in this module. -- **Sending a message to a process ALWAYS succeeds!** Even if the -- process does not exist. ProcessNotFound :: ProcessId -> SchedulerError -- | A process called raiseError. ProcessRaisedError :: String -> SchedulerError -- | A process called exitWithError. ProcessExitError :: String -> SchedulerError -- | A process exits. ProcessShuttingDown :: SchedulerError -- | An action was not performed while the scheduler was exiting. SchedulerShuttingDown :: SchedulerError -- | The concrete list of Effects for this scheduler implementation. -- See HasSchedulerIO type SchedulerIO = '[Reader SchedulerVar, Logs String, Lift IO] -- | A SchedulerProxy for SchedulerIO forkIoScheduler :: SchedulerProxy SchedulerIO -- | An alias for the constraints for the effects essential to this -- scheduler implementation, i.e. these effects allow spawning new -- Processes. See SchedulerIO type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs String) r, Member (Reader SchedulerVar) r) instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError instance GHC.Exception.Exception Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.ProcessInfo -- | Observer Effects -- -- This module supports the implementation of observers and observables. -- One use case is event propagation. The tools in this module are -- tailored towards Api servers/clients. module Control.Eff.Concurrent.Api.Observer -- | An Api index that support observation of the another Api -- that is Observable. class (Typeable p, Observable o) => Observer p o -- | Wrap the Observation and the ProcessId (i.e. the -- Server) that caused the observation into a Api value -- that the Observable understands. observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous -- | An Api index that supports registration and de-registration of -- Observers. class (Typeable o, Typeable (Observation o)) => Observable o where { data family Observation o; } -- | Return the Api value for the cast_ that registeres an -- observer registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Return the Api value for the cast_ that de-registeres -- an observer forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Send an Observation to an Observer notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () -- | Send the registerObserverMessage registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send the forgetObserverMessage forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | An existential wrapper around a Server of an Observer. -- Needed to support different types of observers to observe the same -- Observable in a general fashion. data SomeObserver o [SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o -- | Send an Observation to SomeObserver. notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () -- | Internal state for manageobservers data Observers o -- | Keep track of registered Observers Observers can be added and -- removed, and an Observation can be sent to all registerd -- observers at once. manageObservers :: Eff (State (Observers o) : r) a -> Eff r a -- | Add an Observer to the Observers managed by -- manageObservers. addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Delete an Observer from the Observers managed by -- manageObservers. removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r () -- | Send an Observation to all SomeObservers in the -- Observers state. notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r () -- | An Observer that schedules the observations to an effectful -- callback. data CallbackObserver o -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q : q) Bool) -> Eff r (Server (CallbackObserver o)) -- | Use spawnCallbackObserver to create a universal logging -- observer, using the Show instance of the Observation. | -- Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.Observation o) => GHC.Show.Show (Control.Eff.Concurrent.Api.Api (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) r) instance Control.Eff.Concurrent.Api.Observer.Observable o => Control.Eff.Concurrent.Api.Observer.Observer (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) o instance GHC.Classes.Ord (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Classes.Eq (Control.Eff.Concurrent.Api.Observer.SomeObserver o)