-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Message passing concurrency as extensible-effect
--
-- Please see the README on GitHub at
-- https://github.com/sheyll/extensible-effects-concurrent#readme
@package extensible-effects-concurrent
@version 0.3.0.2
-- | The message passing effect.
--
-- This module describes an abstract message passing effect, and a
-- process effect, mimicking Erlang's process and message semantics.
--
-- Two scheduler implementations for the Process effect are
-- provided:
--
--
module Control.Eff.Concurrent.Process
-- | Each process is identified by a single process id, that stays constant
-- throughout the life cycle of a process. Also, message sending relies
-- on these values to address messages to processes.
newtype ProcessId
ProcessId :: Int -> ProcessId
[_fromProcessId] :: ProcessId -> Int
fromProcessId :: Iso' ProcessId Int
-- | The process effect is the basis for message passing concurrency. This
-- effect describes an interface for concurrent, communicating isolated
-- processes identified uniquely by a process-id.
--
-- Processes can raise exceptions that can be caught, exit gracefully or
-- with an error, or be killed by other processes, with the option of
-- ignoring the shutdown request.
--
-- Process Scheduling is implemented in different modules. All scheduler
-- implementations should follow some basic rules:
--
--
-- - fair scheduling
-- - sending a message does not block
-- - receiving a message does block
-- - spawning a child blocks only a very moment
-- - a newly spawned process shall be scheduled before the parent
-- process after
-- - the spawn
-- - when the first process exists, all process should be killed
-- immediately
--
data Process (r :: [Type -> Type]) b
-- | In cooperative schedulers, this will give processing time to the
-- scheduler. Every other operation implicitly serves the same purpose.
[YieldProcess] :: Process r (ResumeProcess ())
-- | Return the current ProcessId
[SelfPid] :: Process r (ResumeProcess ProcessId)
-- | Start a new process, the new process will execute an effect, the
-- function will return immediately with a ProcessId.
[Spawn] :: Eff (Process r : r) () -> Process r (ResumeProcess ProcessId)
-- | Process exit, this is the same as if the function that was applied to
-- a spawn function returned.
[Shutdown] :: Process r a
-- | Exit the process due to an error, this cannot be caught.
[ExitWithError] :: String -> Process r b
-- | Raise an error, that can be handled.
[RaiseError] :: String -> Process r b
-- | Request that another a process exits. The targeted process is
-- interrupted and gets a ShutdownRequested, the target process
-- may decide to ignore the shutdown requests.
[SendShutdown] :: ProcessId -> Process r (ResumeProcess Bool)
-- | Send a message to a process addressed by the ProcessId. Sending
-- a message should **always succeed** and return **immediately**, even
-- if the destination process does not exist, or does not accept messages
-- of the given type.
[SendMessage] :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)
-- | Receive a message. This should block until an a message was received.
-- The pure function may convert the incoming message into something, and
-- the result is returned as ProcessMessage value. Another
-- reason why this function returns, is if a process control message was
-- sent to the process. This can only occur from inside the runtime
-- system, aka the effect handler implementation. (Currently there is one
-- in ForkIOScheduler.)
[ReceiveMessage] :: Process r (ResumeProcess Dynamic)
-- | Cons Process onto a list of effects.
type ConsProcess r = Process r : r
-- | Every Process action returns it's actual result wrapped in this
-- type. It will allow to signal errors as well as pass on normal results
-- such as incoming messages.
data ResumeProcess v
-- | The process is required to exit.
[ShutdownRequested] :: ResumeProcess v
-- | The process is required to exit from an error condition, that cannot
-- be recovered from.
[OnError] :: String -> ResumeProcess v
-- | The process may resume to do work, using the given result.
[ResumeWith] :: a -> ResumeProcess a
-- | This indicates that the action did not complete, and maybe retried
[RetryLastAction] :: ResumeProcess v
-- | Every function for Process things needs such a proxy value for
-- the low-level effect list, i.e. the effects identified by
-- r in Process r : r, this might be
-- dependent on the scheduler implementation.
data SchedulerProxy :: [Type -> Type] -> Type
-- | Tell the typechecker what effects we have below Process
[SchedulerProxy] :: SchedulerProxy q
-- | Like SchedulerProxy but shorter
[SP] :: SchedulerProxy q
-- | Return a SchedulerProxy for a Process effect.
thisSchedulerProxy :: Eff (Process r : r) (SchedulerProxy r)
-- | Execute a and action and resume the process, retry the action,
-- shutdown the process or return an error.
executeAndCatch :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v)
-- | Use executeAndResume to execute YieldProcess. Refer to
-- YieldProcess for more information.
yieldProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ()
-- | Send a message to a process addressed by the ProcessId. See
-- SendMessage.
sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r ()
-- | Send a message to a process addressed by the ProcessId. See
-- SendMessage.
sendMessageAs :: forall o r q. (HasCallStack, SetMember Process (Process q) r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r ()
-- | Send a message to a process addressed by the ProcessId. See
-- SendMessage. Return True if the process existed. I you
-- don't care, just sendMessage instead.
sendMessageChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool
-- | Start a new process, the new process will execute an effect, the
-- function will return immediately with a ProcessId.
spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r ProcessId
-- | Like spawn but return ().
spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r ()
-- | Block until a message was received.
receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic
-- | Receive and cast the message to some Typeable instance.
receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
-- | Enter a loop to receive messages and pass them to a callback, until
-- the function returns False.
receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r ()
-- | Returns the ProcessId of the current process.
self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId
-- | Exit a process addressed by the ProcessId. See
-- SendShutdown.
sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r ()
-- | Like sendShutdown, but also return True iff the
-- process to exit exists.
sendShutdownChecked :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool
-- | Exit the process with an error.
exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a
-- | Exit the process.
exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
-- | Thrown an error, can be caught by catchRaisedError.
raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b
-- | Catch and handle an error raised by raiseError. Works
-- independent of the handler implementation.
catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w
-- | Like catchRaisedError it catches raiseError, but instead
-- of invoking a user provided handler, the result is wrapped into an
-- Either.
ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a)
instance GHC.Real.Real Control.Eff.Concurrent.Process.ProcessId
instance GHC.Real.Integral Control.Eff.Concurrent.Process.ProcessId
instance GHC.Enum.Enum Control.Eff.Concurrent.Process.ProcessId
instance GHC.Num.Num Control.Eff.Concurrent.Process.ProcessId
instance GHC.Enum.Bounded Control.Eff.Concurrent.Process.ProcessId
instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ProcessId
instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ProcessId
instance Data.Traversable.Traversable Control.Eff.Concurrent.Process.ResumeProcess
instance GHC.Classes.Ord v => GHC.Classes.Ord (Control.Eff.Concurrent.Process.ResumeProcess v)
instance GHC.Classes.Eq v => GHC.Classes.Eq (Control.Eff.Concurrent.Process.ResumeProcess v)
instance GHC.Show.Show v => GHC.Show.Show (Control.Eff.Concurrent.Process.ResumeProcess v)
instance GHC.Base.Functor Control.Eff.Concurrent.Process.ResumeProcess
instance Data.Foldable.Foldable Control.Eff.Concurrent.Process.ResumeProcess
instance GHC.Read.Read Control.Eff.Concurrent.Process.ProcessId
instance GHC.Show.Show Control.Eff.Concurrent.Process.ProcessId
-- | This module contains a mechanism to specify what kind of messages (aka
-- requests) a Server (Process) can handle, and if
-- the caller blocks and waits for an answer, which the server process
-- provides.
--
-- The type magic in the Api type familiy allows to define a
-- related set of requests along with the corresponding responses.
--
-- Request handling can be either blocking, if a response is requred, or
-- non-blocking.
--
-- A process can serve a specific Api instance by using the
-- functions provided by the Control.Eff.Concurrent.Api.Server
-- module.
--
-- To enable a process to use such a service, the functions
-- provided by the Control.Eff.Concurrent.Api.Client should be
-- used.
module Control.Eff.Concurrent.Api
-- | This data family defines an API, a communication interface description
-- between at least two processes. The processes act as servers or
-- client(s) regarding a specific instance of this type.
--
-- The first parameter is usually a user defined phantom type that
-- identifies the Api instance.
--
-- The second parameter specifies if a specific constructor of an
-- (GADT-like) Api instance is Synchronous, i.e. returns
-- a result and blocks the caller or if it is Asynchronous
--
-- Example:
--
--
-- data BookShop deriving Typeable
--
-- data instance Api BookShop r where
-- RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId))
-- BringBack :: RentalId -> Api BookShop 'Asynchronous
--
-- type BookId = Int
-- type RentalId = Int
-- type RentalError = String
--
-- | The (promoted) constructors of this type specify (at the type level)
-- the reply behavior of a specific constructor of an Api
-- instance.
data Synchronicity
-- | Specify that handling a request is a blocking operation with a
-- specific return type, e.g. ('Synchronous (Either RentalError
-- RentalId))
Synchronous :: Type -> Synchronicity
-- | Non-blocking, asynchronous, request handling
Asynchronous :: Synchronicity
-- | This is a tag-type that wraps around a ProcessId and holds an
-- Api index type.
newtype Server api
Server :: ProcessId -> Server api
[_fromServer] :: Server api -> ProcessId
fromServer :: forall api_ai3V api_aieo. Iso (Server api_ai3V) (Server api_aieo) ProcessId ProcessId
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
proxyAsServer :: proxy api -> ProcessId -> Server api
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
asServer :: forall api. ProcessId -> Server api
instance forall k (api :: k). GHC.Classes.Ord (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). GHC.Classes.Eq (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). GHC.Read.Read (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). Data.Typeable.Internal.Typeable api => GHC.Show.Show (Control.Eff.Concurrent.Api.Server api)
-- | Functions for Api clients.
--
-- This modules is required to write clients that consume an Api.
module Control.Eff.Concurrent.Api.Client
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous.
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r ()
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous. Return True if the
-- message was sent to the process. Note that this is totally not the
-- same as that the request was successfully handled. If that is
-- important, use call instead.
castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r Bool
-- | Send an Api request and wait for the server to return a result
-- value.
--
-- The type signature enforces that the corresponding Api clause
-- is Synchronous.
call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result
-- | Like cast but take the Server from the reader provided
-- by registerServer.
castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r ()
-- | Like call but take the Server from the reader provided
-- by registerServer.
callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply
-- | Like callRegistered but also catch errors raised if e.g. the
-- server crashed. By allowing Alternative instances to contain
-- the reply, application level errors can be combined with errors rising
-- from inter process communication.
callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous (f reply)) -> Eff r (f reply)
-- | Instead of passing around a Server value and passing to
-- functions like cast or call, a Server can
-- provided by a Reader effect, if there is only a single
-- server for a given Api instance. This type alias is
-- convenience to express that an effect has Process and a reader
-- for a Server.
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r)
-- | Run a reader effect that contains the one server handling a
-- specific Api instance.
registerServer :: Server o -> Eff (Reader (Server o) : r) a -> Eff r a
module Control.Eff.Concurrent.Process.Interactive
-- | This module provides support for executing Process actions from
-- IO.
--
-- One use case is interacting with processes from the REPL, e.g.:
--
--
-- >>> import Control.Eff.Concurrent.Process.SingleThreadedScheduler (defaultMain)
--
--
--
-- >>> import Data.Dynamic
--
--
--
-- >>> import Data.Maybe
--
--
--
-- >>> s <- forkInteractiveScheduler Control.Eff.Concurrent.Process.SingleThreadedScheduler.defaultMain
--
--
--
-- >>> fooPid <- submit s (spawn (forever (receiveMessage SP >>= (logMsg . fromMaybe "Huh!??" . fromDynamic))))
--
--
--
-- >>> fooPid
-- <0.1.0>
--
--
--
-- >>> submit s (sendMessageAs SP fooPid "test")
-- test
--
--
--
-- >>> submit s (sendShutdown SP fooPid)
--
--
-- Contains the communication channels to interact with a scheduler
-- running in its' own thread.
data SchedulerSession r
-- | Fork a scheduler with a process that communicates with it via
-- MVar, which is also the reason for the Lift IO
-- constraint.
forkInteractiveScheduler :: forall r. (SetMember Lift (Lift IO) r) => (Eff (Process r : r) () -> IO ()) -> IO (SchedulerSession r)
-- | Exit the schedulder immediately using an asynchronous exception.
killInteractiveScheduler :: SchedulerSession r -> IO ()
-- | Send a Process effect to the main process of a scheduler, this
-- blocks until the effect is executed.
submit :: forall r a. (SetMember Lift (Lift IO) r) => SchedulerSession r -> Eff (Process r : r) a -> IO a
-- | Combination of submit and cast.
submitCast :: forall o r. (SetMember Lift (Lift IO) r, Typeable o) => SchedulerSession r -> Server o -> Api o 'Asynchronous -> IO ()
-- | Combination of submit and cast.
submitCall :: forall o q r. (SetMember Lift (Lift IO) r, Typeable o, Typeable q) => SchedulerSession r -> Server o -> Api o ( 'Synchronous q) -> IO q
-- | Add-ons to Exception
module Control.Eff.ExceptionExtra
-- | Catch Exception thrown by an effect.
liftTry :: forall e r a. (HasCallStack, Exception e, SetMember Lift (Lift IO) r) => Eff r a -> Eff r (Either e a)
-- | Functions to implement Api servers.
module Control.Eff.Concurrent.Api.Server
-- | Receive and process incoming requests until the process exits, using
-- an ApiHandler.
serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r ()
-- | A record of callbacks, handling requests sent to a server
-- Process, all belonging to a specific Api family
-- instance.
data ApiHandler p r
[ApiHandler] :: {_handleCast :: HasCallStack => Api p 'Asynchronous -> Eff r () A cast will not return a result directly. This is used for async methods., _handleCall :: forall x. HasCallStack => Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r () A call is a blocking operation, the caller is blocked until this handler calls the reply continuation., _handleTerminate :: HasCallStack => Maybe String -> Eff r () This callback is called with @Nothing@ if the process exits peacefully, or @Just "error message..."@ if the process exits with an error. This function is responsible to exit the process if necessary. The default behavior is defined in 'defaultTermination'.} -> ApiHandler p r
-- | A default handler to use in _handleCall in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCallError :: forall p x r q. (Show (Api p ( 'Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r ()
-- | A default handler to use in _handleCast in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCastError :: forall p r q. (Show (Api p 'Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r ()
-- | Exit the process either normally of the error message is
-- Nothing or with exitWithError otherwise.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r ()
-- | serve two ApiHandlers at once. The first handler is used
-- for termination handling.
serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r ()
-- | serve three ApiHandlers at once. The first handler is
-- used for termination handling.
serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r ()
-- | The basic building block of the combination of ApiHandlers is
-- this function, which can not only be passed to receiveLoop, but
-- also throws an UnhandledRequest exception if
-- requestFromDynamic failed, such that multiple invokation of
-- this function for different ApiHandlers can be tried, by using
-- catchUnhandled.
--
--
-- tryApiHandler px handlers1 message
-- `catchUnhandled`
-- tryApiHandler px handlers2
-- `catchUnhandled`
-- tryApiHandler px handlers3
--
tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest : r) ()
-- | An exception that is used by the mechanism that chains together
-- multiple different ApiHandler allowing a single process to
-- implement multiple Apis. This exception is thrown by
-- requestFromDynamic. This exception can be caught with
-- catchUnhandled, this way, several distinct ApiHandler
-- can be tried until one fits or until the exitUnhandled
-- is invoked.
data UnhandledRequest
-- | If requestFromDynamic failes to cast the message to a
-- Request for a certain ApiHandler it throws an
-- UnhandledRequest exception. That exception is caught by this
-- function and the raw message is given to the handler function. This is
-- the basis for chaining ApiHandlers.
catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a
-- | Catch UnhandledRequests and terminate the process with an
-- error, if necessary.
ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest : r) () -> Eff r ()
-- | Cast a Dynamic value, and if that fails, throw an
-- UnhandledRequest error.
requestFromDynamic :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a
-- | If an incoming message could not be casted to a Request
-- corresponding to an ApiHandler (e.g. with
-- requestFromDynamic) one should use this function to exit the
-- process with a corresponding error.
exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r ()
-- | A logging effect based on MonadLog.
module Control.Eff.Log
-- | Logging effect type, parameterized by a log message type.
data Logs message a
[LogMsg] :: message -> Logs message ()
-- | Log a message.
logMsg :: Member (Logs m) r => m -> Eff r ()
-- | Change, add or remove log messages.
--
-- Requirements:
--
--
-- - All log meta data for typical prod code can be added without
-- changing much of the code
-- - Add timestamp to a log messages of a sub-computation.
-- - Write some messages to a file.
-- - Log something extra, e.g. runtime memory usage in load tests
--
--
-- Approach: Install a callback that sneaks into to log message
-- sending/receiving, to intercept the messages and execute some code and
-- then return a new message.
foldLog :: forall r m a. Member (Logs m) r => (m -> Eff r ()) -> Eff r a -> Eff r a
-- | Change, add or remove log messages without side effects, faster than
-- foldLog.
--
-- Requirements:
--
--
-- - Tests run fast in unit tests so travis won't time out
-- - Drop debug logs
-- - Grep like log filtering
--
--
-- Approach: Install a callback that sneaks into to log message
-- sending/receiving, to intercept the messages and execute some code and
-- then return a new message.
foldLogFast :: forall r m a f. (Foldable f, Member (Logs m) r) => (m -> f m) -> Eff r a -> Eff r a
-- | Capture the logs in a Seq.
captureLogs :: Eff (Logs message : r) a -> Eff r (a, Seq message)
-- | Throw away all log messages.
ignoreLogs :: Eff (Logs message : r) a -> Eff r a
-- | Handle Logs effects using LoggingT Handlers.
handleLogsWith :: forall m r message a. (Monad m, SetMember Lift (Lift m) r) => Eff (Logs message : r) a -> (forall b. (Handler m message -> m b) -> m b) -> Eff r a
-- | A log channel processes logs from the Logs effect by en-queuing
-- them in a shared queue read from a seperate processes. A channel can
-- contain log message filters.
data LogChannel message
-- | Send the log messages to a LogChannel.
logToChannel :: forall r message a. (SetMember Lift (Lift IO) r) => LogChannel message -> Eff (Logs message : r) a -> Eff r a
-- | Create a LogChannel that will discard all messages sent via
-- forwardLogstochannel or logChannelPutIO.
noLogger :: LogChannel message
-- | Fork a new process, that applies a monadic action to all log messages
-- sent via logToChannel or logChannelPutIO.
forkLogger :: forall message. (Typeable message, Show message) => Int -> (message -> IO ()) -> Maybe message -> IO (LogChannel message)
-- | Filter logs sent to a LogChannel using a predicate.
filterLogChannel :: (message -> Bool) -> LogChannel message -> LogChannel message
-- | Close a log channel created by e.g. forkLogger. Message already
-- enqueue are handled, as well as an optional final message. Subsequent
-- log message will not be handled anymore. If the log channel must be
-- closed immediately, use killLogChannel instead.
joinLogChannel :: (Show message, Typeable message) => Maybe message -> LogChannel message -> IO ()
-- | Close a log channel quickly, without logging messages already in the
-- queue. Subsequent logging requests will not be handled anymore. If the
-- log channel must be closed without loosing any messages, use
-- joinLogChannel instead.
killLogChannel :: (Show message, Typeable message) => Maybe message -> LogChannel message -> IO ()
-- | Run an action and close a LogChannel created by
-- noLogger, forkLogger or filterLogChannel
-- afterwards using joinLogChannel. If a SomeException was
-- thrown, the log channel is killed with killLogChannel, and the
-- exception is re-thrown.
closeLogChannelAfter :: (Show message, Typeable message, IsString message) => Maybe message -> LogChannel message -> IO a -> IO a
-- | Wrap LogChannel creation and destruction around a monad action
-- in brackety manner. This function uses joinLogChannel,
-- so en-queued messages are flushed on exit. The resulting action in in
-- the LoggingT monad, which is essentially a reader for the log
-- handler function.
logChannelBracket :: (Show message, Typeable message) => Int -> Maybe message -> Maybe message -> (LogChannel message -> IO a) -> LoggingT message IO a
-- | Enqueue a log message into a log channel
logChannelPutIO :: LogChannel message -> message -> IO ()
instance GHC.Show.Show m => GHC.Show.Show (Control.Eff.Log.KillLogChannelException m)
instance GHC.Show.Show m => GHC.Show.Show (Control.Eff.Log.JoinLogChannelException m)
instance (Data.Typeable.Internal.Typeable m, GHC.Show.Show m) => GHC.Exception.Exception (Control.Eff.Log.KillLogChannelException m)
instance (Data.Typeable.Internal.Typeable m, GHC.Show.Show m) => GHC.Exception.Exception (Control.Eff.Log.JoinLogChannelException m)
-- | A coroutine based, single threaded scheduler for Processes.
module Control.Eff.Concurrent.Process.SingleThreadedScheduler
-- | Execute a Process and all the other processes spawned by
-- it in the current thread concurrently, using a co-routine based,
-- round-robin scheduler. If a process exits with exitNormally,
-- exitWithError, raiseError or is killed by another
-- process Left ... is returned. Otherwise, the result will be
-- wrapped in a Right.
schedule :: forall r finalResult. Eff r () -> Eff (Process r : r) finalResult -> Eff r (Either String finalResult)
-- | Like schedule but pure. The yield effect is
-- just return (). schedulePure == run .
-- schedule (return ()) @since 0.3.0.2
schedulePure :: Eff (ConsProcess '[]) a -> Either String a
-- | Invoke schedule with lift yield as yield
-- effect. @since 0.3.0.2
scheduleIO :: SetMember Lift (Lift IO) r => Eff (ConsProcess r) a -> Eff r (Either String a)
-- | Like schedulePure but with logging. scheduleWithLogging ==
-- run . captureLogs . schedule (return ())
-- @since 0.3.0.2
scheduleWithLogging :: Eff (ConsProcess '[Logs m]) a -> (Either String a, Seq m)
-- | Execute a Process using schedule on top of Lift
-- IO and Logs String effects.
defaultMain :: HasCallStack => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] () -> IO ()
-- | A SchedulerProxy for LoggingAndIo.
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
-- | The concrete list of Effects for running this pure scheduler on
-- IO and with string logging.
type LoggingAndIo = '[Logs String, Lift IO]
-- | Implement Erlang style message passing concurrency.
--
-- This handles the MessagePassing and Process effects,
-- using TQueues and forkIO.
--
-- This aims to be a pragmatic implementation, so even logging is
-- supported.
--
-- At the core is a main process that enters schedule and
-- creates all of the internal state stored in TVars to manage
-- processes with message queues.
--
-- The Eff handler for Process and MessagePassing
-- use are implemented and available through spawn.
--
-- spawn uses forkFinally and TQueues and tries to
-- catch most exceptions.
module Control.Eff.Concurrent.Process.ForkIOScheduler
-- | This is the main entry point to running a message passing concurrency
-- application. This function takes a Process on top of the
-- SchedulerIO effect and a LogChannel for concurrent
-- logging.
schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel String -> IO ()
-- | Start the message passing concurrency system then execute a
-- Process on top of SchedulerIO effect. All logging is
-- sent to standard output.
defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO ()
-- | Start the message passing concurrency system then execute a
-- Process on top of SchedulerIO effect. All logging is
-- sent to standard output.
defaultMainWithLogChannel :: LogChannel String -> Eff (ConsProcess SchedulerIO) () -> IO ()
-- | A sum-type with errors that can occur when scheduleing messages.
data SchedulerError
-- | No ProcessInfo was found for a ProcessId during internal
-- processing. NOTE: This is **ONLY** caused by internal errors, probably
-- by an incorrect MessagePassing handler in this module.
-- **Sending a message to a process ALWAYS succeeds!** Even if the
-- process does not exist.
ProcessNotFound :: ProcessId -> SchedulerError
-- | A process called raiseError.
ProcessRaisedError :: String -> SchedulerError
-- | A process called exitWithError.
ProcessExitError :: String -> SchedulerError
-- | A process exits.
ProcessShuttingDown :: SchedulerError
-- | An action was not performed while the scheduler was exiting.
SchedulerShuttingDown :: SchedulerError
-- | The concrete list of Effects for this scheduler implementation.
-- See HasSchedulerIO
type SchedulerIO = '[Reader SchedulerVar, Logs String, Lift IO]
-- | A SchedulerProxy for SchedulerIO
forkIoScheduler :: SchedulerProxy SchedulerIO
-- | An alias for the constraints for the effects essential to this
-- scheduler implementation, i.e. these effects allow spawning new
-- Processes. See SchedulerIO
type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs String) r, Member (Reader SchedulerVar) r)
instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError
instance GHC.Exception.Exception Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError
instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.ProcessInfo
-- | Observer Effects
--
-- This module supports the implementation of observers and observables.
-- One use case is event propagation. The tools in this module are
-- tailored towards Api servers/clients.
module Control.Eff.Concurrent.Api.Observer
-- | An Api index that support observation of the another Api
-- that is Observable.
class (Typeable p, Observable o) => Observer p o
-- | Wrap the Observation and the ProcessId (i.e. the
-- Server) that caused the observation into a Api value
-- that the Observable understands.
observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous
-- | An Api index that supports registration and de-registration of
-- Observers.
class (Typeable o, Typeable (Observation o)) => Observable o where {
data family Observation o;
}
-- | Return the Api value for the cast_ that registeres an
-- observer
registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous
-- | Return the Api value for the cast_ that de-registeres
-- an observer
forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous
-- | Send an Observation to an Observer
notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r ()
-- | Send the registerObserverMessage
registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
-- | Send the forgetObserverMessage
forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
-- | An existential wrapper around a Server of an Observer.
-- Needed to support different types of observers to observe the same
-- Observable in a general fashion.
data SomeObserver o
[SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o
-- | Send an Observation to SomeObserver.
notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r ()
-- | Internal state for manageobservers
data Observers o
-- | Keep track of registered Observers Observers can be added and
-- removed, and an Observation can be sent to all registerd
-- observers at once.
manageObservers :: Eff (State (Observers o) : r) a -> Eff r a
-- | Add an Observer to the Observers managed by
-- manageObservers.
addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
-- | Delete an Observer from the Observers managed by
-- manageObservers.
removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
-- | Send an Observation to all SomeObservers in the
-- Observers state.
notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r ()
-- | An Observer that schedules the observations to an effectful
-- callback.
data CallbackObserver o
-- | Start a new process for an Observer that schedules all
-- observations to an effectful callback.
spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q : q) Bool) -> Eff r (Server (CallbackObserver o))
-- | Use spawnCallbackObserver to create a universal logging
-- observer, using the Show instance of the Observation. |
-- Start a new process for an Observer that schedules all
-- observations to an effectful callback.
spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> Eff r (Server (CallbackObserver o))
instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.SomeObserver o)
instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.Observation o) => GHC.Show.Show (Control.Eff.Concurrent.Api.Api (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) r)
instance Control.Eff.Concurrent.Api.Observer.Observable o => Control.Eff.Concurrent.Api.Observer.Observer (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) o
instance GHC.Classes.Ord (Control.Eff.Concurrent.Api.Observer.SomeObserver o)
instance GHC.Classes.Eq (Control.Eff.Concurrent.Api.Observer.SomeObserver o)