{-# LANGUAGE RebindableSyntax #-}
module Network.Transport.Tests where

import Prelude hiding
  ( (>>=)
  , return
  , fail
  , (>>)
#if ! MIN_VERSION_base(4,6,0)
  , catch
#endif
  )
import Control.Concurrent (forkIO, killThread, yield)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar, readMVar, tryTakeMVar, modifyMVar_, newMVar)
import Control.Exception
  ( evaluate
  , throw
  , throwIO
  , bracket
  , catch
  , ErrorCall(..)
  )
import Control.Monad (replicateM, replicateM_, when, guard, forM_, unless)
import Control.Monad.Error ()
import Control.Applicative ((<$>))
import Network.Transport
import Network.Transport.Internal (tlog, tryIO, timeoutMaybe)
import Network.Transport.Util (spawn)
import System.Random (randomIO)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (pack)
import Data.Map (Map)
import qualified Data.Map as Map (empty, insert, delete, findWithDefault, adjust, null, toList, map)
import Data.String (fromString)
import Data.List (permutations)
import Network.Transport.Tests.Auxiliary (forkTry, runTests, trySome, randomThreadDelay)
import Network.Transport.Tests.Traced

-- | Echo server: every payload that arrives on an incoming connection is sent
-- straight back to the endpoint that opened it.
--
-- On 'ConnectionOpened' the server opens a reply connection to the originating
-- address (same reliability, default hints) and files it under the incoming
-- connection id. On 'ConnectionClosed' that reply connection is closed and
-- forgotten. Multicast messages are ignored. The loop ends after printing an
-- 'ErrorEvent', or on 'EndPointClosed'.
echoServer :: EndPoint -> IO ()
echoServer endpoint = serve Map.empty
  where
    -- Reply connection belonging to an incoming connection id. An unknown id
    -- is reported through 'error', using the given message prefix.
    replyFor :: String -> ConnectionId -> Map ConnectionId Connection -> Connection
    replyFor prefix cid = Map.findWithDefault (error (prefix ++ show cid)) cid

    -- Event loop; the map goes from incoming connection id to reply connection.
    serve :: Map ConnectionId Connection -> IO ()
    serve replies = do
      event <- receive endpoint
      case event of
        ConnectionOpened cid rel addr -> do
          tlog ("Opened new connection " ++ show cid)
          Right conn <- connect endpoint addr rel defaultConnectHints
          serve (Map.insert cid conn replies)
        Received cid payload -> do
          send (replyFor "Received: Invalid cid " cid replies) payload
          serve replies
        ConnectionClosed cid -> do
          tlog ("Close connection " ++ show cid)
          close (replyFor "ConnectionClosed: Invalid cid " cid replies)
          serve (Map.delete cid replies)
        ReceivedMulticast _ _ ->
          -- Nothing to echo for multicast traffic
          serve replies
        ErrorEvent _ ->
          putStrLn ("Echo server received error event: " ++ show event)
        EndPointClosed ->
          return ()

-- | Ping client used in a few tests.
--
-- Connects to @server@, waits for the server's reply connection to be opened,
-- then @numPings@ times sends @msg@ and expects exactly that message back on
-- the reply connection. Afterwards it closes its own connection and expects
-- the reply connection to be closed in turn. Any unexpected event makes the
-- corresponding pattern match fail.
ping :: EndPoint -> EndPointAddress -> Int -> ByteString -> IO ()
ping endpoint server numPings msg = do
  tlog "Connect to echo server"
  Right conn <- connect endpoint server ReliableOrdered defaultConnectHints

  -- The first event has to announce the server's reply connection
  tlog "Wait for ConnectionOpened message"
  ConnectionOpened replyCid _ _ <- receive endpoint

  let -- One round trip: send the message, then insist that the very next
      -- event is the same single-chunk message on the reply connection.
      roundTrip :: IO ()
      roundTrip = do
        send conn [msg]
        Received fromCid [echoed] <- receive endpoint
        True <- return (replyCid == fromCid && echoed == msg)
        return ()

  tlog "Send ping and wait for reply"
  replicateM_ numPings roundTrip

  tlog "Close the connection"
  close conn

  -- The reply connection has to be the one that gets closed
  tlog "Wait for ConnectionClosed message"
  ConnectionClosed closedCid <- receive endpoint
  True <- return (replyCid == closedCid)

  tlog "Ping client done"

-- | Basic ping test: one echo server, one client running 'ping' with the
-- payload @"ping"@ for @numPings@ round trips. Returns once the client has
-- signalled completion.
testPingPong :: Transport -> Int -> IO ()
testPingPong transport numPings = do
  tlog "Starting ping pong test"
  server     <- spawn transport echoServer
  clientDone <- newEmptyMVar

  let -- The client gets its own endpoint and reports back when it is done
      client = do
        tlog "Ping client"
        Right endpoint <- newEndPoint transport
        ping endpoint server numPings "ping"
        putMVar clientDone ()

  forkTry client
  takeMVar clientDone

-- | Test that endpoints don't get confused: two clients, each with its own
-- endpoint and its own one-letter payload (@"A"@ and @"B"@), ping the same
-- echo server concurrently. Returns once both clients have finished.
testEndPoints :: Transport -> Int -> IO ()
testEndPoints transport numPings = do
  server  <- spawn transport echoServer
  signals <- replicateM 2 newEmptyMVar

  let -- A client is identified by its completion signal and its letter
      client (signal, label) = do
        let payload :: ByteString
            payload = pack [label]
        Right endpoint <- newEndPoint transport
        tlog ("Ping client " ++ show payload ++ ": " ++ show (address endpoint))
        ping endpoint server numPings payload
        putMVar signal ()

  mapM_ (forkTry . client) (zip signals ['A'..])

  -- Wait for every client
  mapM_ takeMVar signals

-- | Test that connections don't get confused.
--
-- A single client endpoint opens two connections to the echo server and sends
-- @numPings@ copies of @"pingA"@ on the first and @"pingB"@ on the second, from
-- two separate threads. The client then checks that each of the
-- @2 * numPings@ echoed messages carries the payload belonging to the reply
-- connection it arrived on.
testConnections :: Transport -> Int -> IO ()
testConnections transport numPings = do
  server <- spawn transport echoServer
  result <- newEmptyMVar

  -- Client
  forkTry $ do
    Right endpoint <- newEndPoint transport

    -- Open two connections to the server; serv1/serv2 are the ids of the
    -- reply connections the echo server opens back to us
    Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints
    ConnectionOpened serv1 _ _ <- receive endpoint

    Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints
    ConnectionOpened serv2 _ _ <- receive endpoint

    -- One thread to send "pingA" on the first connection
    forkTry $ replicateM_ numPings $ send conn1 ["pingA"]

    -- One thread to send "pingB" on the second connection
    forkTry $ replicateM_ numPings $ send conn2 ["pingB"]

    -- Verify server responses: count down one per single-chunk message and
    -- signal the main thread once all of them have been seen
    let verifyResponse :: Int -> IO ()
        verifyResponse 0 = putMVar result ()
        verifyResponse n = do
          event <- receive endpoint
          case event of
            Received cid [payload] -> do
              when (cid == serv1 && payload /= "pingA") $ error "Wrong message"
              when (cid == serv2 && payload /= "pingB") $ error "Wrong message"
              verifyResponse (n - 1)
            -- After an error or endpoint closure the outstanding replies can
            -- no longer be expected to arrive: fail now instead of waiting
            -- for them indefinitely
            ErrorEvent _ ->
              error $ "Unexpected event: " ++ show event
            EndPointClosed ->
              error "Unexpected endpoint closure"
            -- Anything else is not a reply and does not count
            _ ->
              verifyResponse n
    verifyResponse (2 * numPings)

  takeMVar result

-- | Test that closing one connection does not close the other.
--
-- A single client endpoint opens two connections to the echo server. One
-- thread sends @numPings@ copies of @"pingA"@ on the first connection and then
-- closes it; another sends @2 * numPings@ copies of @"pingB"@ on the second.
-- The client checks that all @3 * numPings@ echoes arrive with the payload
-- belonging to the reply connection they arrived on.
testCloseOneConnection :: Transport -> Int -> IO ()
testCloseOneConnection transport numPings = do
  server <- spawn transport echoServer
  result <- newEmptyMVar

  -- Client
  forkTry $ do
    Right endpoint <- newEndPoint transport

    -- Open two connections to the server; serv1/serv2 are the ids of the
    -- reply connections the echo server opens back to us
    Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints
    ConnectionOpened serv1 _ _ <- receive endpoint

    Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints
    ConnectionOpened serv2 _ _ <- receive endpoint

    -- One thread to send "pingA" on the first connection, closing it afterwards
    forkTry $ do
      replicateM_ numPings $ send conn1 ["pingA"]
      close conn1

    -- One thread to send "pingB" on the second connection
    forkTry $ replicateM_ (numPings * 2) $ send conn2 ["pingB"]

    -- Verify server responses: count down one per single-chunk message and
    -- signal the main thread once all of them have been seen
    let verifyResponse :: Int -> IO ()
        verifyResponse 0 = putMVar result ()
        verifyResponse n = do
          event <- receive endpoint
          case event of
            Received cid [payload] -> do
              when (cid == serv1 && payload /= "pingA") $ error "Wrong message"
              when (cid == serv2 && payload /= "pingB") $ error "Wrong message"
              verifyResponse (n - 1)
            -- After an error or endpoint closure the outstanding replies can
            -- no longer be expected to arrive: fail now instead of waiting
            -- for them indefinitely
            ErrorEvent _ ->
              error $ "Unexpected event: " ++ show event
            EndPointClosed ->
              error "Unexpected endpoint closure"
            -- Anything else (such as the ConnectionClosed for the first reply
            -- connection) is not a reply and does not count
            _ ->
              verifyResponse n
    verifyResponse (3 * numPings)

  takeMVar result

-- | Test that if A connects to B and B connects to A, B can still send to A after
-- A closes its connection to B (for instance, in the TCP transport, the socket pair
-- connecting A and B should not yet be closed).
--
-- Both peers publish their address through an 'MVar' and signal completion
-- through another; the test returns once both have finished.
testCloseOneDirection :: Transport -> Int -> IO ()
testCloseOneDirection transport numPings = do
  addrA <- newEmptyMVar
  addrB <- newEmptyMVar
  doneA <- newEmptyMVar
  doneB <- newEmptyMVar

  let -- Consume @numPings@ events, each of which has to be a 'Received'
      -- (connection id and payload are not inspected)
      drainMessages ep = replicateM_ numPings $ do
        Received _ _ <- receive ep
        return ()

      -- A: connect to B, send the pings, close, then wait for B's pongs and
      -- for B to close its connection to us
      peerA = do
        tlog "A"
        Right endpoint <- newEndPoint transport
        tlog (show (address endpoint))
        putMVar addrA (address endpoint)

        tlog "Connect to B"
        Right toB <- readMVar addrB >>= \addr -> connect endpoint addr ReliableOrdered defaultConnectHints

        tlog "Wait for B"
        ConnectionOpened fromB _ _ <- receive endpoint

        tlog "Send pings to B"
        replicateM_ numPings (send toB ["ping"])

        tlog "Close connection"
        close toB

        tlog "Wait for pongs from B"
        drainMessages endpoint

        -- The connection B closes has to be the one it opened to us
        tlog "Wait for B to close connection"
        ConnectionClosed closed <- receive endpoint
        guard (fromB == closed)

        tlog "Done"
        putMVar doneA ()

      -- B: wait for A to connect, connect back, read A's pings, and only
      -- after A has closed its connection send the pongs
      peerB = do
        tlog "B"
        Right endpoint <- newEndPoint transport
        tlog (show (address endpoint))
        putMVar addrB (address endpoint)

        tlog "Wait for A to connect"
        ConnectionOpened fromA _ _ <- receive endpoint

        tlog "Connect to A"
        Right toA <- readMVar addrA >>= \addr -> connect endpoint addr ReliableOrdered defaultConnectHints

        tlog "Wait for pings from A"
        drainMessages endpoint

        -- The connection A closes has to be the one it opened to us
        tlog "Wait for A to close connection"
        ConnectionClosed closed <- receive endpoint
        guard (fromA == closed)

        tlog "Send pongs to A"
        replicateM_ numPings (send toA ["pong"])

        tlog "Close connection to A"
        close toA

        tlog "Done"
        putMVar doneB ()

  forkTry peerA
  forkTry peerB

  mapM_ takeMVar [doneA, doneB]

-- | Collect events and order them by connection ID.
--
-- Events are read from @endPoint@ until @maxEvents@ of them have been consumed
-- (when it is 'Just'), or until reading one yields an 'IOError' -- which
-- includes the @userError "timeout"@ handed to 'timeoutMaybe' together with
-- @timeout@. The result lists, for every connection that was opened and closed
-- again, the payloads received on it in arrival order. Fails if a connection
-- is still open at that point, and on multicast, error and endpoint-closed
-- events.
collect :: EndPoint -> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])]
collect endPoint maxEvents timeout = loop maxEvents Map.empty Map.empty
  where
    -- Next event, or the 'IOError' that interrupted waiting for it
    nextEvent :: IO (Either IOError Event)
    nextEvent = tryIO (timeoutMaybe timeout (userError "timeout") (receive endPoint))

    -- Arguments: remaining event budget, messages of still-open connections
    -- (newest first), messages of closed connections (newest first).
    --
    -- TODO: for more serious use of this function we'd need to make these arguments strict
    loop :: Maybe Int
         -> Map ConnectionId [[ByteString]]
         -> Map ConnectionId [[ByteString]]
         -> IO [(ConnectionId, [[ByteString]])]
    loop (Just 0) live done = summarise live done
    loop budget live done = do
      outcome <- nextEvent
      case outcome of
        Left _ -> summarise live done
        Right event ->
          let budget' = subtract 1 <$> budget
          in case event of
               ConnectionOpened cid _ _ ->
                 loop budget' (Map.insert cid [] live) done
               ConnectionClosed cid ->
                 -- Move the connection's messages from the open to the closed map
                 let msgs = Map.findWithDefault (error "Invalid ConnectionClosed") cid live
                 in loop budget' (Map.delete cid live) (Map.insert cid msgs done)
               Received cid msg ->
                 loop budget' (Map.adjust (msg :) cid live) done
               ReceivedMulticast _ _ ->
                 fail "Unexpected multicast"
               ErrorEvent _ ->
                 fail "Unexpected error"
               EndPointClosed ->
                 fail "Unexpected endpoint closure"

    -- Final result; messages were consed on, so restore arrival order
    summarise :: Map ConnectionId [[ByteString]]
              -> Map ConnectionId [[ByteString]]
              -> IO [(ConnectionId, [[ByteString]])]
    summarise live done
      | Map.null live = return (Map.toList (Map.map reverse done))
      | otherwise     = fail ("Open connections: " ++ show (map fst (Map.toList live)))

-- | Open connection, close it, then reopen it
-- (In the TCP transport this means the socket will be closed, then reopened)
--
-- Note that B cannot expect to receive all of A's messages on the first connection
-- before receiving the messages on the second connection. What might (and sometimes
-- does) happen is that A finishes sending all of its messages on the first connection
-- (in the TCP transport, the first socket pair) while B is behind on reading _from_
-- this connection (socket pair) -- the messages are "in transit" on the network
-- (these tests are done on localhost, so they are in some OS buffer). Then when
-- A opens the second connection (socket pair) B will spawn a new thread for this
-- connection, and hence might start interleaving messages from the first and second
-- connection.
--
-- This is correct behaviour, however: the transport API guarantees reliability and
-- ordering _per connection_, but not _across_ connections.
testCloseReopen :: Transport -> Int -> IO ()
testCloseReopen transport numPings = do
  addrB <- newEmptyMVar
  doneB <- newEmptyMVar

  let -- Number of connect / ping / close rounds A goes through
      rounds :: Int
      rounds = 2

      -- Payload of the j-th ping of round i
      pingMsg :: Int -> Int -> ByteString
      pingMsg i j = pack ("ping" ++ show i ++ "/" ++ show j)

      -- A: in every round connect to B, say hi @numPings@ times, disconnect
      peerA = do
        Right endpoint <- newEndPoint transport

        forM_ [1 .. rounds] $ \i -> do
          tlog "A connecting"
          Right conn <- readMVar addrB >>= \addr -> connect endpoint addr ReliableOrdered defaultConnectHints

          tlog "A pinging"
          forM_ [1 .. numPings] $ \j -> send conn [pingMsg i j]

          tlog "A closing"
          close conn

        tlog "A finishing"

      -- B: publish our address, gather everything A sends, and check it
      -- connection by connection
      peerB = do
        Right endpoint <- newEndPoint transport
        putMVar addrB (address endpoint)

        -- Every round accounts for one open, @numPings@ messages and one close
        perConnection <- collect endpoint (Just (rounds * (numPings + 2))) Nothing

        forM_ (zip [1 .. rounds] perConnection) $ \(i, (_, msgs)) ->
          forM_ (zip [1 .. numPings] msgs) $ \(j, msg) ->
            guard (msg == [pingMsg i j])

        putMVar doneB ()

  forkTry peerA
  forkTry peerB

  takeMVar doneB

-- | Open many connections to an echo server concurrently, all from a single
-- local endpoint, and check that the echoes are not mixed up between
-- connections.
--
-- The 'Int' argument is the number of client threads to fork. Each client
-- sends its own payload twice and then closes its connection.
testParallelConnects :: Transport -> Int -> IO ()
testParallelConnects transport numPings = do
  echoAddr <- spawn transport echoServer
  finished <- newEmptyMVar

  Right endpoint <- newEndPoint transport

  -- One forked client per index; the payload embeds the client's index.
  let client i = do
        Right conn <- connect endpoint echoAddr ReliableOrdered defaultConnectHints
        replicateM_ 2 $ send conn [pack ("ping" ++ show i)]
        close conn
  mapM_ (forkTry . client) [1 .. numPings]

  -- Verifier thread: gathers numPings * 4 events from the endpoint.
  forkTry $ do
    perConnection <- collect endpoint (Just (4 * numPings)) Nothing
    -- Both messages seen on one connection must be identical; otherwise a
    -- ping was delivered on the wrong connection.
    mapM_ (\(_, [[first], [second]]) -> guard (first == second)) perConnection
    putMVar finished ()

  takeMVar finished

-- | Test that sending a payload whose evaluation throws, over a connection
-- from an endpoint to itself, raises that exception in the sender.
--
-- The payload is @error "bang!"@; the test passes only if 'send' throws
-- exactly that 'ErrorCall'. Any other 'ErrorCall' (in particular the
-- "send didn't fail" marker raised when 'send' returns normally) is rethrown
-- unchanged, so the failure message names the real cause.
testSelfSend :: Transport -> IO ()
testSelfSend transport = do
    Right endpoint <- newEndPoint transport

    Right conn <- connect endpoint (address endpoint) ReliableOrdered
                          defaultConnectHints

    -- Must clear the ConnectionOpened event or else sending may block
    ConnectionOpened _ _ _ <- receive endpoint

    do send conn [ error "bang!" ]
       error "testSelfSend: send didn't fail"
     `catch` expectBang

    close conn

    -- Must clear this event or else closing the end point may block.
    ConnectionClosed _ <- receive endpoint

    closeEndPoint endpoint
  where
    -- Total handler: swallow only the expected exception, propagate the rest.
    expectBang :: ErrorCall -> IO ()
    expectBang (ErrorCall "bang!") = return ()
    expectBang other               = throwIO other

-- | Test that sending on a closed connection gives an error.
--
-- Runs @numRepeats@ rounds against an echo server. Each round opens two
-- connections from the same endpoint and checks that 'send' reports
-- 'SendClosed' both when another connection is still open and when none
-- remains open.
testSendAfterClose :: Transport -> Int -> IO ()
testSendAfterClose transport numRepeats = do
  server <- spawn transport echoServer
  clientDone <- newEmptyMVar

  forkTry $ do
    Right endpoint <- newEndPoint transport

    replicateM numRepeats $ do
      -- We request two lightweight connections
      Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints
      Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints

      -- Close the second, but leave the first open; then output on the second
      -- connection (i.e., on a closed connection while there is still another
      -- connection open)
      close conn2
      Left (TransportError SendClosed _) <- send conn2 ["ping2"]

      -- Now close the first connection, and output on it (i.e., output while
      -- there are no lightweight connections at all anymore)
      close conn1
      Left (TransportError SendClosed _) <- send conn1 ["ping1"]

      return ()

    putMVar clientDone ()

  takeMVar clientDone

-- | Test that closing the same connection twice has no effect.
--
-- Runs @numRepeats@ rounds against an echo server. Each round opens two
-- connections, closes the unused one twice, sends one message on the other
-- and closes that one twice as well. The echoed event stream must contain
-- exactly one 'ConnectionClosed' per connection.
testCloseTwice :: Transport -> Int -> IO ()
testCloseTwice transport numRepeats = do
  server <- spawn transport echoServer
  clientDone <- newEmptyMVar

  forkTry $ do
    Right endpoint <- newEndPoint transport

    replicateM numRepeats $ do
      -- We request two lightweight connections
      Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints
      Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints

      -- Close the second one twice
      close conn2
      close conn2

      -- Then send a message on the first and close that twice too
      send conn1 ["ping"]
      close conn1
      close conn1

      -- Verify expected response from the echo server
      ConnectionOpened cid1 _ _ <- receive endpoint
      ConnectionOpened cid2 _ _ <- receive endpoint
      -- The ordering of the following events across the two connections may
      -- differ depending on the implementation, so they are checked as two
      -- independent streams. Exactly three events are read: a duplicated
      -- ConnectionClosed would break the next round's pattern matches.
      ms   <- replicateM 3 $ receive endpoint
      True <- return $ testStreams ms [ [ ConnectionClosed cid2 ]
                                      , [ Received cid1 ["ping"]
                                        , ConnectionClosed cid1 ]
                                      ]
      return ()

    putMVar clientDone ()

  takeMVar clientDone

-- | An endpoint connects to its own address, a writer thread pushes
-- @numPings@ messages through that connection, and a reader thread checks
-- that the matching events arrive on the same endpoint.
--
-- Afterwards the reader connects again, this time to the address reported in
-- the 'ConnectionOpened' event, and the writer checks that this second
-- connection also shows up on the same endpoint.
testConnectToSelf :: Transport -> Int -> IO ()
testConnectToSelf transport numPings = do
  finished   <- newEmptyMVar
  readerDone <- newEmptyMVar   -- filled once the reader has consumed the first connection's events
  Right endpoint <- newEndPoint transport

  tlog "Creating self-connection"
  Right selfConn <- connect endpoint (address endpoint) ReliableOrdered defaultConnectHints

  tlog "Talk to myself"

  -- Writer thread
  forkTry $ do
    tlog "writing"

    tlog "Sending ping"
    replicateM_ numPings (send selfConn ["ping"])

    tlog "Closing connection"
    close selfConn
    -- Wait for the reader; the events received below belong to the
    -- connection the reader opens afterwards.
    readMVar readerDone
    ConnectionOpened openedId _ _ <- receive endpoint
    ConnectionClosed closedId <- receive endpoint
    True <- return (openedId == closedId)
    return ()

  -- Reader thread
  forkTry $ do
    tlog "reading"

    tlog "Waiting for ConnectionOpened"
    ConnectionOpened cid _ reportedAddr <- receive endpoint

    tlog "Waiting for Received"
    replicateM_ numPings $ do
      Received msgCid ["ping"] <- receive endpoint
      True <- return (cid == msgCid)
      return ()

    tlog "Waiting for ConnectionClosed"
    ConnectionClosed closedCid <- receive endpoint
    True <- return (cid == closedCid)

    putMVar readerDone ()

    -- The address carried by the ConnectionOpened event must lead back to
    -- this endpoint as well; the writer thread checks the resulting events.
    Right again <- connect endpoint reportedAddr ReliableOrdered defaultConnectHints
    close again

    tlog "Done"
    putMVar finished ()

  takeMVar finished

-- | Two writer threads each open their own connection from an endpoint to
-- itself and send @numPings@ messages ("pingA" and "pingB" respectively). A
-- reader thread collects the events and checks that each connection carried
-- only its own messages.
testConnectToSelfTwice :: Transport -> Int -> IO ()
testConnectToSelfTwice transport numPings = do
  finished <- newEmptyMVar
  Right endpoint <- newEndPoint transport

  tlog "Talk to myself"

  -- Hand-off that forces the "pingA" writer to connect before the "pingB"
  -- writer. The reader's equality checks rely on 'collect' returning the two
  -- connections in that order.
  firstUp <- newEmptyMVar

  let selfConnect = connect endpoint (address endpoint) ReliableOrdered defaultConnectHints

  -- Writer A: connects first, then releases writer B
  forkTry $ do
    tlog "Creating self-connection"
    Right connA <- selfConnect
    putMVar firstUp ()

    tlog "writing"

    tlog "Sending ping"
    replicateM_ numPings (send connA ["pingA"])

    tlog "Closing connection"
    close connA

  -- Writer B: waits until A's connection exists
  forkTry $ do
    takeMVar firstUp
    tlog "Creating self-connection"
    Right connB <- selfConnect
    tlog "writing"

    tlog "Sending ping"
    replicateM_ numPings (send connB ["pingB"])

    tlog "Closing connection"
    close connB

  -- Reader: 2 * (numPings + 2) events in total, i.e. per connection one
  -- open, numPings messages and one close.
  forkTry $ do
    tlog "reading"

    [(_, fromA), (_, fromB)] <- collect endpoint (Just (2 * (numPings + 2))) Nothing
    True <- return (fromA == replicate numPings ["pingA"])
    True <- return (fromB == replicate numPings ["pingB"])

    tlog "Done"
    putMVar finished ()

  takeMVar finished

-- | Test that self-connections no longer work once we close our endpoint
-- or our transport.
--
-- Takes an action that creates a fresh transport, because the test closes it.
testCloseSelf :: IO (Either String Transport) -> IO ()
testCloseSelf newTransport = do
  Right transport <- newTransport
  Right endpoint1 <- newEndPoint transport
  Right endpoint2 <- newEndPoint transport

  let selfConnect ep = connect ep (address ep) ReliableOrdered defaultConnectHints

  -- Two self-connections on the first endpoint, one on the second
  Right conn1 <- selfConnect endpoint1
  ConnectionOpened _ _ _ <- receive endpoint1
  Right conn2 <- selfConnect endpoint1
  ConnectionOpened _ _ _ <- receive endpoint1
  Right conn3 <- selfConnect endpoint2
  ConnectionOpened _ _ _ <- receive endpoint2

  -- Close the connection and try to send
  close conn1
  ConnectionClosed _ <- receive endpoint1
  Left (TransportError SendClosed _) <- send conn1 ["ping"]

  -- Close the first endpoint. Its remaining connection must stop working and
  -- no new self-connection may be opened on it, but the self-connection on
  -- the second endpoint must be unaffected.
  closeEndPoint endpoint1
  EndPointClosed <- receive endpoint1
  Left (TransportError SendFailed _) <- send conn2 ["ping"]
  Left (TransportError ConnectFailed _) <- selfConnect endpoint1
  Right () <- send conn3 ["ping"]
  Received _ _ <- receive endpoint2

  -- Close the transport; now the second endpoint must stop working too
  closeTransport transport
  Left (TransportError SendFailed _) <- send conn3 ["ping"]
  Left connectErr <- selfConnect endpoint2
  case connectErr of
    TransportError ConnectFailed _ -> return ()
    _ -> do
      -- Print the unexpected error, then fail through the pattern bind.
      putStrLn ("Actual: " ++ show connectErr)
      TransportError ConnectFailed _ <- return connectErr
      return ()

-- | Test various aspects of 'closeEndPoint'.
--
-- A server thread and a client thread run two scenarios in lock-step, using
-- MVars to exchange endpoint addresses and to signal progress. The 'Int'
-- argument is ignored.
testCloseEndPoint :: Transport -> Int -> IO ()
testCloseEndPoint transport _ = do
  serverFirstTestDone <- newEmptyMVar
  serverDone          <- newEmptyMVar
  clientDone          <- newEmptyMVar
  clientAddr1         <- newEmptyMVar
  clientAddr2         <- newEmptyMVar
  serverAddr          <- newEmptyMVar

  -- Server
  forkTry $ do
    Right endpoint <- newEndPoint transport
    putMVar serverAddr (address endpoint)

    -- Scenario 1 (see the client's first scenario)
    do
      -- Wait until the client has published its first endpoint address.
      _ <- readMVar clientAddr1
      ConnectionOpened inId ReliableOrdered peerAddr <- receive endpoint
      -- The address reported with the event must be connectable.
      Right probe <- connect endpoint peerAddr ReliableOrdered defaultConnectHints
      close probe
      putMVar serverFirstTestDone ()
      ConnectionClosed goneId <- receive endpoint
      True <- return (inId == goneId)
      -- Publish the address again for the client's second scenario.
      putMVar serverAddr (address endpoint)
      return ()

    -- Scenario 2
    do
      clientAddr <- readMVar clientAddr2

      ConnectionOpened inId ReliableOrdered peerAddr <- receive endpoint
      -- The address reported with the event must be connectable.
      Right probe <- connect endpoint peerAddr ReliableOrdered defaultConnectHints
      close probe
      Received pingId ["ping"] <- receive endpoint
      True <- return (inId == pingId)

      Right reply <- connect endpoint clientAddr ReliableOrdered defaultConnectHints
      send reply ["pong"]

      -- The client closes its endpoint: expect the incoming connection to
      -- close, followed by a connection-lost error naming the client.
      ConnectionClosed goneId <- receive endpoint
      True <- return (inId == goneId)
      ErrorEvent (TransportError (EventConnectionLost lostAddr) _) <- receive endpoint
      True <- return (lostAddr == clientAddr)

      Left (TransportError SendFailed _) <- send reply ["pong2"]

      return ()

    putMVar serverDone ()

  -- Client
  forkTry $ do

    -- Scenario 1: close endpoint with one outgoing but no incoming connections
    do
      srvAddr <- takeMVar serverAddr
      Right endpoint <- newEndPoint transport
      putMVar clientAddr1 (address endpoint)

      -- Connect to the server; the connection is never closed explicitly.
      Right _ <- connect endpoint srvAddr ReliableOrdered defaultConnectHints
      -- Events caused by the server's short-lived probe connection.
      ConnectionOpened probeId _ _ <- receive endpoint
      ConnectionClosed probeGone <- receive endpoint
      True <- return (probeId == probeGone)
      -- Don't close before the remote server had a chance to digest the
      -- connection.
      readMVar serverFirstTestDone
      closeEndPoint endpoint
      EndPointClosed <- receive endpoint
      return ()

    -- Scenario 2: close endpoint with one outgoing and one incoming connection
    do
      srvAddr <- takeMVar serverAddr
      Right endpoint <- newEndPoint transport
      putMVar clientAddr2 (address endpoint)

      Right conn <- connect endpoint srvAddr ReliableOrdered defaultConnectHints
      -- Events caused by the server's short-lived probe connection.
      ConnectionOpened probeId _ _ <- receive endpoint
      ConnectionClosed probeGone <- receive endpoint
      True <- return (probeId == probeGone)
      send conn ["ping"]

      -- Reply from the server
      ConnectionOpened replyId ReliableOrdered _ <- receive endpoint
      Received pongId ["pong"] <- receive endpoint
      True <- return (replyId == pongId)

      -- Close the endpoint
      closeEndPoint endpoint
      EndPointClosed <- receive endpoint

      -- Sending on the old connection must now fail
      Left (TransportError SendFailed _) <- send conn ["ping2"]

      -- Closing the connection afterwards should just return
      () <- close conn

      -- A new connect attempt from the closed endpoint must fail
      Left (TransportError ConnectFailed _) <- connect endpoint srvAddr ReliableOrdered defaultConnectHints

      return ()

    putMVar clientDone ()

  mapM_ takeMVar [serverDone, clientDone]

-- | Exercise 'closeTransport'.
--
-- A server and a client each create their own transport via the supplied
-- action. The client opens two endpoints, talks to the server through them
-- and then closes its whole transport. The server checks the events it sees
-- as a result; the client checks that everything belonging to the closed
-- transport (endpoints, connections, the transport itself) now reports
-- failure. This overlaps considerably with the endpoint-closing test.
testCloseTransport :: IO (Either String Transport) -> IO ()
testCloseTransport newTransport = do
  srvFinished <- newEmptyMVar
  cliFinished <- newEmptyMVar
  cliAddrVar1 <- newEmptyMVar
  cliAddrVar2 <- newEmptyMVar
  srvAddrVar  <- newEmptyMVar

  -- Server side
  forkTry $ do
    Right srvTransport <- newTransport
    Right srvEp <- newEndPoint srvTransport
    putMVar srvAddrVar (address srvEp)

    -- The client's first endpoint connects to us
    advertisedA <- readMVar cliAddrVar1
    ConnectionOpened cidA ReliableOrdered observedA <- receive srvEp
    -- Connect back through both the advertised address and the address
    -- reported by the event, checking that each of them reaches the client
    Right backA1 <- connect srvEp advertisedA ReliableOrdered defaultConnectHints
    close backA1
    Right backA2 <- connect srvEp observedA ReliableOrdered defaultConnectHints
    close backA2

    -- The client's second endpoint connects to us and sends a ping
    advertisedB <- readMVar cliAddrVar2

    ConnectionOpened cidB ReliableOrdered observedB <- receive srvEp
    -- observedB is used below to connect back to the client, which checks
    -- that it is a valid address (though not *necessarily* equal to
    -- advertisedB)

    Received cidB' ["ping"] <- receive srvEp
    True <- return (cidB' == cidB)

    Right backB1 <- connect srvEp advertisedB ReliableOrdered defaultConnectHints
    send backB1 ["pong"]
    close backB1
    Right backB2 <- connect srvEp observedB ReliableOrdered defaultConnectHints
    send backB2 ["pong"]

    -- The client now closes its transport. Three events are expected, in an
    -- order we cannot predict.
    -- TODO: should we get an EventConnectionLost for advertisedA? We have no outgoing connections
    evs <- replicateM 3 (receive srvEp)
    let expected = [ ConnectionClosed cidA
                   , ConnectionClosed cidB
                   -- , ErrorEvent (TransportError (EventConnectionLost advertisedA) "")
                   , ErrorEvent (TransportError (EventConnectionLost observedB) "")
                   ]
    True <- return (expected `elem` permutations evs)

    -- Sending over the connection that was still open must now fail
    Left (TransportError SendFailed _) <- send backB2 ["pong2"]

    putMVar srvFinished ()

  -- Client side
  forkTry $ do
    Right cliTransport <- newTransport
    srvAddr <- readMVar srvAddrVar

    -- First endpoint: one outgoing connection, no lasting incoming ones
    Right ep1 <- newEndPoint cliTransport
    putMVar cliAddrVar1 (address ep1)

    -- Connect to the server; this connection is never closed explicitly
    Right _ <- connect ep1 srvAddr ReliableOrdered defaultConnectHints
    -- The server connects back twice (once per address it knows for us) and
    -- closes each connection straight away
    ConnectionOpened inA1 ReliableOrdered _ <- receive ep1
    ConnectionClosed inA1' <- receive ep1
    True <- return (inA1 == inA1')
    ConnectionOpened inA2 ReliableOrdered _ <- receive ep1
    ConnectionClosed inA2' <- receive ep1
    True <- return (inA2 == inA2')

    -- Second endpoint: one outgoing and one incoming connection
    Right ep2 <- newEndPoint cliTransport
    putMVar cliAddrVar2 (address ep2)

    -- The outgoing connection
    Right outConn <- connect ep2 srvAddr ReliableOrdered defaultConnectHints
    send outConn ["ping"]

    -- The server replies over two connections, one per address it knows for
    -- us; only the first of them is closed by the server
    ConnectionOpened inB1 ReliableOrdered _ <- receive ep2
    Received inB1' ["pong"] <- receive ep2
    True <- return (inB1 == inB1')
    ConnectionClosed inB1'' <- receive ep2
    True <- return (inB1 == inB1'')
    ConnectionOpened inB2 ReliableOrdered _ <- receive ep2
    Received inB2' ["pong"] <- receive ep2
    True <- return (inB2 == inB2')

    -- Shut down the entire transport
    closeTransport cliTransport

    -- Both endpoints must report that they have been closed
    EndPointClosed <- receive ep1
    EndPointClosed <- receive ep2

    -- Sending on a connection of the closed transport must fail
    Left (TransportError SendFailed _) <- send outConn ["ping2"]

    -- Closing the already closed connection must simply return
    () <- close outConn

    -- Connecting from either endpoint must fail
    Left (TransportError ConnectFailed _) <- connect ep1 srvAddr ReliableOrdered defaultConnectHints
    Left (TransportError ConnectFailed _) <- connect ep2 srvAddr ReliableOrdered defaultConnectHints

    -- And so must creating a new endpoint on the closed transport
    Left (TransportError NewEndPointFailed _) <- newEndPoint cliTransport

    putMVar cliFinished ()

  forM_ [srvFinished, cliFinished] takeMVar

-- | Connecting to an endpoint that has already been closed.
--
-- One thread creates an endpoint, publishes its address and closes it
-- again. A second thread waits for that to have happened and then expects
-- 'connect' to that address to fail with 'ConnectNotFound'.
testConnectClosedEndPoint :: Transport -> IO ()
testConnectClosedEndPoint transport = do
  targetAddrVar <- newEmptyMVar
  targetClosed  <- newEmptyMVar
  finished      <- newEmptyMVar

  -- Server: open an endpoint, advertise it, close it
  forkTry $ do
    Right target <- newEndPoint transport
    putMVar targetAddrVar (address target)

    closeEndPoint target
    putMVar targetClosed ()

  -- Client: connect only once the server endpoint is known to be closed
  forkTry $ do
    Right local <- newEndPoint transport
    readMVar targetClosed

    Left (TransportError ConnectNotFound _) <-
      readMVar targetAddrVar >>= \closedAddr ->
        connect local closedAddr ReliableOrdered defaultConnectHints

    putMVar finished ()

  takeMVar finished

-- | Once an endpoint has delivered 'EndPointClosed', a further 'receive' on
-- it must raise an exception.
--
-- This is checked twice: for an endpoint closed individually, and for an
-- endpoint closed as a consequence of closing the whole transport.
testExceptionOnReceive :: IO (Either String Transport) -> IO ()
testExceptionOnReceive newTransport = do
  Right transport <- newTransport

  -- Case 1: the endpoint itself is closed
  Right closedDirectly <- newEndPoint transport
  closeEndPoint closedDirectly
  EndPointClosed <- receive closedDirectly
  Left _ <- trySome (receive closedDirectly >>= evaluate)

  -- Case 2: the transport owning the endpoint is closed
  Right closedViaTransport <- newEndPoint transport
  closeTransport transport
  EndPointClosed <- receive closedViaTransport
  Left _ <- trySome (receive closedViaTransport >>= evaluate)

  return ()

-- | Passing an exceptional value as the payload of 'send'.
--
-- The send is expected to fail with 'SendFailed', both endpoints are
-- expected to see 'EventConnectionLost', and a fresh connection between the
-- same two endpoints must work normally afterwards.
testSendException :: IO (Either String Transport) -> IO ()
testSendException newTransport = do
  Right transport <- newTransport
  Right sender    <- newEndPoint transport
  Right receiver  <- newEndPoint transport

  -- Connect the sender to the receiver
  Right brokenConn <- connect sender (address receiver) ReliableOrdered defaultConnectHints
  ConnectionOpened _ _ _ <- receive receiver

  -- The payload throws as soon as it is forced
  Left (TransportError SendFailed _) <- send brokenConn (throw (userError "uhoh"))

  -- The sender will have treated this as a failure to send and will
  -- therefore have closed the socket; in turn the receiver reports that the
  -- connection was lost
  ErrorEvent (TransportError (EventConnectionLost _) _) <- receive sender
  ErrorEvent (TransportError (EventConnectionLost _) _) <- receive receiver

  -- Connecting again re-establishes communication
  Right freshConn <- connect sender (address receiver) ReliableOrdered defaultConnectHints
  send freshConn ["ping"]
  close freshConn

  ConnectionOpened _ _ _ <- receive receiver
  Received _ ["ping"]    <- receive receiver
  ConnectionClosed _     <- receive receiver

  return ()

-- | If threads get killed while executing a 'connect', 'send', or 'close',
-- this should not affect other threads.
--
-- The intention of this test is to see what happens when an asynchronous
-- exception happens _while executing a send_. That is exceedingly difficult
-- to guarantee, so a large number of threads is started with random delays
-- in between -- and even then it might not happen. Moreover, it will only
-- happen when running on multiple cores.
testKill :: IO (Either String Transport) -> Int -> IO ()
testKill newTransport numThreads = do
  Right transport1 <- newTransport
  Right transport2 <- newTransport
  Right endpoint1  <- newEndPoint transport1
  Right endpoint2  <- newEndPoint transport2

  -- Each worker connects, sends one ping and closes, with random pauses
  threads <- replicateM numThreads $ forkIO $ do
    randomThreadDelay 100
    bracket (connect endpoint1 (address endpoint2) ReliableOrdered defaultConnectHints)
            -- No randomThreadDelay in the release action: a delay there
            -- would allow the handler to be interrupted, and the connection
            -- might then never be closed
            (\(Right conn) -> close conn)
            (\(Right conn) -> do randomThreadDelay 100
                                 Right () <- send conn ["ping"]
                                 randomThreadDelay 100)

  numAlive <- newMVar (0 :: Int)

  -- In the background, decide at random for every worker whether to kill it
  -- (after a random delay) or to count it as a survivor
  forkIO $ forM_ threads $ \tid -> do
    shouldKill <- randomIO
    if shouldKill
      then randomThreadDelay 600 >> killThread tid
      else modifyMVar_ numAlive (return . (+ 1))

  -- It is impossible to predict when exactly a kill lands, so the number of
  -- connections opened and pings sent is unknown. However, no connection
  -- may be left open (collect is expected to throw in that case), and the
  -- number of pings must be at least the number of workers *not* killed
  eventss <- collect endpoint2 Nothing (Just 1000000)
  let actualPings = sum [length pings | (_, pings) <- eventss]
  expectedPings <- takeMVar numAlive
  when (actualPings < expectedPings) $
    throwIO (userError "Missing pings")

--  print (actualPings, expectedPings)


-- | Set up conditions with a high likelihood of "crossing" (for transports
-- that multiplex lightweight connections across heavyweight connections).
--
-- Two endpoints on the given transport connect to each other at (nearly)
-- the same moment, @numRepeats@ times in a row. A driver loop releases both
-- sides in a random order each round and waits until both have finished
-- before starting the next round.
testCrossing :: Transport -> Int -> IO ()
testCrossing transport numRepeats = do
  [aAddr, bAddr] <- replicateM 2 newEmptyMVar
  [aDone, bDone] <- replicateM 2 newEmptyMVar
  [aGo,   bGo]   <- replicateM 2 newEmptyMVar
  [aTimeout, bTimeout] <- replicateM 2 newEmptyMVar

  -- A
  forkTry $ crossingPeer aAddr bAddr aGo aDone aTimeout bTimeout

  -- B
  forkTry $ crossingPeer bAddr aAddr bGo bDone bTimeout aTimeout

  -- Driver
  forM_ [1 .. numRepeats] $ \_i -> do
    -- putStrLn $ "Round " ++ show _i
    -- Discard timeout notifications left over from the previous round
    tryTakeMVar aTimeout
    tryTakeMVar bTimeout
    b <- randomIO
    if b then do putMVar aGo () ; putMVar bGo ()
         else do putMVar bGo () ; putMVar aGo ()
    yield
    takeMVar aDone
    takeMVar bDone
  where
    -- Connection hints shared by both sides: give up after five seconds
    hints = defaultConnectHints {
                connectTimeout = Just 5000000
              }

    -- One side of the test. It publishes its own address in @ownAddr@,
    -- reads the other side's from @peerAddr@, and then for every round
    -- waits on @goVar@, attempts the connect and reports on @doneVar@.
    -- @ownTimeout@ is filled when our own connect timed out; @peerTimeout@
    -- is consulted when our connect failed outright.
    crossingPeer ownAddr peerAddr goVar doneVar ownTimeout peerTimeout = do
      Right endpoint <- newEndPoint transport
      putMVar ownAddr (address endpoint)
      theirAddress <- readMVar peerAddr

      replicateM_ numRepeats $ do
        takeMVar goVar >> yield
        -- Because we are creating lots of connections, it's possible that
        -- connect times out (for instance, in the TCP transport,
        -- Network.Socket.connect may time out). We shouldn't regard this as
        -- an error in the Transport, though.
        connectResult <- connect endpoint theirAddress ReliableOrdered hints
        case connectResult of
          Right conn -> close conn
          Left (TransportError ConnectTimeout _) -> putMVar ownTimeout ()
          -- A failed connect is only tolerated once the peer has signalled
          -- a timeout of its own; this blocks until it has done so
          Left (TransportError ConnectFailed _) -> readMVar peerTimeout
          Left err -> throwIO . userError $ "testCrossing: " ++ show err
        putMVar doneVar ()

-- | Run the transport tests without filtering any of them out: every test
-- name is accepted.
testTransport :: IO (Either String Transport) -> IO ()
testTransport newTransport = testTransportWithFilter (\_ -> True) newTransport

-- | Run the transport tests whose name satisfies the predicate @p@.
--
-- One transport is created up front and shared by the tests that take a
-- 'Transport'; the remaining tests receive the @newTransport@ action and
-- create transports of their own.
testTransportWithFilter :: (String -> Bool) -> IO (Either String Transport) -> IO ()
testTransportWithFilter p newTransport = do
  Right transport <- newTransport
  let allTests :: [(String, IO ())]
      allTests =
        [ ("PingPong",              testPingPong transport numPings)
        , ("EndPoints",             testEndPoints transport numPings)
        , ("Connections",           testConnections transport numPings)
        , ("CloseOneConnection",    testCloseOneConnection transport numPings)
        , ("CloseOneDirection",     testCloseOneDirection transport numPings)
        , ("CloseReopen",           testCloseReopen transport numPings)
        , ("ParallelConnects",      testParallelConnects transport numPings)
        , ("SelfSend",              testSelfSend transport)
        , ("SendAfterClose",        testSendAfterClose transport 100)
        , ("Crossing",              testCrossing transport 10)
        , ("CloseTwice",            testCloseTwice transport 100)
        , ("ConnectToSelf",         testConnectToSelf transport numPings)
        , ("ConnectToSelfTwice",    testConnectToSelfTwice transport numPings)
        , ("CloseSelf",             testCloseSelf newTransport)
        , ("CloseEndPoint",         testCloseEndPoint transport numPings)
        , ("CloseTransport",        testCloseTransport newTransport)
        , ("ConnectClosedEndPoint", testConnectClosedEndPoint transport)
        , ("ExceptionOnReceive",    testExceptionOnReceive newTransport)
        , ("SendException",         testSendException newTransport)
        , ("Kill",                  testKill newTransport 1000)
        ]
  runTests [ entry | entry@(name, _) <- allTests, p name ]
  where
    -- Message count passed to the tests that take one
    numPings = 10000 :: Int


-- | Test that a list is an interleaving of the given streams: it contains
-- exactly the messages of all streams, with the ordering within each stream
-- preserved.
--
-- The first argument is the merged list; the second is the list of streams.
-- An empty merged list matches only when every stream is empty.
--
-- When several streams start with the message being looked for, each of
-- them is tried in turn, so streams that contain equal messages are handled
-- correctly. As long as the heads of the streams are pairwise different at
-- every step there is only ever one candidate and the check is a single
-- pass; with many equal messages across streams a failing check may have to
-- explore a large number of interleavings.
testStreams :: Eq a => [a] -> [[a]] -> Bool
testStreams []     streams = all null streams
testStreams (x:xs) streams = any (testStreams xs) (consume streams)
  where
    -- Every list of streams that results from removing @x@ from the front
    -- of exactly one stream whose head equals @x@; all other streams are
    -- left untouched and keep their position.
    consume []       = []
    consume (s:rest) = popHere ++ map (s :) (consume rest)
      where
        popHere = case s of
          (z:zs) | z == x -> [zs : rest]
          _               -> []