{-# LANGUAGE CPP                        #-}
{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE StandaloneDeriving         #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LiberalTypeSynonyms        #-}
{-# LANGUAGE Rank2Types                 #-}
{-# LANGUAGE UndecidableInstances       #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE TypeFamilies               #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE FunctionalDependencies     #-}

-- | Types used throughout the ManagedProcess framework
module Control.Distributed.Process.ManagedProcess.Internal.Types
  ( -- * Exported data types
    InitResult(..)
  , GenProcess()
  , runProcess
  , lift
  , liftIO
  , ProcessState(..)
  , State
  , Queue
  , Limit
  , Condition(..)
  , ProcessAction(..)
  , ProcessReply(..)
  , Action
  , Reply
  , ActionHandler
  , CallHandler
  , CastHandler
  , StatelessHandler
  , DeferredCallHandler
  , StatelessCallHandler
  , InfoHandler
  , ChannelHandler
  , StatelessChannelHandler
  , InitHandler
  , ShutdownHandler
  , ExitState(..)
  , isCleanShutdown
  , exitState
  , TimeoutHandler
  , UnhandledMessagePolicy(..)
  , ProcessDefinition(..)
  , Priority(..)
  , DispatchPriority(..)
  , DispatchFilter(..)
  , Filter(..)
--   , Check(..)
  , PrioritisedProcessDefinition(..)
  , RecvTimeoutPolicy(..)
  , ControlChannel(..)
  , newControlChan
  , ControlPort(..)
  , channelControlPort
  , Dispatcher(..)
  , ExternDispatcher(..)
  , DeferredDispatcher(..)
  , ExitSignalDispatcher(..)
  , MessageMatcher(..)
  , ExternMatcher(..)
  , Message(..)
  , CallResponse(..)
  , CallId
  , CallRef(..)
  , CallRejected(..)
  , makeRef
  , caller
  , rejectToCaller
  , recipient
  , tag
  , initCall
  , unsafeInitCall
  , waitResponse
  ) where

import Control.Concurrent.STM (STM)
import Control.Distributed.Process hiding (Message, mask, finally, liftIO)
import qualified Control.Distributed.Process as P (Message, liftIO)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Extras
  ( Recipient(..)
  , ExitReason(..)
  , Addressable
  , Resolvable(..)
  , Routable(..)
  , NFSerializable
  )
import Control.Distributed.Process.ManagedProcess.Internal.PriorityQueue
  ( PriorityQ
  )
import Control.Distributed.Process.Extras.Internal.Types
  ( resolveOrDie
  )
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.ManagedProcess.Timer (Timer, TimerKey)
import Control.DeepSeq (NFData(..))
import Control.Monad.Fix (MonadFix)
import Control.Monad.Catch
  ( catch
  , throwM
  , uninterruptibleMask
  , mask
  , finally
  , MonadThrow
  , MonadCatch
  , MonadMask(..)
  )
import qualified Control.Monad.Catch as Catch
  ( catch
  , throwM
  )
import Control.Monad.IO.Class (MonadIO)
import qualified Control.Monad.State.Strict as ST
  ( MonadState
  , StateT
  , get
  , lift
  , runStateT
  )
import Data.Binary hiding (decode)
import Data.Map.Strict (Map)
import Data.Typeable (Typeable)
import Data.IORef (IORef)
import Prelude hiding (init)
import GHC.Generics

--------------------------------------------------------------------------------
-- API                                                                        --
--------------------------------------------------------------------------------

-- | wrapper for a @MonitorRef@
type CallId = MonitorRef

-- | Wraps a consumer of the call API
newtype CallRef a = CallRef { forall a. CallRef a -> (Recipient, CallId)
unCaller :: (Recipient, CallId) }
  deriving (CallRef a -> CallRef a -> Bool
(CallRef a -> CallRef a -> Bool)
-> (CallRef a -> CallRef a -> Bool) -> Eq (CallRef a)
forall a. CallRef a -> CallRef a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: forall a. CallRef a -> CallRef a -> Bool
== :: CallRef a -> CallRef a -> Bool
$c/= :: forall a. CallRef a -> CallRef a -> Bool
/= :: CallRef a -> CallRef a -> Bool
Eq, Int -> CallRef a -> ShowS
[CallRef a] -> ShowS
CallRef a -> String
(Int -> CallRef a -> ShowS)
-> (CallRef a -> String)
-> ([CallRef a] -> ShowS)
-> Show (CallRef a)
forall a. Int -> CallRef a -> ShowS
forall a. [CallRef a] -> ShowS
forall a. CallRef a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall a. Int -> CallRef a -> ShowS
showsPrec :: Int -> CallRef a -> ShowS
$cshow :: forall a. CallRef a -> String
show :: CallRef a -> String
$cshowList :: forall a. [CallRef a] -> ShowS
showList :: [CallRef a] -> ShowS
Show, Typeable, (forall x. CallRef a -> Rep (CallRef a) x)
-> (forall x. Rep (CallRef a) x -> CallRef a)
-> Generic (CallRef a)
forall x. Rep (CallRef a) x -> CallRef a
forall x. CallRef a -> Rep (CallRef a) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall a x. Rep (CallRef a) x -> CallRef a
forall a x. CallRef a -> Rep (CallRef a) x
$cfrom :: forall a x. CallRef a -> Rep (CallRef a) x
from :: forall x. CallRef a -> Rep (CallRef a) x
$cto :: forall a x. Rep (CallRef a) x -> CallRef a
to :: forall x. Rep (CallRef a) x -> CallRef a
Generic)

-- | Retrieve the @Recipient@ for a @CallRef@.
recipient :: CallRef a -> Recipient
recipient :: forall a. CallRef a -> Recipient
recipient = (Recipient, CallId) -> Recipient
forall a b. (a, b) -> a
fst ((Recipient, CallId) -> Recipient)
-> (CallRef a -> (Recipient, CallId)) -> CallRef a -> Recipient
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CallRef a -> (Recipient, CallId)
forall a. CallRef a -> (Recipient, CallId)
unCaller

-- | Retrieve the @CallId@ for a @CallRef@.
tag :: CallRef a -> CallId
tag :: forall a. CallRef a -> CallId
tag = (Recipient, CallId) -> CallId
forall a b. (a, b) -> b
snd ((Recipient, CallId) -> CallId)
-> (CallRef a -> (Recipient, CallId)) -> CallRef a -> CallId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CallRef a -> (Recipient, CallId)
forall a. CallRef a -> (Recipient, CallId)
unCaller

instance Binary (CallRef a) where
instance NFData (CallRef a) where rnf :: CallRef a -> ()
rnf (CallRef (Recipient, CallId)
x) = (Recipient, CallId) -> ()
forall a. NFData a => a -> ()
rnf (Recipient, CallId)
x () -> () -> ()
forall a b. a -> b -> b
`seq` ()

-- | Creates a @CallRef@ for the given @Recipient@ and @CallId@
makeRef :: Recipient -> CallId -> CallRef a
makeRef :: forall a. Recipient -> CallId -> CallRef a
makeRef Recipient
r CallId
c = (Recipient, CallId) -> CallRef a
forall a. (Recipient, CallId) -> CallRef a
CallRef (Recipient
r, CallId
c)

-- | @Message@ type used internally by the call, cast, and rpcChan APIs.
data Message a b =
    CastMessage a
  | CallMessage a (CallRef b)
  | ChanMessage a (SendPort b)
  deriving (Typeable, (forall x. Message a b -> Rep (Message a b) x)
-> (forall x. Rep (Message a b) x -> Message a b)
-> Generic (Message a b)
forall x. Rep (Message a b) x -> Message a b
forall x. Message a b -> Rep (Message a b) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall a b x. Rep (Message a b) x -> Message a b
forall a b x. Message a b -> Rep (Message a b) x
$cfrom :: forall a b x. Message a b -> Rep (Message a b) x
from :: forall x. Message a b -> Rep (Message a b) x
$cto :: forall a b x. Rep (Message a b) x -> Message a b
to :: forall x. Rep (Message a b) x -> Message a b
Generic)

-- | Retrieve the @Recipient@ from a @Message@. If the supplied message is
-- a /cast/ or /chan/ message will evaluate to @Nothing@, otherwise @Just ref@.
caller :: forall a b . Message a b -> Maybe Recipient
caller :: forall a b. Message a b -> Maybe Recipient
caller (CallMessage a
_ CallRef b
ref) = Recipient -> Maybe Recipient
forall a. a -> Maybe a
Just (Recipient -> Maybe Recipient) -> Recipient -> Maybe Recipient
forall a b. (a -> b) -> a -> b
$ CallRef b -> Recipient
forall a. CallRef a -> Recipient
recipient CallRef b
ref
caller Message a b
_                   = Maybe Recipient
forall a. Maybe a
Nothing

-- | Reject a /call/ message with the supplied string. Sends @CallRejected@ to
-- the recipient if the input is a @CallMessage@, otherwise has no side effects.
rejectToCaller :: forall a b .
                  Message a b -> String -> Process ()
rejectToCaller :: forall a b. Message a b -> String -> Process ()
rejectToCaller (CallMessage a
_ CallRef b
ref) String
m = CallRef b -> CallRejected -> Process ()
forall m.
(Serializable m, Resolvable (CallRef b)) =>
CallRef b -> m -> Process ()
forall a m.
(Routable a, Serializable m, Resolvable a) =>
a -> m -> Process ()
sendTo CallRef b
ref (String -> CallId -> CallRejected
CallRejected String
m (CallRef b -> CallId
forall a. CallRef a -> CallId
tag CallRef b
ref))
rejectToCaller Message a b
_                   String
_ = () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

instance (Serializable a, Serializable b) => Binary (Message a b) where
instance (NFSerializable a, NFSerializable b) => NFData (Message a b) where
  rnf :: Message a b -> ()
rnf (CastMessage a
a) = a -> ()
forall a. NFData a => a -> ()
rnf a
a () -> () -> ()
forall a b. a -> b -> b
`seq` ()
  rnf (CallMessage a
a CallRef b
b) = a -> ()
forall a. NFData a => a -> ()
rnf a
a () -> () -> ()
forall a b. a -> b -> b
`seq` CallRef b -> ()
forall a. NFData a => a -> ()
rnf CallRef b
b () -> () -> ()
forall a b. a -> b -> b
`seq` ()
  rnf (ChanMessage a
a SendPort b
b) = a -> ()
forall a. NFData a => a -> ()
rnf a
a () -> () -> ()
forall a b. a -> b -> b
`seq` SendPort b -> ()
forall a. NFData a => a -> ()
rnf SendPort b
b () -> () -> ()
forall a b. a -> b -> b
`seq` ()
deriving instance (Eq a, Eq b) => Eq (Message a b)
deriving instance (Show a, Show b) => Show (Message a b)

-- | Response type for the call API
data CallResponse a = CallResponse a CallId
  deriving (Typeable, (forall x. CallResponse a -> Rep (CallResponse a) x)
-> (forall x. Rep (CallResponse a) x -> CallResponse a)
-> Generic (CallResponse a)
forall x. Rep (CallResponse a) x -> CallResponse a
forall x. CallResponse a -> Rep (CallResponse a) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall a x. Rep (CallResponse a) x -> CallResponse a
forall a x. CallResponse a -> Rep (CallResponse a) x
$cfrom :: forall a x. CallResponse a -> Rep (CallResponse a) x
from :: forall x. CallResponse a -> Rep (CallResponse a) x
$cto :: forall a x. Rep (CallResponse a) x -> CallResponse a
to :: forall x. Rep (CallResponse a) x -> CallResponse a
Generic)

instance Serializable a => Binary (CallResponse a)
instance NFSerializable a => NFData (CallResponse a) where
  rnf :: CallResponse a -> ()
rnf (CallResponse a
a CallId
c) = a -> ()
forall a. NFData a => a -> ()
rnf a
a () -> () -> ()
forall a b. a -> b -> b
`seq` CallId -> ()
forall a. NFData a => a -> ()
rnf CallId
c () -> () -> ()
forall a b. a -> b -> b
`seq` ()
deriving instance Eq a => Eq (CallResponse a)
deriving instance Show a => Show (CallResponse a)

-- | Sent to a consumer of the /call/ API when a server filter expression
-- explicitly rejects an incoming call message.
data CallRejected = CallRejected String CallId
  deriving (Typeable, (forall x. CallRejected -> Rep CallRejected x)
-> (forall x. Rep CallRejected x -> CallRejected)
-> Generic CallRejected
forall x. Rep CallRejected x -> CallRejected
forall x. CallRejected -> Rep CallRejected x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. CallRejected -> Rep CallRejected x
from :: forall x. CallRejected -> Rep CallRejected x
$cto :: forall x. Rep CallRejected x -> CallRejected
to :: forall x. Rep CallRejected x -> CallRejected
Generic, Int -> CallRejected -> ShowS
[CallRejected] -> ShowS
CallRejected -> String
(Int -> CallRejected -> ShowS)
-> (CallRejected -> String)
-> ([CallRejected] -> ShowS)
-> Show CallRejected
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CallRejected -> ShowS
showsPrec :: Int -> CallRejected -> ShowS
$cshow :: CallRejected -> String
show :: CallRejected -> String
$cshowList :: [CallRejected] -> ShowS
showList :: [CallRejected] -> ShowS
Show, CallRejected -> CallRejected -> Bool
(CallRejected -> CallRejected -> Bool)
-> (CallRejected -> CallRejected -> Bool) -> Eq CallRejected
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: CallRejected -> CallRejected -> Bool
== :: CallRejected -> CallRejected -> Bool
$c/= :: CallRejected -> CallRejected -> Bool
/= :: CallRejected -> CallRejected -> Bool
Eq)
instance Binary CallRejected where
instance NFData CallRejected where

instance Resolvable (CallRef a) where
  resolve :: CallRef a -> Process (Maybe ProcessId)
resolve (CallRef (Recipient
r, CallId
_)) = Recipient -> Process (Maybe ProcessId)
forall a. Resolvable a => a -> Process (Maybe ProcessId)
resolve Recipient
r

instance Routable (CallRef a) where
  sendTo :: forall m.
(Serializable m, Resolvable (CallRef a)) =>
CallRef a -> m -> Process ()
sendTo (CallRef (Recipient
c, CallId
_)) = Recipient -> m -> Process ()
forall m.
(Serializable m, Resolvable Recipient) =>
Recipient -> m -> Process ()
forall a m.
(Routable a, Serializable m, Resolvable a) =>
a -> m -> Process ()
sendTo Recipient
c
  unsafeSendTo :: forall m.
(NFSerializable m, Resolvable (CallRef a)) =>
CallRef a -> m -> Process ()
unsafeSendTo (CallRef (Recipient
c, CallId
_)) = Recipient -> m -> Process ()
forall a m.
(Routable a, NFSerializable m, Resolvable a) =>
a -> m -> Process ()
forall m.
(NFSerializable m, Resolvable Recipient) =>
Recipient -> m -> Process ()
unsafeSendTo Recipient
c

-- | Return type for and 'InitHandler' expression.
data InitResult s =
    InitOk s Delay {-
        ^ a successful initialisation, initial state and timeout -}
  | InitStop String {-
        ^ failed initialisation and the reason, this will result in an error -}
  | InitIgnore {-
        ^ the process has decided not to continue starting - this is not an error -}
  deriving (Typeable)

-- | Represent a max-backlog from RecvTimeoutPolicy
type Limit = Maybe Int

-- | Internal priority queue, used by prioritised processes.
type Queue = PriorityQ Int P.Message

-- | Map from @TimerKey@ to @(Timer, Message)@.
type TimerMap = Map TimerKey (Timer, P.Message)

-- | Internal state of a prioritised process loop.
data ProcessState s = ProcessState { forall s. ProcessState s -> RecvTimeoutPolicy
timeoutSpec :: RecvTimeoutPolicy
                                   , forall s. ProcessState s -> ProcessDefinition s
procDef     :: ProcessDefinition s
                                   , forall s. ProcessState s -> [DispatchPriority s]
procPrio    :: [DispatchPriority s]
                                   , forall s. ProcessState s -> [DispatchFilter s]
procFilters :: [DispatchFilter s]
                                   , forall s. ProcessState s -> Delay
usrTimeout  :: Delay
                                   , forall s. ProcessState s -> Timer
sysTimeout  :: Timer
                                   , forall s. ProcessState s -> TimerMap
usrTimers   :: TimerMap
                                   , forall s. ProcessState s -> Queue
internalQ   :: Queue
                                   , forall s. ProcessState s -> s
procState   :: s
                                   }

-- | Prioritised process state, held as an @IORef@.
type State s = IORef (ProcessState s)

-- | StateT based monad for prioritised process loops.
newtype GenProcess s a = GenProcess {
   forall s a. GenProcess s a -> StateT (State s) Process a
unManaged :: ST.StateT (State s) Process a
 }
 deriving ( (forall a b. (a -> b) -> GenProcess s a -> GenProcess s b)
-> (forall a b. a -> GenProcess s b -> GenProcess s a)
-> Functor (GenProcess s)
forall a b. a -> GenProcess s b -> GenProcess s a
forall a b. (a -> b) -> GenProcess s a -> GenProcess s b
forall s a b. a -> GenProcess s b -> GenProcess s a
forall s a b. (a -> b) -> GenProcess s a -> GenProcess s b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall s a b. (a -> b) -> GenProcess s a -> GenProcess s b
fmap :: forall a b. (a -> b) -> GenProcess s a -> GenProcess s b
$c<$ :: forall s a b. a -> GenProcess s b -> GenProcess s a
<$ :: forall a b. a -> GenProcess s b -> GenProcess s a
Functor
          , Applicative (GenProcess s)
Applicative (GenProcess s) =>
(forall a b.
 GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b)
-> (forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b)
-> (forall a. a -> GenProcess s a)
-> Monad (GenProcess s)
forall s. Applicative (GenProcess s)
forall a. a -> GenProcess s a
forall s a. a -> GenProcess s a
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall a b.
GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b
forall s a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall s a b.
GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
$c>>= :: forall s a b.
GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b
>>= :: forall a b.
GenProcess s a -> (a -> GenProcess s b) -> GenProcess s b
$c>> :: forall s a b. GenProcess s a -> GenProcess s b -> GenProcess s b
>> :: forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
$creturn :: forall s a. a -> GenProcess s a
return :: forall a. a -> GenProcess s a
Monad
          , ST.MonadState (State s)
          , Monad (GenProcess s)
Monad (GenProcess s) =>
(forall a. IO a -> GenProcess s a) -> MonadIO (GenProcess s)
forall s. Monad (GenProcess s)
forall a. IO a -> GenProcess s a
forall s a. IO a -> GenProcess s a
forall (m :: * -> *).
Monad m =>
(forall a. IO a -> m a) -> MonadIO m
$cliftIO :: forall s a. IO a -> GenProcess s a
liftIO :: forall a. IO a -> GenProcess s a
MonadIO
          , Monad (GenProcess s)
Monad (GenProcess s) =>
(forall a. (a -> GenProcess s a) -> GenProcess s a)
-> MonadFix (GenProcess s)
forall s. Monad (GenProcess s)
forall a. (a -> GenProcess s a) -> GenProcess s a
forall s a. (a -> GenProcess s a) -> GenProcess s a
forall (m :: * -> *).
Monad m =>
(forall a. (a -> m a) -> m a) -> MonadFix m
$cmfix :: forall s a. (a -> GenProcess s a) -> GenProcess s a
mfix :: forall a. (a -> GenProcess s a) -> GenProcess s a
MonadFix
          , Typeable
          , Functor (GenProcess s)
Functor (GenProcess s) =>
(forall a. a -> GenProcess s a)
-> (forall a b.
    GenProcess s (a -> b) -> GenProcess s a -> GenProcess s b)
-> (forall a b c.
    (a -> b -> c)
    -> GenProcess s a -> GenProcess s b -> GenProcess s c)
-> (forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b)
-> (forall a b. GenProcess s a -> GenProcess s b -> GenProcess s a)
-> Applicative (GenProcess s)
forall s. Functor (GenProcess s)
forall a. a -> GenProcess s a
forall s a. a -> GenProcess s a
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s a
forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall a b.
GenProcess s (a -> b) -> GenProcess s a -> GenProcess s b
forall s a b. GenProcess s a -> GenProcess s b -> GenProcess s a
forall s a b. GenProcess s a -> GenProcess s b -> GenProcess s b
forall s a b.
GenProcess s (a -> b) -> GenProcess s a -> GenProcess s b
forall a b c.
(a -> b -> c) -> GenProcess s a -> GenProcess s b -> GenProcess s c
forall s a b c.
(a -> b -> c) -> GenProcess s a -> GenProcess s b -> GenProcess s c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
$cpure :: forall s a. a -> GenProcess s a
pure :: forall a. a -> GenProcess s a
$c<*> :: forall s a b.
GenProcess s (a -> b) -> GenProcess s a -> GenProcess s b
<*> :: forall a b.
GenProcess s (a -> b) -> GenProcess s a -> GenProcess s b
$cliftA2 :: forall s a b c.
(a -> b -> c) -> GenProcess s a -> GenProcess s b -> GenProcess s c
liftA2 :: forall a b c.
(a -> b -> c) -> GenProcess s a -> GenProcess s b -> GenProcess s c
$c*> :: forall s a b. GenProcess s a -> GenProcess s b -> GenProcess s b
*> :: forall a b. GenProcess s a -> GenProcess s b -> GenProcess s b
$c<* :: forall s a b. GenProcess s a -> GenProcess s b -> GenProcess s a
<* :: forall a b. GenProcess s a -> GenProcess s b -> GenProcess s a
Applicative
          )

instance forall s . MonadThrow (GenProcess s) where
  throwM :: forall e a. (HasCallStack, Exception e) => e -> GenProcess s a
throwM = Process a -> GenProcess s a
forall a s. Process a -> GenProcess s a
lift (Process a -> GenProcess s a)
-> (e -> Process a) -> e -> GenProcess s a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Process a
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
Catch.throwM

instance forall s . MonadCatch (GenProcess s) where
  catch :: forall e a.
(HasCallStack, Exception e) =>
GenProcess s a -> (e -> GenProcess s a) -> GenProcess s a
catch GenProcess s a
p e -> GenProcess s a
h = do
    State s
pSt <- GenProcess s (State s)
forall s (m :: * -> *). MonadState s m => m s
ST.get
    -- we can throw away our state since it is always accessed via an IORef
    (a
a, State s
_) <- Process (a, State s) -> GenProcess s (a, State s)
forall a s. Process a -> GenProcess s a
lift (Process (a, State s) -> GenProcess s (a, State s))
-> Process (a, State s) -> GenProcess s (a, State s)
forall a b. (a -> b) -> a -> b
$ Process (a, State s)
-> (e -> Process (a, State s)) -> Process (a, State s)
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch (State s -> GenProcess s a -> Process (a, State s)
forall s a. State s -> GenProcess s a -> Process (a, State s)
runProcess State s
pSt GenProcess s a
p) (State s -> GenProcess s a -> Process (a, State s)
forall s a. State s -> GenProcess s a -> Process (a, State s)
runProcess State s
pSt (GenProcess s a -> Process (a, State s))
-> (e -> GenProcess s a) -> e -> Process (a, State s)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> GenProcess s a
h)
    a -> GenProcess s a
forall a. a -> GenProcess s a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a

instance forall s . MonadMask (GenProcess s) where
  mask :: forall b.
HasCallStack =>
((forall a. GenProcess s a -> GenProcess s a) -> GenProcess s b)
-> GenProcess s b
mask (forall a. GenProcess s a -> GenProcess s a) -> GenProcess s b
p = do
      State s
pSt <- GenProcess s (State s)
forall s (m :: * -> *). MonadState s m => m s
ST.get
      Process b -> GenProcess s b
forall a s. Process a -> GenProcess s a
lift (Process b -> GenProcess s b) -> Process b -> GenProcess s b
forall a b. (a -> b) -> a -> b
$ ((forall a. Process a -> Process a) -> Process b) -> Process b
forall b.
HasCallStack =>
((forall a. Process a -> Process a) -> Process b) -> Process b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
mask (((forall a. Process a -> Process a) -> Process b) -> Process b)
-> ((forall a. Process a -> Process a) -> Process b) -> Process b
forall a b. (a -> b) -> a -> b
$ \forall a. Process a -> Process a
restore -> do
        (b
a, State s
_) <- State s -> GenProcess s b -> Process (b, State s)
forall s a. State s -> GenProcess s a -> Process (a, State s)
runProcess State s
pSt ((forall a. GenProcess s a -> GenProcess s a) -> GenProcess s b
p ((Process (a, State s) -> Process (a, State s))
-> GenProcess s a -> GenProcess s a
forall {a} {s} {b} {b}.
(Process (a, State s) -> Process (b, b))
-> GenProcess s a -> GenProcess s b
liftRestore Process (a, State s) -> Process (a, State s)
forall a. Process a -> Process a
restore))
        b -> Process b
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return b
a
    where
      liftRestore :: (Process (a, State s) -> Process (b, b))
-> GenProcess s a -> GenProcess s b
liftRestore Process (a, State s) -> Process (b, b)
restoreP = \GenProcess s a
p2 -> do
        State s
ourSTate <- GenProcess s (State s)
forall s (m :: * -> *). MonadState s m => m s
ST.get
        (b
a', b
_) <- Process (b, b) -> GenProcess s (b, b)
forall a s. Process a -> GenProcess s a
lift (Process (b, b) -> GenProcess s (b, b))
-> Process (b, b) -> GenProcess s (b, b)
forall a b. (a -> b) -> a -> b
$ Process (a, State s) -> Process (b, b)
restoreP (Process (a, State s) -> Process (b, b))
-> Process (a, State s) -> Process (b, b)
forall a b. (a -> b) -> a -> b
$ State s -> GenProcess s a -> Process (a, State s)
forall s a. State s -> GenProcess s a -> Process (a, State s)
runProcess State s
ourSTate GenProcess s a
p2
        b -> GenProcess s b
forall a. a -> GenProcess s a
forall (m :: * -> *) a. Monad m => a -> m a
return b
a'

  uninterruptibleMask :: forall b.
HasCallStack =>
((forall a. GenProcess s a -> GenProcess s a) -> GenProcess s b)
-> GenProcess s b
uninterruptibleMask (forall a. GenProcess s a -> GenProcess s a) -> GenProcess s b
p = do
      State s
pSt <- GenProcess s (State s)
forall s (m :: * -> *). MonadState s m => m s
ST.get
      (b
a, State s
_) <- Process (b, State s) -> GenProcess s (b, State s)
forall a s. Process a -> GenProcess s a
lift (Process (b, State s) -> GenProcess s (b, State s))
-> Process (b, State s) -> GenProcess s (b, State s)
forall a b. (a -> b) -> a -> b
$ ((forall a. Process a -> Process a) -> Process (b, State s))
-> Process (b, State s)
forall b.
HasCallStack =>
((forall a. Process a -> Process a) -> Process b) -> Process b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
uninterruptibleMask (((forall a. Process a -> Process a) -> Process (b, State s))
 -> Process (b, State s))
-> ((forall a. Process a -> Process a) -> Process (b, State s))
-> Process (b, State s)
forall a b. (a -> b) -> a -> b
$ \forall a. Process a -> Process a
restore ->
        State s -> GenProcess s b -> Process (b, State s)
forall s a. State s -> GenProcess s a -> Process (a, State s)
runProcess State s
pSt ((forall a. GenProcess s a -> GenProcess s a) -> GenProcess s b
p ((Process (a, State s) -> Process (a, State s))
-> GenProcess s a -> GenProcess s a
forall {a} {s} {b} {b}.
(Process (a, State s) -> Process (b, b))
-> GenProcess s a -> GenProcess s b
liftRestore Process (a, State s) -> Process (a, State s)
forall a. Process a -> Process a
restore))
      b -> GenProcess s b
forall a. a -> GenProcess s a
forall (m :: * -> *) a. Monad m => a -> m a
return b
a
    where
      liftRestore :: (Process (a, State s) -> Process (b, b))
-> GenProcess s a -> GenProcess s b
liftRestore Process (a, State s) -> Process (b, b)
restoreP = \GenProcess s a
p2 -> do
        State s
ourSTate <- GenProcess s (State s)
forall s (m :: * -> *). MonadState s m => m s
ST.get
        (b
a', b
_) <- Process (b, b) -> GenProcess s (b, b)
forall a s. Process a -> GenProcess s a
lift (Process (b, b) -> GenProcess s (b, b))
-> Process (b, b) -> GenProcess s (b, b)
forall a b. (a -> b) -> a -> b
$ Process (a, State s) -> Process (b, b)
restoreP (Process (a, State s) -> Process (b, b))
-> Process (a, State s) -> Process (b, b)
forall a b. (a -> b) -> a -> b
$ State s -> GenProcess s a -> Process (a, State s)
forall s a. State s -> GenProcess s a -> Process (a, State s)
runProcess State s
ourSTate GenProcess s a
p2
        b -> GenProcess s b
forall a. a -> GenProcess s a
forall (m :: * -> *) a. Monad m => a -> m a
return b
a'

  generalBracket :: forall a b c.
HasCallStack =>
GenProcess s a
-> (a -> ExitCase b -> GenProcess s c)
-> (a -> GenProcess s b)
-> GenProcess s (b, c)
generalBracket GenProcess s a
acquire a -> ExitCase b -> GenProcess s c
release a -> GenProcess s b
inner = StateT (State s) Process (b, c) -> GenProcess s (b, c)
forall s a. StateT (State s) Process a -> GenProcess s a
GenProcess (StateT (State s) Process (b, c) -> GenProcess s (b, c))
-> StateT (State s) Process (b, c) -> GenProcess s (b, c)
forall a b. (a -> b) -> a -> b
$ 
    StateT (State s) Process a
-> (a -> ExitCase b -> StateT (State s) Process c)
-> (a -> StateT (State s) Process b)
-> StateT (State s) Process (b, c)
forall a b c.
HasCallStack =>
StateT (State s) Process a
-> (a -> ExitCase b -> StateT (State s) Process c)
-> (a -> StateT (State s) Process b)
-> StateT (State s) Process (b, c)
forall (m :: * -> *) a b c.
(MonadMask m, HasCallStack) =>
m a -> (a -> ExitCase b -> m c) -> (a -> m b) -> m (b, c)
generalBracket (GenProcess s a -> StateT (State s) Process a
forall s a. GenProcess s a -> StateT (State s) Process a
unManaged GenProcess s a
acquire)
                   (\a
a ExitCase b
e -> GenProcess s c -> StateT (State s) Process c
forall s a. GenProcess s a -> StateT (State s) Process a
unManaged (GenProcess s c -> StateT (State s) Process c)
-> GenProcess s c -> StateT (State s) Process c
forall a b. (a -> b) -> a -> b
$ a -> ExitCase b -> GenProcess s c
release a
a ExitCase b
e)
                   (GenProcess s b -> StateT (State s) Process b
forall s a. GenProcess s a -> StateT (State s) Process a
unManaged (GenProcess s b -> StateT (State s) Process b)
-> (a -> GenProcess s b) -> a -> StateT (State s) Process b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> GenProcess s b
inner)

-- | Run an action in the @GenProcess@ monad.
runProcess :: State s -> GenProcess s a -> Process (a, State s)
runProcess :: forall s a. State s -> GenProcess s a -> Process (a, State s)
runProcess State s
state GenProcess s a
proc = StateT (State s) Process a -> State s -> Process (a, State s)
forall s (m :: * -> *) a. StateT s m a -> s -> m (a, s)
ST.runStateT (GenProcess s a -> StateT (State s) Process a
forall s a. GenProcess s a -> StateT (State s) Process a
unManaged GenProcess s a
proc) State s
state

-- | Lift an action in the @Process@ monad to @GenProcess@.
lift :: Process a -> GenProcess s a
lift :: forall a s. Process a -> GenProcess s a
lift Process a
p = StateT (State s) Process a -> GenProcess s a
forall s a. StateT (State s) Process a -> GenProcess s a
GenProcess (StateT (State s) Process a -> GenProcess s a)
-> StateT (State s) Process a -> GenProcess s a
forall a b. (a -> b) -> a -> b
$ Process a -> StateT (State s) Process a
forall (m :: * -> *) a. Monad m => m a -> StateT (State s) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
ST.lift Process a
p

-- | Lift an IO action directly into @GenProcess@, @liftIO = lift . Process.LiftIO@.
liftIO :: IO a -> GenProcess s a
liftIO :: forall a s. IO a -> GenProcess s a
liftIO = Process a -> GenProcess s a
forall a s. Process a -> GenProcess s a
lift (Process a -> GenProcess s a)
-> (IO a -> Process a) -> IO a -> GenProcess s a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO a -> Process a
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
P.liftIO

-- | The action taken by a process after a handler has run and its updated state.
-- See "Control.Distributed.Process.ManagedProcess.Server.continue"
--     "Control.Distributed.Process.ManagedProcess.Server.timeoutAfter"
--     "Control.Distributed.Process.ManagedProcess.Server.hibernate"
--     "Control.Distributed.Process.ManagedProcess.Server.stop"
--     "Control.Distributed.Process.ManagedProcess.Server.stopWith"
--
-- Also see "Control.Distributed.Process.Management.Priority.act" and
-- "Control.Distributed.Process.ManagedProcess.Priority.runAfter".
--
-- And other actions. This type should not be used directly.
data ProcessAction s =
    ProcessSkip
  | ProcessActivity  (GenProcess s ()) -- ^ run the given activity
  | ProcessExpression (GenProcess s (ProcessAction s)) -- ^ evaluate an expression
  | ProcessContinue  s              -- ^ continue with (possibly new) state
  | ProcessTimeout   Delay        s -- ^ timeout if no messages are received
  | ProcessHibernate TimeInterval s -- ^ hibernate for /delay/
  | ProcessStop      ExitReason     -- ^ stop the process, giving @ExitReason@
  | ProcessStopping  s ExitReason   -- ^ stop the process with @ExitReason@, with updated state
  | ProcessBecome    (ProcessDefinition s) s -- ^ changes the current process definition

-- | Returned from handlers for the synchronous 'call' protocol, encapsulates
-- the reply data /and/ the action to take after sending the reply. A handler
-- can return @NoReply@ if they wish to ignore the call.
data ProcessReply r s =
    ProcessReply r (ProcessAction s)
  | ProcessReject String (ProcessAction s)  -- TODO: can we use a functional dependency here?
  | NoReply (ProcessAction s)

-- | Wraps a predicate that is used to determine whether or not a handler
-- is valid based on some combination of the current process state, the
-- type and/or value of the input message or both.
data Condition s m =
    Condition (s -> m -> Bool)  -- ^ predicated on the process state /and/ the message
  | State     (s -> Bool)       -- ^ predicated on the process state only
  | Input     (m -> Bool)       -- ^ predicated on the input message only

{-

class Check c s m | s m -> c where
  -- data Checker c :: * -> * -> *
  -- apply :: s -> m -> Checker c s m -> Bool
  apply :: s -> m -> c -> Bool

instance Check (Condition s m) s m where
  -- data Checker (Condition s m) s m = CheckCond (Condition s m)
  apply s m (Condition f) = f s m
  apply s _ (State f)     = f s
  apply _ m (Input f)     = f m

instance Check (s -> m -> Bool) s m where
   -- data Checker (s -> m -> Bool) s m = CheckF (s -> m -> Bool)
   apply s m f = f s m
-}

-- | Informs a /shutdown handler/ of whether it is running due to a clean
-- shutdown, or in response to an unhandled exception.
data ExitState s = CleanShutdown s -- ^ given when an ordered shutdown is underway
                 | LastKnown s     {-
                  ^ given due to an unhandled exception, passing the last known state -}

-- | @True@ if the @ExitState@ is @CleanShutdown@, otherwise @False@.
isCleanShutdown :: ExitState s -> Bool
isCleanShutdown :: forall s. ExitState s -> Bool
isCleanShutdown (CleanShutdown s
_) = Bool
True
isCleanShutdown ExitState s
_                 = Bool
False

-- | Evaluates to the @s@ state datum in the given @ExitState@.
exitState :: ExitState s -> s
exitState :: forall s. ExitState s -> s
exitState (CleanShutdown s
s) = s
s
exitState (LastKnown s
s)     = s
s

-- | An action (server state transition) in the @Process@ monad
type Action s = Process (ProcessAction s)

-- | An action (server state transition) causing a reply to a  caller, in the
-- @Process@ monad
type Reply b s = Process (ProcessReply b s)

-- | An expression used to handle a message
type ActionHandler s a = s -> a -> Action s

-- | An expression used to handle a message and providing a reply
type CallHandler s a b = s -> a -> Reply b s

-- | An expression used to ignore server state during handling
type StatelessHandler s a = a -> (s -> Action s)

-- | An expression used to handle a /call/ message where the reply is deferred
-- via the 'CallRef'
type DeferredCallHandler s a b = CallRef b -> CallHandler s a b

-- | An expression used to handle a /call/ message ignoring server state
type StatelessCallHandler s a b = CallRef b -> a -> Reply b s

-- | An expression used to handle a /cast/ message
type CastHandler s a = ActionHandler s a

-- | An expression used to handle an /info/ message
type InfoHandler s a = ActionHandler s a

-- | An expression used to handle a /channel/ message
type ChannelHandler s a b = SendPort b -> ActionHandler s a

-- | An expression used to handle a /channel/ message in a stateless process
type StatelessChannelHandler s a b = SendPort b -> StatelessHandler s a

-- | An expression used to initialise a process with its state
type InitHandler a s = a -> Process (InitResult s)

-- | An expression used to handle process termination
type ShutdownHandler s = ExitState s -> ExitReason -> Process ()

-- | An expression used to handle process timeouts
type TimeoutHandler s = ActionHandler s Delay

-- dispatching to implementation callbacks

-- | Provides a means for servers to listen on a separate, typed /control/
-- channel, thereby segregating the channel from their regular
-- (and potentially busy) mailbox.
newtype ControlChannel m =
  ControlChannel {
      forall m.
ControlChannel m
-> (SendPort (Message m ()), ReceivePort (Message m ()))
unControl :: (SendPort (Message m ()), ReceivePort (Message m ()))
    }

-- | Creates a new 'ControlChannel'.
newControlChan :: (Serializable m) => Process (ControlChannel m)
newControlChan :: forall m. Serializable m => Process (ControlChannel m)
newControlChan = ((SendPort (Message m ()), ReceivePort (Message m ()))
 -> ControlChannel m)
-> Process (SendPort (Message m ()), ReceivePort (Message m ()))
-> Process (ControlChannel m)
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (SendPort (Message m ()), ReceivePort (Message m ()))
-> ControlChannel m
forall m.
(SendPort (Message m ()), ReceivePort (Message m ()))
-> ControlChannel m
ControlChannel Process (SendPort (Message m ()), ReceivePort (Message m ()))
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan

-- | The writable end of a 'ControlChannel'.
--
newtype ControlPort m =
  ControlPort {
      forall m. ControlPort m -> SendPort (Message m ())
unPort :: SendPort (Message m ())
    } deriving (Int -> ControlPort m -> ShowS
[ControlPort m] -> ShowS
ControlPort m -> String
(Int -> ControlPort m -> ShowS)
-> (ControlPort m -> String)
-> ([ControlPort m] -> ShowS)
-> Show (ControlPort m)
forall m. Int -> ControlPort m -> ShowS
forall m. [ControlPort m] -> ShowS
forall m. ControlPort m -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall m. Int -> ControlPort m -> ShowS
showsPrec :: Int -> ControlPort m -> ShowS
$cshow :: forall m. ControlPort m -> String
show :: ControlPort m -> String
$cshowList :: forall m. [ControlPort m] -> ShowS
showList :: [ControlPort m] -> ShowS
Show)
deriving instance (Serializable m) => Binary (ControlPort m)
instance Eq (ControlPort m) where
  ControlPort m
a == :: ControlPort m -> ControlPort m -> Bool
== ControlPort m
b = ControlPort m -> SendPort (Message m ())
forall m. ControlPort m -> SendPort (Message m ())
unPort ControlPort m
a SendPort (Message m ()) -> SendPort (Message m ()) -> Bool
forall a. Eq a => a -> a -> Bool
== ControlPort m -> SendPort (Message m ())
forall m. ControlPort m -> SendPort (Message m ())
unPort ControlPort m
b

-- | Obtain an opaque expression for communicating with a 'ControlChannel'.
--
channelControlPort :: ControlChannel m
                   -> ControlPort m
channelControlPort :: forall m. ControlChannel m -> ControlPort m
channelControlPort ControlChannel m
cc = SendPort (Message m ()) -> ControlPort m
forall m. SendPort (Message m ()) -> ControlPort m
ControlPort (SendPort (Message m ()) -> ControlPort m)
-> SendPort (Message m ()) -> ControlPort m
forall a b. (a -> b) -> a -> b
$ (SendPort (Message m ()), ReceivePort (Message m ()))
-> SendPort (Message m ())
forall a b. (a, b) -> a
fst ((SendPort (Message m ()), ReceivePort (Message m ()))
 -> SendPort (Message m ()))
-> (SendPort (Message m ()), ReceivePort (Message m ()))
-> SendPort (Message m ())
forall a b. (a -> b) -> a -> b
$ ControlChannel m
-> (SendPort (Message m ()), ReceivePort (Message m ()))
forall m.
ControlChannel m
-> (SendPort (Message m ()), ReceivePort (Message m ()))
unControl ControlChannel m
cc

-- | Given as the result of evaluating a "DispatchFilter". This type is intended
-- for internal use. For an API for working with filters,
-- see "Control.Distributed.Process.ManagedProcess.Priority".
data Filter s = FilterOk s
              | FilterSafe s
              | forall m . (Show m) => FilterReject m s
              | FilterSkip s
              | FilterStop s ExitReason

-- | Provides dispatch from a variety of inputs to a typed filter handler.
data DispatchFilter s =
    forall a b . (Serializable a, Serializable b) =>
    FilterApi
    {
      ()
apiFilter :: s -> Message a b -> Process (Filter s)
    }
  | forall a . (Serializable a) =>
    FilterAny
    {
      ()
anyFilter :: s -> a -> Process (Filter s)
    }
  | FilterRaw
    {
      forall s.
DispatchFilter s -> s -> Message -> Process (Maybe (Filter s))
rawFilter :: s -> P.Message -> Process (Maybe (Filter s))
    }
  | FilterState
    {
      forall s. DispatchFilter s -> s -> Process (Maybe (Filter s))
stateFilter :: s -> Process (Maybe (Filter s))
    }

-- | Provides dispatch from cast and call messages to a typed handler.
data Dispatcher s =
    forall a b . (Serializable a, Serializable b) =>
    Dispatch
    {
      ()
dispatch :: s -> Message a b -> Process (ProcessAction s)
    }
  | forall a b . (Serializable a, Serializable b) =>
    DispatchIf
    {
      dispatch   :: s -> Message a b -> Process (ProcessAction s)
    , ()
dispatchIf :: s -> Message a b -> Bool
    }

-- | Provides dispatch for channels and STM actions
data ExternDispatcher s =
    forall a b . (Serializable a, Serializable b) =>
    DispatchCC  -- control channel dispatch
    {
      ()
channel      :: ReceivePort (Message a b)
    , ()
dispatchChan :: s -> Message a b -> Process (ProcessAction s)
    }
  | forall a . (Serializable a) =>
    DispatchSTM -- arbitrary STM actions
    {
      ()
stmAction   :: STM a
    , ()
dispatchStm :: s -> a -> Process (ProcessAction s)
    , forall s. ExternDispatcher s -> Match Message
matchStm    :: Match P.Message
    , forall s. ExternDispatcher s -> forall m. (Message -> m) -> Match m
matchAnyStm :: forall m . (P.Message -> m) -> Match m
    }

-- | Provides dispatch for any input, returns 'Nothing' for unhandled messages.
data DeferredDispatcher s =
  DeferredDispatcher
  {
    forall s.
DeferredDispatcher s
-> s -> Message -> Process (Maybe (ProcessAction s))
dispatchInfo :: s
                 -> P.Message
                 -> Process (Maybe (ProcessAction s))
  }

-- | Provides dispatch for any exit signal - returns 'Nothing' for unhandled exceptions
data ExitSignalDispatcher s =
  ExitSignalDispatcher
  {
    forall s.
ExitSignalDispatcher s
-> s -> ProcessId -> Message -> Process (Maybe (ProcessAction s))
dispatchExit :: s
                 -> ProcessId
                 -> P.Message
                 -> Process (Maybe (ProcessAction s))
  }

-- | Defines the means of dispatching inbound messages to a handler
class MessageMatcher d where
  matchDispatch :: UnhandledMessagePolicy -> s -> d s -> Match (ProcessAction s)

instance MessageMatcher Dispatcher where
  matchDispatch :: forall s.
UnhandledMessagePolicy
-> s -> Dispatcher s -> Match (ProcessAction s)
matchDispatch UnhandledMessagePolicy
_ s
s (Dispatch         s -> Message a b -> Process (ProcessAction s)
d)      = (Message a b -> Process (ProcessAction s))
-> Match (ProcessAction s)
forall a b. Serializable a => (a -> Process b) -> Match b
match   (s -> Message a b -> Process (ProcessAction s)
d s
s)
  matchDispatch UnhandledMessagePolicy
_ s
s (DispatchIf       s -> Message a b -> Process (ProcessAction s)
d s -> Message a b -> Bool
cond) = (Message a b -> Bool)
-> (Message a b -> Process (ProcessAction s))
-> Match (ProcessAction s)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (s -> Message a b -> Bool
cond s
s) (s -> Message a b -> Process (ProcessAction s)
d s
s)

instance MessageMatcher ExternDispatcher where
  matchDispatch :: forall s.
UnhandledMessagePolicy
-> s -> ExternDispatcher s -> Match (ProcessAction s)
matchDispatch UnhandledMessagePolicy
_ s
s (DispatchCC  ReceivePort (Message a b)
c s -> Message a b -> Process (ProcessAction s)
d)     = ReceivePort (Message a b)
-> (Message a b -> Process (ProcessAction s))
-> Match (ProcessAction s)
forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort (Message a b)
c (s -> Message a b -> Process (ProcessAction s)
d s
s)
  matchDispatch UnhandledMessagePolicy
_ s
s (DispatchSTM STM a
c s -> a -> Process (ProcessAction s)
d Match Message
_ forall m. (Message -> m) -> Match m
_) = STM a
-> (a -> Process (ProcessAction s)) -> Match (ProcessAction s)
forall a b. STM a -> (a -> Process b) -> Match b
matchSTM  STM a
c (s -> a -> Process (ProcessAction s)
d s
s)

-- | Defines the means of dispatching messages from external channels (e.g.
-- those defined in terms of "ControlChannel", and STM actions) to a handler.
class ExternMatcher d where
  matchExtern :: UnhandledMessagePolicy -> s -> d s -> Match P.Message

  matchMapExtern :: forall m s . UnhandledMessagePolicy
                 -> s -> (P.Message -> m) -> d s -> Match m

instance ExternMatcher ExternDispatcher where
  matchExtern :: forall s.
UnhandledMessagePolicy -> s -> ExternDispatcher s -> Match Message
matchExtern UnhandledMessagePolicy
_ s
_ (DispatchCC  ReceivePort (Message a b)
c s -> Message a b -> Process (ProcessAction s)
_)     = ReceivePort (Message a b)
-> (Message a b -> Process Message) -> Match Message
forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort (Message a b)
c (Message -> Process Message
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Message -> Process Message)
-> (Message a b -> Message) -> Message a b -> Process Message
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Message a b -> Message
forall a. Serializable a => a -> Message
unsafeWrapMessage)
  matchExtern UnhandledMessagePolicy
_ s
_ (DispatchSTM STM a
_ s -> a -> Process (ProcessAction s)
_ Match Message
m forall m. (Message -> m) -> Match m
_) = Match Message
m

  matchMapExtern :: forall m s.
UnhandledMessagePolicy
-> s -> (Message -> m) -> ExternDispatcher s -> Match m
matchMapExtern UnhandledMessagePolicy
_ s
_ Message -> m
f (DispatchCC ReceivePort (Message a b)
c s -> Message a b -> Process (ProcessAction s)
_)      = ReceivePort (Message a b) -> (Message a b -> Process m) -> Match m
forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort (Message a b)
c (m -> Process m
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (m -> Process m) -> (Message a b -> m) -> Message a b -> Process m
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Message -> m
f (Message -> m) -> (Message a b -> Message) -> Message a b -> m
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Message a b -> Message
forall a. Serializable a => a -> Message
unsafeWrapMessage)
  matchMapExtern UnhandledMessagePolicy
_ s
_ Message -> m
f (DispatchSTM STM a
_ s -> a -> Process (ProcessAction s)
_ Match Message
_ forall m. (Message -> m) -> Match m
p) = (Message -> m) -> Match m
forall m. (Message -> m) -> Match m
p Message -> m
f

-- | Priority of a message, encoded as an @Int@
newtype Priority a = Priority { forall a. Priority a -> Int
getPrio :: Int }

-- | Dispatcher for prioritised handlers
data DispatchPriority s =
    PrioritiseCall
    {
      forall s.
DispatchPriority s
-> s -> Message -> Process (Maybe (Int, Message))
prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
    }
  | PrioritiseCast
    {
      prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
    }
  | PrioritiseInfo
    {
      prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
    }

-- | For a 'PrioritisedProcessDefinition', this policy determines for how long
-- the /receive loop/ should continue draining the process' mailbox before
-- processing its received mail (in priority order).
--
-- If a prioritised /managed process/ is receiving a lot of messages (into its
-- /real/ mailbox), the server might never get around to actually processing its
-- inputs. This (mandatory) policy provides a guarantee that eventually (i.e.,
-- after a specified number of received messages or time interval), the server
-- will stop removing messages from its mailbox and process those it has already
-- received.
--
data RecvTimeoutPolicy = RecvMaxBacklog Int | RecvTimer TimeInterval
  deriving (Typeable)

-- | A @ProcessDefinition@ decorated with @DispatchPriority@ for certain
-- input domains.
data PrioritisedProcessDefinition s =
  PrioritisedProcessDefinition
  {
    forall s. PrioritisedProcessDefinition s -> ProcessDefinition s
processDef  :: ProcessDefinition s
  , forall s. PrioritisedProcessDefinition s -> [DispatchPriority s]
priorities  :: [DispatchPriority s]
  , forall s. PrioritisedProcessDefinition s -> [DispatchFilter s]
filters     :: [DispatchFilter s]
  , forall s. PrioritisedProcessDefinition s -> RecvTimeoutPolicy
recvTimeout :: RecvTimeoutPolicy
  }

-- | Policy for handling unexpected messages, i.e., messages which are not
-- sent using the 'call' or 'cast' APIs, and which are not handled by any of the
-- 'handleInfo' handlers.
data UnhandledMessagePolicy =
    Terminate  -- ^ stop immediately, giving @ExitOther "UnhandledInput"@ as the reason
  | DeadLetter ProcessId -- ^ forward the message to the given recipient
  | Log                  -- ^ log messages, then behave identically to @Drop@
  | Drop                 -- ^ dequeue and then drop/ignore the message
  deriving (Int -> UnhandledMessagePolicy -> ShowS
[UnhandledMessagePolicy] -> ShowS
UnhandledMessagePolicy -> String
(Int -> UnhandledMessagePolicy -> ShowS)
-> (UnhandledMessagePolicy -> String)
-> ([UnhandledMessagePolicy] -> ShowS)
-> Show UnhandledMessagePolicy
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UnhandledMessagePolicy -> ShowS
showsPrec :: Int -> UnhandledMessagePolicy -> ShowS
$cshow :: UnhandledMessagePolicy -> String
show :: UnhandledMessagePolicy -> String
$cshowList :: [UnhandledMessagePolicy] -> ShowS
showList :: [UnhandledMessagePolicy] -> ShowS
Show, UnhandledMessagePolicy -> UnhandledMessagePolicy -> Bool
(UnhandledMessagePolicy -> UnhandledMessagePolicy -> Bool)
-> (UnhandledMessagePolicy -> UnhandledMessagePolicy -> Bool)
-> Eq UnhandledMessagePolicy
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UnhandledMessagePolicy -> UnhandledMessagePolicy -> Bool
== :: UnhandledMessagePolicy -> UnhandledMessagePolicy -> Bool
$c/= :: UnhandledMessagePolicy -> UnhandledMessagePolicy -> Bool
/= :: UnhandledMessagePolicy -> UnhandledMessagePolicy -> Bool
Eq)

-- | Stores the functions that determine runtime behaviour in response to
-- incoming messages and a policy for responding to unhandled messages.
data ProcessDefinition s = ProcessDefinition {
    forall s. ProcessDefinition s -> [Dispatcher s]
apiHandlers    :: [Dispatcher s]       -- ^ functions that handle call/cast messages
  , forall s. ProcessDefinition s -> [DeferredDispatcher s]
infoHandlers   :: [DeferredDispatcher s] -- ^ functions that handle non call/cast messages
  , forall s. ProcessDefinition s -> [ExternDispatcher s]
externHandlers :: [ExternDispatcher s] -- ^ functions that handle control channel and STM inputs
  , forall s. ProcessDefinition s -> [ExitSignalDispatcher s]
exitHandlers   :: [ExitSignalDispatcher s] -- ^ functions that handle exit signals
  , forall s. ProcessDefinition s -> TimeoutHandler s
timeoutHandler :: TimeoutHandler s   -- ^ a function that handles timeouts
  , forall s. ProcessDefinition s -> ShutdownHandler s
shutdownHandler :: ShutdownHandler s -- ^ a function that is run just before the process exits
  , forall s. ProcessDefinition s -> UnhandledMessagePolicy
unhandledMessagePolicy :: UnhandledMessagePolicy -- ^ how to deal with unhandled messages
  }

-- note [rpc calls]
-- One problem with using plain expect/receive primitives to perform a
-- synchronous (round trip) call is that a reply matching the expected type
-- could come from anywhere! The Call.hs module uses a unique integer tag to
-- distinguish between inputs but this is easy to forge, and forces all callers
-- to maintain a tag pool, which is quite onerous.
--
-- Here, we use a private (internal) tag based on a 'MonitorRef', which is
-- guaranteed to be unique per calling process (in the absence of mallicious
-- peers). This is handled throughout the roundtrip, such that the reply will
-- either contain the CallId (i.e., the ame 'MonitorRef' with which we're
-- tracking the server process) or we'll see the server die.
--
-- Of course, the downside to all this is that the monitoring and receiving
-- clutters up your mailbox, and if your mailbox is extremely full, could
-- incur delays in delivery. The callAsync function provides a neat
-- work-around for that, relying on the insulation provided by Async.

-- TODO: Generify this /call/ API and use it in Call.hs to avoid tagging

-- TODO: the code below should be moved elsewhere. Maybe to Client.hs?

-- | The send part of the /call/ client-server interaction. The resulting
-- "CallRef" can be used to identify the corrolary response message (if one is
-- sent by the server), and is unique to this /call-reply/ pair.
initCall :: forall s a b . (Addressable s, Serializable a, Serializable b)
         => s -> a -> Process (CallRef b)
initCall :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (CallRef b)
initCall s
sid a
msg = do
  ProcessId
pid <- s -> String -> Process ProcessId
forall a. Resolvable a => a -> String -> Process ProcessId
resolveOrDie s
sid String
"initCall: unresolveable address "
  CallId
mRef <- ProcessId -> Process CallId
monitor ProcessId
pid
  ProcessId
self <- Process ProcessId
getSelfPid
  let cRef :: CallRef b
cRef = Recipient -> CallId -> CallRef b
forall a. Recipient -> CallId -> CallRef a
makeRef (ProcessId -> Recipient
Pid ProcessId
self) CallId
mRef in do
    ProcessId -> Message a b -> Process ()
forall m.
(Serializable m, Resolvable ProcessId) =>
ProcessId -> m -> Process ()
forall a m.
(Routable a, Serializable m, Resolvable a) =>
a -> m -> Process ()
sendTo ProcessId
pid (a -> CallRef b -> Message a b
forall a b. a -> CallRef b -> Message a b
CallMessage a
msg CallRef b
cRef :: Message a b)
    CallRef b -> Process (CallRef b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return CallRef b
cRef

-- | Version of @initCall@ that utilises "unsafeSendTo".
unsafeInitCall :: forall s a b . ( Addressable s
                                 , NFSerializable a
                                 , NFSerializable b
                                 )
         => s -> a -> Process (CallRef b)
unsafeInitCall :: forall s a b.
(Addressable s, NFSerializable a, NFSerializable b) =>
s -> a -> Process (CallRef b)
unsafeInitCall s
sid a
msg = do
  ProcessId
pid <- s -> String -> Process ProcessId
forall a. Resolvable a => a -> String -> Process ProcessId
resolveOrDie s
sid String
"unsafeInitCall: unresolveable address "
  CallId
mRef <- ProcessId -> Process CallId
monitor ProcessId
pid
  ProcessId
self <- Process ProcessId
getSelfPid
  let cRef :: CallRef b
cRef = Recipient -> CallId -> CallRef b
forall a. Recipient -> CallId -> CallRef a
makeRef (ProcessId -> Recipient
Pid ProcessId
self) CallId
mRef in do
    ProcessId -> Message a b -> Process ()
forall a m.
(Routable a, NFSerializable m, Resolvable a) =>
a -> m -> Process ()
forall m.
(NFSerializable m, Resolvable ProcessId) =>
ProcessId -> m -> Process ()
unsafeSendTo ProcessId
pid (a -> CallRef b -> Message a b
forall a b. a -> CallRef b -> Message a b
CallMessage a
msg CallRef b
cRef  :: Message a b)
    CallRef b -> Process (CallRef b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return CallRef b
cRef

-- | Wait on the server's response after an "initCall" has been previously been sent.
--
-- This function does /not/ trap asynchronous exceptions.
waitResponse :: forall b. (Serializable b)
             => Maybe TimeInterval
             -> CallRef b
             -> Process (Maybe (Either ExitReason b))
waitResponse :: forall b.
Serializable b =>
Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
waitResponse Maybe TimeInterval
mTimeout CallRef b
cRef =
  let (Recipient
_, CallId
mRef) = CallRef b -> (Recipient, CallId)
forall a. CallRef a -> (Recipient, CallId)
unCaller CallRef b
cRef
      matchers :: [Match (Either ExitReason b)]
matchers  = [ (CallResponse b -> Bool)
-> (CallResponse b -> Process (Either ExitReason b))
-> Match (Either ExitReason b)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\((CallResponse b
_ CallId
ref) :: CallResponse b) -> CallId
ref CallId -> CallId -> Bool
forall a. Eq a => a -> a -> Bool
== CallId
mRef)
                            (\((CallResponse b
m CallId
_) :: CallResponse b) -> Either ExitReason b -> Process (Either ExitReason b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either ExitReason b
forall a b. b -> Either a b
Right b
m))
                  , (CallRejected -> Bool)
-> (CallRejected -> Process (Either ExitReason b))
-> Match (Either ExitReason b)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\((CallRejected String
_ CallId
ref)) -> CallId
ref CallId -> CallId -> Bool
forall a. Eq a => a -> a -> Bool
== CallId
mRef)
                            (\(CallRejected String
s CallId
_) -> Either ExitReason b -> Process (Either ExitReason b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ExitReason -> Either ExitReason b
forall a b. a -> Either a b
Left (ExitReason -> Either ExitReason b)
-> ExitReason -> Either ExitReason b
forall a b. (a -> b) -> a -> b
$ String -> ExitReason
ExitOther (String -> ExitReason) -> String -> ExitReason
forall a b. (a -> b) -> a -> b
$ String
s))
                  , (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process (Either ExitReason b))
-> Match (Either ExitReason b)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification CallId
ref ProcessId
_ DiedReason
_) -> CallId
ref CallId -> CallId -> Bool
forall a. Eq a => a -> a -> Bool
== CallId
mRef)
                      (\(ProcessMonitorNotification CallId
_ ProcessId
_ DiedReason
r) -> Either ExitReason b -> Process (Either ExitReason b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ExitReason -> Either ExitReason b
forall a b. a -> Either a b
Left (DiedReason -> ExitReason
forall {a}. Show a => a -> ExitReason
err DiedReason
r)))
                  ]
      err :: a -> ExitReason
err a
r     = String -> ExitReason
ExitOther (String -> ExitReason) -> String -> ExitReason
forall a b. (a -> b) -> a -> b
$ a -> String
forall a. Show a => a -> String
show a
r in
    case Maybe TimeInterval
mTimeout of
      (Just TimeInterval
ti) -> Process (Maybe (Either ExitReason b))
-> Process () -> Process (Maybe (Either ExitReason b))
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
finally (Int
-> [Match (Either ExitReason b)]
-> Process (Maybe (Either ExitReason b))
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout (TimeInterval -> Int
asTimeout TimeInterval
ti) [Match (Either ExitReason b)]
matchers) (CallId -> Process ()
unmonitor CallId
mRef)
      Maybe TimeInterval
Nothing   -> Process (Maybe (Either ExitReason b))
-> Process () -> Process (Maybe (Either ExitReason b))
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
finally ((Either ExitReason b -> Maybe (Either ExitReason b))
-> Process (Either ExitReason b)
-> Process (Maybe (Either ExitReason b))
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Either ExitReason b -> Maybe (Either ExitReason b)
forall a. a -> Maybe a
Just ([Match (Either ExitReason b)] -> Process (Either ExitReason b)
forall b. [Match b] -> Process b
receiveWait [Match (Either ExitReason b)]
matchers)) (CallId -> Process ()
unmonitor CallId
mRef)