module Control.Distributed.Process.Internal.Primitives
  ( -- * Basic messaging
    send
  , usend
  , expect
    -- * Channels
  , newChan
  , sendChan
  , receiveChan
  , mergePortsBiased
  , mergePortsRR
    -- * Unsafe messaging variants
  , unsafeSend
  , unsafeUSend
  , unsafeSendChan
  , unsafeNSend
  , unsafeNSendRemote
    -- * Advanced messaging
  , Match
  , receiveWait
  , receiveTimeout
  , match
  , matchIf
  , matchUnknown
  , matchAny
  , matchAnyIf
  , matchChan
  , matchSTM
  , matchMessage
  , matchMessageIf
  , isEncoded
  , wrapMessage
  , unsafeWrapMessage
  , unwrapMessage
  , handleMessage
  , handleMessageIf
  , handleMessage_
  , handleMessageIf_
  , forward
  , uforward
  , delegate
  , relay
  , proxy
    -- * Process management
  , terminate
  , ProcessTerminationException(..)
  , die
  , kill
  , exit
  , catchExit
  , catchesExit
    -- The exception type is exported without its constructor, so code
    -- outside this module has to go through 'catchExit' / 'catchesExit'.
  , ProcessExitException()
  , getSelfPid
  , getSelfNode
  , ProcessInfo(..)
  , getProcessInfo
  , NodeStats(..)
  , getNodeStats
  , getLocalNodeStats
    -- * Monitoring and linking
  , link
  , unlink
  , monitor
  , unmonitor
  , withMonitor
    -- * Logging
  , say
    -- * Registry
  , register
  , reregister
  , unregister
  , whereis
  , nsend
  , registerRemoteAsync
  , reregisterRemoteAsync
  , unregisterRemoteAsync
  , whereisRemoteAsync
  , nsendRemote
    -- * Closures
  , unClosure
  , unStatic
    -- * Exception handling
  , catch
  , Handler(..)
  , catches
  , try
  , mask
  , mask_
  , onException
  , bracket
  , bracket_
  , finally
    -- * Auxiliary API
  , expectTimeout
  , receiveChanTimeout
  , spawnAsync
  , linkNode
  , linkPort
  , unlinkNode
  , unlinkPort
  , monitorNode
  , monitorPort
    -- * Reconnecting
  , reconnect
  , reconnectPort
    -- * Internal exports
  , sendCtrlMsg
  ) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Data.Binary (decode)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (formatTime)
#if MIN_VERSION_time(1,5,0)
import Data.Time.Format (defaultTimeLocale)
#else
import System.Locale (defaultTimeLocale)
#endif
import System.Timeout (timeout)
import Control.Monad (when)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Catch
  ( Exception
  , SomeException
  , throwM
  , fromException
  )
import qualified Control.Monad.Catch as Catch
import Control.Applicative
import Control.Distributed.Process.Internal.StrictMVar
  ( StrictMVar
  , modifyMVar
  , modifyMVar_
  )
import Control.Concurrent.STM
  ( STM
  , TVar
  , atomically
  , orElse
  , retry
  , newTVar
  , readTVar
  , writeTVar
  )
import Control.Distributed.Process.Internal.CQueue
  ( dequeue
  , BlockSpec(..)
  , MatchOn(..)
  )
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Static
  ( Static
  , Closure
  )
import Data.Rank1Typeable (Typeable)
import qualified Control.Distributed.Static as Static (unstatic, unclosure)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
import Control.Distributed.Process.Internal.Types
  ( NodeId(..)
  , ProcessId(..)
  , LocalNode(..)
  , LocalProcess(..)
  , Process(..)
  , Message(..)
  , MonitorRef(..)
  , SpawnRef(..)
  , ProcessSignal(..)
  , NodeMonitorNotification(..)
  , monitorCounter
  , spawnCounter
  , SendPort(..)
  , ReceivePort(..)
  , channelCounter
  , typedChannelWithId
  , TypedChannel(..)
  , SendPortId(..)
  , Identifier(..)
  , ProcessExitException(..)
  , DiedReason(..)
  , DidUnmonitor(..)
  , DidUnlinkProcess(..)
  , DidUnlinkNode(..)
  , DidUnlinkPort(..)
  , WhereIsReply(..)
  , RegisterReply(..)
  , ProcessRegistrationException(..)
  , ProcessInfo(..)
  , ProcessInfoNone(..)
  , NodeStats(..)
  , isEncoded
  , createMessage
  , createUnencodedMessage
  , ImplicitReconnect( NoImplicitReconnect)
  , LocalProcessState
  , LocalSendPortId
  , messageToPayload
  )
import Control.Distributed.Process.Internal.Messaging
  ( sendMessage
  , sendBinary
  , sendPayload
  , disconnect
  , sendCtrlMsg
  )
import Control.Distributed.Process.Management.Internal.Types
  ( MxEvent(..)
  )
import Control.Distributed.Process.Management.Internal.Trace.Types
  ( traceEvent
  )
import Control.Distributed.Process.Internal.WeakTQueue
  ( newTQueueIO
  , readTQueue
  , mkWeakTQueue
  )
import Prelude
import Unsafe.Coerce
-- | Send a message to another process.
--
-- When the destination lives on the sender's own node the message is handed
-- over with 'sendLocal'; otherwise it goes through 'sendMessage' with
-- 'NoImplicitReconnect'. In both cases an 'MxSent' event is published on the
-- node's event bus afterwards.
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = do
  proc <- ask
  let node = processNode proc
      us   = processId proc
  if processNodeId them == localNodeId node
    then sendLocal them msg
    else liftIO $ sendMessage node
                              (ProcessIdentifier us)
                              (ProcessIdentifier them)
                              NoImplicitReconnect
                              msg
  -- The trace event is emitted regardless of which delivery path was taken.
  liftIO $ traceEvent (localEventBus node)
                      (MxSent them us (createUnencodedMessage msg))
-- | Variant of 'send' that delegates to the implementation in
-- "Control.Distributed.Process.UnsafePrimitives".
unsafeSend :: Serializable a => ProcessId -> a -> Process ()
unsafeSend them msg = Unsafe.send them msg
-- | Send a message to another process without going through 'sendMessage'.
--
-- A destination on the local node is served by 'sendLocal'; a remote
-- destination receives an 'UnreliableSend' control message addressed to its
-- node.
usend :: Serializable a => ProcessId -> a -> Process ()
usend them msg = do
  here <- getSelfNode
  let there = processNodeId them
  case here == there of
    True  -> sendLocal them msg
    False -> sendCtrlMsg (Just there)
                         (UnreliableSend (processLocalId them) (createMessage msg))
-- | Variant of 'usend' that delegates to the implementation in
-- "Control.Distributed.Process.UnsafePrimitives".
unsafeUSend :: Serializable a => ProcessId -> a -> Process ()
unsafeUSend them msg = Unsafe.usend them msg
-- | Wait for a message of the requested type and return it.
--
-- Implemented as a 'receiveWait' with a single type-directed 'match'.
expect :: forall a. Serializable a => Process a
expect = receiveWait [match (\msg -> return msg)]
-- | Create a new typed channel and return its send and receive ends.
--
-- The channel id is taken from the process' 'channelCounter', which is
-- incremented in the same state update that records the new channel under
-- 'typedChannelWithId'. The queue is registered through 'mkWeakTQueue' with a
-- finalizer that removes the entry from the process state again.
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
    proc <- ask
    let stateVar = processState proc
    liftIO $ modifyMVar stateVar $ \st -> do
      let lcid   = st ^. channelCounter
          portId = SendPortId { sendPortProcessId = processId proc
                              , sendPortLocalId   = lcid
                              }
      queue     <- newTQueueIO
      weakQueue <- mkWeakTQueue queue (dropChannel stateVar lcid)
      let st' = (channelCounter ^: (+ 1))
              . (typedChannelWithId lcid ^= Just (TypedChannel weakQueue))
              $ st
      return (st', (SendPort portId, ReceivePort (readTQueue queue)))
  where
    -- Finalizer: forget the channel registered under the given local id.
    dropChannel :: StrictMVar LocalProcessState -> LocalSendPortId -> IO ()
    dropChannel stateVar lcid =
      modifyMVar_ stateVar (return . (typedChannelWithId lcid ^= Nothing))
-- | Send a message on a typed channel.
--
-- If the process owning the channel is on the local node the message is
-- delivered with 'sendChanLocal'; otherwise it is sent with 'sendBinary'
-- using 'NoImplicitReconnect'.
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = do
  proc <- ask
  let node = processNode proc
  if processNodeId (sendPortProcessId cid) == localNodeId node
    then sendChanLocal cid msg
    else liftIO $ sendBinary node
                             (ProcessIdentifier (processId proc))
                             (SendPortIdentifier cid)
                             NoImplicitReconnect
                             msg
-- | Variant of 'sendChan' that delegates to the implementation in
-- "Control.Distributed.Process.UnsafePrimitives".
unsafeSendChan :: Serializable a => SendPort a -> a -> Process ()
unsafeSendChan port msg = Unsafe.sendChan port msg
-- | Wait for a message on a typed channel by running the port's STM
-- receive action atomically.
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan port = liftIO (atomically (receiveSTM port))
-- | Like 'receiveChan' but with a timeout.
--
-- A timeout of @0@ performs a single non-blocking poll of the port. Any other
-- value is passed to 'System.Timeout.timeout' (which interprets it in
-- microseconds) around the blocking receive.
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout n ch
  | n == 0    = liftIO (atomically poll)
  | otherwise = liftIO (timeout n (atomically (receiveSTM ch)))
  where
    -- Try to receive once; yield 'Nothing' instead of blocking.
    poll = fmap Just (receiveSTM ch) `orElse` return Nothing
-- | Merge a list of typed channels into one, left-biased.
--
-- The resulting port tries the given ports in list order (via 'orElse') and
-- yields the first message available.
--
-- The fold is seeded with 'retry', so merging an empty list produces a port
-- that never yields a message, instead of a port whose receive action
-- crashes with the @foldr1@ empty-list error. For non-empty lists the
-- behaviour is unchanged because 'retry' is the right identity of 'orElse'.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = return . ReceivePort . foldr orElse retry . map receiveSTM
-- | Merge a list of typed channels into one, rotating the priority order.
--
-- The merged port keeps the list of receive actions in a 'TVar'. Each
-- successful receive tries the actions in their current order and then
-- rotates the list by one position, so the port that was tried first is
-- tried last next time.
--
-- The fold is seeded with 'retry', so an empty list of ports yields a port
-- that never produces a message rather than one that crashes with the
-- @foldr1@ empty-list error. Behaviour for non-empty lists is unchanged.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR ports = do
    psVar <- liftIO . atomically $ newTVar (map receiveSTM ports)
    return $ ReceivePort (rr psVar)
  where
    -- Move the head of the list to the end.
    rotate :: [a] -> [a]
    rotate []     = []
    rotate (x:xs) = xs ++ [x]

    -- Receive from the first port with a message, then rotate the order.
    rr :: TVar [STM a] -> STM a
    rr psVar = do
      ps <- readTVar psVar
      a  <- foldr orElse retry ps
      writeTVar psVar (rotate ps)
      return a
-- | Opaque description of one way to handle an incoming message, for use
-- with 'receiveWait' and 'receiveTimeout'. Wraps a 'MatchOn' over 'Message'
-- whose result is the 'Process' action to run for the matched message.
newtype Match b = Match { unMatch :: MatchOn Message (Process b) }
                  deriving (Functor)
-- | Dequeue, in 'Blocking' mode, the first message accepted by one of the
-- given matches and run the corresponding handler.
receiveWait :: [Match b] -> Process b
receiveWait ms = do
  queue <- fmap processQueue ask
  -- The refutable pattern relies on a 'Blocking' dequeue producing a result.
  Just proc <- liftIO (dequeue queue Blocking [unMatch m | m <- ms])
  proc
-- | Like 'receiveWait' but with a timeout.
--
-- A timeout of @0@ selects a 'NonBlocking' dequeue; any other value is
-- passed on as @'Timeout' t@. Returns 'Nothing' if no message was dequeued,
-- otherwise the result of running the matched handler.
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout t ms = do
  queue <- processQueue <$> ask
  let blockSpec
        | t == 0    = NonBlocking
        | otherwise = Timeout t
  mProc <- liftIO $ dequeue queue blockSpec (map unMatch ms)
  maybe (return Nothing) (fmap Just) mProc
-- | Match on a typed channel: the handler is applied to whatever the port's
-- STM receive action yields.
matchChan :: ReceivePort a -> (a -> Process b) -> Match b
matchChan p fn = Match (MatchChan (fn <$> receiveSTM p))
-- | Match on an arbitrary STM action: the handler is applied to the value
-- the action produces.
matchSTM :: STM a -> (a -> Process b) -> Match b
matchSTM stm fn = Match (MatchChan (fn <$> stm))
-- | Match any message of the handler's argument type; equivalent to
-- 'matchIf' with a predicate that always succeeds.
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match handler = matchIf (\_ -> True) handler
-- | Match a message of a specific type that also satisfies a predicate.
--
-- A message is only considered when its fingerprint equals that of @a@.
-- Unencoded payloads are reinterpreted with 'unsafeCoerce' (justified by the
-- fingerprint check); encoded payloads are decoded strictly before the
-- predicate is consulted.
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf c p = Match $ MatchMsg $ \msg ->
  if messageFingerprint msg == fingerprint (undefined :: a)
    then case msg of
      UnencodedMessage _ raw -> accept (unsafeCoerce raw :: a)
      EncodedMessage _ _ ->
        -- Force the decode now rather than leaving a thunk behind.
        let !decoded = (decode (messageEncoding msg) :: a)
        in accept decoded
    else Nothing
  where
    -- Apply the predicate and, on success, build the handler action.
    accept :: a -> Maybe (Process b)
    accept x
      | c x       = Just (p x)
      | otherwise = Nothing
-- | Match against any message, regardless of its type, handing the raw
-- 'Message' to the handler.
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage p = Match (MatchMsg (Just . p))
-- | Match against any raw 'Message' that satisfies the given predicate.
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf c p = Match $ MatchMsg $ \msg ->
  if c msg
    then Just (p msg)
    else Nothing
-- | Forward a raw 'Message' to the given process.
--
-- A local destination gets a 'LocalSend' control message; a remote one is
-- sent the message payload via 'sendPayload' with 'NoImplicitReconnect'.
-- An 'MxSent' event is published on the node's event bus afterwards.
forward :: Message -> ProcessId -> Process ()
forward msg them = do
  proc <- ask
  let node = processNode proc
      us   = processId proc
  if processNodeId them == localNodeId node
    then sendCtrlMsg Nothing (LocalSend them msg)
    else liftIO $ sendPayload node
                              (ProcessIdentifier us)
                              (ProcessIdentifier them)
                              NoImplicitReconnect
                              (messageToPayload msg)
  -- The trace event is emitted regardless of which delivery path was taken.
  liftIO $ traceEvent (localEventBus node) (MxSent them us msg)
-- | Forward a raw 'Message' to the given process using control messages only.
--
-- A local destination gets a 'LocalSend' control message; a remote one gets
-- an 'UnreliableSend' control message addressed to its node. An 'MxSent'
-- event is published on the node's event bus afterwards.
uforward :: Message -> ProcessId -> Process ()
uforward msg them = do
  proc <- ask
  let node     = processNode proc
      us       = processId proc
      destNode = processNodeId them
  if destNode == localNodeId node
    then sendCtrlMsg Nothing (LocalSend them msg)
    else sendCtrlMsg (Just destNode) (UnreliableSend (processLocalId them) msg)
  -- The trace event is emitted regardless of which delivery path was taken.
  liftIO $ traceEvent (localEventBus node) (MxSent them us msg)
-- | Wrap a 'Serializable' value in a 'Message' using
-- 'createUnencodedMessage'.
wrapMessage :: Serializable a => a -> Message
wrapMessage payload = createUnencodedMessage payload
-- | Variant of 'wrapMessage' that delegates to the implementation in
-- "Control.Distributed.Process.UnsafePrimitives".
unsafeWrapMessage :: Serializable a => a -> Message
unsafeWrapMessage payload = Unsafe.wrapMessage payload
-- | Attempt to extract a value of type @a@ from a 'Message'.
--
-- Returns 'Nothing' when the message fingerprint does not match that of @a@.
-- Otherwise an unencoded payload is reinterpreted with 'unsafeCoerce' and an
-- encoded payload is decoded strictly.
unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
unwrapMessage msg
  | messageFingerprint msg == fingerprint (undefined :: a) =
      case msg of
        UnencodedMessage _ raw -> return (Just (unsafeCoerce raw :: a))
        EncodedMessage _ _ ->
          -- Force the decode before the value is handed back.
          let !decoded = (decode (messageEncoding msg) :: a)
          in return (Just decoded)
  | otherwise = return Nothing
-- | Apply a handler to a 'Message' if its content has the handler's input
-- type; 'handleMessageIf' with a predicate that always succeeds.
handleMessage :: forall m a b. (Monad m, Serializable a)
              => Message -> (a -> m b) -> m (Maybe b)
handleMessage msg proc = handleMessageIf msg (\_ -> True) proc
-- | Conditionally apply a handler to a 'Message'.
--
-- The handler runs only when the message fingerprint matches @a@ and the
-- predicate accepts the extracted value; its result is returned in 'Just'.
-- In every other case the result is 'Nothing'. Encoded payloads are decoded
-- strictly before the predicate is consulted.
handleMessageIf :: forall m a b . (Monad m, Serializable a)
                => Message
                -> (a -> Bool)
                -> (a -> m b)
                -> m (Maybe b)
handleMessageIf msg c proc
  | messageFingerprint msg == fingerprint (undefined :: a) =
      case msg of
        UnencodedMessage _ raw -> run (unsafeCoerce raw :: a)
        EncodedMessage _ _ ->
          let !decoded = (decode (messageEncoding msg) :: a)
          in run decoded
  | otherwise = return Nothing
  where
    -- Check the predicate and, if it holds, run the handler.
    run :: a -> m (Maybe b)
    run x
      | c x       = proc x >>= \r -> return (Just r)
      | otherwise = return Nothing
-- | Like 'handleMessage' but discards the handler's result.
handleMessage_ :: forall m a . (Monad m, Serializable a)
               => Message -> (a -> m ()) -> m ()
handleMessage_ msg proc = handleMessageIf_ msg (\_ -> True) proc
-- | Like 'handleMessageIf' but discards the handler's result.
handleMessageIf_ :: forall m a . (Monad m, Serializable a)
                => Message
                -> (a -> Bool)
                -> (a -> m ())
                -> m ()
handleMessageIf_ msg c proc = do
  _ <- handleMessageIf msg c proc
  return ()
-- | Match against an arbitrary message, passing the raw 'Message' to the
-- handler.
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny p = Match (MatchMsg (Just . p))
-- | Match a raw 'Message' whose content has type @a@ and satisfies the
-- given predicate; the handler receives the raw 'Message'.
--
-- The message is rejected when its fingerprint differs from that of @a@ or
-- when the predicate fails.
--
-- The strict decode is performed only for 'EncodedMessage'. Previously the
-- decode of @messageEncoding msg@ was forced for every message with a
-- matching fingerprint, including 'UnencodedMessage' values, whose payload
-- is never read through 'messageEncoding' elsewhere in this module.
matchAnyIf :: forall a b. (Serializable a)
                       => (a -> Bool)
                       -> (Message -> Process b)
                       -> Match b
matchAnyIf c p = Match $ MatchMsg $ \msg ->
  case messageFingerprint msg == fingerprint (undefined :: a) of
    False -> Nothing
    True  ->
      let accepted :: Bool
          accepted = case msg of
            EncodedMessage _ _ ->
              -- Decode strictly so no thunk holding the encoding is kept.
              let !decoded = (decode (messageEncoding msg) :: a)
              in c decoded
            -- The fingerprint check above justifies the coercion.
            UnencodedMessage _ m' -> c (unsafeCoerce m')
      in if accepted then Just (p msg) else Nothing
-- | Match every message unconditionally, ignoring its content and running
-- the given action.
matchUnknown :: Process b -> Match b
matchUnknown p = Match (MatchMsg (\_ -> Just p))
-- | Loop forever, receiving any message and forwarding it to @pid@ when the
-- predicate accepts it. Messages the predicate rejects are dropped.
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate pid p = do
  receiveWait [ matchAny (\m -> when (p m) (forward m pid)) ]
  delegate pid p
-- | Loop forever, forwarding every received message to @pid@.
relay :: ProcessId -> Process ()
relay !pid = do
  receiveWait [ matchAny (flip forward pid) ]
  relay pid
-- | Loop forever, receiving any message and forwarding it to @pid@ only when
-- it has type @a@ and the supplied action returns 'True' for it. Messages of
-- another type, and those for which the action returns 'False', are dropped.
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
proxy pid proc = do
  receiveWait [ matchAny decide ]
  proxy pid proc
  where
    decide m = do
      next <- handleMessage m proc
      case next of
        Just True -> forward m pid
        _         -> return ()  -- rejected, or not of the expected type
-- | Exception thrown by 'terminate'.
data ProcessTerminationException = ProcessTerminationException
  deriving (Show, Typeable)
instance Exception ProcessTerminationException
-- | Terminate the calling process by throwing
-- 'ProcessTerminationException'.
terminate :: Process a
terminate = Catch.throwM ProcessTerminationException
-- | Throw a 'ProcessExitException' in the calling process, carrying the
-- caller's own pid and the given reason wrapped with 'createMessage'.
die :: Serializable a => a -> Process b
die reason =
  getSelfPid >>= \pid ->
    throwM (ProcessExitException pid (createMessage reason))
-- | Ask the local node controller to kill the given process, by sending it
-- a 'Kill' control message with the given reason.
kill :: ProcessId -> String -> Process ()
kill them reason = sendCtrlMsg Nothing $ Kill them reason
-- | Ask the local node controller to deliver an exit signal to the given
-- process, by sending it an 'Exit' control message whose reason is wrapped
-- with 'createMessage'.
exit :: Serializable a => ProcessId -> a -> Process ()
exit them reason = sendCtrlMsg Nothing $ Exit them (createMessage reason)
-- | Run an action and handle a 'ProcessExitException' whose reason has
-- type @a@.
--
-- If the reason's fingerprint matches @a@, it is decoded strictly and passed
-- to the handler together with the pid carried by the exception. Otherwise
-- the exception is rethrown unchanged.
--
-- The decode happens only after the fingerprint check has succeeded.
-- Previously the strict binding sat in the enclosing @where@ and was forced
-- before the check, so a reason of a different type could fail while being
-- decoded as @a@ and replace the 'ProcessExitException' with a decoding
-- error.
catchExit :: forall a b . (Show a, Serializable a)
                       => Process b
                       -> (ProcessId -> a -> Process b)
                       -> Process b
catchExit act exitHandler = Catch.catch act handleExit
  where
    handleExit ex@(ProcessExitException from msg)
      | messageFingerprint msg == fingerprint (undefined :: a) =
          -- Decode eagerly so no thunk holding the encoding is passed on.
          let !decoded = (decode (messageEncoding msg) :: a)
          in exitHandler from decoded
      | otherwise = throwM ex
-- | Run an action and, on 'ProcessExitException', try each handler in turn.
--
-- Every handler receives the pid and raw reason 'Message' from the
-- exception. The first handler that returns 'Just' decides the result; if
-- all return 'Nothing' (or the list is empty) the exception is rethrown.
catchesExit :: Process b
            -> [(ProcessId -> Message -> (Process (Maybe b)))]
            -> Process b
catchesExit act handlers = Catch.catch act (\ex -> tryEach ex handlers)
  where
    tryEach :: ProcessExitException
            -> [(ProcessId -> Message -> Process (Maybe b))]
            -> Process b
    tryEach ex [] = throwM ex
    tryEach ex@(ProcessExitException from msg) (h:hs) =
      h from msg >>= maybe (tryEach ex hs) return
-- | The 'ProcessId' of the calling process.
getSelfPid :: Process ProcessId
getSelfPid = fmap processId ask
-- | The 'NodeId' of the node the calling process runs on.
getSelfNode :: Process NodeId
getSelfNode = fmap (localNodeId . processNode) ask
-- | Get statistics about the given node.
--
-- For the local node this is 'getLocalNodeStats' wrapped in 'Right'. For a
-- remote node a 'GetNodeStats' request is sent to it, the node is monitored
-- for the duration of the wait, and the result is either the 'NodeStats'
-- reply or, if a monitor notification for that monitor arrives, 'Left' with
-- the reported 'DiedReason'.
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats nid = do
    selfNode <- getSelfNode
    case nid == selfNode of
      True  -> fmap Right getLocalNodeStats
      False -> queryRemote selfNode
  where
    queryRemote :: NodeId -> Process (Either DiedReason NodeStats)
    queryRemote selfNode = do
      sendCtrlMsg (Just nid) (GetNodeStats selfNode)
      -- 'bracket' guarantees the monitor is removed again on any exit path.
      bracket (monitorNode nid) unmonitor $ \mRef ->
        receiveWait
          [ match $ \(stats :: NodeStats) -> return (Right stats)
          , matchIf (\(NodeMonitorNotification ref _ _) -> ref == mRef)
                    (\(NodeMonitorNotification _ _ reason) -> return (Left reason))
          ]
-- | Get statistics about the local node: sends 'GetNodeStats' to the local
-- node controller and waits for a 'NodeStats' message.
getLocalNodeStats :: Process NodeStats
getLocalNodeStats = do
  self <- getSelfNode
  sendCtrlMsg Nothing (GetNodeStats self)
  receiveWait [ match (return :: NodeStats -> Process NodeStats) ]
-- | Get information about the given process.
--
-- A 'GetInfo' request is sent to the local node controller when the process
-- is on this node, and to the process' own node otherwise. The result is
-- 'Just' the 'ProcessInfo' reply, or 'Nothing' if a 'ProcessInfoNone' reply
-- arrives instead.
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo pid = do
  us <- getSelfNode
  let them = processNodeId pid
      dest
        | them == us = Nothing
        | otherwise  = Just them
  sendCtrlMsg dest (GetInfo pid)
  receiveWait
    [ match $ \(p :: ProcessInfo)     -> return (Just p)
    , match $ \(_ :: ProcessInfoNone) -> return Nothing
    ]
-- | Link to a process by sending a 'Link' control message to the local node
-- controller. Does not wait for any reply.
link :: ProcessId -> Process ()
link pid = sendCtrlMsg Nothing (Link (ProcessIdentifier pid))
-- | Start monitoring a process and return the reference identifying the
-- monitor.
monitor :: ProcessId -> Process MonitorRef
monitor pid = monitor' (ProcessIdentifier pid)
-- | Run an action while monitoring the given process. The monitor is set up
-- before the action and removed with 'unmonitor' afterwards via 'bracket'.
withMonitor :: ProcessId -> Process a -> Process a
withMonitor pid code = bracket (monitor pid) unmonitor (const code)
  
  
  
  
-- | Remove a link to a process and wait for the matching
-- 'DidUnlinkProcess' acknowledgement before returning.
unlink :: ProcessId -> Process ()
unlink pid = do
  unlinkAsync pid
  receiveWait [ matchIf isAck (const (return ())) ]
  where
    -- Only the acknowledgement for this particular pid ends the wait.
    isAck (DidUnlinkProcess pid') = pid' == pid
-- | Remove a link to a node and wait for the matching 'DidUnlinkNode'
-- acknowledgement before returning.
unlinkNode :: NodeId -> Process ()
unlinkNode nid = do
  unlinkNodeAsync nid
  receiveWait [ matchIf isAck (const (return ())) ]
  where
    -- Only the acknowledgement for this particular node ends the wait.
    isAck (DidUnlinkNode nid') = nid' == nid
-- | Remove a link to a channel and wait for the matching 'DidUnlinkPort'
-- acknowledgement before returning.
unlinkPort :: SendPort a -> Process ()
unlinkPort sport = do
  unlinkPortAsync sport
  receiveWait [ matchIf isAck (const (return ())) ]
  where
    -- Only the acknowledgement for this particular port ends the wait.
    isAck (DidUnlinkPort cid) = cid == sendPortId sport
-- | Remove a monitor and wait for the matching 'DidUnmonitor'
-- acknowledgement before returning.
unmonitor :: MonitorRef -> Process ()
unmonitor ref = do
  unmonitorAsync ref
  receiveWait [ matchIf isAck (const (return ())) ]
  where
    -- Only the acknowledgement for this particular reference ends the wait.
    isAck (DidUnmonitor ref') = ref' == ref
-- | 'Catch.catch' specialised to 'Process'.
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch action handler = Catch.catch action handler
-- | 'Catch.try' specialised to 'Process'.
try :: Exception e => Process a -> Process (Either e a)
try action = Catch.try action
-- | 'Catch.mask' specialised to 'Process'.
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask action = Catch.mask action
-- | 'Catch.mask_' specialised to 'Process'.
mask_ :: Process a -> Process a
mask_ action = Catch.mask_ action
-- | 'Catch.onException' specialised to 'Process'.
onException :: Process a -> Process b -> Process a
onException action cleanup = Catch.onException action cleanup
-- | 'Catch.bracket' specialised to 'Process'.
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket acquire release use = Catch.bracket acquire release use
-- | 'Catch.bracket_' specialised to 'Process'.
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ before after action = Catch.bracket_ before after action
-- | 'Catch.finally' specialised to 'Process'.
finally :: Process a -> Process b -> Process a
finally action cleanup = Catch.finally action cleanup
-- | An exception handler for use with 'catches'. The exception type it
-- handles is hidden existentially.
data Handler a = forall e . Exception e => Handler (e -> Process a)

-- | Mapping over a 'Handler' post-processes the result of the handler.
instance Functor Handler where
    fmap f (Handler h) = Handler (\e -> fmap f (h e))
-- | Run an action and, if it throws, dispatch the exception to the first
-- handler in the list whose exception type matches.
catches :: Process a -> [Handler a] -> Process a
catches proc handlers = Catch.catch proc (catchesHandler handlers)
-- | Pick the first handler whose exception type the given exception can be
-- converted to and run it; rethrow the exception if none applies.
catchesHandler :: [Handler a] -> SomeException -> Process a
catchesHandler handlers e = go handlers
  where
    go [] = throwM e
    go (Handler handler : rest) =
      case fromException e of
        Just e' -> handler e'
        Nothing -> go rest
-- | Like 'expect' but with a timeout; the timeout value is handed to
-- 'receiveTimeout' unchanged.
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout n = receiveTimeout n [match (\msg -> return msg)]
-- | Request that a process be spawned on the given node and return the
-- 'SpawnRef' that identifies the request. Does not wait for a reply.
--
-- The 'Spawn' control message goes to the local node controller when the
-- target is this node, and to the remote node otherwise.
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync nid proc = do
  spawnRef <- getSpawnRef
  node     <- getSelfNode
  let dest = if nid == node then Nothing else Just nid
  sendCtrlMsg dest (Spawn proc spawnRef)
  return spawnRef
-- | Start monitoring a node and return the reference identifying the
-- monitor.
monitorNode :: NodeId -> Process MonitorRef
monitorNode nid = monitor' (NodeIdentifier nid)
-- | Start monitoring a typed channel and return the reference identifying
-- the monitor.
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort cid) = monitor' $ SendPortIdentifier cid
-- | Send an 'Unmonitor' request to the local node controller without
-- waiting for the acknowledgement.
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync ref = sendCtrlMsg Nothing (Unmonitor ref)
-- | Link to a node.
linkNode :: NodeId -> Process ()
linkNode nid = link' (NodeIdentifier nid)
-- | Link to a typed channel.
linkPort :: SendPort a -> Process ()
linkPort (SendPort cid) = link' $ SendPortIdentifier cid
-- | Send an 'Unlink' request for a process to the local node controller
-- without waiting for the acknowledgement.
unlinkAsync :: ProcessId -> Process ()
unlinkAsync pid = sendCtrlMsg Nothing (Unlink (ProcessIdentifier pid))
-- | Send an 'Unlink' request for a node to the local node controller
-- without waiting for the acknowledgement.
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync nid = sendCtrlMsg Nothing (Unlink (NodeIdentifier nid))
-- | Send an 'Unlink' request for a typed channel to the local node
-- controller without waiting for the acknowledgement.
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync (SendPort cid) =
  sendCtrlMsg Nothing (Unlink (SendPortIdentifier cid))
-- | Log a string.
--
-- Sends a triple of the current time (formatted with @"%c"@ in the default
-- locale), the caller's pid and the string to the process registered under
-- the name @"logger"@.
say :: String -> Process ()
say string = do
  now <- liftIO getCurrentTime
  us  <- getSelfPid
  let stamp = formatTime defaultTimeLocale "%c" now
  nsend "logger" (stamp, us, string)
-- | Register a process under a name on the local node, without forcing
-- (see 'registerImpl'). Throws 'ProcessRegistrationException' if the node
-- controller reports failure.
register :: String -> ProcessId -> Process ()
register label pid = registerImpl False label pid
-- | Like 'register' but with the force flag set (see 'registerImpl').
reregister :: String -> ProcessId -> Process ()
reregister label pid = registerImpl True label pid
-- | Shared implementation of 'register' and 'reregister'.
--
-- Sends a 'Register' request carrying the force flag to the local node
-- controller, then waits for the 'RegisterReply' with the same label and
-- passes its outcome to 'handleRegistrationReply'.
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl force label pid = do
  mynid <- getSelfNode
  sendCtrlMsg Nothing $ Register label mynid (Just pid) force
  receiveWait
    [ matchIf (\(RegisterReply replyLabel _ _) -> replyLabel == label)
              (\(RegisterReply _ ok owner) -> handleRegistrationReply label ok owner)
    ]
-- | Request registration of a process under a name on the given node,
-- without waiting for the reply.
--
-- The request goes to the local node controller when the target is this
-- node, and to the remote node otherwise.
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync nid label pid = do
  here <- getSelfNode
  let dest
        | nid == here = Nothing
        | otherwise   = Just nid
  sendCtrlMsg dest (Register label nid (Just pid) False)
-- | Like 'registerRemoteAsync' but with the force flag set.
--
-- As in 'registerRemoteAsync', the request is handed to the local node
-- controller when the target is this node, instead of always being
-- addressed as a remote message.
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync nid label pid = do
  here <- getSelfNode
  sendCtrlMsg (if nid == here then Nothing else Just nid)
              (Register label nid (Just pid) True)
-- | Remove a name registration on the local node.
--
-- Sends a 'Register' request with no process attached to the local node
-- controller, then waits for the 'RegisterReply' with the same label and
-- passes its outcome to 'handleRegistrationReply'.
unregister :: String -> Process ()
unregister label = do
  mynid <- getSelfNode
  sendCtrlMsg Nothing $ Register label mynid Nothing False
  receiveWait
    [ matchIf (\(RegisterReply replyLabel _ _) -> replyLabel == label)
              (\(RegisterReply _ ok owner) -> handleRegistrationReply label ok owner)
    ]
-- | Turn a failed registration reply into a
-- 'ProcessRegistrationException' carrying the label and reported owner;
-- do nothing for a successful one.
handleRegistrationReply :: String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply label ok owner
  | ok        = return ()
  | otherwise = throwM (ProcessRegistrationException label owner)
-- | Request removal of a name registration on the given node, without
-- waiting for the reply.
--
-- As in 'registerRemoteAsync', the request is handed to the local node
-- controller when the target is this node, instead of always being
-- addressed as a remote message.
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync nid label = do
  here <- getSelfNode
  sendCtrlMsg (if nid == here then Nothing else Just nid)
              (Register label nid Nothing False)
-- | Look up a name in the local registry.
--
-- Sends a 'WhereIs' request to the local node controller and waits for the
-- 'WhereIsReply' carrying the same label.
whereis :: String -> Process (Maybe ProcessId)
whereis label = do
  sendCtrlMsg Nothing $ WhereIs label
  receiveWait
    [ matchIf (\(WhereIsReply replyLabel _) -> replyLabel == label)
              (\(WhereIsReply _ mPid) -> return mPid)
    ]
-- | Send a 'WhereIs' request for a name to the given node, without waiting
-- for the reply.
--
-- The request goes to the local node controller when the target is this
-- node, and to the remote node otherwise.
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync nid label = do
  here <- getSelfNode
  let dest
        | nid == here = Nothing
        | otherwise   = Just nid
  sendCtrlMsg dest (WhereIs label)
-- | Send a message to the process registered under a name on the local
-- node, via a 'NamedSend' control message.
nsend :: Serializable a => String -> a -> Process ()
nsend label msg = sendCtrlMsg Nothing $ NamedSend label (createMessage msg)
-- | Variant of 'nsend' that delegates to the implementation in
-- "Control.Distributed.Process.UnsafePrimitives".
unsafeNSend :: Serializable a => String -> a -> Process ()
unsafeNSend label msg = Unsafe.nsend label msg
-- | Send a message to the process registered under a name on the given
-- node. Falls back to 'nsend' when that node is the local one.
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg = do
  here <- getSelfNode
  case here == nid of
    True  -> nsend label msg
    False -> sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
-- | Variant of 'nsendRemote' that delegates to the implementation in
-- "Control.Distributed.Process.UnsafePrimitives".
unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process ()
unsafeNSendRemote nid label msg = Unsafe.nsendRemote nid label msg
-- | Resolve a static value against the node's remote table. Calls 'fail'
-- with a descriptive message if resolution fails.
unStatic :: Typeable a => Static a -> Process a
unStatic static = do
  rtable <- fmap (remoteTable . processNode) ask
  either (\err -> fail ("Could not resolve static value: " ++ err))
         return
         (Static.unstatic rtable static)
-- | Resolve a closure against the node's remote table. Calls 'fail' with a
-- descriptive message if resolution fails.
unClosure :: Typeable a => Closure a -> Process a
unClosure closure = do
  rtable <- fmap (remoteTable . processNode) ask
  either (\err -> fail ("Could not resolve closure: " ++ err))
         return
         (Static.unclosure rtable closure)
-- | Call 'disconnect' on the connection from the calling process to the
-- given process.
reconnect :: ProcessId -> Process ()
reconnect them = do
  node <- fmap processNode ask
  us   <- getSelfPid
  liftIO (disconnect node (ProcessIdentifier us) (ProcessIdentifier them))
-- | Call 'disconnect' on the connection from the calling process to the
-- given typed channel.
reconnectPort :: SendPort a -> Process ()
reconnectPort them = do
  node <- fmap processNode ask
  us   <- getSelfPid
  let target = SendPortIdentifier (sendPortId them)
  liftIO (disconnect node (ProcessIdentifier us) target)
-- | Deliver a message to a process on the local node: the value is wrapped
-- with 'createUnencodedMessage' and passed to the local node controller in
-- a 'LocalSend' control message.
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
sendLocal to msg =
  sendCtrlMsg Nothing (LocalSend to (createUnencodedMessage msg))
-- | Deliver a message to a typed channel on the local node: the value is
-- wrapped with 'createUnencodedMessage' and passed to the local node
-- controller in a 'LocalPortSend' control message.
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
sendChanLocal spId msg =
  sendCtrlMsg Nothing (LocalPortSend spId (createUnencodedMessage msg))
-- | Allocate a fresh 'MonitorRef' for the given identifier, using and then
-- incrementing the process' 'monitorCounter'.
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor ident = do
  proc <- ask
  liftIO . modifyMVar (processState proc) $ \st ->
    let counter = st ^. monitorCounter
        st'     = (monitorCounter ^: (+ 1)) st
    in return (st', MonitorRef ident counter)
-- | Allocate a fresh 'SpawnRef', using and then incrementing the process'
-- 'spawnCounter'.
getSpawnRef :: Process SpawnRef
getSpawnRef = do
  proc <- ask
  liftIO . modifyMVar (processState proc) $ \st ->
    let counter = st ^. spawnCounter
        st'     = (spawnCounter ^: (+ 1)) st
    in return (st', SpawnRef counter)
-- | Monitor a process, node or channel: allocate a monitor reference, send
-- a 'Monitor' request for it to the local node controller and return the
-- reference.
monitor' :: Identifier -> Process MonitorRef
monitor' ident = do
  ref <- getMonitorRefFor ident
  sendCtrlMsg Nothing (Monitor ref)
  return ref
-- | Link to a process, node or channel by sending a 'Link' control message
-- to the local node controller.
link' :: Identifier -> Process ()
link' ident = sendCtrlMsg Nothing (Link ident)