-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.UnsafePrimitives
-- Copyright   :  (c) Well-Typed / Tim Watson
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Tim Watson <watson.timothy@gmail.com>
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-- [Unsafe Variants of Cloud Haskell's Messaging Primitives]
--
-- Cloud Haskell's semantics do not mention evaluation/strictness properties
-- at all; only the promise (or lack) of signal/message delivery between
-- processes is considered. For practical purposes, Cloud Haskell optimises
-- communication between (intra-node) local processes by skipping the
-- network-transport layer. In order to ensure the same strictness semantics
-- however, messages /still/ undergo binary serialisation before (internal)
-- transmission takes place. Thus we ensure that in both (the local and remote)
-- cases, message payloads are fully evaluated. Whilst this provides the user
-- with /unsurprising/ behaviour, the resulting performance overhead is quite
-- severe. Communicating data between Haskell threads /without/ forcing binary
-- serialisation reduces (intra-node, inter-process) communication overheads
-- by several orders of magnitude.
--
-- This module provides variants of Cloud Haskell's messaging primitives
-- ('send', 'usend', 'sendChan', 'nsend', 'nsendRemote' and 'wrapMessage')
-- which do /not/ force binary serialisation in the local case, thereby
-- offering superior intra-node messaging performance. The /catch/ is that any
-- evaluation problems lurking within the passed data structure (e.g., fields
-- set to @undefined@ and so on) will show up in the receiver rather than in
-- the caller (as they would with the /normal/ strategy).
--
-- Use of the functions in this module can potentially change the runtime
-- behaviour of your application. In addition, messages passed between Cloud
-- Haskell processes are written to a tracing infrastructure on the local node,
-- to provide improved introspection and debugging facilities for complex actor
-- based systems. This module makes no attempt to force evaluation in these
-- cases either, thus evaluation problems in passed data structures could not
-- only crash your processes, but could also bring down critical internal
-- services on which the node relies to function correctly.
--
-- If you wish to avoid such issues, you are advised to consider the use
-- of NFSerialisable in the distributed-process-extras package, a type class
-- which brings NFData into scope along with Serializable, such that we can
-- force evaluation. Intended for use with modules such as this one, this
-- approach guarantees correct evaluatedness in terms of @NFData@. Please note
-- however, that we /cannot/ guarantee that an @NFData@ instance will behave the
-- same way as a @Binary@ one with regard to evaluation, so it is still possible
-- to introduce unexpected behaviour by using /unsafe/ primitives in this way.
--
-- You have been warned!
--
-- This module is exported so that you can replace the use of Cloud Haskell's
-- /safe/ messaging primitives. If you want to use both variants, then you can
-- take advantage of qualified imports, however "Control.Distributed.Process"
-- also re-exports these functions under different names, using the @unsafe@
-- prefix.
--
module Control.Distributed.Process.UnsafePrimitives
  ( -- * Unsafe Basic Messaging
    send
  , sendChan
  , nsend
  , nsendRemote
  , usend
  , wrapMessage
  ) where

import Control.Distributed.Process.Internal.Messaging
  ( sendMessage
  , sendBinary
  , sendCtrlMsg
  )
import Control.Distributed.Process.Management.Internal.Types
  ( MxEvent(..)
  )
import Control.Distributed.Process.Management.Internal.Trace.Types
  ( traceEvent
  )
import Control.Distributed.Process.Internal.Types
  ( ProcessId(..)
  , NodeId(..)
  , LocalNode(..)
  , LocalProcess(..)
  , Process(..)
  , SendPort(..)
  , ProcessSignal(..)
  , Identifier(..)
  , ImplicitReconnect(..)
  , SendPortId(..)
  , Message
  , createMessage
  , sendPortProcessId
  , unsafeCreateUnencodedMessage
  )
import Control.Distributed.Process.Serializable (Serializable)

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)

-- | Named send to a process in the local registry (asynchronous).
--
-- The payload is wrapped with 'wrapMessage', so it is /not/ binary encoded.
-- An 'MxSentToName' event carrying the label, the sender's 'ProcessId' and
-- the wrapped message is published on the local node's event bus, and the
-- same wrapped message is then dispatched as a 'NamedSend' control signal
-- with no target node ('Nothing').
nsend :: Serializable a => String -> a -> Process ()
nsend label msg = do
  proc <- ask
  let us   = processId proc
      msg' = wrapMessage msg
  -- see [note: tracing]
  liftIO $ traceEvent (localEventBus (processNode proc))
                      (MxSentToName label us msg')
  sendCtrlMsg Nothing (NamedSend label msg')

-- | Named send to a process in a remote registry (asynchronous).
--
-- When the given 'NodeId' is the local node's own id, this simply delegates
-- to 'nsend'. Otherwise an 'MxSentToName' event is traced under the label
-- @label ++ \"\@\" ++ show nid@ (with the unencoded, 'wrapMessage'd payload),
-- and a 'NamedSend' control signal is sent to the target node. Note that on
-- this remote path the signal carries a message built with 'createMessage'
-- rather than 'wrapMessage'.
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg = do
  proc <- ask
  let us   = processId proc
      node = processNode proc
  if localNodeId node == nid
    then nsend label msg
    else do
      -- The trace label is qualified with the destination node; the signal
      -- itself still carries the bare registry label.
      let lbl = label ++ "@" ++ show nid
      -- see [note: tracing] NB: this is a remote call to another NC...
      liftIO $ traceEvent (localEventBus node)
                          (MxSentToName lbl us (wrapMessage msg))
      sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))

-- | Send a message.
--
-- An 'MxSent' event with the unencoded ('wrapMessage'd) payload is always
-- traced on the local node's event bus. If the destination process lives on
-- the local node, the same unencoded message is delivered through a
-- 'LocalSend' control signal; otherwise the raw payload is handed to
-- 'sendMessage' with 'NoImplicitReconnect'.
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = do
  proc <- ask
  let node = processNode proc
      us   = processId proc
      msg' = wrapMessage msg
  -- see [note: tracing]
  liftIO $ traceEvent (localEventBus node) (MxSent them us msg')
  if processNodeId them == localNodeId node
    then sendCtrlMsg Nothing $ LocalSend them msg'
    else liftIO $ sendMessage node
                              (ProcessIdentifier us)
                              (ProcessIdentifier them)
                              NoImplicitReconnect
                              msg

-- | Send a message unreliably.
--
-- Unlike 'send', this function is insensitive to 'reconnect'. It will
-- try to send the message regardless of the history of connection failures
-- between the nodes.
--
-- Message passing with 'usend' is ordered for a given sender and receiver
-- if the messages arrive at all.
--
-- An 'MxSent' event with the unencoded ('wrapMessage'd) payload is always
-- traced. For a destination on the local node the unencoded message is
-- delivered via a 'LocalSend' control signal; otherwise an 'UnreliableSend'
-- control signal, carrying a message built with 'createMessage', is sent to
-- the destination node.
usend :: Serializable a => ProcessId -> a -> Process ()
usend them msg = do
    proc <- ask
    let there = processNodeId them
        us    = processId proc
        node  = processNode proc
        msg'  = wrapMessage msg
    -- see [note: tracing]
    liftIO $ traceEvent (localEventBus node) (MxSent them us msg')
    if localNodeId node == there
      then sendCtrlMsg Nothing $ LocalSend them msg'
      else sendCtrlMsg (Just there) $ UnreliableSend (processLocalId them)
                                                     (createMessage msg)

-- [note: tracing]
-- Note that tracing writes to the local node's control channel, and this
-- module explicitly tells its clients that it performs unsafe message
-- encoding. The same is true of the messages it puts onto the Management
-- event bus, even though we do *not* want unevaluated thunks hitting the
-- event bus control thread. Hence the word /Unsafe/ in this module's name!
--

-- | Send a message on a typed channel.
--
-- An 'MxSentToPort' event with the unencoded ('wrapMessage'd) payload is
-- traced on the local node's event bus. If the process owning the send port
-- is on the local node, the unencoded message is delivered through a
-- 'LocalPortSend' control signal; otherwise the raw payload is passed to
-- 'sendBinary' with 'NoImplicitReconnect'.
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = do
  proc <- ask
  let node = processNode proc
      pid  = processId proc
      us   = localNodeId node
      them = processNodeId (sendPortProcessId cid)
      msg' = wrapMessage msg
  -- see [note: tracing]
  liftIO $ traceEvent (localEventBus node) (MxSentToPort pid cid msg')
  if them == us
    then unsafeSendChanLocal cid msg' -- NB: we wrap to P.Message !!!
    else liftIO $ sendBinary node
                             (ProcessIdentifier pid)
                             (SendPortIdentifier cid)
                             NoImplicitReconnect
                             msg
  where
    -- Local delivery: hand the already-wrapped message over as a
    -- 'LocalPortSend' control signal.
    unsafeSendChanLocal :: SendPortId -> Message -> Process ()
    unsafeSendChanLocal p m = sendCtrlMsg Nothing $ LocalPortSend p m

-- | Create an unencoded @Message@ for any @Serializable@ type.
--
-- This is a direct alias for 'unsafeCreateUnencodedMessage'; the other
-- primitives in this module use it to build the payload they trace and
-- deliver locally.
wrapMessage :: Serializable a => a -> Message
wrapMessage = unsafeCreateUnencodedMessage