{-# LANGUAGE CPP  #-}
{- | [Cloud Haskell]

This is an implementation of Cloud Haskell, as described in
/Towards Haskell in the Cloud/ by Jeff Epstein, Andrew Black, and Simon
Peyton Jones (see
<http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/>),
although some of the details are different. The precise message passing
semantics are based on /A unified semantics for future Erlang/ by Hans
Svensson, Lars-Åke Fredlund and Clara Benac Earle.

For a detailed description of the package and other reference materials,
please see the distributed-process wiki page on github:
<https://github.com/haskell-distributed/distributed-process/wiki>.

-}
module Control.Distributed.Process
  ( -- * Basic types
    ProcessId
  , NodeId(..)
  , Process
  , SendPortId
  , processNodeId
  , sendPortProcessId
  , liftIO -- Reexported for convenience
    -- * Basic messaging
  , send
  , usend
  , expect
  , expectTimeout
    -- * Channels
  , ReceivePort
  , SendPort
  , sendPortId
  , newChan
  , sendChan
  , receiveChan
  , receiveChanTimeout
  , mergePortsBiased
  , mergePortsRR
    -- * Unsafe messaging variants
  , unsafeSend
  , unsafeUSend
  , unsafeSendChan
  , unsafeNSend
  , unsafeNSendRemote
  , unsafeWrapMessage
    -- * Advanced messaging
  , Match
  , receiveWait
  , receiveTimeout
  , match
  , matchIf
  , matchUnknown
  , matchAny
  , matchAnyIf
  , matchChan
  , matchSTM
  , Message
  , matchMessage
  , matchMessageIf
  , isEncoded
  , wrapMessage
  , unwrapMessage
  , handleMessage
  , handleMessageIf
  , handleMessage_
  , handleMessageIf_
  , forward
  , uforward
  , delegate
  , relay
  , proxy
    -- * Process management
  , spawn
  , call
  , terminate
  , die
  , kill
  , exit
  , catchExit
  , catchesExit
  , ProcessTerminationException(..)
  , ProcessRegistrationException(..)
  , SpawnRef
  , getSelfPid
  , getSelfNode
  , ProcessInfo(..)
  , getProcessInfo
  , NodeStats(..)
  , getNodeStats
  , getLocalNodeStats
    -- * Monitoring and linking
  , link
  , linkNode
  , linkPort
  , unlink
  , unlinkNode
  , unlinkPort
  , monitor
  , monitorNode
  , monitorPort
  , unmonitor
  , withMonitor
  , withMonitor_
  , MonitorRef -- opaque
  , ProcessLinkException(..)
  , NodeLinkException(..)
  , PortLinkException(..)
  , ProcessMonitorNotification(..)
  , NodeMonitorNotification(..)
  , PortMonitorNotification(..)
  , DiedReason(..)
    -- * Closures
  , Closure
  , closure
  , Static
  , unStatic
  , unClosure
  , RemoteTable
    -- * Logging
  , say
    -- * Registry
  , register
  , reregister
  , unregister
  , whereis
  , nsend
  , registerRemoteAsync
  , reregisterRemoteAsync
  , unregisterRemoteAsync
  , whereisRemoteAsync
  , nsendRemote
  , WhereIsReply(..)
  , RegisterReply(..)
    -- * Exception handling
  , catch
  , Handler(..)
  , catches
  , try
  , mask
  , mask_
  , onException
  , bracket
  , bracket_
  , finally
    -- * Auxiliary API
  , spawnAsync
  , spawnSupervised
  , spawnLink
  , spawnMonitor
  , spawnChannel
  , DidSpawn(..)
    -- * Local versions of 'spawn'
  , spawnLocal
  , spawnChannelLocal
  , callLocal
    -- * Reconnecting
  , reconnect
  , reconnectPort
  ) where

import Control.Monad.IO.Class (liftIO)
import Control.Applicative
import Control.Monad.Reader (ask)
import Control.Concurrent (killThread)
import Control.Concurrent.MVar
  ( MVar
  , newEmptyMVar
  , takeMVar
  , putMVar
  )
import Control.Distributed.Static
  ( Closure
  , closure
  , Static
  , RemoteTable
  )
import Control.Distributed.Process.Internal.Types
  ( NodeId(..)
  , ProcessId(..)
  , Process(..)
  , MonitorRef(..)
  , ProcessMonitorNotification(..)
  , NodeMonitorNotification(..)
  , PortMonitorNotification(..)
  , ProcessLinkException(..)
  , NodeLinkException(..)
  , PortLinkException(..)
  , ProcessRegistrationException(..)
  , DiedReason(..)
  , SpawnRef(..)
  , DidSpawn(..)
  , SendPort(..)
  , ReceivePort(..)
  , SendPortId(..)
  , WhereIsReply(..)
  , RegisterReply(..)
  , LocalProcess(processNode)
  , Message
  , localProcessWithId
  )
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Primitives
  ( -- Basic messaging
    send
  , usend
  , expect
    -- Channels
  , newChan
  , sendChan
  , receiveChan
  , mergePortsBiased
  , mergePortsRR
  , unsafeSend
  , unsafeUSend
  , unsafeSendChan
  , unsafeNSend
  , unsafeNSendRemote
    -- Advanced messaging
  , Match
  , receiveWait
  , receiveTimeout
  , match
  , matchIf
  , matchUnknown
  , matchAny
  , matchAnyIf
  , matchChan
  , matchSTM
  , matchMessage
  , matchMessageIf
  , isEncoded
  , wrapMessage
  , unsafeWrapMessage
  , unwrapMessage
  , handleMessage
  , handleMessageIf
  , handleMessage_
  , handleMessageIf_
  , forward
  , uforward
  , delegate
  , relay
  , proxy
    -- Process management
  , terminate
  , ProcessTerminationException(..)
  , die
  , exit
  , catchExit
  , catchesExit
  , kill
  , getSelfPid
  , getSelfNode
  , ProcessInfo(..)
  , getProcessInfo
  , NodeStats(..)
  , getNodeStats
  , getLocalNodeStats
    -- Monitoring and linking
  , link
  , linkNode
  , linkPort
  , unlink
  , unlinkNode
  , unlinkPort
  , monitor
  , monitorNode
  , monitorPort
  , unmonitor
  , withMonitor
  , withMonitor_
    -- Logging
  , say
    -- Registry
  , register
  , reregister
  , unregister
  , whereis
  , nsend
  , registerRemoteAsync
  , reregisterRemoteAsync
  , unregisterRemoteAsync
  , whereisRemoteAsync
  , nsendRemote
    -- Closures
  , unStatic
  , unClosure
    -- Exception handling
  , catch
  , Handler(..)
  , catches
  , try
  , mask
  , mask_
  , onException
  , bracket
  , bracket_
  , finally
    -- Auxiliary API
  , expectTimeout
  , receiveChanTimeout
  , spawnAsync
    -- Reconnecting
  , reconnect
  , reconnectPort
  )
import Control.Distributed.Process.Node (forkProcess)
import Control.Distributed.Process.Internal.Types
  ( processThread
  , withValidLocalState
  )
import Control.Distributed.Process.Internal.Spawn
  ( -- Spawning Processes/Channels
    spawn
  , spawnLink
  , spawnMonitor
  , spawnChannel
  , spawnSupervised
  , call
  )
import qualified Control.Monad.Catch as Catch

#if MIN_VERSION_base(4,6,0)
import Prelude
#else
import Prelude hiding (catch)
#endif
import qualified Control.Exception as Exception (onException)
import Data.Accessor ((^.))
import Data.Foldable (forM_)


-- INTERNAL NOTES
--
-- 1.  'send' never fails. If you want to know that the remote process received
--     your message, you will need to send an explicit acknowledgement. If you
--     want to know when the remote process failed, you will need to monitor
--     that remote process.
--
-- 2.  'send' may block (when the system TCP buffers are full, while we are
--     trying to establish a connection to the remote endpoint, etc.) but its
--     return does not imply that the remote process received the message (much
--     less processed it). When 'send' targets a local process, however, it is
--     dispatched via the node controller, in which case it will /not/ block.
--     This isn't part of the semantics, so it should be ok. The ordering is
--     maintained because the ctrlChannel still has FIFO semantics with regard
--     to interactions between two disparate forkIO threads.
--
-- 3.  Message delivery is reliable and ordered. That means that if process A
--     sends messages m1, m2, m3 to process B, B will either receive all three
--     messages in order (m1, m2, m3) or a prefix thereof; messages will not be
--     'missing' (m1, m3) or reordered (m1, m3, m2).
--
-- In order to guarantee (3), we stipulate that
--
-- 3a. We do not garbage collect connections because Network.Transport provides
--     ordering guarantees only *per connection*.
--
-- 3b. Once a connection breaks, we have no way of knowing which messages
--     arrived and which did not; hence, once a connection fails, we assume the
--     remote process to be forever unreachable. Otherwise we might send m1 and
--     m2, get notified of the broken connection, reconnect, send m3, but only
--     m1 and m3 arrive.
--
-- 3c. As a consequence of (3b) we should not reuse PIDs. If a process dies,
--     we consider it forever unreachable. Hence, new processes should get new
--     IDs or they too would be considered unreachable.
--
-- Main reference for Cloud Haskell is
--
-- [1] "Towards Haskell in the Cloud", Jeff Epstein, Andrew Black and Simon
--     Peyton-Jones.
--       http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/remote.pdf
--
-- The precise semantics for message passing is based on
--
-- [2] "A Unified Semantics for Future Erlang", Hans Svensson, Lars-Ake Fredlund
--     and Clara Benac Earle (not freely available online, unfortunately)
--
-- Some pointers to related documentation about Erlang, for comparison and
-- inspiration:
--
-- [3] "Programming Distributed Erlang Applications: Pitfalls and Recipes",
--     Hans Svensson and Lars-Ake Fredlund
--       http://man.lupaworld.com/content/develop/p37-svensson.pdf
-- [4] The Erlang manual, sections "Message Sending" and "Send"
--       http://www.erlang.org/doc/reference_manual/processes.html#id82409
--       http://www.erlang.org/doc/reference_manual/expressions.html#send
-- [5] Questions "Is the order of message reception guaranteed?" and
--     "If I send a message, is it guaranteed to reach the receiver?" of
--     the Erlang FAQ
--       http://www.erlang.org/faq/academic.html
-- [6] "Delivery of Messages", post on erlang-questions
--       http://erlang.org/pipermail/erlang-questions/2012-February/064767.html

--------------------------------------------------------------------------------
-- Local versions of spawn                                                    --
--------------------------------------------------------------------------------

-- | Spawn a process on the local node.
--
-- The node is taken from the calling process's own 'LocalProcess' environment
-- (via 'ask' and 'processNode'), and the given computation is forked on that
-- node with 'forkProcess'. The result is the 'ProcessId' of the new process.
spawnLocal :: Process () -> Process ProcessId
spawnLocal proc = do
  node <- processNode <$> ask
  liftIO $ forkProcess node proc

-- | Create a new typed channel, spawn a process on the local node, passing it
-- the receive port, and return the send port.
--
-- The channel is created /inside/ the forked process; its send port is handed
-- back to the caller through an 'MVar'. The calling thread blocks on
-- 'takeMVar' until the child has published the send port, after which the
-- child continues by running the supplied function on the receive port.
spawnChannelLocal :: Serializable a
                  => (ReceivePort a -> Process ())
                  -> Process (SendPort a)
spawnChannelLocal proc = do
  node <- processNode <$> ask
  liftIO $ do
    mvar <- newEmptyMVar
    _ <- forkProcess node $ do
      -- It is important that we allocate the new channel in the new process,
      -- because otherwise it will be associated with the wrong process ID
      (sport, rport) <- newChan
      liftIO $ putMVar mvar sport
      proc rport
    takeMVar mvar

-- | Local version of 'call'. Running a process in this way isolates it from
-- messages sent to the caller process, and also allows silently dropping late
-- or duplicate messages sent to the isolated process after it exits.
-- Silently dropping messages may not always be the best approach.
--
-- The computation is run in a fresh child process created with 'spawnLocal'.
-- Its outcome (a result, or the exception it raised, captured by
-- 'Catch.try') is placed in an 'MVar' which the caller waits on. A captured
-- exception is rethrown in the caller; otherwise the result is returned.
--
-- The whole operation runs under 'Catch.mask'; the supplied computation
-- itself is wrapped in the @release@ function provided by 'Catch.mask'.
callLocal :: Process a -> Process a
callLocal proc = Catch.mask $ \release -> do
    mv    <- liftIO newEmptyMVar :: Process (MVar (Either Catch.SomeException a))
    -- The child always finishes by writing its outcome to @mv@.
    child <- spawnLocal $ Catch.try (release proc) >>= liftIO . putMVar mv
    lproc <- ask
    liftIO $ do
      -- If waiting for the child is interrupted by an exception, kill the
      -- child (when it is still known to the node) and wait for it to report
      -- before letting the original exception propagate.
      rs <- Exception.onException (takeMVar mv) $ Catch.uninterruptibleMask_ $
            -- Exceptions need to be prevented from interrupting the clean up or
            -- the original exception which caused entering the handler could be
            -- forgotten. For instance, this could have a problematic effect
            -- when the original exception was meant to kill the thread and the
            -- second exception doesn't (like the exception thrown by
            -- 'System.Timeout.timeout').
            do mchildThreadId <- withValidLocalState (processNode lproc) $
                 \vst -> return $ fmap processThread $
                           vst ^. localProcessWithId (processLocalId child)
               forM_ mchildThreadId killThread
               takeMVar mv
      either Catch.throwM return rs