-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Message passing concurrency as extensible-effect
--
-- Please see the README on GitHub at
-- https://github.com/sheyll/extensible-effects-concurrent#readme
@package extensible-effects-concurrent
@version 0.2.0.3
-- | The message passing effect.
--
-- This module describes an abstract message passing effect, and a
-- process effect, mimicking Erlang's process and message semantics.
--
-- An implementation of a handler for these effects can be found in
-- ForkIOScheduler.
module Control.Eff.Concurrent.Process
-- | Each process is identified by a single process id, that stays constant
-- throughout the life cycle of a process. Also, message sending relies
-- on these values to address messages to processes.
newtype ProcessId
ProcessId :: Int -> ProcessId
[_fromProcessId] :: ProcessId -> Int
fromProcessId :: Iso' ProcessId Int
-- | The process effect is the basis for message passing concurrency. This
-- effect describes an interface for concurrent, communicating isolated
-- processes identified uniquely by a process-id.
--
-- Processes can raise exceptions that can be caught, exit gracefully or
-- with an error, or be killed by other processes, with the option of
-- ignoring the shutdown request.
--
-- Process Scheduling is implemented in different modules. All scheduler
-- implementations should follow some basic rules:
--
--
-- - fair scheduling
-- - sending a message does not block
-- - receiving a message does block
-- - spawning a child blocks only a very moment
-- - a newly spawned process shall be scheduled before the parent
-- process after
-- - the spawn
-- - when the first process exists, all process should be killed
-- immediately
--
data Process (r :: [Type -> Type]) b
-- | Return the current ProcessId
[SelfPid] :: Process r (ResumeProcess ProcessId)
-- | Start a new process, the new process will execute an effect, the
-- function will return immediately with a ProcessId.
[Spawn] :: Eff (Process r : r) () -> Process r (ResumeProcess ProcessId)
-- | Process exit, this is the same as if the function that was applied to
-- a spawn function returned.
[Shutdown] :: Process r a
-- | Exit the process due to an error, this cannot be caught.
[ExitWithError] :: String -> Process r b
-- | Raise an error, that can be handled.
[RaiseError] :: String -> Process r b
-- | Request that another a process exits. The targeted process is
-- interrupted and gets a ShutdownRequested, the target process
-- may decide to ignore the shutdown requests.
[SendShutdown] :: ProcessId -> Process r (ResumeProcess Bool)
-- | Send a message to a process addressed by the ProcessId. Sending
-- a message should **always succeed** and return **immediately**, even
-- if the destination process does not exist, or does not accept messages
-- of the given type.
[SendMessage] :: ProcessId -> Dynamic -> Process r (ResumeProcess Bool)
-- | Receive a message. This should block until an a message was received.
-- The pure function may convert the incoming message into something, and
-- the result is returned as ProcessMessage value. Another
-- reason why this function returns, is if a process control message was
-- sent to the process. This can only occur from inside the runtime
-- system, aka the effect handler implementation. (Currently there is one
-- in ForkIOScheduler.)
[ReceiveMessage] :: Process r (ResumeProcess Dynamic)
-- | Cons Process onto a list of effects.
type ConsProcess r = Process r : r
-- | Every Process action returns it's actual result wrapped in this
-- type. It will allow to signal errors as well as pass on normal results
-- such as incoming messages.
data ResumeProcess v
-- | The process is required to exit.
[ShutdownRequested] :: ResumeProcess v
-- | The process is required to exit from an error condition, that cannot
-- be recovered from.
[OnError] :: String -> ResumeProcess v
-- | The process may resume to do work, using the given result.
[ResumeWith] :: a -> ResumeProcess a
-- | This indicates that the action did not complete, and maybe retried
[RetryLastAction] :: ResumeProcess v
-- | Every function for Process things needs proxy for the low-level
-- effect list depending on the scheduler implementation. I don't know a
-- smarter way yet to do this.
data SchedulerProxy :: [Type -> Type] -> Type
[SchedulerProxy] :: SchedulerProxy q
-- | Return a SchedulerProxy for a Process effect.
thisSchedulerProxy :: Eff (Process r : r) (SchedulerProxy r)
-- | Execute a and action and resume the process, retry the action,
-- shutdown the process or return an error.
yieldAndCatchProcess :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Eff r (ResumeProcess v) -> Eff r (Either String v)
-- | Send a message to a process addressed by the ProcessId. See
-- SendMessage.
sendMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r Bool
-- | Start a new process, the new process will execute an effect, the
-- function will return immediately with a ProcessId.
spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r) => Eff (Process q : q) () -> Eff r ProcessId
-- | Block until a message was received.
receiveMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r Dynamic
-- | Receive and cast the message to some Typeable instance.
receiveMessageAs :: forall a r q. (HasCallStack, Typeable a, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
-- | Enter a loop to receive messages and pass them to a callback, until
-- the function returns False.
receiveLoop :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either (Maybe String) Dynamic -> Eff r ()) -> Eff r ()
-- | Returns the ProcessId of the current process.
self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId
-- | Send a message to a process addressed by the ProcessId. See
-- SendMessage.
sendShutdown :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ProcessId -> Eff r Bool
-- | Exit the process with an error.
exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a
-- | Exit the process.
exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a
-- | Thrown an error, can be caught by catchRaisedError.
raiseError :: forall r q b. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r b
-- | Catch and handle an error raised by raiseError. Works
-- independent of the handler implementation.
catchRaisedError :: forall r q w. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> (String -> Eff r w) -> Eff r w -> Eff r w
-- | Like catchRaisedError it catches raiseError, but instead
-- of invoking a user provided handler, the result is wrapped into an
-- Either.
ignoreProcessError :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -> Eff r (Either String a)
instance GHC.Real.Real Control.Eff.Concurrent.Process.ProcessId
instance GHC.Real.Integral Control.Eff.Concurrent.Process.ProcessId
instance GHC.Enum.Enum Control.Eff.Concurrent.Process.ProcessId
instance GHC.Num.Num Control.Eff.Concurrent.Process.ProcessId
instance GHC.Enum.Bounded Control.Eff.Concurrent.Process.ProcessId
instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ProcessId
instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ProcessId
instance Data.Traversable.Traversable Control.Eff.Concurrent.Process.ResumeProcess
instance GHC.Classes.Ord v => GHC.Classes.Ord (Control.Eff.Concurrent.Process.ResumeProcess v)
instance GHC.Classes.Eq v => GHC.Classes.Eq (Control.Eff.Concurrent.Process.ResumeProcess v)
instance GHC.Show.Show v => GHC.Show.Show (Control.Eff.Concurrent.Process.ResumeProcess v)
instance GHC.Base.Functor Control.Eff.Concurrent.Process.ResumeProcess
instance Data.Foldable.Foldable Control.Eff.Concurrent.Process.ResumeProcess
instance GHC.Read.Read Control.Eff.Concurrent.Process.ProcessId
instance GHC.Show.Show Control.Eff.Concurrent.Process.ProcessId
-- | This module contains a mechanisms to specify what kind of messages a
-- process can receive and possible answer by sending a message back to
-- the orginator.
--
-- A message can be either a blocking or a non-blocking request.
--
-- The type magic in the Api type familiy allows to define a
-- related set of messages along with the corresponding responses.
--
-- A process can serve a specific Api instance by using the
-- functions provided by the Control.Eff.Concurrent.Api.Server
-- module.
--
-- To enable a process to use such a service, the functions
-- provided by the Control.Eff.Concurrent.Api.Client should be
-- used.
module Control.Eff.Concurrent.Api
-- | This data family defines an API, a communication interface description
-- between to processes, where one process acts as a server and
-- the other(s) as client(s).
--
-- The first parameter is usually a user defined phantom type that
-- identifies the Api instance.
--
-- The second parameter specifies if a specific constructor of an
-- (GADT-like) Api instance is Synchronous, i.e. returns
-- a result and blocks the caller or if it is Asynchronous
--
-- Example:
--
--
-- data BookShop deriving Typeable
--
-- data instance Api BookShop r where
-- RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId))
-- BringBack :: RentalId -> Api BookShop 'Asynchronous
--
-- type BookId = Int
-- type RentalId = Int
-- type RentalError = String
--
-- | This data kind is used to indicate at the type level if a
-- specific constructor of an Api instance has a result for
-- which some caller has to wait, or if it is asynchronous.
data Synchronicity
-- | Blocking operation with a specific return type, e.g. ('Synchronous
-- (Either RentalError RentalId))
Synchronous :: Type -> Synchronicity
-- | Non-blocking async operation
Asynchronous :: Synchronicity
-- | This is a tag-type that wraps around a ProcessId and holds an
-- Api index type.
newtype Server api
Server :: ProcessId -> Server api
[_fromServer] :: Server api -> ProcessId
fromServer :: forall api_ahBr api_ahLU. Iso (Server api_ahBr) (Server api_ahLU) ProcessId ProcessId
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
proxyAsServer :: proxy api -> ProcessId -> Server api
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
asServer :: forall api. ProcessId -> Server api
instance forall k (api :: k). GHC.Classes.Ord (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). GHC.Classes.Eq (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). GHC.Read.Read (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). Data.Typeable.Internal.Typeable api => GHC.Show.Show (Control.Eff.Concurrent.Api.Server api)
-- | Functions for Api clients.
--
-- This modules is required to write clients that consume an Api.
module Control.Eff.Concurrent.Api.Client
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous.
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r ()
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous. Return True if the
-- message was sent to the process. Note that this is totally not the
-- same as that the request was successfully handled. If that is
-- important, use call instead.
castChecked :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r Bool
-- | Send an Api request and wait for the server to return a result
-- value.
--
-- The type signature enforces that the corresponding Api clause
-- is Synchronous.
call :: forall result api r q. (SetMember Process (Process q) r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result
-- | Like cast but take the Server from the reader provided
-- by registerServer.
castRegistered :: (Typeable o, ServesApi o r q) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r ()
-- | Like call but take the Server from the reader provided
-- by registerServer.
callRegistered :: (Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply
-- | Like callRegistered but also catch errors raised if e.g. the
-- server crashed. By allowing Alternative instances to contain
-- the reply, application level errors can be combined with errors rising
-- from inter process communication.
callRegisteredA :: forall r q o f reply. (Alternative f, Typeable f, Typeable reply, ServesApi o r q) => SchedulerProxy q -> Api o ( 'Synchronous (f reply)) -> Eff r (f reply)
-- | Instead of passing around a Server value and passing to
-- functions like cast or call, a Server can
-- provided by a Reader effect, if there is only a single
-- server for a given Api instance. This type alias is
-- convenience to express that an effect has Process and a reader
-- for a Server.
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (Reader (Server o)) r)
-- | Run a reader effect that contains the one server handling a
-- specific Api instance.
registerServer :: Server o -> Eff (Reader (Server o) : r) a -> Eff r a
-- | Add-ons to Exception
module Control.Eff.ExceptionExtra
-- | Catch an exception and return it in an Either.
try :: forall e r a. Member (Exc e) r => Eff r a -> Eff r (Either e a)
-- | Lift an IO action and catch all error using try then wrap the
-- Exception using a given wrapper function and rethrow it using
-- throwError.
liftRethrow :: forall proxy e r a. (Exception e, SetMember Lift (Lift IO) r, SetMember Exc (Exc e) r) => proxy e -> IO a -> Eff r a
-- | Run an effect with exceptions like runError and rethrow it as
-- SomeException using throw
runErrorRethrowIO :: forall e r a. (Exception e, SetMember Lift (Lift IO) r) => Eff (Exc e : r) a -> Eff r a
-- | Lift an IO action and catch all errors with try with a pure
-- handler.
liftCatch :: forall e r a. (HasCallStack, Exception e, SetMember Lift (Lift IO) r) => (e -> a) -> IO a -> Eff r a
-- | Lift an IO action and catch all errors with try with an effect
-- handler.
liftCatchEff :: forall e r a. (SetMember Lift (Lift IO) r, HasCallStack, Exception e) => (e -> Eff r a) -> IO a -> Eff r a
-- | Catch Exception thrown by an effect.
liftTry :: forall e r a. (HasCallStack, Exception e, SetMember Lift (Lift IO) r) => Eff r a -> Eff r (Either e a)
module Control.Eff.Interactive
data Interaction a
[PrintLine] :: String -> Interaction ()
[ReadLine] :: (String -> a) -> Interaction a
[Step] :: Interaction ()
printLine :: (Member (Program Interaction) r, HasCallStack) => String -> Eff r ()
printStep :: (Member (Program Interaction) r, HasCallStack) => String -> Eff r ()
promptStep :: (Member (Program Interaction) r, HasCallStack) => String -> (String -> a) -> Eff r a
step :: (Member (Program Interaction) r, HasCallStack) => Eff r ()
class Interactive f
singleSteps :: (Interactive f, Member (Program Interaction) r, HasCallStack) => f a -> Eff r a
interactiveProgram :: (HasCallStack, Member (Program Interaction) r, Interactive f) => Eff (Program f : r) a -> Eff r a
runInteractionIOE :: (SetMember Lift (Lift IO) r, HasCallStack) => Eff (Program Interaction : r) a -> Eff r a
runInteractionIO :: Eff '[Program Interaction, Lift IO] a -> IO a
-- | Functions to implement Api servers.
module Control.Eff.Concurrent.Api.Server
-- | Receive and process incoming requests until the process exits, using
-- an ApiHandler.
serve :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Eff r ()
-- | A record of callbacks requests to a server, serving a specific
-- Api family instance.
data ApiHandler p r
[ApiHandler] :: {_handleCast :: HasCallStack => Api p 'Asynchronous -> Eff r () A cast will not return a result directly. This is used for async methods., _handleCall :: forall x. HasCallStack => Api p ( 'Synchronous x) -> (x -> Eff r Bool) -> Eff r () A call is a blocking operation, the caller is blocked until this handler calls the reply continuation., _handleTerminate :: HasCallStack => Maybe String -> Eff r () This callback is called with @Nothing@ if the process exits peacefully, or @Just "error message..."@ if the process exits with an error. This function is responsible to exit the process if necessary. The default behavior is defined in 'defaultTermination'.} -> ApiHandler p r
-- | A default handler to use in _handleCall in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCallError :: forall p x r q. (Show (Api p ( 'Synchronous x)), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r Bool) -> Eff r ()
-- | A default handler to use in _handleCast in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCastError :: forall p r q. (Show (Api p 'Asynchronous), Typeable p, HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r ()
-- | Exit the process either normally of the error message is
-- Nothing or with exitWithError otherwise.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Maybe String -> Eff r ()
-- | serve two ApiHandlers at once. The first handler is used
-- for termination handling.
serveBoth :: forall r q p1 p2. (Typeable p1, Typeable p2, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> Eff r ()
-- | serve three ApiHandlers at once. The first handler is
-- used for termination handling.
serve3 :: forall r q p1 p2 p3. (Typeable p1, Typeable p2, Typeable p3, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p1 r -> ApiHandler p2 r -> ApiHandler p3 r -> Eff r ()
-- | The basic building block of the combination of ApiHandlers is
-- this function, which can not only be passed to receiveLoop, but
-- also throws an UnhandledRequest exception if castMessage
-- failed, such that multiple invokation of this function for different
-- ApiHandlers can be tried, by using catchUnhandled.
--
--
-- tryApiHandler px handlers1 message
-- `catchUnhandled`
-- tryApiHandler px handlers2
-- `catchUnhandled`
-- tryApiHandler px handlers3
--
tryApiHandler :: forall r q p. (Typeable p, SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> ApiHandler p r -> Dynamic -> Eff (Exc UnhandledRequest : r) ()
-- | An exception that is used by the mechanism that chains together
-- multiple different ApiHandler allowing a single process to
-- implement multiple Apis. This exception is thrown by
-- castMessage. This exception can be caught with
-- catchUnhandled, this way, several distinct ApiHandler
-- can be tried until one fits or until the exitUnhandled
-- is invoked.
data UnhandledRequest
-- | If castMessage failes to cast the message to a Request
-- for a certain ApiHandler it throws an UnhandledRequest
-- exception. That exception is caught by this function and the raw
-- message is given to the handler function. This is the basis for
-- chaining ApiHandlers.
catchUnhandled :: forall r a. (Member (Exc UnhandledRequest) r, HasCallStack) => Eff r a -> (Dynamic -> Eff r a) -> Eff r a
-- | Catch UnhandledRequests and terminate the process with an
-- error, if necessary.
ensureAllHandled :: forall r q. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff (Exc UnhandledRequest : r) () -> Eff r ()
-- | Cast a Dynamic value, and if that fails, throw an
-- UnhandledRequest error.
castMessage :: forall r a. (HasCallStack, Typeable a, Member (Exc UnhandledRequest) r) => Dynamic -> Eff r a
-- | If an incoming message could not be casted to a Request
-- corresponding to an ApiHandler (e.g. with castMessage)
-- one should use this function to exit the process with a corresponding
-- error.
exitUnhandled :: forall r q. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> Dynamic -> Eff r ()
-- | A logging effect based on MonadLog.
module Control.Eff.Log
-- | Logging effect type, parameterized by a log message type.
data Logs message a
[LogMsg] :: message -> Logs message ()
-- | Log a message.
logMsg :: Member (Logs m) r => m -> Eff r ()
-- | Change, add or remove log messages.
--
-- Requirements:
--
--
-- - All log meta data for typical prod code can be added without
-- changing much of the code
-- - Add timestamp to a log messages of a sub-computation.
-- - Write some messages to a file.
-- - Log something extra, e.g. runtime memory usage in load tests
--
--
-- Approach: Install a callback that sneaks into to log message
-- sending/receiving, to intercept the messages and execute some code and
-- then return a new message.
foldLog :: forall r m a. Member (Logs m) r => (m -> Eff r ()) -> Eff r a -> Eff r a
-- | Change, add or remove log messages without side effects, faster than
-- foldLog.
--
-- Requirements:
--
--
-- - Tests run fast in unit tests so travis won't time out
-- - Drop debug logs
-- - Grep like log filtering
--
--
-- Approach: Install a callback that sneaks into to log message
-- sending/receiving, to intercept the messages and execute some code and
-- then return a new message.
foldLogFast :: forall r m a f. (Foldable f, Member (Logs m) r) => (m -> f m) -> Eff r a -> Eff r a
-- | Capture the logs in a Seq.
captureLogs :: Eff (Logs message : r) a -> Eff r (a, Seq message)
-- | Throw away all log messages.
ignoreLogs :: Eff (Logs message : r) a -> Eff r a
-- | Handle Logs effects using LoggingT Handlers.
handleLogsWith :: forall m r message a. (Monad m, SetMember Lift (Lift m) r) => Eff (Logs message : r) a -> (forall b. (Handler m message -> m b) -> m b) -> Eff r a
-- | A log channel processes logs from the Logs effect by en-queuing
-- them in a shared queue read from a seperate processes. A channel can
-- contain log message filters.
data LogChannel message
-- | Send the log messages to a LogChannel.
logToChannel :: forall r message a. (SetMember Lift (Lift IO) r) => LogChannel message -> Eff (Logs message : r) a -> Eff r a
-- | Create a LogChannel that will discard all messages sent via
-- forwardLogstochannel or logChannelPutIO.
noLogger :: LogChannel message
-- | Fork a new process, that applies a monadic action to all log messages
-- sent via logToChannel or logChannelPutIO.
forkLogger :: forall message. (Typeable message, Show message) => Int -> (message -> IO ()) -> Maybe message -> IO (LogChannel message)
-- | Filter logs sent to a LogChannel using a predicate.
filterLogChannel :: (message -> Bool) -> LogChannel message -> LogChannel message
-- | Close a log channel created by e.g. forkLogger. Message already
-- enqueue are handled, as well as an optional final message. Subsequent
-- log message will not be handled anymore. If the log channel must be
-- closed immediately, use killLogChannel instead.
joinLogChannel :: (Show message, Typeable message) => Maybe message -> LogChannel message -> IO ()
-- | Close a log channel quickly, without logging messages already in the
-- queue. Subsequent logging requests will not be handled anymore. If the
-- log channel must be closed without loosing any messages, use
-- joinLogChannel instead.
killLogChannel :: (Show message, Typeable message) => Maybe message -> LogChannel message -> IO ()
-- | Run an action and close a LogChannel created by
-- noLogger, forkLogger or filterLogChannel
-- afterwards using joinLogChannel. If a SomeException was
-- thrown, the log channel is killed with killLogChannel, and the
-- exception is re-thrown.
closeLogChannelAfter :: (Show message, Typeable message, IsString message) => Maybe message -> LogChannel message -> IO a -> IO a
-- | Wrap LogChannel creation and destruction around a monad action
-- in brackety manner. This function uses joinLogChannel,
-- so en-queued messages are flushed on exit. The resulting action in in
-- the LoggingT monad, which is essentially a reader for the log
-- handler function.
logChannelBracket :: (Show message, Typeable message) => Int -> Maybe message -> Maybe message -> (LogChannel message -> IO a) -> LoggingT message IO a
-- | Enqueue a log message into a log channel
logChannelPutIO :: LogChannel message -> message -> IO ()
instance GHC.Show.Show m => GHC.Show.Show (Control.Eff.Log.KillLogChannelException m)
instance GHC.Show.Show m => GHC.Show.Show (Control.Eff.Log.JoinLogChannelException m)
instance (Data.Typeable.Internal.Typeable m, GHC.Show.Show m) => GHC.Exception.Exception (Control.Eff.Log.KillLogChannelException m)
instance (Data.Typeable.Internal.Typeable m, GHC.Show.Show m) => GHC.Exception.Exception (Control.Eff.Log.JoinLogChannelException m)
-- | A coroutine based, single threaded scheduler for Processes.
module Control.Eff.Concurrent.Process.SingleThreadedScheduler
-- | Execute a Process in the current thread, all child processes
-- spawned by spawn will be executed concurrently using a
-- co-routine based, round-robin scheduler.
schedule :: forall r. Eff (Process r : r) () -> Eff r ()
-- | Execute a Process using schedule on top of Lift
-- IO and Logs String effects.
defaultMain :: HasCallStack => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] () -> IO ()
-- | A SchedulerProxy for LoggingAndIo.
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
-- | The concrete list of Effects for running this pure scheduler on
-- IO and with string logging.
type LoggingAndIo = '[Logs String, Lift IO]
-- | Implement Erlang style message passing concurrency.
--
-- This handles the MessagePassing and Process effects,
-- using TQueues and forkIO.
--
-- This aims to be a pragmatic implementation, so even logging is
-- supported.
--
-- At the core is a main process that enters schedule and
-- creates all of the internal state stored in TVars to manage
-- processes with message queues.
--
-- The Eff handler for Process and MessagePassing
-- use are implemented and available through spawn.
--
-- spawn uses forkFinally and TQueues and tries to
-- catch most exceptions.
module Control.Eff.Concurrent.Process.ForkIOScheduler
-- | This is the main entry point to running a message passing concurrency
-- application. This function takes a Process on top of the
-- SchedulerIO effect and a LogChannel for concurrent
-- logging.
schedule :: Eff (ConsProcess SchedulerIO) () -> LogChannel String -> IO ()
-- | Start the message passing concurrency system then execute a
-- Process on top of SchedulerIO effect. All logging is
-- sent to standard output.
defaultMain :: Eff (ConsProcess SchedulerIO) () -> IO ()
-- | Start the message passing concurrency system then execute a
-- Process on top of SchedulerIO effect. All logging is
-- sent to standard output.
defaultMainWithLogChannel :: LogChannel String -> Eff (ConsProcess SchedulerIO) () -> IO ()
-- | A sum-type with errors that can occur when scheduleing messages.
data SchedulerError
-- | No ProcessInfo was found for a ProcessId during internal
-- processing. NOTE: This is **ONLY** caused by internal errors, probably
-- by an incorrect MessagePassing handler in this module.
-- **Sending a message to a process ALWAYS succeeds!** Even if the
-- process does not exist.
ProcessNotFound :: ProcessId -> SchedulerError
-- | A process called raiseError.
ProcessRaisedError :: String -> SchedulerError
-- | A process called exitWithError.
ProcessExitError :: String -> SchedulerError
-- | A process exits.
ProcessShuttingDown :: SchedulerError
-- | An action was not performed while the scheduler was exiting.
SchedulerShuttingDown :: SchedulerError
-- | The concrete list of Effects for this scheduler implementation.
-- See HasSchedulerIO
type SchedulerIO = '[Reader SchedulerVar, Logs String, Lift IO]
-- | A SchedulerProxy for SchedulerIO
forkIoScheduler :: SchedulerProxy SchedulerIO
-- | An alias for the constraints for the effects essential to this
-- scheduler implementation, i.e. these effects allow spawning new
-- Processes. See SchedulerIO
type HasSchedulerIO r = (HasCallStack, SetMember Lift (Lift IO) r, Member (Logs String) r, Member (Reader SchedulerVar) r)
instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError
instance GHC.Exception.Exception Control.Eff.Concurrent.Process.ForkIOScheduler.SchedulerError
instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.ProcessInfo
-- | Observer Effects
--
-- This module supports the implementation of observers and observables.
-- One use case is event propagation. The tools in this module are
-- tailored towards Api servers/clients.
module Control.Eff.Concurrent.Api.Observer
-- | An Api index that support observation of the another Api
-- that is Observable.
class (Typeable p, Observable o) => Observer p o
-- | Wrap the Observation and the ProcessId (i.e. the
-- Server) that caused the observation into a Api value
-- that the Observable understands.
observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous
-- | An Api index that supports registration and de-registration of
-- Observers.
class (Typeable o, Typeable (Observation o)) => Observable o where {
data family Observation o;
}
-- | Return the Api value for the cast_ that registeres an
-- observer
registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous
-- | Return the Api value for the cast_ that de-registeres
-- an observer
forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous
-- | Send an Observation to an Observer
notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r ()
-- | Send the registerObserverMessage
registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
-- | Send the forgetObserverMessage
forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o) => SchedulerProxy q -> Server p -> Server o -> Eff r ()
-- | An existential wrapper around a Server of an Observer.
-- Needed to support different types of observers to observe the same
-- Observable in a general fashion.
data SomeObserver o
[SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o
-- | Send an Observation to SomeObserver.
notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r ()
-- | Internal state for manageobservers
data Observers o
-- | Keep track of registered Observers Observers can be added and
-- removed, and an Observation can be sent to all registerd
-- observers at once.
manageObservers :: Eff (State (Observers o) : r) a -> Eff r a
-- | Add an Observer to the Observers managed by
-- manageObservers.
addObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
-- | Delete an Observer from the Observers managed by
-- manageObservers.
removeObserver :: (SetMember Process (Process q) r, Member (State (Observers o)) r, Observable o) => SomeObserver o -> Eff r ()
-- | Send an Observation to all SomeObservers in the
-- Observers state.
notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (State (Observers o)) r) => SchedulerProxy q -> Observation o -> Eff r ()
-- | An Observer that schedules the observations to an effectful
-- callback.
data CallbackObserver o
-- | Start a new process for an Observer that schedules all
-- observations to an effectful callback.
spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs String) q) => SchedulerProxy q -> (Server o -> Observation o -> Eff (Process q : q) Bool) -> Eff r (Server (CallbackObserver o))
instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.SomeObserver o)
instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.Observation o) => GHC.Show.Show (Control.Eff.Concurrent.Api.Api (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) r)
instance Control.Eff.Concurrent.Api.Observer.Observable o => Control.Eff.Concurrent.Api.Observer.Observer (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) o
instance GHC.Classes.Ord (Control.Eff.Concurrent.Api.Observer.SomeObserver o)
instance GHC.Classes.Eq (Control.Eff.Concurrent.Api.Observer.SomeObserver o)