-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Message passing concurrency as extensible-effect -- -- Please see the README on GitHub at -- https://github.com/sheyll/extensible-effects-concurrent#readme @package extensible-effects-concurrent @version 0.14.2 -- | Add-ons to Exception and Excepion module Control.Eff.ExceptionExtra -- | Catch Exception thrown by an effect. liftTry :: forall e r a. (HasCallStack, Exception e, Lifted IO r) => Eff r a -> Eff r (Either e a) -- | Very similar to liftEither but for Maybes. Unlike -- liftMaybe this will throw the given value (instead of using -- Fail). maybeThrow :: Member (Exc x) e => x -> Maybe a -> Eff e a instance Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff (Control.Eff.Reader.Lazy.Reader x : e)) instance Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff (Control.Eff.Reader.Lazy.Reader x : e)) instance Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff (Control.Eff.Reader.Lazy.Reader x : e)) instance Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff (Control.Eff.Reader.Strict.Reader x : e)) instance Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff (Control.Eff.Reader.Strict.Reader x : e)) instance Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff (Control.Eff.Reader.Strict.Reader x : e)) instance Control.Monad.Catch.MonadThrow m => Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff '[Control.Eff.Internal.Lift m]) instance Control.Monad.Catch.MonadCatch m => Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff '[Control.Eff.Internal.Lift m]) instance Control.Monad.Catch.MonadMask m => Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff '[Control.Eff.Internal.Lift m]) instance Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff (Control.Eff.Exception.Exc x : e)) instance Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff (Control.Eff.Exception.Exc x : e)) instance Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff e) => Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff (Control.Eff.Exception.Exc x : e)) -- | A logging effect based on MonadLog. module Control.Eff.Log.Handler -- | Log a message. The message is reduced to normal form (strict). logMsg :: (NFData m, Member (Logs m) e) => m -> Eff e () -- | Log a bunch of messages. This might be more efficient than calling -- logMsg multiple times. The messages are reduced to normal form -- (strict). logMsgs :: (Traversable f, MonadPlus f, NFData1 f, NFData (f m), NFData m, Member (Logs m) e) => f m -> Eff e () -- | A constraint that combines constraints for logging into any log writer -- monad. type HasLogWriter message logWriterMonad effects = (Member (Reader (LogWriter message logWriterMonad)) effects, Member (Logs message) effects, NFData message, Monad logWriterMonad, Lifted logWriterMonad effects) -- | Map a pure function over log messages. mapLogMessages :: forall m r b. (NFData m, Member (Logs m) r) => (m -> m) -> Eff r b -> Eff r b -- | Keep only those messages, for which a predicate holds. -- -- E.g. to keep only messages which begin with OMG: -- --
-- filterLogMessages (\msg -> case msg of -- 'O':'M':'G':_ -> True -- _ -> False) -- (do logMsg "this message will not be logged" -- logMsg "OMG logged") --filterLogMessages :: forall m r b. (NFData m, Member (Logs m) r) => (m -> Bool) -> Eff r b -> Eff r b -- | Map an Effectful function over every bunch of log messages. -- -- For example, to attach the current time to each log message: -- --
-- appendTimestamp -- :: ( Member (Logs String) e -- , Lifted IO e) -- => Eff e a -- -> Eff e a -- appendTimestamp = traverseLogMessages $ \ms -> do -- now <- getCurrentTime -- return (fmap (show now ++) ms) --traverseLogMessages :: forall m r h b. (Member (Logs m) r, Monad h, Lifted h r, Member (Reader (LogWriter m h)) r) => (forall f. (MonadPlus f, Traversable f, NFData1 f) => f m -> h (f m)) -> Eff r b -> Eff r b -- | Change the way log messages are *written*. Replaces the existing -- LogWriter by a new one. The new LogWriter is constructed -- from a function that gets a bunch of messages and returns an -- Effect. That effect has a Reader for the previous -- LogWriter and Lifts the log writer base monad. changeLogWriter :: forall r m h a. (Monad h, Lifted h r, Member (Reader (LogWriter m h)) r) => (forall f. (Traversable f, NFData1 f, MonadPlus f) => f m -> Eff '[Reader (LogWriter m h), Lift h] ()) -> Eff r a -> Eff r a -- | Throw away all log messages. ignoreLogs :: forall message r a. Eff (Logs message : r) a -> Eff r a -- | Trace all log messages using traceM. The message value is -- converted to String using the given function. traceLogs :: forall message r a. (message -> String) -> Eff (Logs message : r) a -> Eff r a -- | A function that takes a log message and returns an effect that -- logs the message. data LogWriter message writerM -- | Type alias for the Reader effect that writes logs type LogWriterReader message writerM = Reader (LogWriter message writerM) -- | Create a LogWriter from a function that can write a -- Traversable container. foldingLogWriter :: (forall f. (MonadPlus f, Traversable f, NFData1 f) => f message -> writerM ()) -> LogWriter message writerM -- | Efficiently apply the LogWriter to a Traversable -- container of log messages. writeAllLogMessages :: (NFData1 f, MonadPlus f, Traversable f, Applicative writerM) => LogWriter message writerM -> f message -> writerM () -- | Create a LogWriter from a function that is applied to each -- individual log message. NOTE: This is probably the simplest, but also -- the most inefficient and annoying way to make a LogWriter. -- Better use foldingLogWriter or even -- multiMessageLogWriter. singleMessageLogWriter :: Applicative writerM => (message -> writerM ()) -> LogWriter message writerM -- | Create a LogWriter from a function that is applied to each -- individual log message. Don't be scared by the type signature, here is -- an example file appender that re-opens the log file everytime a bunch -- of log messages are written: -- --
-- fileAppender fn = multiMessageLogWriter -- (\writeLogMessageWith -> -- withFile fn AppendMode (writeLogMessageWith . hPutStrLn)) --multiMessageLogWriter :: Applicative writerM => (((message -> writerM ()) -> writerM ()) -> writerM ()) -> LogWriter message writerM -- | Get the current LogWriter askLogWriter :: forall m h r. Member (Reader (LogWriter m h)) r => Eff r (LogWriter m h) -- | This effect sends log messages. The logs are not sent one-by-one, but -- always in batches of containers that must be Traversable and -- MonadPlus instances. Log messages are consumed by -- LogWriters installed via runLogs or more high level -- functions like writeLogs. data Logs m v -- | Handle log message effects by a monadic action, e.g. an IO action to -- send logs to the console output or a log-server. The monadic log -- writer action is wrapped in a newtype called LogWriter. -- -- Use the smart constructors below to create them, e.g. -- foldingLogWriter, singleMessageLogWriter or -- mulitMessageLogWriter. writeLogs :: forall message writerM r a. (Applicative writerM, Lifted writerM r, NFData message) => LogWriter message writerM -> Eff (Logs message : (Reader (LogWriter message writerM) : r)) a -> Eff r a -- | Install Logs handler that asks for a LogWriter -- for the message type and applies the log writer to the messages. runLogs :: forall m h e b. (Applicative h, Lifted h e, Member (LogWriterReader m h) e, NFData m) => Eff (Logs m : e) b -> Eff e b instance (Control.Monad.Base.MonadBase m m, Control.Eff.Lift.Lifted m r, Control.DeepSeq.NFData l, Control.Monad.Trans.Control.MonadBaseControl m (Control.Eff.Internal.Eff r)) => Control.Monad.Trans.Control.MonadBaseControl m (Control.Eff.Internal.Eff (Control.Eff.Log.Handler.Logs l : Control.Eff.Log.Handler.LogWriterReader l m : r)) instance (Control.DeepSeq.NFData l, Control.Eff.Lift.Lifted m e, Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff e)) => Control.Monad.Catch.MonadThrow (Control.Eff.Internal.Eff (Control.Eff.Log.Handler.Logs l : Control.Eff.Log.Handler.LogWriterReader l m : e)) instance (Control.DeepSeq.NFData l, GHC.Base.Applicative m, Control.Eff.Lift.Lifted m e, Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff e)) => Control.Monad.Catch.MonadCatch (Control.Eff.Internal.Eff (Control.Eff.Log.Handler.Logs l : Control.Eff.Log.Handler.LogWriterReader l m : e)) instance (Control.DeepSeq.NFData l, GHC.Base.Applicative m, Control.Eff.Lift.Lifted m e, Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff e)) => Control.Monad.Catch.MonadMask (Control.Eff.Internal.Eff (Control.Eff.Log.Handler.Logs l : Control.Eff.Log.Handler.LogWriterReader l m : e)) instance GHC.Base.Applicative w => Data.Default.Class.Default (Control.Eff.Log.Handler.LogWriter m w) -- | Concurrent Logging module Control.Eff.Log.Channel -- | A log channel processes logs from the Logs effect by en-queuing -- them in a shared queue read from a seperate processes. A channel can -- contain log message filters. data LogChannel message -- | Fork a new process in which the given log message writer, will listen -- on a message queue in a LogChannel, which is passed to the -- second function. If the function returns or throws, the logging -- process will be killed. -- -- Log messages are deeply evaluated before being sent to the logger -- process, to prevent that lazy evaluation leads to heavy work being -- done in the logger process instead of the caller process. -- -- Example usage, a super stupid log to file: -- --
-- main = -- withAsyncLogChannel -- 1000 -- (singleMessageLogWriter putStrLn) -- (handleLoggingAndIO -- (do logMsg "test 1" -- logMsg "test 2" -- logMsg "test 3")) --withAsyncLogChannel :: forall message a len. (NFData message, Integral len) => len -> LogWriter message IO -> (LogChannel message -> IO a) -> IO a -- | Fork an IO based log writer thread and set the LogWriter to an -- action that will send all logs to that thread via a bounded queue. -- When the queue is full, flush it handleLoggingAndIO :: (NFData m, HasCallStack) => Eff '[Logs m, LogWriterReader m IO, Lift IO] a -> LogChannel m -> IO a -- | Like handleLoggingAndIO but return (). handleLoggingAndIO_ :: (NFData m, HasCallStack) => Eff '[Logs m, LogWriterReader m IO, Lift IO] a -> LogChannel m -> IO () -- | An RFC 5434 inspired log message and convenience functions for logging -- them. module Control.Eff.Log.Message -- | A message data type inspired by the RFC-5424 Syslog Protocol data LogMessage LogMessage :: Facility -> Severity -> Maybe UTCTime -> Maybe String -> Maybe String -> Maybe String -> Maybe String -> [StructuredDataElement] -> Maybe ThreadId -> Maybe SrcLoc -> String -> Int -> LogMessage [_lmFacility] :: LogMessage -> Facility [_lmSeverity] :: LogMessage -> Severity [_lmTimestamp] :: LogMessage -> Maybe UTCTime [_lmHostname] :: LogMessage -> Maybe String [_lmAppname] :: LogMessage -> Maybe String [_lmProcessId] :: LogMessage -> Maybe String [_lmMessageId] :: LogMessage -> Maybe String [_lmStructuredData] :: LogMessage -> [StructuredDataElement] [_lmThreadId] :: LogMessage -> Maybe ThreadId [_lmSrcLoc] :: LogMessage -> Maybe SrcLoc [_lmMessage] :: LogMessage -> String [_lmDistance] :: LogMessage -> Int -- | A convenient alias for the constraints that enable logging of -- LogMessages in the monad, which is Lifted into a given -- Eff effect list. type HasLogging writerM effect = (HasLogWriter LogMessage writerM effect) -- | Render a LogMessage according to the rules in the given RFC, -- except for the rules concerning unicode and ascii renderRFC5424 :: LogMessage -> String -- | Render a LogMessage but set the timestamp and thread id fields. printLogMessage :: LogMessage -> IO () -- | Use ioLogMessageWriter to handle logging using -- handleLogs. ioLogMessageHandler :: (HasCallStack, Lifted IO e) => LogWriter String IO -> Eff (Logs LogMessage : (LogWriterReader LogMessage IO : e)) a -> Eff e a -- | Set a timestamp (if not set), the thread id (if not set) using IO -- actions then write the log message using the IO and -- String based LogWriter. ioLogMessageWriter :: HasCallStack => LogWriter String IO -> LogWriter LogMessage IO -- | A LogWriter that applys renderLogMessage to the log -- message and then traces it using traceM. traceLogMessageWriter :: Monad m => LogWriter LogMessage m -- | Render a LogMessage human readable. renderLogMessage :: LogMessage -> String -- | Increase the distance of log messages by one. Logs can be -- filtered by their distance with dropDistantLogMessages increaseLogMessageDistance :: (HasCallStack, HasLogWriter LogMessage h e) => Eff e a -> Eff e a -- | Drop all log messages with an lmDistance greater than the given -- value. dropDistantLogMessages :: HasLogging m r => Int -> Eff r a -> Eff r a -- | Log a String as LogMessage with a given Severity. logWithSeverity :: (HasCallStack, Member (Logs LogMessage) e) => Severity -> String -> Eff e () -- | Log a String as emergencySeverity. logEmergency :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Log a message with alertSeverity. logAlert :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Log a criticalSeverity message. logCritical :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Log a errorSeverity message. logError :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Log a warningSeverity message. logWarning :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Log a noticeSeverity message. logNotice :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Log a informationalSeverity message. logInfo :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Log a debugSeverity message. logDebug :: (HasCallStack, Member (Logs LogMessage) e) => String -> Eff e () -- | Construct a LogMessage with errorSeverity errorMessage :: HasCallStack => String -> LogMessage -- | Construct a LogMessage with informationalSeverity infoMessage :: HasCallStack => String -> LogMessage -- | Construct a LogMessage with debugSeverity debugMessage :: HasCallStack => String -> LogMessage -- | Construct a LogMessage with errorSeverity errorMessageIO :: (HasCallStack, MonadIO m) => String -> m LogMessage -- | Construct a LogMessage with informationalSeverity infoMessageIO :: (HasCallStack, MonadIO m) => String -> m LogMessage -- | Construct a LogMessage with debugSeverity debugMessageIO :: (HasCallStack, MonadIO m) => String -> m LogMessage -- | An rfc 5424 severity data Severity emergencySeverity :: Severity alertSeverity :: Severity criticalSeverity :: Severity errorSeverity :: Severity warningSeverity :: Severity noticeSeverity :: Severity informationalSeverity :: Severity debugSeverity :: Severity -- | An rfc 5424 facility data Facility kernelMessages :: Facility userLevelMessages :: Facility mailSystem :: Facility systemDaemons :: Facility securityAuthorizationMessages4 :: Facility linePrinterSubsystem :: Facility networkNewsSubsystem :: Facility uucpSubsystem :: Facility clockDaemon :: Facility securityAuthorizationMessages10 :: Facility ftpDaemon :: Facility ntpSubsystem :: Facility logAuditFacility :: Facility logAlertFacility :: Facility clockDaemon2 :: Facility local0 :: Facility local1 :: Facility local2 :: Facility local3 :: Facility local4 :: Facility local5 :: Facility local6 :: Facility local7 :: Facility lmFacility :: Lens' LogMessage Facility lmSeverity :: Lens' LogMessage Severity lmTimestamp :: Lens' LogMessage (Maybe UTCTime) lmHostname :: Lens' LogMessage (Maybe String) lmAppname :: Lens' LogMessage (Maybe String) lmProcessId :: Lens' LogMessage (Maybe String) lmMessageId :: Lens' LogMessage (Maybe String) lmStructuredData :: Lens' LogMessage [StructuredDataElement] lmSrcLoc :: Lens' LogMessage (Maybe SrcLoc) lmThreadId :: Lens' LogMessage (Maybe ThreadId) lmMessage :: Lens' LogMessage String lmDistance :: Lens' LogMessage Int -- | Put the source location of the given callstack in lmSrcLoc setCallStack :: CallStack -> LogMessage -> LogMessage -- | An IO action that sets the current UTC time (see -- enableLogMessageTimestamps) in lmTimestamp. setLogMessageTimestamp :: MonadIO m => LogMessage -> m LogMessage -- | An IO action appends the the ThreadId of the calling process -- (see myThreadId) to lmMessage. setLogMessageThreadId :: MonadIO m => LogMessage -> m LogMessage -- | RFC-5424 defines how structured data can be included in a log message. data StructuredDataElement SdElement :: String -> [SdParameter] -> StructuredDataElement [_sdElementId] :: StructuredDataElement -> String [_sdElementParameters] :: StructuredDataElement -> [SdParameter] sdElementId :: Lens' StructuredDataElement String sdElementParameters :: Lens' StructuredDataElement [SdParameter] instance Data.Default.Class.Default Control.Eff.Log.Message.LogMessage instance Data.String.IsString Control.Eff.Log.Message.LogMessage instance Data.Default.Class.Default Control.Eff.Log.Message.Severity instance Data.Default.Class.Default Control.Eff.Log.Message.Facility instance GHC.Generics.Generic Control.Eff.Log.Message.LogMessage instance GHC.Classes.Eq Control.Eff.Log.Message.LogMessage instance Control.DeepSeq.NFData Control.Eff.Log.Message.Facility instance GHC.Generics.Generic Control.Eff.Log.Message.Facility instance GHC.Show.Show Control.Eff.Log.Message.Facility instance GHC.Classes.Ord Control.Eff.Log.Message.Facility instance GHC.Classes.Eq Control.Eff.Log.Message.Facility instance Control.DeepSeq.NFData Control.Eff.Log.Message.Severity instance GHC.Generics.Generic Control.Eff.Log.Message.Severity instance GHC.Classes.Ord Control.Eff.Log.Message.Severity instance GHC.Classes.Eq Control.Eff.Log.Message.Severity instance GHC.Generics.Generic Control.Eff.Log.Message.StructuredDataElement instance GHC.Classes.Ord Control.Eff.Log.Message.StructuredDataElement instance GHC.Classes.Eq Control.Eff.Log.Message.StructuredDataElement instance GHC.Generics.Generic Control.Eff.Log.Message.SdParameter instance GHC.Classes.Ord Control.Eff.Log.Message.SdParameter instance GHC.Classes.Eq Control.Eff.Log.Message.SdParameter instance Control.DeepSeq.NFData Control.Eff.Log.Message.LogMessage instance GHC.Show.Show Control.Eff.Log.Message.Severity instance GHC.Show.Show Control.Eff.Log.Message.StructuredDataElement instance Control.DeepSeq.NFData Control.Eff.Log.Message.StructuredDataElement instance GHC.Show.Show Control.Eff.Log.Message.SdParameter instance Control.DeepSeq.NFData Control.Eff.Log.Message.SdParameter -- | A logging effect based on MonadLog. module Control.Eff.Log -- | The message passing effect. -- -- This module describes an abstract message passing effect, and a -- process effect, mimicking Erlang's process and message semantics. -- -- Two scheduler implementations for the Process effect are -- provided: -- --
-- logCrash :: ExitReason -> Eff e () -- logCrash (toCrashReason -> Just reason) = logError reason -- logCrash _ = return () ---- -- Though this can be improved to: -- --
-- logCrash = traverse_ logError . toCrashReason --toCrashReason :: ExitReason x -> Maybe String -- | Partition a SomeExitReason back into either a NoRecovery -- or a Recoverable ExitReason fromSomeExitReason :: SomeExitReason -> Either (ExitReason 'NoRecovery) InterruptReason -- | Log the ProcessExitReaons logProcessExit :: (HasCallStack, Member (Logs LogMessage) e) => ExitReason x -> Eff e () instance GHC.Show.Show v => GHC.Show.Show (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Generics.Generic1 Control.Eff.Concurrent.Process.ResumeProcess instance GHC.Generics.Generic (Control.Eff.Concurrent.Process.ResumeProcess v) instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ProcessDown instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ProcessDown instance GHC.Generics.Generic Control.Eff.Concurrent.Process.ProcessDown instance GHC.Generics.Generic Control.Eff.Concurrent.Process.MonitorReference instance GHC.Classes.Ord Control.Eff.Concurrent.Process.MonitorReference instance GHC.Classes.Eq Control.Eff.Concurrent.Process.MonitorReference instance GHC.Read.Read Control.Eff.Concurrent.Process.MonitorReference instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.ProcessId instance GHC.Real.Real Control.Eff.Concurrent.Process.ProcessId instance GHC.Real.Integral Control.Eff.Concurrent.Process.ProcessId instance GHC.Enum.Enum Control.Eff.Concurrent.Process.ProcessId instance GHC.Num.Num Control.Eff.Concurrent.Process.ProcessId instance GHC.Enum.Bounded Control.Eff.Concurrent.Process.ProcessId instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ProcessId instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ProcessId instance GHC.Generics.Generic Control.Eff.Concurrent.Process.ExitSeverity instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ExitSeverity instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ExitSeverity instance GHC.Generics.Generic Control.Eff.Concurrent.Process.ExitRecovery instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ExitRecovery instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ExitRecovery instance GHC.Generics.Generic Control.Eff.Concurrent.Process.ProcessState instance GHC.Enum.Enum Control.Eff.Concurrent.Process.ProcessState instance GHC.Classes.Eq Control.Eff.Concurrent.Process.ProcessState instance GHC.Classes.Ord Control.Eff.Concurrent.Process.ProcessState instance GHC.Show.Show Control.Eff.Concurrent.Process.ProcessState instance GHC.Read.Read Control.Eff.Concurrent.Process.ProcessState instance GHC.Base.Functor Control.Eff.Concurrent.Process.MessageSelector instance GHC.Base.Semigroup a => GHC.Base.Monoid (Control.Eff.Concurrent.Process.MessageSelector a) instance GHC.Base.Semigroup a => GHC.Base.Semigroup (Control.Eff.Concurrent.Process.MessageSelector a) instance GHC.Show.Show (Control.Eff.Concurrent.Process.Process r b) instance Control.DeepSeq.NFData a => Control.DeepSeq.NFData (Control.Eff.Concurrent.Process.ResumeProcess a) instance Control.DeepSeq.NFData1 Control.Eff.Concurrent.Process.ResumeProcess instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.ProcessDown instance GHC.Show.Show Control.Eff.Concurrent.Process.ProcessDown instance GHC.Classes.Ord Control.Eff.Concurrent.Process.SomeExitReason instance GHC.Classes.Eq Control.Eff.Concurrent.Process.SomeExitReason instance GHC.Show.Show Control.Eff.Concurrent.Process.SomeExitReason instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.SomeExitReason instance GHC.Show.Show (Control.Eff.Concurrent.Process.ExitReason x) instance GHC.Exception.Type.Exception (Control.Eff.Concurrent.Process.ExitReason 'Control.Eff.Concurrent.Process.Recoverable) instance GHC.Exception.Type.Exception (Control.Eff.Concurrent.Process.ExitReason 'Control.Eff.Concurrent.Process.NoRecovery) instance Control.DeepSeq.NFData (Control.Eff.Concurrent.Process.ExitReason x) instance GHC.Classes.Ord (Control.Eff.Concurrent.Process.ExitReason x) instance GHC.Classes.Eq (Control.Eff.Concurrent.Process.ExitReason x) instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.MonitorReference instance GHC.Show.Show Control.Eff.Concurrent.Process.MonitorReference instance GHC.Read.Read Control.Eff.Concurrent.Process.ProcessId instance GHC.Show.Show Control.Eff.Concurrent.Process.ProcessId instance GHC.Show.Show Control.Eff.Concurrent.Process.ExitSeverity instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.ExitSeverity instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.ExitRecovery instance GHC.Show.Show Control.Eff.Concurrent.Process.ExitRecovery instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.ProcessState instance Data.Default.Class.Default Control.Eff.Concurrent.Process.ProcessState instance GHC.Base.Applicative Control.Eff.Concurrent.Process.MessageSelector instance GHC.Base.Alternative Control.Eff.Concurrent.Process.MessageSelector -- | Functions for timeouts when receiving messages. -- -- NOTE: If you use a single threaded scheduler, these functions will not -- work as expected. (This is an open TODO) module Control.Eff.Concurrent.Process.Timer -- | A number of micro seconds. data Timeout -- | The reference to a timer started by startTimer, required to -- stop a timer via cancelTimer. data TimerReference -- | A value to be sent when timer started with startTimer has -- elapsed. data TimerElapsed -- | Send a message to a given process after waiting. The message is -- created by applying the function parameter to the -- TimerReference, such that the message can directly refer to the -- timer. sendAfter :: forall r q message. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable message, NFData message) => SchedulerProxy q -> ProcessId -> Timeout -> (TimerReference -> message) -> Eff r TimerReference -- | Start a new timer, after the time has elapsed, TimerElapsed is -- sent to calling process. The message also contains the -- TimerReference returned by this function. Use -- cancelTimer to cancel the timer. Use selectTimerElapsed -- to receive the message using receiveSelectedMessage. To receive -- messages with guarded with a timeout see receiveAfter. startTimer :: forall r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Timeout -> Eff r TimerReference -- | Cancel a timer started with startTimer. cancelTimer :: forall r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> TimerReference -> Eff r () -- | A MessageSelector matching TimerElapsed messages created -- by startTimer. selectTimerElapsed :: TimerReference -> MessageSelector TimerElapsed -- | Wait for a message of the given type for the given time. When no -- message arrives in time, return Nothing. This is based on -- receiveSelectedAfter. receiveAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable a, NFData a, Show a) => SchedulerProxy q -> Timeout -> Eff r (Maybe a) -- | Wait for a message of the given type for the given time. When no -- message arrives in time, return Left TimerElapsed. This -- is based on selectTimerElapsed and startTimer. receiveSelectedAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Show a) => SchedulerProxy q -> MessageSelector a -> Timeout -> Eff r (Either TimerElapsed a) instance GHC.Classes.Eq Control.Eff.Concurrent.Process.Timer.TimerElapsed instance GHC.Classes.Ord Control.Eff.Concurrent.Process.Timer.TimerElapsed instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.Timer.TimerElapsed instance GHC.Enum.Enum Control.Eff.Concurrent.Process.Timer.TimerReference instance GHC.Real.Real Control.Eff.Concurrent.Process.Timer.TimerReference instance GHC.Real.Integral Control.Eff.Concurrent.Process.Timer.TimerReference instance GHC.Num.Num Control.Eff.Concurrent.Process.Timer.TimerReference instance GHC.Classes.Eq Control.Eff.Concurrent.Process.Timer.TimerReference instance GHC.Classes.Ord Control.Eff.Concurrent.Process.Timer.TimerReference instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.Timer.TimerReference instance GHC.Enum.Enum Control.Eff.Concurrent.Process.Timer.Timeout instance GHC.Real.Real Control.Eff.Concurrent.Process.Timer.Timeout instance GHC.Real.Integral Control.Eff.Concurrent.Process.Timer.Timeout instance GHC.Num.Num Control.Eff.Concurrent.Process.Timer.Timeout instance GHC.Classes.Eq Control.Eff.Concurrent.Process.Timer.Timeout instance GHC.Classes.Ord Control.Eff.Concurrent.Process.Timer.Timeout instance Control.DeepSeq.NFData Control.Eff.Concurrent.Process.Timer.Timeout instance GHC.Show.Show Control.Eff.Concurrent.Process.Timer.TimerElapsed instance GHC.Show.Show Control.Eff.Concurrent.Process.Timer.TimerReference instance GHC.Show.Show Control.Eff.Concurrent.Process.Timer.Timeout -- | A coroutine based, single threaded scheduler for Processes. module Control.Eff.Concurrent.Process.SingleThreadedScheduler -- | Handle the Process effect, as well as all lower effects using -- an effect handler function. -- -- Execute the main Process and all the other processes -- spawned by it in the current thread concurrently, using a -- co-routine based, round-robin scheduler. If a process exits with -- exitNormally, exitWithError, raiseError or is -- killed by another process Left ... is returned. Otherwise, -- the result will be wrapped in a Right. -- -- Every time a process _yields_ the effects are evaluated down to the a -- value of type m (Either String a). -- -- If the evaluator function runs the action down e.g. IO this -- might improve memory consumption, for long running services, with -- processes that loop endlessly. scheduleM :: forall m r a. Monad m => (forall b. Eff r b -> m b) -> m () -> Eff (InterruptableProcess r) a -> m (Either (ExitReason 'NoRecovery) a) -- | Like schedule but pure. The yield effect is -- just return (). schedulePure == runIdentity . -- scheduleM (Identity . run) (return ()) schedulePure :: Eff (InterruptableProcess '[Logs LogMessage]) a -> Either (ExitReason 'NoRecovery) a -- | Invoke schedule with lift yield as yield -- effect. scheduleIO runEff == scheduleM (runLift . runEff) -- (liftIO yield) scheduleIO :: MonadIO m => (forall b. Eff r b -> Eff '[Lift m] b) -> Eff (InterruptableProcess r) a -> m (Either (ExitReason 'NoRecovery) a) -- | Invoke schedule with lift yield as yield -- effect. scheduleMonadIOEff == scheduleM id (liftIO -- yield) scheduleMonadIOEff :: MonadIO (Eff r) => Eff (InterruptableProcess r) a -> Eff r (Either (ExitReason 'NoRecovery) a) -- | Run processes that have the Logs and the Lift effects. -- The user must provide a log handler function. -- -- Log messages are evaluated strict. -- --
-- scheduleIOWithLogging == run . captureLogs . schedule (return ()) --scheduleIOWithLogging :: NFData l => LogWriter l IO -> Eff (InterruptableProcess '[Logs l, LogWriterReader l IO, Lift IO]) a -> IO (Either (ExitReason 'NoRecovery) a) -- | Execute a Process using schedule on top of Lift -- IO and Logs String effects. defaultMainSingleThreaded :: HasCallStack => Eff (InterruptableProcess '[Logs LogMessage, LogWriterReader LogMessage IO, Lift IO]) () -> IO () -- | A SchedulerProxy for LoggingAndIo. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo instance GHC.Show.Show (Control.Eff.Concurrent.Process.SingleThreadedScheduler.OnYield r a) instance GHC.Show.Show (Control.Eff.Concurrent.Process.SingleThreadedScheduler.STS r m) -- | Implement Erlang style message passing concurrency. -- -- This handles the MessagePassing and Process effects, -- using TQueues and forkIO. -- -- This aims to be a pragmatic implementation, so even logging is -- supported. -- -- At the core is a main process that enters schedule and -- creates all of the internal state stored in TVars to manage -- processes with message queues. -- -- The Eff handler for Process and MessagePassing -- use are implemented and available through spawn. module Control.Eff.Concurrent.Process.ForkIOScheduler -- | This is the main entry point to running a message passing concurrency -- application. This function takes a Process on top of the -- SchedulerIO effect and a LogChannel for concurrent -- logging. schedule :: (HasLogging IO SchedulerIO, HasCallStack) => Eff InterruptableProcEff () -> Eff LoggingAndIO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMainWithLogChannel :: HasCallStack => Eff InterruptableProcEff () -> LogChannel LogMessage -> IO () -- | The concrete list of Effects of processes compatible with this -- scheduler. This builds upon SchedulerIO. type ProcEff = ConsProcess SchedulerIO -- | The concrete list of the effects, that the Process uses type InterruptableProcEff = InterruptableProcess SchedulerIO -- | The concrete list of Effects for this scheduler implementation. type SchedulerIO = (Reader SchedulerState : LoggingAndIO) -- | Type class constraint to indicate that an effect union contains the -- effects required by every process and the scheduler implementation -- itself. type HasSchedulerIO r = (HasCallStack, Lifted IO r, SchedulerIO <:: r) -- | A SchedulerProxy for SchedulerIO forkIoScheduler :: SchedulerProxy SchedulerIO instance GHC.Show.Show Control.Eff.Concurrent.Process.ForkIOScheduler.ProcessInfo instance Data.Default.Class.Default Control.Eff.Concurrent.Process.ForkIOScheduler.MessageQ -- | This module contains a mechanism to specify what kind of messages (aka -- requests) a Server (Process) can handle, and if -- the caller blocks and waits for an answer, which the server process -- provides. -- -- The type magic in the Api type familiy allows to define a -- related set of requests along with the corresponding responses. -- -- Request handling can be either blocking, if a response is requred, or -- non-blocking. -- -- A process can serve a specific Api instance by using the -- functions provided by the Control.Eff.Concurrent.Api.Server -- module. -- -- To enable a process to use such a service, the functions -- provided by the Control.Eff.Concurrent.Api.Client should be -- used. module Control.Eff.Concurrent.Api -- | This data family defines an API, a communication interface description -- between at least two processes. The processes act as servers or -- client(s) regarding a specific instance of this type. -- -- The first parameter is usually a user defined phantom type that -- identifies the Api instance. -- -- The second parameter specifies if a specific constructor of an -- (GADT-like) Api instance is Synchronous, i.e. returns -- a result and blocks the caller or if it is Asynchronous -- -- Example: -- --
-- data BookShop deriving Typeable
--
-- data instance Api BookShop r where
-- RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId))
-- BringBack :: RentalId -> Api BookShop 'Asynchronous
--
-- type BookId = Int
-- type RentalId = Int
-- type RentalError = String
--
data family Api (api :: Type) (reply :: Synchronicity)
-- | The (promoted) constructors of this type specify (at the type level)
-- the reply behavior of a specific constructor of an Api
-- instance.
data Synchronicity
-- | Specify that handling a request is a blocking operation with a
-- specific return type, e.g. ('Synchronous (Either RentalError
-- RentalId))
Synchronous :: Type -> Synchronicity
-- | Non-blocking, asynchronous, request handling
Asynchronous :: Synchronicity
-- | This is a tag-type that wraps around a ProcessId and holds an
-- Api index type.
newtype Server api
Server :: ProcessId -> Server api
[_fromServer] :: Server api -> ProcessId
fromServer :: forall api_a17VQ api_a1868. Iso (Server api_a17VQ) (Server api_a1868) ProcessId ProcessId
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
proxyAsServer :: proxy api -> ProcessId -> Server api
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
asServer :: forall api. ProcessId -> Server api
instance forall k (api :: k). GHC.Classes.Ord (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). GHC.Classes.Eq (Control.Eff.Concurrent.Api.Server api)
instance forall k (api :: k). Data.Typeable.Internal.Typeable api => GHC.Show.Show (Control.Eff.Concurrent.Api.Server api)
-- | Functions to implement Api servers.
module Control.Eff.Concurrent.Api.Server
-- | Receive and process incoming requests until the process exits.
serve :: forall a effScheduler. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> Eff (ServerEff a) ()
-- | Spawn a new process, that will receive and process incoming requests
-- until the process exits.
spawnServer :: forall a effScheduler eff. (Servable a, ServerEff a ~ InterruptableProcess effScheduler, SetMember Process (Process effScheduler) eff, Member Interrupts eff, HasCallStack) => SchedulerProxy effScheduler -> a -> Eff eff (ServerPids a)
-- | Spawn a new process, that will receive and process incoming requests
-- until the process exits. Also handle all internal effects.
spawnServerWithEffects :: forall a effScheduler eff. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), SetMember Process (Process effScheduler) eff, Member Interrupts eff, Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> (Eff (ServerEff a) () -> Eff (InterruptableProcess effScheduler) ()) -> Eff eff (ServerPids a)
-- | A record of callbacks, handling requests sent to a server
-- Process, all belonging to a specific Api family
-- instance. The values of this type can be serveed or combined
-- via Servable or ServerCallbacks.
data ApiHandler api eff
[ApiHandler] :: {_castCallback :: Maybe (Api api 'Asynchronous -> Eff eff ApiServerCmd) " A cast will not return a result directly. This is used for async methods. This returns an 'ApiServerCmd' to the server loop.", _callCallback :: forall reply. Maybe (Api api ( 'Synchronous reply) -> (reply -> Eff eff ()) -> Eff eff ApiServerCmd) " A call is a blocking operation, the caller is blocked until this handler calls the reply continuation. This returns an 'ApiServerCmd' to the server loop.", _terminateCallback :: Maybe (ExitReason 'Recoverable -> Eff eff ()) " This callback is called with @Nothing@ if one of these things happen: * the process exits * '_callCallback' or '_castCallback' return 'StopApiServer' If the process exist peacefully the parameter is 'NotServerCallbacking', otherwise @Just "error message..."@ if the process exits with an error. The default behavior is defined in 'defaultTermination'."} -> ApiHandler api eff
castCallback :: forall api_a18Ad eff_a18Ae. Lens' (ApiHandler api_a18Ad eff_a18Ae) (Maybe (Api api_a18Ad 'Asynchronous -> Eff eff_a18Ae ApiServerCmd))
callCallback :: forall api_a18Ad eff_a18Ae reply_a18Ah. Getter (ApiHandler api_a18Ad eff_a18Ae) (Maybe (Api api_a18Ad ( 'Synchronous reply_a18Ah) -> (reply_a18Ah -> Eff eff_a18Ae ()) -> Eff eff_a18Ae ApiServerCmd))
terminateCallback :: forall api_a18Ad eff_a18Ae. Lens' (ApiHandler api_a18Ad eff_a18Ae) (Maybe (ExitReason 'Recoverable -> Eff eff_a18Ae ()))
-- | Create an ApiHandler with a _castCallback, a
-- _callCallback and a _terminateCallback implementation.
apiHandler :: (Api api 'Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> (ExitReason 'Recoverable -> Eff e ()) -> ApiHandler api e
-- | Like apiHandler but the server will loop until an error is
-- raised or the process exits. The callback actions won't decide wether
-- to stop the server or not, instead the ApiServerCmd
-- HandleNextRequest is used.
apiHandlerForever :: (Api api 'Asynchronous -> Eff e ()) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> (ExitReason 'Recoverable -> Eff e ()) -> ApiHandler api e
-- | Create an ApiHandler with only a _castCallback
-- implementation.
castHandler :: (Api api 'Asynchronous -> Eff eff ApiServerCmd) -> ApiHandler api eff
-- | Like castHandler but the server will loop until an error is
-- raised or the process exits. See apiHandlerForver.
castHandlerForever :: (Api api 'Asynchronous -> Eff eff ()) -> ApiHandler api eff
-- | Create an ApiHandler with only a _callCallback
-- implementation.
callHandler :: (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e
-- | Like callHandler but the server will loop until an error is
-- raised or the process exits. See apiHandlerForver.
callHandlerForever :: (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e
-- | Create an ApiHandler with only a _castCallback and
-- _callCallback implementation.
castAndCallHandler :: (Api api 'Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e
-- | Like castAndCallHandler but the server will loop until an error
-- is raised or the process exits. See apiHandlerForver.
castAndCallHandlerForever :: (Api api 'Asynchronous -> Eff e ()) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e
-- | A command to the server loop started e.g. by server or
-- spawnServerWithEffects. Typically returned by an
-- ApiHandler member to indicate if the server should continue or
-- stop.
data ApiServerCmd
-- | Tell the server to keep the server loop running
[HandleNextRequest] :: ApiServerCmd
-- | Tell the server to exit, this will make serve stop handling
-- requests without exitting the process. _terminateCallback will
-- be invoked with the given optional reason.
[StopApiServer] :: ExitReason 'Recoverable -> ApiServerCmd
-- | A default handler to use in _callCallback in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCallError :: forall p x r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r ApiServerCmd
-- | A default handler to use in _castCallback in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCastError :: forall p r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r ApiServerCmd
-- | Either do nothing, if the error message is Nothing, or call
-- exitWithError with the error message.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r, Member (Logs LogMessage) r) => SchedulerProxy q -> ExitReason 'Recoverable -> Eff r ()
-- | Helper type class to allow composition of ApiHandler.
class Servable a where {
-- | The effect of the callbacks
type family ServerEff a :: [Type -> Type];
-- | The is used to let the spawn function return multiple Server
-- ProcessIds in a type safe way, e.g. for a tuple instance of
-- this class (Server a, Server b)
type family ServerPids a;
}
-- | The is used to let the spawn function return multiple Server
-- ProcessIds in a type safe way.
toServerPids :: Servable a => proxy a -> ProcessId -> ServerPids a
-- | Convert the value to a ServerCallback
toServerCallback :: (Servable a, Member Interrupts (ServerEff a), SetMember Process (Process effScheduler) (ServerEff a)) => SchedulerProxy effScheduler -> a -> ServerCallback (ServerEff a)
-- | Building block for composition of ApiHandler. A wrapper for
-- ApiHandler. Use this to combine ApiHandler, allowing a
-- process to implement several Api instances. The termination
-- will be evenly propagated. Create this via e.g. Servable
-- instances To serve multiple apis use <> to combine server
-- callbacks, e.g.
--
-- @@ let f = apiHandlerServerCallback px $ ApiHandler ... g =
-- apiHandlerServerCallback px $ ApiHandler ... h = f <> g in serve
-- px h @@
data ServerCallback eff
ServerCallback :: MessageSelector (Eff eff ApiServerCmd) -> (ExitReason 'Recoverable -> Eff eff ()) -> ServerCallback eff
[_requestHandlerSelector] :: ServerCallback eff -> MessageSelector (Eff eff ApiServerCmd)
[_terminationHandler] :: ServerCallback eff -> ExitReason 'Recoverable -> Eff eff ()
requestHandlerSelector :: forall eff_a18Mt. Lens' (ServerCallback eff_a18Mt) (MessageSelector (Eff eff_a18Mt ApiServerCmd))
terminationHandler :: forall eff_a18Mt. Lens' (ServerCallback eff_a18Mt) (ExitReason 'Recoverable -> Eff eff_a18Mt ())
instance Control.Eff.Concurrent.Api.Server.Servable (Control.Eff.Concurrent.Api.Server.ServerCallback eff)
instance Data.Typeable.Internal.Typeable a => Control.Eff.Concurrent.Api.Server.Servable (Control.Eff.Concurrent.Api.Server.ApiHandler a eff)
instance (Control.Eff.Concurrent.Api.Server.ServerEff a Data.Type.Equality.~ Control.Eff.Concurrent.Api.Server.ServerEff b, Control.Eff.Concurrent.Api.Server.Servable a, Control.Eff.Concurrent.Api.Server.Servable b) => Control.Eff.Concurrent.Api.Server.Servable (a, b)
instance GHC.Base.Semigroup (Control.Eff.Concurrent.Api.Server.ServerCallback eff)
instance GHC.Base.Monoid (Control.Eff.Concurrent.Api.Server.ServerCallback eff)
instance GHC.Generics.Generic Control.Eff.Concurrent.Api.Server.ApiServerCmd
instance GHC.Show.Show Control.Eff.Concurrent.Api.Server.ApiServerCmd
instance Data.Default.Class.Default (Control.Eff.Concurrent.Api.Server.ApiHandler api eff)
instance Control.DeepSeq.NFData Control.Eff.Concurrent.Api.Server.ApiServerCmd
-- | Functions for Api clients.
--
-- This modules is required to write clients that consume an Api.
module Control.Eff.Concurrent.Api.Client
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous. The operation never fails,
-- if it is important to know if the message was delivered, use
-- call instead.
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r ()
-- | Send an Api request and wait for the server to return a result
-- value.
--
-- The type signature enforces that the corresponding Api clause
-- is Synchronous.
call :: forall result api r q. (SetMember Process (Process q) r, Member Interrupts r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack, NFData result, Show result) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result
-- | Like cast but take the Server from the reader provided
-- by registerServer.
castRegistered :: (Typeable o, ServesApi o r q, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r ()
-- | Like call but take the Server from the reader provided
-- by registerServer.
callRegistered :: (Typeable reply, ServesApi o r q, HasCallStack, NFData reply, Show reply, Member Interrupts r) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply
-- | Instead of passing around a Server value and passing to
-- functions like cast or call, a Server can
-- provided by a Reader effect, if there is only a single
-- server for a given Api instance. This type alias is
-- convenience to express that an effect has Process and a reader
-- for a Server.
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (ServerReader o) r)
-- | The reader effect for ProcessIds for Apis, see
-- registerServer
type ServerReader o = Reader (Server o)
-- | Get the Server registered with registerServer.
whereIsServer :: Member (ServerReader o) e => Eff e (Server o)
-- | Run a reader effect that contains the one server handling a
-- specific Api instance.
registerServer :: HasCallStack => Server o -> Eff (ServerReader o : r) a -> Eff r a
-- | This module provides support for executing Process actions from
-- IO.
--
-- One use case is interacting with processes from the REPL, e.g.:
--
-- -- >>> import Control.Eff.Concurrent.Process.SingleThreadedScheduler (defaultMain) ---- --
-- >>> import Control.Eff.Loop ---- --
-- >>> import Data.Dynamic ---- --
-- >>> import Data.Maybe ---- --
-- >>> s <- forkInteractiveScheduler Control.Eff.Concurrent.Process.SingleThreadedScheduler.defaultMain ---- --
-- >>> fooPid <- submit s (spawn (foreverCheap (receiveAnyMessage SP >>= (logMsg . fromMaybe "Huh!??" . fromDynamic)))) ---- --
-- >>> fooPid -- <0.1.0> ---- --
-- >>> submit s (sendMessageAs SP fooPid "test") -- test ---- --
-- >>> submit s (sendShutdown SP fooPid) --module Control.Eff.Concurrent.Process.Interactive -- | Contains the communication channels to interact with a scheduler -- running in its' own thread. data SchedulerSession r -- | Fork a scheduler with a process that communicates with it via -- MVar, which is also the reason for the Lift IO -- constraint. forkInteractiveScheduler :: forall r. SetMember Lift (Lift IO) r => (Eff (InterruptableProcess r) () -> IO ()) -> IO (SchedulerSession r) -- | Exit the schedulder immediately using an asynchronous exception. killInteractiveScheduler :: SchedulerSession r -> IO () -- | Send a Process effect to the main process of a scheduler, this -- blocks until the effect is executed. submit :: forall r a. SetMember Lift (Lift IO) r => SchedulerSession r -> Eff (InterruptableProcess r) a -> IO a -- | Combination of submit and cast. submitCast :: forall o r. (SetMember Lift (Lift IO) r, Typeable o, Member Interrupts r) => SchedulerSession r -> Server o -> Api o 'Asynchronous -> IO () -- | Combination of submit and cast. submitCall :: forall o q r. (SetMember Lift (Lift IO) r, Typeable o, Typeable q, NFData q, Show q, Member Interrupts r) => SchedulerSession r -> Server o -> Api o ( 'Synchronous q) -> IO q -- | Observer Effects -- -- This module supports the implementation of observers and observables. -- One use case is event propagation. The tools in this module are -- tailored towards Api servers/clients. module Control.Eff.Concurrent.Api.Observer -- | An Api index that support observation of the another Api -- that is Observable. class (Typeable p, Observable o) => Observer p o -- | Wrap the Observation and the ProcessId (i.e. the -- Server) that caused the observation into an Api value -- that the Observable understands. observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous -- | An Api index that supports registration and de-registration of -- Observers. class (Typeable o, Typeable (Observation o)) => Observable o where { -- | Type of observations visible on this observable data family Observation o; } -- | Return the Api value for the cast_ that registeres an -- observer registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Return the Api value for the cast_ that de-registeres -- an observer forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Send an Observation to an Observer notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () -- | Send the registerObserverMessage registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send the forgetObserverMessage forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | An existential wrapper around a Server of an Observer. -- Needed to support different types of observers to observe the same -- Observable in a general fashion. data SomeObserver o [SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o -- | Send an Observation to SomeObserver. notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () -- | Internal state for manageObservers data Observers o -- | Alias for the effect that contains the observers managed by -- manageObservers type ObserverState o = State (Observers o) -- | Keep track of registered Observers Observers can be added and -- removed, and an Observation can be sent to all registerd -- observers at once. manageObservers :: Eff (ObserverState o : r) a -> Eff r a -- | Add an Observer to the Observers managed by -- manageObservers. addObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o) => SomeObserver o -> Eff r () -- | Delete an Observer from the Observers managed by -- manageObservers. removeObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o, Member Interrupts r) => SomeObserver o -> Eff r () -- | Send an Observation to all SomeObservers in the -- Observers state. notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (ObserverState o) r, Member Interrupts r) => SchedulerProxy q -> Observation o -> Eff r () -- | An Observer that schedules the observations to an effectful -- callback. data CallbackObserver o -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member Interrupts r, HasCallStack) => SchedulerProxy q -> (Server o -> Observation o -> Eff (InterruptableProcess q) ApiServerCmd) -> Eff r (Server (CallbackObserver o)) -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member (Logs LogMessage) r, Member Interrupts r, HasCallStack) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Show.Show (Control.Eff.Concurrent.Api.Observer.Observation o) => GHC.Show.Show (Control.Eff.Concurrent.Api.Api (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) r) instance Control.Eff.Concurrent.Api.Observer.Observable o => Control.Eff.Concurrent.Api.Observer.Observer (Control.Eff.Concurrent.Api.Observer.CallbackObserver o) o instance GHC.Classes.Ord (Control.Eff.Concurrent.Api.Observer.SomeObserver o) instance GHC.Classes.Eq (Control.Eff.Concurrent.Api.Observer.SomeObserver o) -- | Experimental new Api server handler. module Control.Eff.Concurrent.Api.Server2 -- | Serve an Api in a newly spawned process. spawnApiServer :: forall api eff. (ToServerPids api, HasCallStack) => MessageCallback api (InterruptableProcess eff) -> InterruptCallback (ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api) -- | Serve an Api in a newly spawned -and linked - process. spawnLinkApiServer :: forall api eff. (ToServerPids api, HasCallStack) => MessageCallback api (InterruptableProcess eff) -> InterruptCallback (ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api) -- | Server an Api in a newly spawned process; the callbacks -- have access to some state initialed by the function in the first -- parameter. spawnApiServerStateful :: forall api eff state. (HasCallStack, ToServerPids api) => Eff (InterruptableProcess eff) state -> MessageCallback api (State state : InterruptableProcess eff) -> InterruptCallback (State state : ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api) -- | Server an Api in a newly spawned process; The caller -- provides an effect handler for arbitrary effects used by the server -- callbacks. spawnApiServerEffectful :: forall api eff serverEff. (HasCallStack, ToServerPids api, Member Interrupts serverEff, SetMember Process (Process eff) serverEff) => (forall b. Eff serverEff b -> Eff (InterruptableProcess eff) b) -> MessageCallback api serverEff -> InterruptCallback serverEff -> Eff (InterruptableProcess eff) (ServerPids api) -- | Server an Api in a newly spawned process; The caller -- provides an effect handler for arbitrary effects used by the server -- callbacks. Links to the calling process like linkProcess would. spawnLinkApiServerEffectful :: forall api eff serverEff. (HasCallStack, ToServerPids api, Member Interrupts serverEff, SetMember Process (Process eff) serverEff) => (forall b. Eff serverEff b -> Eff (InterruptableProcess eff) b) -> MessageCallback api serverEff -> InterruptCallback serverEff -> Eff (InterruptableProcess eff) (ServerPids api) -- | Receive loop for Api calls. This starts a receive loop -- for a MessageCallback. It is used behind the scenes by -- spawnLinkApiServerEffectful and spawnApiServerEffectful. apiServerLoop :: forall api eff serverEff. (HasCallStack, ToServerPids api, Member Interrupts serverEff, SetMember Process (Process eff) serverEff) => MessageCallback api serverEff -> InterruptCallback serverEff -> Eff serverEff () -- | A command to the server loop started e.g. by server or -- spawnServerWithEffects. Typically returned by an -- ApiHandler member to indicate if the server should continue -- or stop. data CallbackResult -- | Tell the server to keep the server loop running [AwaitNext] :: CallbackResult -- | Tell the server to exit, this will make serve stop handling -- requests without exitting the process. _terminateCallback -- will be invoked with the given optional reason. [StopServer] :: InterruptReason -> CallbackResult -- | An existential wrapper around a MessageSelector and a function -- that handles the selected message. The api type parameter is -- a phantom type. -- -- The return value if the handler function is a CallbackResult. data MessageCallback api eff [MessageCallback] :: MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback api eff -- | A smart constructor for MessageCallbacks handleCasts :: forall api eff. (HasCallStack, Typeable api, Typeable (Api api 'Asynchronous)) => (Api api 'Asynchronous -> Eff eff CallbackResult) -> MessageCallback api eff -- | A smart constructor for MessageCallbacks -- --
-- handleCalls SP -- (\ (RentBook bookId customerId) runCall -> -- runCall $ do -- rentalIdE <- rentBook bookId customerId -- case rentalIdE of -- -- on fail we just don't send a reply, let the caller run into -- -- timeout -- Left err -> return (Nothing, AwaitNext) -- Right rentalId -> return (Just rentalId, AwaitNext)) --handleCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (forall secret reply. (Typeable reply, Typeable (Api api ( 'Synchronous reply))) => Api api ( 'Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff -- | A smart constructor for MessageCallbacks handleCastsAndCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, Typeable (Api api 'Asynchronous), SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (Api api 'Asynchronous -> Eff eff CallbackResult) -> (forall secret reply. (Typeable reply, Typeable (Api api ( 'Synchronous reply))) => Api api ( 'Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff -- | A variation of handleCalls that allows to defer a reply to a -- call. handleCallsDeferred :: forall api eff effScheduler. (HasCallStack, Typeable api, SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (forall reply. (Typeable reply, Typeable (Api api ( 'Synchronous reply))) => Api api ( 'Synchronous reply) -> (reply -> Eff eff ()) -> Eff eff CallbackResult) -> MessageCallback api eff -- | A smart constructor for MessageCallbacks handleMessages :: forall eff a. (HasCallStack, NFData a, Typeable a) => (a -> Eff eff CallbackResult) -> MessageCallback '[] eff -- | A smart constructor for MessageCallbacks handleSelectedMessages :: forall eff a. HasCallStack => MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback '[] eff -- | A smart constructor for MessageCallbacks handleAnyMessages :: forall eff. HasCallStack => (Dynamic -> Eff eff CallbackResult) -> MessageCallback '[] eff -- | A smart constructor for MessageCallbacks handleProcessDowns :: forall eff. HasCallStack => (MonitorReference -> Eff eff CallbackResult) -> MessageCallback '[] eff -- | A fallbackHandler that drops the left-over messages. dropUnhandledMessages :: forall eff. HasCallStack => MessageCallback '[] eff -- | A fallbackHandler that terminates if there are unhandled -- messages. exitOnUnhandled :: forall eff. HasCallStack => MessageCallback '[] eff -- | A fallbackHandler that drops the left-over messages. logUnhandledMessages :: forall eff. (Member (Logs LogMessage) eff, HasCallStack) => MessageCallback '[] eff -- | Compose two Apis to a type-leve pair of them. -- --
-- handleCalls api1calls ^: handleCalls api2calls ^: --(^:) :: forall (api1 :: Type) (apis2 :: [Type]) eff. HasCallStack => MessageCallback api1 eff -> MessageCallback apis2 eff -> MessageCallback (api1 : apis2) eff infixr 5 ^: -- | Make a fallback handler, i.e. a handler to which no other can be -- composed to from the right. fallbackHandler :: forall api eff. HasCallStack => MessageCallback api eff -> MessageCallback '[] eff -- | Helper type class for the return values of spawnApiServer et -- al. class ToServerPids (t :: k) where { type family ServerPids t; } toServerPids :: ToServerPids t => proxy t -> ProcessId -> ServerPids t -- | Just a wrapper around a function that will be applied to the result of -- a MessageCallbacks StopServer clause, or an -- InterruptReason caught during the execution of receive -- or a MessageCallback data InterruptCallback eff [InterruptCallback] :: (InterruptReason -> Eff eff CallbackResult) -> InterruptCallback eff -- | A smart constructor for InterruptCallbacks stopServerOnInterrupt :: forall eff. HasCallStack => InterruptCallback eff -- | Apply a given callback function to incoming Observerations. handleObservations :: Typeable o => (Server o -> Observation o -> Eff e CallbackResult) -> MessageCallback (Observing o) e -- | An Api type for generic Observers, see -- handleObservations. data Observing o instance (Data.Typeable.Internal.Typeable o, Control.Eff.Concurrent.Api.Observer.Observable o) => Control.Eff.Concurrent.Api.Observer.Observer (Control.Eff.Concurrent.Api.Server2.Observing o) o instance Data.Default.Class.Default (Control.Eff.Concurrent.Api.Server2.InterruptCallback eff) instance Control.Eff.Concurrent.Api.Server2.ToServerPids '[] instance (Control.Eff.Concurrent.Api.Server2.ToServerPids api1, Control.Eff.Concurrent.Api.Server2.ToServerPids api2) => Control.Eff.Concurrent.Api.Server2.ToServerPids (api1 : api2) instance Control.Eff.Concurrent.Api.Server2.ToServerPids api1 => Control.Eff.Concurrent.Api.Server2.ToServerPids api1 instance forall k (api :: k) (eff :: [* -> *]). GHC.Base.Semigroup (Control.Eff.Concurrent.Api.Server2.MessageCallback api eff) instance forall k (api :: k) (eff :: [* -> *]). GHC.Base.Monoid (Control.Eff.Concurrent.Api.Server2.MessageCallback api eff) instance forall k (api :: k) (eff :: [* -> *]). Data.Default.Class.Default (Control.Eff.Concurrent.Api.Server2.MessageCallback api eff) -- | Capture Observations and enqueue then into an STM -- TBQeueu. module Control.Eff.Concurrent.Api.Observer.Queue -- | Contains a TBQueue capturing observations received by -- enqueueObservationsRegistered or enqueueObservations. data ObservationQueue a -- | A Reader for an ObservationQueue. type ObservationQueueReader a = Reader (ObservationQueue a) -- | Read queued observations captured by observing a Server that -- implements an Observable Api using -- enqueueObservationsRegistered or enqueueObservations. -- This blocks until the next Observation received. For a -- non-blocking variant use tryReadObservationQueue or -- flushObservationQueue. readObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Observation o) -- | Read queued observations captured by observing a Server that -- implements an Observable Api using -- enqueueObservationsRegistered or enqueueObservations. -- Return the next Observation immediately or Nothing if -- the queue is empty. Use readObservationQueue to block until an -- observation is observed. tryReadObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Maybe (Observation o)) -- | Read all currently queued Observations captured by -- enqueueObservations. This returns immediately all currently -- enqueued Observations. For a blocking variant use -- readObservationQueue. flushObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r [Observation o] -- | Observe a(the) registered Server that implements an -- Observable Api. Based on enqueueObservations. enqueueObservationsRegistered :: forall o r q a. (ServesApi o r q, SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO q, HasLogging IO r, Member Interrupts r, Lifted IO r, HasCallStack) => SchedulerProxy q -> Int -> Eff (ObservationQueueReader o : r) a -> Eff r a -- | Observe a Server that implements an Observable -- Api, the Observations can be obtained by -- readObservationQueue. All observations are captured up to the -- queue size limit, such that the first message received will be first -- message returned by readObservationQueue. -- -- This function captures runtime exceptions and cleans up accordingly. enqueueObservations :: forall o r q a. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO r, HasLogging IO q, Member Interrupts r, Lifted IO q, HasCallStack) => SchedulerProxy q -> Server o -> Int -> Eff (ObservationQueueReader o : r) a -> Eff r a -- | In rare occasions GHC optimizes innocent looking loops into -- space-leaking monsters. See the discussion here: GHC issue -- 13080 for more details, or this blog post about space leaks in -- nested loops -- -- These functions in this module might help, at least in -- conjunction with the -fno-full-laziness GHC option. -- -- There is a unit test in the sources of this module, which can be used -- to do a comperative heap profiling of these function vs. their -- counterparts in the base package. -- -- Here are the images of the profiling results, the images show that the -- functions in this module do not leak space, compared to the original -- functions (forever and replicateM_): -- module Control.Eff.Loop -- | A version of forever that hopefully tricks GHC into -- not creating a space leak. The intuition is, that we -- want to do something that is cheap, and hence should be -- recomputed instead of shared. foreverCheap :: Monad m => m a -> m () -- | A version of replicateM_ that hopefully tricks GHC into -- not creating a space leak. The intuition is, that we -- want to do something that is cheap, and hence should be -- recomputed instead of shared. replicateCheapM_ :: Monad m => Int -> m a -> m () -- | Erlang style processes with message passing concurrency based on -- (more) extensible-effects. module Control.Eff.Concurrent -- | Each process is identified by a single process id, that stays constant -- throughout the life cycle of a process. Also, message sending relies -- on these values to address messages to processes. newtype ProcessId ProcessId :: Int -> ProcessId [_fromProcessId] :: ProcessId -> Int -- | A monitored process exited. This message is sent to a process by the -- scheduler, when a process that was monitored via a -- SchedulerCommand died. data ProcessDown ProcessDown :: !MonitorReference -> !SomeExitReason -> ProcessDown [downReference] :: ProcessDown -> !MonitorReference [downReason] :: ProcessDown -> !SomeExitReason -- | A value that contains a unique reference of a process monitoring. data MonitorReference MonitorReference :: Int -> ProcessId -> MonitorReference [monitorIndex] :: MonitorReference -> Int [monitoredProcess] :: MonitorReference -> ProcessId -- | An existential wrapper around ExitReason data SomeExitReason [SomeExitReason] :: ExitReason x -> SomeExitReason -- | This adds a layer of the Interrupts effect ontop of -- ConsProcess type InterruptableProcess e = Interrupts : ConsProcess e -- | Exceptions containing InterruptReasons. See -- handleInterrupts, exitOnInterrupt or -- provideInterrupts type Interrupts = Exc InterruptReason -- | ExitReasons which are recoverable are interrupts. type InterruptReason = ExitReason 'Recoverable -- | A sum-type with reasons for why a process exists the scheduling loop, -- this includes errors, that can occur when scheduleing messages. data ExitReason (t :: ExitRecovery) -- | A process has finished a unit of work and might exit or work on -- something else. This is primarily used for interupting infinite server -- loops, allowing for additional cleanup work before exitting (e.g. with -- ExitNormally) [ProcessFinished] :: ExitReason 'Recoverable -- | A process that should be running was not running. [ProcessNotRunning] :: ProcessId -> ExitReason 'Recoverable -- | A linked process is down [LinkedProcessCrashed] :: ProcessId -> ExitReason 'Recoverable -- | An exit reason that has an error message but isn't Recoverable. [ProcessError] :: String -> ExitReason 'Recoverable -- | A process function returned or exited without any error. [ExitNormally] :: ExitReason 'NoRecovery -- | An unhandled Recoverable allows NoRecovery. [NotRecovered] :: ExitReason 'Recoverable -> ExitReason 'NoRecovery -- | An unexpected runtime exception was thrown, i.e. an exception derived -- from SomeException [UnexpectedException] :: String -> String -> ExitReason 'NoRecovery -- | A process was cancelled (e.g. killed, in cancel) [Killed] :: ExitReason 'NoRecovery -- | This value indicates wether a process exited in way consistent with -- the planned behaviour or not. data ExitSeverity NormalExit :: ExitSeverity Crash :: ExitSeverity -- | This kind is used to indicate if a ExitReason can be treated -- like a short interrupt which can be handled or ignored. data ExitRecovery Recoverable :: ExitRecovery NoRecovery :: ExitRecovery -- | The state that a Process is currently in. data ProcessState -- | The process has just been started but not called -- handleProcess yet. ProcessBooting :: ProcessState -- | The process yielded it's timeslice ProcessIdle :: ProcessState -- | The process is busy with non-blocking ProcessBusy :: ProcessState -- | The process is busy with sending a message ProcessBusySending :: ProcessState -- | The process is busy with killing ProcessBusySendingShutdown :: ProcessState -- | The process is busy with killing ProcessBusySendingInterrupt :: ProcessState -- | The process blocked by a receiveAnyMessage ProcessBusyReceiving :: ProcessState -- | The process blocked by a linkProcess ProcessBusyLinking :: ProcessState -- | The process blocked by a unlinkProcess ProcessBusyUnlinking :: ProcessState -- | The process blocked by a monitor ProcessBusyMonitoring :: ProcessState -- | The process blocked by a demonitor ProcessBusyDemonitoring :: ProcessState -- | The process was interrupted ProcessInterrupted :: ProcessState -- | The process was shutdown or crashed ProcessShuttingDown :: ProcessState -- | Cons Process onto a list of effects. type ConsProcess r = Process r : r -- | A constraint for the implicit SchedulerProxy parameter. Use -- getSchedulerProxy to query it. _EXPERIMENTAL_ type HasScheduler q = (?_schedulerProxy :: SchedulerProxy q) -- | Every function for Process things needs such a proxy value for -- the low-level effect list, i.e. the effects identified by -- r in Process r : r, this might be -- dependent on the scheduler implementation. data SchedulerProxy :: [Type -> Type] -> Type -- | Tell the typechecker what effects we have below Process [SchedulerProxy] :: SchedulerProxy q -- | Like SchedulerProxy but shorter [SP] :: SchedulerProxy q -- | Like SP but different [Scheduler] :: SchedulerProxy q -- | A function that deciced if the next message will be received by -- ReceiveSelectedMessage. It conveniently is an instance of -- Alternative so the message selector can be combined: > > -- selectInt :: MessageSelector Int > selectInt = selectMessage > -- > selectString :: MessageSelector String > selectString = -- selectMessage > > selectIntOrString :: MessageSelector (Either -- Int String) > selectIntOrString = > Left $ -- selectTimeout| Right $ selectString data MessageSelector a -- | Every Process action returns it's actual result wrapped in this -- type. It will allow to signal errors as well as pass on normal results -- such as incoming messages. data ResumeProcess v -- | The current operation of the process was interrupted with a -- ExitReason. If isRecoverable holds for the given reason, -- the process may choose to continue. [Interrupted] :: InterruptReason -> ResumeProcess v -- | The process may resume to do work, using the given result. [ResumeWith] :: a -> ResumeProcess a -- | The process effect is the basis for message passing concurrency. This -- effect describes an interface for concurrent, communicating isolated -- processes identified uniquely by a process-id. -- -- Processes can raise exceptions that can be caught, exit gracefully or -- with an error, or be killed by other processes, with the option of -- ignoring the shutdown request. -- -- Process Scheduling is implemented in different modules. All scheduler -- implementations should follow some basic rules: -- --
-- logCrash :: ExitReason -> Eff e () -- logCrash (toCrashReason -> Just reason) = logError reason -- logCrash _ = return () ---- -- Though this can be improved to: -- --
-- logCrash = traverse_ logError . toCrashReason --toCrashReason :: ExitReason x -> Maybe String -- | Log the ProcessExitReaons logProcessExit :: (HasCallStack, Member (Logs LogMessage) e) => ExitReason x -> Eff e () -- | Execute a and action and return the result; if the process is -- interrupted by an error or exception, or an explicit shutdown from -- another process, or through a crash of a linked process, i.e. whenever -- the exit reason satisfies isRecoverable, return the exit -- reason. executeAndResume :: forall q r v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r (Either (ExitReason 'Recoverable) v) -- | Execute a Process action and resume the process, exit the -- process when an Interrupts was raised. Use -- executeAndResume to catch interrupts. executeAndResumeOrExit :: forall r q v. (SetMember Process (Process q) r, HasCallStack) => Process q (ResumeProcess v) -> Eff r v -- | Execute a Process action and resume the process, exit the -- process when an Interrupts was raised. Use -- executeAndResume to catch interrupts. executeAndResumeOrThrow :: forall q r v. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => Process q (ResumeProcess v) -> Eff r v -- | Use executeAndResumeOrExit to execute YieldProcess. -- Refer to YieldProcess for more information. yieldProcess :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Eff r () -- | Send a message to a process addressed by the ProcessId. See -- SendMessage. sendMessage :: forall r q o. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r, Typeable o) => SchedulerProxy q -> ProcessId -> o -> Eff r () -- | Send a Dynamic value to a process addressed by the -- ProcessId. See SendMessage. sendAnyMessage :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Dynamic -> Eff r () -- | Exit a process addressed by the ProcessId. The process will -- exit, it might do some cleanup, but is ultimately unrecoverable. See -- SendShutdown. sendShutdown :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> ExitReason 'NoRecovery -> Eff r () -- | Interrupts a process addressed by the ProcessId. The process -- might exit, or it may continue. | Like sendInterrupt, but also -- return True iff the process to exit exists. sendInterrupt :: forall r q. (SetMember Process (Process q) r, HasCallStack, Member Interrupts r) => SchedulerProxy q -> ProcessId -> InterruptReason -> Eff r () -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. If the new -- process is interrupted, the process will Shutdown with the -- InterruptReason wrapped in NotCovered. For specific -- use cases it might be better to use spawnRaw. spawn :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r ProcessId -- | Like spawn but return (). spawn_ :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r () -- | Start a new process, and immediately link to it. spawnLink :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (InterruptableProcess q) () -> Eff r ProcessId -- | Start a new process, the new process will execute an effect, the -- function will return immediately with a ProcessId. The spawned -- process has only the raw ConsProcess effects. For -- non-library code spawn might be better suited. spawnRaw :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (ConsProcess q) () -> Eff r ProcessId -- | Like spawnRaw but return (). spawnRaw_ :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => Eff (ConsProcess q) () -> Eff r () -- | Return True if the process is alive. isProcessAlive :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r Bool -- | Block until a message was received. See ReceiveSelectedMessage -- for more documentation. receiveAnyMessage :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r Dynamic -- | Block until a message was received, that is not Nothing after -- applying a callback to it. See ReceiveSelectedMessage for more -- documentation. receiveSelectedMessage :: forall r q a. (HasCallStack, Show a, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> MessageSelector a -> Eff r a -- | Receive and cast the message to some Typeable instance. See -- ReceiveSelectedMessage for more documentation. This will wait -- for a message of the return type using receiveSelectedMessage receiveMessage :: forall a r q. (HasCallStack, Typeable a, Show a, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r a -- | Remove and return all messages currently enqueued in the process -- message queue. flushMessages :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r, HasScheduler q) => Eff r [Dynamic] -- | Enter a loop to receive messages and pass them to a callback, until -- the function returns Just a result. Only the messages of the -- given type will be received. If the process is interrupted by an -- exception of by a SendShutdown from another process, with an -- exit reason that satisfies isRecoverable, then the callback -- will be invoked with Left ProcessExitReaon, -- otherwise the process will be exited with the same reason using -- exitBecause. See also ReceiveSelectedMessage for more -- documentation. receiveSelectedLoop :: forall r q a endOfLoopResult. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> MessageSelector a -> (Either InterruptReason a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult -- | Like receiveSelectedLoop but not selective. See also -- selectAnyMessageLazy, receiveSelectedLoop. receiveAnyLoop :: forall r q endOfLoopResult. (SetMember Process (Process q) r, HasCallStack) => SchedulerProxy q -> (Either InterruptReason Dynamic -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult -- | Like receiveSelectedLoop but refined to casting to a specific -- Typeable using selectMessageLazy. receiveLoop :: forall r q a endOfLoopResult. (SetMember Process (Process q) r, HasCallStack, Typeable a) => SchedulerProxy q -> (Either InterruptReason a -> Eff r (Maybe endOfLoopResult)) -> Eff r endOfLoopResult -- | Returns the ProcessId of the current process. self :: (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r ProcessId -- | Generate a unique Int for the current process. makeReference :: (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Eff r Int -- | Monitor another process. When the monitored process exits a -- ProcessDown is sent to the calling process. The return value is -- a unique identifier for that monitor. There can be multiple monitors -- on the same process, and a message for each will be sent. If the -- process is already dead, the ProcessDown message will be sent -- immediately, w.thout exit reason monitor :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r MonitorReference -- | Remove a monitor created with monitor. demonitor :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> MonitorReference -> Eff r () -- | monitor another process before while performing an action and -- demonitor afterwards. withMonitor :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> (MonitorReference -> Eff r a) -> Eff r a -- | A MessageSelector for receiving either a monitor of the given -- process or another message. receiveWithMonitor :: (HasCallStack, Member Interrupts r, SetMember Process (Process q) r, Member Interrupts r, Typeable a, Show a) => SchedulerProxy q -> ProcessId -> MessageSelector a -> Eff r (Either ProcessDown a) -- | Trigger an Interrupt for a ProcessDown message. The -- reason will be ProcessNotRunning becauseProcessIsDown :: ProcessDown -> InterruptReason -- | A MesssageSelector for the ProcessDown message of a -- specific process. selectProcessDown :: MonitorReference -> MessageSelector ProcessDown -- | Connect the calling process to another process, such that if one of -- the processes crashes (i.e. isCrash returns True), the -- other is shutdown with the ProcessExitReaon -- LinkedProcessCrashed. linkProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r () -- | Unlink the calling proccess from the other process. unlinkProcess :: forall r q. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> ProcessId -> Eff r () -- | Exit the process with a ProcessExitReaon. exitBecause :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> ExitReason 'NoRecovery -> Eff r a -- | Exit the process. exitNormally :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> Eff r a -- | Exit the process with an error. exitWithError :: forall r q a. (HasCallStack, SetMember Process (Process q) r) => SchedulerProxy q -> String -> Eff r a fromProcessId :: Iso' ProcessId Int -- | A value to be sent when timer started with startTimer has -- elapsed. data TimerElapsed -- | The reference to a timer started by startTimer, required to -- stop a timer via cancelTimer. data TimerReference -- | A number of micro seconds. data Timeout -- | Wait for a message of the given type for the given time. When no -- message arrives in time, return Nothing. This is based on -- receiveSelectedAfter. receiveAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable a, NFData a, Show a) => SchedulerProxy q -> Timeout -> Eff r (Maybe a) -- | Wait for a message of the given type for the given time. When no -- message arrives in time, return Left TimerElapsed. This -- is based on selectTimerElapsed and startTimer. receiveSelectedAfter :: forall a r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Show a) => SchedulerProxy q -> MessageSelector a -> Timeout -> Eff r (Either TimerElapsed a) -- | A MessageSelector matching TimerElapsed messages created -- by startTimer. selectTimerElapsed :: TimerReference -> MessageSelector TimerElapsed -- | Send a message to a given process after waiting. The message is -- created by applying the function parameter to the -- TimerReference, such that the message can directly refer to the -- timer. sendAfter :: forall r q message. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable message, NFData message) => SchedulerProxy q -> ProcessId -> Timeout -> (TimerReference -> message) -> Eff r TimerReference -- | Start a new timer, after the time has elapsed, TimerElapsed is -- sent to calling process. The message also contains the -- TimerReference returned by this function. Use -- cancelTimer to cancel the timer. Use selectTimerElapsed -- to receive the message using receiveSelectedMessage. To receive -- messages with guarded with a timeout see receiveAfter. startTimer :: forall r q. (Lifted IO q, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Timeout -> Eff r TimerReference -- | This is a tag-type that wraps around a ProcessId and holds an -- Api index type. newtype Server api Server :: ProcessId -> Server api [_fromServer] :: Server api -> ProcessId -- | The (promoted) constructors of this type specify (at the type level) -- the reply behavior of a specific constructor of an Api -- instance. data Synchronicity -- | Specify that handling a request is a blocking operation with a -- specific return type, e.g. ('Synchronous (Either RentalError -- RentalId)) Synchronous :: Type -> Synchronicity -- | Non-blocking, asynchronous, request handling Asynchronous :: Synchronicity -- | This data family defines an API, a communication interface description -- between at least two processes. The processes act as servers or -- client(s) regarding a specific instance of this type. -- -- The first parameter is usually a user defined phantom type that -- identifies the Api instance. -- -- The second parameter specifies if a specific constructor of an -- (GADT-like) Api instance is Synchronous, i.e. returns -- a result and blocks the caller or if it is Asynchronous -- -- Example: -- --
-- data BookShop deriving Typeable
--
-- data instance Api BookShop r where
-- RentBook :: BookId -> Api BookShop ('Synchronous (Either RentalError RentalId))
-- BringBack :: RentalId -> Api BookShop 'Asynchronous
--
-- type BookId = Int
-- type RentalId = Int
-- type RentalError = String
--
data family Api (api :: Type) (reply :: Synchronicity)
fromServer :: forall api_a17VQ api_a1868. Iso (Server api_a17VQ) (Server api_a1868) ProcessId ProcessId
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
proxyAsServer :: proxy api -> ProcessId -> Server api
-- | Tag a ProcessId with an Api type index to mark it a
-- Server process handling that API
asServer :: forall api. ProcessId -> Server api
-- | The reader effect for ProcessIds for Apis, see
-- registerServer
type ServerReader o = Reader (Server o)
-- | Instead of passing around a Server value and passing to
-- functions like cast or call, a Server can
-- provided by a Reader effect, if there is only a single
-- server for a given Api instance. This type alias is
-- convenience to express that an effect has Process and a reader
-- for a Server.
type ServesApi o r q = (Typeable o, SetMember Process (Process q) r, Member (ServerReader o) r)
-- | Send an Api request that has no return value and return as fast
-- as possible. The type signature enforces that the corresponding
-- Api clause is Asynchronous. The operation never fails,
-- if it is important to know if the message was delivered, use
-- call instead.
cast :: forall r q o. (HasCallStack, SetMember Process (Process q) r, Member Interrupts r, Typeable o, Typeable (Api o 'Asynchronous)) => SchedulerProxy q -> Server o -> Api o 'Asynchronous -> Eff r ()
-- | Send an Api request and wait for the server to return a result
-- value.
--
-- The type signature enforces that the corresponding Api clause
-- is Synchronous.
call :: forall result api r q. (SetMember Process (Process q) r, Member Interrupts r, Typeable api, Typeable (Api api ( 'Synchronous result)), Typeable result, HasCallStack, NFData result, Show result) => SchedulerProxy q -> Server api -> Api api ( 'Synchronous result) -> Eff r result
-- | Run a reader effect that contains the one server handling a
-- specific Api instance.
registerServer :: HasCallStack => Server o -> Eff (ServerReader o : r) a -> Eff r a
-- | Get the Server registered with registerServer.
whereIsServer :: Member (ServerReader o) e => Eff e (Server o)
-- | Like call but take the Server from the reader provided
-- by registerServer.
callRegistered :: (Typeable reply, ServesApi o r q, HasCallStack, NFData reply, Show reply, Member Interrupts r) => SchedulerProxy q -> Api o ( 'Synchronous reply) -> Eff r reply
-- | Like cast but take the Server from the reader provided
-- by registerServer.
castRegistered :: (Typeable o, ServesApi o r q, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Api o 'Asynchronous -> Eff r ()
-- | A command to the server loop started e.g. by server or
-- spawnServerWithEffects. Typically returned by an
-- ApiHandler member to indicate if the server should continue or
-- stop.
data ApiServerCmd
-- | Tell the server to keep the server loop running
[HandleNextRequest] :: ApiServerCmd
-- | Tell the server to exit, this will make serve stop handling
-- requests without exitting the process. _terminateCallback will
-- be invoked with the given optional reason.
[StopApiServer] :: ExitReason 'Recoverable -> ApiServerCmd
-- | A record of callbacks, handling requests sent to a server
-- Process, all belonging to a specific Api family
-- instance. The values of this type can be serveed or combined
-- via Servable or ServerCallbacks.
data ApiHandler api eff
[ApiHandler] :: {_castCallback :: Maybe (Api api 'Asynchronous -> Eff eff ApiServerCmd) " A cast will not return a result directly. This is used for async methods. This returns an 'ApiServerCmd' to the server loop.", _callCallback :: forall reply. Maybe (Api api ( 'Synchronous reply) -> (reply -> Eff eff ()) -> Eff eff ApiServerCmd) " A call is a blocking operation, the caller is blocked until this handler calls the reply continuation. This returns an 'ApiServerCmd' to the server loop.", _terminateCallback :: Maybe (ExitReason 'Recoverable -> Eff eff ()) " This callback is called with @Nothing@ if one of these things happen: * the process exits * '_callCallback' or '_castCallback' return 'StopApiServer' If the process exist peacefully the parameter is 'NotServerCallbacking', otherwise @Just "error message..."@ if the process exits with an error. The default behavior is defined in 'defaultTermination'."} -> ApiHandler api eff
-- | Create an ApiHandler with a _castCallback, a
-- _callCallback and a _terminateCallback implementation.
apiHandler :: (Api api 'Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> (ExitReason 'Recoverable -> Eff e ()) -> ApiHandler api e
-- | Like apiHandler but the server will loop until an error is
-- raised or the process exits. The callback actions won't decide wether
-- to stop the server or not, instead the ApiServerCmd
-- HandleNextRequest is used.
apiHandlerForever :: (Api api 'Asynchronous -> Eff e ()) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> (ExitReason 'Recoverable -> Eff e ()) -> ApiHandler api e
-- | Create an ApiHandler with only a _castCallback
-- implementation.
castHandler :: (Api api 'Asynchronous -> Eff eff ApiServerCmd) -> ApiHandler api eff
-- | Like castHandler but the server will loop until an error is
-- raised or the process exits. See apiHandlerForver.
castHandlerForever :: (Api api 'Asynchronous -> Eff eff ()) -> ApiHandler api eff
-- | Create an ApiHandler with only a _callCallback
-- implementation.
callHandler :: (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e
-- | Like callHandler but the server will loop until an error is
-- raised or the process exits. See apiHandlerForver.
callHandlerForever :: (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e
-- | Create an ApiHandler with only a _castCallback and
-- _callCallback implementation.
castAndCallHandler :: (Api api 'Asynchronous -> Eff e ApiServerCmd) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ApiServerCmd) -> ApiHandler api e
-- | Like castAndCallHandler but the server will loop until an error
-- is raised or the process exits. See apiHandlerForver.
castAndCallHandlerForever :: (Api api 'Asynchronous -> Eff e ()) -> (forall r. Api api ( 'Synchronous r) -> (r -> Eff e ()) -> Eff e ()) -> ApiHandler api e
-- | Building block for composition of ApiHandler. A wrapper for
-- ApiHandler. Use this to combine ApiHandler, allowing a
-- process to implement several Api instances. The termination
-- will be evenly propagated. Create this via e.g. Servable
-- instances To serve multiple apis use <> to combine server
-- callbacks, e.g.
--
-- @@ let f = apiHandlerServerCallback px $ ApiHandler ... g =
-- apiHandlerServerCallback px $ ApiHandler ... h = f <> g in serve
-- px h @@
data ServerCallback eff
ServerCallback :: MessageSelector (Eff eff ApiServerCmd) -> (ExitReason 'Recoverable -> Eff eff ()) -> ServerCallback eff
[_requestHandlerSelector] :: ServerCallback eff -> MessageSelector (Eff eff ApiServerCmd)
[_terminationHandler] :: ServerCallback eff -> ExitReason 'Recoverable -> Eff eff ()
callCallback :: forall api_a18Ad eff_a18Ae reply_a18Ah. Getter (ApiHandler api_a18Ad eff_a18Ae) (Maybe (Api api_a18Ad ( 'Synchronous reply_a18Ah) -> (reply_a18Ah -> Eff eff_a18Ae ()) -> Eff eff_a18Ae ApiServerCmd))
castCallback :: forall api_a18Ad eff_a18Ae. Lens' (ApiHandler api_a18Ad eff_a18Ae) (Maybe (Api api_a18Ad 'Asynchronous -> Eff eff_a18Ae ApiServerCmd))
terminateCallback :: forall api_a18Ad eff_a18Ae. Lens' (ApiHandler api_a18Ad eff_a18Ae) (Maybe (ExitReason 'Recoverable -> Eff eff_a18Ae ()))
requestHandlerSelector :: forall eff_a18Mt. Lens' (ServerCallback eff_a18Mt) (MessageSelector (Eff eff_a18Mt ApiServerCmd))
terminationHandler :: forall eff_a18Mt. Lens' (ServerCallback eff_a18Mt) (ExitReason 'Recoverable -> Eff eff_a18Mt ())
-- | Receive and process incoming requests until the process exits.
serve :: forall a effScheduler. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> Eff (ServerEff a) ()
-- | Spawn a new process, that will receive and process incoming requests
-- until the process exits.
spawnServer :: forall a effScheduler eff. (Servable a, ServerEff a ~ InterruptableProcess effScheduler, SetMember Process (Process effScheduler) eff, Member Interrupts eff, HasCallStack) => SchedulerProxy effScheduler -> a -> Eff eff (ServerPids a)
-- | Spawn a new process, that will receive and process incoming requests
-- until the process exits. Also handle all internal effects.
spawnServerWithEffects :: forall a effScheduler eff. (Servable a, SetMember Process (Process effScheduler) (ServerEff a), SetMember Process (Process effScheduler) eff, Member Interrupts eff, Member Interrupts (ServerEff a), HasCallStack) => SchedulerProxy effScheduler -> a -> (Eff (ServerEff a) () -> Eff (InterruptableProcess effScheduler) ()) -> Eff eff (ServerPids a)
-- | A default handler to use in _callCallback in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCallError :: forall p x r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p ( 'Synchronous x) -> (x -> Eff r ()) -> Eff r ApiServerCmd
-- | A default handler to use in _castCallback in ApiHandler.
-- It will call raiseError with a nice error message.
unhandledCastError :: forall p r q. (Typeable p, HasCallStack, SetMember Process (Process q) r, Member Interrupts r) => SchedulerProxy q -> Api p 'Asynchronous -> Eff r ApiServerCmd
-- | Either do nothing, if the error message is Nothing, or call
-- exitWithError with the error message.
defaultTermination :: forall q r. (HasCallStack, SetMember Process (Process q) r, Member (Logs LogMessage) r) => SchedulerProxy q -> ExitReason 'Recoverable -> Eff r ()
-- | An Api type for generic Observers, see
-- handleObservations.
data Observing o
-- | Just a wrapper around a function that will be applied to the result of
-- a MessageCallbacks StopServer clause, or an
-- InterruptReason caught during the execution of receive
-- or a MessageCallback
data InterruptCallback eff
[InterruptCallback] :: (InterruptReason -> Eff eff CallbackResult) -> InterruptCallback eff
-- | Helper type class for the return values of spawnApiServer et
-- al.
class ToServerPids (t :: k) where {
type family ServerPids t;
}
toServerPids :: ToServerPids t => proxy t -> ProcessId -> ServerPids t
-- | An existential wrapper around a MessageSelector and a function
-- that handles the selected message. The api type parameter is
-- a phantom type.
--
-- The return value if the handler function is a CallbackResult.
data MessageCallback api eff
[MessageCallback] :: MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback api eff
-- | A command to the server loop started e.g. by server or
-- spawnServerWithEffects. Typically returned by an
-- ApiHandler member to indicate if the server should continue
-- or stop.
data CallbackResult
-- | Tell the server to keep the server loop running
[AwaitNext] :: CallbackResult
-- | Tell the server to exit, this will make serve stop handling
-- requests without exitting the process. _terminateCallback
-- will be invoked with the given optional reason.
[StopServer] :: InterruptReason -> CallbackResult
-- | Serve an Api in a newly spawned process.
spawnApiServer :: forall api eff. (ToServerPids api, HasCallStack) => MessageCallback api (InterruptableProcess eff) -> InterruptCallback (ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api)
-- | Server an Api in a newly spawned process; the callbacks
-- have access to some state initialed by the function in the first
-- parameter.
spawnApiServerStateful :: forall api eff state. (HasCallStack, ToServerPids api) => Eff (InterruptableProcess eff) state -> MessageCallback api (State state : InterruptableProcess eff) -> InterruptCallback (State state : ConsProcess eff) -> Eff (InterruptableProcess eff) (ServerPids api)
-- | Server an Api in a newly spawned process; The caller
-- provides an effect handler for arbitrary effects used by the server
-- callbacks.
spawnApiServerEffectful :: forall api eff serverEff. (HasCallStack, ToServerPids api, Member Interrupts serverEff, SetMember Process (Process eff) serverEff) => (forall b. Eff serverEff b -> Eff (InterruptableProcess eff) b) -> MessageCallback api serverEff -> InterruptCallback serverEff -> Eff (InterruptableProcess eff) (ServerPids api)
-- | A smart constructor for MessageCallbacks
handleMessages :: forall eff a. (HasCallStack, NFData a, Typeable a) => (a -> Eff eff CallbackResult) -> MessageCallback '[] eff
-- | A smart constructor for MessageCallbacks
handleSelectedMessages :: forall eff a. HasCallStack => MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback '[] eff
-- | A smart constructor for MessageCallbacks
handleAnyMessages :: forall eff. HasCallStack => (Dynamic -> Eff eff CallbackResult) -> MessageCallback '[] eff
-- | A smart constructor for MessageCallbacks
handleCasts :: forall api eff. (HasCallStack, Typeable api, Typeable (Api api 'Asynchronous)) => (Api api 'Asynchronous -> Eff eff CallbackResult) -> MessageCallback api eff
-- | A smart constructor for MessageCallbacks
--
-- -- handleCalls SP -- (\ (RentBook bookId customerId) runCall -> -- runCall $ do -- rentalIdE <- rentBook bookId customerId -- case rentalIdE of -- -- on fail we just don't send a reply, let the caller run into -- -- timeout -- Left err -> return (Nothing, AwaitNext) -- Right rentalId -> return (Just rentalId, AwaitNext)) --handleCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (forall secret reply. (Typeable reply, Typeable (Api api ( 'Synchronous reply))) => Api api ( 'Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff -- | A smart constructor for MessageCallbacks handleCastsAndCalls :: forall api eff effScheduler. (HasCallStack, Typeable api, Typeable (Api api 'Asynchronous), SetMember Process (Process effScheduler) eff, Member Interrupts eff) => SchedulerProxy effScheduler -> (Api api 'Asynchronous -> Eff eff CallbackResult) -> (forall secret reply. (Typeable reply, Typeable (Api api ( 'Synchronous reply))) => Api api ( 'Synchronous reply) -> (Eff eff (Maybe reply, CallbackResult) -> secret) -> secret) -> MessageCallback api eff -- | A smart constructor for MessageCallbacks handleProcessDowns :: forall eff. HasCallStack => (MonitorReference -> Eff eff CallbackResult) -> MessageCallback '[] eff -- | Compose two Apis to a type-leve pair of them. -- --
-- handleCalls api1calls ^: handleCalls api2calls ^: --(^:) :: forall (api1 :: Type) (apis2 :: [Type]) eff. HasCallStack => MessageCallback api1 eff -> MessageCallback apis2 eff -> MessageCallback (api1 : apis2) eff infixr 5 ^: -- | Make a fallback handler, i.e. a handler to which no other can be -- composed to from the right. fallbackHandler :: forall api eff. HasCallStack => MessageCallback api eff -> MessageCallback '[] eff -- | A fallbackHandler that drops the left-over messages. dropUnhandledMessages :: forall eff. HasCallStack => MessageCallback '[] eff -- | A fallbackHandler that terminates if there are unhandled -- messages. exitOnUnhandled :: forall eff. HasCallStack => MessageCallback '[] eff -- | A fallbackHandler that drops the left-over messages. logUnhandledMessages :: forall eff. (Member (Logs LogMessage) eff, HasCallStack) => MessageCallback '[] eff -- | A smart constructor for InterruptCallbacks stopServerOnInterrupt :: forall eff. HasCallStack => InterruptCallback eff -- | Apply a given callback function to incoming Observerations. handleObservations :: Typeable o => (Server o -> Observation o -> Eff e CallbackResult) -> MessageCallback (Observing o) e -- | An Observer that schedules the observations to an effectful -- callback. data CallbackObserver o -- | Alias for the effect that contains the observers managed by -- manageObservers type ObserverState o = State (Observers o) -- | Internal state for manageObservers data Observers o -- | An existential wrapper around a Server of an Observer. -- Needed to support different types of observers to observe the same -- Observable in a general fashion. data SomeObserver o [SomeObserver] :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o -- | An Api index that supports registration and de-registration of -- Observers. class (Typeable o, Typeable (Observation o)) => Observable o where { -- | Type of observations visible on this observable data family Observation o; } -- | Return the Api value for the cast_ that registeres an -- observer registerObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | Return the Api value for the cast_ that de-registeres -- an observer forgetObserverMessage :: Observable o => SomeObserver o -> Api o 'Asynchronous -- | An Api index that support observation of the another Api -- that is Observable. class (Typeable p, Observable o) => Observer p o -- | Wrap the Observation and the ProcessId (i.e. the -- Server) that caused the observation into an Api value -- that the Observable understands. observationMessage :: Observer p o => Server o -> Observation o -> Api p 'Asynchronous -- | Send an Observation to an Observer notifyObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r () -- | Send the registerObserverMessage registerObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send the forgetObserverMessage forgetObserver :: (SetMember Process (Process q) r, Observable o, Observer p o, Member Interrupts r) => SchedulerProxy q -> Server p -> Server o -> Eff r () -- | Send an Observation to SomeObserver. notifySomeObserver :: (SetMember Process (Process q) r, Observable o, HasCallStack, Member Interrupts r) => SchedulerProxy q -> Server o -> Observation o -> SomeObserver o -> Eff r () -- | Keep track of registered Observers Observers can be added and -- removed, and an Observation can be sent to all registerd -- observers at once. manageObservers :: Eff (ObserverState o : r) a -> Eff r a -- | Add an Observer to the Observers managed by -- manageObservers. addObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o) => SomeObserver o -> Eff r () -- | Delete an Observer from the Observers managed by -- manageObservers. removeObserver :: (SetMember Process (Process q) r, Member (ObserverState o) r, Observable o, Member Interrupts r) => SomeObserver o -> Eff r () -- | Send an Observation to all SomeObservers in the -- Observers state. notifyObservers :: forall o r q. (Observable o, SetMember Process (Process q) r, Member (ObserverState o) r, Member Interrupts r) => SchedulerProxy q -> Observation o -> Eff r () -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnCallbackObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member Interrupts r, HasCallStack) => SchedulerProxy q -> (Server o -> Observation o -> Eff (InterruptableProcess q) ApiServerCmd) -> Eff r (Server (CallbackObserver o)) -- | Start a new process for an Observer that schedules all -- observations to an effectful callback. spawnLoggingObserver :: forall o r q. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, Member (Logs LogMessage) q, Member (Logs LogMessage) r, Member Interrupts r, HasCallStack) => SchedulerProxy q -> Eff r (Server (CallbackObserver o)) -- | A Reader for an ObservationQueue. type ObservationQueueReader a = Reader (ObservationQueue a) -- | Contains a TBQueue capturing observations received by -- enqueueObservationsRegistered or enqueueObservations. data ObservationQueue a -- | Read queued observations captured by observing a Server that -- implements an Observable Api using -- enqueueObservationsRegistered or enqueueObservations. -- This blocks until the next Observation received. For a -- non-blocking variant use tryReadObservationQueue or -- flushObservationQueue. readObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Observation o) -- | Read queued observations captured by observing a Server that -- implements an Observable Api using -- enqueueObservationsRegistered or enqueueObservations. -- Return the next Observation immediately or Nothing if -- the queue is empty. Use readObservationQueue to block until an -- observation is observed. tryReadObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r (Maybe (Observation o)) -- | Read all currently queued Observations captured by -- enqueueObservations. This returns immediately all currently -- enqueued Observations. For a blocking variant use -- readObservationQueue. flushObservationQueue :: forall o r. (Member (ObservationQueueReader o) r, HasCallStack, MonadIO (Eff r), Typeable o, HasLogging IO r) => Eff r [Observation o] -- | Observe a(the) registered Server that implements an -- Observable Api. Based on enqueueObservations. enqueueObservationsRegistered :: forall o r q a. (ServesApi o r q, SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO q, HasLogging IO r, Member Interrupts r, Lifted IO r, HasCallStack) => SchedulerProxy q -> Int -> Eff (ObservationQueueReader o : r) a -> Eff r a -- | Observe a Server that implements an Observable -- Api, the Observations can be obtained by -- readObservationQueue. All observations are captured up to the -- queue size limit, such that the first message received will be first -- message returned by readObservationQueue. -- -- This function captures runtime exceptions and cleans up accordingly. enqueueObservations :: forall o r q a. (SetMember Process (Process q) r, Typeable o, Show (Observation o), Observable o, HasLogging IO r, HasLogging IO q, Member Interrupts r, Lifted IO q, HasCallStack) => SchedulerProxy q -> Server o -> Int -> Eff (ObservationQueueReader o : r) a -> Eff r a -- | The concrete list of Effects for this scheduler implementation. type SchedulerIO = (Reader SchedulerState : LoggingAndIO) -- | Type class constraint to indicate that an effect union contains the -- effects required by every process and the scheduler implementation -- itself. type HasSchedulerIO r = (HasCallStack, Lifted IO r, SchedulerIO <:: r) -- | The concrete list of the effects, that the Process uses type InterruptableProcEff = InterruptableProcess SchedulerIO -- | The concrete list of Effects of processes compatible with this -- scheduler. This builds upon SchedulerIO. type ProcEff = ConsProcess SchedulerIO -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO () -- | Start the message passing concurrency system then execute a -- Process on top of SchedulerIO effect. All logging is -- sent to standard output. defaultMainWithLogChannel :: HasCallStack => Eff InterruptableProcEff () -> LogChannel LogMessage -> IO () -- | A SchedulerProxy for SchedulerIO forkIoScheduler :: SchedulerProxy SchedulerIO -- | This is the main entry point to running a message passing concurrency -- application. This function takes a Process on top of the -- SchedulerIO effect and a LogChannel for concurrent -- logging. schedule :: (HasLogging IO SchedulerIO, HasCallStack) => Eff InterruptableProcEff () -> Eff LoggingAndIO () -- | Like schedule but pure. The yield effect is -- just return (). schedulePure == runIdentity . -- scheduleM (Identity . run) (return ()) schedulePure :: Eff (InterruptableProcess '[Logs LogMessage]) a -> Either (ExitReason 'NoRecovery) a -- | A SchedulerProxy for LoggingAndIo. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo -- | Execute a Process using schedule on top of Lift -- IO and Logs String effects. defaultMainSingleThreaded :: HasCallStack => Eff (InterruptableProcess '[Logs LogMessage, LogWriterReader LogMessage IO, Lift IO]) () -> IO ()