{-# LANGUAGE RankNTypes  #-}
module Control.Distributed.Process.Internal.Spawn
  ( spawn
  , spawnLink
  , spawnMonitor
  , call
  , spawnSupervised
  , spawnChannel
  ) where

import Control.Distributed.Static
  ( Static
  , Closure
  , closureCompose
  , staticClosure
  )
import Control.Distributed.Process.Internal.Types
  ( NodeId(..)
  , ProcessId(..)
  , Process(..)
  , MonitorRef(..)
  , ProcessMonitorNotification(..)
  , NodeMonitorNotification(..)
  , DidSpawn(..)
  , SendPort(..)
  , ReceivePort(..)
  , nullProcessId
  )
import Control.Distributed.Process.Serializable (Serializable, SerializableDict)
import Control.Distributed.Process.Internal.Closure.BuiltIn
  ( sdictSendPort
  , sndStatic
  , idCP
  , seqCP
  , bindCP
  , splitCP
  , cpLink
  , cpSend
  , cpNewChan
  , cpDelayed
  , returnCP
  , sdictUnit
  )
import Control.Distributed.Process.Internal.Primitives
  ( -- Basic messaging
    usend
  , expect
  , receiveWait
  , match
  , matchIf
  , link
  , getSelfPid
  , monitor
  , monitorNode
  , unmonitor
  , spawnAsync
  , reconnect
  )

-- | Spawn a process
--
-- For more information about 'Closure', see
-- "Control.Distributed.Process.Closure".
--
-- The target node is monitored while the spawn request (issued through
-- 'spawnAsync') is outstanding. If the node monitor fires before the matching
-- 'DidSpawn' reply arrives, the result is @'nullProcessId' nid@ rather than
-- the identifier of a running process.
--
-- See also 'call'.
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
spawn nid proc = do
  us   <- getSelfPid
  mRef <- monitorNode nid
  -- The closure is wrapped with 'cpDelayed' keyed on our own pid; the unit
  -- message sent below is presumably what releases it (confirm against
  -- 'cpDelayed').
  sRef <- spawnAsync nid (cpDelayed us proc)
  receiveWait
    [ -- The spawn succeeded: stop watching the node and release the child.
      matchIf (\(DidSpawn ref _) -> ref == sRef) $ \(DidSpawn _ pid) -> do
        unmonitor mRef
        usend pid ()
        return pid
      -- The node monitor fired before the spawn was confirmed.
    , matchIf (\(NodeMonitorNotification ref _ _) -> ref == mRef) $ \_ ->
        return (nullProcessId nid)
    ]

-- | Spawn a process and link to it
--
-- Note that this is just the sequential composition of 'spawn' and 'link'.
-- (The "Unified" semantics that underlies Cloud Haskell does not even support
-- a synchronous link operation)
--
-- Returns the 'ProcessId' produced by 'spawn'.
spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnLink nid proc = do
  pid <- spawn nid proc
  link pid
  return pid

-- | Like 'spawnLink', but monitor the spawned process
--
-- This is the sequential composition of 'spawn' and 'monitor'; the result
-- pairs the new 'ProcessId' with the 'MonitorRef' returned by 'monitor'.
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
spawnMonitor nid proc = do
  pid <- spawn nid proc
  ref <- monitor pid
  return (pid, ref)

-- | Run a process remotely and wait for it to reply
--
-- We monitor the remote process: if it dies before it can send a reply, we die
-- too (via 'fail', with the reported reason in the message).
--
-- For more information about 'Static', 'SerializableDict', and 'Closure', see
-- "Control.Distributed.Process.Closure".
--
-- See also 'spawn'.
call :: Serializable a
        => Static (SerializableDict a)
        -> NodeId
        -> Closure (Process a)
        -> Process a
call dict nid proc = do
  us <- getSelfPid
  (pid, mRef) <- spawnMonitor nid (proc `bindCP`
                                   cpSend dict us `seqCP`
                                   -- Delay so the process does not terminate
                                   -- before the response arrives.
                                   cpDelayed us (returnCP sdictUnit ())
                                  )
  -- Wait for either the reply or the death of the remote process, whichever
  -- comes first.
  mResult <- receiveWait
    [ -- Got the reply: send the unit message that the remote 'cpDelayed'
      -- step is waiting for.
      match $ \a -> usend pid () >> return (Right a)
    , matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
              (\(ProcessMonitorNotification _ _ reason) -> return (Left reason))
    ]
  case mResult of
    Right a  -> do
      -- Wait for the monitor message so that the mailbox doesn't grow
      receiveWait
        [ matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
                  (\(ProcessMonitorNotification {}) -> return ())
        ]
      -- Clean up connection to pid
      reconnect pid
      return a
    Left err ->
      fail $ "call: remote process died: " ++ show err

-- | Spawn a child process, have the child link to the parent and the parent
-- monitor the child
--
-- The child's closure is prefixed with @'cpLink' us@, so the link to the
-- parent is set up by the child itself before the supplied closure runs.
-- The parent starts monitoring only after 'spawn' has returned.
spawnSupervised :: NodeId
                -> Closure (Process ())
                -> Process (ProcessId, MonitorRef)
spawnSupervised nid proc = do
  us   <- getSelfPid
  them <- spawn nid (cpLink us `seqCP` proc)
  ref  <- monitor them
  return (them, ref)

-- | Spawn a new process, supplying it with a new 'ReceivePort' and return
-- the corresponding 'SendPort'.
--
-- The channel is created by the spawned process (via 'cpNewChan'), which
-- sends the 'SendPort' back to the caller; the caller then blocks in
-- 'expect' until it arrives.
spawnChannel :: forall a. Serializable a => Static (SerializableDict a)
             -> NodeId
             -> Closure (ReceivePort a -> Process ())
             -> Process (SendPort a)
spawnChannel dict nid proc = do
    us <- getSelfPid
    _ <- spawn nid (go us)
    expect
  where
    -- Closure run by the spawned process: create the channel, send the send
    -- port to @pid@ while handing the receive port to @proc@, then collapse
    -- the resulting @((), ())@ to @()@.
    go :: ProcessId -> Closure (Process ())
    go pid = cpNewChan dict
           `bindCP`
             (cpSend (sdictSendPort dict) pid `splitCP` proc)
           `bindCP`
             (idCP `closureCompose` staticClosure sndStatic)