{-# LANGUAGE CPP  #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE ScopedTypeVariables  #-}
{-# LANGUAGE RankNTypes  #-}
{-# LANGUAGE BangPatterns  #-}
{-# LANGUAGE GeneralizedNewtypeDeriving  #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE MagicHash #-}

-- | Local nodes
--
module Control.Distributed.Process.Node
  ( LocalNode
  , newLocalNode
  , closeLocalNode
  , forkProcess
  , runProcess
  , initRemoteTable
  , localNodeId
  ) where

-- TODO: Calls to 'sendBinary' and co (by the NC) may stall the node controller.

import System.IO (fixIO, hPutStrLn, stderr)
import System.Mem.Weak (Weak, deRefWeak)
import qualified Data.ByteString.Lazy as BSL (fromChunks)
import Data.Binary (decode)
import Data.Map (Map)
import qualified Data.Map as Map
  ( empty
  , toList
  , fromList
  , partition
  , partitionWithKey
  , elems
  , size
  , filterWithKey
  , foldlWithKey
  )
import Data.Time.Format (formatTime)
#if MIN_VERSION_time(1,5,0)
import Data.Time.Format (defaultTimeLocale)
#else
import System.Locale (defaultTimeLocale)
#endif
import Data.Set (Set)
import qualified Data.Set as Set
  ( empty
  , insert
  , delete
  , map
  , member
  , toList
  , union
  )
import Data.Foldable (forM_)
import Data.Maybe (isJust, fromJust, isNothing, catMaybes)
import Data.Typeable (Typeable)
import Control.Category ((>>>))
import Control.Applicative
import Control.Monad (void, when, join)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.State.Strict (MonadState, StateT, evalStateT, gets)
import qualified Control.Monad.State.Strict as StateT (get, put)
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask)
import Control.Exception
  ( throwIO
  , SomeException
  , Exception
  , throwTo
  , uninterruptibleMask_
  , getMaskingState
  , MaskingState(..)
  )
import qualified Control.Exception as Exception
  ( Handler(..)
  , catch
  , catches
  , finally
  )
import Control.Concurrent (forkIO, killThread)
import Control.Distributed.Process.Internal.BiMultiMap (BiMultiMap)
import qualified Control.Distributed.Process.Internal.BiMultiMap as BiMultiMap
import Control.Distributed.Process.Internal.StrictMVar
  ( newMVar
  , withMVar
  , modifyMVarMasked
  , modifyMVar
  , newEmptyMVar
  , putMVar
  , takeMVar
  )
import Control.Concurrent.Chan (newChan, writeChan, readChan)
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
import Control.Concurrent.STM
  ( atomically
  )
import Control.Distributed.Process.Internal.CQueue
  ( CQueue
  , enqueue
  , newCQueue
  , mkWeakCQueue
  , queueSize
  )
import qualified Network.Transport as NT
  ( Transport
  , EndPoint
  , newEndPoint
  , receive
  , Event(..)
  , EventErrorCode(..)
  , TransportError(..)
  , address
  , closeEndPoint
  , Connection
  , ConnectionId
  , close
  , EndPointAddress
  , Reliability(ReliableOrdered)
  )
import Data.Accessor (Accessor, accessor, (^.), (^=), (^:))
import System.Random (randomIO)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Static as Static
  ( unclosure
  , initRemoteTable
  )
import Control.Distributed.Process.Internal.Types
  ( NodeId(..)
  , LocalProcessId(..)
  , ProcessId(..)
  , LocalNode(..)
  , MxEventBus(..)
  , LocalNodeState(..)
  , ValidLocalNodeState(..)
  , withValidLocalState
  , modifyValidLocalState
  , LocalProcess(..)
  , LocalProcessState(..)
  , Process(..)
  , DiedReason(..)
  , NCMsg(..)
  , ProcessSignal(..)
  , localPidCounter
  , localPidUnique
  , localProcessWithId
  , localProcesses
  , localConnections
  , forever'
  , MonitorRef(..)
  , NodeClosedException(..)
  , ProcessMonitorNotification(..)
  , NodeMonitorNotification(..)
  , PortMonitorNotification(..)
  , ProcessExitException(..)
  , ProcessLinkException(..)
  , NodeLinkException(..)
  , PortLinkException(..)
  , DidUnmonitor(..)
  , DidUnlinkProcess(..)
  , DidUnlinkNode(..)
  , DidUnlinkPort(..)
  , SpawnRef
  , DidSpawn(..)
  , Message(..)
  , TypedChannel(..)
  , Identifier(..)
  , nodeOf
  , ProcessInfo(..)
  , ProcessInfoNone(..)
  , NodeStats(..)
  , SendPortId(..)
  , typedChannelWithId
  , RegisterReply(..)
  , WhereIsReply(..)
  , payloadToMessage
  , createUnencodedMessage
  , unsafeCreateUnencodedMessage
  , runLocalProcess
  , firstNonReservedProcessId
  , ImplicitReconnect(WithImplicitReconnect)
  )
import Control.Distributed.Process.Management.Internal.Agent
  ( mxAgentController
  )
import Control.Distributed.Process.Management.Internal.Types
  ( MxEvent(..)
  )
import qualified Control.Distributed.Process.Management.Internal.Trace.Remote as Trace
  ( remoteTable
  )
import Control.Distributed.Process.Management.Internal.Trace.Tracer
  ( defaultTracer
  )
import Control.Distributed.Process.Management.Internal.Trace.Types
  ( TraceArg(..)
  , traceEvent
  , traceLogFmt
  , enableTrace
  )
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Messaging
  ( sendBinary
  , closeImplicitReconnections
  , impliesDeathOf
  )
import Control.Distributed.Process.Internal.Primitives
  ( register
  , receiveWait
  , match
  , sendChan
  , unwrapMessage
  , SayMessage(..)
  )
import Control.Distributed.Process.Internal.Types (SendPort, Tracer(..))
import qualified Control.Distributed.Process.Internal.Closure.BuiltIn as BuiltIn (remoteTable)
import Control.Distributed.Process.Internal.WeakTQueue (TQueue, writeTQueue)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC
  ( mapMaybe
  , mapDefault
  )
import Control.Monad.Catch (try)
import GHC.IO (IO(..), unsafeUnmask)
import GHC.Base ( maskAsyncExceptions# )

import Unsafe.Coerce
import Prelude

-- Remove these definitions when the fix for
-- https://ghc.haskell.org/trac/ghc/ticket/10149
-- is included in all supported compilers:
-- | Run an 'IO' action with asynchronous exceptions masked.
--
-- Unwraps the 'IO' newtype and applies the 'maskAsyncExceptions#' primop
-- directly to the underlying state-passing function, then rewraps the result.
block :: IO a -> IO a
block (IO io) = IO $ maskAsyncExceptions# io
-- | Run an 'IO' action with asynchronous exceptions unmasked.
--
-- This is simply 'unsafeUnmask' from "GHC.IO" under a local name, paired
-- with 'block'.
unblock :: IO a -> IO a
unblock = unsafeUnmask

--------------------------------------------------------------------------------
-- Initialization                                                             --
--------------------------------------------------------------------------------

-- | The initial remote table for a node: the base table from
-- "Control.Distributed.Static", extended first with the built-in closures
-- and then with the remote tracing entries.
initRemoteTable :: RemoteTable
initRemoteTable = Trace.remoteTable $ BuiltIn.remoteTable Static.initRemoteTable

-- | Initialize a new local node.
--
-- A fresh endpoint is requested from the given transport. If the transport
-- reports an error, that error is rethrown with 'throwIO'. Otherwise a bare
-- node is created on the endpoint (see 'createBareLocalNode') and the
-- service processes are started on it (see 'startServiceProcesses') before
-- the node is returned.
newLocalNode :: NT.Transport -> RemoteTable -> IO LocalNode
newLocalNode transport rtable = do
    mEndPoint <- NT.newEndPoint transport
    case mEndPoint of
      Left ex -> throwIO ex
      Right endPoint -> do
        localNode <- createBareLocalNode endPoint rtable
        startServiceProcesses localNode
        return localNode

-- | Create a new local node (without any service processes running)
--
-- The node starts in the valid state with no processes, no connections, the
-- pid counter at 'firstNonReservedProcessId' and a random pid \"unique\"
-- value. After the management agent has been started (see 'startMxAgent'),
-- two threads are forked:
--
--  * one running the node controller, which closes the endpoint when the
--    controller terminates;
--
--  * one handling incoming transport messages, which asks the node
--    controller to shut down when message handling terminates.
--
-- The returned node is the one carrying the initialised event bus.
createBareLocalNode :: NT.EndPoint -> RemoteTable -> IO LocalNode
createBareLocalNode endPoint rtable = do
    unq <- randomIO
    state <- newMVar $ LocalNodeValid $ ValidLocalNodeState
      { _localProcesses   = Map.empty
      , _localPidCounter  = firstNonReservedProcessId
      , _localPidUnique   = unq
      , _localConnections = Map.empty
      }
    ctrlChan <- newChan
    let node = LocalNode { localNodeId   = NodeId $ NT.address endPoint
                         , localEndPoint = endPoint
                         , localState    = state
                         , localCtrlChan = ctrlChan
                         , localEventBus = MxEventBusInitialising
                         , remoteTable   = rtable
                         }
    -- 'tracedNode' differs from 'node' only in its event bus, so either one
    -- can be used below to reach the endpoint and the control channel.
    tracedNode <- startMxAgent node

    -- Once the NC terminates, the endpoint isn't much use,
    void $ forkIO $ Exception.finally (runNodeController tracedNode)
                                      (NT.closeEndPoint (localEndPoint node))

    -- whilst a closed/failing endpoint will terminate the NC
    void $ forkIO $ Exception.finally (handleIncomingMessages tracedNode)
                                      (stopNC node)

    return tracedNode
  where
    -- Post a shutdown signal, sent on behalf of the node itself, to the
    -- node controller's control channel.
    stopNC :: LocalNode -> IO ()
    stopNC node =
       writeChan (localCtrlChan node) NCMsg
            { ctrlMsgSender = NodeIdentifier (localNodeId node)
            , ctrlMsgSignal = SigShutdown
            }

-- | Start the management agent controller process on the given node and
-- return a copy of the node whose event bus has been initialised.
--
-- The controller is forked with access to the same forking function and an
-- empty 'MVar.MVar'; this function then blocks on that variable until a
-- (tracer, weak queue reference, agent-spawning function) triple is
-- available, and stores it, together with the controller's pid, in the
-- node's 'localEventBus'.
startMxAgent :: LocalNode -> IO LocalNode
startMxAgent node = do
  -- see note [tracer/forkProcess races]
  let fork = forkProcess node
  mv <- MVar.newEmptyMVar
  pid <- fork $ mxAgentController fork mv
  (tracer', wqRef, mxNew') <- MVar.takeMVar mv
  return node { localEventBus = MxEventBus pid tracer' wqRef mxNew' }

-- | Start the default tracer on a node whose event bus is initialised.
--
-- When the node's event bus is an 'MxEventBus', the tracer pid it carries
-- is registered as @trace.controller@, the 'defaultTracer' process is
-- forked and enabled on the event bus, and that new process is registered
-- as @tracer.initial@. For any other event bus value nothing is done.
startDefaultTracer :: LocalNode -> IO ()
startDefaultTracer node' = do
  let t = localEventBus node'
  case t of
    MxEventBus _ (Tracer pid _) _ _ -> do
      runProcess node' $ register "trace.controller" pid
      pid' <- forkProcess node' defaultTracer
      enableTrace (localEventBus node') pid'
      runProcess node' $ register "tracer.initial" pid'
    _ -> return ()

-- TODO: we need a better mechanism for defining and registering services

-- | Start and register the service processes on a node
--
-- This starts the default tracer, forks the logger process, and registers
-- the logger under both the @logger@ and @trace.logger@ names.
startServiceProcesses :: LocalNode -> IO ()
startServiceProcesses node = do
  -- tracing /spawns/ relies on the tracer being enabled, but we start
  -- the default tracer first, even though it might @nsend@ to the logger
  -- before /that/ process has started - this is a totally harmless race
  -- however, so we deliberately ignore it
  startDefaultTracer node
  logger <- forkProcess node loop
  runProcess node $ do
    register "logger" logger
    -- The trace.logger is used for tracing to the console to avoid feedback
    -- loops during tracing if the user reregisters the "logger" with a custom
    -- process which uses 'send' or other primitives which are traced.
    register "trace.logger" logger
  where
    -- The logger: prints each received message to stderr and waits for the
    -- next one. Receiving a @SendPort ()@ ends the loop after a unit has
    -- been sent back on that port.
    loop :: Process ()
    loop = do
      receiveWait
        [ match $ \(SayMessage time pid string) -> do
            let time' = formatTime defaultTimeLocale "%c" time
            liftIO . hPutStrLn stderr $ time' ++ " " ++ show pid ++ ": " ++ string
            loop
        , match $ \((time, string) :: (String, String)) -> do
            -- this is a 'trace' message from the local node tracer
            liftIO . hPutStrLn stderr $ time ++ " [trace] " ++ string
            loop
        , match $ \(ch :: SendPort ()) -> -- a shutdown request
            sendChan ch ()
        ]

-- | Force-close a local node, killing all processes on that node.
--
-- The node state is switched to 'LocalNodeClosed' first; the threads of the
-- processes that were registered in the previous valid state are killed
-- afterwards, and finally the node's endpoint is closed. Closing a node
-- that is already closed kills nothing but still closes the endpoint.
closeLocalNode :: LocalNode -> IO ()
closeLocalNode node = do
  -- Kill processes after refilling the mvar. Otherwise, there is potential for
  -- deadlock as a dying process tries to get the mvar while masking exceptions
  -- uninterruptibly.
  --
  -- 'modifyMVar' therefore only /returns/ the kill action; 'join' runs it
  -- once the new state has been stored.
  join $ modifyMVar (localState node) $ \st -> case st of
    LocalNodeValid vst ->
      return ( LocalNodeClosed
             , forM_ (vst ^. localProcesses) $ \lproc ->
                 -- Semantics of 'throwTo' guarantee that target thread will get
                 -- delivered an exception. Therefore, target thread will be
                 -- killed eventually and that's as good as we can do. No need
                 -- to wait for thread to actually finish dying.
                 killThread (processThread lproc)
             )
    LocalNodeClosed -> return (LocalNodeClosed, return ())
  -- This call will have the effect of shutting down the NC as well (see
  -- 'createBareLocalNode').
  NT.closeEndPoint (localEndPoint node)

-- | Run a process on a local node and wait for it to finish
--
-- The process is forked with 'forkProcess'; its outcome (either the
-- exception it raised or @()@) is handed back through an MVar. The calling
-- thread blocks until that outcome is available and rethrows the exception,
-- if any, with 'throwIO'.
runProcess :: LocalNode -> Process () -> IO ()
runProcess node proc = do
  done <- newEmptyMVar
  -- TODO: When forkProcess inherits the masking state, protect the forked
  -- thread against async exceptions that could occur before 'try' is evaluated.
  void $ forkProcess node $ try proc >>= liftIO . putMVar done
  takeMVar done >>= either (throwIO :: SomeException -> IO a) return

-- | Spawn a new process on a local node
forkProcess :: LocalNode -> Process () -> IO ProcessId
forkProcess :: LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node Process ()
proc = do
    MaskingState
ms <- IO MaskingState
getMaskingState
    StrictMVar LocalNodeState
-> (LocalNodeState -> IO (LocalNodeState, ProcessId))
-> IO ProcessId
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVarMasked (LocalNode -> StrictMVar LocalNodeState
localState LocalNode
node) (MaskingState -> LocalNodeState -> IO (LocalNodeState, ProcessId)
startProcess MaskingState
ms)
  where
    startProcess :: MaskingState
                 -> LocalNodeState
                 -> IO (LocalNodeState, ProcessId)
    startProcess :: MaskingState -> LocalNodeState -> IO (LocalNodeState, ProcessId)
startProcess MaskingState
ms (LocalNodeValid ValidLocalNodeState
vst) = do
      let lpid :: LocalProcessId
lpid  = LocalProcessId { lpidCounter :: Int32
lpidCounter = ValidLocalNodeState
vst ValidLocalNodeState -> T ValidLocalNodeState Int32 -> Int32
forall r a. r -> T r a -> a
^. T ValidLocalNodeState Int32
localPidCounter
                                 , lpidUnique :: Int32
lpidUnique  = ValidLocalNodeState
vst ValidLocalNodeState -> T ValidLocalNodeState Int32 -> Int32
forall r a. r -> T r a -> a
^. T ValidLocalNodeState Int32
localPidUnique
                                 }
      let pid :: ProcessId
pid   = ProcessId { processNodeId :: NodeId
processNodeId  = LocalNode -> NodeId
localNodeId LocalNode
node
                            , processLocalId :: LocalProcessId
processLocalId = LocalProcessId
lpid
                            }
      StrictMVar LocalProcessState
pst <- LocalProcessState -> IO (StrictMVar LocalProcessState)
forall a. a -> IO (StrictMVar a)
newMVar LocalProcessState { _monitorCounter :: Int32
_monitorCounter = Int32
0
                                       , _spawnCounter :: Int32
_spawnCounter   = Int32
0
                                       , _channelCounter :: Int32
_channelCounter = Int32
0
                                       , _typedChannels :: Map Int32 TypedChannel
_typedChannels  = Map Int32 TypedChannel
forall k a. Map k a
Map.empty
                                       }
      CQueue Message
queue <- IO (CQueue Message)
forall a. IO (CQueue a)
newCQueue
      Weak (CQueue Message)
weakQueue <- CQueue Message -> IO () -> IO (Weak (CQueue Message))
forall a. CQueue a -> IO () -> IO (Weak (CQueue a))
mkWeakCQueue CQueue Message
queue (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
      (ThreadId
_, LocalProcess
lproc) <- ((ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess))
-> IO (ThreadId, LocalProcess)
forall a. (a -> IO a) -> IO a
fixIO (((ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess))
 -> IO (ThreadId, LocalProcess))
-> ((ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess))
-> IO (ThreadId, LocalProcess)
forall a b. (a -> b) -> a -> b
$ \ ~(ThreadId
tid, LocalProcess
_) -> do
        let lproc :: LocalProcess
lproc = LocalProcess { processQueue :: CQueue Message
processQueue  = CQueue Message
queue
                                 , processWeakQ :: Weak (CQueue Message)
processWeakQ  = Weak (CQueue Message)
weakQueue
                                 , processId :: ProcessId
processId     = ProcessId
pid
                                 , processState :: StrictMVar LocalProcessState
processState  = StrictMVar LocalProcessState
pst
                                 , processThread :: ThreadId
processThread = ThreadId
tid
                                 , processNode :: LocalNode
processNode   = LocalNode
node
                                 }
        -- Rewrite this code when this is fixed:
        -- https://ghc.haskell.org/trac/ghc/ticket/10149
        let unmask :: IO a -> IO a
unmask = case MaskingState
ms of
              MaskingState
Unmasked              -> IO a -> IO a
forall a. IO a -> IO a
unblock
              MaskingState
MaskedInterruptible   -> IO a -> IO a
forall a. IO a -> IO a
block
              MaskingState
MaskedUninterruptible -> IO a -> IO a
forall a. a -> a
id
        ThreadId
tid' <- IO ThreadId -> IO ThreadId
forall a. IO a -> IO a
uninterruptibleMask_ (IO ThreadId -> IO ThreadId) -> IO ThreadId -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
          DiedReason
reason <- IO DiedReason -> [Handler DiedReason] -> IO DiedReason
forall a. IO a -> [Handler a] -> IO a
Exception.catches
            (IO DiedReason -> IO DiedReason
forall a. IO a -> IO a
unmask (IO DiedReason -> IO DiedReason) -> IO DiedReason -> IO DiedReason
forall a b. (a -> b) -> a -> b
$ LocalProcess -> Process () -> IO ()
forall a. LocalProcess -> Process a -> IO a
runLocalProcess LocalProcess
lproc Process ()
proc IO () -> IO DiedReason -> IO DiedReason
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return DiedReason
DiedNormal)
            [ ((ProcessExitException -> IO DiedReason) -> Handler DiedReason
forall a e. Exception e => (e -> IO a) -> Handler a
Exception.Handler (\ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) -> do
                 Maybe String
mMsg <- Message -> IO (Maybe String)
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage Message
msg :: IO (Maybe String)
                 case Maybe String
mMsg of
                   Maybe String
Nothing -> DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DiedReason -> IO DiedReason) -> DiedReason -> IO DiedReason
forall a b. (a -> b) -> a -> b
$ String -> DiedReason
DiedException (String -> DiedReason) -> String -> DiedReason
forall a b. (a -> b) -> a -> b
$ ProcessExitException -> String
forall a. Show a => a -> String
show ProcessExitException
ex
                   Just String
m  -> DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DiedReason -> IO DiedReason) -> DiedReason -> IO DiedReason
forall a b. (a -> b) -> a -> b
$ String -> DiedReason
DiedException (String
"exit-from=" String -> String -> String
forall a. [a] -> [a] -> [a]
++ (ProcessId -> String
forall a. Show a => a -> String
show ProcessId
from) String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
",reason=" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
m)))
            , ((SomeException -> IO DiedReason) -> Handler DiedReason
forall a e. Exception e => (e -> IO a) -> Handler a
Exception.Handler
                (DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DiedReason -> IO DiedReason)
-> (SomeException -> DiedReason) -> SomeException -> IO DiedReason
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> DiedReason
DiedException (String -> DiedReason)
-> (SomeException -> String) -> SomeException -> DiedReason
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (SomeException -> String
forall a. Show a => a -> String
show :: SomeException -> String)))]

          -- [Unified: Table 4, rules termination and exiting]
          Maybe [Connection]
mconns <- LocalNode
-> (ValidLocalNodeState -> IO (ValidLocalNodeState, [Connection]))
-> IO (Maybe [Connection])
forall a.
LocalNode
-> (ValidLocalNodeState -> IO (ValidLocalNodeState, a))
-> IO (Maybe a)
modifyValidLocalState LocalNode
node (ProcessId
-> ValidLocalNodeState -> IO (ValidLocalNodeState, [Connection])
cleanupProcess ProcessId
pid)
          -- XXX: Revisit after agreeing on the bigger picture for the semantics
          -- of transport operations.
          -- https://github.com/haskell-distributed/distributed-process/issues/204
          Maybe [Connection] -> ([Connection] -> IO ThreadId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe [Connection]
mconns (([Connection] -> IO ThreadId) -> IO ())
-> ([Connection] -> IO ThreadId) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId)
-> ([Connection] -> IO ()) -> [Connection] -> IO ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Connection -> IO ()) -> [Connection] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Connection -> IO ()
NT.close

          Chan NCMsg -> NCMsg -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (LocalNode -> Chan NCMsg
localCtrlChan LocalNode
node) NCMsg
            { ctrlMsgSender :: Identifier
ctrlMsgSender = ProcessId -> Identifier
ProcessIdentifier ProcessId
pid
            , ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = Identifier -> DiedReason -> ProcessSignal
Died (ProcessId -> Identifier
ProcessIdentifier ProcessId
pid) DiedReason
reason
            }
        (ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId
tid', LocalProcess
lproc)

      -- see note [tracer/forkProcess races]
      LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> MxEvent
MxSpawned ProcessId
pid)

      if LocalProcessId -> Int32
lpidCounter LocalProcessId
lpid Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
== Int32
forall a. Bounded a => a
maxBound
        then do
          -- TODO: this doesn't look right at all - how do we know
          -- that newUnique represents a process id that is available!?
          Int32
newUnique <- IO Int32
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
          (LocalNodeState, ProcessId) -> IO (LocalNodeState, ProcessId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( ValidLocalNodeState -> LocalNodeState
LocalNodeValid
                 (ValidLocalNodeState -> LocalNodeState)
-> ValidLocalNodeState -> LocalNodeState
forall a b. (a -> b) -> a -> b
$ (LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= LocalProcess -> Maybe LocalProcess
forall a. a -> Maybe a
Just LocalProcess
lproc)
                 (ValidLocalNodeState -> ValidLocalNodeState)
-> (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T ValidLocalNodeState Int32
localPidCounter T ValidLocalNodeState Int32
-> Int32 -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= Int32
firstNonReservedProcessId)
                 (ValidLocalNodeState -> ValidLocalNodeState)
-> (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T ValidLocalNodeState Int32
localPidUnique T ValidLocalNodeState Int32
-> Int32 -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= Int32
newUnique)
                 (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState -> ValidLocalNodeState
forall a b. (a -> b) -> a -> b
$ ValidLocalNodeState
vst
                 , ProcessId
pid
                 )
        else
          (LocalNodeState, ProcessId) -> IO (LocalNodeState, ProcessId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( ValidLocalNodeState -> LocalNodeState
LocalNodeValid
                 (ValidLocalNodeState -> LocalNodeState)
-> ValidLocalNodeState -> LocalNodeState
forall a b. (a -> b) -> a -> b
$ (LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= LocalProcess -> Maybe LocalProcess
forall a. a -> Maybe a
Just LocalProcess
lproc)
                 (ValidLocalNodeState -> ValidLocalNodeState)
-> (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T ValidLocalNodeState Int32
localPidCounter T ValidLocalNodeState Int32
-> (Int32 -> Int32) -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1))
                 (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState -> ValidLocalNodeState
forall a b. (a -> b) -> a -> b
$ ValidLocalNodeState
vst
                 , ProcessId
pid
                 )
    startProcess MaskingState
_ LocalNodeState
LocalNodeClosed =
      NodeClosedException -> IO (LocalNodeState, ProcessId)
forall e a. Exception e => e -> IO a
throwIO (NodeClosedException -> IO (LocalNodeState, ProcessId))
-> NodeClosedException -> IO (LocalNodeState, ProcessId)
forall a b. (a -> b) -> a -> b
$ NodeId -> NodeClosedException
NodeClosedException (NodeId -> NodeClosedException) -> NodeId -> NodeClosedException
forall a b. (a -> b) -> a -> b
$ LocalNode -> NodeId
localNodeId LocalNode
node

    -- Remove a terminated process from the valid node state.
    --
    -- Two things happen to the state:
    --
    --   * the entry for the process in the local process table is cleared;
    --   * every entry of 'localConnections' whose first key component is
    --     implied dead by the death of @pid@ (as decided by 'impliesDeathOf')
    --     is dropped.
    --
    -- The result pairs the updated state with the connections that were
    -- dropped from it. Nothing is closed here; the connections are merely
    -- handed back to the caller.
    cleanupProcess :: ProcessId
                   -> ValidLocalNodeState
                   -> IO (ValidLocalNodeState, [NT.Connection])
    cleanupProcess pid vst = do
      let pid' = ProcessIdentifier pid
      -- The bang on the (unused) value forces each map entry to WHNF while
      -- partitioning.
      let (affected, unaffected) =
            Map.partitionWithKey
              (\(fr, _to) !_v -> impliesDeathOf pid' fr)
              (vst ^. localConnections)
      return ( (localProcessWithId (processLocalId pid) ^= Nothing)
               . (localConnections ^= unaffected)
               $ vst
             , map fst $ Map.elems affected
             )

-- note [tracer/forkProcess races]
--
-- Our startTracing function uses forkProcess to start the trace controller
-- process, and of course forkProcess attempts to call traceEvent once the
-- process has started. This is harmless, as the localEventBus is not updated
-- until /after/ the initial forkProcess completes, so the first call to
-- traceEvent behaves as if tracing were disabled (i.e., it is ignored).
--

--------------------------------------------------------------------------------
-- Handle incoming messages                                                   --
--------------------------------------------------------------------------------

-- | An incoming connection: the address of the remote endpoint that opened
-- it, paired with the local target its payloads are routed to.
type IncomingConnection = (NT.EndPointAddress, IncomingTarget)

-- | Where the payloads arriving on an incoming connection are delivered.
data IncomingTarget =
    -- | The connection has been opened but its target is not known yet; the
    -- next payload is decoded as an 'Identifier' naming the target.
    Uninit
    -- | Payloads are turned into messages and enqueued on the queue of the
    -- given process (held through a weak reference).
  | ToProc ProcessId (Weak (CQueue Message))
    -- | Payloads are decoded and written to the typed channel behind the
    -- given send port.
  | ToChan SendPortId TypedChannel
    -- | Payloads are decoded as 'NCMsg' and written to the node's control
    -- channel.
  | ToNode

-- | Bookkeeping for the incoming connections of the local endpoint.
data ConnectionState = ConnectionState {
    -- | Every known incoming connection, keyed by its connection id.
    _incoming     :: !(Map NT.ConnectionId IncomingConnection)
    -- | For each remote endpoint address, the ids of the connections it
    -- currently has open to us.
  , _incomingFrom :: !(Map NT.EndPointAddress (Set NT.ConnectionId))
  }

-- | The initial connection state: no incoming connections at all.
initConnectionState :: ConnectionState
initConnectionState = ConnectionState {
    _incoming     = Map.empty
  , _incomingFrom = Map.empty
  }

-- | Accessor for the whole map of incoming connections.
incoming :: Accessor ConnectionState (Map NT.ConnectionId IncomingConnection)
incoming = accessor _incoming (\conns st -> st { _incoming = conns })

-- | Accessor for the incoming connection with the given id. Reading yields
-- 'Nothing' when the id is unknown; writing 'Nothing' removes the entry.
incomingAt :: NT.ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt cid = incoming >>> DAC.mapMaybe cid

-- | Accessor for the set of connection ids opened by the given remote
-- endpoint address. An address with no entry reads as the empty set.
incomingFrom :: NT.EndPointAddress -> Accessor ConnectionState (Set NT.ConnectionId)
incomingFrom addr = aux >>> DAC.mapDefault Set.empty addr
  where
    -- Accessor for the raw per-address map.
    aux :: Accessor ConnectionState (Map NT.EndPointAddress (Set NT.ConnectionId))
    aux = accessor _incomingFrom (\fr st -> st { _incomingFrom = fr })

-- | Run the incoming-event loop for the given node, starting from an empty
-- 'ConnectionState'. A 'NodeClosedException' raised by the loop is caught
-- and ends the loop quietly.
handleIncomingMessages :: LocalNode -> IO ()
handleIncomingMessages node = go initConnectionState
   `Exception.catch` \(NodeClosedException _) -> return ()
  where
    -- The event loop proper. It receives one transport event from the
    -- endpoint, handles it, and recurses with the (possibly updated)
    -- connection state. The state is kept strict via the bang pattern so
    -- that updates do not accumulate as thunks across iterations.
    --
    -- The loop stops (returns) on 'NT.EndPointClosed' and calls 'fail' on
    -- endpoint failure, transport failure and unexpected multicast.
    go :: ConnectionState -> IO ()
    go !st = do
      event <- NT.receive endpoint
      case event of
        NT.ConnectionOpened cid rel theirAddr ->
          if rel == NT.ReliableOrdered
            then
              -- Register the connection as 'Uninit'; its target is decided
              -- by the first payload received on it.
              trace node (MxConnected cid theirAddr)
              >> go (
                      (incomingAt cid ^= Just (theirAddr, Uninit))
                    . (incomingFrom theirAddr ^: Set.insert cid)
                    $ st
                    )
            else invalidRequest cid st $
                  "attempt to connect with unsupported reliability " ++ show rel
        NT.Received cid payload ->
          case st ^. incomingAt cid of
            Just (_, ToProc pid weakQueue) -> do
              mQueue <- deRefWeak weakQueue
              forM_ mQueue $ \queue -> do
                -- TODO: if we find that the queue is Nothing, should we remove
                -- it from the NC state? (and same for channels, below)
                let msg = payloadToMessage payload
                enqueue queue msg -- 'enqueue' is strict
                trace node (MxReceived pid msg)
              go st
            Just (_, ToChan chId (TypedChannel chan')) -> do
              mChan <- deRefWeak chan'
              -- If mChan is Nothing, the process has given up the read end of
              -- the channel and we simply ignore the incoming message
              forM_ mChan $ \chan -> do
                msg' <- atomically $ do
                  msg <- return $! decode (BSL.fromChunks payload)
                  -- We make sure the message is fully decoded when it is enqueued
                  writeTQueue chan msg
                  return msg
                trace node $ MxReceivedPort chId $ unsafeCreateUnencodedMessage msg'
              go st
            Just (_, ToNode) -> do
              let ctrlMsg = decode . BSL.fromChunks $ payload
              -- Force the decoded message before handing it over.
              writeChan ctrlChan $! ctrlMsg
              go st
            Just (src, Uninit) ->
              -- First payload on this connection: it names the target.
              case decode (BSL.fromChunks payload) of
                ProcessIdentifier pid -> do
                  let lpid = processLocalId pid
                  mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
                  case mProc of
                    Just proc ->
                      go (incomingAt cid ^= Just (src, ToProc pid (processWeakQ proc)) $ st)
                    Nothing ->
                      -- incoming attempt to connect to unknown process - might
                      -- be dead already
                      go (incomingAt cid ^= Nothing $ st)
                SendPortIdentifier chId -> do
                  let lcid = sendPortLocalId chId
                      lpid = processLocalId (sendPortProcessId chId)
                  mProc <- withValidLocalState node $ return . (^. localProcessWithId lpid)
                  case mProc of
                    Just proc -> do
                      mChannel <- withMVar (processState proc) $ return . (^. typedChannelWithId lcid)
                      case mChannel of
                        Just channel ->
                          go (incomingAt cid ^= Just (src, ToChan chId channel) $ st)
                        Nothing ->
                          invalidRequest cid st $
                            "incoming attempt to connect to unknown channel of"
                            ++ " process " ++ show (sendPortProcessId chId)
                    Nothing ->
                      -- incoming attempt to connect to channel of unknown
                      -- process - might be dead already
                      go (incomingAt cid ^= Nothing $ st)
                NodeIdentifier nid ->
                  if nid == localNodeId node
                    then go (incomingAt cid ^= Just (src, ToNode) $ st)
                    else invalidRequest cid st $
                           "incoming attempt to connect to a different node -"
                           ++ " I'm " ++ show (localNodeId node)
                           ++ " but the remote peer wants to connect to "
                           ++  show nid
            Nothing ->
              invalidRequest cid st
                "message received from an unknown connection"
        NT.ConnectionClosed cid ->
          case st ^. incomingAt cid of
            Nothing ->
              invalidRequest cid st "closed unknown connection"
            Just (src, _) -> do
              trace node (MxDisconnected cid src)
              go ( (incomingAt cid ^= Nothing)
                 . (incomingFrom src ^: Set.delete cid)
                 $ st
                 )
        NT.ErrorEvent (NT.TransportError (NT.EventConnectionLost theirAddr) _) -> do
          -- [Unified table 9, rule node_disconnect]
          let nid = NodeIdentifier $ NodeId theirAddr
          writeChan ctrlChan NCMsg
            { ctrlMsgSender = nid
            , ctrlMsgSignal = Died nid DiedDisconnect
            }
          -- A connection survives iff it was not opened by the lost peer.
          let notLost k = not (k `Set.member` (st ^. incomingFrom theirAddr))
          closeImplicitReconnections node nid
          go ( (incomingFrom theirAddr ^= Set.empty)
             . (incoming ^: Map.filterWithKey (const . notLost))
             $ st
             )
        NT.ErrorEvent (NT.TransportError NT.EventEndPointFailed str) ->
          fail $ "Cloud Haskell fatal error: end point failed: " ++ str
        NT.ErrorEvent (NT.TransportError NT.EventTransportFailed str) ->
          fail $ "Cloud Haskell fatal error: transport failed: " ++ str
        NT.EndPointClosed ->
          return ()
        NT.ReceivedMulticast _ _ ->
          -- If we received a multicast message, something went horribly wrong
          -- and we just give up
          fail "Cloud Haskell fatal error: received unexpected multicast"

    -- | Handle a request on connection @cid@ that could not be processed:
    -- emit a trace log entry containing @msg@ and the connection id, then
    -- continue the event loop ('go') with the 'incomingAt' entry for @cid@
    -- reset to 'Nothing'.
    invalidRequest :: NT.ConnectionId -> ConnectionState -> String -> IO ()
    invalidRequest cid st msg = do
      -- TODO: We should treat this as a fatal error on the part of the remote
      -- node. That is, we should report the remote node as having died, and we
      -- should close incoming connections (this requires a Transport layer
      -- extension).
      traceEventFmtIO node "" [ TraceStr $ " [network] invalid request"
                                           ++ " (" ++ msg ++ "): "
                              , (Trace cid)
                              ]
      -- Forget the offending connection before resuming the loop.
      go ( incomingAt cid ^= Nothing
         $ st
         )

    -- Shorthands for the fields of the enclosing function's 'node' that the
    -- event loop uses.
    endpoint = localEndPoint node
    ctrlChan = localCtrlChan node

--------------------------------------------------------------------------------
-- Top-level access to the node controller                                    --
--------------------------------------------------------------------------------

-- | Run 'nodeController' for the given node, starting from 'initNCState' and
-- with the node itself as the reader environment.
--
-- A 'NodeClosedException' raised while the controller is running is caught
-- here and turned into a normal return; any other exception propagates.
runNodeController :: LocalNode -> IO ()
runNodeController node =
  runReaderT (evalStateT (unNC nodeController) initNCState) node
   `Exception.catch` \(NodeClosedException _) -> return ()

--------------------------------------------------------------------------------
-- Internal data types                                                        --
--------------------------------------------------------------------------------

-- | State threaded through the node controller ('NC') monad. All fields are
-- strict.
data NCState = NCState
  {  -- Mapping from remote processes to linked local processes
    _links    :: !(BiMultiMap Identifier ProcessId ())
     -- Mapping from remote processes to monitoring local processes
  , _monitors :: !(BiMultiMap Identifier ProcessId MonitorRef)
     -- Process registry: names and where they live, mapped to the PIDs
  , _registeredHere :: !(Map String ProcessId)
     -- NOTE(review): presumably tracks, per process, the nodes that hold a
     -- registration for it together with a count -- confirm against the
     -- registry code.
  , _registeredOnNodes :: !(Map ProcessId [(NodeId,Int)])
  }

-- | The node controller monad: 'NCState' as mutable state layered over a
-- read-only 'LocalNode' environment, on top of 'IO'. The instances are
-- obtained from the underlying transformer stack via newtype deriving.
newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a }
  deriving ( Applicative
           , Functor
           , Monad
           , MonadIO
           , MonadState NCState
           , MonadReader LocalNode
           )

-- | Initial node controller state: no links, no monitors and an empty
-- registry.
initNCState :: NCState
initNCState = NCState { _links    = BiMultiMap.empty
                      , _monitors = BiMultiMap.empty
                      , _registeredHere = Map.empty
                      , _registeredOnNodes = Map.empty
                      }

-- | Thrown in response to the user invoking 'kill' (see Primitives.hs). This
-- type is deliberately not exported so it cannot be caught explicitly.
data ProcessKillException =
    ProcessKillException !ProcessId !String
  deriving (Typeable)

instance Exception ProcessKillException
-- Renders as @killed-by=<pid>,reason=<reason>@.
instance Show ProcessKillException where
  show (ProcessKillException pid reason) =
    "killed-by=" ++ show pid ++ ",reason=" ++ reason

-- | Send a message to a process from the node controller, with the trace
-- flag of 'ncSendToProcessAndTrace' set to 'True'.
ncSendToProcess :: ProcessId -> Message -> NC ()
ncSendToProcess = ncSendToProcessAndTrace True

-- | Send a message to a process from the node controller.
--
-- If the process lives on this node the message is delivered through
-- 'ncEffectLocalSendAndTrace' (which receives the @shouldTrace@ flag).
-- Otherwise it is wrapped in an 'UnreliableSend' control message, sent by
-- this node to the process's node using 'sendBinary' with
-- 'WithImplicitReconnect'; @shouldTrace@ is not consulted on that path.
ncSendToProcessAndTrace :: Bool -> ProcessId -> Message -> NC ()
ncSendToProcessAndTrace shouldTrace pid msg = do
    node <- ask
    if processNodeId pid == localNodeId node
      then ncEffectLocalSendAndTrace shouldTrace node pid msg
      else liftIO $ sendBinary node
             (NodeIdentifier $ localNodeId node)
             (NodeIdentifier $ processNodeId pid)
             WithImplicitReconnect
             NCMsg { ctrlMsgSender = NodeIdentifier $ localNodeId node
                   , ctrlMsgSignal = UnreliableSend (processLocalId pid) msg
                   }

-- | Deliver a control message to the node controller of the node @to@.
--
-- A message addressed to this node is written directly to the local control
-- channel (forced to WHNF first by '$!'); a message for any other node is
-- sent with 'sendBinary' using 'WithImplicitReconnect'.
ncSendToNode :: NodeId -> NCMsg -> NC ()
ncSendToNode to msg = do
    node <- ask
    liftIO $ if to == localNodeId node
      then writeChan (localCtrlChan node) $! msg
      else sendBinary node
             (NodeIdentifier $ localNodeId node)
             (NodeIdentifier to)
             WithImplicitReconnect
             msg

--------------------------------------------------------------------------------
-- Tracing/Debugging                                                          --
--------------------------------------------------------------------------------

-- [Issue #104 / DP-13]

-- | Publish a trace event for a died identifier: 'MxNodeDied' for a node,
-- 'MxProcessDied' for a process. Any other kind of identifier produces no
-- event.
traceNotifyDied :: LocalNode -> Identifier -> DiedReason -> NC ()
traceNotifyDied node ident reason =
  -- TODO: sendPortDied notifications
  liftIO $ withLocalTracer node $ \t ->
    case ident of
      (NodeIdentifier nid)    -> traceEvent t (MxNodeDied nid reason)
      (ProcessIdentifier pid) -> traceEvent t (MxProcessDied pid reason)
      _                       -> return ()

-- | Pass a format string and its arguments to 'traceLogFmt' on the node's
-- event bus.
traceEventFmtIO :: LocalNode
                -> String
                -> [TraceArg]
                -> IO ()
traceEventFmtIO node fmt args =
  withLocalTracer node $ \t -> traceLogFmt t fmt args

-- | Publish a single 'MxEvent' on the node's event bus.
trace :: LocalNode -> MxEvent -> IO ()
trace node ev = withLocalTracer node $ \t -> traceEvent t ev

-- | Run an action against the node's event bus ('localEventBus').
withLocalTracer :: LocalNode -> (MxEventBus -> IO ()) -> IO ()
withLocalTracer node act = act (localEventBus node)

--------------------------------------------------------------------------------
-- Core functionality                                                         --
--------------------------------------------------------------------------------

-- [Unified: Table 7]
-- | Main loop of the node controller.
--
-- Each iteration reads one 'NCMsg' from the node's control channel. If
-- 'destNid' reports a destination other than this node, the message is first
-- forwarded there with 'ncSendToNode'. The message is then dispatched on its
-- sender and signal to the matching @ncEffect*@ handler.
--
-- 'SigShutdown' closes the local endpoint and then throws
-- 'NodeClosedException' (also when closing fails, because of 'finally'). Any
-- sender/signal combination that is not listed calls 'error'.
nodeController :: NC ()
nodeController = do
  node <- ask
  forever' $ do
    msg  <- liftIO $ readChan (localCtrlChan node)

    -- [Unified: Table 7, rule nc_forward]
    case destNid (ctrlMsgSignal msg) of
      Just nid' | nid' /= localNodeId node ->
        ncSendToNode nid' msg
      _ ->
        return ()

    case msg of
      NCMsg (ProcessIdentifier from) (Link them) ->
        ncEffectMonitor from them Nothing
      NCMsg (ProcessIdentifier from) (Monitor ref) ->
        ncEffectMonitor from (monitorRefIdent ref) (Just ref)
      NCMsg (ProcessIdentifier from) (Unlink them) ->
        ncEffectUnlink from them
      NCMsg (ProcessIdentifier from) (Unmonitor ref) ->
        ncEffectUnmonitor from ref
      NCMsg _from (Died ident reason) ->
        ncEffectDied ident reason
      NCMsg (ProcessIdentifier from) (Spawn proc ref) ->
        ncEffectSpawn from proc ref
      NCMsg (ProcessIdentifier from) (Register label atnode pid force) ->
        ncEffectRegister from label atnode pid force
      NCMsg (ProcessIdentifier from) (WhereIs label) ->
        ncEffectWhereIs from label
      NCMsg _ (NamedSend label msg') ->
        ncEffectNamedSend label msg'
      NCMsg _ (UnreliableSend lpid msg') ->
        ncEffectLocalSend node (ProcessId (localNodeId node) lpid) msg'
      NCMsg _ (LocalSend to msg') ->
        ncEffectLocalSend node to msg'
      NCMsg _ (LocalPortSend to msg') ->
        ncEffectLocalPortSend to msg'
      NCMsg (ProcessIdentifier from) (Kill to reason) ->
        ncEffectKill from to reason
      NCMsg (ProcessIdentifier from) (Exit to reason) ->
        ncEffectExit from to reason
      NCMsg (ProcessIdentifier from) (GetInfo pid) ->
        ncEffectGetInfo from pid
      NCMsg _ SigShutdown ->
        liftIO $ do
          NT.closeEndPoint (localEndPoint node)
            `Exception.finally` throwIO (NodeClosedException $ localNodeId node)
      NCMsg (ProcessIdentifier from) (GetNodeStats nid) ->
        ncEffectGetNodeStats from nid
      unexpected ->
        error $ "nodeController: unexpected message " ++ show unexpected

-- [Unified: Table 10]
-- | Register a link or a monitor of @from@ on @them@.
--
-- The registration is recorded when @them@ is not local, or when it is local
-- and 'isValidLocalIdentifier' accepts it. Otherwise @them@ is reported as
-- dead with 'DiedUnknownId': directly via 'notifyDied' when the watcher is
-- local, or by sending a 'Died' control message to the watcher's node.
ncEffectMonitor :: ProcessId        -- ^ Who's watching?
                -> Identifier       -- ^ Who's being watched?
                -> Maybe MonitorRef -- ^ 'Nothing' to link
                -> NC ()
ncEffectMonitor from them mRef = do
  node <- ask
  shouldLink <-
    if not (isLocal node them)
      then return True
      else isValidLocalIdentifier them
  case (shouldLink, isLocal node (ProcessIdentifier from)) of
    (True, _) ->  -- [Unified: first rule]
      case mRef of
        Just ref -> modify' $ monitors ^: BiMultiMap.insert them from ref
        Nothing  -> modify' $ links ^: BiMultiMap.insert them from ()
    (False, True) -> -- [Unified: second rule]
      notifyDied from them DiedUnknownId mRef
    (False, False) -> -- [Unified: third rule]
      -- TODO: this is the right sender according to the Unified semantics,
      -- but perhaps having 'them' as the sender would make more sense
      -- (see also: notifyDied)
      ncSendToNode (processNodeId from) $ NCMsg
        { ctrlMsgSender = NodeIdentifier (localNodeId node)
        , ctrlMsgSignal = Died them DiedUnknownId
        }

-- [Unified: Table 11]
-- | Remove the link of @from@ on @them@.
--
-- When @from@ is a local process it is first sent the matching
-- acknowledgement ('DidUnlinkProcess', 'DidUnlinkNode' or 'DidUnlinkPort').
-- The entry is then deleted from the link table whether or not @from@ is
-- local.
ncEffectUnlink :: ProcessId -> Identifier -> NC ()
ncEffectUnlink from them = do
  node <- ask
  when (isLocal node (ProcessIdentifier from)) $
    case them of
      ProcessIdentifier pid ->
        postAsMessage from $ DidUnlinkProcess pid
      NodeIdentifier nid ->
        postAsMessage from $ DidUnlinkNode nid
      SendPortIdentifier cid ->
        postAsMessage from $ DidUnlinkPort cid
  modify' $ links ^: BiMultiMap.delete them from ()

-- [Unified: Table 11]
-- | Remove the monitor identified by @ref@ that @from@ holds.
--
-- When @from@ is a local process it is first sent a 'DidUnmonitor'
-- acknowledgement. The entry, keyed by 'monitorRefIdent' of the reference,
-- is then deleted from the monitor table whether or not @from@ is local.
ncEffectUnmonitor :: ProcessId -> MonitorRef -> NC ()
ncEffectUnmonitor from ref = do
  node <- ask
  when (isLocal node (ProcessIdentifier from)) $
    postAsMessage from $ DidUnmonitor ref
  modify' $ monitors ^: BiMultiMap.delete (monitorRefIdent ref) from ref

-- [Unified: Table 12]
--
-- Process the death of @ident@ with the given @reason@.  In order:
--
--  1. emit the trace event via 'traceNotifyDied';
--  2. notify the watchers recorded in the link and monitor entries that
--     'splitNotif' reports as concerning @ident@;
--  3. if @ident@ is a process, forward the death to the remote nodes hosting
--     the identifiers that process was itself linked to or monitoring;
--  4. purge dead watchers from the link and monitor tables;
--  5. unregister every local name bound to a dead process, tracing
--     'MxUnRegistered' for each;
--  6. drop dead processes from 'registeredOnNodes', forwarding the death to
--     every non-local node recorded for them.
ncEffectDied :: Identifier -> DiedReason -> NC ()
ncEffectDied ident reason = do
  node <- ask
  traceNotifyDied node ident reason
  (affectedLinks, unaffectedLinks) <- gets (splitNotif ident . (^. links))
  (affectedMons,  unaffectedMons)  <- gets (splitNotif ident . (^. monitors))

  let -- When a whole node is reported dead, only watchers living on this
      -- node are notified; for any other identifier every watcher is.
      shouldNotify :: ProcessId -> Bool
      shouldNotify us = case ident of
        NodeIdentifier _ -> isLocal node (ProcessIdentifier us)
        _                -> True

      -- Does the death of @ident@ imply the death of this process?
      isDead :: ProcessId -> Bool
      isDead pid = ident `impliesDeathOf` ProcessIdentifier pid

  -- Links carry no monitor reference, hence the 'Nothing'.
  forM_ (Map.toList affectedLinks) $ \(them, watchers) ->
    forM_ watchers $ \(us, _) ->
      when (shouldNotify us) $
        notifyDied us them reason Nothing

  forM_ (Map.toList affectedMons) $ \(them, watchers) ->
    forM_ watchers $ \(us, ref) ->
      when (shouldNotify us) $
        notifyDied us them reason (Just ref)

  -- Notify remote nodes that the process died so it can be removed from
  -- monitor lists.  Only applies when a single process died.
  case ident of
    ProcessIdentifier deadPid -> do
      let watchedByDead = Set.union
            (Set.map fst $ BiMultiMap.lookupBy2nd deadPid unaffectedLinks)
            (Set.map fst $ BiMultiMap.lookupBy2nd deadPid unaffectedMons)
      forM_ (Set.toList watchedByDead) $ \watched -> do
        let nid = nodeOf watched
        when (nid /= localNodeId node) $
          forwardDeath node nid
    _ -> return ()

  -- Delete monitors in the local node.
  let purge :: (Ord a, Ord v)
            => BiMultiMap a ProcessId v -> BiMultiMap a ProcessId v
      purge = case ident of
        -- deleteAllBy2nd is faster than partitionWithKeyBy2nd
        ProcessIdentifier deadPid -> BiMultiMap.deleteAllBy2nd deadPid
        _ -> snd . BiMultiMap.partitionWithKeyBy2nd (\pid _ -> isDead pid)

  modify' $ (links ^= purge unaffectedLinks)
          . (monitors ^= purge unaffectedMons)

  -- We now consider all labels for this identifier unregistered.
  (keepNames, dropNames) <-
    gets (Map.partition (not . isDead) . (^. registeredHere))
  forM_ (Map.toList dropNames) $ \(name, pid) ->
    liftIO $ trace node (MxUnRegistered pid name)
  modify' $ registeredHere ^= keepNames

  -- Processes registered on other nodes: forget the dead ones and tell the
  -- non-local nodes recorded for them.
  (goneRemote, keptRemote) <-
    gets (Map.partitionWithKey (\pid _ -> isDead pid) . (^. registeredOnNodes))
  forM_ (Map.elems goneRemote) $ \nidlist ->
    forM_ nidlist $ \(nid, _) ->
      when (not $ isLocal node (NodeIdentifier nid)) $
        forwardDeath node nid
  modify' $ registeredOnNodes ^= keptRemote
  where
    -- Relay the 'Died' signal to the controller of node @nid@, with this
    -- node as the sender.
    forwardDeath :: LocalNode -> NodeId -> NC ()
    forwardDeath node nid = ncSendToNode nid
      NCMsg { ctrlMsgSender = NodeIdentifier (localNodeId node)
            , ctrlMsgSignal = Died ident reason
            }

-- [Unified: Table 13]
--
-- Spawn the process described by the closure @cProc@ on this node on behalf
-- of @pid@, then report the new process id back to @pid@ in a 'DidSpawn'
-- message tagged with @ref@.
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
ncEffectSpawn pid cProc ref = do
  resolved <- unClosure cProc
  -- If the closure cannot be resolved we still spawn a process, one that
  -- immediately fails with the resolution error.  This allows the remote
  -- node to find out what's happening.
  let body = either unresolved id resolved
  node <- ask
  child <- liftIO (forkProcess node body)
  ncSendToProcess pid . unsafeCreateUnencodedMessage $ DidSpawn ref child
  where
    unresolved :: String -> Process ()
    unresolved err = fail ("Error: Could not resolve closure: " ++ err)

-- Unified semantics does not explicitly describe how to implement 'register',
-- but mentions it's "very similar to nsend" (Table 14).
--
-- Handles a (re-/un-)registration request from @from@ for @label@, whose
-- registry lives on @atnode@:
--
--  * @mPid == Nothing@ is an unregister request, valid only if the label is
--    currently bound;
--  * @mPid == Just p@ is a register request (or a re-registration when
--    @reregistration@ is set), valid only if the "label is currently unbound"
--    test differs from @reregistration@ and @p@ is either not local or a
--    valid local identifier.
--
-- When @atnode@ is this node the binding is updated (if valid), the nodes
-- owning the old/new processes are informed, trace events are emitted, and a
-- 'RegisterReply' carrying the validity flag and the resulting binding is
-- sent back to @from@.  Otherwise this node only adjusts its per-process
-- count of remote registrations; no reply is sent.
ncEffectRegister :: ProcessId -> String -> NodeId -> Maybe ProcessId -> Bool -> NC ()
ncEffectRegister from label atnode mPid reregistration = do
  node <- ask
  currentVal <- gets (^. registeredHereFor label)
  isOk <-
    case mPid of
      Nothing -> -- unregister request
        return $ isJust currentVal
      Just thepid -> do -- register request
        isvalidlocal <- isValidLocalIdentifier (ProcessIdentifier thepid)
        return $ (isNothing currentVal /= reregistration) &&
                 (not (isLocal node (ProcessIdentifier thepid)) || isvalidlocal)
  if isLocal node (NodeIdentifier atnode)
    then do
      when isOk $ do
        modify' $ registeredHereFor label ^= mPid
        updateRemote node currentVal mPid
        -- Trace the removal of the previous binding.  'isOk' guarantees the
        -- binding exists on every path that runs this action, but traversing
        -- the 'Maybe' keeps it total instead of relying on 'fromJust'.
        let traceUnregistered =
              forM_ currentVal $ \old ->
                liftIO $ trace node (MxUnRegistered old label)
        case mPid of
          Just p -> do
            when reregistration traceUnregistered
            liftIO $ trace node (MxRegistered p label)
          Nothing -> traceUnregistered
      -- Reply with whatever is bound now, whether or not the request was valid.
      newVal <- gets (^. registeredHereFor label)
      ncSendToProcess from $ unsafeCreateUnencodedMessage $
        RegisterReply label isOk newVal
    else
      -- The registry is on another node: only track, per process, how many
      -- registrations each remote node holds for it.
      let operation = if reregistration then flip decList else flip incList
      in case mPid of
           Nothing  -> return ()
           Just pid -> modify' $
             registeredOnNodesFor pid ^: maybeify (operation atnode)
  where
    -- Tell the nodes owning the old and/or new process about the change:
    -- the final 'True' flag makes the receiver decrement its count, 'False'
    -- makes it increment.  Nothing is sent when both processes live on the
    -- same node or when there is neither an old nor a new value.
    updateRemote :: LocalNode -> Maybe ProcessId -> Maybe ProcessId -> NC ()
    updateRemote node (Just oldval) (Just newval)
      | processNodeId oldval /= processNodeId newval = do
          forward node (processNodeId oldval) (Register label atnode (Just oldval) True)
          forward node (processNodeId newval) (Register label atnode (Just newval) False)
    updateRemote node Nothing (Just newval) =
      forward node (processNodeId newval) (Register label atnode (Just newval) False)
    updateRemote node (Just oldval) Nothing =
      forward node (processNodeId oldval) (Register label atnode (Just oldval) True)
    updateRemote _ _ _ = return ()

    -- Lift a list transformation to 'Maybe', treating 'Nothing' as the empty
    -- list on the way in and mapping the empty list to 'Nothing' on the way out.
    maybeify :: ([a] -> [a]) -> Maybe [a] -> Maybe [a]
    maybeify f Nothing  = unmaybeify $ f []
    maybeify f (Just x) = unmaybeify $ f x

    unmaybeify :: [a] -> Maybe [a]
    unmaybeify [] = Nothing
    unmaybeify x  = Just x

    -- Increment the counter of @tag@, appending a fresh counter of 1 if absent.
    incList :: (Eq t, Num b) => [(t, b)] -> t -> [(t, b)]
    incList [] tag = [(tag, 1)]
    incList ((atag, acount) : xs) tag
      | tag == atag = (atag, acount + 1) : xs
    incList (x : xs) tag = x : incList xs tag

    -- Decrement the counter of @tag@, dropping the entry when it was 1.
    -- An absent tag leaves the list unchanged.
    decList :: (Eq t, Eq a, Num a) => [(t, a)] -> t -> [(t, a)]
    decList [] _ = []
    decList ((atag, 1) : xs) tag
      | atag == tag = xs
    decList ((atag, n) : xs) tag
      | atag == tag = (atag, n - 1) : xs
    decList (x : xs) tag = x : decList xs tag

    -- Send @reg@ to the controller of node @to@ on behalf of @from@, unless
    -- @to@ is this node.
    forward :: LocalNode -> NodeId -> ProcessSignal -> NC ()
    forward node to reg =
      when (not $ isLocal node (NodeIdentifier to)) $
        ncSendToNode to $ NCMsg { ctrlMsgSender = ProcessIdentifier from
                                , ctrlMsgSignal = reg
                                }


-- Unified semantics does not explicitly describe 'whereis'.
--
-- Look up @label@ in this node's registry and send the result (possibly
-- 'Nothing') back to @from@ as a 'WhereIsReply'.
ncEffectWhereIs :: ProcessId -> String -> NC ()
ncEffectWhereIs from label =
  gets (^. registeredHereFor label) >>= reply
  where
    reply = ncSendToProcess from
          . unsafeCreateUnencodedMessage
          . WhereIsReply label

-- [Unified: Table 14]
--
-- Deliver @msg@ to the process registered under @label@ in the node
-- controller state. If nothing is registered under that name the named send
-- is simply ignored (as per Table 14).
ncEffectNamedSend :: String -> Message -> NC ()
ncEffectNamedSend label msg =
  gets (^. registeredHereFor label) >>= mapM_ deliver
  where
    -- If this is a trace message we don't trace it to avoid entering a loop
    -- where trace messages produce more trace messages.
    shouldTrace = label /= "trace.logger"

    deliver to = ncSendToProcessAndTrace shouldTrace to msg

-- [Issue #DP-20]
--
-- Local send with tracing always requested: this is
-- 'ncEffectLocalSendAndTrace' with its trace flag fixed to 'True'.
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSend node to msg = ncEffectLocalSendAndTrace True node to msg

-- Enqueue @msg@ on the message queue of the local process @to@ and, when
-- @shouldTrace@ is set, emit an 'MxReceived' trace event for it afterwards.
-- Both steps run inside the callback handed to 'withLocalProc'.
ncEffectLocalSendAndTrace :: Bool -> LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSendAndTrace shouldTrace node to msg =
  liftIO (withLocalProc node to deliver)
  where
    deliver target = do
      enqueue (processQueue target) msg
      when shouldTrace (trace node (MxReceived to msg))

-- [Issue #DP-20]
--
-- Deliver @msg@ to the typed channel identified by the send port @from@.
-- The owning process is found through 'sendPortProcessId' and the channel
-- through 'sendPortLocalId'; the payload is written to the channel's queue
-- and an 'MxReceivedPort' trace event is emitted.
ncEffectLocalPortSend :: SendPortId -> Message -> NC ()
ncEffectLocalPortSend from msg = do
  node <- ask
  liftIO $ withLocalProc node (sendPortProcessId from) $ \owner -> do
    mChan <- withMVar (processState owner) $ \st ->
      return (st ^. typedChannelWithId (sendPortLocalId from))
    case mChan of
      -- in the unlikely event we know nothing about this channel id,
      -- there's little to be done - perhaps some logging/tracing though...
      Nothing -> return ()
      Just (TypedChannel weakQueue) ->
        -- If the weak reference no longer yields a queue, the process has
        -- given up the read end of the channel and we simply ignore the
        -- incoming message.
        deRefWeak weakQueue >>= mapM_ (deliverChan node from msg)
  where
    deliverChan :: forall a . LocalNode -> SendPortId -> Message -> TQueue a -> IO ()
    deliverChan n p (UnencodedMessage _ raw) queue = do
      -- NOTE(review): this coercion is unchecked; it presumably relies on the
      -- sender having built the message at the channel's element type --
      -- confirm against Primitives.hs.
      atomically (writeTQueue queue (unsafeCoerce raw :: a))
      trace n (MxReceivedPort p (unsafeCreateUnencodedMessage raw))
    deliverChan _ _ (EncodedMessage _ _) _ =
      -- this will not happen unless someone screws with Primitives.hs
      error "invalid local channel delivery"

-- [Issue #69]
--
-- If @to@ belongs to this node, raise a 'ProcessKillException' (recording
-- the requester @from@ and the @reason@) in it via 'throwException';
-- otherwise do nothing.
ncEffectKill :: ProcessId -> ProcessId -> String -> NC ()
ncEffectKill from to reason =
  ask >>= \node ->
    if isLocal node (ProcessIdentifier to)
      then throwException to (ProcessKillException from reason)
      else return ()

-- [Issue #69]
--
-- If @to@ belongs to this node, raise a 'ProcessExitException' (recording
-- the requester @from@ and the @reason@ message) in it via 'throwException';
-- otherwise do nothing.
ncEffectExit :: ProcessId -> ProcessId -> Message -> NC ()
ncEffectExit from to reason =
  ask >>= \node ->
    if isLocal node (ProcessIdentifier to)
      then throwException to (ProcessExitException from reason)
      else return ()

-- [Issue #89]
--
-- Answer a process-info request from @from@ about @pid@.
--
-- When @pid@ is not found among this node's local processes the reply is
-- @'ProcessInfoNone' 'DiedUnknownId'@. Otherwise the reply is a 'ProcessInfo'
-- built from the node controller's links, monitors and name registry plus the
-- current size of the process' message queue.
ncEffectGetInfo :: ProcessId -> ProcessId -> NC ()
ncEffectGetInfo from pid = do
  node  <- ask
  found <- liftIO $ withValidLocalState node $ \vst ->
             return (vst ^. localProcessWithId (processLocalId pid))
  let requesterIsLocal = isLocal node (ProcessIdentifier from)
  case found of
    Nothing ->
      dispatch requesterIsLocal from (ProcessInfoNone DiedUnknownId)
    Just lproc -> do
      linkSet  <- gets (Set.map fst . BiMultiMap.lookupBy1st subject . (^. links))
      monSet   <- gets (BiMultiMap.lookupBy1st subject . (^. monitors))
      names    <- gets (registeredNames . (^. registeredHere))
      queueLen <- liftIO (queueSize (processQueue lproc))
      dispatch requesterIsLocal from ProcessInfo
        { infoNode               = processNodeId pid
        , infoRegisteredNames    = names
        , infoMessageQueueLength = queueLen
        , infoMonitors           = Set.toList monSet
        , infoLinks              = Set.toList linkSet
        }
  where
    -- The process being asked about, as a generic identifier.
    subject = ProcessIdentifier pid

    -- Reply to @dest@: a requester on this node gets the value posted via
    -- 'postAsMessage'; any other requester gets it via 'ncSendToProcess'.
    dispatch :: (Serializable a)
             => Bool
             -> ProcessId
             -> a
             -> NC ()
    dispatch True  dest pInfo = postAsMessage dest pInfo
    dispatch False dest pInfo =
      ncSendToProcess dest (unsafeCreateUnencodedMessage pInfo)

    -- Every key of the registry whose value is @pid@.
    registeredNames = Map.foldlWithKey collect []

    collect acc name owner
      | owner == pid = name : acc
      | otherwise    = acc

-- Post a 'NodeStats' snapshot of this node to @from@.
--
-- The counts come from the node controller state (registered names,
-- monitors, links) and from the node's valid local state (processes). The
-- 'NodeId' argument is not consulted: the statistics always describe the
-- node this controller runs on.
ncEffectGetNodeStats :: ProcessId -> NodeId -> NC ()
ncEffectGetNodeStats from _nid = do
  node      <- ask
  ncState   <- StateT.get
  nodeState <- liftIO (withValidLocalState node return)
  postAsMessage from NodeStats
    { nodeStatsNode            = localNodeId node
    , nodeStatsRegisteredNames = Map.size (ncState ^. registeredHere)
    , nodeStatsMonitors        = BiMultiMap.size (ncState ^. monitors)
    , nodeStatsLinks           = BiMultiMap.size (ncState ^. links)
    , nodeStatsProcesses       = Map.size (nodeState ^. localProcesses)
    }

--------------------------------------------------------------------------------
-- Auxiliary                                                                  --
--------------------------------------------------------------------------------

-- Tell @dest@ that @src@ died.
--
-- For a destination on this node: with a monitor reference the matching
-- monitor notification is posted to its mailbox; without one (linking) the
-- matching link exception is thrown to it. For a destination on another node
-- a 'Died' signal is sent to that node instead.
notifyDied :: ProcessId         -- ^ Who to notify?
           -> Identifier        -- ^ Who died?
           -> DiedReason        -- ^ How did they die?
           -> Maybe MonitorRef  -- ^ 'Nothing' for linking
           -> NC ()
notifyDied dest src reason mRef = do
  node <- ask
  if isLocal node (ProcessIdentifier dest)
    then maybe linkBroken monitorFired mRef
    else
      -- The change in sender comes from [Unified: Table 10]
      ncSendToNode (processNodeId dest) NCMsg
        { ctrlMsgSender = NodeIdentifier (localNodeId node)
        , ctrlMsgSignal = Died src reason
        }
  where
    -- Monitoring: post the notification matching the kind of thing that died.
    monitorFired ref = case src of
      ProcessIdentifier pid ->
        postAsMessage dest (ProcessMonitorNotification ref pid reason)
      NodeIdentifier nid ->
        postAsMessage dest (NodeMonitorNotification ref nid reason)
      SendPortIdentifier cid ->
        postAsMessage dest (PortMonitorNotification ref cid reason)

    -- Linking: throw the exception matching the kind of thing that died.
    linkBroken = case src of
      ProcessIdentifier pid ->
        throwException dest (ProcessLinkException pid reason)
      NodeIdentifier nid ->
        throwException dest (NodeLinkException nid reason)
      SendPortIdentifier cid ->
        throwException dest (PortLinkException cid reason)

-- | [Unified: Table 8]
--
-- The node a signal is addressed to, or 'Nothing' for signals that carry no
-- destination node.
destNid :: ProcessSignal -> Maybe NodeId
destNid signal = case signal of
  Link ident            -> Just (nodeOf ident)
  Unlink ident          -> Just (nodeOf ident)
  Monitor ref           -> Just (nodeOf (monitorRefIdent ref))
  Unmonitor ref         -> Just (nodeOf (monitorRefIdent ref))
  Spawn{}               -> Nothing
  Register{}            -> Nothing
  WhereIs{}             -> Nothing
  NamedSend{}           -> Nothing
  UnreliableSend{}      -> Nothing
  -- We don't need to forward 'Died' signals; if monitoring/linking is setup,
  -- then when a local process dies the monitoring/linking machinery will take
  -- care of notifying remote nodes
  Died{}                -> Nothing
  Kill pid _            -> Just (processNodeId pid)
  Exit pid _            -> Just (processNodeId pid)
  GetInfo pid           -> Just (processNodeId pid)
  GetNodeStats nid      -> Just nid
  LocalSend pid _       -> Just (processNodeId pid)
  LocalPortSend cid _   -> Just (processNodeId (sendPortProcessId cid))
  SigShutdown           -> Nothing

-- | Check whether an identifier belongs to our own node, i.e. whether its
-- 'nodeOf' equals the 'localNodeId' of the given 'LocalNode'.
isLocal :: LocalNode -> Identifier -> Bool
isLocal node = (== localNodeId node) . nodeOf

-- | Resolve a closure against the remote table of the local node.
--
-- The result is whatever 'Static.unclosure' returns for that table and
-- closure: an error message on the 'Left' or the resolved value on the
-- 'Right'.
unClosure :: Typeable a => Closure a -> NC (Either String a)
unClosure closure = resolve <$> ask
  where
    resolve node = Static.unclosure (remoteTable node) closure

-- | Check if an identifier refers to a valid local object
--
-- * A node identifier is valid when it equals this node's id.
-- * A process identifier is valid when its local id is present among the
--   node's local processes.
-- * A send port identifier is valid when its owning process is present and
--   that process' state holds a typed channel with the port's local id.
isValidLocalIdentifier :: Identifier -> NC Bool
isValidLocalIdentifier ident = do
  node <- ask
  liftIO $ withValidLocalState node $ \vst ->
    let procFor pid = vst ^. localProcessWithId (processLocalId pid)
    in case ident of
         NodeIdentifier nid ->
           return (nid == localNodeId node)
         ProcessIdentifier pid ->
           return (isJust (procFor pid))
         SendPortIdentifier cid ->
           maybe (return False) (hasChannel cid) (procFor (sendPortProcessId cid))
  where
    -- Does the given process currently know a channel for this send port?
    hasChannel cid lproc =
      withMVar (processState lproc) $ \pst ->
        return (isJust (pst ^. typedChannelWithId (sendPortLocalId cid)))

--------------------------------------------------------------------------------
-- Messages to local processes                                                --
--------------------------------------------------------------------------------

-- Wrap a value with 'createUnencodedMessage' and hand the resulting message
-- to 'postMessage' for the given process.
postAsMessage :: Serializable a => ProcessId -> a -> NC ()
postAsMessage pid payload = postMessage pid (createUnencodedMessage payload)

-- Enqueue a message on the queue of the given process of the local node.
-- The lookup of the process is delegated to 'withLocalProc'.
postMessage :: ProcessId -> Message -> NC ()
postMessage pid msg =
  ask >>= \node -> liftIO (withLocalProc node pid deliver)
  where
    deliver target = enqueue (processQueue target) msg

-- | Throw the exception @e@ to the thread running the process identified by
-- @pid@ on the current node.
--
-- The lookup goes through 'withLocalProc', so nothing happens when no
-- process with that id is found.
throwException :: Exception e => ProcessId -> e -> NC ()
throwException pid e = do
  node <- ask
  -- throwTo blocks until the exception is received by the target thread.
  -- We cannot easily make it happen asynchronously because then 'unlink'
  -- semantics would break.
  liftIO $ withLocalProc node pid $ \p -> throwTo (processThread p) e

-- | Run the action @p@ on the local process identified by @pid@, if the
-- node's valid state contains such a process; otherwise do nothing.
withLocalProc :: LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc node pid p =
  -- By [Unified: table 6, rule missing_process] messages to dead processes
  -- can silently be dropped
  --
  -- The callback passed to 'withValidLocalState' only looks the process up
  -- and /returns/ the action to perform; 'join' then runs that action after
  -- 'withValidLocalState' has returned.
  join $ withValidLocalState node $ \vst ->
    return $ forM_ (vst ^. localProcessWithId lpid) p
  where
    lpid = processLocalId pid

--------------------------------------------------------------------------------
-- Accessors                                                                  --
--------------------------------------------------------------------------------

-- | Accessor for the '_links' field of 'NCState'.
links :: Accessor NCState (BiMultiMap Identifier ProcessId ())
links = accessor _links (\ls st -> st { _links = ls })

-- | Accessor for the '_monitors' field of 'NCState'.
monitors :: Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors = accessor _monitors (\ms st -> st { _monitors = ms })

-- | Accessor for the '_registeredHere' field of 'NCState', a map from
-- 'String' keys to 'ProcessId's.
registeredHere :: Accessor NCState (Map String ProcessId)
registeredHere = accessor _registeredHere (\ry st -> st { _registeredHere = ry })

-- | Accessor for the '_registeredOnNodes' field of 'NCState', a map from a
-- 'ProcessId' to a list of @('NodeId', 'Int')@ pairs.
registeredOnNodes :: Accessor NCState (Map ProcessId [(NodeId, Int)])
registeredOnNodes =
  accessor _registeredOnNodes (\ry st -> st { _registeredOnNodes = ry })

-- | Accessor for the entry stored under the key @ident@ in the
-- 'registeredHere' map ('Nothing' when there is no such entry).
registeredHereFor :: String -> Accessor NCState (Maybe ProcessId)
registeredHereFor ident = registeredHere >>> DAC.mapMaybe ident

-- | Accessor for the entry stored under the key @ident@ in the
-- 'registeredOnNodes' map ('Nothing' when there is no such entry).
registeredOnNodesFor :: ProcessId -> Accessor NCState (Maybe [(NodeId,Int)])
registeredOnNodesFor ident = registeredOnNodes >>> DAC.mapMaybe ident

-- | @splitNotif ident@ splits a notifications map into those notifications
-- that should trigger when 'ident' fails and those that should not.
--
-- There is a hierarchy between identifiers: failure of a node implies failure
-- of all processes on that node, and failure of a process implies failure of
-- all typed channels to that process. In other words, if 'ident' refers to a
-- node, then the /should trigger/ set will include
--
-- * the notifications for the node specifically,
-- * the notifications for processes on that node, and
-- * the notifications for typed channels to processes on that node.
--
-- Similarly, if 'ident' refers to a process, the /should trigger/ set will
-- include
--
-- * the notifications for that process specifically and
-- * the notifications for typed channels to that process.
--
-- The first component of the result is the /should trigger/ part; the second
-- is the remaining map.
--
-- See https://github.com/haskell/containers/issues/14 for the bang on _v.
splitNotif :: (Ord a, Ord v)
           => Identifier
           -> BiMultiMap Identifier a v
           -> (Map Identifier (Set (a,v)), BiMultiMap Identifier a v)
splitNotif ident =
    BiMultiMap.partitionWithKeyBy1st (\k !_v -> ident `impliesDeathOf` k)

--------------------------------------------------------------------------------
-- Auxiliary                                                                  --
--------------------------------------------------------------------------------

-- | Modify and evaluate the state
--
-- The updated state @f s@ is forced (to weak head normal form, via '$!')
-- before it is stored, so applying @f@ is not left behind as an unevaluated
-- thunk in the state.
modify' :: MonadState s m => (s -> s) -> m ()
modify' f = StateT.get >>= \s -> StateT.put $! f s