{-# LANGUAGE DeriveGeneric    #-}
{-# LANGUAGE ParallelListComp #-}

module Control.Distributed.Process.Tests.Mx (tests) where

import Control.Distributed.Process.Tests.Internal.Utils
import Network.Transport.Test (TestTransport(..))
import Control.Distributed.Process hiding (bracket, finally, try)
import Control.Distributed.Process.Internal.Types
 ( ProcessExitException(..)
 , unsafeCreateUnencodedMessage
 )
import Control.Distributed.Process.Node
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
  ( send
  , nsend
  , nsendRemote
  , usend
  , sendChan
  )
import Control.Distributed.Process.Management
  ( MxEvent(..)
  , MxAgentId(..)
  , mxAgent
  , mxSink
  , mxReady
  , mxReceive
  , mxDeactivate
  , liftMX
  , mxGetLocal
  , mxSetLocal
  , mxUpdateLocal
  , mxNotify
  , mxBroadcast
  )
import Control.Monad (void, unless)
import Control.Monad.Catch(finally, bracket, try)
import Data.Binary
import Data.List (find, sort, intercalate)
import Data.Maybe (isJust, fromJust, isNothing, fromMaybe, catMaybes)
import Data.Typeable
import GHC.Generics hiding (from)

import Test.Framework
  ( Test
  , testGroup
  )
import Test.Framework.Providers.HUnit (testCase)
import Test.HUnit (assertBool, assertEqual)

data Publish = Publish
  deriving (Typeable, (forall x. Publish -> Rep Publish x)
-> (forall x. Rep Publish x -> Publish) -> Generic Publish
forall x. Rep Publish x -> Publish
forall x. Publish -> Rep Publish x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Publish -> Rep Publish x
from :: forall x. Publish -> Rep Publish x
$cto :: forall x. Rep Publish x -> Publish
to :: forall x. Rep Publish x -> Publish
Generic, Publish -> Publish -> Bool
(Publish -> Publish -> Bool)
-> (Publish -> Publish -> Bool) -> Eq Publish
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Publish -> Publish -> Bool
== :: Publish -> Publish -> Bool
$c/= :: Publish -> Publish -> Bool
/= :: Publish -> Publish -> Bool
Eq)

instance Binary Publish where

awaitExit :: ProcessId -> Process ()
awaitExit :: ProcessId -> Process ()
awaitExit ProcessId
pid =
  ProcessId -> (MonitorRef -> Process ()) -> Process ()
forall {b}. ProcessId -> (MonitorRef -> Process b) -> Process b
withMonitorRef ProcessId
pid ((MonitorRef -> Process ()) -> Process ())
-> (MonitorRef -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \MonitorRef
ref -> do
      [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
        [ (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
r ProcessId
_ DiedReason
_) -> MonitorRef
r MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
ref)
                  (\ProcessMonitorNotification
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
        ]
  where
    withMonitorRef :: ProcessId -> (MonitorRef -> Process b) -> Process b
withMonitorRef ProcessId
p = Process MonitorRef
-> (MonitorRef -> Process ())
-> (MonitorRef -> Process b)
-> Process b
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket (ProcessId -> Process MonitorRef
monitor ProcessId
p) MonitorRef -> Process ()
unmonitor

testAgentBroadcast :: TestResult (Maybe ()) -> Process ()
testAgentBroadcast :: TestResult (Maybe ()) -> Process ()
testAgentBroadcast TestResult (Maybe ())
result = do
  (SendPort ()
resultSP, ReceivePort ()
resultRP) <- Process (SendPort (), ReceivePort ())
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan :: Process (SendPort (), ReceivePort ())

  ProcessId
publisher <- MxAgentId -> () -> [MxSink ()] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
"publisher-agent") () [
      (() -> MxAgent () MxAction) -> MxSink ()
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((() -> MxAgent () MxAction) -> MxSink ())
-> (() -> MxAgent () MxAction) -> MxSink ()
forall a b. (a -> b) -> a -> b
$ \() -> Publish -> MxAgent () ()
forall m s. Serializable m => m -> MxAgent s ()
mxBroadcast Publish
Publish MxAgent () () -> MxAgent () MxAction -> MxAgent () MxAction
forall a b. MxAgent () a -> MxAgent () b -> MxAgent () b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MxAgent () MxAction
forall s. MxAgent s MxAction
mxReady
    ]

  ProcessId
consumer  <- MxAgentId -> () -> [MxSink ()] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
"consumer-agent") () [
      (Publish -> MxAgent () MxAction) -> MxSink ()
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((Publish -> MxAgent () MxAction) -> MxSink ())
-> (Publish -> MxAgent () MxAction) -> MxSink ()
forall a b. (a -> b) -> a -> b
$ \Publish
Publish -> (Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort () -> () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort ()
resultSP ()) MxAgent () () -> MxAgent () MxAction -> MxAgent () MxAction
forall a b. MxAgent () a -> MxAgent () b -> MxAgent () b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MxAgent () MxAction
forall s. MxAgent s MxAction
mxReady
    ]

  () -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify ()
  -- Once the publisher has seen our message, it will broadcast the Publish
  -- and the consumer will see that and send the result to our typed channel.
  TestResult (Maybe ()) -> Maybe () -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult (Maybe ())
result (Maybe () -> Process ()) -> Process (Maybe ()) -> Process ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> ReceivePort () -> Process (Maybe ())
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
10000000 ReceivePort ()
resultRP

  ProcessId -> String -> Process ()
kill ProcessId
publisher String
"finished"
  ProcessId -> String -> Process ()
kill ProcessId
consumer  String
"finished"

testAgentDualInput :: TestResult (Maybe Int) -> Process ()
testAgentDualInput :: TestResult (Maybe Int) -> Process ()
testAgentDualInput TestResult (Maybe Int)
result = do
  (SendPort Int
sp, ReceivePort Int
rp) <- Process (SendPort Int, ReceivePort Int)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
  ProcessId
s <- MxAgentId -> Int -> [MxSink Int] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
"sum-agent") (Int
0 :: Int) [
        (Int -> MxAgent Int MxAction) -> MxSink Int
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((Int -> MxAgent Int MxAction) -> MxSink Int)
-> (Int -> MxAgent Int MxAction) -> MxSink Int
forall a b. (a -> b) -> a -> b
$ (\(Int
i :: Int) -> do
                     Int -> MxAgent Int ()
forall s. s -> MxAgent s ()
mxSetLocal (Int -> MxAgent Int ()) -> (Int -> Int) -> Int -> MxAgent Int ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int -> Int -> Int
forall a. Num a => a -> a -> a
+Int
i) (Int -> MxAgent Int ()) -> MxAgent Int Int -> MxAgent Int ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent Int Int
forall s. MxAgent s s
mxGetLocal
                     Int
i' <- MxAgent Int Int
forall s. MxAgent s s
mxGetLocal
                     if Int
i' Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
15
                        then do MxAgent Int Int
forall s. MxAgent s s
mxGetLocal MxAgent Int Int -> (Int -> MxAgent Int ()) -> MxAgent Int ()
forall a b. MxAgent Int a -> (a -> MxAgent Int b) -> MxAgent Int b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Process () -> MxAgent Int ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent Int ())
-> (Int -> Process ()) -> Int -> MxAgent Int ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SendPort Int -> Int -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort Int
sp
                                String -> MxAgent Int MxAction
forall s. String -> MxAgent s MxAction
mxDeactivate String
"finished"
                        else MxAgent Int MxAction
forall s. MxAgent s MxAction
mxReady)
    ]

  MonitorRef
mRef <- ProcessId -> Process MonitorRef
monitor ProcessId
s

  Int -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify          (Int
1 :: Int)
  String -> Int -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
"sum-agent" (Int
3 :: Int)
  Int -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify          (Int
2 :: Int)
  String -> Int -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
"sum-agent" (Int
4 :: Int)
  Int -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify          (Int
5 :: Int)

  TestResult (Maybe Int) -> Maybe Int -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult (Maybe Int)
result (Maybe Int -> Process ()) -> Process (Maybe Int) -> Process ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> ReceivePort Int -> Process (Maybe Int)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
10000000 ReceivePort Int
rp
  Maybe Bool
died <- Int -> [Match Bool] -> Process (Maybe Bool)
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
10000000 [
      (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process Bool) -> Match Bool
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
r ProcessId
_ DiedReason
_) -> MonitorRef
r MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef) (Process Bool -> ProcessMonitorNotification -> Process Bool
forall a b. a -> b -> a
const (Process Bool -> ProcessMonitorNotification -> Process Bool)
-> Process Bool -> ProcessMonitorNotification -> Process Bool
forall a b. (a -> b) -> a -> b
$ Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True)
    ]
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Maybe Bool -> Maybe Bool -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty (Bool -> Maybe Bool
forall a. a -> Maybe a
Just Bool
True) Maybe Bool
died

testAgentPrioritisation :: TestResult [String] -> Process ()
testAgentPrioritisation :: TestResult [String] -> Process ()
testAgentPrioritisation TestResult [String]
result = do

  -- TODO: this isn't really testing how we /prioritise/ one source
  -- over another at all, but I've not yet figured out the right way
  -- to do so, since we're at the whim of the scheduler with regards
  -- the timeliness of nsend versus mxNotify anyway.

  let name :: String
name = String
"prioritising-agent"
  (SendPort [String]
sp, ReceivePort [String]
rp) <- Process (SendPort [String], ReceivePort [String])
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
  ProcessId
s <- MxAgentId -> [String] -> [MxSink [String]] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
name) [String
"first"] [
         (String -> MxAgent [String] MxAction) -> MxSink [String]
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink (\(String
s :: String) -> do
                   ([String] -> [String]) -> MxAgent [String] ()
forall s. (s -> s) -> MxAgent s ()
mxUpdateLocal (String
sString -> [String] -> [String]
forall a. a -> [a] -> [a]
:)
                   [String]
st <- MxAgent [String] [String]
forall s. MxAgent s s
mxGetLocal
                   case [String] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [String]
st of
                     Int
n | Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
5 -> do Process () -> MxAgent [String] ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent [String] ())
-> Process () -> MxAgent [String] ()
forall a b. (a -> b) -> a -> b
$ SendPort [String] -> [String] -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort [String]
sp ([String] -> [String]
forall a. Ord a => [a] -> [a]
sort [String]
st)
                                      String -> MxAgent [String] MxAction
forall s. String -> MxAgent s MxAction
mxDeactivate String
"finished"
                     Int
_          -> MxAgent [String] MxAction
forall s. MxAgent s MxAction
mxReceive  -- go to the mailbox
                   )
    ]

  String -> String -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
name String
"second"
  String -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify String
"third"
  String -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify String
"fourth"
  String -> String -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
name String
"fifth"

  TestResult [String] -> [String] -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult [String]
result ([String] -> Process ())
-> (Maybe [String] -> [String]) -> Maybe [String] -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [String] -> Maybe [String] -> [String]
forall a. a -> Maybe a -> a
fromMaybe [] (Maybe [String] -> Process ())
-> Process (Maybe [String]) -> Process ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> ReceivePort [String] -> Process (Maybe [String])
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
10000000 ReceivePort [String]
rp
  ProcessId -> Process ()
awaitExit ProcessId
s

testAgentMailboxHandling :: TestResult (Maybe ()) -> Process ()
testAgentMailboxHandling :: TestResult (Maybe ()) -> Process ()
testAgentMailboxHandling TestResult (Maybe ())
result = do
  (SendPort ()
sp, ReceivePort ()
rp) <- Process (SendPort (), ReceivePort ())
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
  ProcessId
agent <- MxAgentId -> () -> [MxSink ()] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
"mailbox-agent") () [
      (() -> MxAgent () MxAction) -> MxSink ()
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((() -> MxAgent () MxAction) -> MxSink ())
-> (() -> MxAgent () MxAction) -> MxSink ()
forall a b. (a -> b) -> a -> b
$ \() -> (Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort () -> () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort ()
sp ()) MxAgent () () -> MxAgent () MxAction -> MxAgent () MxAction
forall a b. MxAgent () a -> MxAgent () b -> MxAgent () b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MxAgent () MxAction
forall s. MxAgent s MxAction
mxReady
    ]

  String -> () -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
"mailbox-agent" ()

  TestResult (Maybe ()) -> Maybe () -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult (Maybe ())
result (Maybe () -> Process ()) -> Process (Maybe ()) -> Process ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> ReceivePort () -> Process (Maybe ())
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
1000000 ReceivePort ()
rp
  ProcessId -> String -> Process ()
kill ProcessId
agent String
"finished"

testAgentEventHandling :: TestResult Bool -> Process ()
testAgentEventHandling :: TestResult Bool -> Process ()
testAgentEventHandling TestResult Bool
result = do
  ProcessId
us <- Process ProcessId
getSelfPid
  -- because this test is a bit racy, let's ensure it can't run indefinitely
  ProcessId
timer <- Process () -> Process ProcessId
spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$ do
    Int -> Process ()
pause (Int
1000000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
60 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
5)
    -- okay we've waited 5 mins, let's kill the test off if it's stuck...
    TestResult Bool -> Bool -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult Bool
result Bool
False
    ProcessId -> String -> Process ()
kill ProcessId
us String
"Test Timed Out"

  let initState :: [MxEvent]
initState = [] :: [MxEvent]
  (SendPort ()
rc, ReceivePort ()
rs) <- Process (SendPort (), ReceivePort ())
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
  ProcessId
agentPid <- MxAgentId -> [MxEvent] -> [MxSink [MxEvent]] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
"lifecycle-listener-agent") [MxEvent]
initState [
      ((MxEvent -> MxAgent [MxEvent] MxAction) -> MxSink [MxEvent]
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((MxEvent -> MxAgent [MxEvent] MxAction) -> MxSink [MxEvent])
-> (MxEvent -> MxAgent [MxEvent] MxAction) -> MxSink [MxEvent]
forall a b. (a -> b) -> a -> b
$ \MxEvent
ev -> do
         [MxEvent]
st <- MxAgent [MxEvent] [MxEvent]
forall s. MxAgent s s
mxGetLocal
         let act :: MxAgent [MxEvent] ()
act =
               case MxEvent
ev of
                 (MxSpawned ProcessId
_)       -> [MxEvent] -> MxAgent [MxEvent] ()
forall s. s -> MxAgent s ()
mxSetLocal (MxEvent
evMxEvent -> [MxEvent] -> [MxEvent]
forall a. a -> [a] -> [a]
:[MxEvent]
st)
                 (MxProcessDied ProcessId
_ DiedReason
_) -> [MxEvent] -> MxAgent [MxEvent] ()
forall s. s -> MxAgent s ()
mxSetLocal (MxEvent
evMxEvent -> [MxEvent] -> [MxEvent]
forall a. a -> [a] -> [a]
:[MxEvent]
st)
                 MxEvent
_                   -> () -> MxAgent [MxEvent] ()
forall a. a -> MxAgent [MxEvent] a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
         MxAgent [MxEvent] ()
act MxAgent [MxEvent] ()
-> MxAgent [MxEvent] () -> MxAgent [MxEvent] ()
forall a b.
MxAgent [MxEvent] a -> MxAgent [MxEvent] b -> MxAgent [MxEvent] b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (Process () -> MxAgent [MxEvent] ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent [MxEvent] ())
-> Process () -> MxAgent [MxEvent] ()
forall a b. (a -> b) -> a -> b
$ SendPort () -> () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort ()
rc ()) MxAgent [MxEvent] ()
-> MxAgent [MxEvent] MxAction -> MxAgent [MxEvent] MxAction
forall a b.
MxAgent [MxEvent] a -> MxAgent [MxEvent] b -> MxAgent [MxEvent] b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MxAgent [MxEvent] MxAction
forall s. MxAgent s MxAction
mxReady),
      (((MxEvent, SendPort Bool) -> MxAgent [MxEvent] MxAction)
-> MxSink [MxEvent]
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink (((MxEvent, SendPort Bool) -> MxAgent [MxEvent] MxAction)
 -> MxSink [MxEvent])
-> ((MxEvent, SendPort Bool) -> MxAgent [MxEvent] MxAction)
-> MxSink [MxEvent]
forall a b. (a -> b) -> a -> b
$ \(MxEvent
ev, SendPort Bool
sp :: SendPort Bool) -> do
          [MxEvent]
st <- MxAgent [MxEvent] [MxEvent]
forall s. MxAgent s s
mxGetLocal
          let found :: Bool
found =
                case MxEvent
ev of
                  MxSpawned ProcessId
p ->
                    Maybe MxEvent -> Bool
forall a. Maybe a -> Bool
isJust (Maybe MxEvent -> Bool) -> Maybe MxEvent -> Bool
forall a b. (a -> b) -> a -> b
$ (MxEvent -> Bool) -> [MxEvent] -> Maybe MxEvent
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\MxEvent
ev' ->
                                    case MxEvent
ev' of
                                      (MxSpawned ProcessId
p') -> ProcessId
p' ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
p
                                      MxEvent
_              -> Bool
False) [MxEvent]
st
                  MxProcessDied ProcessId
p DiedReason
r ->
                    Maybe MxEvent -> Bool
forall a. Maybe a -> Bool
isJust (Maybe MxEvent -> Bool) -> Maybe MxEvent -> Bool
forall a b. (a -> b) -> a -> b
$ (MxEvent -> Bool) -> [MxEvent] -> Maybe MxEvent
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\MxEvent
ev' ->
                                    case MxEvent
ev' of
                                      (MxProcessDied ProcessId
p' DiedReason
r') -> ProcessId
p' ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
p Bool -> Bool -> Bool
&& DiedReason
r DiedReason -> DiedReason -> Bool
forall a. Eq a => a -> a -> Bool
== DiedReason
r'
                                      MxEvent
_                     -> Bool
False) [MxEvent]
st
                  MxEvent
_ -> Bool
False
          Process () -> MxAgent [MxEvent] ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent [MxEvent] ())
-> Process () -> MxAgent [MxEvent] ()
forall a b. (a -> b) -> a -> b
$ SendPort Bool -> Bool -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort Bool
sp Bool
found
          MxAgent [MxEvent] MxAction
forall s. MxAgent s MxAction
mxReady)
    ]

  -- TODO: yes, this is racy, but we're at the mercy of the scheduler here...
  Int -> ReceivePort () -> Process ()
forall {a}.
(Binary a, Typeable a) =>
Int -> ReceivePort a -> Process ()
faff Int
2000000 ReceivePort ()
rs

  (SendPort ()
sp, ReceivePort ()
rp) <- Process (SendPort (), ReceivePort ())
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan :: Process (SendPort (), ReceivePort ())

  ProcessId
pid <- Process () -> Process ProcessId
spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$ Process ()
forall a. Serializable a => Process a
expect Process () -> (() -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SendPort () -> () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort ()
sp

  -- By waiting for a monitor notification, we have a
  -- higher probably that the agent has seen the spawn and died events
  ProcessId -> Process MonitorRef
monitor ProcessId
pid

  ProcessId -> () -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
pid ()
  Maybe ()
rct <- Int -> ReceivePort () -> Process (Maybe ())
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
10000000 ReceivePort ()
rp
  Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Maybe () -> Bool
forall a. Maybe a -> Bool
isJust Maybe ()
rct) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Process ()
forall a b. Serializable a => a -> Process b
die String
"No response on channel"

  Maybe ()
pmn <- Int -> [Match ()] -> Process (Maybe ())
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
2000000 [ (ProcessMonitorNotification -> Process ()) -> Match ()
forall a b. Serializable a => (a -> Process b) -> Match b
match (\ProcessMonitorNotification{} -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) ]
  Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Maybe () -> Bool
forall a. Maybe a -> Bool
isJust Maybe ()
pmn) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Process ()
forall a b. Serializable a => a -> Process b
die String
"No monitor notification arrived"

  -- TODO: yes, this is racy, but we're at the mercy of the scheduler here...
  Int -> ReceivePort () -> Process ()
forall {a}.
(Binary a, Typeable a) =>
Int -> ReceivePort a -> Process ()
faff Int
2000000 ReceivePort ()
rs

  (SendPort Bool
replyTo, ReceivePort Bool
reply) <- Process (SendPort Bool, ReceivePort Bool)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan :: Process (SendPort Bool, ReceivePort Bool)
  (MxEvent, SendPort Bool) -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (ProcessId -> MxEvent
MxSpawned ProcessId
pid, SendPort Bool
replyTo)
  (MxEvent, SendPort Bool) -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (ProcessId -> DiedReason -> MxEvent
MxProcessDied ProcessId
pid DiedReason
DiedNormal, SendPort Bool
replyTo)

  Maybe Bool
seenAlive <- Int -> ReceivePort Bool -> Process (Maybe Bool)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
10000000 ReceivePort Bool
reply
  Maybe Bool
seenDead  <- Int -> ReceivePort Bool -> Process (Maybe Bool)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
10000000 ReceivePort Bool
reply

  TestResult Bool -> Bool -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult Bool
result (Bool -> Process ()) -> Bool -> Process ()
forall a b. (a -> b) -> a -> b
$ (Bool -> Bool -> Bool) -> Bool -> [Bool] -> Bool
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Bool -> Bool -> Bool
(&&) Bool
True ([Bool] -> Bool) -> [Bool] -> Bool
forall a b. (a -> b) -> a -> b
$ [Maybe Bool] -> [Bool]
forall a. [Maybe a] -> [a]
catMaybes [Maybe Bool
seenAlive, Maybe Bool
seenDead]
  ProcessId -> String -> Process ()
kill ProcessId
timer String
"test-complete"
  ProcessId -> String -> Process ()
kill ProcessId
agentPid String
"test-complete"
  where
    faff :: Int -> ReceivePort a -> Process ()
faff Int
delay ReceivePort a
port = do
      Maybe a
res <- Int -> ReceivePort a -> Process (Maybe a)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort a
port
      Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing Maybe a
res) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ Int -> Process ()
pause Int
delay

testMxRegEvents :: TestResult () -> Process ()
testMxRegEvents :: TestResult () -> Process ()
testMxRegEvents TestResult ()
result = do
  {- This test only deals with the local case, to ensure that we are being
     notified in the expected order - the remote cases related to the
     behaviour of the node controller are contained in the CH test suite. -}
  Process () -> Process () -> Process ()
ensure (TestResult () -> () -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult ()
result ()) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ do
    let label :: String
label = String
"testMxRegEvents"
    let agentLabel :: String
agentLabel = String
"mxRegEvents-agent"
    let delay :: Int
delay = Int
1000000
    (SendPort (String, ProcessId)
regChan, ReceivePort (String, ProcessId)
regSink) <- Process
  (SendPort (String, ProcessId), ReceivePort (String, ProcessId))
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
    (SendPort (String, ProcessId)
unRegChan, ReceivePort (String, ProcessId)
unRegSink) <- Process
  (SendPort (String, ProcessId), ReceivePort (String, ProcessId))
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
    ProcessId
agent <- MxAgentId -> () -> [MxSink ()] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
agentLabel) () [
        (MxEvent -> MxAgent () MxAction) -> MxSink ()
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((MxEvent -> MxAgent () MxAction) -> MxSink ())
-> (MxEvent -> MxAgent () MxAction) -> MxSink ()
forall a b. (a -> b) -> a -> b
$ \MxEvent
ev -> do
          case MxEvent
ev of
            MxRegistered ProcessId
pid String
label'
              | String
label' String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort (String, ProcessId) -> (String, ProcessId) -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort (String, ProcessId)
regChan (String
label', ProcessId
pid)
            MxUnRegistered ProcessId
pid String
label'
              | String
label' String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort (String, ProcessId) -> (String, ProcessId) -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort (String, ProcessId)
unRegChan (String
label', ProcessId
pid)
            MxEvent
_                   -> () -> MxAgent () ()
forall a. a -> MxAgent () a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          MxAgent () MxAction
forall s. MxAgent s MxAction
mxReady
      ]

    ProcessId
p1 <- Process () -> Process ProcessId
spawnLocal Process ()
forall a. Serializable a => Process a
expect
    ProcessId
p2 <- Process () -> Process ProcessId
spawnLocal Process ()
forall a. Serializable a => Process a
expect

    String -> ProcessId -> Process ()
register String
label ProcessId
p1
    Maybe (String, ProcessId)
reg1 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
regSink
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String
-> Maybe (String, ProcessId) -> Maybe (String, ProcessId) -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label, ProcessId
p1)) Maybe (String, ProcessId)
reg1

    String -> Process ()
unregister String
label
    Maybe (String, ProcessId)
unreg1 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
unRegSink
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String
-> Maybe (String, ProcessId) -> Maybe (String, ProcessId) -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label, ProcessId
p1)) Maybe (String, ProcessId)
unreg1

    String -> ProcessId -> Process ()
register String
label ProcessId
p2
    Maybe (String, ProcessId)
reg2 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
regSink
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String
-> Maybe (String, ProcessId) -> Maybe (String, ProcessId) -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label, ProcessId
p2)) Maybe (String, ProcessId)
reg2

    String -> ProcessId -> Process ()
reregister String
label ProcessId
p1
    Maybe (String, ProcessId)
unreg2 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
unRegSink
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String
-> Maybe (String, ProcessId) -> Maybe (String, ProcessId) -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label, ProcessId
p2)) Maybe (String, ProcessId)
unreg2 

    Maybe (String, ProcessId)
reg3 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
regSink
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String
-> Maybe (String, ProcessId) -> Maybe (String, ProcessId) -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label, ProcessId
p1)) Maybe (String, ProcessId)
reg3

    (ProcessId -> Process ()) -> [ProcessId] -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((ProcessId -> String -> Process ())
-> String -> ProcessId -> Process ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ProcessId -> String -> Process ()
kill (String -> ProcessId -> Process ())
-> String -> ProcessId -> Process ()
forall a b. (a -> b) -> a -> b
$ String
"test-complete") [ProcessId
agent, ProcessId
p1, ProcessId
p2]

testMxRegMon :: LocalNode -> TestResult () -> Process ()
testMxRegMon :: LocalNode -> TestResult () -> Process ()
testMxRegMon LocalNode
remoteNode TestResult ()
result = do
  Process () -> Process () -> Process ()
ensure (TestResult () -> () -> Process ()
forall a. TestResult a -> a -> Process ()
stash TestResult ()
result ()) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ do
    -- ensure that when a registered process dies, we get a notification that
    -- it has been unregistered as well as seeing the name get removed
    let label1 :: String
label1 = String
"aaaaa"
    let label2 :: String
label2 = String
"bbbbb"
    let isValid :: String -> Bool
isValid String
l = String
l String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label1 Bool -> Bool -> Bool
|| String
l String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label2
    let agentLabel :: String
agentLabel = String
"mxRegMon-agent"
    let delay :: Int
delay = Int
1000000
    (SendPort (String, ProcessId)
regChan, ReceivePort (String, ProcessId)
regSink) <- Process
  (SendPort (String, ProcessId), ReceivePort (String, ProcessId))
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
    (SendPort (String, ProcessId)
unRegChan, ReceivePort (String, ProcessId)
unRegSink) <- Process
  (SendPort (String, ProcessId), ReceivePort (String, ProcessId))
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
    ProcessId
agent <- MxAgentId -> () -> [MxSink ()] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId String
agentLabel) () [
        (MxEvent -> MxAgent () MxAction) -> MxSink ()
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((MxEvent -> MxAgent () MxAction) -> MxSink ())
-> (MxEvent -> MxAgent () MxAction) -> MxSink ()
forall a b. (a -> b) -> a -> b
$ \MxEvent
ev -> do
          case MxEvent
ev of
            MxRegistered ProcessId
pid String
label
              | String -> Bool
isValid String
label -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort (String, ProcessId) -> (String, ProcessId) -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort (String, ProcessId)
regChan (String
label, ProcessId
pid)
            MxUnRegistered ProcessId
pid String
label
              | String -> Bool
isValid String
label -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort (String, ProcessId) -> (String, ProcessId) -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort (String, ProcessId)
unRegChan (String
label, ProcessId
pid)
            MxEvent
_                 -> () -> MxAgent () ()
forall a. a -> MxAgent () a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          MxAgent () MxAction
forall s. MxAgent s MxAction
mxReady
      ]

    (SendPort ProcessId
sp, ReceivePort ProcessId
rp) <- Process (SendPort ProcessId, ReceivePort ProcessId)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
    IO ProcessId -> Process ProcessId
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ProcessId -> Process ProcessId)
-> IO ProcessId -> Process ProcessId
forall a b. (a -> b) -> a -> b
$ LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
remoteNode (Process () -> IO ProcessId) -> Process () -> IO ProcessId
forall a b. (a -> b) -> a -> b
$ do
      Process ProcessId
getSelfPid Process ProcessId -> (ProcessId -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SendPort ProcessId -> ProcessId -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort ProcessId
sp
      Process ()
forall a. Serializable a => Process a
expect :: Process ()

    ProcessId
p1 <- ReceivePort ProcessId -> Process ProcessId
forall a. Serializable a => ReceivePort a -> Process a
receiveChan ReceivePort ProcessId
rp

    String -> ProcessId -> Process ()
register String
label1 ProcessId
p1
    Maybe (String, ProcessId)
reg1 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
regSink
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String
-> Maybe (String, ProcessId) -> Maybe (String, ProcessId) -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label1, ProcessId
p1)) Maybe (String, ProcessId)
reg1 

    String -> ProcessId -> Process ()
register String
label2 ProcessId
p1
    Maybe (String, ProcessId)
reg2 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
regSink
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String
-> Maybe (String, ProcessId) -> Maybe (String, ProcessId) -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label2, ProcessId
p1)) Maybe (String, ProcessId)
reg2

    Maybe ProcessId
n1 <- String -> Process (Maybe ProcessId)
whereis String
label1
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Maybe ProcessId -> Maybe ProcessId -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
p1) Maybe ProcessId
n1

    Maybe ProcessId
n2 <- String -> Process (Maybe ProcessId)
whereis String
label2
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Maybe ProcessId -> Maybe ProcessId -> IO ()
forall a. (HasCallStack, Eq a, Show a) => String -> a -> a -> IO ()
assertEqual String
forall a. Monoid a => a
mempty (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
p1) Maybe ProcessId
n2

    ProcessId -> String -> Process ()
kill ProcessId
p1 String
"goodbye"

    Maybe (String, ProcessId)
unreg1 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
unRegSink
    Maybe (String, ProcessId)
unreg2 <- Int
-> ReceivePort (String, ProcessId)
-> Process (Maybe (String, ProcessId))
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort (String, ProcessId)
unRegSink

    let evts :: [Maybe (String, ProcessId)]
evts = [Maybe (String, ProcessId)
unreg1, Maybe (String, ProcessId)
unreg2]
    -- we can't rely on the order of the values in the node controller's
    -- map (it's either racy to do so, or no such guarantee exists for Data.Map),
    -- so we simply verify that we received the un-registration events we expect
    [Maybe (String, ProcessId)]
evts [Maybe (String, ProcessId)]
-> Maybe (String, ProcessId) -> Process ()
forall a. (Show a, Eq a) => [a] -> a -> Process ()
`shouldContain` ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label1, ProcessId
p1))
    [Maybe (String, ProcessId)]
evts [Maybe (String, ProcessId)]
-> Maybe (String, ProcessId) -> Process ()
forall a. (Show a, Eq a) => [a] -> a -> Process ()
`shouldContain` ((String, ProcessId) -> Maybe (String, ProcessId)
forall a. a -> Maybe a
Just (String
label2, ProcessId
p1))

    ProcessId -> String -> Process ()
kill ProcessId
agent String
"test-complete"

ensure :: Process () -> Process () -> Process ()
ensure :: Process () -> Process () -> Process ()
ensure = (Process () -> Process () -> Process ())
-> Process () -> Process () -> Process ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Process () -> Process () -> Process ()
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
finally

testNSend :: (String -> () -> Process ())
          -> Maybe LocalNode
          -> Process ()
testNSend :: (String -> () -> Process ()) -> Maybe LocalNode -> Process ()
testNSend String -> () -> Process ()
op Maybe LocalNode
n = do
  ProcessId
us <- Process ProcessId
getSelfPid
  let delay :: Int
delay = Int
5000000
  let label :: String
label = String
"testMxSend" String -> String -> String
forall a. [a] -> [a] -> [a]
++ (ProcessId -> String
forall a. Show a => a -> String
show ProcessId
us)
  let isValid :: String -> Bool
isValid = Maybe LocalNode -> String -> String -> Bool
isValidLabel Maybe LocalNode
n String
label

  Maybe LocalNode -> String -> SendTest -> Process ()
testMxSend Maybe LocalNode
n String
label (SendTest -> Process ()) -> SendTest -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
p1 ReceivePort MxEvent
sink -> do
    String -> ProcessId -> Process ()
register String
label ProcessId
p1
    Maybe MxEvent
reg1 <- Int -> ReceivePort MxEvent -> Process (Maybe MxEvent)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort MxEvent
sink
    case Maybe MxEvent
reg1 of
      Just (MxRegistered ProcessId
pd String
lb)
        | ProcessId
pd ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
p1 Bool -> Bool -> Bool
&& String -> Bool
isValid String
lb -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Maybe MxEvent
_                          -> String -> Process ()
forall a b. Serializable a => a -> Process b
die (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
"Reg-Failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Maybe MxEvent -> String
forall a. Show a => a -> String
show Maybe MxEvent
reg1

    String -> () -> Process ()
op String
label ()

    Maybe MxEvent
sent <- Int -> ReceivePort MxEvent -> Process (Maybe MxEvent)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
delay ReceivePort MxEvent
sink
    case Maybe MxEvent
sent of
      Just (MxSentToName String
lb ProcessId
by Message
_)
        | ProcessId
by ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
us Bool -> Bool -> Bool
&& String -> Bool
isValid String
lb -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      Maybe MxEvent
_                          -> String -> Process Bool
forall a b. Serializable a => a -> Process b
die (String -> Process Bool) -> String -> Process Bool
forall a b. (a -> b) -> a -> b
$ String
"Send-Failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Maybe MxEvent -> String
forall a. Show a => a -> String
show Maybe MxEvent
sent

  where
    isValidLabel :: Maybe LocalNode -> String -> String -> Bool
isValidLabel Maybe LocalNode
nd String
l1 String
l2
      | String
l2 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
l2 = Bool
True
      | Maybe LocalNode -> Bool
forall a. Maybe a -> Bool
isJust Maybe LocalNode
nd     = String
l2 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
l1 String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"@" String -> String -> String
forall a. [a] -> [a] -> [a]
++ NodeId -> String
forall a. Show a => a -> String
show (LocalNode -> NodeId
localNodeId (LocalNode -> NodeId) -> LocalNode -> NodeId
forall a b. (a -> b) -> a -> b
$ Maybe LocalNode -> LocalNode
forall a. HasCallStack => Maybe a -> a
fromJust Maybe LocalNode
nd)
      | Bool
otherwise    = Bool
False

testSend :: (ProcessId -> () -> Process ())
         -> Maybe LocalNode
         -> Process ()
testSend :: (ProcessId -> () -> Process ()) -> Maybe LocalNode -> Process ()
testSend ProcessId -> () -> Process ()
op Maybe LocalNode
n = do
  ProcessId
us <- Process ProcessId
getSelfPid
  let label :: String
label = String
"testMxSend" String -> String -> String
forall a. [a] -> [a] -> [a]
++ (ProcessId -> String
forall a. Show a => a -> String
show ProcessId
us)
  Maybe LocalNode -> String -> SendTest -> Process ()
testMxSend Maybe LocalNode
n String
label (SendTest -> Process ()) -> SendTest -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
p1 ReceivePort MxEvent
sink -> do
    -- initiate a send
    ProcessId -> () -> Process ()
op ProcessId
p1 ()

    -- verify the management event
    Maybe MxEvent
sent <- Int -> ReceivePort MxEvent -> Process (Maybe MxEvent)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
5000000 ReceivePort MxEvent
sink
    case Maybe MxEvent
sent of
      Just (MxSent ProcessId
pidTo ProcessId
pidFrom Message
_)
        | ProcessId
pidTo ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
p1 Bool -> Bool -> Bool
&& ProcessId
pidFrom ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
us -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      Maybe MxEvent
_                                -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

testChan :: (SendPort () -> () -> Process ())
         -> Maybe LocalNode
         -> Process ()
testChan :: (SendPort () -> () -> Process ()) -> Maybe LocalNode -> Process ()
testChan SendPort () -> () -> Process ()
op Maybe LocalNode
n = Maybe LocalNode -> String -> SendTest -> Process ()
testMxSend Maybe LocalNode
n String
"" (SendTest -> Process ()) -> SendTest -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
p1 ReceivePort MxEvent
sink -> do

  ProcessId
us <- Process ProcessId
getSelfPid
  ProcessId -> ProcessId -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
p1 ProcessId
us

  Maybe MxEvent
cleared <- Int -> ReceivePort MxEvent -> Process (Maybe MxEvent)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
2000000 ReceivePort MxEvent
sink
  case Maybe MxEvent
cleared of
    Just (MxSent ProcessId
pidTo ProcessId
pidFrom Message
_)
      | ProcessId
pidTo ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
p1 Bool -> Bool -> Bool
&& ProcessId
pidFrom ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
us -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    Maybe MxEvent
_                                -> String -> Process ()
forall a b. Serializable a => a -> Process b
die String
"Received uncleared Mx Event"

  Maybe (SendPort ())
chan <- Int -> [Match (SendPort ())] -> Process (Maybe (SendPort ()))
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
5000000 [ (SendPort () -> Process (SendPort ())) -> Match (SendPort ())
forall a b. Serializable a => (a -> Process b) -> Match b
match SendPort () -> Process (SendPort ())
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ]
  let ch' :: SendPort ()
ch' = SendPort () -> Maybe (SendPort ()) -> SendPort ()
forall a. a -> Maybe a -> a
fromMaybe (String -> SendPort ()
forall a. HasCallStack => String -> a
error String
"No reply chan received") Maybe (SendPort ())
chan

  -- initiate a send
  SendPort () -> () -> Process ()
op SendPort ()
ch' ()

  -- verify the management event
  Maybe MxEvent
sent <- Int -> ReceivePort MxEvent -> Process (Maybe MxEvent)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
5000000 ReceivePort MxEvent
sink
  case Maybe MxEvent
sent of
    Just (MxSentToPort ProcessId
sId SendPortId
spId Message
_)
      | ProcessId
sId ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
us Bool -> Bool -> Bool
&& SendPortId
spId SendPortId -> SendPortId -> Bool
forall a. Eq a => a -> a -> Bool
== SendPort () -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort ()
ch' -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
    Maybe MxEvent
_                                       -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

type SendTest = ProcessId -> ReceivePort MxEvent -> Process Bool

testMxSend :: Maybe LocalNode -> String -> SendTest -> Process ()
testMxSend :: Maybe LocalNode -> String -> SendTest -> Process ()
testMxSend Maybe LocalNode
mNode String
label SendTest
test = do
  ProcessId
us <- Process ProcessId
getSelfPid
  (SendPort ProcessId
sp, ReceivePort ProcessId
rp) <- Process (SendPort ProcessId, ReceivePort ProcessId)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
  (SendPort MxEvent
chan, ReceivePort MxEvent
sink) <- Process (SendPort MxEvent, ReceivePort MxEvent)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
  ProcessId
agent <- MxAgentId -> () -> [MxSink ()] -> Process ProcessId
forall s. MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent (String -> MxAgentId
MxAgentId (String -> MxAgentId) -> String -> MxAgentId
forall a b. (a -> b) -> a -> b
$ ProcessId -> String
forall a. Show a => a -> String
agentLabel ProcessId
us) () [
      (MxEvent -> MxAgent () MxAction) -> MxSink ()
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((MxEvent -> MxAgent () MxAction) -> MxSink ())
-> (MxEvent -> MxAgent () MxAction) -> MxSink ()
forall a b. (a -> b) -> a -> b
$ \MxEvent
ev -> do
        case MxEvent
ev of
          m :: MxEvent
m@(MxSentToPort ProcessId
_ SendPortId
cid Message
_)
            | SendPortId
cid SendPortId -> SendPortId -> Bool
forall a. Eq a => a -> a -> Bool
/= SendPort MxEvent -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort MxEvent
chan
              Bool -> Bool -> Bool
&& SendPortId
cid SendPortId -> SendPortId -> Bool
forall a. Eq a => a -> a -> Bool
/= SendPort ProcessId -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort ProcessId
sp -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort MxEvent -> MxEvent -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort MxEvent
chan MxEvent
m
          m :: MxEvent
m@(MxSent ProcessId
_ ProcessId
fromPid Message
_)
            | ProcessId
fromPid ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
us -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort MxEvent -> MxEvent -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort MxEvent
chan MxEvent
m
          m :: MxEvent
m@(MxSentToName String
_ ProcessId
fromPid Message
_)
            | ProcessId
fromPid ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
us -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort MxEvent -> MxEvent -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort MxEvent
chan MxEvent
m
          m :: MxEvent
m@(MxRegistered ProcessId
_ String
name)
            | String
name String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label -> Process () -> MxAgent () ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent () ()) -> Process () -> MxAgent () ()
forall a b. (a -> b) -> a -> b
$ SendPort MxEvent -> MxEvent -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort MxEvent
chan MxEvent
m
          MxEvent
_                 -> () -> MxAgent () ()
forall a. a -> MxAgent () a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        MxAgent () MxAction
forall s. MxAgent s MxAction
mxReady
    ]

  case Maybe LocalNode
mNode of
    Maybe LocalNode
Nothing         -> Process ProcessId -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process ProcessId -> Process ())
-> Process ProcessId -> Process ()
forall a b. (a -> b) -> a -> b
$ Process () -> Process ProcessId
spawnLocal (SendPort ProcessId -> Process ()
proc SendPort ProcessId
sp)
    Just LocalNode
remoteNode -> Process ProcessId -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process ProcessId -> Process ())
-> Process ProcessId -> Process ()
forall a b. (a -> b) -> a -> b
$ IO ProcessId -> Process ProcessId
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ProcessId -> Process ProcessId)
-> IO ProcessId -> Process ProcessId
forall a b. (a -> b) -> a -> b
$ LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
remoteNode (Process () -> IO ProcessId) -> Process () -> IO ProcessId
forall a b. (a -> b) -> a -> b
$ SendPort ProcessId -> Process ()
proc SendPort ProcessId
sp

  Maybe ProcessId
p1 <- Int -> ReceivePort ProcessId -> Process (Maybe ProcessId)
forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
2000000 ReceivePort ProcessId
rp
  Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Maybe ProcessId -> Bool
forall a. Maybe a -> Bool
isJust Maybe ProcessId
p1) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Process ()
forall a b. Serializable a => a -> Process b
die String
"Timed out waiting for ProcessId"
  Either ProcessExitException Bool
res <- Process Bool -> Process (Either ProcessExitException Bool)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (Process Bool -> Process (Either ProcessExitException Bool))
-> Process Bool -> Process (Either ProcessExitException Bool)
forall a b. (a -> b) -> a -> b
$ SendTest
test (Maybe ProcessId -> ProcessId
forall a. HasCallStack => Maybe a -> a
fromJust Maybe ProcessId
p1) ReceivePort MxEvent
sink

  ProcessId -> String -> Process ()
kill ProcessId
agent String
"bye"
  ProcessId -> String -> Process ()
kill (Maybe ProcessId -> ProcessId
forall a. HasCallStack => Maybe a -> a
fromJust Maybe ProcessId
p1) String
"bye"

  case Either ProcessExitException Bool
res of
    Left  (ProcessExitException ProcessId
_ Message
m) -> (IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"SomeException-" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Message -> String
forall a. Show a => a -> String
show Message
m) Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Message -> Process ()
forall a b. Serializable a => a -> Process b
die Message
m
    Right Bool
tr                         -> IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ HasCallStack => String -> Bool -> IO ()
String -> Bool -> IO ()
assertBool String
forall a. Monoid a => a
mempty Bool
tr


  where
    agentLabel :: a -> String
agentLabel a
s = String
"mx-unsafe-check-agent-" String -> String -> String
forall a. [a] -> [a] -> [a]
++ a -> String
forall a. Show a => a -> String
show a
s
    proc :: SendPort ProcessId -> Process ()
proc SendPort ProcessId
sp' = Process ProcessId
getSelfPid Process ProcessId -> (ProcessId -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SendPort ProcessId -> ProcessId -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort ProcessId
sp' Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe (ReceivePort ()) -> Process ()
go Maybe (ReceivePort ())
forall a. Maybe a
Nothing

    go :: Maybe (ReceivePort ()) -> Process ()
    go :: Maybe (ReceivePort ()) -> Process ()
go Maybe (ReceivePort ())
Nothing = Int -> [Match (ReceivePort ())] -> Process (Maybe (ReceivePort ()))
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
5000000 [ (ProcessId -> Process (ReceivePort ())) -> Match (ReceivePort ())
forall a b. Serializable a => (a -> Process b) -> Match b
match ProcessId -> Process (ReceivePort ())
forall {a}.
(Binary a, Typeable a) =>
ProcessId -> Process (ReceivePort a)
replyChannel ] Process (Maybe (ReceivePort ()))
-> (Maybe (ReceivePort ()) -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe (ReceivePort ()) -> Process ()
go
    go c :: Maybe (ReceivePort ())
c@(Just ReceivePort ()
c') = [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ ReceivePort () -> (() -> Process ()) -> Match ()
forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort ()
c' () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ] Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe (ReceivePort ()) -> Process ()
go Maybe (ReceivePort ())
c

    replyChannel :: ProcessId -> Process (ReceivePort a)
replyChannel ProcessId
p' = do
      (SendPort a
s, ReceivePort a
r) <- Process (SendPort a, ReceivePort a)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
      ProcessId -> SendPort a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
p' SendPort a
s
      ReceivePort a -> Process (ReceivePort a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ReceivePort a
r

tests :: TestTransport -> IO [Test]
tests :: TestTransport -> IO [Test]
tests TestTransport{Transport
EndPointAddress -> EndPointAddress -> IO ()
testTransport :: Transport
testBreakConnection :: EndPointAddress -> EndPointAddress -> IO ()
testTransport :: TestTransport -> Transport
testBreakConnection :: TestTransport -> EndPointAddress -> EndPointAddress -> IO ()
..} = do
  LocalNode
node1 <- Transport -> RemoteTable -> IO LocalNode
newLocalNode Transport
testTransport RemoteTable
initRemoteTable
  LocalNode
node2 <- Transport -> RemoteTable -> IO LocalNode
newLocalNode Transport
testTransport RemoteTable
initRemoteTable
  [Test] -> IO [Test]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return [
      String -> [Test] -> Test
testGroup String
"MxAgents" [
          String -> IO () -> Test
testCase String
"EventHandling"
              (String
-> LocalNode -> Bool -> (TestResult Bool -> Process ()) -> IO ()
forall a.
Eq a =>
String -> LocalNode -> a -> (TestResult a -> Process ()) -> IO ()
delayedAssertion
               String
"expected True, but events where not as expected"
               LocalNode
node1 Bool
True TestResult Bool -> Process ()
testAgentEventHandling)
        , String -> IO () -> Test
testCase String
"InterAgentBroadcast"
              (String
-> LocalNode
-> Maybe ()
-> (TestResult (Maybe ()) -> Process ())
-> IO ()
forall a.
Eq a =>
String -> LocalNode -> a -> (TestResult a -> Process ()) -> IO ()
delayedAssertion
               String
"expected (), but no broadcast was received"
               LocalNode
node1 (() -> Maybe ()
forall a. a -> Maybe a
Just ()) TestResult (Maybe ()) -> Process ()
testAgentBroadcast)
        , String -> IO () -> Test
testCase String
"AgentMailboxHandling"
              (String
-> LocalNode
-> Maybe ()
-> (TestResult (Maybe ()) -> Process ())
-> IO ()
forall a.
Eq a =>
String -> LocalNode -> a -> (TestResult a -> Process ()) -> IO ()
delayedAssertion
               String
"expected (Just ()), but no regular (mailbox) input was handled"
               LocalNode
node1 (() -> Maybe ()
forall a. a -> Maybe a
Just ()) TestResult (Maybe ()) -> Process ()
testAgentMailboxHandling)
        , String -> IO () -> Test
testCase String
"AgentDualInputHandling"
              (String
-> LocalNode
-> Maybe Int
-> (TestResult (Maybe Int) -> Process ())
-> IO ()
forall a.
Eq a =>
String -> LocalNode -> a -> (TestResult a -> Process ()) -> IO ()
delayedAssertion
               String
"expected sum = 15, but the result was Nothing"
               LocalNode
node1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
15 :: Maybe Int) TestResult (Maybe Int) -> Process ()
testAgentDualInput)
        , String -> IO () -> Test
testCase String
"AgentInputPrioritisation"
              (String
-> LocalNode
-> [String]
-> (TestResult [String] -> Process ())
-> IO ()
forall a.
Eq a =>
String -> LocalNode -> a -> (TestResult a -> Process ()) -> IO ()
delayedAssertion
               String
"expected [first, second, third, fourth, fifth], but result diverged"
               LocalNode
node1 ([String] -> [String]
forall a. Ord a => [a] -> [a]
sort [String
"first", String
"second",
                            String
"third", String
"fourth",
                            String
"fifth"]) TestResult [String] -> Process ()
testAgentPrioritisation)
      ]
    , String -> [Test] -> Test
testGroup String
"MxEvents" [
        String -> IO () -> Test
testCase String
"NameRegistrationEvents"
          (String -> LocalNode -> () -> (TestResult () -> Process ()) -> IO ()
forall a.
Eq a =>
String -> LocalNode -> a -> (TestResult a -> Process ()) -> IO ()
delayedAssertion
           String
"expected registration events to map to the correct ProcessId"
           LocalNode
node1 () TestResult () -> Process ()
testMxRegEvents)
      , String -> IO () -> Test
testCase String
"PostDeathNameUnRegistrationEvents"
          (String -> LocalNode -> () -> (TestResult () -> Process ()) -> IO ()
forall a.
Eq a =>
String -> LocalNode -> a -> (TestResult a -> Process ()) -> IO ()
delayedAssertion
           String
"expected process deaths to result in unregistration events"
           LocalNode
node1 () (LocalNode -> TestResult () -> Process ()
testMxRegMon LocalNode
node2))
      , String -> [Test] -> Test
testGroup String
"SendEvents" ([Test] -> Test) -> [Test] -> Test
forall a b. (a -> b) -> a -> b
$ LocalNode -> LocalNode -> [Test]
buildTestCases LocalNode
node1 LocalNode
node2
      ]
    ]
  where
    buildTestCases :: LocalNode -> LocalNode -> [Test]
buildTestCases LocalNode
n1 LocalNode
n2 = let nid :: NodeId
nid = LocalNode -> NodeId
localNodeId LocalNode
n2 in LocalNode
-> LocalNode
-> [(String, [(String, Maybe LocalNode -> Process ())])]
-> [Test]
build LocalNode
n1 LocalNode
n2 [
              (String
"NSend", [
                  (String
"nsend",              (String -> () -> Process ()) -> Maybe LocalNode -> Process ()
testNSend String -> () -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend)
                , (String
"Unsafe.nsend",       (String -> () -> Process ()) -> Maybe LocalNode -> Process ()
testNSend String -> () -> Process ()
forall a. Serializable a => String -> a -> Process ()
Unsafe.nsend)
                , (String
"nsendRemote",        (String -> () -> Process ()) -> Maybe LocalNode -> Process ()
testNSend (NodeId -> String -> () -> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
nsendRemote NodeId
nid))
                , (String
"Unsafe.nsendRemote", (String -> () -> Process ()) -> Maybe LocalNode -> Process ()
testNSend (NodeId -> String -> () -> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
Unsafe.nsendRemote NodeId
nid))
                ])
            , (String
"Send", [
                  (String
"send",            (ProcessId -> () -> Process ()) -> Maybe LocalNode -> Process ()
testSend ProcessId -> () -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send)
                , (String
"Unsafe.send",     (ProcessId -> () -> Process ()) -> Maybe LocalNode -> Process ()
testSend ProcessId -> () -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.send)
                , (String
"usend",           (ProcessId -> () -> Process ()) -> Maybe LocalNode -> Process ()
testSend ProcessId -> () -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
usend)
                , (String
"Unsafe.usend",    (ProcessId -> () -> Process ()) -> Maybe LocalNode -> Process ()
testSend ProcessId -> () -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.usend)
                , (String
"sendChan",        (SendPort () -> () -> Process ()) -> Maybe LocalNode -> Process ()
testChan SendPort () -> () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan)
                , (String
"Unsafe.sendChan", (SendPort () -> () -> Process ()) -> Maybe LocalNode -> Process ()
testChan SendPort () -> () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
Unsafe.sendChan)
                ])
            , (String
"Forward", [
                  (String
"forward",  (ProcessId -> () -> Process ()) -> Maybe LocalNode -> Process ()
testSend (\ProcessId
p ()
m -> Message -> ProcessId -> Process ()
forward (() -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage ()
m) ProcessId
p))
                , (String
"uforward", (ProcessId -> () -> Process ()) -> Maybe LocalNode -> Process ()
testSend (\ProcessId
p ()
m -> Message -> ProcessId -> Process ()
uforward (() -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage ()
m) ProcessId
p))
                ])
            ]

    build :: LocalNode
          -> LocalNode
          -> [(String, [(String, (Maybe LocalNode -> Process ()))])]
          -> [Test]
    build :: LocalNode
-> LocalNode
-> [(String, [(String, Maybe LocalNode -> Process ())])]
-> [Test]
build LocalNode
n LocalNode
ln [(String, [(String, Maybe LocalNode -> Process ())])]
specs =
      [ String -> [Test] -> Test
testGroup (String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"-" [String
groupName, String
caseSuffix]) [
             String -> IO () -> Test
testCase (String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"-" [String
caseName, String
caseSuffix])
               (LocalNode -> Process () -> IO ()
runProcess LocalNode
n (Maybe LocalNode -> Process ()
caseImpl Maybe LocalNode
caseNode))
             | (String
caseName, Maybe LocalNode -> Process ()
caseImpl) <- [(String, Maybe LocalNode -> Process ())]
groupCases
           ]
         | (String
groupName, [(String, Maybe LocalNode -> Process ())]
groupCases) <- [(String, [(String, Maybe LocalNode -> Process ())])]
specs
         , (String
caseSuffix, Maybe LocalNode
caseNode) <- [(String
"RemotePid", LocalNode -> Maybe LocalNode
forall a. a -> Maybe a
Just LocalNode
ln), (String
"LocalPid", Maybe LocalNode
forall a. Maybe a
Nothing)]
      ]