{-# LANGUAGE CPP  #-}
{-# LANGUAGE RankNTypes  #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ScopedTypeVariables  #-}
{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE BangPatterns #-}

-- | Cloud Haskell primitives
--
-- We define these in a separate module so that we don't have to rely on
-- the closure combinators
module Control.Distributed.Process.Internal.Primitives
  ( -- * Basic messaging
    send
  , usend
  , expect
    -- * Channels
  , newChan
  , sendChan
  , receiveChan
  , mergePortsBiased
  , mergePortsRR
    -- * Unsafe messaging variants
  , unsafeSend
  , unsafeUSend
  , unsafeSendChan
  , unsafeNSend
  , unsafeNSendRemote
    -- * Advanced messaging
  , Match
  , receiveWait
  , receiveTimeout
  , match
  , matchIf
  , matchUnknown
  , matchAny
  , matchAnyIf
  , matchChan
  , matchSTM
  , matchMessage
  , matchMessageIf
  , isEncoded
  , wrapMessage
  , unsafeWrapMessage
  , unwrapMessage
  , handleMessage
  , handleMessageIf
  , handleMessage_
  , handleMessageIf_
  , forward
  , uforward
  , delegate
  , relay
  , proxy
    -- * Process management
  , terminate
  , ProcessTerminationException(..)
  , die
  , kill
  , exit
  , catchExit
  , catchesExit
    -- keep the exception constructor hidden, so that handling exit
    -- reasons /must/ take place via the 'catchExit' family of primitives
  , ProcessExitException()
  , getSelfPid
  , getSelfNode
  , ProcessInfo(..)
  , getProcessInfo
  , NodeStats(..)
  , getNodeStats
  , getLocalNodeStats
    -- * Monitoring and linking
  , link
  , unlink
  , monitor
  , unmonitor
  , unmonitorAsync
  , withMonitor
  , withMonitor_
    -- * Logging
  , SayMessage(..)
  , say
    -- * Registry
  , register
  , reregister
  , unregister
  , whereis
  , nsend
  , registerRemoteAsync
  , reregisterRemoteAsync
  , unregisterRemoteAsync
  , whereisRemoteAsync
  , nsendRemote
    -- * Closures
  , unClosure
  , unStatic
    -- * Exception handling
  , catch
  , Handler(..)
  , catches
  , try
  , mask
  , mask_
  , onException
  , bracket
  , bracket_
  , finally
    -- * Auxiliary API
  , expectTimeout
  , receiveChanTimeout
  , spawnAsync
  , linkNode
  , linkPort
  , unlinkNode
  , unlinkPort
  , monitorNode
  , monitorPort
    -- * Reconnecting
  , reconnect
  , reconnectPort
    -- * Internal Exports
  , sendCtrlMsg
  ) where

#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif

import Data.Binary (Binary(..), Put, Get, decode)
import Data.Time.Clock (getCurrentTime, UTCTime(..))
import Data.Time.Calendar (Day(..))
import Data.Time.Format (formatTime)
#if MIN_VERSION_time(1,5,0)
import Data.Time.Format (defaultTimeLocale)
#else
import System.Locale (defaultTimeLocale)
#endif
import System.Timeout (timeout)
import Control.Monad (when, void)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Catch
  ( Exception
  , SomeException
  , throwM
  , fromException
  )
import qualified Control.Monad.Catch as Catch
import Control.Applicative
import Control.Distributed.Process.Internal.StrictMVar
  ( StrictMVar
  , modifyMVar
  , modifyMVar_
  )
import Control.Concurrent.STM
  ( STM
  , TVar
  , atomically
  , orElse
  , newTVar
  , readTVar
  , writeTVar
  )
import Control.Distributed.Process.Internal.CQueue
  ( dequeue
  , BlockSpec(..)
  , MatchOn(..)
  )
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Static
  ( Static
  , Closure
  )
import Data.Rank1Typeable (Typeable)
import qualified Control.Distributed.Static as Static (unstatic, unclosure)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
import Control.Distributed.Process.Internal.Types
  ( NodeId(..)
  , ProcessId(..)
  , LocalNode(..)
  , LocalProcess(..)
  , Process(..)
  , Message(..)
  , MonitorRef(..)
  , SpawnRef(..)
  , ProcessSignal(..)
  , NodeMonitorNotification(..)
  , ProcessMonitorNotification(..)
  , monitorCounter
  , spawnCounter
  , SendPort(..)
  , ReceivePort(..)
  , channelCounter
  , typedChannelWithId
  , TypedChannel(..)
  , SendPortId(..)
  , Identifier(..)
  , ProcessExitException(..)
  , DiedReason(..)
  , DidUnmonitor(..)
  , DidUnlinkProcess(..)
  , DidUnlinkNode(..)
  , DidUnlinkPort(..)
  , WhereIsReply(..)
  , RegisterReply(..)
  , ProcessRegistrationException(..)
  , ProcessInfo(..)
  , ProcessInfoNone(..)
  , NodeStats(..)
  , isEncoded
  , createMessage
  , createUnencodedMessage
  , ImplicitReconnect( NoImplicitReconnect)
  , LocalProcessState
  , LocalSendPortId
  , messageToPayload
  )
import Control.Distributed.Process.Internal.Messaging
  ( sendMessage
  , sendBinary
  , sendPayload
  , disconnect
  , sendCtrlMsg
  )
import Control.Distributed.Process.Management.Internal.Types
  ( MxEvent(..)
  )
import Control.Distributed.Process.Management.Internal.Trace.Types
  ( traceEvent
  )
import Control.Distributed.Process.Internal.WeakTQueue
  ( newTQueueIO
  , readTQueue
  , mkWeakTQueue
  )
import Prelude

import Unsafe.Coerce

--------------------------------------------------------------------------------
-- Basic messaging                                                            --
--------------------------------------------------------------------------------

-- | Send a message
send :: Serializable a => ProcessId -> a -> Process ()
-- This requires a lookup on every send. If we want to avoid that we need to
-- modify serializable to allow for stateful (IO) deserialization.
send :: forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
them a
msg = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  let us :: ProcessId
us       = LocalProcess -> ProcessId
processId LocalProcess
proc
      node :: LocalNode
node     = LocalProcess -> LocalNode
processNode LocalProcess
proc
      nodeId :: NodeId
nodeId   = LocalNode -> NodeId
localNodeId LocalNode
node
      destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node)
                      (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg))
  if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nodeId
    then ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
them a
msg
    else IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
forall a.
Serializable a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendMessage (LocalProcess -> LocalNode
processNode LocalProcess
proc)
                              (ProcessId -> Identifier
ProcessIdentifier (LocalProcess -> ProcessId
processId LocalProcess
proc))
                              (ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
                              ImplicitReconnect
NoImplicitReconnect
                              a
msg

-- | /Unsafe/ variant of 'send'. This function makes /no/ attempt to serialize
-- and (in the case when the destination process resides on the same local
-- node) therefore ensure that the payload is fully evaluated before it is
-- delivered.
unsafeSend :: Serializable a => ProcessId -> a -> Process ()
unsafeSend :: forall a. Serializable a => ProcessId -> a -> Process ()
unsafeSend = ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.send

-- | Send a message unreliably.
--
-- Unlike 'send', this function is insensitive to 'reconnect'. It will
-- try to send the message regardless of the history of connection failures
-- between the nodes.
--
-- Message passing with 'usend' is ordered for a given sender and receiver
-- if the messages arrive at all.
--
usend :: Serializable a => ProcessId -> a -> Process ()
usend :: forall a. Serializable a => ProcessId -> a -> Process ()
usend ProcessId
them a
msg = do
    LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
    let there :: NodeId
there = ProcessId -> NodeId
processNodeId ProcessId
them
    let (ProcessId
us, LocalNode
node) = (LocalProcess -> ProcessId
processId LocalProcess
proc, LocalProcess -> LocalNode
processNode LocalProcess
proc)
    let msg' :: Message
msg' = a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg')
    if LocalNode -> NodeId
localNodeId (LocalProcess -> LocalNode
processNode LocalProcess
proc) NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
there
      then ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
them a
msg
      else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
there) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalProcessId -> Message -> ProcessSignal
UnreliableSend (ProcessId -> LocalProcessId
processLocalId ProcessId
them)
                                                     (a -> Message
forall a. Serializable a => a -> Message
createMessage a
msg)

-- | /Unsafe/ variant of 'usend'. This function makes /no/ attempt to serialize
-- the message when the destination process resides on the same local
-- node. Therefore, a local receiver would need to be prepared to cope with any
-- errors resulting from evaluation of the message.
unsafeUSend :: Serializable a => ProcessId -> a -> Process ()
unsafeUSend :: forall a. Serializable a => ProcessId -> a -> Process ()
unsafeUSend = ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.usend

-- | Wait for a message of a specific type
expect :: forall a. Serializable a => Process a
expect :: forall a. Serializable a => Process a
expect = [Match a] -> Process a
forall b. [Match b] -> Process b
receiveWait [(a -> Process a) -> Match a
forall a b. Serializable a => (a -> Process b) -> Match b
match a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return]

--------------------------------------------------------------------------------
-- Channels                                                                   --
--------------------------------------------------------------------------------

-- | Create a new typed channel, bound to the calling Process
--
-- Note that the channel is bound to the lifecycle of the process that evaluates
-- this function, such that when it dies/exits, the channel will no longer
-- function, but will remain accessible. Thus reading from the ReceivePort will
-- fail silently thereafter, blocking indefinitely (unless a timeout is used).
--
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan :: forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
    LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO (SendPort a, ReceivePort a)
-> Process (SendPort a, ReceivePort a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SendPort a, ReceivePort a)
 -> Process (SendPort a, ReceivePort a))
-> ((LocalProcessState
     -> IO (LocalProcessState, (SendPort a, ReceivePort a)))
    -> IO (SendPort a, ReceivePort a))
-> (LocalProcessState
    -> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar LocalProcessState
-> (LocalProcessState
    -> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> IO (SendPort a, ReceivePort a)
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState
  -> IO (LocalProcessState, (SendPort a, ReceivePort a)))
 -> Process (SendPort a, ReceivePort a))
-> (LocalProcessState
    -> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a)
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
      let lcid :: Int32
lcid  = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
channelCounter
      let cid :: SendPortId
cid   = SendPortId { sendPortProcessId :: ProcessId
sendPortProcessId = LocalProcess -> ProcessId
processId LocalProcess
proc
                             , sendPortLocalId :: Int32
sendPortLocalId   = Int32
lcid
                             }
      let sport :: SendPort a
sport = SendPortId -> SendPort a
forall a. SendPortId -> SendPort a
SendPort SendPortId
cid
      TQueue a
chan  <- IO (TQueue a) -> IO (TQueue a)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue a)
forall a. IO (TQueue a)
newTQueueIO
      Weak (TQueue a)
chan' <- TQueue a -> IO () -> IO (Weak (TQueue a))
forall a. TQueue a -> IO () -> IO (Weak (TQueue a))
mkWeakTQueue TQueue a
chan (IO () -> IO (Weak (TQueue a))) -> IO () -> IO (Weak (TQueue a))
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState -> Int32 -> IO ()
finalizer (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) Int32
lcid
      let rport :: ReceivePort a
rport = STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort (STM a -> ReceivePort a) -> STM a -> ReceivePort a
forall a b. (a -> b) -> a -> b
$ TQueue a -> STM a
forall a. TQueue a -> STM a
readTQueue TQueue a
chan
      let tch :: TypedChannel
tch   = Weak (TQueue a) -> TypedChannel
forall a. Serializable a => Weak (TQueue a) -> TypedChannel
TypedChannel Weak (TQueue a)
chan'
      (LocalProcessState, (SendPort a, ReceivePort a))
-> IO (LocalProcessState, (SendPort a, ReceivePort a))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( (T LocalProcessState Int32
channelCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1))
             (LocalProcessState -> LocalProcessState)
-> (LocalProcessState -> LocalProcessState)
-> LocalProcessState
-> LocalProcessState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
lcid Accessor LocalProcessState (Maybe TypedChannel)
-> Maybe TypedChannel -> LocalProcessState -> LocalProcessState
forall r a. T r a -> a -> r -> r
^= TypedChannel -> Maybe TypedChannel
forall a. a -> Maybe a
Just TypedChannel
tch)
             (LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
             , (SendPort a
forall {a}. SendPort a
sport, ReceivePort a
rport)
             )
  where
    finalizer :: StrictMVar LocalProcessState -> LocalSendPortId -> IO ()
    finalizer :: StrictMVar LocalProcessState -> Int32 -> IO ()
finalizer StrictMVar LocalProcessState
st Int32
lcid = StrictMVar LocalProcessState
-> (LocalProcessState -> IO LocalProcessState) -> IO ()
forall a. StrictMVar a -> (a -> IO a) -> IO ()
modifyMVar_ StrictMVar LocalProcessState
st ((LocalProcessState -> IO LocalProcessState) -> IO ())
-> (LocalProcessState -> IO LocalProcessState) -> IO ()
forall a b. (a -> b) -> a -> b
$
      LocalProcessState -> IO LocalProcessState
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (LocalProcessState -> IO LocalProcessState)
-> (LocalProcessState -> LocalProcessState)
-> LocalProcessState
-> IO LocalProcessState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
lcid Accessor LocalProcessState (Maybe TypedChannel)
-> Maybe TypedChannel -> LocalProcessState -> LocalProcessState
forall r a. T r a -> a -> r -> r
^= Maybe TypedChannel
forall a. Maybe a
Nothing)

-- | Send a message on a typed channel
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan :: forall a. Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort SendPortId
cid) a
msg = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
      pid :: ProcessId
pid  = LocalProcess -> ProcessId
processId LocalProcess
proc
      us :: NodeId
us   = LocalNode -> NodeId
localNodeId LocalNode
node
      them :: NodeId
them = ProcessId -> NodeId
processNodeId (SendPortId -> ProcessId
sendPortProcessId SendPortId
cid)
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> SendPortId -> Message -> MxEvent
MxSentToPort ProcessId
pid SendPortId
cid (Message -> MxEvent) -> Message -> MxEvent
forall a b. (a -> b) -> a -> b
$ a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg)
  case NodeId
them NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
us of
    Bool
True  -> SendPortId -> a -> Process ()
forall a. Serializable a => SendPortId -> a -> Process ()
sendChanLocal SendPortId
cid a
msg
    Bool
False -> do
      IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
forall a.
Binary a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendBinary LocalNode
node
                          (ProcessId -> Identifier
ProcessIdentifier ProcessId
pid)
                          (SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)
                          ImplicitReconnect
NoImplicitReconnect
                          a
msg

-- | Send a message on a typed channel. This function makes /no/ attempt to
-- serialize and (in the case when the @ReceivePort@ resides on the same local
-- node) therefore ensure that the payload is fully evaluated before it is
-- delivered.
unsafeSendChan :: Serializable a => SendPort a -> a -> Process ()
unsafeSendChan :: forall a. Serializable a => SendPort a -> a -> Process ()
unsafeSendChan = SendPort a -> a -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
Unsafe.sendChan

-- | Wait for a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan :: forall a. Serializable a => ReceivePort a -> Process a
receiveChan = IO a -> Process a
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> Process a)
-> (ReceivePort a -> IO a) -> ReceivePort a -> Process a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
atomically (STM a -> IO a)
-> (ReceivePort a -> STM a) -> ReceivePort a -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM

-- | Like 'receiveChan' but with a timeout. If the timeout is 0, do a
-- non-blocking check for a message.
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout :: forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
0 ReceivePort a
ch = IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> (STM (Maybe a) -> IO (Maybe a))
-> STM (Maybe a)
-> Process (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (STM (Maybe a) -> Process (Maybe a))
-> STM (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
  (a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> STM a -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
ch) STM (Maybe a) -> STM (Maybe a) -> STM (Maybe a)
forall a. STM a -> STM a -> STM a
`orElse` Maybe a -> STM (Maybe a)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
receiveChanTimeout Int
n ReceivePort a
ch = IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> (STM a -> IO (Maybe a)) -> STM a -> Process (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
n (IO a -> IO (Maybe a)) -> (STM a -> IO a) -> STM a -> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
atomically (STM a -> Process (Maybe a)) -> STM a -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
  ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
ch

-- | Merge a list of typed channels.
--
-- The result port is left-biased: if there are messages available on more
-- than one port, the first available message is returned.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased :: forall a.
Serializable a =>
[ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = ReceivePort a -> Process (ReceivePort a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ReceivePort a -> Process (ReceivePort a))
-> ([ReceivePort a] -> ReceivePort a)
-> [ReceivePort a]
-> Process (ReceivePort a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort(STM a -> ReceivePort a)
-> ([ReceivePort a] -> STM a) -> [ReceivePort a] -> ReceivePort a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (STM a -> STM a -> STM a) -> [STM a] -> STM a
forall a. (a -> a -> a) -> [a] -> a
forall (t :: * -> *) a. Foldable t => (a -> a -> a) -> t a -> a
foldr1 STM a -> STM a -> STM a
forall a. STM a -> STM a -> STM a
orElse ([STM a] -> STM a)
-> ([ReceivePort a] -> [STM a]) -> [ReceivePort a] -> STM a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ReceivePort a -> STM a) -> [ReceivePort a] -> [STM a]
forall a b. (a -> b) -> [a] -> [b]
map ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM

-- | Like 'mergePortsBiased', but with a round-robin scheduler (rather than
-- left-biased)
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR :: forall a.
Serializable a =>
[ReceivePort a] -> Process (ReceivePort a)
mergePortsRR = \[ReceivePort a]
ps -> do
    TVar [STM a]
psVar <- IO (TVar [STM a]) -> Process (TVar [STM a])
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar [STM a]) -> Process (TVar [STM a]))
-> (STM (TVar [STM a]) -> IO (TVar [STM a]))
-> STM (TVar [STM a])
-> Process (TVar [STM a])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (TVar [STM a]) -> IO (TVar [STM a])
forall a. STM a -> IO a
atomically (STM (TVar [STM a]) -> Process (TVar [STM a]))
-> STM (TVar [STM a]) -> Process (TVar [STM a])
forall a b. (a -> b) -> a -> b
$ [STM a] -> STM (TVar [STM a])
forall a. a -> STM (TVar a)
newTVar ((ReceivePort a -> STM a) -> [ReceivePort a] -> [STM a]
forall a b. (a -> b) -> [a] -> [b]
map ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM [ReceivePort a]
ps)
    ReceivePort a -> Process (ReceivePort a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ReceivePort a -> Process (ReceivePort a))
-> ReceivePort a -> Process (ReceivePort a)
forall a b. (a -> b) -> a -> b
$ STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort (TVar [STM a] -> STM a
forall a. TVar [STM a] -> STM a
rr TVar [STM a]
psVar)
  where
    rotate :: [a] -> [a]
    rotate :: forall a. [a] -> [a]
rotate []     = []
    rotate (a
x:[a]
xs) = [a]
xs [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a
x]

    rr :: TVar [STM a] -> STM a
    rr :: forall a. TVar [STM a] -> STM a
rr TVar [STM a]
psVar = do
      [STM a]
ps <- TVar [STM a] -> STM [STM a]
forall a. TVar a -> STM a
readTVar TVar [STM a]
psVar
      a
a  <- (STM a -> STM a -> STM a) -> [STM a] -> STM a
forall a. (a -> a -> a) -> [a] -> a
forall (t :: * -> *) a. Foldable t => (a -> a -> a) -> t a -> a
foldr1 STM a -> STM a -> STM a
forall a. STM a -> STM a -> STM a
orElse [STM a]
ps
      TVar [STM a] -> [STM a] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [STM a]
psVar ([STM a] -> [STM a]
forall a. [a] -> [a]
rotate [STM a]
ps)
      a -> STM a
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a

--------------------------------------------------------------------------------
-- Advanced messaging                                                         --
--------------------------------------------------------------------------------

-- | Opaque type used in 'receiveWait' and 'receiveTimeout'
newtype Match b = Match { forall b. Match b -> MatchOn Message (Process b)
unMatch :: MatchOn Message (Process b) }
                  deriving ((forall a b. (a -> b) -> Match a -> Match b)
-> (forall a b. a -> Match b -> Match a) -> Functor Match
forall a b. a -> Match b -> Match a
forall a b. (a -> b) -> Match a -> Match b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall a b. (a -> b) -> Match a -> Match b
fmap :: forall a b. (a -> b) -> Match a -> Match b
$c<$ :: forall a b. a -> Match b -> Match a
<$ :: forall a b. a -> Match b -> Match a
Functor)

-- | Test the matches in order against each message in the queue
receiveWait :: [Match b] -> Process b
receiveWait :: forall b. [Match b] -> Process b
receiveWait [Match b]
ms = do
  CQueue Message
queue <- LocalProcess -> CQueue Message
processQueue (LocalProcess -> CQueue Message)
-> Process LocalProcess -> Process (CQueue Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  Maybe (Process b)
mProc <- IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Process b)) -> Process (Maybe (Process b)))
-> IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a b. (a -> b) -> a -> b
$ CQueue Message
-> BlockSpec
-> [MatchOn Message (Process b)]
-> IO (Maybe (Process b))
forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a)
dequeue CQueue Message
queue BlockSpec
Blocking ((Match b -> MatchOn Message (Process b))
-> [Match b] -> [MatchOn Message (Process b)]
forall a b. (a -> b) -> [a] -> [b]
map Match b -> MatchOn Message (Process b)
forall b. Match b -> MatchOn Message (Process b)
unMatch [Match b]
ms)
  case Maybe (Process b)
mProc of
    Just Process b
proc' -> Process b
proc'
    Maybe (Process b)
Nothing    -> String -> Process b
forall a b. Serializable a => a -> Process b
die (String -> Process b) -> String -> Process b
forall a b. (a -> b) -> a -> b
$ String
"System Invariant Violation: CQueue.hs returned `Nothing` "
                     String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"in the absence of a timeout value."

-- | Like 'receiveWait' but with a timeout.
--
-- If the timeout is zero do a non-blocking check for matching messages. A
-- non-zero timeout is applied only when waiting for incoming messages (that is,
-- /after/ we have checked the messages that are already in the mailbox).
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout :: forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
t [Match b]
ms = do
  CQueue Message
queue <- LocalProcess -> CQueue Message
processQueue (LocalProcess -> CQueue Message)
-> Process LocalProcess -> Process (CQueue Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  let blockSpec :: BlockSpec
blockSpec = if Int
t Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 then BlockSpec
NonBlocking else Int -> BlockSpec
Timeout Int
t
  Maybe (Process b)
mProc <- IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Process b)) -> Process (Maybe (Process b)))
-> IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a b. (a -> b) -> a -> b
$ CQueue Message
-> BlockSpec
-> [MatchOn Message (Process b)]
-> IO (Maybe (Process b))
forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a)
dequeue CQueue Message
queue BlockSpec
blockSpec ((Match b -> MatchOn Message (Process b))
-> [Match b] -> [MatchOn Message (Process b)]
forall a b. (a -> b) -> [a] -> [b]
map Match b -> MatchOn Message (Process b)
forall b. Match b -> MatchOn Message (Process b)
unMatch [Match b]
ms)
  case Maybe (Process b)
mProc of
    Maybe (Process b)
Nothing   -> Maybe b -> Process (Maybe b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
    Just Process b
proc -> b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> Process b -> Process (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process b
proc

-- | Match on a typed channel
matchChan :: ReceivePort a -> (a -> Process b) -> Match b
matchChan :: forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort a
p a -> Process b
fn = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ STM (Process b) -> MatchOn Message (Process b)
forall m a. STM a -> MatchOn m a
MatchChan ((a -> Process b) -> STM a -> STM (Process b)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Process b
fn (ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
p))

-- | Match on an arbitrary STM action.
--
-- This rather unusaul /match primitive/ allows us to compose arbitrary STM
-- actions with checks against our process' mailbox and/or any typed channel
-- @ReceivePort@s we may hold.
--
-- This allows us to process multiple input streams /along with our mailbox/,
-- in just the same way that 'matchChan' supports checking both the mailbox
-- /and/ an arbitrary set of typed channels in one atomic transaction.
--
-- Note there are no ordering guarnatees with respect to these disparate input
-- sources.
--
matchSTM :: STM a -> (a -> Process b) -> Match b
matchSTM :: forall a b. STM a -> (a -> Process b) -> Match b
matchSTM STM a
stm a -> Process b
fn = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ STM (Process b) -> MatchOn Message (Process b)
forall m a. STM a -> MatchOn m a
MatchChan ((a -> Process b) -> STM a -> STM (Process b)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Process b
fn STM a
stm)

-- | Match against any message of the right type
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match = (a -> Bool) -> (a -> Process b) -> Match b
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True)

-- | Match against any message of the right type that satisfies a predicate
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf :: forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf a -> Bool
c a -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
  case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
    Bool
False -> Maybe (Process b)
forall a. Maybe a
Nothing
    Bool
True  -> case Message
msg of
      (UnencodedMessage Fingerprint
_ a
m) ->
        let m' :: a
m' = a -> a
forall a b. a -> b
unsafeCoerce a
m :: a in
        case (a -> Bool
c a
m') of
          Bool
True  -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (a -> Process b
p a
m')
          Bool
False -> Maybe (Process b)
forall a. Maybe a
Nothing
      (EncodedMessage Fingerprint
_ ByteString
_) ->
        if (a -> Bool
c a
decoded) then Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (a -> Process b
p a
decoded) else Maybe (Process b)
forall a. Maybe a
Nothing
        where
          decoded :: a
            -- Make sure the value is fully decoded so that we don't hang to
            -- bytestrings when the process calling 'matchIf' doesn't process
            -- the values immediately
          !decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)

-- | Match against any message, regardless of the underlying (contained) type
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage Message -> Process Message
p = MatchOn Message (Process Message) -> Match Message
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process Message) -> Match Message)
-> MatchOn Message (Process Message) -> Match Message
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process Message))
 -> MatchOn Message (Process Message))
-> (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall a b. (a -> b) -> a -> b
$ \Message
msg -> Process Message -> Maybe (Process Message)
forall a. a -> Maybe a
Just (Message -> Process Message
p Message
msg)

-- | Match against any message (regardless of underlying type) that satisfies a predicate
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf Message -> Bool
c Message -> Process Message
p = MatchOn Message (Process Message) -> Match Message
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process Message) -> Match Message)
-> MatchOn Message (Process Message) -> Match Message
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process Message))
 -> MatchOn Message (Process Message))
-> (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
    case (Message -> Bool
c Message
msg) of
      Bool
True  -> Process Message -> Maybe (Process Message)
forall a. a -> Maybe a
Just (Message -> Process Message
p Message
msg)
      Bool
False -> Maybe (Process Message)
forall a. Maybe a
Nothing

-- | Forward a raw 'Message' to the given 'ProcessId'.
forward :: Message -> ProcessId -> Process ()
forward :: Message -> ProcessId -> Process ()
forward Message
msg ProcessId
them = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  let node :: LocalNode
node     = LocalProcess -> LocalNode
processNode LocalProcess
proc
      us :: ProcessId
us       = LocalProcess -> ProcessId
processId LocalProcess
proc
      nid :: NodeId
nid      = LocalNode -> NodeId
localNodeId LocalNode
node
      destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg)
  if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
    then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
them Message
msg)
    else IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> [ByteString]
-> IO ()
sendPayload (LocalProcess -> LocalNode
processNode LocalProcess
proc)
                              (ProcessId -> Identifier
ProcessIdentifier (LocalProcess -> ProcessId
processId LocalProcess
proc))
                              (ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
                              ImplicitReconnect
NoImplicitReconnect
                              (Message -> [ByteString]
messageToPayload Message
msg)

-- | Forward a raw 'Message' to the given 'ProcessId'.
--
-- Unlike 'forward', this function is insensitive to 'reconnect'. It will
-- try to send the message regardless of the history of connection failures
-- between the nodes.
uforward :: Message -> ProcessId -> Process ()
uforward :: Message -> ProcessId -> Process ()
uforward Message
msg ProcessId
them = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  let node :: LocalNode
node     = LocalProcess -> LocalNode
processNode LocalProcess
proc
      us :: ProcessId
us       = LocalProcess -> ProcessId
processId LocalProcess
proc
      nid :: NodeId
nid      = LocalNode -> NodeId
localNodeId LocalNode
node
      destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg)
  if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
    then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
them Message
msg)
    else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
destNode) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalProcessId -> Message -> ProcessSignal
UnreliableSend (ProcessId -> LocalProcessId
processLocalId ProcessId
them) Message
msg

-- | Wrap a 'Serializable' value in a 'Message'. Note that 'Message's are
-- 'Serializable' - like the datum they contain - but also note, deserialising
-- such a 'Message' will yield a 'Message', not the type within it! To obtain the
-- wrapped datum, use 'unwrapMessage' or 'handleMessage' with a specific type.
--
-- @
-- do
--    self <- getSelfPid
--    send self (wrapMessage "blah")
--    Nothing  <- expectTimeout 1000000 :: Process (Maybe String)
--    (Just m) <- expectTimeout 1000000 :: Process (Maybe Message)
--    (Just "blah") <- unwrapMessage m :: Process (Maybe String)
-- @
--
wrapMessage :: Serializable a => a -> Message
wrapMessage :: forall a. Serializable a => a -> Message
wrapMessage = a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage -- see [note Serializable UnencodedMessage]

-- | This is the /unsafe/ variant of 'wrapMessage'. See
-- "Control.Distributed.Process.UnsafePrimitives" for details.
unsafeWrapMessage :: Serializable a => a -> Message
unsafeWrapMessage :: forall a. Serializable a => a -> Message
unsafeWrapMessage = a -> Message
forall a. Serializable a => a -> Message
Unsafe.wrapMessage

-- [note Serializable UnencodedMessage]
-- if we attempt to serialise an UnencodedMessage, it will be converted to an
-- EncodedMessage first, so we're ok here. See 'messageToPayload' in
-- Primitives.hs for the gory details

-- | Attempt to unwrap a raw 'Message'.
-- If the type of the decoded message payload matches the expected type, the
-- value will be returned with @Just@, otherwise @Nothing@ indicates the types
-- do not match.
--
-- This expression, for example, will evaluate to @Nothing@
-- > unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int)
--
-- Whereas this expression, will yield @Just "foo"@
-- > unwrapMessage (wrapMessage "foo") :: Process (Maybe String)
--
unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
unwrapMessage :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage Message
msg =
  case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
    Bool
False -> Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing :: m (Maybe a)
    Bool
True  -> case Message
msg of
      (UnencodedMessage Fingerprint
_ a
ms) ->
        let ms' :: a
ms' = a -> a
forall a b. a -> b
unsafeCoerce a
ms :: a
        in Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
ms')
      (EncodedMessage Fingerprint
_ ByteString
_) ->
        Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just (a
decoded))
        where
          decoded :: a -- note [decoding]
          !decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)

-- | Attempt to handle a raw 'Message'.
-- If the type of the message matches the type of the first argument to
-- the supplied expression, then the message will be decoded and the expression
-- evaluated against its value. If this runtime type checking fails however,
-- @Nothing@ will be returned to indicate the fact. If the check succeeds and
-- evaluation proceeds, the resulting value with be wrapped with @Just@.
--
-- Intended for use in `catchesExit` and `matchAny` primitives.
--
handleMessage :: forall m a b. (Monad m, Serializable a)
              => Message -> (a -> m b) -> m (Maybe b)
handleMessage :: forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
msg a -> m b
proc = Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True) a -> m b
proc

-- | Conditionally handle a raw 'Message'.
-- If the predicate @(a -> Bool)@ evaluates to @True@, invokes the supplied
-- handler, other returns @Nothing@ to indicate failure. See 'handleMessage'
-- for further information about runtime type checking.
--
handleMessageIf :: forall m a b . (Monad m, Serializable a)
                => Message
                -> (a -> Bool)
                -> (a -> m b)
                -> m (Maybe b)
handleMessageIf :: forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg a -> Bool
c a -> m b
proc = do
  case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
    Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
    Bool
True  -> case Message
msg of
      (UnencodedMessage Fingerprint
_ a
ms) ->
        let ms' :: a
ms' = a -> a
forall a b. a -> b
unsafeCoerce a
ms :: a in
        case (a -> Bool
c a
ms') of
          Bool
True  -> do { b
r <- a -> m b
proc a
ms'; Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
r) }
          Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
      (EncodedMessage Fingerprint
_ ByteString
_) ->
        case (a -> Bool
c a
decoded) of
          Bool
True  -> do { b
r <- a -> m b
proc (a
decoded :: a); Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
r) }
          Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
        where
          decoded :: a -- note [decoding]
          !decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)

-- | As 'handleMessage' but ignores result, which is useful if you don't
-- care whether or not the handler succeeded.
--
handleMessage_ :: forall m a . (Monad m, Serializable a)
               => Message -> (a -> m ()) -> m ()
handleMessage_ :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> m ()) -> m ()
handleMessage_ Message
msg a -> m ()
proc = Message -> (a -> Bool) -> (a -> m ()) -> m ()
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m ()) -> m ()
handleMessageIf_ Message
msg (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True) a -> m ()
proc

-- | Conditional version of 'handleMessage_'.
--
handleMessageIf_ :: forall m a . (Monad m, Serializable a)
                => Message
                -> (a -> Bool)
                -> (a -> m ())
                -> m ()
handleMessageIf_ :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m ()) -> m ()
handleMessageIf_ Message
msg a -> Bool
c a -> m ()
proc = Message -> (a -> Bool) -> (a -> m ()) -> m (Maybe ())
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg a -> Bool
c a -> m ()
proc m (Maybe ()) -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Match against an arbitrary message. 'matchAny' removes the first available
-- message from the process mailbox. To handle arbitrary /raw/ messages once
-- removed from the mailbox, see 'handleMessage' and 'unwrapMessage'.
--
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny Message -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (Message -> Process b
p Message
msg)

-- | Match against an arbitrary message. Intended for use with 'handleMessage'
-- and 'unwrapMessage', this function /only/ removes a message from the process
-- mailbox, /if/ the supplied condition matches. The success (or failure) of
-- runtime type checks deferred to @handleMessage@ and friends is irrelevant
-- here, i.e., if the condition evaluates to @True@ then the message will
-- be removed from the process mailbox and decoded, but that does /not/
-- guarantee that an expression passed to @handleMessage@ will pass the
-- runtime type checks and therefore be evaluated.
--
matchAnyIf :: forall a b. (Serializable a)
                       => (a -> Bool)
                       -> (Message -> Process b)
                       -> Match b
matchAnyIf :: forall a b.
Serializable a =>
(a -> Bool) -> (Message -> Process b) -> Match b
matchAnyIf a -> Bool
c Message -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
  case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
     Bool
True | Bool
check -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (Message -> Process b
p Message
msg)
       where
         check :: Bool
         !check :: Bool
check =
           case Message
msg of
             (EncodedMessage Fingerprint
_ ByteString
_)    -> a -> Bool
c a
decoded
             (UnencodedMessage Fingerprint
_ a
m') -> a -> Bool
c (a -> a
forall a b. a -> b
unsafeCoerce a
m')

         decoded :: a -- note [decoding]
         !decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
     Bool
_ -> Maybe (Process b)
forall a. Maybe a
Nothing

{- note [decoding]
For an EncodedMessage, we need to ensure the value is fully decoded so that
we don't hang to bytestrings if the calling process doesn't evaluate
immediately. For UnencodedMessage we know (because the fingerprint comparison
succeeds) that unsafeCoerce will not fail.
-}

-- | Remove any message from the queue
matchUnknown :: Process b -> Match b
matchUnknown :: forall b. Process b -> Match b
matchUnknown Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg (Maybe (Process b) -> Message -> Maybe (Process b)
forall a b. a -> b -> a
const (Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just Process b
p))

-- | Receives messages and forwards them to @pid@ if @p msg == True@.
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate ProcessId
pid Message -> Bool
p = do
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [
      (Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> case (Message -> Bool
p Message
m) of
                        Bool
True  -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid
                        Bool
False -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
    ]
  ProcessId -> (Message -> Bool) -> Process ()
delegate ProcessId
pid Message -> Bool
p

-- | A straight relay that forwards all messages to the supplied pid.
relay :: ProcessId -> Process ()
relay :: ProcessId -> Process ()
relay !ProcessId
pid = [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid) ] Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ProcessId -> Process ()
relay ProcessId
pid

-- | Proxies @pid@ and forwards messages whenever @proc@ evaluates to @True@.
-- Unlike 'delegate' the predicate @proc@ runs in the 'Process' monad, allowing
-- for richer proxy behaviour. If @proc@ returns @False@ or the /runtime type check/
-- fails, no action is taken and the proxy process will continue running.
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
proxy :: forall a.
Serializable a =>
ProcessId -> (a -> Process Bool) -> Process ()
proxy ProcessId
pid a -> Process Bool
proc = do
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [
      (Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> do
                   Maybe Bool
next <- Message -> (a -> Process Bool) -> Process (Maybe Bool)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m a -> Process Bool
proc
                   case Maybe Bool
next of
                     Just Bool
True  -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid
                     Just Bool
False -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()  -- explicitly ignored
                     Maybe Bool
Nothing    -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) -- un-routable
    ]
  ProcessId -> (a -> Process Bool) -> Process ()
forall a.
Serializable a =>
ProcessId -> (a -> Process Bool) -> Process ()
proxy ProcessId
pid a -> Process Bool
proc

--------------------------------------------------------------------------------
-- Process management                                                         --
--------------------------------------------------------------------------------

-- | Thrown by 'terminate'
data ProcessTerminationException = ProcessTerminationException
  deriving (Int -> ProcessTerminationException -> String -> String
[ProcessTerminationException] -> String -> String
ProcessTerminationException -> String
(Int -> ProcessTerminationException -> String -> String)
-> (ProcessTerminationException -> String)
-> ([ProcessTerminationException] -> String -> String)
-> Show ProcessTerminationException
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
$cshowsPrec :: Int -> ProcessTerminationException -> String -> String
showsPrec :: Int -> ProcessTerminationException -> String -> String
$cshow :: ProcessTerminationException -> String
show :: ProcessTerminationException -> String
$cshowList :: [ProcessTerminationException] -> String -> String
showList :: [ProcessTerminationException] -> String -> String
Show, Typeable)

instance Exception ProcessTerminationException

-- | Terminate immediately (throws a ProcessTerminationException)
terminate :: Process a
terminate :: forall a. Process a
terminate = ProcessTerminationException -> Process a
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessTerminationException
ProcessTerminationException

-- [Issue #110]
-- | Die immediately - throws a 'ProcessExitException' with the given @reason@.
die :: Serializable a => a -> Process b
die :: forall a b. Serializable a => a -> Process b
die a
reason = do
-- NOTE: We do /not/ want to use UnencodedMessage here, as the exception
-- could be decoded by a handler passed to 'catchExit', re-thrown or even
-- passed to another process via the tracing mechanism.
  ProcessId
pid <- Process ProcessId
getSelfPid
  ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (ProcessId -> Message -> ProcessExitException
ProcessExitException ProcessId
pid (a -> Message
forall a. Serializable a => a -> Message
createMessage a
reason))

-- | Forceful request to kill a process. Where 'exit' provides an exception
-- that can be caught and handled, 'kill' throws an unexposed exception type
-- which cannot be handled explicitly (by type).
kill :: ProcessId -> String -> Process ()
-- NOTE: We send the message to our local node controller, which will then
-- forward it to a remote node controller (if applicable). Sending it directly
-- to a remote node controller means that that the message may overtake a
-- 'monitor' or 'link' request.
kill :: ProcessId -> String -> Process ()
kill ProcessId
them String
reason = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> String -> ProcessSignal
Kill ProcessId
them String
reason)

-- | Graceful request to exit a process. Throws 'ProcessExitException' with the
-- supplied @reason@ encoded as a message. Any /exit signal/ raised in this
-- manner can be handled using the 'catchExit' family of functions.
exit :: Serializable a => ProcessId -> a -> Process ()
-- NOTE: We send the message to our local node controller, which will then
-- forward it to a remote node controller (if applicable). Sending it directly
-- to a remote node controller means that that the message may overtake a
-- 'monitor' or 'link' request.
exit :: forall a. Serializable a => ProcessId -> a -> Process ()
exit ProcessId
them a
reason = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
Exit ProcessId
them (a -> Message
forall a. Serializable a => a -> Message
createMessage a
reason))

-- | Catches 'ProcessExitException'. The handler will not be applied unless its
-- type matches the encoded data stored in the exception (see the /reason/
-- argument given to the 'exit' primitive). If the handler cannot be applied,
-- the exception will be re-thrown.
--
-- To handle 'ProcessExitException' without regard for /reason/, see 'catch'.
-- To handle multiple /reasons/ of differing types, see 'catchesExit'.
catchExit :: forall a b . (Show a, Serializable a)
                       => Process b
                       -> (ProcessId -> a -> Process b)
                       -> Process b
catchExit :: forall a b.
(Show a, Serializable a) =>
Process b -> (ProcessId -> a -> Process b) -> Process b
catchExit Process b
act ProcessId -> a -> Process b
exitHandler = Process b -> (ProcessExitException -> Process b) -> Process b
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch Process b
act ProcessExitException -> Process b
handleExit
  where
    handleExit :: ProcessExitException -> Process b
handleExit ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) =
        if Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a)
          then ProcessId -> a -> Process b
exitHandler ProcessId
from a
decoded
          else ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessExitException
ex
     where
       decoded :: a
       -- Make sure the value is fully decoded so that we don't hang to
       -- bytestrings if the caller doesn't use the value immediately
       !decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)

-- | Lift 'Control.Exception.catches' (almost).
--
-- As 'ProcessExitException' stores the exit @reason@ as a typed, encoded
-- message, a handler must accept inputs of the expected type. In order to
-- handle a list of potentially different handlers (and therefore input types),
-- a handler passed to 'catchesExit' must accept 'Message' and return
-- @Maybe@ (i.e., @Just p@ if it handled the exit reason, otherwise @Nothing@).
--
-- See 'maybeHandleMessage' and 'Message' for more details.
catchesExit :: Process b
            -> [(ProcessId -> Message -> (Process (Maybe b)))]
            -> Process b
catchesExit :: forall b.
Process b
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
catchesExit Process b
act [ProcessId -> Message -> Process (Maybe b)]
handlers = Process b -> (ProcessExitException -> Process b) -> Process b
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch Process b
act (((ProcessExitException
 -> [ProcessId -> Message -> Process (Maybe b)] -> Process b)
-> [ProcessId -> Message -> Process (Maybe b)]
-> ProcessExitException
-> Process b
forall a b c. (a -> b -> c) -> b -> a -> c
flip ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit) [ProcessId -> Message -> Process (Maybe b)]
handlers)
  where
    handleExit :: ProcessExitException
               -> [(ProcessId -> Message -> Process (Maybe b))]
               -> Process b
    handleExit :: forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit ProcessExitException
ex [] = ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessExitException
ex
    handleExit ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) (ProcessId -> Message -> Process (Maybe b)
h:[ProcessId -> Message -> Process (Maybe b)]
hs) = do
      Maybe b
r <- ProcessId -> Message -> Process (Maybe b)
h ProcessId
from Message
msg
      case Maybe b
r of
        Maybe b
Nothing -> ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit ProcessExitException
ex [ProcessId -> Message -> Process (Maybe b)]
hs
        Just b
p  -> b -> Process b
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return b
p

-- | Our own process ID
getSelfPid :: Process ProcessId
getSelfPid :: Process ProcessId
getSelfPid = LocalProcess -> ProcessId
processId (LocalProcess -> ProcessId)
-> Process LocalProcess -> Process ProcessId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask

-- | Get the node ID of our local node
getSelfNode :: Process NodeId
getSelfNode :: Process NodeId
getSelfNode = LocalNode -> NodeId
localNodeId (LocalNode -> NodeId)
-> (LocalProcess -> LocalNode) -> LocalProcess -> NodeId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> NodeId) -> Process LocalProcess -> Process NodeId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask


-- | Get statistics about the specified node
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats NodeId
nid = do
    NodeId
selfNode <- Process NodeId
getSelfNode
    if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
selfNode
      then NodeStats -> Either DiedReason NodeStats
forall a b. b -> Either a b
Right (NodeStats -> Either DiedReason NodeStats)
-> Process NodeStats -> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Process NodeStats
getLocalNodeStats -- optimisation
      else NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote NodeId
selfNode
  where
    getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats)
    getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote NodeId
selfNode = do
        Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ NodeId -> ProcessSignal
GetNodeStats NodeId
selfNode
        Process MonitorRef
-> (MonitorRef -> Process ())
-> (MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats)
forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket (NodeId -> Process MonitorRef
monitorNode NodeId
nid) MonitorRef -> Process ()
unmonitor ((MonitorRef -> Process (Either DiedReason NodeStats))
 -> Process (Either DiedReason NodeStats))
-> (MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ \MonitorRef
mRef ->
            [Match (Either DiedReason NodeStats)]
-> Process (Either DiedReason NodeStats)
forall b. [Match b] -> Process b
receiveWait [ (NodeStats -> Process (Either DiedReason NodeStats))
-> Match (Either DiedReason NodeStats)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeStats
stats :: NodeStats) -> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either DiedReason NodeStats
 -> Process (Either DiedReason NodeStats))
-> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ NodeStats -> Either DiedReason NodeStats
forall a b. b -> Either a b
Right NodeStats
stats)
                        , (NodeMonitorNotification -> Bool)
-> (NodeMonitorNotification
    -> Process (Either DiedReason NodeStats))
-> Match (Either DiedReason NodeStats)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(NodeMonitorNotification MonitorRef
ref NodeId
_ DiedReason
_) -> MonitorRef
ref MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef)
                                  (\(NodeMonitorNotification MonitorRef
_ NodeId
_ DiedReason
dr) -> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either DiedReason NodeStats
 -> Process (Either DiedReason NodeStats))
-> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ DiedReason -> Either DiedReason NodeStats
forall a b. a -> Either a b
Left DiedReason
dr)
                        ]

-- | Get statistics about our local node
getLocalNodeStats :: Process NodeStats
getLocalNodeStats :: Process NodeStats
getLocalNodeStats = do
  NodeId
self <- Process NodeId
getSelfNode
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ NodeId -> ProcessSignal
GetNodeStats NodeId
self
  [Match NodeStats] -> Process NodeStats
forall b. [Match b] -> Process b
receiveWait [ (NodeStats -> Process NodeStats) -> Match NodeStats
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeStats
stats :: NodeStats) -> NodeStats -> Process NodeStats
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return NodeStats
stats) ]

-- | Get information about the specified process
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo ProcessId
pid =
  let them :: NodeId
them = ProcessId -> NodeId
processNodeId ProcessId
pid in do
  NodeId
us <- Process NodeId
getSelfNode
  Maybe NodeId
dest <- NodeId -> NodeId -> Process (Maybe NodeId)
mkNode NodeId
them NodeId
us
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
dest (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> ProcessSignal
GetInfo ProcessId
pid
  [Match (Maybe ProcessInfo)] -> Process (Maybe ProcessInfo)
forall b. [Match b] -> Process b
receiveWait [
       (ProcessInfo -> Process (Maybe ProcessInfo))
-> Match (Maybe ProcessInfo)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(ProcessInfo
p :: ProcessInfo)     -> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ProcessInfo -> Process (Maybe ProcessInfo))
-> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a b. (a -> b) -> a -> b
$ ProcessInfo -> Maybe ProcessInfo
forall a. a -> Maybe a
Just ProcessInfo
p)
     , (ProcessInfoNone -> Process (Maybe ProcessInfo))
-> Match (Maybe ProcessInfo)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(ProcessInfoNone
_ :: ProcessInfoNone) -> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessInfo
forall a. Maybe a
Nothing)
     ]
  where mkNode :: NodeId -> NodeId -> Process (Maybe NodeId)
        mkNode :: NodeId -> NodeId -> Process (Maybe NodeId)
mkNode NodeId
them NodeId
us = case NodeId
them NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
us of
                           Bool
True -> Maybe NodeId -> Process (Maybe NodeId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe NodeId
forall a. Maybe a
Nothing
                           Bool
_    -> Maybe NodeId -> Process (Maybe NodeId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe NodeId -> Process (Maybe NodeId))
-> Maybe NodeId -> Process (Maybe NodeId)
forall a b. (a -> b) -> a -> b
$ NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
them

--------------------------------------------------------------------------------
-- Monitoring and linking                                                     --
--------------------------------------------------------------------------------

-- | Link to a remote process (asynchronous)
--
-- When process A links to process B (that is, process A calls
-- @link pidB@) then an asynchronous exception will be thrown to process A
-- when process B terminates (normally or abnormally), or when process A gets
-- disconnected from process B. Although it is /technically/ possible to catch
-- these exceptions, chances are if you find yourself trying to do so you should
-- probably be using 'monitor' rather than 'link'. In particular, code such as
--
-- > link pidB   -- Link to process B
-- > expect      -- Wait for a message from process B
-- > unlink pidB -- Unlink again
--
-- doesn't quite do what one might expect: if process B sends a message to
-- process A, and /subsequently terminates/, then process A might or might not
-- be terminated too, depending on whether the exception is thrown before or
-- after the 'unlink' (i.e., this code has a race condition).
--
-- Linking is all-or-nothing: A is either linked to B, or it's not. A second
-- call to 'link' has no effect.
--
-- Note that 'link' provides unidirectional linking (see 'spawnSupervised').
-- Linking makes no distinction between normal and abnormal termination of
-- the remote process.
link :: ProcessId -> Process ()
link :: ProcessId -> Process ()
link = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (ProcessId -> ProcessSignal) -> ProcessId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Link (Identifier -> ProcessSignal)
-> (ProcessId -> Identifier) -> ProcessId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier

-- | Monitor another process (asynchronous)
--
-- When process A monitors process B (that is, process A calls
-- @monitor pidB@) then process A will receive a 'ProcessMonitorNotification'
-- when process B terminates (normally or abnormally), or when process A gets
-- disconnected from process B. You receive this message like any other (using
-- 'expect'); the notification includes a reason ('DiedNormal', 'DiedException',
-- 'DiedDisconnect', etc.).
--
-- Every call to 'monitor' returns a new monitor reference 'MonitorRef'; if
-- multiple monitors are set up, multiple notifications will be delivered
-- and monitors can be disabled individually using 'unmonitor'.
monitor :: ProcessId -> Process MonitorRef
monitor :: ProcessId -> Process MonitorRef
monitor = Identifier -> Process MonitorRef
monitor' (Identifier -> Process MonitorRef)
-> (ProcessId -> Identifier) -> ProcessId -> Process MonitorRef
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier

-- | Establishes temporary monitoring of another process.
--
-- @withMonitor pid code@ sets up monitoring of @pid@ for the duration
-- of @code@.  Note: although monitoring is no longer active when
-- @withMonitor@ returns, there might still be unreceived monitor
-- messages in the queue.
--
withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor :: forall a. ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor ProcessId
pid = Process MonitorRef
-> (MonitorRef -> Process ())
-> (MonitorRef -> Process a)
-> Process a
forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket (ProcessId -> Process MonitorRef
monitor ProcessId
pid) MonitorRef -> Process ()
unmonitor

-- | Establishes temporary monitoring of another process.
--
-- @withMonitor_ pid code@ sets up monitoring of @pid@ for the duration
-- of @code@. Note: although monitoring is no longer active when
-- @withMonitor_@ returns, there might still be unreceived monitor
-- messages in the queue.
--
-- Since 0.6.1
withMonitor_ :: ProcessId -> Process a -> Process a
withMonitor_ :: forall a. ProcessId -> Process a -> Process a
withMonitor_ ProcessId
p = ProcessId -> (MonitorRef -> Process a) -> Process a
forall a. ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor ProcessId
p ((MonitorRef -> Process a) -> Process a)
-> (Process a -> MonitorRef -> Process a) -> Process a -> Process a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Process a -> MonitorRef -> Process a
forall a b. a -> b -> a
const

-- | Remove a link
--
-- This is synchronous in the sense that once it returns you are guaranteed
-- that no exception will be raised if the remote process dies. However, it is
-- asynchronous in the sense that we do not wait for a response from the remote
-- node.
unlink :: ProcessId -> Process ()
unlink :: ProcessId -> Process ()
unlink ProcessId
pid = do
  ProcessId -> Process ()
unlinkAsync ProcessId
pid
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkProcess -> Bool)
-> (DidUnlinkProcess -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkProcess ProcessId
pid') -> ProcessId
pid' ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
pid)
                        (\DidUnlinkProcess
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
              ]

-- | Remove a node link
--
-- This has the same synchronous/asynchronous nature as 'unlink'.
unlinkNode :: NodeId -> Process ()
unlinkNode :: NodeId -> Process ()
unlinkNode NodeId
nid = do
  NodeId -> Process ()
unlinkNodeAsync NodeId
nid
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkNode -> Bool)
-> (DidUnlinkNode -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkNode NodeId
nid') -> NodeId
nid' NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid)
                        (\DidUnlinkNode
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
              ]

-- | Remove a channel (send port) link
--
-- This has the same synchronous/asynchronous nature as 'unlink'.
unlinkPort :: SendPort a -> Process ()
unlinkPort :: forall a. SendPort a -> Process ()
unlinkPort SendPort a
sport = do
  SendPort a -> Process ()
forall a. SendPort a -> Process ()
unlinkPortAsync SendPort a
sport
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkPort -> Bool)
-> (DidUnlinkPort -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkPort SendPortId
cid) -> SendPortId
cid SendPortId -> SendPortId -> Bool
forall a. Eq a => a -> a -> Bool
== SendPort a -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort a
sport)
                        (\DidUnlinkPort
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
              ]

-- | Remove a monitor
--
-- This has the same synchronous/asynchronous nature as 'unlink'.
--
-- ProcessMonitorNotification messages for the given MonitorRef are removed from
-- the mailbox.
unmonitor :: MonitorRef -> Process ()
unmonitor :: MonitorRef -> Process ()
unmonitor MonitorRef
ref = do
  MonitorRef -> Process ()
unmonitorAsync MonitorRef
ref
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnmonitor -> Bool) -> (DidUnmonitor -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnmonitor MonitorRef
ref') -> MonitorRef
ref' MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
ref)
                        (\DidUnmonitor
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
              ]
  -- Discard the notification if any. With the current NC implementation at most
  -- one notification is in the mailbox for any given ref.
  Process (Maybe ()) -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process (Maybe ()) -> Process ())
-> Process (Maybe ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ Int -> [Match ()] -> Process (Maybe ())
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
0
    [ (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
ref' ProcessId
_ DiedReason
_) -> MonitorRef
ref' MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
ref)
              (Process () -> ProcessMonitorNotification -> Process ()
forall a b. a -> b -> a
const (Process () -> ProcessMonitorNotification -> Process ())
-> Process () -> ProcessMonitorNotification -> Process ()
forall a b. (a -> b) -> a -> b
$ () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
    ]

--------------------------------------------------------------------------------
-- Exception handling                                                         --
--------------------------------------------------------------------------------

-- | Lift 'Control.Exception.catch'
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch :: forall e a.
Exception e =>
Process a -> (e -> Process a) -> Process a
catch = Process a -> (e -> Process a) -> Process a
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch
{-# DEPRECATED catch "Use Control.Monad.Catch.catch instead" #-}

-- | Lift 'Control.Exception.try'
try :: Exception e => Process a -> Process (Either e a)
try :: forall e a. Exception e => Process a -> Process (Either e a)
try = Process a -> Process (Either e a)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
Catch.try
{-# DEPRECATED try "Use Control.Monad.Catch.try instead" #-}

-- | Lift 'Control.Exception.mask'
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask :: forall b.
((forall a. Process a -> Process a) -> Process b) -> Process b
mask = ((forall a. Process a -> Process a) -> Process b) -> Process b
forall b.
HasCallStack =>
((forall a. Process a -> Process a) -> Process b) -> Process b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
Catch.mask
{-# DEPRECATED mask "Use Control.Monad.Catch.mask_ instead" #-}

-- | Lift 'Control.Exception.mask_'
mask_ :: Process a -> Process a
mask_ :: forall a. Process a -> Process a
mask_ = Process a -> Process a
forall (m :: * -> *) a. (HasCallStack, MonadMask m) => m a -> m a
Catch.mask_
{-# DEPRECATED mask_ "Use Control.Monad.Catch.mask_ instead" #-}

-- | Lift 'Control.Exception.onException'
onException :: Process a -> Process b -> Process a
onException :: forall a b. Process a -> Process b -> Process a
onException = Process a -> Process b -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadCatch m) =>
m a -> m b -> m a
Catch.onException
{-# DEPRECATED onException "Use Control.Monad.Catch.onException instead" #-}

-- | Lift 'Control.Exception.bracket'
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket :: forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket = Process a -> (a -> Process b) -> (a -> Process c) -> Process c
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
Catch.bracket
{-# DEPRECATED bracket "Use Control.Monad.Catch.bracket instead" #-}

-- | Lift 'Control.Exception.bracket_'
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ :: forall a b c. Process a -> Process b -> Process c -> Process c
bracket_ = Process a -> Process b -> Process c -> Process c
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> m c -> m b -> m b
Catch.bracket_
{-# DEPRECATED bracket_ "Use Control.Monad.Catch.bracket_ instead" #-}

-- | Lift 'Control.Exception.finally'
finally :: Process a -> Process b -> Process a
finally :: forall a b. Process a -> Process b -> Process a
finally = Process a -> Process b -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
Catch.finally
{-# DEPRECATED finally "Use Control.Monad.Catch.finally instead" #-}

-- | You need this when using 'catches'
data Handler a = forall e . Exception e => Handler (e -> Process a)

instance Functor Handler where
    fmap :: forall a b. (a -> b) -> Handler a -> Handler b
fmap a -> b
f (Handler e -> Process a
h) = (e -> Process b) -> Handler b
forall a e. Exception e => (e -> Process a) -> Handler a
Handler ((a -> b) -> Process a -> Process b
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (Process a -> Process b) -> (e -> Process a) -> e -> Process b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Process a
h)

-- | Lift 'Control.Exception.catches'
catches :: Process a -> [Handler a] -> Process a
catches :: forall a. Process a -> [Handler a] -> Process a
catches Process a
proc [Handler a]
handlers = Process a
proc Process a -> (SomeException -> Process a) -> Process a
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
`Catch.catch` [Handler a] -> SomeException -> Process a
forall a. [Handler a] -> SomeException -> Process a
catchesHandler [Handler a]
handlers

catchesHandler :: [Handler a] -> SomeException -> Process a
catchesHandler :: forall a. [Handler a] -> SomeException -> Process a
catchesHandler [Handler a]
handlers SomeException
e = (Handler a -> Process a -> Process a)
-> Process a -> [Handler a] -> Process a
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Handler a -> Process a -> Process a
forall {a}. Handler a -> Process a -> Process a
tryHandler (SomeException -> Process a
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
e) [Handler a]
handlers
    where tryHandler :: Handler a -> Process a -> Process a
tryHandler (Handler e -> Process a
handler) Process a
res
              = case SomeException -> Maybe e
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
                Just e
e' -> e -> Process a
handler e
e'
                Maybe e
Nothing -> Process a
res

--------------------------------------------------------------------------------
-- Auxiliary API                                                              --
--------------------------------------------------------------------------------

-- | Like 'expect' but with a timeout
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout Int
n = Int -> [Match a] -> Process (Maybe a)
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
n [(a -> Process a) -> Match a
forall a b. Serializable a => (a -> Process b) -> Match b
match a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return]

-- | Asynchronous version of 'spawn'
--
-- ('spawn' is defined in terms of 'spawnAsync' and 'expect')
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync NodeId
nid Closure (Process ())
proc = do
  SpawnRef
spawnRef <- Process SpawnRef
getSpawnRef
  NodeId
node     <- Process NodeId
getSelfNode
  if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
node
    then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ Closure (Process ()) -> SpawnRef -> ProcessSignal
Spawn Closure (Process ())
proc SpawnRef
spawnRef
    else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ Closure (Process ()) -> SpawnRef -> ProcessSignal
Spawn Closure (Process ())
proc SpawnRef
spawnRef
  SpawnRef -> Process SpawnRef
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return SpawnRef
spawnRef

-- | Monitor a node (asynchronous)
monitorNode :: NodeId -> Process MonitorRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode =
  Identifier -> Process MonitorRef
monitor' (Identifier -> Process MonitorRef)
-> (NodeId -> Identifier) -> NodeId -> Process MonitorRef
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier

-- | Monitor a typed channel (asynchronous)
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort SendPortId
cid) =
  Identifier -> Process MonitorRef
monitor' (SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)

-- | Remove a monitor (asynchronous)
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync =
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (MonitorRef -> ProcessSignal) -> MonitorRef -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MonitorRef -> ProcessSignal
Unmonitor

-- | Link to a node (asynchronous)
linkNode :: NodeId -> Process ()
linkNode :: NodeId -> Process ()
linkNode = Identifier -> Process ()
link' (Identifier -> Process ())
-> (NodeId -> Identifier) -> NodeId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier

-- | Link to a channel (asynchronous)
linkPort :: SendPort a -> Process ()
linkPort :: forall a. SendPort a -> Process ()
linkPort (SendPort SendPortId
cid) =
  Identifier -> Process ()
link' (SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)

-- | Remove a process link (asynchronous)
unlinkAsync :: ProcessId -> Process ()
unlinkAsync :: ProcessId -> Process ()
unlinkAsync =
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (ProcessId -> ProcessSignal) -> ProcessId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> ProcessSignal)
-> (ProcessId -> Identifier) -> ProcessId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier

-- | Remove a node link (asynchronous)
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync =
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (NodeId -> ProcessSignal) -> NodeId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> ProcessSignal)
-> (NodeId -> Identifier) -> NodeId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier

-- | Remove a channel (send port) link (asynchronous)
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync :: forall a. SendPort a -> Process ()
unlinkPortAsync (SendPort SendPortId
cid) =
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (Identifier -> ProcessSignal) -> Identifier -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> Process ()) -> Identifier -> Process ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> Identifier
SendPortIdentifier SendPortId
cid

--------------------------------------------------------------------------------
-- Logging                                                                    --
--------------------------------------------------------------------------------

data SayMessage = SayMessage { SayMessage -> UTCTime
sayTime    :: UTCTime
                             , SayMessage -> ProcessId
sayProcess :: ProcessId
                             , SayMessage -> String
sayMessage :: String }
  deriving (Typeable)

-- There is sadly no Show UTCTime instance
instance Show SayMessage where
  showsPrec :: Int -> SayMessage -> String -> String
showsPrec Int
p SayMessage
msg =
    Bool -> (String -> String) -> String -> String
showParen (Int
p Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
11)
    ((String -> String) -> String -> String)
-> (String -> String) -> String -> String
forall a b. (a -> b) -> a -> b
$ String -> String -> String
showString String
"SayMessage "
    (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> String -> String
showString (TimeLocale -> String -> UTCTime -> String
forall t. FormatTime t => TimeLocale -> String -> t -> String
formatTime TimeLocale
defaultTimeLocale String
"%c" (SayMessage -> UTCTime
sayTime SayMessage
msg))
    (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
    (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ProcessId -> String -> String
forall a. Show a => Int -> a -> String -> String
showsPrec Int
11 (SayMessage -> ProcessId
sayProcess SayMessage
msg) (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
    (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> String -> String -> String
forall a. Show a => Int -> a -> String -> String
showsPrec Int
11 (SayMessage -> String
sayMessage SayMessage
msg) (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '

instance Binary SayMessage where
  put :: SayMessage -> Put
put SayMessage
s = do
    UTCTime -> Put
putUTCTime (SayMessage -> UTCTime
sayTime SayMessage
s)
    ProcessId -> Put
forall t. Binary t => t -> Put
put (SayMessage -> ProcessId
sayProcess SayMessage
s)
    String -> Put
forall t. Binary t => t -> Put
put (SayMessage -> String
sayMessage SayMessage
s)
  get :: Get SayMessage
get = UTCTime -> ProcessId -> String -> SayMessage
SayMessage (UTCTime -> ProcessId -> String -> SayMessage)
-> Get UTCTime -> Get (ProcessId -> String -> SayMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get UTCTime
getUTCTime Get (ProcessId -> String -> SayMessage)
-> Get ProcessId -> Get (String -> SayMessage)
forall a b. Get (a -> b) -> Get a -> Get b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Get ProcessId
forall t. Binary t => Get t
get Get (String -> SayMessage) -> Get String -> Get SayMessage
forall a b. Get (a -> b) -> Get a -> Get b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Get String
forall t. Binary t => Get t
get

-- Sadly there is no Binary UTCTime instance
putUTCTime :: UTCTime -> Put
putUTCTime :: UTCTime -> Put
putUTCTime (UTCTime (ModifiedJulianDay Integer
day) DiffTime
tod) = do
  Integer -> Put
forall t. Binary t => t -> Put
put Integer
day
  Rational -> Put
forall t. Binary t => t -> Put
put (DiffTime -> Rational
forall a. Real a => a -> Rational
toRational DiffTime
tod)

getUTCTime :: Get UTCTime
getUTCTime :: Get UTCTime
getUTCTime = do
  Integer
day <- Get Integer
forall t. Binary t => Get t
get
  Rational
tod <- Get Rational
forall t. Binary t => Get t
get
  UTCTime -> Get UTCTime
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (UTCTime -> Get UTCTime) -> UTCTime -> Get UTCTime
forall a b. (a -> b) -> a -> b
$! Day -> DiffTime -> UTCTime
UTCTime (Integer -> Day
ModifiedJulianDay Integer
day)
                    (Rational -> DiffTime
forall a. Fractional a => Rational -> a
fromRational Rational
tod)

-- | Log a string
--
-- @say message@ sends a message of type 'SayMessage' with the current time and
-- 'ProcessId' of the current process to the process registered as @logger@. By
-- default, this process simply sends the string to @stderr@. Individual Cloud
-- Haskell backends might replace this with a different logger process, however.
say :: String -> Process ()
say :: String -> Process ()
say String
string = do
  UTCTime
now <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
  ProcessId
us  <- Process ProcessId
getSelfPid
  String -> SayMessage -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
"logger" (UTCTime -> ProcessId -> String -> SayMessage
SayMessage UTCTime
now ProcessId
us String
string)

--------------------------------------------------------------------------------
-- Registry                                                                   --
--------------------------------------------------------------------------------

-- | Register a process with the local registry (synchronous). The name must not
--  already be registered. The process need not be on this node. A bad
--  registration will result in a 'ProcessRegistrationException'
--
-- The process to be registered does not have to be local itself.
register :: String -> ProcessId -> Process ()
register :: String -> ProcessId -> Process ()
register = Bool -> String -> ProcessId -> Process ()
registerImpl Bool
False

-- | Like 'register', but will replace an existing registration.
-- The name must already be registered.
reregister :: String -> ProcessId -> Process ()
reregister :: String -> ProcessId -> Process ()
reregister = Bool -> String -> ProcessId -> Process ()
registerImpl Bool
True

registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl Bool
force String
label ProcessId
pid = do
  NodeId
mynid <- Process NodeId
getSelfNode
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
mynid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
force)
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
    [ (RegisterReply -> Bool)
-> (RegisterReply -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(RegisterReply String
label' Bool
_ Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
              (\(RegisterReply String
_ Bool
ok Maybe ProcessId
owner) -> String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner)
    ]

-- | Register a process with a remote registry (asynchronous).
--
-- The process to be registered does not have to live on the same remote node.
-- Reply wil come in the form of a 'RegisterReply' message
--
-- See comments in 'whereisRemoteAsync'
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync NodeId
nid String
label ProcessId
pid = do
    NodeId
here <- Process NodeId
getSelfNode
    Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
here then Maybe NodeId
forall a. Maybe a
Nothing else NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid)
                (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
False)

reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync NodeId
nid String
label ProcessId
pid =
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
True)

-- | Remove a process from the local registry (asynchronous).
-- This version will wait until a response is gotten from the
-- management process. The name must already be registered.
unregister :: String -> Process ()
unregister :: String -> Process ()
unregister String
label = do
  NodeId
mynid <- Process NodeId
getSelfNode
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
mynid Maybe ProcessId
forall a. Maybe a
Nothing Bool
False)
  [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
    [ (RegisterReply -> Bool)
-> (RegisterReply -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(RegisterReply String
label' Bool
_ Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
              (\(RegisterReply String
_ Bool
ok Maybe ProcessId
owner) -> String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner)
    ]

-- | Deal with the result from an attempted registration or unregistration
-- by throwing an exception if necessary
handleRegistrationReply :: String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply :: String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner =
  Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
ok) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
     ProcessRegistrationException -> Process ()
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (ProcessRegistrationException -> Process ())
-> ProcessRegistrationException -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Maybe ProcessId -> ProcessRegistrationException
ProcessRegistrationException String
label Maybe ProcessId
owner

-- | Remove a process from a remote registry (asynchronous).
--
-- Reply wil come in the form of a 'RegisterReply' message
--
-- See comments in 'whereisRemoteAsync'
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync NodeId
nid String
label =
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid Maybe ProcessId
forall a. Maybe a
Nothing Bool
False)

-- | Query the local process registry
whereis :: String -> Process (Maybe ProcessId)
whereis :: String -> Process (Maybe ProcessId)
whereis String
label = do
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> ProcessSignal
WhereIs String
label)
  [Match (Maybe ProcessId)] -> Process (Maybe ProcessId)
forall b. [Match b] -> Process b
receiveWait [ (WhereIsReply -> Bool)
-> (WhereIsReply -> Process (Maybe ProcessId))
-> Match (Maybe ProcessId)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(WhereIsReply String
label' Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
                        (\(WhereIsReply String
_ Maybe ProcessId
mPid) -> Maybe ProcessId -> Process (Maybe ProcessId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessId
mPid)
              ]

-- | Query a remote process registry (asynchronous)
--
-- Reply will come in the form of a 'WhereIsReply' message.
--
-- There is currently no synchronous version of 'whereisRemoteAsync': if
-- you implement one yourself, be sure to take into account that the remote
-- node might die or get disconnect before it can respond (i.e. you should
-- use 'monitorNode' and take appropriate action when you receive a
-- 'NodeMonitorNotification').
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync NodeId
nid String
label = do
    NodeId
here <- Process NodeId
getSelfNode
    Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
here then Maybe NodeId
forall a. Maybe a
Nothing else NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> ProcessSignal
WhereIs String
label)

-- | Named send to a process in the local registry (asynchronous)
nsend :: Serializable a => String -> a -> Process ()
nsend :: forall a. Serializable a => String -> a -> Process ()
nsend String
label a
msg = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  let msg' :: Message
msg' = a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus (LocalProcess -> LocalNode
processNode LocalProcess
proc))
                      (String -> ProcessId -> Message -> MxEvent
MxSentToName String
label (LocalProcess -> ProcessId
processId LocalProcess
proc) Message
msg')
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> Message -> ProcessSignal
NamedSend String
label Message
msg')

-- | Named send to a process in the local registry (asynchronous).
-- This function makes /no/ attempt to serialize and (in the case when the
-- destination process resides on the same local node) therefore ensure that
-- the payload is fully evaluated before it is delivered.
unsafeNSend :: Serializable a => String -> a -> Process ()
unsafeNSend :: forall a. Serializable a => String -> a -> Process ()
unsafeNSend = String -> a -> Process ()
forall a. Serializable a => String -> a -> Process ()
Unsafe.nsend

-- | Named send to a process in a remote registry (asynchronous)
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote :: forall a. Serializable a => NodeId -> String -> a -> Process ()
nsendRemote NodeId
nid String
label a
msg = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  let us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
  let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
  if LocalNode -> NodeId
localNodeId LocalNode
node NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
    then String -> a -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
label a
msg
    else let lbl :: String
lbl = String
label String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"@" String -> String -> String
forall a. [a] -> [a] -> [a]
++ NodeId -> String
forall a. Show a => a -> String
show NodeId
nid in do
            IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node)
                                (String -> ProcessId -> Message -> MxEvent
MxSentToName String
lbl ProcessId
us (a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg))
            Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> Message -> ProcessSignal
NamedSend String
label (a -> Message
forall a. Serializable a => a -> Message
createMessage a
msg))

-- | Named send to a process in a remote registry (asynchronous)
-- This function makes /no/ attempt to serialize and (in the case when the
-- destination process resides on the same local node) therefore ensure that
-- the payload is fully evaluated before it is delivered.
unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process ()
unsafeNSendRemote :: forall a. Serializable a => NodeId -> String -> a -> Process ()
unsafeNSendRemote = NodeId -> String -> a -> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
Unsafe.nsendRemote

--------------------------------------------------------------------------------
-- Closures                                                                   --
--------------------------------------------------------------------------------

-- | Resolve a static value
unStatic :: Typeable a => Static a -> Process a
unStatic :: forall a. Typeable a => Static a -> Process a
unStatic Static a
static = do
  RemoteTable
rtable <- LocalNode -> RemoteTable
remoteTable (LocalNode -> RemoteTable)
-> (LocalProcess -> LocalNode) -> LocalProcess -> RemoteTable
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> RemoteTable)
-> Process LocalProcess -> Process RemoteTable
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  case RemoteTable -> Static a -> Either String a
forall a. Typeable a => RemoteTable -> Static a -> Either String a
Static.unstatic RemoteTable
rtable Static a
static of
    Left String
err -> String -> Process a
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process a) -> String -> Process a
forall a b. (a -> b) -> a -> b
$ String
"Could not resolve static value: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
    Right a
x  -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x

-- | Resolve a closure
unClosure :: Typeable a => Closure a -> Process a
unClosure :: forall a. Typeable a => Closure a -> Process a
unClosure Closure a
closure = do
  RemoteTable
rtable <- LocalNode -> RemoteTable
remoteTable (LocalNode -> RemoteTable)
-> (LocalProcess -> LocalNode) -> LocalProcess -> RemoteTable
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> RemoteTable)
-> Process LocalProcess -> Process RemoteTable
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  case RemoteTable -> Closure a -> Either String a
forall a. Typeable a => RemoteTable -> Closure a -> Either String a
Static.unclosure RemoteTable
rtable Closure a
closure of
    Left String
err -> String -> Process a
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process a) -> String -> Process a
forall a b. (a -> b) -> a -> b
$ String
"Could not resolve closure: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
    Right a
x  -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x

--------------------------------------------------------------------------------
-- Reconnecting                                                               --
--------------------------------------------------------------------------------

-- | Cloud Haskell provides the illusion of connection-less, reliable, ordered
-- message passing. However, when network connections get disrupted this
-- illusion cannot always be maintained. Once a network connection breaks (even
-- temporarily) no further communication on that connection will be possible.
-- For example, if process A sends a message to process B, and A is then
-- notified (by monitor notification) that it got disconnected from B, A will
-- not be able to send any further messages to B, /unless/ A explicitly
-- indicates that it is acceptable to attempt to reconnect to B using the
-- Cloud Haskell 'reconnect' primitive.
--
-- Importantly, when A calls 'reconnect' it acknowledges that some messages to
-- B might have been lost. For instance, if A sends messages m1 and m2 to B,
-- then receives a monitor notification that its connection to B has been lost,
-- calls 'reconnect' and then sends m3, it is possible that B will receive m1
-- and m3 but not m2.
--
-- Note that 'reconnect' does not mean /reconnect now/ but rather /it is okay
-- to attempt to reconnect on the next send/. In particular, if no further
-- communication attempts are made to B then A can use reconnect to clean up
-- its connection to B.
reconnect :: ProcessId -> Process ()
reconnect :: ProcessId -> Process ()
reconnect ProcessId
them = do
  ProcessId
us <- Process ProcessId
getSelfPid
  LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Identifier -> IO ()
disconnect LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us) (ProcessId -> Identifier
ProcessIdentifier ProcessId
them)

-- | Reconnect to a sendport. See 'reconnect' for more information.
reconnectPort :: SendPort a -> Process ()
reconnectPort :: forall a. SendPort a -> Process ()
reconnectPort SendPort a
them = do
  ProcessId
us <- Process ProcessId
getSelfPid
  LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Identifier -> IO ()
disconnect LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us) (SendPortId -> Identifier
SendPortIdentifier (SendPort a -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort a
them))

--------------------------------------------------------------------------------
-- Auxiliary functions                                                        --
--------------------------------------------------------------------------------

sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
sendLocal :: forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
to a
msg =
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
to (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg)

sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
sendChanLocal :: forall a. Serializable a => SendPortId -> a -> Process ()
sendChanLocal SendPortId
spId a
msg =
  -- we *must* fully serialize/encode the message here, because
  -- attempting to use `unsafeCoerce' in the node controller
  -- won't work since we know nothing about the required type
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> Message -> ProcessSignal
LocalPortSend SendPortId
spId (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg)

getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor Identifier
ident = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  IO MonitorRef -> Process MonitorRef
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO MonitorRef -> Process MonitorRef)
-> IO MonitorRef -> Process MonitorRef
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState
-> (LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (LocalProcessState, MonitorRef))
 -> IO MonitorRef)
-> (LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
    let counter :: Int32
counter = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
monitorCounter
    (LocalProcessState, MonitorRef)
-> IO (LocalProcessState, MonitorRef)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( T LocalProcessState Int32
monitorCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1) (LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
           , Identifier -> Int32 -> MonitorRef
MonitorRef Identifier
ident Int32
counter
           )

getSpawnRef :: Process SpawnRef
getSpawnRef :: Process SpawnRef
getSpawnRef = do
  LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
  IO SpawnRef -> Process SpawnRef
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SpawnRef -> Process SpawnRef)
-> IO SpawnRef -> Process SpawnRef
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState
-> (LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (LocalProcessState, SpawnRef))
 -> IO SpawnRef)
-> (LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
    let counter :: Int32
counter = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
spawnCounter
    (LocalProcessState, SpawnRef) -> IO (LocalProcessState, SpawnRef)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( T LocalProcessState Int32
spawnCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1) (LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
           , Int32 -> SpawnRef
SpawnRef Int32
counter
           )

-- | Monitor a process/node/channel
monitor' :: Identifier -> Process MonitorRef
monitor' :: Identifier -> Process MonitorRef
monitor' Identifier
ident = do
  MonitorRef
monitorRef <- Identifier -> Process MonitorRef
getMonitorRefFor Identifier
ident
  Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ MonitorRef -> ProcessSignal
Monitor MonitorRef
monitorRef
  MonitorRef -> Process MonitorRef
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return MonitorRef
monitorRef

-- | Link to a process/node/channel
link' :: Identifier -> Process ()
link' :: Identifier -> Process ()
link' = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (Identifier -> ProcessSignal) -> Identifier -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Link