{-# LANGUAGE CPP  #-}
module Control.Distributed.Process
  ( 
    ProcessId
  , NodeId(..)
  , Process
  , SendPortId
  , processNodeId
  , sendPortProcessId
  , liftIO 
    
  , send
  , usend
  , expect
  , expectTimeout
    
  , ReceivePort
  , SendPort
  , sendPortId
  , newChan
  , sendChan
  , receiveChan
  , receiveChanTimeout
  , mergePortsBiased
  , mergePortsRR
    
  , unsafeSend
  , unsafeUSend
  , unsafeSendChan
  , unsafeNSend
  , unsafeNSendRemote
  , unsafeWrapMessage
    
  , Match
  , receiveWait
  , receiveTimeout
  , match
  , matchIf
  , matchUnknown
  , matchAny
  , matchAnyIf
  , matchChan
  , matchSTM
  , Message
  , matchMessage
  , matchMessageIf
  , isEncoded
  , wrapMessage
  , unwrapMessage
  , handleMessage
  , handleMessageIf
  , handleMessage_
  , handleMessageIf_
  , forward
  , uforward
  , delegate
  , relay
  , proxy
    
  , spawn
  , call
  , terminate
  , die
  , kill
  , exit
  , catchExit
  , catchesExit
  , ProcessTerminationException(..)
  , ProcessRegistrationException(..)
  , SpawnRef
  , getSelfPid
  , getSelfNode
  , ProcessInfo(..)
  , getProcessInfo
  , NodeStats(..)
  , getNodeStats
  , getLocalNodeStats
    
  , link
  , linkNode
  , linkPort
  , unlink
  , unlinkNode
  , unlinkPort
  , monitor
  , monitorNode
  , monitorPort
  , unmonitor
  , withMonitor
  , withMonitor_
  , MonitorRef 
  , ProcessLinkException(..)
  , NodeLinkException(..)
  , PortLinkException(..)
  , ProcessMonitorNotification(..)
  , NodeMonitorNotification(..)
  , PortMonitorNotification(..)
  , DiedReason(..)
    
  , Closure
  , closure
  , Static
  , unStatic
  , unClosure
  , RemoteTable
    
  , say
    
  , register
  , reregister
  , unregister
  , whereis
  , nsend
  , registerRemoteAsync
  , reregisterRemoteAsync
  , unregisterRemoteAsync
  , whereisRemoteAsync
  , nsendRemote
  , WhereIsReply(..)
  , RegisterReply(..)
    
  , catch
  , Handler(..)
  , catches
  , try
  , mask
  , mask_
  , onException
  , bracket
  , bracket_
  , finally
    
  , spawnAsync
  , spawnSupervised
  , spawnLink
  , spawnMonitor
  , spawnChannel
  , DidSpawn(..)
    
  , spawnLocal
  , spawnChannelLocal
  , callLocal
    
  , reconnect
  , reconnectPort
  ) where
import Control.Monad.IO.Class (liftIO)
import Control.Applicative
import Control.Monad.Reader (ask)
import Control.Concurrent (killThread)
import Control.Concurrent.MVar
  ( MVar
  , newEmptyMVar
  , takeMVar
  , putMVar
  )
import Control.Distributed.Static
  ( Closure
  , closure
  , Static
  , RemoteTable
  )
import Control.Distributed.Process.Internal.Types
  ( NodeId(..)
  , ProcessId(..)
  , Process(..)
  , MonitorRef(..)
  , ProcessMonitorNotification(..)
  , NodeMonitorNotification(..)
  , PortMonitorNotification(..)
  , ProcessLinkException(..)
  , NodeLinkException(..)
  , PortLinkException(..)
  , ProcessRegistrationException(..)
  , DiedReason(..)
  , SpawnRef(..)
  , DidSpawn(..)
  , SendPort(..)
  , ReceivePort(..)
  , SendPortId(..)
  , WhereIsReply(..)
  , RegisterReply(..)
  , LocalProcess(processNode)
  , Message
  , localProcessWithId
  )
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Primitives
  ( 
    send
  , usend
  , expect
    
  , newChan
  , sendChan
  , receiveChan
  , mergePortsBiased
  , mergePortsRR
  , unsafeSend
  , unsafeUSend
  , unsafeSendChan
  , unsafeNSend
  , unsafeNSendRemote
    
  , Match
  , receiveWait
  , receiveTimeout
  , match
  , matchIf
  , matchUnknown
  , matchAny
  , matchAnyIf
  , matchChan
  , matchSTM
  , matchMessage
  , matchMessageIf
  , isEncoded
  , wrapMessage
  , unsafeWrapMessage
  , unwrapMessage
  , handleMessage
  , handleMessageIf
  , handleMessage_
  , handleMessageIf_
  , forward
  , uforward
  , delegate
  , relay
  , proxy
    
  , terminate
  , ProcessTerminationException(..)
  , die
  , exit
  , catchExit
  , catchesExit
  , kill
  , getSelfPid
  , getSelfNode
  , ProcessInfo(..)
  , getProcessInfo
  , NodeStats(..)
  , getNodeStats
  , getLocalNodeStats
    
  , link
  , linkNode
  , linkPort
  , unlink
  , unlinkNode
  , unlinkPort
  , monitor
  , monitorNode
  , monitorPort
  , unmonitor
  , withMonitor
  , withMonitor_
    
  , say
    
  , register
  , reregister
  , unregister
  , whereis
  , nsend
  , registerRemoteAsync
  , reregisterRemoteAsync
  , unregisterRemoteAsync
  , whereisRemoteAsync
  , nsendRemote
    
  , unStatic
  , unClosure
    
  , catch
  , Handler(..)
  , catches
  , try
  , mask
  , mask_
  , onException
  , bracket
  , bracket_
  , finally
    
  , expectTimeout
  , receiveChanTimeout
  , spawnAsync
    
  , reconnect
  , reconnectPort
  )
import Control.Distributed.Process.Node (forkProcess)
import Control.Distributed.Process.Internal.Types
  ( processThread
  , withValidLocalState
  )
import Control.Distributed.Process.Internal.Spawn
  ( 
    spawn
  , spawnLink
  , spawnMonitor
  , spawnChannel
  , spawnSupervised
  , call
  )
import qualified Control.Monad.Catch as Catch
#if MIN_VERSION_base(4,6,0)
import Prelude
#else
import Prelude hiding (catch)
#endif
import qualified Control.Exception as Exception (onException)
import Data.Accessor ((^.))
import Data.Foldable (forM_)
-- | Spawn a process on the local node.
--
-- The node is taken from the calling process's own 'LocalProcess'
-- environment (via 'ask' and 'processNode'); the given computation is then
-- forked on that node with 'forkProcess'.
--
-- Returns the 'ProcessId' produced by 'forkProcess' for the new process.
spawnLocal :: Process () -> Process ProcessId
spawnLocal proc = do
  -- The node the caller is running on.
  node <- processNode <$> ask
  liftIO $ forkProcess node proc
-- | Create a new typed channel, spawn a process on the local node that is
-- handed the channel's 'ReceivePort', and return the matching 'SendPort' to
-- the caller.
--
-- The caller blocks on an 'MVar' until the child has created the channel and
-- published its send port.
spawnChannelLocal :: Serializable a
                  => (ReceivePort a -> Process ())
                  -> Process (SendPort a)
spawnChannelLocal proc = do
  node <- processNode <$> ask
  liftIO $ do
    -- One-shot hand-off of the send port from the child back to the caller.
    mvar <- newEmptyMVar
    _ <- forkProcess node $ do
      -- 'newChan' is deliberately run inside the forked process rather than
      -- in the caller. NOTE(review): presumably so that the channel is
      -- associated with the child's process id and not the caller's --
      -- confirm against 'newChan'.
      (sport, rport) <- newChan
      liftIO $ putMVar mvar sport
      proc rport
    -- Wait for the child to publish the send port.
    takeMVar mvar
-- | Run a computation in a freshly spawned local process and wait for its
-- result.
--
-- The computation is executed by a child created with 'spawnLocal'. Its
-- outcome -- either the returned value or the exception it raised, captured
-- with 'Catch.try' as a 'Catch.SomeException' -- is passed back to the caller
-- through an 'MVar'. A captured exception is rethrown in the caller with
-- 'Catch.throwM'; otherwise the value is returned.
--
-- If the caller is interrupted by an exception while waiting for the result,
-- the child's thread is killed and the caller waits for the child to finish
-- before the original exception propagates.
callLocal :: Process a -> Process a
callLocal proc = Catch.mask $ \release -> do
    -- Holds the child's outcome: 'Left' for an exception, 'Right' for a value.
    mv    <- liftIO newEmptyMVar :: Process (MVar (Either Catch.SomeException a))
    -- The child inherits the masked state from the enclosing 'Catch.mask';
    -- 'release' restores the caller's original masking state around @proc@
    -- only, so the 'putMVar' that reports the outcome still runs masked.
    child <- spawnLocal $ Catch.try (release proc) >>= liftIO . putMVar mv
    lproc <- ask
    liftIO $ do
      rs <- Exception.onException (takeMVar mv) $ Catch.uninterruptibleMask_ $
            -- Cleanup path: entered only when the wait above was interrupted
            -- by an exception. It runs under 'Catch.uninterruptibleMask_' so
            -- that a further asynchronous exception cannot interrupt the
            -- cleanup; 'Exception.onException' rethrows the original
            -- exception once the cleanup completes.
            do -- Look up the child's thread in the node's local state; the
               -- result is 'Nothing' when the child is not found there.
               mchildThreadId <- withValidLocalState (processNode lproc) $
                 \vst -> return $ fmap processThread $
                           vst ^. localProcessWithId (processLocalId child)
               -- Kill the child if it was found.
               forM_ mchildThreadId killThread
               -- Wait for the child to report its outcome before the
               -- original exception propagates.
               takeMVar mv
      -- Rethrow the child's exception in the caller, or return its value.
      either Catch.throwM return rs