{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE LambdaCase #-}
module Network.Transport.TCP
(
createTransport
, TCPAddr(..)
, defaultTCPAddr
, TCPAddrInfo(..)
, TCPParameters(..)
, defaultTCPParameters
, createTransportExposeInternals
, TransportInternals(..)
, EndPointId
, ControlHeader(..)
, ConnectionRequestResponse(..)
, firstNonReservedLightweightConnectionId
, firstNonReservedHeavyweightConnectionId
, socketToEndPoint
, LightweightConnectionId
, QDisc(..)
, simpleUnboundedQDisc
, simpleOnePlaceQDisc
) where
import Prelude hiding
( mapM_
#if ! MIN_VERSION_base(4,6,0)
, catch
#endif
)
import Network.Transport
import Network.Transport.TCP.Internal
( ControlHeader(..)
, encodeControlHeader
, decodeControlHeader
, ConnectionRequestResponse(..)
, encodeConnectionRequestResponse
, decodeConnectionRequestResponse
, forkServer
, recvWithLength
, recvExact
, recvWord32
, encodeWord32
, tryCloseSocket
, tryShutdownSocketBoth
, resolveSockAddr
, EndPointId
, encodeEndPointAddress
, decodeEndPointAddress
, currentProtocolVersion
, randomEndPointAddress
)
import Network.Transport.Internal
( prependLength
, mapIOException
, tryIO
, tryToEnum
, void
, timeoutMaybe
, asyncWhenCancelled
)
#ifdef USE_MOCK_NETWORK
import qualified Network.Transport.TCP.Mock.Socket as N
#else
import qualified Network.Socket as N
#endif
( HostName
, ServiceName
, Socket
, getAddrInfo
, maxListenQueue
, socket
, addrFamily
, addrAddress
, SocketType(Stream)
, defaultProtocol
, setSocketOption
, SocketOption(ReuseAddr, NoDelay, UserTimeout, KeepAlive)
, isSupportedSocketOption
, connect
, AddrInfo
, SockAddr(..)
)
#ifdef USE_MOCK_NETWORK
import Network.Transport.TCP.Mock.Socket.ByteString (sendMany)
#else
import Network.Socket.ByteString (sendMany)
#endif
import Control.Concurrent
( forkIO
, ThreadId
, killThread
, myThreadId
, threadDelay
, throwTo
)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar
( MVar
, newMVar
, modifyMVar
, modifyMVar_
, readMVar
, tryReadMVar
, takeMVar
, putMVar
, tryPutMVar
, newEmptyMVar
, withMVar
)
import Control.Concurrent.Async (async, wait)
import Control.Category ((>>>))
import Control.Applicative ((<$>))
import Control.Monad (when, unless, join, mplus, (<=<))
import Control.Exception
( IOException
, SomeException
, AsyncException
, handle
, throw
, throwIO
, try
, bracketOnError
, bracket
, fromException
, finally
, catch
, bracket
, mask
, mask_
)
import Data.IORef (IORef, newIORef, writeIORef, readIORef, writeIORef)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS (concat, length, null)
import qualified Data.ByteString.Char8 as BSC (pack, unpack)
import Data.Bits (shiftL, (.|.))
import Data.Maybe (isJust, isNothing, fromJust)
import Data.Word (Word32)
import Data.Set (Set)
import qualified Data.Set as Set
( empty
, insert
, elems
, singleton
, null
, delete
, member
)
import Data.Map (Map)
import qualified Data.Map as Map (empty)
import Data.Traversable (traverse)
import Data.Accessor (Accessor, accessor, (^.), (^=), (^:))
import qualified Data.Accessor.Container as DAC (mapMaybe)
import Data.Foldable (forM_, mapM_)
import qualified System.Timeout (timeout)
-- | Addressing information of a transport that accepts incoming connections.
--
-- Built in 'createTransportExposeInternals' for 'Addressable' transports.
data TransportAddrInfo = TransportAddrInfo
  { transportHost     :: !N.HostName
    -- ^ Externally visible host: first component of 'tcpExternalAddress'
    -- applied to the bound port.
  , transportPort     :: !N.ServiceName
    -- ^ Externally visible port: second component of 'tcpExternalAddress'
    -- applied to the bound port.
  , transportBindHost :: !N.HostName
    -- ^ Host the listening server was asked to bind to.
  , transportBindPort :: !N.ServiceName
    -- ^ Port reported back by 'forkServer' once the server is running.
  }
-- | Internal handle shared by every operation of one TCP transport.
data TCPTransport = TCPTransport
  { transportAddrInfo :: !(Maybe TransportAddrInfo)
    -- ^ 'Nothing' when the transport was created as 'Unaddressable'.
  , transportState    :: !(MVar TransportState)
    -- ^ Mutable transport state; switched to 'TransportClosed' on shutdown.
  , transportParams   :: !TCPParameters
    -- ^ The parameters the transport was created with.
  }
-- | Life-cycle state of a transport, stored in 'transportState'.
data TransportState
  = TransportValid !ValidTransportState
    -- ^ The transport is open; carries its live bookkeeping.
  | TransportClosed
    -- ^ The transport has been closed (see 'apiCloseTransport').
-- | Bookkeeping of an open transport.
--
-- The underscore-prefixed fields are read through accessors (for example
-- @localEndPoints@) defined elsewhere in this module.
data ValidTransportState = ValidTransportState
  { _localEndPoints :: !(Map EndPointId LocalEndPoint)
    -- ^ Local endpoints of this transport, keyed by endpoint id.
  , _nextEndPointId :: !EndPointId
    -- ^ Starts at 0 for a fresh transport; presumably the id handed to the
    -- next endpoint that gets created (TODO confirm in 'createLocalEndPoint').
  }
-- | An endpoint owned by this transport.
data LocalEndPoint = LocalEndPoint
  { localAddress    :: !EndPointAddress
    -- ^ Address exposed as the 'address' of the public 'EndPoint'.
  , localEndPointId :: !EndPointId
    -- ^ Identifier of this endpoint within the transport.
  , localState      :: !(MVar LocalEndPointState)
    -- ^ Mutable endpoint state; set to 'LocalEndPointClosed' on close.
  , localQueue      :: !(QDisc Event)
    -- ^ Queue from which 'receive' dequeues events.
  }
-- | Life-cycle state of a local endpoint, stored in 'localState'.
data LocalEndPointState
  = LocalEndPointValid !ValidLocalEndPointState
    -- ^ The endpoint is open; carries its live bookkeeping.
  | LocalEndPointClosed
    -- ^ The endpoint has been closed.
-- | Bookkeeping of an open local endpoint.
data ValidLocalEndPointState = ValidLocalEndPointState
  { _localNextConnOutId :: !LightweightConnectionId
    -- ^ Presumably the id for the next outgoing lightweight connection
    -- (TODO confirm against the code that allocates connection ids).
  , _nextConnInId       :: !HeavyweightConnectionId
    -- ^ Presumably the id for the next incoming heavyweight connection
    -- (TODO confirm).
  , _localConnections   :: !(Map EndPointAddress RemoteEndPoint)
    -- ^ Remote endpoints known to this endpoint, keyed by their address.
  }
-- | Our view of an endpoint living on the other side of a TCP connection.
data RemoteEndPoint = RemoteEndPoint
  { remoteAddress   :: !EndPointAddress
    -- ^ Address of the remote endpoint.
  , remoteState     :: !(MVar RemoteState)
    -- ^ Mutable state of the relationship with the remote endpoint.
  , remoteId        :: !HeavyweightConnectionId
    -- ^ Heavyweight connection id associated with this remote endpoint.
  , remoteScheduled :: !(Chan (IO ()))
    -- ^ Channel of pending actions; presumably fed by
    -- 'withScheduledAction' (TODO confirm).
  }
-- | Which side asked for a heavyweight connection to be set up.
data RequestedBy
  = RequestedByUs
  | RequestedByThem
  deriving (Eq, Show)
-- | State of our relationship with a remote endpoint, stored in
-- 'remoteState'.
data RemoteState
  = RemoteEndPointInvalid !(TransportError ConnectErrorCode)
    -- ^ Unusable; carries the connect error describing why.
  | RemoteEndPointInit !(MVar ()) !(MVar ()) !RequestedBy
    -- ^ Connection setup in progress. The first 'MVar' is filled when the
    -- state is resolved (see 'apiCloseEndPoint'); the role of the second is
    -- not visible here (TODO confirm).
  | RemoteEndPointValid !ValidRemoteEndPointState
    -- ^ Connection established and usable.
  | RemoteEndPointClosing !(MVar ()) !ValidRemoteEndPointState
    -- ^ A close has been initiated but not yet completed.
  | RemoteEndPointClosed
    -- ^ The connection has been closed.
  | RemoteEndPointFailed !IOException
    -- ^ The connection failed with the given I/O error.
-- | Bookkeeping for an established connection to a remote endpoint.
data ValidRemoteEndPointState = ValidRemoteEndPointState
  { _remoteOutgoing      :: !Int
    -- ^ Counter decremented by 'apiClose' each time a live outgoing
    -- connection is closed.
  , _remoteIncoming      :: !(Set LightweightConnectionId)
    -- ^ Ids of lightweight connections opened by the remote side.
  , _remoteLastIncoming  :: !LightweightConnectionId
    -- ^ Presumably the most recent incoming connection id (TODO confirm).
  , _remoteNextConnOutId :: !LightweightConnectionId
    -- ^ Presumably the id for the next outgoing connection (TODO confirm).
  , remoteSocket         :: !N.Socket
    -- ^ The TCP socket shared by all lightweight connections.
  , remoteProbing        :: Maybe (IO ())
    -- ^ Optional probing-related action; deliberately the only lazy field.
    -- Its exact meaning is not visible in this chunk (TODO confirm).
  , remoteSendLock       :: !(MVar (Maybe SomeException))
    -- ^ Lock guarding sends; may carry an exception (presumably one raised
    -- by an earlier failed send, TODO confirm against 'sendOn').
  , remoteSocketClosed   :: !(IO ())
    -- ^ Action associated with the socket being closed (TODO confirm
    -- whether it waits for, or signals, the close).
  }
-- | A local endpoint together with one of the remote endpoints it talks to.
type EndPointPair = (LocalEndPoint, RemoteEndPoint)
-- | Identifier of a lightweight (logical) connection.
type LightweightConnectionId = Word32
-- | Identifier of a heavyweight connection.
type HeavyweightConnectionId = Word32
-- | Where to bind the listening socket and how to advertise it.
data TCPAddrInfo = TCPAddrInfo
  { tcpBindHost        :: N.HostName
    -- ^ Host to bind the server socket to.
  , tcpBindPort        :: N.ServiceName
    -- ^ Port to bind the server socket to.
  , tcpExternalAddress :: N.ServiceName -> (N.HostName, N.ServiceName)
    -- ^ Given the port that was actually bound, yields the (host, port)
    -- under which this transport is advertised.
  }
-- | Addressability of a transport.
data TCPAddr
  = Addressable TCPAddrInfo
    -- ^ A listening server is started using the given bind information.
  | Unaddressable
    -- ^ No listening server is started and the transport has no address
    -- information.
-- | An 'Addressable' address that binds to the given host and port and
-- advertises the same host together with whichever port was actually bound.
defaultTCPAddr :: N.HostName -> N.ServiceName -> TCPAddr
defaultTCPAddr host port = Addressable TCPAddrInfo
  { tcpBindHost        = host
  , tcpBindPort        = port
  , tcpExternalAddress = (,) host
  }
-- | Tunable parameters of the TCP transport.
--
-- See 'defaultTCPParameters' for the default value of every field.
data TCPParameters = TCPParameters
  { tcpBacklog                :: Int
    -- ^ Backlog handed to 'forkServer' for the listening socket.
  , tcpReuseServerAddr        :: Bool
    -- ^ Address-reuse flag handed to 'forkServer' for the server socket.
  , tcpReuseClientAddr        :: Bool
    -- ^ Address-reuse flag for client sockets (used outside this chunk).
  , tcpNoDelay                :: Bool
    -- ^ Presumably toggles the @NoDelay@ socket option (TODO confirm).
  , tcpKeepAlive              :: Bool
    -- ^ Presumably toggles the @KeepAlive@ socket option (TODO confirm).
  , tcpUserTimeout            :: Maybe Int
    -- ^ When set, 'createTransportExposeInternals' fails unless the
    -- @UserTimeout@ socket option is supported on this system.
  , transportConnectTimeout   :: Maybe Int
    -- ^ Optional connect timeout (units not visible here; TODO confirm).
  , tcpNewQDisc               :: forall t . IO (QDisc t)
    -- ^ Creates the event queue for each new endpoint.
  , tcpMaxAddressLength       :: Word32
    -- ^ Upper bound on address length (enforced outside this chunk).
  , tcpMaxReceiveLength       :: Word32
    -- ^ Upper bound on received message length (enforced outside this chunk).
  , tcpCheckPeerHost          :: Bool
    -- ^ Whether to check the peer's host (semantics outside this chunk).
  , tcpServerExceptionHandler :: SomeException -> IO ()
    -- ^ Installed as the error handler of the listening server.
  }
-- | Internals exposed alongside the 'Transport' by
-- 'createTransportExposeInternals'.
data TransportInternals = TransportInternals
  { transportThread     :: Maybe ThreadId
    -- ^ Thread running the listening server; 'Nothing' for an
    -- 'Unaddressable' transport.
  , newEndPointInternal :: (forall t . Maybe (QDisc t))
                        -> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
    -- ^ Create an endpoint with the supplied queue, or with a queue from
    -- 'tcpNewQDisc' when given 'Nothing'.
  , socketBetween       :: EndPointAddress
                        -> EndPointAddress
                        -> IO N.Socket
    -- ^ Look up the socket between two endpoint addresses.
  }
-- | Create a TCP transport.
--
-- Thin wrapper around 'createTransportExposeInternals' that discards the
-- 'TransportInternals'. Returns 'Left' with the 'IOException' if setting up
-- the transport failed.
createTransport
  :: TCPAddr
  -> TCPParameters
  -> IO (Either IOException Transport)
createTransport addr params =
  fmap fst <$> createTransportExposeInternals addr params
-- | Create a TCP transport and also return its 'TransportInternals'.
--
-- * For 'Unaddressable' no server is started and the result is always
--   'Right'.
-- * For 'Addressable' a listening server is forked with 'forkServer'. Any
--   'IOException' raised during setup is returned as 'Left'; this includes
--   requesting 'tcpUserTimeout' on a system without the @UserTimeout@ socket
--   option.
createTransportExposeInternals
  :: TCPAddr
  -> TCPParameters
  -> IO (Either IOException (Transport, TransportInternals))
createTransportExposeInternals addr params = do
    state <- newMVar . TransportValid $ ValidTransportState
      { _localEndPoints = Map.empty
      , _nextEndPointId = 0
      }
    case addr of
      Unaddressable ->
        let transport = TCPTransport
              { transportState    = state
              , transportAddrInfo = Nothing
              , transportParams   = params
              }
        in  Right <$> mkTransport transport Nothing
      Addressable (TCPAddrInfo bindHost bindPort mkExternal) -> tryIO $ mdo
        when (  isJust (tcpUserTimeout params)
             && not (N.isSupportedSocketOption N.UserTimeout)
             ) $
          throwIO $ userError $
               "Network.Transport.TCP.createTransport: "
            ++ "the parameter tcpUserTimeout is unsupported "
            ++ "in this system."
        -- The bound port is only known once 'forkServer' has returned, yet
        -- the transport handed to the server's handlers needs that port.
        -- The recursive do block ties the knot: port' is used (lazily)
        -- below before the statement that binds it has completed.
        (port', result) <- do
          let (externalHost, externalPort) = mkExternal port'
              addrInfo = TransportAddrInfo
                { transportHost     = externalHost
                , transportPort     = externalPort
                , transportBindHost = bindHost
                , transportBindPort = port'
                }
              transport = TCPTransport
                { transportState    = state
                , transportAddrInfo = Just addrInfo
                , transportParams   = params
                }
          bracketOnError
            (forkServer
               bindHost
               bindPort
               (tcpBacklog params)
               (tcpReuseServerAddr params)
               (errorHandler transport)
               (terminationHandler transport)
               (handleConnectionRequest transport))
            -- Building the transport failed: stop the server thread again.
            (\(_port', tid) -> killThread tid)
            (\(port'', tid) -> (port'',) <$> mkTransport transport (Just tid))
        return result
  where
    -- Assemble the public 'Transport' and its internals around a
    -- 'TCPTransport' and the (optional) server thread.
    mkTransport :: TCPTransport
                -> Maybe ThreadId
                -> IO (Transport, TransportInternals)
    mkTransport transport mtid =
      return
        ( Transport
            { newEndPoint = do
                qdisc <- tcpNewQDisc params
                apiNewEndPoint transport qdisc
            , closeTransport =
                let evs = [ EndPointClosed ]
                in  apiCloseTransport transport mtid evs
            }
        , TransportInternals
            { transportThread = mtid
            , socketBetween   = internalSocketBetween transport
            , newEndPointInternal = \mqdisc -> case mqdisc of
                Just qdisc -> apiNewEndPoint transport qdisc
                Nothing    -> do
                  qdisc <- tcpNewQDisc params
                  apiNewEndPoint transport qdisc
            }
        )

    -- Error handler for the server: delegates to the user-supplied handler.
    errorHandler :: TCPTransport -> SomeException -> IO ()
    errorHandler _ = tcpServerExceptionHandler params

    -- Termination handler for the server: closes the transport, notifying
    -- every endpoint with an 'EventTransportFailed' error event.
    terminationHandler :: TCPTransport -> SomeException -> IO ()
    terminationHandler transport ex = do
      let evs = [ ErrorEvent (TransportError EventTransportFailed (show ex))
                  -- Deliberately a lazily-throwing 'Event': the exception is
                  -- only raised when this event value is forced.
                , throw $ userError "Transport closed"
                ]
      apiCloseTransport transport Nothing evs
-- | Default TCP parameters.
--
-- Address reuse is on for both server and client sockets, every other
-- boolean option is off, no timeouts are set, the length limits are
-- 'maxBound', endpoints get a 'simpleUnboundedQDisc', and server exceptions
-- are rethrown with 'throwIO'.
defaultTCPParameters :: TCPParameters
defaultTCPParameters = TCPParameters
  { tcpBacklog                = N.maxListenQueue
  , tcpReuseServerAddr        = True
  , tcpReuseClientAddr        = True
  , tcpNoDelay                = False
  , tcpKeepAlive              = False
  , tcpUserTimeout            = Nothing
  , tcpNewQDisc               = simpleUnboundedQDisc
  , transportConnectTimeout   = Nothing
  , tcpMaxAddressLength       = maxBound
  , tcpMaxReceiveLength       = maxBound
  , tcpCheckPeerHost          = False
  , tcpServerExceptionHandler = throwIO
  }
-- | Close the transport.
--
-- Marks the transport as 'TransportClosed', closes every local endpoint
-- (delivering @evs@ to each) and finally kills the server thread, if one was
-- supplied. Calling it on an already closed transport closes no endpoints.
apiCloseTransport :: TCPTransport -> Maybe ThreadId -> [Event] -> IO ()
apiCloseTransport transport mTransportThread evs =
  asyncWhenCancelled return $ do
    -- Atomically flip the state; only the caller that observed a valid
    -- state gets the endpoints to close.
    mTSt <- modifyMVar (transportState transport) $ \case
      TransportValid vst -> return (TransportClosed, Just vst)
      TransportClosed    -> return (TransportClosed, Nothing)
    forM_ mTSt $ mapM_ (apiCloseEndPoint transport evs) . (^. localEndPoints)
    -- Killing the server thread presumably triggers its termination handler
    -- (TODO confirm in 'forkServer'), which calls 'apiCloseTransport' again;
    -- by then the state is 'TransportClosed' and no thread is passed, so
    -- that nested call does nothing.
    forM_ mTransportThread killThread
-- | Create a new endpoint that delivers its events through the given queue.
--
-- Exceptions of type @TransportError NewEndPointErrorCode@ raised during
-- creation are returned as 'Left'. Multicast is not supported: both
-- multicast operations of the resulting 'EndPoint' always return 'Left'.
apiNewEndPoint :: TCPTransport
               -> QDisc Event
               -> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
apiNewEndPoint transport qdisc =
  try . asyncWhenCancelled closeEndPoint $ do
    ourEndPoint <- createLocalEndPoint transport qdisc
    return EndPoint
      { receive       = qdiscDequeue (localQueue ourEndPoint)
      , address       = localAddress ourEndPoint
      , connect       = apiConnect transport ourEndPoint
      , closeEndPoint =
          let evs = [ EndPointClosed ]
          in  apiCloseEndPoint transport evs ourEndPoint
      , newMulticastGroup     = return . Left $ newMulticastGroupError
      , resolveMulticastGroup = return . Left . const resolveMulticastGroupError
      }
  where
    newMulticastGroupError :: TransportError NewMulticastGroupErrorCode
    newMulticastGroupError =
      TransportError NewMulticastGroupUnsupported "Multicast not supported"

    resolveMulticastGroupError :: TransportError ResolveMulticastGroupErrorCode
    resolveMulticastGroupError =
      TransportError ResolveMulticastGroupUnsupported "Multicast not supported"
-- | A queueing discipline: abstracts how values produced for an endpoint
-- are buffered before being consumed.
data QDisc t = QDisc
  { qdiscDequeue :: IO t
    -- ^ Take the next value out of the queue.
  , qdiscEnqueue :: EndPointAddress -> Event -> t -> IO ()
    -- ^ Put a value into the queue. The address and event arguments are
    -- supplied to the implementation (which is free to ignore them, as the
    -- simple disciplines in this module do).
  }
-- | Enqueue an 'Event' on an event queue, passing the event both as the
-- 'qdiscEnqueue' event argument and as the value to store.
qdiscEnqueue' :: QDisc Event -> EndPointAddress -> Event -> IO ()
qdiscEnqueue' qdisc addr event = qdiscEnqueue qdisc addr event event
-- | A queueing discipline backed by an unbounded 'Chan'.
--
-- Enqueueing ignores the address and event arguments and writes the value
-- to the channel; dequeueing reads from the channel.
simpleUnboundedQDisc :: forall t . IO (QDisc t)
simpleUnboundedQDisc = do
  eventChan <- newChan
  return QDisc
    { qdiscDequeue = readChan eventChan
    , qdiscEnqueue = \_addr _event -> writeChan eventChan
    }
-- | A queueing discipline backed by a single 'MVar', i.e. holding at most
-- one value.
--
-- Enqueueing ignores the address and event arguments and uses 'putMVar'
-- (blocking while the slot is full); dequeueing uses 'takeMVar'.
simpleOnePlaceQDisc :: forall t . IO (QDisc t)
simpleOnePlaceQDisc = do
  mvar <- newEmptyMVar
  return QDisc
    { qdiscDequeue = takeMVar mvar
    , qdiscEnqueue = \_addr _event -> putMVar mvar
    }
-- | Open a lightweight connection from a local endpoint to the endpoint at
-- the given address.
--
-- Connecting to our own address is delegated to 'connectToSelf'. Otherwise
-- any broken state for that address is reset first and the connection is
-- set up with 'createConnectionTo'. The reliability argument is ignored.
-- Exceptions of type @TransportError ConnectErrorCode@ are returned as
-- 'Left'.
apiConnect :: TCPTransport
           -> LocalEndPoint
           -> EndPointAddress
           -> Reliability
           -> ConnectHints
           -> IO (Either (TransportError ConnectErrorCode) Connection)
apiConnect transport ourEndPoint theirAddress _reliability hints =
  try . asyncWhenCancelled close $
    if localAddress ourEndPoint == theirAddress
      then connectToSelf ourEndPoint
      else do
        resetIfBroken ourEndPoint theirAddress
        (theirEndPoint, connId) <-
          createConnectionTo transport ourEndPoint theirAddress hints
        -- Tracks whether this particular connection is still open; shared
        -- by the 'send' and 'close' operations below.
        connAlive <- newIORef True
        return Connection
          { send  = apiSend  (ourEndPoint, theirEndPoint) connId connAlive
          , close = apiClose (ourEndPoint, theirEndPoint) connId connAlive
          }
-- | Close a lightweight connection.
--
-- If the remote endpoint is valid and the connection is still alive, the
-- connection is marked dead, a @CloseConnection@ control message for
-- @connId@ is scheduled and the outgoing counter is decremented. In every
-- other case the remote state is left untouched. Afterwards, whether or not
-- the above succeeded, 'closeIfUnused' is run for the endpoint pair.
--
-- Best effort: I/O exceptions are swallowed ('tryIO' plus 'void').
apiClose :: EndPointPair -> LightweightConnectionId -> IORef Bool -> IO ()
apiClose (ourEndPoint, theirEndPoint) connId connAlive =
  void . tryIO . asyncWhenCancelled return $ finally
    (withScheduledAction ourEndPoint $ \sched ->
      modifyMVar_ (remoteState theirEndPoint) $ \st -> case st of
        RemoteEndPointValid vst -> do
          alive <- readIORef connAlive
          if alive
            then do
              writeIORef connAlive False
              sched theirEndPoint $
                sendOn vst
                  [ encodeWord32 (encodeControlHeader CloseConnection)
                  , encodeWord32 connId
                  ]
              return ( RemoteEndPointValid
                     . (remoteOutgoing ^: subtract 1)
                     $ vst
                     )
            else
              -- Already closed: nothing to send, nothing to count down.
              return (RemoteEndPointValid vst)
        _ ->
          return st)
    (closeIfUnused (ourEndPoint, theirEndPoint))
-- | Send a payload over a lightweight connection.
--
-- Result, depending on the remote state and on whether this connection is
-- still alive (@connAlive@):
--
-- * valid and alive: the message (connection id followed by the
--   length-prefixed payload) is scheduled for sending;
-- * connection not alive (valid, closing, closed or failed remote):
--   @Left (TransportError SendClosed "Connection closed")@;
-- * remote closed or failed while the connection is alive:
--   @Left (TransportError SendFailed ...)@;
-- * invalid or initialising remote, or closing remote with a live
--   connection: 'relyViolation' (these states are not expected here).
--
-- I/O exceptions are mapped to 'SendFailed' via 'mapIOException'.
apiSend :: EndPointPair
        -> LightweightConnectionId
        -> IORef Bool
        -> [ByteString]
        -> IO (Either (TransportError SendErrorCode) ())
apiSend (ourEndPoint, theirEndPoint) connId connAlive payload =
    try . mapIOException sendFailed $
      withScheduledAction ourEndPoint $ \sched ->
        withMVar (remoteState theirEndPoint) $ \st -> case st of
          RemoteEndPointInvalid _ ->
            relyViolation (ourEndPoint, theirEndPoint) "apiSend"
          RemoteEndPointInit _ _ _ ->
            relyViolation (ourEndPoint, theirEndPoint) "apiSend"
          RemoteEndPointValid vst -> do
            alive <- readIORef connAlive
            if alive
              then sched theirEndPoint $
                     sendOn vst (encodeWord32 connId : prependLength payload)
              else throwIO connectionClosed
          RemoteEndPointClosing _ _ -> do
            alive <- readIORef connAlive
            if alive
              then relyViolation (ourEndPoint, theirEndPoint)
                     "apiSend RemoteEndPointClosing"
              else throwIO connectionClosed
          RemoteEndPointClosed -> do
            alive <- readIORef connAlive
            if alive
              then throwIO $ TransportError SendFailed "Remote endpoint closed"
              else throwIO connectionClosed
          RemoteEndPointFailed err -> do
            alive <- readIORef connAlive
            if alive
              then throwIO $ TransportError SendFailed (show err)
              else throwIO connectionClosed
  where
    -- Error reported whenever this connection has already been closed.
    connectionClosed :: TransportError SendErrorCode
    connectionClosed = TransportError SendClosed "Connection closed"

    -- Wrap an I/O error raised while sending.
    sendFailed :: IOException -> TransportError SendErrorCode
    sendFailed = TransportError SendFailed . show
-- | Force-close a local endpoint.
--
-- The endpoint is first removed from the transport ('removeLocalEndPoint') and
-- its state is switched to 'LocalEndPointClosed'. If it was still valid at
-- that point, every remote endpoint recorded in 'localConnections' is torn
-- down and the supplied events are enqueued on the endpoint's own queue.
-- Closing an endpoint that is already closed only repeats the removal step.
--
-- The whole procedure is run through 'asyncWhenCancelled'.
apiCloseEndPoint :: TCPTransport    -- ^ Transport the endpoint belongs to
                 -> [Event]         -- ^ Events to enqueue once the endpoint is closed
                 -> LocalEndPoint   -- ^ Endpoint to close
                 -> IO ()
apiCloseEndPoint transport evs ourEndPoint =
    asyncWhenCancelled return $ do
      removeLocalEndPoint transport ourEndPoint
      -- Atomically mark the endpoint closed, remembering the state it had.
      previous <- modifyMVar (localState ourEndPoint) $ \case
        LocalEndPointValid valid -> return (LocalEndPointClosed, Just valid)
        LocalEndPointClosed      -> return (LocalEndPointClosed, Nothing)
      case previous of
        Nothing    -> return ()
        Just valid -> do
          forM_ (valid ^. localConnections) tryCloseRemoteSocket
          forM_ evs notify
  where
    -- Enqueue one event on our own queue, attributed to our own address.
    notify :: Event -> IO ()
    notify = qdiscEnqueue' (localQueue ourEndPoint) (localAddress ourEndPoint)

    -- State that replaces every remote endpoint state we tear down.
    closed :: RemoteState
    closed = RemoteEndPointFailed (userError "apiCloseEndPoint")

    -- Run the stop-probing action stored in the state, if there is one.
    stopProbing :: ValidRemoteEndPointState -> IO ()
    stopProbing vst = forM_ (remoteProbing vst) id

    -- Shut the socket down in both directions, then run the state's
    -- 'remoteSocketClosed' action.
    shutdownAndWait :: ValidRemoteEndPointState -> IO ()
    shutdownAndWait vst = do
      tryShutdownSocketBoth (remoteSocket vst)
      remoteSocketClosed vst

    -- Move one remote endpoint to the 'closed' state, scheduling (via
    -- 'withScheduledAction') whatever socket work its current state requires.
    -- Endpoints that are already invalid, closed or failed are left untouched.
    tryCloseRemoteSocket :: RemoteEndPoint -> IO ()
    tryCloseRemoteSocket theirEndPoint =
      withScheduledAction ourEndPoint $ \sched ->
        modifyMVar_ (remoteState theirEndPoint) $ \st -> case st of
          RemoteEndPointInit resolved _ _ -> do
            putMVar resolved ()
            return closed
          RemoteEndPointValid vst -> do
            sched theirEndPoint $ do
              -- Best effort: a failure to send CloseEndPoint is ignored.
              void $ tryIO $ sendOn vst
                [ encodeWord32 (encodeControlHeader CloseEndPoint) ]
              stopProbing vst
              shutdownAndWait vst
            return closed
          RemoteEndPointClosing resolved vst -> do
            stopProbing vst
            putMVar resolved ()
            sched theirEndPoint (shutdownAndWait vst)
            return closed
          RemoteEndPointInvalid _ -> return st
          RemoteEndPointClosed    -> return st
          RemoteEndPointFailed _  -> return st
-- | Handle a connection request arriving on a freshly accepted socket.
--
-- The socket options selected in the transport parameters are applied, then
-- the versioned handshake is read (bounded by 'transportConnectTimeout' when
-- one is configured). If the handshake yields a follow-up action, that action
-- is run.
--
-- Arguments: the transport, an action stored as 'remoteSocketClosed' in the
-- state of a newly accepted remote endpoint, and the accepted socket together
-- with the peer's socket address.
--
-- Exceptions: an 'AsyncException' is rethrown; any other exception raised
-- while handling the request is discarded by 'handleException'.
handleConnectionRequest :: TCPTransport -> IO () -> (N.Socket, N.SockAddr) -> IO ()
handleConnectionRequest transport socketClosed (sock, sockAddr) =
    handle handleException $ do
      when (tcpNoDelay params) $
        N.setSocketOption sock N.NoDelay 1
      when (tcpKeepAlive params) $
        N.setSocketOption sock N.KeepAlive 1
      forM_ (tcpUserTimeout params) $
        N.setSocketOption sock N.UserTimeout
      outcome <- case transportConnectTimeout params of
        Nothing    -> fmap Just handleVersioned
        Just limit -> System.Timeout.timeout limit handleVersioned
      case outcome of
        Nothing           -> throwIO (userError "handleConnectionRequest: timed out")
        Just Nothing      -> return ()
        Just (Just serve) -> serve
  where
    params :: TCPParameters
    params = transportParams transport

    -- Read the protocol version and the handshake length, then dispatch on
    -- the version. For any version other than 0 the peer is told that only
    -- version 0 is supported, the announced handshake bytes are drained from
    -- the socket, and the negotiation starts over.
    --
    -- NOTE(review): 'handshakeLength' comes straight from the peer and is
    -- handed to 'recvExact' without a visible upper bound here -- confirm
    -- whether a limit is enforced elsewhere.
    handleVersioned :: IO (Maybe (IO ()))
    handleVersioned = do
      protocolVersion <- recvWord32 sock
      handshakeLength <- recvWord32 sock
      if protocolVersion == 0x00000000
        then handleConnectionRequestV0 (sock, sockAddr)
        else do
          sendMany sock
            [ encodeWord32 (encodeConnectionRequestResponse ConnectionRequestUnsupportedVersion)
            , encodeWord32 0x00000000
            ]
          _ <- recvExact sock handshakeLength
          handleVersioned

    -- Rethrow asynchronous exceptions; every other exception is dropped.
    handleException :: SomeException -> IO ()
    handleException ex =
      case (fromException ex :: Maybe AsyncException) of
        Just asyncEx -> throwIO asyncEx
        Nothing      -> return ()

    -- Version 0 handshake. Reads the identifier of the local endpoint the
    -- peer wants to reach and the address the peer claims for itself, checks
    -- the claimed host when 'tcpCheckPeerHost' is set, and looks the local
    -- endpoint up. Returns the action that completes the connection, or
    -- 'Nothing' when the request was rejected because of a host mismatch.
    handleConnectionRequestV0 :: (N.Socket, N.SockAddr) -> IO (Maybe (IO ()))
    handleConnectionRequestV0 (sock, sockAddr) = do
        mResolved <- resolveSockAddr sockAddr
        (numericHost, resolvedHost) <- case mResolved of
          Nothing ->
            throwIO (userError "handleConnectionRequest: invalid socket address")
          Just (numeric, resolved, _) ->
            return (numeric, resolved)
        ourEndPointId <- recvWord32 sock
        rawAddress <- BS.concat <$> recvWithLength (tcpMaxAddressLength params) sock
        (theirAddress, mTheirHost) <-
          if BS.null rawAddress
            then do
              -- The peer sent no address: give it a random one and skip the
              -- host check below.
              anonymous <- randomEndPointAddress
              return (anonymous, Nothing)
            else do
              let claimed = EndPointAddress rawAddress
              case decodeEndPointAddress claimed of
                Nothing ->
                  throwIO (userError "handleConnectionRequest: peer gave malformed address")
                Just (theirHost, _, _) ->
                  return (claimed, Just theirHost)
        let hostMatches host = host == numericHost || host == resolvedHost
        accepted <- case mTheirHost of
          Just theirHost
            | tcpCheckPeerHost params && not (hostMatches theirHost) -> do
                -- Tell the peer which host it claimed and which hosts we see.
                sendMany sock $
                    encodeWord32 (encodeConnectionRequestResponse ConnectionRequestHostMismatch)
                  : concatMap (\host -> prependLength [BSC.pack host])
                              [theirHost, numericHost, resolvedHost]
                return False
          _ -> return True
        if not accepted
          then return Nothing
          else do
            ourEndPoint <- withMVar (transportState transport) $ \case
              TransportClosed ->
                throwIO $ userError "Transport closed"
              TransportValid vst ->
                case vst ^. localEndPointAt ourEndPointId of
                  Just found -> return found
                  Nothing -> do
                    sendMany sock
                      [encodeWord32 (encodeConnectionRequestResponse ConnectionRequestInvalid)]
                    throwIO $ userError "handleConnectionRequest: Invalid endpoint"
            return (Just (go ourEndPoint theirAddress))
      where
        -- Complete the connection. If a remote endpoint for this address
        -- already existed, answer 'ConnectionRequestCrossed' and probe the
        -- existing one; otherwise accept, install a valid remote state and
        -- process incoming messages.
        go :: LocalEndPoint -> EndPointAddress -> IO ()
        go ourEndPoint theirAddress = handle handleException $ do
          resetIfBroken ourEndPoint theirAddress
          (theirEndPoint, isNew) <-
            findRemoteEndPoint ourEndPoint theirAddress RequestedByThem Nothing
          if isNew
            then do
              sendLock <- newMVar Nothing
              let vst = ValidRemoteEndPointState
                          {  remoteSocket        = sock
                          ,  remoteSocketClosed  = socketClosed
                          ,  remoteProbing       = Nothing
                          ,  remoteSendLock      = sendLock
                          , _remoteOutgoing      = 0
                          , _remoteIncoming      = Set.empty
                          , _remoteLastIncoming  = 0
                          , _remoteNextConnOutId = firstNonReservedLightweightConnectionId
                          }
              sendMany sock
                [encodeWord32 (encodeConnectionRequestResponse ConnectionRequestAccepted)]
              -- 'finally' makes sure 'handleIncomingMessages' runs even when
              -- 'resolveInit' throws.
              resolveInit (ourEndPoint, theirEndPoint) (RemoteEndPointValid vst)
                `finally` handleIncomingMessages params (ourEndPoint, theirEndPoint)
            else do
              -- Best effort: a failure to send the reply is ignored.
              void $ tryIO $ sendMany sock
                [encodeWord32 (encodeConnectionRequestResponse ConnectionRequestCrossed)]
              probeIfValid theirEndPoint

        -- Start probing a remote endpoint that is valid and not already being
        -- probed; any other state is left unchanged. The action stored in
        -- 'remoteProbing' kills the probing thread.
        probeIfValid :: RemoteEndPoint -> IO ()
        probeIfValid theirEndPoint =
            modifyMVar_ (remoteState theirEndPoint) $ \st -> case st of
              RemoteEndPointValid
                vst@(ValidRemoteEndPointState { remoteProbing = Nothing }) -> do
                  tid <- forkIO (probe vst)
                  return $ RemoteEndPointValid
                    (vst { remoteProbing = Just (killThread tid) })
              _ -> return st
          where
            -- Send a 'ProbeSocket' message and sleep, both bounded by the
            -- connect timeout when one is set. Unless this thread is killed
            -- first, the socket is closed afterwards.
            probe :: ValidRemoteEndPointState -> IO ()
            probe vst = do
              void $ tryIO $ System.Timeout.timeout probeLimit $ do
                sendMany (remoteSocket vst)
                  [encodeWord32 (encodeControlHeader ProbeSocket)]
                threadDelay maxBound
              tryCloseSocket (remoteSocket vst)

            -- A negative value is passed when no connect timeout is set.
            probeLimit :: Int
            probeLimit = maybe (-1) id (transportConnectTimeout params)
handleIncomingMessages :: TCPParameters -> EndPointPair -> IO ()
handleIncomingMessages :: TCPParameters -> EndPointPair -> IO ()
handleIncomingMessages TCPParameters
params (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint) =
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO (Either IOError Socket)
acquire Either IOError Socket -> IO ()
release Either IOError Socket -> IO ()
act
where
acquire :: IO (Either IOError N.Socket)
acquire :: IO (Either IOError Socket)
acquire = forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar RemoteState
theirState forall a b. (a -> b) -> a -> b
$ \RemoteState
st -> case RemoteState
st of
RemoteEndPointInvalid TransportError ConnectErrorCode
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"handleIncomingMessages (invalid)"
RemoteEndPointInit MVar ()
_ MVar ()
_ RequestedBy
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"handleIncomingMessages (init)"
RemoteEndPointValid ValidRemoteEndPointState
ep ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState -> Socket
remoteSocket ValidRemoteEndPointState
ep
RemoteEndPointClosing MVar ()
_ ValidRemoteEndPointState
ep ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState -> Socket
remoteSocket ValidRemoteEndPointState
ep
RemoteState
RemoteEndPointClosed ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ HostName -> IOError
userError HostName
"handleIncomingMessages (already closed)"
RemoteEndPointFailed IOError
_ ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ HostName -> IOError
userError HostName
"handleIncomingMessages (failed)"
release :: Either IOError N.Socket -> IO ()
release :: Either IOError Socket -> IO ()
release (Left IOError
err) = IOError -> IO ()
prematureExit IOError
err
release (Right Socket
_) = forall (m :: * -> *) a. Monad m => a -> m a
return ()
act :: Either IOError N.Socket -> IO ()
act :: Either IOError Socket -> IO ()
act (Left IOError
_) = forall (m :: * -> *) a. Monad m => a -> m a
return ()
act (Right Socket
sock) = Socket -> IO ()
go Socket
sock forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` IOError -> IO ()
prematureExit
go :: N.Socket -> IO ()
go :: Socket -> IO ()
go Socket
sock = do
HeavyweightConnectionId
lcid <- Socket -> IO HeavyweightConnectionId
recvWord32 Socket
sock :: IO LightweightConnectionId
if HeavyweightConnectionId
lcid forall a. Ord a => a -> a -> Bool
>= HeavyweightConnectionId
firstNonReservedLightweightConnectionId
then do
Socket -> HeavyweightConnectionId -> IO ()
readMessage Socket
sock HeavyweightConnectionId
lcid
Socket -> IO ()
go Socket
sock
else
case HeavyweightConnectionId -> Maybe ControlHeader
decodeControlHeader HeavyweightConnectionId
lcid of
Just ControlHeader
CreatedNewConnection -> do
Socket -> IO HeavyweightConnectionId
recvWord32 Socket
sock forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= HeavyweightConnectionId -> IO ()
createdNewConnection
Socket -> IO ()
go Socket
sock
Just ControlHeader
CloseConnection -> do
Socket -> IO HeavyweightConnectionId
recvWord32 Socket
sock forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= HeavyweightConnectionId -> IO ()
closeConnection
Socket -> IO ()
go Socket
sock
Just ControlHeader
CloseSocket -> do
Bool
didClose <- Socket -> IO HeavyweightConnectionId
recvWord32 Socket
sock forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Socket -> HeavyweightConnectionId -> IO Bool
closeSocket Socket
sock
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
didClose forall a b. (a -> b) -> a -> b
$ Socket -> IO ()
go Socket
sock
Just ControlHeader
CloseEndPoint -> do
let closeRemoteEndPoint :: ValidRemoteEndPointState -> IO ()
closeRemoteEndPoint ValidRemoteEndPointState
vst = do
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ValidRemoteEndPointState -> Maybe (IO ())
remoteProbing ValidRemoteEndPointState
vst) forall a. a -> a
id
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall a. Set a -> [a]
Set.elems forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState
vst forall r a. r -> T r a -> a
^. Accessor ValidRemoteEndPointState (Set HeavyweightConnectionId)
remoteIncoming) forall a b. (a -> b) -> a -> b
$
QDisc Event -> EndPointAddress -> Event -> IO ()
qdiscEnqueue' QDisc Event
ourQueue EndPointAddress
theirAddr forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId -> Event
ConnectionClosed forall b c a. (b -> c) -> (a -> b) -> a -> c
. HeavyweightConnectionId -> ConnectionId
connId
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ValidRemoteEndPointState
vst forall r a. r -> T r a -> a
^. Accessor ValidRemoteEndPointState Int
remoteOutgoing forall a. Ord a => a -> a -> Bool
> Int
0) forall a b. (a -> b) -> a -> b
$ do
let code :: EventErrorCode
code = EndPointAddress -> EventErrorCode
EventConnectionLost (RemoteEndPoint -> EndPointAddress
remoteAddress RemoteEndPoint
theirEndPoint)
QDisc Event -> EndPointAddress -> Event -> IO ()
qdiscEnqueue' QDisc Event
ourQueue EndPointAddress
theirAddr forall b c a. (b -> c) -> (a -> b) -> a -> c
. TransportError EventErrorCode -> Event
ErrorEvent forall a b. (a -> b) -> a -> b
$
forall error. error -> HostName -> TransportError error
TransportError EventErrorCode
code HostName
"The remote endpoint was closed."
EndPointPair -> IO ()
removeRemoteEndPoint (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar RemoteState
theirState forall a b. (a -> b) -> a -> b
$ \RemoteState
s -> case RemoteState
s of
RemoteEndPointValid ValidRemoteEndPointState
vst -> do
ValidRemoteEndPointState -> IO ()
closeRemoteEndPoint ValidRemoteEndPointState
vst
forall (m :: * -> *) a. Monad m => a -> m a
return RemoteState
RemoteEndPointClosed
RemoteEndPointClosing MVar ()
resolved ValidRemoteEndPointState
vst -> do
ValidRemoteEndPointState -> IO ()
closeRemoteEndPoint ValidRemoteEndPointState
vst
forall a. MVar a -> a -> IO ()
putMVar MVar ()
resolved ()
forall (m :: * -> *) a. Monad m => a -> m a
return RemoteState
RemoteEndPointClosed
RemoteState
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return RemoteState
s
Just ControlHeader
ProbeSocket -> do
IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ Socket -> [ByteString] -> IO ()
sendMany Socket
sock [HeavyweightConnectionId -> ByteString
encodeWord32 (ControlHeader -> HeavyweightConnectionId
encodeControlHeader ControlHeader
ProbeSocketAck)]
Socket -> IO ()
go Socket
sock
Just ControlHeader
ProbeSocketAck -> do
IO ()
stopProbing
Socket -> IO ()
go Socket
sock
Maybe ControlHeader
Nothing ->
forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ HostName -> IOError
userError HostName
"Invalid control request"
createdNewConnection :: LightweightConnectionId -> IO ()
createdNewConnection :: HeavyweightConnectionId -> IO ()
createdNewConnection HeavyweightConnectionId
lcid = do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar RemoteState
theirState forall a b. (a -> b) -> a -> b
$ \RemoteState
st -> do
ValidRemoteEndPointState
vst <- case RemoteState
st of
RemoteEndPointInvalid TransportError ConnectErrorCode
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"handleIncomingMessages:createNewConnection (invalid)"
RemoteEndPointInit MVar ()
_ MVar ()
_ RequestedBy
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"handleIncomingMessages:createNewConnection (init)"
RemoteEndPointValid ValidRemoteEndPointState
vst ->
forall (m :: * -> *) a. Monad m => a -> m a
return ( (Accessor ValidRemoteEndPointState (Set HeavyweightConnectionId)
remoteIncoming forall r a. T r a -> (a -> a) -> r -> r
^: forall a. Ord a => a -> Set a -> Set a
Set.insert HeavyweightConnectionId
lcid)
forall a b. (a -> b) -> a -> b
$ (Accessor ValidRemoteEndPointState HeavyweightConnectionId
remoteLastIncoming forall r a. T r a -> a -> r -> r
^= HeavyweightConnectionId
lcid)
ValidRemoteEndPointState
vst
)
RemoteEndPointClosing MVar ()
resolved ValidRemoteEndPointState
vst -> do
forall a. MVar a -> a -> IO ()
putMVar MVar ()
resolved ()
forall (m :: * -> *) a. Monad m => a -> m a
return ( (Accessor ValidRemoteEndPointState (Set HeavyweightConnectionId)
remoteIncoming forall r a. T r a -> a -> r -> r
^= forall a. a -> Set a
Set.singleton HeavyweightConnectionId
lcid)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Accessor ValidRemoteEndPointState HeavyweightConnectionId
remoteLastIncoming forall r a. T r a -> a -> r -> r
^= HeavyweightConnectionId
lcid)
forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState
vst
)
RemoteEndPointFailed IOError
err ->
forall e a. Exception e => e -> IO a
throwIO IOError
err
RemoteState
RemoteEndPointClosed ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"createNewConnection (closed)"
forall (m :: * -> *) a. Monad m => a -> m a
return (ValidRemoteEndPointState -> RemoteState
RemoteEndPointValid ValidRemoteEndPointState
vst)
QDisc Event -> EndPointAddress -> Event -> IO ()
qdiscEnqueue' QDisc Event
ourQueue EndPointAddress
theirAddr (ConnectionId -> Reliability -> EndPointAddress -> Event
ConnectionOpened (HeavyweightConnectionId -> ConnectionId
connId HeavyweightConnectionId
lcid) Reliability
ReliableOrdered EndPointAddress
theirAddr)
closeConnection :: LightweightConnectionId -> IO ()
closeConnection :: HeavyweightConnectionId -> IO ()
closeConnection HeavyweightConnectionId
lcid = do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar RemoteState
theirState forall a b. (a -> b) -> a -> b
$ \RemoteState
st -> case RemoteState
st of
RemoteEndPointInvalid TransportError ConnectErrorCode
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint) HostName
"closeConnection (invalid)"
RemoteEndPointInit MVar ()
_ MVar ()
_ RequestedBy
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint) HostName
"closeConnection (init)"
RemoteEndPointValid ValidRemoteEndPointState
vst -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall a. Ord a => a -> Set a -> Bool
Set.member HeavyweightConnectionId
lcid (ValidRemoteEndPointState
vst forall r a. r -> T r a -> a
^. Accessor ValidRemoteEndPointState (Set HeavyweightConnectionId)
remoteIncoming)) forall a b. (a -> b) -> a -> b
$
forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ HostName -> IOError
userError HostName
"Invalid CloseConnection"
forall (m :: * -> *) a. Monad m => a -> m a
return ( ValidRemoteEndPointState -> RemoteState
RemoteEndPointValid
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Accessor ValidRemoteEndPointState (Set HeavyweightConnectionId)
remoteIncoming forall r a. T r a -> (a -> a) -> r -> r
^: forall a. Ord a => a -> Set a -> Set a
Set.delete HeavyweightConnectionId
lcid)
forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState
vst
)
RemoteEndPointClosing MVar ()
_ ValidRemoteEndPointState
_ ->
forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ HostName -> IOError
userError HostName
"Invalid CloseConnection request"
RemoteEndPointFailed IOError
err ->
forall e a. Exception e => e -> IO a
throwIO IOError
err
RemoteState
RemoteEndPointClosed ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint) HostName
"closeConnection (closed)"
QDisc Event -> EndPointAddress -> Event -> IO ()
qdiscEnqueue' QDisc Event
ourQueue EndPointAddress
theirAddr (ConnectionId -> Event
ConnectionClosed (HeavyweightConnectionId -> ConnectionId
connId HeavyweightConnectionId
lcid))
closeSocket :: N.Socket -> LightweightConnectionId -> IO Bool
closeSocket :: Socket -> HeavyweightConnectionId -> IO Bool
closeSocket Socket
sock HeavyweightConnectionId
lastReceivedId = do
Maybe (Action ())
mAct <- forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar RemoteState
theirState forall a b. (a -> b) -> a -> b
$ \RemoteState
st -> do
case RemoteState
st of
RemoteEndPointInvalid TransportError ConnectErrorCode
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"handleIncomingMessages:closeSocket (invalid)"
RemoteEndPointInit MVar ()
_ MVar ()
_ RequestedBy
_ ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"handleIncomingMessages:closeSocket (init)"
RemoteEndPointValid ValidRemoteEndPointState
vst -> do
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall a. Set a -> [a]
Set.elems forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState
vst forall r a. r -> T r a -> a
^. Accessor ValidRemoteEndPointState (Set HeavyweightConnectionId)
remoteIncoming) forall a b. (a -> b) -> a -> b
$
QDisc Event -> EndPointAddress -> Event -> IO ()
qdiscEnqueue' QDisc Event
ourQueue EndPointAddress
theirAddr forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId -> Event
ConnectionClosed forall b c a. (b -> c) -> (a -> b) -> a -> c
. HeavyweightConnectionId -> ConnectionId
connId
let vst' :: ValidRemoteEndPointState
vst' = Accessor ValidRemoteEndPointState (Set HeavyweightConnectionId)
remoteIncoming forall r a. T r a -> a -> r -> r
^= forall a. Set a
Set.empty forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState
vst
if ValidRemoteEndPointState
vst forall r a. r -> T r a -> a
^. Accessor ValidRemoteEndPointState Int
remoteOutgoing forall a. Ord a => a -> a -> Bool
> Int
0 Bool -> Bool -> Bool
|| HeavyweightConnectionId
lastReceivedId forall a. Eq a => a -> a -> Bool
/= ValidRemoteEndPointState -> HeavyweightConnectionId
lastSentId ValidRemoteEndPointState
vst
then
forall (m :: * -> *) a. Monad m => a -> m a
return (ValidRemoteEndPointState -> RemoteState
RemoteEndPointValid ValidRemoteEndPointState
vst', forall a. Maybe a
Nothing)
else do
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ValidRemoteEndPointState -> Maybe (IO ())
remoteProbing ValidRemoteEndPointState
vst) forall a. a -> a
id
EndPointPair -> IO ()
removeRemoteEndPoint (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
Action ()
act <- forall a. RemoteEndPoint -> IO a -> IO (Action a)
schedule RemoteEndPoint
theirEndPoint forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. Monad m => m a -> m ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m (Either IOError a)
tryIO forall a b. (a -> b) -> a -> b
$ ValidRemoteEndPointState -> [ByteString] -> IO ()
sendOn ValidRemoteEndPointState
vst'
[ HeavyweightConnectionId -> ByteString
encodeWord32 (ControlHeader -> HeavyweightConnectionId
encodeControlHeader ControlHeader
CloseSocket)
, HeavyweightConnectionId -> ByteString
encodeWord32 (ValidRemoteEndPointState
vst forall r a. r -> T r a -> a
^. Accessor ValidRemoteEndPointState HeavyweightConnectionId
remoteLastIncoming)
]
forall (m :: * -> *) a. Monad m => a -> m a
return (RemoteState
RemoteEndPointClosed, forall a. a -> Maybe a
Just Action ()
act)
RemoteEndPointClosing MVar ()
resolved ValidRemoteEndPointState
vst -> do
if HeavyweightConnectionId
lastReceivedId forall a. Eq a => a -> a -> Bool
/= ValidRemoteEndPointState -> HeavyweightConnectionId
lastSentId ValidRemoteEndPointState
vst
then do
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar () -> ValidRemoteEndPointState -> RemoteState
RemoteEndPointClosing MVar ()
resolved ValidRemoteEndPointState
vst, forall a. Maybe a
Nothing)
else do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ValidRemoteEndPointState
vst forall r a. r -> T r a -> a
^. Accessor ValidRemoteEndPointState Int
remoteOutgoing forall a. Ord a => a -> a -> Bool
> Int
0) forall a b. (a -> b) -> a -> b
$ do
let code :: EventErrorCode
code = EndPointAddress -> EventErrorCode
EventConnectionLost (RemoteEndPoint -> EndPointAddress
remoteAddress RemoteEndPoint
theirEndPoint)
let msg :: HostName
msg = HostName
"socket closed prematurely by peer"
QDisc Event -> EndPointAddress -> Event -> IO ()
qdiscEnqueue' QDisc Event
ourQueue EndPointAddress
theirAddr forall b c a. (b -> c) -> (a -> b) -> a -> c
. TransportError EventErrorCode -> Event
ErrorEvent forall a b. (a -> b) -> a -> b
$ forall error. error -> HostName -> TransportError error
TransportError EventErrorCode
code HostName
msg
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ValidRemoteEndPointState -> Maybe (IO ())
remoteProbing ValidRemoteEndPointState
vst) forall a. a -> a
id
EndPointPair -> IO ()
removeRemoteEndPoint (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
Action ()
act <- forall a. RemoteEndPoint -> IO a -> IO (Action a)
schedule RemoteEndPoint
theirEndPoint forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
resolved ()
forall (m :: * -> *) a. Monad m => a -> m a
return (RemoteState
RemoteEndPointClosed, forall a. a -> Maybe a
Just Action ()
act)
RemoteEndPointFailed IOError
err ->
forall e a. Exception e => e -> IO a
throwIO IOError
err
RemoteState
RemoteEndPointClosed ->
forall a. EndPointPair -> HostName -> IO a
relyViolation (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint)
HostName
"handleIncomingMessages:closeSocket (closed)"
case Maybe (Action ())
mAct of
Maybe (Action ())
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Just Action ()
act -> do
forall a. EndPointPair -> Action a -> IO a
runScheduledAction (LocalEndPoint
ourEndPoint, RemoteEndPoint
theirEndPoint) Action ()
act
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
-- | Receive one message from @sock@ with 'recvWithLength' (passing
-- 'recvLimit' as its limit argument) and enqueue the payload as a
-- 'Received' event for the lightweight connection @lcid@.
readMessage :: N.Socket -> LightweightConnectionId -> IO ()
readMessage sock lcid = do
  chunks <- recvWithLength recvLimit sock
  qdiscEnqueue' ourQueue theirAddr (Received (connId lcid) chunks)
-- | If the remote endpoint is valid and has an action stored in
-- 'remoteProbing', run that action and clear the field. Any other remote
-- state is left untouched.
stopProbing :: IO ()
stopProbing = modifyMVar_ theirState $ \case
  RemoteEndPointValid vst
    | Just stop <- remoteProbing vst -> do
        stop
        return (RemoteEndPointValid vst { remoteProbing = Nothing })
  other -> return other
-- Shorthands for fields of the endpoint pair and of the transport
-- parameters that the surrounding helpers use repeatedly.

-- | Event queue of the local endpoint.
ourQueue :: QDisc Event
ourQueue = localQueue ourEndPoint

-- | Mutable state of the local endpoint.
ourState :: MVar LocalEndPointState
ourState = localState ourEndPoint

-- | Mutable state of the remote endpoint.
theirState :: MVar RemoteState
theirState = remoteState theirEndPoint

-- | Address of the remote endpoint.
theirAddr :: EndPointAddress
theirAddr = remoteAddress theirEndPoint

-- | Limit handed to 'recvWithLength', taken from 'tcpMaxReceiveLength'.
recvLimit = tcpMaxReceiveLength params
-- | Handle an 'IOException' that terminated the incoming-message loop.
--
-- A valid or closing remote endpoint is moved to 'RemoteEndPointFailed'
-- with this error; a valid one additionally gets an 'EventConnectionLost'
-- error event posted to our queue. An endpoint that had already failed
-- keeps its earlier error, and the event is posted only while the local
-- endpoint is still valid. The invalid, init and closed states are
-- reported through 'relyViolation'.
prematureExit :: IOException -> IO ()
prematureExit err = modifyMVar_ theirState $ \case
    RemoteEndPointInvalid _ ->
      impossible
    RemoteEndPointInit _ _ _ ->
      impossible
    RemoteEndPointValid vst -> do
      -- Run the action stored in 'remoteProbing', if there is one.
      forM_ (remoteProbing vst) id
      reportLost (show err)
      return (RemoteEndPointFailed err)
    RemoteEndPointClosing resolved vst -> do
      forM_ (remoteProbing vst) id
      -- Signal whoever is waiting on the closing handshake.
      putMVar resolved ()
      return (RemoteEndPointFailed err)
    RemoteEndPointClosed ->
      impossible
    RemoteEndPointFailed earlier -> do
      -- The local state is taken (and returned unchanged) only to decide
      -- whether the event should still be posted.
      modifyMVar_ ourState $ \case
        LocalEndPointClosed ->
          return LocalEndPointClosed
        stillValid@(LocalEndPointValid _) -> do
          reportLost (show earlier)
          return stillValid
      return (RemoteEndPointFailed earlier)
  where
    impossible :: IO a
    impossible =
      relyViolation (ourEndPoint, theirEndPoint)
        "handleIncomingMessages:prematureExit"

    -- Post a connection-lost error event carrying the given description.
    reportLost :: String -> IO ()
    reportLost reason =
      qdiscEnqueue' ourQueue theirAddr . ErrorEvent $
        TransportError (EventConnectionLost (remoteAddress theirEndPoint)) reason
-- | Build the 'ConnectionId' for one of the peer's lightweight connections
-- by combining it with the peer's 'remoteId'.
connId :: LightweightConnectionId -> ConnectionId
connId lcid = createConnectionId (remoteId theirEndPoint) lcid
-- | Id of the most recent outgoing lightweight connection: @0@ while
-- 'remoteNextConnOutId' is still 'firstNonReservedLightweightConnectionId',
-- otherwise one less than 'remoteNextConnOutId'.
lastSentId :: ValidRemoteEndPointState -> LightweightConnectionId
lastSentId vst
  | nextId == firstNonReservedLightweightConnectionId = 0
  | otherwise = nextId - 1
  where
    nextId = vst ^. remoteNextConnOutId
-- | Create a new lightweight connection to @theirAddress@.
--
-- The connect timeout is taken from the 'ConnectHints' and falls back to
-- the transport-wide 'transportConnectTimeout'. When a timeout is set, a
-- helper thread fills an 'MVar' after 'threadDelay' and the matching
-- 'readMVar' action is handed to 'findRemoteEndPoint' as the timer.
--
-- Each attempt looks the remote endpoint up. If it is reported as new,
-- 'setupRemoteEndPoint' is run (any exception is absorbed and treated as
-- \"no response\") and the attempt is repeated. Otherwise the next outgoing
-- connection id is claimed, a 'CreatedNewConnection' message is scheduled
-- and run, and the endpoint is returned together with the new id.
--
-- 'IOException's raised by the lookup or by claiming the id are rethrown as
-- @'TransportError' 'ConnectFailed'@.
createConnectionTo
  :: TCPTransport
  -> LocalEndPoint
  -> EndPointAddress
  -> ConnectHints
  -> IO (RemoteEndPoint, LightweightConnectionId)
createConnectionTo transport ourEndPoint theirAddress hints =
    startTimer connTimeout >>= \timer -> attempt timer Nothing
  where
    params :: TCPParameters
    params = transportParams transport

    connTimeout :: Maybe Int
    connTimeout = connectTimeout hints `mplus` transportConnectTimeout params

    -- Produce an action that returns once the timeout has elapsed, or
    -- 'Nothing' when no timeout is configured.
    startTimer :: Maybe Int -> IO (Maybe (IO ()))
    startTimer Nothing = return Nothing
    startTimer (Just delay) = do
      expired <- newEmptyMVar
      _ <- forkIO (threadDelay delay >> putMVar expired ())
      return (Just (readMVar expired))

    -- One connection attempt. The second argument is the endpoint and
    -- response of the previous attempt, or 'Nothing' on the first one.
    attempt
      :: Maybe (IO ())
      -> Maybe (RemoteEndPoint, ConnectionRequestResponse)
      -> IO (RemoteEndPoint, LightweightConnectionId)
    attempt timer previous = do
      (theirEndPoint, isNew) <-
        mapIOException connectFailed
          (findRemoteEndPoint ourEndPoint theirAddress RequestedByUs timer)
          `finally` settleCrossed previous
      if isNew
        then do
          response <- handle (absorbAllExceptions Nothing) $
            setupRemoteEndPoint transport (ourEndPoint, theirEndPoint) connTimeout
          attempt timer (fmap ((,) theirEndPoint) response)
        else mapIOException connectFailed $ do
          scheduled <- modifyMVar (remoteState theirEndPoint) $ \case
            RemoteEndPointValid vst -> do
              let newId = vst ^. remoteNextConnOutId
              act <- schedule theirEndPoint $ do
                sendOn vst
                  [ encodeWord32 (encodeControlHeader CreatedNewConnection)
                  , encodeWord32 newId
                  ]
                return newId
              return
                ( RemoteEndPointValid ((remoteNextConnOutId ^= newId + 1) vst)
                , act
                )
            -- Error states: surface the stored error to the caller.
            RemoteEndPointInvalid err ->
              throwIO err
            RemoteEndPointFailed err ->
              throwIO err
            -- Any other state is not expected here.
            _ ->
              relyViolation (ourEndPoint, theirEndPoint) "createConnectionTo"
          newId <- runScheduledAction (ourEndPoint, theirEndPoint) scheduled
          return (theirEndPoint, newId)

    -- Runs after every lookup. If the previous attempt ended with a crossed
    -- connection request and that endpoint is still in its init state, it
    -- is resolved, removed and marked closed.
    settleCrossed :: Maybe (RemoteEndPoint, ConnectionRequestResponse) -> IO ()
    settleCrossed (Just (crossedEndPoint, ConnectionRequestCrossed)) =
      modifyMVar_ (remoteState crossedEndPoint) $ \case
        RemoteEndPointInit resolved _ _ -> do
          putMVar resolved ()
          removeRemoteEndPoint (ourEndPoint, crossedEndPoint)
          return RemoteEndPointClosed
        other ->
          return other
    settleCrossed _ = return ()

    connectFailed :: IOException -> TransportError ConnectErrorCode
    connectFailed ioErr = TransportError ConnectFailed (show ioErr)

    -- Exception handler that ignores the exception and yields a default.
    absorbAllExceptions :: a -> SomeException -> IO a
    absorbAllExceptions fallback _ = return fallback
-- | Set up a remote endpoint by opening a socket to it with
-- 'socketToEndPoint' and acting on the peer's response.
--
-- Arguments: the transport (source of the TCP parameters and of
-- 'transportAddrInfo'), the local/remote endpoint pair, and an optional
-- connect timeout that is passed on to 'socketToEndPoint' unchanged.
--
-- Result: 'Nothing' when 'socketToEndPoint' returned an error, otherwise
-- 'Just' the peer's 'ConnectionRequestResponse'.
--
-- Per response:
--
-- * accepted: the endpoint is resolved to 'RemoteEndPointValid' and a thread
--   running 'handleIncomingMessages' is forked; the socket is closed and the
--   socket-closed 'MVar' filled when that thread ends.
-- * unsupported version, invalid, host mismatch: the endpoint is resolved to
--   'RemoteEndPointInvalid' and the socket is closed.
-- * crossed: the endpoint's @crossed@ 'MVar' is filled and the socket is
--   closed.
--
-- Once a socket exists it must be closed here. In every rejection branch the
-- state update runs under 'finally', so the socket is closed and the
-- socket-closed 'MVar' is filled even if the state update throws (the
-- crossed branch can rethrow a stored error or report a 'relyViolation').
-- NOTE(review): in the accepted branch an exception from 'resolveInit'
-- still leaves the socket open; confirm whether that path can occur.
setupRemoteEndPoint
  :: TCPTransport
  -> EndPointPair
  -> Maybe Int
  -> IO (Maybe ConnectionRequestResponse)
setupRemoteEndPoint transport (ourEndPoint, theirEndPoint) connTimeout = do
    -- Our address is only sent along when the transport has address info.
    let mOurAddress = const ourAddress <$> transportAddrInfo transport
    result <- socketToEndPoint mOurAddress
                               theirAddress
                               (tcpReuseClientAddr params)
                               (tcpNoDelay params)
                               (tcpKeepAlive params)
                               (tcpUserTimeout params)
                               connTimeout
    didAccept <- case result of
      Right (socketClosedVar, sock, ConnectionRequestAccepted) -> do
        sendLock <- newMVar Nothing
        let vst = ValidRemoteEndPointState
                    {  remoteSocket        = sock
                    ,  remoteSocketClosed  = readMVar socketClosedVar
                    ,  remoteProbing       = Nothing
                    ,  remoteSendLock      = sendLock
                    , _remoteOutgoing      = 0
                    , _remoteIncoming      = Set.empty
                    , _remoteLastIncoming  = 0
                    , _remoteNextConnOutId = firstNonReservedLightweightConnectionId
                    }
        resolveInit (ourEndPoint, theirEndPoint) (RemoteEndPointValid vst)
        return (Just (socketClosedVar, sock))
      Right (socketClosedVar, sock, ConnectionRequestUnsupportedVersion) ->
        rejectWith socketClosedVar sock $
          connectFailed "setupRemoteEndPoint: unsupported version"
      Right (socketClosedVar, sock, ConnectionRequestInvalid) ->
        rejectWith socketClosedVar sock $
          invalidAddress "setupRemoteEndPoint: Invalid endpoint"
      Right (socketClosedVar, sock, ConnectionRequestCrossed) -> do
        -- 'settleCrossed' may throw; the socket must be closed regardless.
        withMVar (remoteState theirEndPoint) settleCrossed
          `finally` closeAndSignal socketClosedVar sock
        return Nothing
      Right (socketClosedVar, sock, ConnectionRequestHostMismatch) -> do
        -- The peer follows up with three messages describing the mismatch;
        -- any exception while reading them becomes the error itself.
        err <- handle describeFailure $ do
          claimedHost <- recvWithLength (tcpMaxReceiveLength params) sock
          actualNumericHost <- recvWithLength (tcpMaxReceiveLength params) sock
          actualResolvedHost <- recvWithLength (tcpMaxReceiveLength params) sock
          let reason = concat [
                  "setupRemoteEndPoint: Host mismatch"
                , ". Claimed: "
                , BSC.unpack (BS.concat claimedHost)
                , "; Numeric: "
                , BSC.unpack (BS.concat actualNumericHost)
                , "; Resolved: "
                , BSC.unpack (BS.concat actualResolvedHost)
                ]
          return (connectFailed reason)
        rejectWith socketClosedVar sock err
      Left err -> do
        -- No socket was handed to us, so there is nothing to close.
        resolveInit (ourEndPoint, theirEndPoint) (RemoteEndPointInvalid err)
        return Nothing

    -- On acceptance, start processing incoming messages; the socket is
    -- closed and its closure signalled however that thread ends.
    forM_ didAccept $ \(socketClosed, sock) -> void $ forkIO $
      handleIncomingMessages params (ourEndPoint, theirEndPoint)
        `finally` closeAndSignal socketClosed sock

    return $ case result of
      Left _ -> Nothing
      Right (_, _, response) -> Just response
  where
    params :: TCPParameters
    params = transportParams transport

    ourAddress :: EndPointAddress
    ourAddress = localAddress ourEndPoint

    theirAddress :: EndPointAddress
    theirAddress = remoteAddress theirEndPoint

    invalidAddress :: String -> TransportError ConnectErrorCode
    invalidAddress = TransportError ConnectNotFound

    connectFailed :: String -> TransportError ConnectErrorCode
    connectFailed = TransportError ConnectFailed

    -- Close the socket, then fill the socket-closed 'MVar' even if closing
    -- the socket throws.
    closeAndSignal :: MVar () -> N.Socket -> IO ()
    closeAndSignal socketClosedVar sock =
      tryCloseSocket sock `finally` putMVar socketClosedVar ()

    -- Resolve the endpoint as invalid with the given error and release the
    -- socket, whether or not the resolution succeeds.
    rejectWith
      :: MVar () -> N.Socket -> TransportError ConnectErrorCode -> IO (Maybe a)
    rejectWith socketClosedVar sock err = do
      resolveInit (ourEndPoint, theirEndPoint) (RemoteEndPointInvalid err)
        `finally` closeAndSignal socketClosedVar sock
      return Nothing

    -- State handling for a crossed connection request.
    settleCrossed :: RemoteState -> IO ()
    settleCrossed (RemoteEndPointInit _ crossed _) =
      putMVar crossed ()
    settleCrossed (RemoteEndPointFailed ex) =
      throwIO ex
    settleCrossed _ =
      relyViolation (ourEndPoint, theirEndPoint) "setupRemoteEndPoint: Crossed"

    -- Turn any exception into a 'ConnectFailed' error carrying its 'show'.
    describeFailure :: SomeException -> IO (TransportError ConnectErrorCode)
    describeFailure ex = return (connectFailed (show ex))
-- | Begin closing the socket to a remote endpoint that is no longer in use.
--
-- If the endpoint is valid with zero outgoing connections and no incoming
-- ones, a 'CloseSocket' message (carrying 'remoteLastIncoming') is
-- scheduled, the state becomes 'RemoteEndPointClosing', and the scheduled
-- action is then run. In every other case the state is left as it is and
-- nothing is sent.
closeIfUnused :: EndPointPair -> IO ()
closeIfUnused (ourEndPoint, theirEndPoint) = do
    pending <- modifyMVar (remoteState theirEndPoint) $ \case
      RemoteEndPointValid vst
        | unused vst -> do
            resolved <- newEmptyMVar
            closeAct <- schedule theirEndPoint (sendOn vst (closeRequest vst))
            return (RemoteEndPointClosing resolved vst, Just closeAct)
      other ->
        return (other, Nothing)
    forM_ pending (runScheduledAction (ourEndPoint, theirEndPoint))
  where
    -- No lightweight connections remain in either direction.
    unused :: ValidRemoteEndPointState -> Bool
    unused vst =
      (vst ^. remoteOutgoing) == 0 && Set.null (vst ^. remoteIncoming)

    -- Control header followed by the last incoming connection id.
    closeRequest vst =
      [ encodeWord32 (encodeControlHeader CloseSocket)
      , encodeWord32 (vst ^. remoteLastIncoming)
      ]
-- | Forget a remote endpoint that is in an invalid or failed state.
--
-- Looks up the remote endpoint registered for @theirAddress@ on the local
-- endpoint. If there is one and its state is 'RemoteEndPointInvalid' or
-- 'RemoteEndPointFailed', it is removed with 'removeRemoteEndPoint'; any
-- other state is left alone.
--
-- Throws @'TransportError' 'ConnectFailed'@ when the local endpoint is
-- closed.
resetIfBroken :: LocalEndPoint -> EndPointAddress -> IO ()
resetIfBroken ourEndPoint theirAddress = do
    known <- withMVar (localState ourEndPoint) $ \case
      LocalEndPointClosed ->
        throwIO (TransportError ConnectFailed "Endpoint closed")
      LocalEndPointValid vst ->
        return (vst ^. localConnectionTo theirAddress)
    forM_ known $ \theirEndPoint ->
      withMVar (remoteState theirEndPoint) $ \st ->
        when (broken st) $
          removeRemoteEndPoint (ourEndPoint, theirEndPoint)
  where
    broken :: RemoteState -> Bool
    broken (RemoteEndPointInvalid _) = True
    broken (RemoteEndPointFailed _) = True
    broken _ = False
-- | Open a connection from a local endpoint to itself.
--
-- A fresh outgoing connection id is allocated with 'getLocalNextConnOutId'
-- and combined with 'heavyweightSelfConnectionId'. A 'ConnectionOpened'
-- event is then placed on the endpoint's own queue. The returned
-- 'Connection' enqueues 'Received' and 'ConnectionClosed' events on that
-- same queue.
--
-- An 'IOException' raised while allocating the id is rethrown as a
-- @TransportError ConnectFailed@.
connectToSelf :: LocalEndPoint
              -> IO Connection
connectToSelf ourEndPoint = do
    -- Liveness flag of this connection. After creation it is only read and
    -- written while the endpoint state MVar is held (see the helpers below).
    aliveRef <- newIORef True
    lcid     <- mapIOException toConnectError $ getLocalNextConnOutId ourEndPoint
    let cid = createConnectionId heavyweightSelfConnectionId lcid
    qdiscEnqueue' selfQueue selfAddress $
      ConnectionOpened cid ReliableOrdered selfAddress
    return Connection
      { send  = selfSend aliveRef cid
      , close = selfClose aliveRef cid
      }
  where
    -- Deliver a message to ourselves. Errors are returned as 'Left' because
    -- the whole action runs under 'try'.
    selfSend :: IORef Bool
             -> ConnectionId
             -> [ByteString]
             -> IO (Either (TransportError SendErrorCode) ())
    selfSend aliveRef cid payload =
      try $ withMVar selfState $ \case
        LocalEndPointClosed ->
          throwIO $ TransportError SendFailed "Endpoint closed"
        LocalEndPointValid _ -> do
          alive <- readIORef aliveRef
          if not alive
            then throwIO $ TransportError SendClosed "Connection closed"
            else
              -- Force every chunk to WHNF before the event is enqueued.
              foldr seq () payload `seq`
                qdiscEnqueue' selfQueue selfAddress (Received cid payload)

    -- Close the self-connection. Does nothing when the endpoint is already
    -- closed or the connection was closed before.
    selfClose :: IORef Bool -> ConnectionId -> IO ()
    selfClose aliveRef cid =
      withMVar selfState $ \case
        LocalEndPointClosed  -> return ()
        LocalEndPointValid _ -> do
          alive <- readIORef aliveRef
          when alive $ do
            qdiscEnqueue' selfQueue selfAddress (ConnectionClosed cid)
            writeIORef aliveRef False

    selfQueue :: QDisc Event
    selfQueue = localQueue ourEndPoint

    selfState :: MVar LocalEndPointState
    selfState = localState ourEndPoint

    selfAddress :: EndPointAddress
    selfAddress = localAddress ourEndPoint

    toConnectError :: IOException -> TransportError ConnectErrorCode
    toConnectError = TransportError ConnectFailed . show
-- | Move a remote endpoint out of the 'RemoteEndPointInit' state.
--
-- When the endpoint is in 'RemoteEndPointInit', the @resolved@ MVar is
-- filled, the @crossed@ MVar is filled if it was still empty, and the state
-- becomes @newState@. If @newState@ is 'RemoteEndPointClosed' the remote
-- endpoint is also removed from the local endpoint.
--
-- A 'RemoteEndPointFailed' state rethrows the stored exception. Any other
-- state is reported through 'relyViolation'.
resolveInit :: EndPointPair -> RemoteState -> IO ()
resolveInit pair@(_, theirEndPoint) newState =
  modifyMVar_ (remoteState theirEndPoint) $ \case
    RemoteEndPointInit resolved crossed _ -> do
      putMVar resolved ()
      -- Non-blocking on purpose: 'crossed' may already have been filled.
      _ <- tryPutMVar crossed ()
      case newState of
        RemoteEndPointClosed -> removeRemoteEndPoint pair
        _                    -> return ()
      return newState
    RemoteEndPointFailed ex ->
      throwIO ex
    _ ->
      relyViolation pair "resolveInit"
-- | Allocate the next outgoing lightweight connection id of a local endpoint.
--
-- Returns the current value of 'localNextConnOutId' and stores that value
-- plus one, all under the endpoint's state MVar.
--
-- Throws a @userError@ when the endpoint is closed.
getLocalNextConnOutId :: LocalEndPoint -> IO LightweightConnectionId
getLocalNextConnOutId ourEndpoint =
  modifyMVar (localState ourEndpoint) $ \case
    LocalEndPointClosed ->
      throwIO $ userError "Local endpoint closed"
    LocalEndPointValid vst ->
      let allocated = vst ^. localNextConnOutId
          vst'      = (localNextConnOutId ^= allocated + 1) vst
      in  return (LocalEndPointValid vst', allocated)
-- | Create a new local endpoint and register it with the transport.
--
-- The endpoint starts with no connections and with its connection counters
-- at the first non-reserved ids. Its id is the transport's current
-- 'nextEndPointId', which is incremented afterwards. The address is built
-- from the transport's host and port when 'transportAddrInfo' is present,
-- and comes from 'randomEndPointAddress' otherwise.
--
-- Throws @TransportError NewEndPointFailed@ when the transport is closed.
createLocalEndPoint :: TCPTransport
                    -> QDisc Event
                    -> IO LocalEndPoint
createLocalEndPoint transport qdisc = do
    endPointState <- newMVar $ LocalEndPointValid ValidLocalEndPointState
      { _localNextConnOutId = firstNonReservedLightweightConnectionId
      , _localConnections   = Map.empty
      , _nextConnInId       = firstNonReservedHeavyweightConnectionId
      }
    modifyMVar (transportState transport) $ \case
      TransportClosed ->
        throwIO (TransportError NewEndPointFailed "Transport closed")
      TransportValid vst -> do
        let endPointId = vst ^. nextEndPointId
        address <- maybe
          randomEndPointAddress
          (\info -> return $
              encodeEndPointAddress (transportHost info)
                                    (transportPort info)
                                    endPointId)
          (transportAddrInfo transport)
        let endPoint = LocalEndPoint
              { localAddress    = address
              , localEndPointId = endPointId
              , localQueue      = qdisc
              , localState      = endPointState
              }
            vst' = (localEndPointAt endPointId ^= Just endPoint)
                 . (nextEndPointId ^= endPointId + 1)
                 $ vst
        return (TransportValid vst', endPoint)
-- | Remove a remote endpoint from the connection table of a local endpoint.
--
-- The entry is cleared only when the endpoint currently registered under
-- the remote address has the same 'remoteId' as the one passed in. In every
-- other case (no entry, a different endpoint registered, or a closed local
-- endpoint) the state is left untouched.
removeRemoteEndPoint :: EndPointPair -> IO ()
removeRemoteEndPoint (ourEndPoint, theirEndPoint) =
    modifyMVar_ (localState ourEndPoint) $ \st -> case st of
      LocalEndPointValid vst
        | Just registered <- vst ^. slot
        , remoteId registered == remoteId theirEndPoint ->
            return . LocalEndPointValid $ (slot ^= Nothing) vst
      LocalEndPointValid _ ->
        return st
      LocalEndPointClosed ->
        return LocalEndPointClosed
  where
    -- Accessor for the table entry keyed by the remote endpoint's address
    slot = localConnectionTo (remoteAddress theirEndPoint)
-- | Remove a local endpoint from the transport's endpoint table.
--
-- Clears the entry stored under the endpoint's 'localEndPointId'. Does
-- nothing when the transport is already closed.
removeLocalEndPoint :: TCPTransport -> LocalEndPoint -> IO ()
removeLocalEndPoint transport ourEndPoint =
    modifyMVar_ (transportState transport) $ \case
      TransportClosed    -> return TransportClosed
      TransportValid vst -> return . TransportValid $ (slot ^= Nothing) vst
  where
    slot = localEndPointAt (localEndPointId ourEndPoint)
-- | Look up the remote endpoint registered for an address, creating it in
-- the 'RemoteEndPointInit' state when there is none.
--
-- Arguments: our local endpoint, the remote address, who is asking for the
-- endpoint, and an optional timer action bounding every wait performed here.
--
-- The 'Bool' in the result is 'True' when the endpoint was created by this
-- call. It is also 'True' in the one crossed-request case below where the
-- @crossed@ MVar was already full.
--
-- For an existing endpoint the behaviour depends on its state:
--
-- * 'RemoteEndPointValid': the endpoint is returned; its 'remoteOutgoing'
--   counter is incremented first when the request is 'RequestedByUs'.
-- * 'RemoteEndPointInit': see the comments in the code.
-- * 'RemoteEndPointClosing': wait for it to resolve, then retry.
-- * 'RemoteEndPointClosed': retry immediately.
-- * 'RemoteEndPointInvalid' / 'RemoteEndPointFailed': rethrow the stored error.
--
-- Throws a @userError@ when the local endpoint is closed, and
-- @TransportError ConnectTimeout@ when the timer fires during a wait.
findRemoteEndPoint
  :: LocalEndPoint
  -> EndPointAddress
  -> RequestedBy
  -> Maybe (IO ())
  -> IO (RemoteEndPoint, Bool)
findRemoteEndPoint ourEndPoint theirAddress findOrigin mtimer = go
  where
    go = do
      (theirEndPoint, isNew) <- modifyMVar ourState lookupOrRegister
      if isNew
        then
          return (theirEndPoint, True)
        else do
          snapshot <- modifyMVar (remoteState theirEndPoint) bumpOutgoing
          -- 'snapshot' is the state as seen while the MVar was held; it is
          -- inspected after the MVar has been released.
          case snapshot of
            RemoteEndPointInvalid err ->
              throwIO err
            RemoteEndPointInit resolved crossed initOrigin ->
              case (findOrigin, initOrigin) of
                (RequestedByUs, _) ->
                  -- Someone else is still initialising: wait, then retry.
                  readMVarTimeout mtimer resolved >> go
                (RequestedByThem, RequestedByUs) ->
                  -- Requests from both sides overlap; address order decides.
                  if ourAddress > theirAddress
                    then
                      tryReadMVar crossed >>= \case
                        Nothing -> readMVarTimeout mtimer crossed >> go
                        _       -> return (theirEndPoint, True)
                    else
                      return (theirEndPoint, False)
                (RequestedByThem, RequestedByThem) ->
                  throwIO $ userError "Already connected"
            RemoteEndPointValid _ ->
              return (theirEndPoint, False)
            RemoteEndPointClosing resolved _ ->
              readMVarTimeout mtimer resolved >> go
            RemoteEndPointClosed ->
              go
            RemoteEndPointFailed err ->
              throwIO err

    -- Runs under the local endpoint's state MVar: return the registered
    -- endpoint, or create and register a new one in the Init state.
    lookupOrRegister st = case st of
      LocalEndPointClosed ->
        throwIO $ userError "Local endpoint closed"
      LocalEndPointValid vst ->
        case vst ^. localConnectionTo theirAddress of
          Just known ->
            return (st, (known, False))
          Nothing -> do
            resolved   <- newEmptyMVar
            crossed    <- newEmptyMVar
            theirState <- newMVar (RemoteEndPointInit resolved crossed findOrigin)
            scheduled  <- newChan
            let fresh = RemoteEndPoint
                  { remoteAddress   = theirAddress
                  , remoteState     = theirState
                  , remoteId        = vst ^. nextConnInId
                  , remoteScheduled = scheduled
                  }
            return ( LocalEndPointValid
                   . (localConnectionTo theirAddress ^= Just fresh)
                   . (nextConnInId ^: (+ 1))
                   $ vst
                   , (fresh, True)
                   )

    -- Runs under the remote endpoint's state MVar: count one more outgoing
    -- use when we are the requester and the endpoint is valid. The state
    -- (updated or not) is also handed back as the snapshot.
    bumpOutgoing st = case (st, findOrigin) of
      (RemoteEndPointValid vst, RequestedByUs) ->
        let st' = RemoteEndPointValid . (remoteOutgoing ^: (+ 1)) $ vst
        in  return (st', st')
      _ ->
        return (st, st)

    ourState :: MVar LocalEndPointState
    ourState = localState ourEndPoint

    ourAddress :: EndPointAddress
    ourAddress = localAddress ourEndPoint

    -- Like 'readMVar', but when a timer is given a helper thread runs it and
    -- then throws @TransportError ConnectTimeout@ to the waiting thread. The
    -- helper is killed once the wait is over.
    readMVarTimeout :: Maybe (IO a) -> MVar b -> IO b
    readMVarTimeout Nothing mv = readMVar mv
    readMVarTimeout (Just timer) mv = do
      let connectTimedout :: TransportError ConnectErrorCode
          connectTimedout = TransportError ConnectTimeout "Timed out"
      waiter <- myThreadId
      bracket (forkIO $ timer >> throwTo waiter connectTimedout) killThread $
        \_ -> readMVar mv
-- | Send a payload over the socket of a valid remote endpoint.
--
-- The send runs in a separate 'async' thread whose result is awaited
-- immediately. 'remoteSendLock' is held for the duration of the send. It
-- also records the first exception raised by a send: once an exception is
-- stored, no further data is written and every later call fails with a
-- @userError@ quoting the stored exception.
sendOn :: ValidRemoteEndPointState -> [ByteString] -> IO ()
sendOn vst bs = async guardedSend >>= wait
  where
    sendLock = remoteSendLock vst

    guardedSend = mask $ \restore -> do
      earlierFailure <- takeMVar sendLock
      case earlierFailure of
        Just _  -> return ()
        Nothing ->
          -- Only the socket write itself runs with exceptions unmasked.
          restore (sendMany (remoteSocket vst) bs) `catch` \ex -> do
            -- Record the failure in the lock before propagating it.
            putMVar sendLock (Just ex)
            throwIO ex
      putMVar sendLock earlierFailure
      forM_ earlierFailure $ \e ->
        throwIO $ userError $ "sendOn failed earlier with: " ++ show e
-- | Handle for the outcome of an action queued with 'schedule': an 'MVar'
-- that receives either the action's result or the exception it raised.
type Action a = MVar (Either SomeException a)
-- | Queue an action on a remote endpoint's 'remoteScheduled' channel.
--
-- The action is not run here. Whoever later takes it from the channel runs
-- it, and its result (or the exception it raised) is stored in the
-- returned 'Action'.
schedule :: RemoteEndPoint -> IO a -> IO (Action a)
schedule theirEndPoint act = do
    resultVar <- newEmptyMVar
    let runAndRecord =
          (act >>= putMVar resultVar . Right) `catch` (putMVar resultVar . Left)
    writeChan (remoteScheduled theirEndPoint) runAndRecord
    return resultVar
-- | Run the next action from the remote endpoint's 'remoteScheduled'
-- channel, then wait for the outcome of the given 'Action'.
--
-- The action taken from the channel is whichever is at its head, which
-- need not be the one belonging to the given 'Action'.
--
-- If the awaited action failed, its exception is rethrown. When that
-- exception is an 'IOException' the remote endpoint is first moved to
-- 'RemoteEndPointFailed'; if it was valid, its probing action (if any) is
-- run and its socket shut down as well.
runScheduledAction :: EndPointPair -> Action a -> IO a
runScheduledAction (_, theirEndPoint) mvar = do
    join $ readChan (remoteScheduled theirEndPoint)
    readMVar mvar >>= either failWith return
  where
    failWith :: SomeException -> IO b
    failWith e = do
      forM_ (fromException e) $ \ioe ->
        modifyMVar_ (remoteState theirEndPoint) $ \case
          RemoteEndPointValid vst -> handleIOException ioe vst
          _                       -> return (RemoteEndPointFailed ioe)
      throwIO e

    handleIOException :: IOException
                      -> ValidRemoteEndPointState
                      -> IO RemoteState
    handleIOException ex vst = do
      -- Run the action stored in 'remoteProbing', if there is one.
      forM_ (remoteProbing vst) id
      tryShutdownSocketBoth (remoteSocket vst)
      return (RemoteEndPointFailed ex)
-- | Run @f@ with a function that schedules an action on a remote endpoint,
-- and run the scheduled action when @f@ finishes, whether it returns or
-- throws.
--
-- Only the most recently scheduled action is remembered: each call to the
-- scheduling function overwrites the previous entry.
withScheduledAction :: LocalEndPoint -> ((RemoteEndPoint -> IO a -> IO ()) -> IO ()) -> IO ()
withScheduledAction ourEndPoint f =
    bracket (newIORef Nothing) runPending $ \pendingRef ->
      f (register pendingRef)
  where
    -- Release step: run whatever was scheduled, if anything.
    runPending pendingRef = do
      pending <- readIORef pendingRef
      traverse (\(rp, action) -> runScheduledAction (ourEndPoint, rp) action)
               pending

    -- Schedule the action and record it as one masked step, so an
    -- asynchronous exception cannot arrive between the two.
    register pendingRef rp act = mask_ $ do
      action <- schedule rp act
      writeIORef pendingRef (Just (rp, action))
-- | Open a socket to a remote endpoint and perform the connection request
-- handshake.
--
-- Arguments, in order:
--
-- * our own address, if we have one; when 'Nothing', a zero word is sent in
--   its place;
-- * the address of the endpoint to connect to;
-- * whether to set @ReuseAddr@ on the socket;
-- * whether to set @NoDelay@ on the socket;
-- * whether to set @KeepAlive@ on the socket;
-- * an optional value for the @UserTimeout@ socket option;
-- * an optional timeout covering @connect@ and the handshake (passed to
--   'timeoutMaybe'; units are whatever 'timeoutMaybe' expects).
--
-- On success returns a freshly created empty 'MVar', the connected socket
-- and the decoded response of the remote side. All failures are reported as
-- 'Left': address resolution and @connect@ errors as 'ConnectNotFound',
-- socket creation errors as 'ConnectInsufficientResources', a timeout as
-- 'ConnectTimeout' and everything else as 'ConnectFailed'. The socket is
-- closed if anything fails after it has been created.
socketToEndPoint :: Maybe EndPointAddress
                 -> EndPointAddress
                 -> Bool
                 -> Bool
                 -> Bool
                 -> Maybe Int
                 -> Maybe Int
                 -> IO (Either (TransportError ConnectErrorCode)
                               (MVar (), N.Socket, ConnectionRequestResponse))
socketToEndPoint mOurAddress theirAddress reuseAddr noDelay keepAlive
                 mUserTimeout timeout =
  try $ do
    (host, port, theirEndPointId) <- case decodeEndPointAddress theirAddress of
      Nothing  -> throwIO (failed . userError $ "Could not parse")
      Just dec -> return dec
    -- Pick the first resolved address. An empty result is turned into an
    -- IOException inside 'mapIOException', so it is reported as
    -- 'ConnectNotFound' instead of escaping 'try' as a pattern-match failure.
    addr <- mapIOException invalidAddress $ do
      candidates <- N.getAddrInfo Nothing (Just host) (Just port)
      case candidates of
        firstAddr : _ -> return firstAddr
        []            -> throwIO (userError "No addresses found")
    bracketOnError (createSocket addr) tryCloseSocket $ \sock -> do
      when reuseAddr $
        mapIOException failed $ N.setSocketOption sock N.ReuseAddr 1
      when noDelay $
        mapIOException failed $ N.setSocketOption sock N.NoDelay 1
      when keepAlive $
        mapIOException failed $ N.setSocketOption sock N.KeepAlive 1
      forM_ mUserTimeout $
        mapIOException failed . N.setSocketOption sock N.UserTimeout
      response <- timeoutMaybe timeout timeoutError $ do
        mapIOException invalidAddress $
          N.connect sock (N.addrAddress addr)
        mapIOException failed $ do
          -- Request: protocol version, then a length-prefixed body holding
          -- the target endpoint id followed by our address (itself
          -- length-prefixed) or a zero word when we have none.
          let body = case mOurAddress of
                Just (EndPointAddress ourAddress) ->
                  encodeWord32 theirEndPointId : prependLength [ourAddress]
                Nothing ->
                  [encodeWord32 theirEndPointId, encodeWord32 0]
          sendMany sock $
            encodeWord32 currentProtocolVersion : prependLength body
          recvWord32 sock
      case decodeConnectionRequestResponse response of
        Nothing -> throwIO (failed . userError $ "Unexpected response")
        Just r  -> do
          socketClosedVar <- newEmptyMVar
          return (socketClosedVar, sock, r)
  where
    createSocket :: N.AddrInfo -> IO N.Socket
    createSocket addr = mapIOException insufficientResources $
      N.socket (N.addrFamily addr) N.Stream N.defaultProtocol

    invalidAddress, insufficientResources, failed
      :: IOException -> TransportError ConnectErrorCode
    invalidAddress        = TransportError ConnectNotFound . show
    insufficientResources = TransportError ConnectInsufficientResources . show
    failed                = TransportError ConnectFailed . show

    timeoutError :: TransportError ConnectErrorCode
    timeoutError = TransportError ConnectTimeout "Timed out"
-- | Combine a heavyweight connection id and a lightweight connection id
-- into a single 'ConnectionId'.
--
-- The heavyweight id is converted to the result type and shifted left by
-- 32 bits; the lightweight id is then OR-ed into the low bits.
--
-- NOTE(review): this presumes 'ConnectionId' is at least 64 bits wide and
-- that both ids fit in 32 bits, so the two halves never overlap -- confirm
-- against the type definitions.
createConnectionId :: HeavyweightConnectionId
                   -> LightweightConnectionId
                   -> ConnectionId
createConnectionId hcid lcid = upperHalf .|. lowerHalf
  where
    upperHalf = fromIntegral hcid `shiftL` 32
    lowerHalf = fromIntegral lcid
-- | Find the socket stored for the connection between a local endpoint and
-- a remote endpoint of the given transport.
--
-- The lookup proceeds in three steps, each under 'withMVar' on the relevant
-- state variable:
--
--   1. decode the endpoint id from @ourAddress@ and look up the local
--      endpoint in the transport state;
--   2. look up the remote endpoint for @theirAddress@ in the local
--      endpoint's state;
--   3. read 'remoteSocket' from the remote endpoint's state (available in
--      the @RemoteEndPointValid@ and @RemoteEndPointClosing@ states).
--
-- Throws an 'IOError' (built with 'userError') when the local address
-- cannot be decoded, when the transport or local endpoint is closed, when
-- either endpoint is not found, or when the remote endpoint is still
-- initializing or already closed. If the remote endpoint is in the
-- @RemoteEndPointInvalid@ or @RemoteEndPointFailed@ state, the error stored
-- in that state is rethrown unchanged.
internalSocketBetween :: TCPTransport    -- ^ Transport owning the endpoints
                      -> EndPointAddress -- ^ Address of the local endpoint
                      -> EndPointAddress -- ^ Address of the remote endpoint
                      -> IO N.Socket
internalSocketBetween transport ourAddress theirAddress = do
    ourEndPointId <- case decodeEndPointAddress ourAddress of
      Just (_, _, eid) -> return eid
      Nothing          -> lookupFailed "Malformed local EndPointAddress"
    ourEndPoint <- withMVar (transportState transport) $ \case
      TransportClosed ->
        lookupFailed "Transport closed"
      TransportValid vst ->
        maybe (lookupFailed "Local endpoint not found") return
              (vst ^. localEndPointAt ourEndPointId)
    theirEndPoint <- withMVar (localState ourEndPoint) $ \case
      LocalEndPointClosed ->
        lookupFailed "Local endpoint closed"
      LocalEndPointValid vst ->
        maybe (lookupFailed "Remote endpoint not found") return
              (vst ^. localConnectionTo theirAddress)
    withMVar (remoteState theirEndPoint) $ \case
      RemoteEndPointInit _ _ _ ->
        lookupFailed "Remote endpoint not yet initialized"
      RemoteEndPointValid vst ->
        return (remoteSocket vst)
      RemoteEndPointClosing _ vst ->
        return (remoteSocket vst)
      RemoteEndPointClosed ->
        lookupFailed "Remote endpoint closed"
      RemoteEndPointInvalid err ->
        throwIO err
      RemoteEndPointFailed err ->
        throwIO err
  where
    -- Abort the lookup with a user-level 'IOError' carrying the message.
    lookupFailed :: String -> IO a
    lookupFailed = throwIO . userError
-- | The first lightweight connection id that is not reserved.
--
-- NOTE(review): going by the name, ids below 1024 are presumably set aside
-- for protocol-internal use; the users of that range are not visible here.
firstNonReservedLightweightConnectionId :: LightweightConnectionId
firstNonReservedLightweightConnectionId = 1024
-- | The heavyweight connection id with value 0.
--
-- NOTE(review): the name suggests this id denotes a connection from an
-- endpoint to itself; confirm against its call sites.
heavyweightSelfConnectionId :: HeavyweightConnectionId
heavyweightSelfConnectionId = 0
-- | The first heavyweight connection id that is not reserved (the value
-- directly after 'heavyweightSelfConnectionId').
firstNonReservedHeavyweightConnectionId :: HeavyweightConnectionId
firstNonReservedHeavyweightConnectionId = 1
-- | Accessor for the '_localEndPoints' field of 'ValidTransportState': the
-- map from endpoint id to local endpoint.
localEndPoints :: Accessor ValidTransportState (Map EndPointId LocalEndPoint)
localEndPoints = accessor _localEndPoints setEndPoints
  where
    setEndPoints eps vst = vst { _localEndPoints = eps }
-- | Accessor for the '_nextEndPointId' field of 'ValidTransportState'.
nextEndPointId :: Accessor ValidTransportState EndPointId
nextEndPointId = accessor _nextEndPointId setNextId
  where
    setNextId newId vst = vst { _nextEndPointId = newId }
-- | Accessor for the '_localNextConnOutId' field of
-- 'ValidLocalEndPointState'.
localNextConnOutId :: Accessor ValidLocalEndPointState LightweightConnectionId
localNextConnOutId = accessor _localNextConnOutId setNextOut
  where
    setNextOut newId vst = vst { _localNextConnOutId = newId }
-- | Accessor for the '_localConnections' field of
-- 'ValidLocalEndPointState': the map from remote endpoint address to
-- remote endpoint.
localConnections :: Accessor ValidLocalEndPointState (Map EndPointAddress RemoteEndPoint)
localConnections = accessor _localConnections setConnections
  where
    setConnections conns vst = vst { _localConnections = conns }
-- | Accessor for the '_nextConnInId' field of 'ValidLocalEndPointState'.
nextConnInId :: Accessor ValidLocalEndPointState HeavyweightConnectionId
nextConnInId = accessor _nextConnInId setNextIn
  where
    setNextIn newId vst = vst { _nextConnInId = newId }
-- | Accessor for the '_remoteOutgoing' field of 'ValidRemoteEndPointState'
-- (an 'Int' counter).
remoteOutgoing :: Accessor ValidRemoteEndPointState Int
remoteOutgoing = accessor _remoteOutgoing setOutgoing
  where
    setOutgoing count vst = vst { _remoteOutgoing = count }
-- | Accessor for the '_remoteIncoming' field of 'ValidRemoteEndPointState':
-- a set of lightweight connection ids.
remoteIncoming :: Accessor ValidRemoteEndPointState (Set LightweightConnectionId)
remoteIncoming = accessor _remoteIncoming setIncoming
  where
    setIncoming lcids vst = vst { _remoteIncoming = lcids }
-- | Accessor for the '_remoteLastIncoming' field of
-- 'ValidRemoteEndPointState'.
remoteLastIncoming :: Accessor ValidRemoteEndPointState LightweightConnectionId
remoteLastIncoming = accessor _remoteLastIncoming setLastIncoming
  where
    setLastIncoming newId vst = vst { _remoteLastIncoming = newId }
-- | Accessor for the '_remoteNextConnOutId' field of
-- 'ValidRemoteEndPointState'.
remoteNextConnOutId :: Accessor ValidRemoteEndPointState LightweightConnectionId
remoteNextConnOutId = accessor _remoteNextConnOutId setNextOut
  where
    setNextOut newId vst = vst { _remoteNextConnOutId = newId }
-- | Accessor for the local endpoint stored under the given endpoint id.
--
-- Composes 'localEndPoints' with a map-entry accessor, so the focused value
-- is a 'Maybe': 'Nothing' when the id has no entry in the map.
localEndPointAt :: EndPointId -> Accessor ValidTransportState (Maybe LocalEndPoint)
localEndPointAt addr = localEndPoints >>> DAC.mapMaybe addr
-- | Accessor for the remote endpoint stored under the given address.
--
-- Composes 'localConnections' with a map-entry accessor, so the focused
-- value is a 'Maybe': 'Nothing' when the address has no entry in the map.
localConnectionTo :: EndPointAddress -> Accessor ValidLocalEndPointState (Maybe RemoteEndPoint)
localConnectionTo addr = localConnections >>> DAC.mapMaybe addr
-- | Report a violated assumption for the given endpoint pair and abort.
--
-- The message, suffixed with @" RELY violation"@, is first logged through
-- 'elog' and then raised with 'fail', so this action never returns
-- normally.
relyViolation :: EndPointPair -> String -> IO a
relyViolation (ourEndPoint, theirEndPoint) str = do
    elog (ourEndPoint, theirEndPoint) report
    fail report
  where
    -- Built once so the logged text and the raised error cannot drift apart.
    report = str ++ " RELY violation"
-- | Print a log line for the given endpoint pair to standard output.
--
-- The line has the shape
-- @\<local address\>/\<remote address\>(\<remote id\>)/\<thread id\>: \<msg\>@,
-- where each component other than the message is rendered with 'show' and
-- the thread id is that of the calling thread.
elog :: EndPointPair -> String -> IO ()
elog (ourEndPoint, theirEndPoint) msg = do
    tid <- myThreadId
    putStrLn $ ours ++ "/" ++ theirs ++ "/" ++ show tid ++ ": " ++ msg
  where
    ours   = show (localAddress ourEndPoint)
    theirs = show (remoteAddress theirEndPoint)
          ++ "(" ++ show (remoteId theirEndPoint) ++ ")"