{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE CPP #-}
#if ! MIN_VERSION_base(4,7,0)
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}
#endif
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
-- Cut noise for post-AMP imports
{-# OPTIONS_GHC -fno-warn-unused-imports #-}

{-|
  A Network Transport Layer for `distributed-process`
  based on AMQP and single-owner queues
-}

module Network.Transport.AMQP (
    createTransport
  , AMQPParameters(..)
  ) where

import Network.Transport.AMQP.Internal.Types

import qualified Network.AMQP as AMQP
import qualified Network.AMQP.Types as AMQP
import qualified Data.Text as T
import qualified Data.Set as Set
import Data.UUID.V4
import Data.UUID (toString, toWords)
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Data.Bits
import Data.IORef
import Data.Monoid
import qualified Data.Map.Strict as Map
import Data.ByteString (ByteString)
import Data.Foldable hiding (mapM_)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString as B
import Data.String.Conv
import Data.Serialize
import Network.Transport
import Network.Transport.Internal (mapIOException, asyncWhenCancelled)
import Control.Applicative
import Control.Concurrent (yield, threadDelay)
import Control.Concurrent.Async as Async
import Control.Concurrent.MVar
import Control.Monad
import Lens.Family2
import System.IO (hPutStrLn , stderr)
import Text.Printf
import Control.Monad.Catch hiding (catches, Handler(..))
import Control.Exception (mapException, catches, throwIO, Handler(..), evaluate)
#if ! MIN_VERSION_base(4,7,0)
import GHC.MVar
import GHC.Base
import GHC.PrimopWrappers
#endif

--------------------------------------------------------------------------------
-- Utility functions
--------------------------------------------------------------------------------

-- | Serialise a protocol message to a lazy 'BL.ByteString'.
encode' :: AMQPMessage -> BL.ByteString
encode' = encodeLazy

--------------------------------------------------------------------------------
-- | Decode the body of an AMQP message into a protocol message; a 'Left'
-- carries the decoder's error text.
decode' :: AMQP.Message -> Either String AMQPMessage
decode' = decodeLazy . AMQP.msgBody

--------------------------------------------------------------------------------
-- | Run the action on the local end point's valid state while holding its
-- state 'MVar'; does nothing if the end point is not valid.
onValidEndPoint :: LocalEndPoint -> (ValidLocalEndPointState -> IO ()) -> IO ()
onValidEndPoint lep f = withMVar (localState lep) $ \case
  LocalEndPointValid v -> f v
  _ -> return ()

--------------------------------------------------------------------------------
-- | Run the action on the remote end point's valid state while holding its
-- state 'MVar'; does nothing if the remote end point is not valid.
onValidRemote :: RemoteEndPoint -> (ValidRemoteEndPointState -> IO ()) -> IO ()
onValidRemote rep f = withMVar (remoteState rep) $ \case
  RemoteEndPointValid v -> f v
  _ -> return ()

--------------------------------------------------------------------------------
-- | Create a new 'EndPoint' and register it in the transport state under its
-- address. Fails with 'NewEndPointFailed' when the transport is closed.
-- Multicast operations always report "unsupported".
apiNewEndPoint :: AMQPInternalState
               -> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
apiNewEndPoint is@AMQPInternalState{..} =
  try $ mapAMQPException (TransportError NewEndPointFailed . show) $ do
    let AMQPParameters{..} = istate_params
    modifyMVar istate_tstate $ \tst -> case tst of
      TransportClosed ->
        throwM $ TransportError NewEndPointFailed "Transport is closed."
      TransportValid vst@(ValidTransportState _ oldMap) -> do
        -- If a valid transport was found, create a new LocalEndPoint
        (ep@LocalEndPoint{..}, chan) <- endPointCreate (Map.size oldMap) is
        let newEp = EndPoint
              { receive = atomically $ do
                  mx <- readTMChan chan
                  case mx of
                    Nothing -> error "receive: channel is closed"
                    Just x  -> return x
              , address = localAddress
              , connect = apiConnect is ep
              , closeEndPoint = apiCloseEndPoint is ep
              , newMulticastGroup     = return . Left $ newMulticastGroupError
              , resolveMulticastGroup = return . Left . const resolveMulticastGroupError
              }
        return ( TransportValid $ over tstateEndPoints (Map.insert localAddress ep) vst
               , newEp)
  where
    newMulticastGroupError =
      TransportError NewMulticastGroupUnsupported "Multicast not supported"
    resolveMulticastGroupError =
      TransportError ResolveMulticastGroupUnsupported "Multicast not supported"

-------------------------------------------------------------------------------
-- | Declare the queue and exchange backing a new local end point, start its
-- receiver and return the end point together with its event channel.
-- The 'Int' is appended to the queue name.
endPointCreate :: Int -> AMQPInternalState -> IO (LocalEndPoint, TMChan Event)
endPointCreate newId is@AMQPInternalState{..} = do
  -- Each new `EndPoint` has a new Rabbit queue. Think of it as a
  -- heavyweight connection. Subsequent connections are multiplexed
  -- using Rabbit's exchanges.
  uuid <- toS . toString <$> nextRandom
  let qName = maybe uuid toS (transportEndpoint istate_params)
  newChannel <- AMQP.openChannel (transportConnection istate_params)

  -- https://www.rabbitmq.com/ttl.html
  -- Using a TTL for a heavyweight connection we ensure we will do
  -- the appropriate cleanup even if we do not shutdown correctly an
  -- endpoint or the transport layer.
  (ourEndPoint,_,_) <- AMQP.declareQueue newChannel $ AMQP.newQueue
    { AMQP.queueName = qName <> ":" <> T.pack (show newId)
    , AMQP.queuePassive = False
    , AMQP.queueDurable = False
    , AMQP.queueExclusive = False
    , AMQP.queueHeaders = queueHeaders
    }

  newExchange <- toExchangeName (toAddress ourEndPoint)

  AMQP.declareExchange newChannel $ AMQP.newExchange
    { AMQP.exchangeName = newExchange
    , AMQP.exchangeType = "direct"
    , AMQP.exchangePassive = False
    , AMQP.exchangeDurable = False
    , AMQP.exchangeAutoDelete = True
    }

  AMQP.bindQueue newChannel ourEndPoint newExchange mempty

  chOut <- newTMChanIO
  lep <- LocalEndPoint <$> pure (toAddress ourEndPoint)
                       <*> pure (AMQPExchange newExchange)
                       <*> newEmptyMVar
                       <*> newEmptyMVar
  opened <- newIORef True
  mask $ \restore -> do
    restore (startReceiver is lep chOut newChannel)
    AMQP.addChannelExceptionHandler newChannel $ \_ -> do
      finaliseEndPoint lep False
    -- The state MVar starts empty; handlers installed above block on it
    -- until this put publishes the valid state.
    putMVar (localState lep) $ LocalEndPointValid
      (ValidLocalEndPointState chOut newChannel opened newCounter Map.empty)
    return (lep, chOut)
  where
    queueHeaders = AMQP.FieldTable (Map.fromList [("x-expires", AMQP.FVInt32 3000)])

--------------------------------------------------------------------------------
-- | Close every remote end point known to the local end point and mark the
-- local end point closed. When the 'Bool' is 'True' the @localDone@ 'MVar'
-- is filled so that a waiting 'apiCloseEndPoint' can proceed.
finaliseEndPoint :: LocalEndPoint -> Bool -> IO ()
finaliseEndPoint ourEp requestedByUser = do
  join $ withMVar (localState ourEp) $ \case
    LocalEndPointClosed  -> afterP ()
    LocalEndPointValid v@ValidLocalEndPointState{..} -> do
      writeIORef _localOpened False
      -- The closing itself runs after the state MVar has been released.
      return $ do
        void $ Async.mapConcurrently (remoteEndPointClose False ourEp)
             $ v ^. localRemotes
  -- NOTE(review): the collapsed source does not show whether the two
  -- statements below belong to the deferred action above or to this outer
  -- block; they are placed here so a waiter is always released -- confirm.
  when requestedByUser $ void $ tryPutMVar (localDone ourEp) ()
  void $ swapMVar (localState ourEp) LocalEndPointClosed

--------------------------------------------------------------------------------
-- | Build an exchange name from an end point address and a fresh UUID.
toExchangeName :: EndPointAddress -> IO T.Text
toExchangeName eA = do
  uuid <- toString <$> nextRandom
  return $ fromAddress eA <> ":" <> T.pack uuid

--------------------------------------------------------------------------------
-- | Register a consumer on the end point's queue and dispatch every decoded
-- protocol message. Messages that fail to decode are dropped.
startReceiver :: AMQPInternalState
              -> LocalEndPoint
              -> TMChan Event
              -> AMQP.Channel
              -> IO ()
startReceiver tr@AMQPInternalState{..} lep@LocalEndPoint{..} _localChan ch = do
  void $ AMQP.consumeMsgs ch (fromAddress localAddress) AMQP.NoAck $ \(msg,_) -> do
    case decode' msg of
      Left _ -> return ()
      Right PoisonPill -> do
          finaliseEndPoint lep True
      Right (MessageData cId rawMsg) -> do
          atomically $ writeTMChan _localChan $ Received cId rawMsg
      Right (MessageConnect theirAddress) -> do
          void $ createOrGetRemoteEndPoint tr lep theirAddress
      --
      -- MessageInitConnection
      --
      Right (MessageInitConnection theirAddress theirId rel) -> do
          join $ modifyMVar localState $ \case
            LocalEndPointValid vst@ValidLocalEndPointState{..} -> do
                case Map.lookup theirAddress _localRemotes of
                  Nothing -> return
                    ( LocalEndPointValid vst
                    , throwM $ InvariantViolated (RemoteEndPointLookupFailed theirAddress))
                  Just rep -> modifyMVar (remoteState rep) $ \case
                    RemoteEndPointFailed ->
                      throwM $ InvariantViolated (RemoteEndPointMustBeValid theirAddress)
                    RemoteEndPointClosed ->
                      throwM $ InvariantViolated (RemoteEndPointMustBeValid theirAddress)
                    RemoteEndPointClosing{} ->
                      throwM $ InvariantViolated (RemoteEndPointMustBeValid theirAddress)
                    w@RemoteEndPointValid{} -> do
                      conn <- AMQPConnection <$> pure lep
                                             <*> pure rep
                                             <*> pure rel
                                             <*> newMVar (AMQPConnectionValid $ ValidAMQPConnection Nothing Nothing i)
                                             <*> newEmptyMVar
                      w' <- registerRemoteEndPoint (succ i) w
                      return ( w'
                             , ( LocalEndPointValid (set localConnections (Counter (succ i) (Map.insert (succ i) conn m)) vst)
                               , return ())
                             )
                    z@(RemoteEndPointPending w) -> do
                      conn <- AMQPConnection <$> pure lep
                                             <*> pure rep
                                             <*> pure rel
                                             <*> newMVar (AMQPConnectionValid $ ValidAMQPConnection Nothing Nothing i)
                                             <*> newEmptyMVar
                      -- Queue the registration; the pending list is replayed
                      -- by 'createOrGetRemoteEndPoint' once the remote is valid.
                      modifyIORef w (\xs -> (registerRemoteEndPoint (succ i)) : xs)
                      return ( z
                             , ( LocalEndPointValid (set localConnections (Counter (succ i) (Map.insert (succ i) conn m)) vst)
                               , return ())
                             )
              where
                (Counter i m) = vst ^. localConnections
            _ -> throwM $ InvariantViolated (LocalEndPointMustBeValid localAddress)
        where
          registerRemoteEndPoint :: ConnectionId -> RemoteEndPointState -> IO RemoteEndPointState
          registerRemoteEndPoint i RemoteEndPointFailed = do
            atomically $ do
              writeTMChan _localChan (ConnectionOpened i rel theirAddress)
              writeTMChan _localChan (ConnectionClosed i)
            return RemoteEndPointFailed
          registerRemoteEndPoint i RemoteEndPointClosed = do
            atomically $ do
              writeTMChan _localChan (ConnectionOpened i rel theirAddress)
              writeTMChan _localChan (ConnectionClosed i)
            return RemoteEndPointClosed
          registerRemoteEndPoint i (RemoteEndPointClosing x) = do
            atomically $ do
              writeTMChan _localChan (ConnectionOpened i rel theirAddress)
              writeTMChan _localChan (ConnectionClosed i)
            return $ RemoteEndPointClosing x
          registerRemoteEndPoint _ RemoteEndPointPending{} =
            throwM $ InvariantViolated $ RemoteEndPointCannotBePending (theirAddress)
          registerRemoteEndPoint i (RemoteEndPointValid vst@(ValidRemoteEndPointState exg chl _ s _)) = do
            publish chl exg (MessageInitConnectionOk localAddress theirId i)
            atomically $ writeTMChan _localChan (ConnectionOpened i rel theirAddress)
            return $ RemoteEndPointValid vst { _remoteIncomingConnections = Set.insert i s }
      --
      -- MessageCloseConnection
      --
      Right (MessageCloseConnection idx) -> do
          join $ modifyMVar localState $ \case
            LocalEndPointValid v ->
              case v ^. localConnectionAt idx of
                Nothing -> return (LocalEndPointValid v, return ())
                Just conn -> do
                  old <- swapMVar (_connectionState conn) AMQPConnectionFailed
                  let setter = over (localConnections . cntValue) (Map.delete idx)
                  return ( LocalEndPointValid $ setter v
                         , case old of
                             AMQPConnectionFailed -> return ()
                             -- throwM InvariantViolation
                             AMQPConnectionInit -> return ()
                             AMQPConnectionClosed -> return ()
                             AMQPConnectionValid (ValidAMQPConnection{}) -> do
                               atomically $ writeTMChan _localChan (ConnectionClosed idx)
                               connectionCleanup (_connectionRemoteEndPoint conn) idx)
            LocalEndPointClosed -> return (LocalEndPointClosed, return ())
      --
      -- MessageInitConnectionOk
      --
      Right (MessageInitConnectionOk theirAddress ourId theirId) -> do
          join $ withMVar localState $ \case
            LocalEndPointValid vst ->
              case vst ^. localRemoteAt theirAddress of
                -- TODO: send message to the host
                Nothing -> return (return ())
                Just rep -> modifyMVar (remoteState rep) $ \case
                  RemoteEndPointFailed -> return (RemoteEndPointFailed, return ())
                  RemoteEndPointClosed ->
                    throwM $ InvariantViolated $ RemoteEndPointShouldBeValidOrClosed theirAddress
                  RemoteEndPointClosing{} ->
                    throwM $ InvariantViolated $ RemoteEndPointShouldBeValidOrClosed theirAddress
                  t@(RemoteEndPointValid (ValidRemoteEndPointState exg chl (Counter x m) s z)) -> return $
                    case ourId `Map.lookup` m of
                      -- TODO: send message to the hostv
                      Nothing -> (t, return ())
                      Just c ->
                        ( RemoteEndPointValid (ValidRemoteEndPointState exg chl (Counter x (ourId `Map.delete` m)) s (z+1))
                        , do
                            modifyMVar_ (_connectionState c) $ \case
                              AMQPConnectionFailed -> return AMQPConnectionFailed
                              AMQPConnectionInit -> return $ AMQPConnectionValid (ValidAMQPConnection (Just exg) (Just chl) theirId)
                              AMQPConnectionClosed -> do
                                publish chl exg (MessageCloseConnection theirId)
                                return AMQPConnectionClosed
                              AMQPConnectionValid _ -> throwM $ (IncorrectState "RemoteEndPoint should be closed")
                            void $ tryPutMVar (_connectionReady c) ()
                        )
                  RemoteEndPointPending p -> return
                    ( RemoteEndPointPending p
                    , throwM $ (IncorrectState "RemoteEndPoint should be closed"))
            LocalEndPointClosed -> return $ return ()
      --
      -- MessageEndPointClose
      --
      Right (MessageEndPointClose theirAddress True) -> do
          getRemoteEndPoint lep theirAddress >>=
            traverse_ (\rep -> do
              onValidRemote rep $ \ValidRemoteEndPointState{..} ->
                publish _remoteChannel _remoteExchange (MessageEndPointCloseOk localAddress)
              remoteEndPointClose True lep rep)
      Right (MessageEndPointClose theirAddress False) -> do
          getRemoteEndPoint lep theirAddress >>=
            traverse_ (\rep -> do
              mst <- cleanupRemoteEndPoint lep rep Nothing
              case mst of
                Nothing -> return ()
                Just st -> onValidEndPoint lep $ \vst -> do
                  atomically $ writeTMChan (vst ^. localChan) $ ErrorEvent $ TransportError
                    (EventConnectionLost theirAddress) "MessageEndPointClose: Exception on remote side"
                  closeRemoteEndPoint lep rep st)
      --
      -- MessageEndPointCloseOk
      --
      Right (MessageEndPointCloseOk theirAddress) -> do
          getRemoteEndPoint lep theirAddress >>=
            traverse_ (\rep -> do
              state <- swapMVar (remoteState rep) RemoteEndPointClosed
              closeRemoteEndPoint lep rep state)

--------------------------------------------------------------------------------
-- | Close, all network connections.
closeRemoteEndPoint :: LocalEndPoint
                    -> RemoteEndPoint
                    -> RemoteEndPointState
                    -> IO ()
closeRemoteEndPoint lep rep state = do
  step1
  step2 state
  where
    step1 = uninterruptibleMask_ $ do
      -- NOTE: This step1 function is different
      -- from Tweag's one, which used "modifyMVar_", but
      -- on this implementation it seems to always deadlock
      -- on the "SendException" test.
      --
      -- NOTE(review): 'tryPutMVar' only succeeds on an empty MVar, so when
      -- 'tryReadMVar' has just seen a value the updated state is normally
      -- discarded and the remote stays in 'localRemotes' -- confirm intent.
      mbSt <- tryReadMVar (localState lep)
      case mbSt of
        Nothing -> return ()
        Just vst -> do
          newState <- case vst of
            LocalEndPointValid v -> return $
              LocalEndPointValid (over localRemotes (Map.delete (remoteAddress rep)) v)
            c -> return c
          void $ tryPutMVar (localState lep) newState
    step2 (RemoteEndPointClosing (ClosingRemoteEndPoint (AMQPExchange _) _ rd)) = do
      _ <- readMVar rd
      return ()
      -- No AMQP cleanup needed (cfr. TTL)
    step2 _ = return ()

--------------------------------------------------------------------------------
-- | Remove a connection id from the remote end point's set of incoming
-- connections; a no-op unless the remote end point is valid.
connectionCleanup :: RemoteEndPoint -> ConnectionId -> IO ()
connectionCleanup rep cid = modifyMVar_ (remoteState rep) $ \case
  RemoteEndPointValid v -> return $
    RemoteEndPointValid $ over remoteIncomingConnections (Set.delete cid) v
  c -> return c

--------------------------------------------------------------------------------
-- | Asynchronous operation, shutdown of the remote end point may take a while
--
-- For a valid end point this emits 'EndPointClosed', closes the event
-- channel, sends a 'PoisonPill' to the end point's own exchange and waits for
-- the receiver to signal completion. AMQP errors are logged with 'errorLog'.
apiCloseEndPoint :: AMQPInternalState
                 -> LocalEndPoint
                 -> IO ()
apiCloseEndPoint AMQPInternalState{..} LocalEndPoint{..} =
  mask_ $ either errorLog return <=< tryAMQP $ do
    -- we don't close endpoint here because other threads,
    -- should be able to access local endpoint state
    old <- readMVar localState
    case old of
      LocalEndPointValid ValidLocalEndPointState{..} -> do
        -- close channel, no events will be received
        atomically $ do
          writeTMChan _localChan EndPointClosed
          closeTMChan _localChan
        publish _localChannel localExchange PoisonPill
        -- Only wait when a PoisonPill was actually sent: for an end point
        -- that is already closed nobody fills 'localDone' again, so an
        -- unconditional wait would block forever on a repeated close.
        takeMVar localDone
      LocalEndPointClosed -> return ()

    void $ swapMVar localState LocalEndPointClosed
    modifyMVar_ istate_tstate $ \case
      TransportClosed  -> return TransportClosed
      TransportValid v -> return $
        TransportValid (over tstateEndPoints (Map.delete localAddress) v)

--------------------------------------------------------------------------------
-- | Wrap a textual queue name as an 'EndPointAddress'.
toAddress :: T.Text -> EndPointAddress
toAddress = EndPointAddress . toS

--------------------------------------------------------------------------------
-- | Recover the textual form of an 'EndPointAddress'.
fromAddress :: EndPointAddress -> T.Text
fromAddress = toS . endPointAddressToByteString

--------------------------------------------------------------------------------
-- | Open a lightweight connection to a remote end point: send (or queue) the
-- handshake and poll the connection state until it leaves the init state.
apiConnect :: AMQPInternalState
           -> LocalEndPoint
           -> EndPointAddress  -- ^ Remote address
           -> Reliability      -- ^ Reliability (ignored)
           -> ConnectHints     -- ^ Hints
           -> IO (Either (TransportError ConnectErrorCode) Connection)
apiConnect is@AMQPInternalState{..} lep@LocalEndPoint{..} theirAddress reliability _ =
  fmap (either Left id) $ try $ do
    mapAMQPException (TransportError ConnectFailed . show) $ do
      eRep <- createOrGetRemoteEndPoint is lep theirAddress
      case eRep of
        Left _ -> return $ Left $ TransportError ConnectFailed "LocalEndPoint is closed."
        Right rep -> do
          conn <- AMQPConnection <$> pure lep
                                 <*> pure rep
                                 <*> pure reliability
                                 <*> newMVar AMQPConnectionInit
                                 <*> newEmptyMVar
          let apiConn = Connection
                { send = apiSend conn
                , close = apiClose conn
                }
          join $ modifyMVar (remoteState rep) $ \w -> case w of
            RemoteEndPointClosed -> do
              return ( RemoteEndPointClosed
                     , return $ Left $ TransportError ConnectFailed "Transport is closed.")
            RemoteEndPointClosing x -> do
              return ( RemoteEndPointClosing x
                     , return $ Left $ TransportError ConnectFailed "RemoteEndPoint closed.")
            RemoteEndPointValid _ -> do
              newState <- handshake conn w
              return (newState, waitReady conn apiConn)
            RemoteEndPointPending z -> do
              modifyIORef z (\zs -> handshake conn : zs)
              return ( RemoteEndPointPending z, waitReady conn apiConn)
            RemoteEndPointFailed ->
              return ( RemoteEndPointFailed
                     , return $ Left $ TransportError ConnectFailed "RemoteEndPoint failed.")
  where
    -- Polls (yielding between attempts) until the connection leaves the
    -- init state.
    waitReady conn apiConn = join $ withMVar (_connectionState conn) $ \case
      AMQPConnectionInit{}   -> return $ yield >> waitReady conn apiConn
      AMQPConnectionValid{}  -> afterP $ Right apiConn
      AMQPConnectionFailed{} -> afterP $ Left $ TransportError ConnectFailed "Connection failed."
      AMQPConnectionClosed{} -> afterP $ Left $ TransportError ConnectFailed "Connection closed"

    handshake _ RemoteEndPointClosed      = return RemoteEndPointClosed
    handshake _ RemoteEndPointPending{}   =
      throwM $ TransportError ConnectFailed "Connection pending."
    handshake _ (RemoteEndPointClosing x) = return $ RemoteEndPointClosing x
    handshake _ RemoteEndPointFailed      = return RemoteEndPointFailed
    handshake conn (RemoteEndPointValid (ValidRemoteEndPointState exg ch (Counter i m) s z)) = do
        publish ch exg (MessageInitConnection localAddress i' reliability)
        return $ RemoteEndPointValid (ValidRemoteEndPointState exg ch (Counter i' (Map.insert i' conn m)) s z)
      where
        i' = succ i

--------------------------------------------------------------------------------
-- | Look up a remote end point by address in the local end point's table;
-- 'Nothing' when it is unknown or the local end point is closed.
getRemoteEndPoint :: LocalEndPoint -> EndPointAddress -> IO (Maybe RemoteEndPoint)
getRemoteEndPoint ourEp theirAddr = do
  withMVar (localState ourEp) $ \case
    LocalEndPointValid v -> return $ theirAddr `Map.lookup` _localRemotes v
    LocalEndPointClosed  -> return Nothing

--------------------------------------------------------------------------------
-- | Return the remote end point registered for the address, creating (and
-- initialising) a new one when none exists or the existing one has failed.
-- Returns 'Left' when the local end point is closing or closed.
createOrGetRemoteEndPoint :: AMQPInternalState
                          -> LocalEndPoint
                          -> EndPointAddress
                          -> IO (Either AMQPError RemoteEndPoint)
createOrGetRemoteEndPoint AMQPInternalState{..} ourEp theirAddr = do
  join $ modifyMVar (localState ourEp) $ \case
    LocalEndPointValid v@ValidLocalEndPointState{..} -> do
      opened <- readIORef _localOpened
      if opened
        then do
          case v ^. localRemoteAt theirAddr of
            Nothing -> create v
            Just rep -> do
              withMVar (remoteState rep) $ \case
                RemoteEndPointFailed -> do
                  create v
                _ -> return (LocalEndPointValid v, return $ Right rep)
        else return (LocalEndPointValid v, return $ Left $ IncorrectState "EndPointClosing")
    LocalEndPointClosed ->
      return ( LocalEndPointClosed
             , return $ Left $ IncorrectState "EndPoint is closed"
             )
  where
    create v = do
      newChannel <- AMQP.openChannel (transportConnection istate_params)
      newExchange <- toExchangeName theirAddr
      AMQP.declareExchange newChannel $ AMQP.newExchange
        { AMQP.exchangeName = newExchange
        , AMQP.exchangeType = "direct"
        , AMQP.exchangePassive = False
        , AMQP.exchangeDurable = False
        , AMQP.exchangeAutoDelete = True
        }

      AMQP.bindQueue newChannel (fromAddress theirAddr) newExchange mempty

      state <- newMVar . RemoteEndPointPending =<< newIORef []
      opened <- newIORef False
      let rep = RemoteEndPoint theirAddr state opened
      -- Initialisation is returned as the deferred action so it runs after
      -- the local state MVar has been released.
      return ( LocalEndPointValid $ over localRemotes (Map.insert theirAddr rep) v
             , initialize newChannel (AMQPExchange newExchange) rep >> return (Right rep))

    ourAddr = localAddress ourEp

    initialize ch exg rep = do
      publish ch exg $ MessageConnect ourAddr
      let v = ValidRemoteEndPointState exg ch newCounter Set.empty 0
      modifyMVar_ (remoteState rep) $ \case
        RemoteEndPointPending p -> do
          -- Replay the actions queued while pending, oldest first.
          z <- foldM (\y f -> f y) (RemoteEndPointValid v) . Prelude.reverse =<< readIORef p
          modifyIORef (remoteOpened rep) (const True)
          return z
        RemoteEndPointValid _ ->
          throwM $ InvariantViolated (RemoteEndPointMustBeValid theirAddr)
        RemoteEndPointClosed -> return RemoteEndPointClosed
        RemoteEndPointClosing j -> return (RemoteEndPointClosing j)
        RemoteEndPointFailed -> return RemoteEndPointFailed

--------------------------------------------------------------------------------
-- | Publish a protocol message on the exchange with an empty routing key
-- and non-persistent delivery mode.
publish :: AMQP.Channel
        -> AMQPExchange
        -> AMQPMessage
        -> IO ()
publish chn (AMQPExchange e) msg = do
  -- TODO: Do we need a routing key?
  AMQP.publishMsg chn e mempty
    (AMQP.newMsg { AMQP.msgBody = encode' msg
                 , AMQP.msgDeliveryMode = Just AMQP.NonPersistent
                 })

--------------------------------------------------------------------------------
-- TODO: Deal with exceptions and error at the broker level.
-- | Send a payload over a connection. The chunks are forced first, so an
-- exception hidden in the payload is reported as 'SendFailed' after the
-- remote end point has been cleaned up.
apiSend :: AMQPConnection
        -> [ByteString]
        -> IO (Either (TransportError SendErrorCode) ())
apiSend (AMQPConnection us them _ st _) msg = do
  msgs <- try $ mapM_ evaluate msg
  case msgs of
    Left ex -> do
      cleanup
      return $ Left $ TransportError SendFailed (show (ex::SomeException))
    -- TODO: Check that, in case of AMQP-raised exceptions, we are still
    -- doing the appropriate cleanup.
    Right _ -> (fmap Right send_) `catches`
      [ Handler $ \ex ->
          -- TransportError - return, as all require
          -- actions were performed
          return $ Left (ex :: TransportError SendErrorCode)
      , Handler $ \ex -> do
          -- AMQPError appeared exception
          cleanup
          return $ Left $ TransportError SendFailed (show (ex::AMQPError))
      , Handler $ \ex -> do
          -- AMQPException appeared exception
          cleanup
          return $ Left $ TransportError SendFailed (show (ex::AMQP.AMQPException))
      ]
  where
    send_ :: IO ()
    send_ = join $ withMVar (remoteState them) $ \x -> case x of
      RemoteEndPointPending{} -> return $ yield >> send_
      RemoteEndPointFailed    -> throwIO $ TransportError SendFailed "Remote end point is failed."
      RemoteEndPointClosed    -> throwIO $ TransportError SendFailed "Remote end point is closed."
      RemoteEndPointClosing{} -> throwIO $ TransportError SendFailed "Remote end point is closing."
      RemoteEndPointValid _   -> withMVar st $ \case
        AMQPConnectionInit   -> return $ yield >> send_
        AMQPConnectionClosed -> throwIO $ TransportError SendClosed "Connection is closed"
        AMQPConnectionFailed -> throwIO $ TransportError SendFailed "Connection is failed"
        AMQPConnectionValid (ValidAMQPConnection (Just exg) (Just ch) idx) -> do
          res <- try $ publish ch exg (MessageData idx msg)
          case res of
            Right _ -> afterP ()
            -- The recovery is handed back as the deferred action run by
            -- 'join': 'cleanupRemoteEndPoint' swaps the remote state and
            -- connection state MVars, both of which are held by the
            -- enclosing 'withMVar's, so running it here would block forever.
            Left (_ :: SomeException) -> return $ do
              rep <- cleanupRemoteEndPoint us them Nothing
              traverse_ (\z -> do
                  onValidEndPoint us $ \w -> atomically $ writeTMChan (_localChan w) $ ErrorEvent $ TransportError
                    (EventConnectionLost (remoteAddress them)) "apiSend: Exception on remote side"
                  closeRemoteEndPoint us them z) rep
              throwIO $ TransportError SendFailed "Connection broken."

    -- Fail the remote end point, tell the peer, and surface the loss locally.
    cleanup = do
      void $ cleanupRemoteEndPoint us them
        (Just $ \v -> publish (_remoteChannel v) (_remoteExchange v) $ MessageEndPointClose (localAddress us) False)
      onValidEndPoint us $ \v -> do
        atomically $ writeTMChan (_localChan v) $ ErrorEvent $ TransportError
          (EventConnectionLost (remoteAddress them)) "Exception on send."

--------------------------------------------------------------------------------
-- | Close a lightweight connection. A connection that was valid notifies the
-- peer with 'MessageCloseConnection' (immediately, or queued while the remote
-- end point is still pending); every other state is simply marked closed.
-- AMQP errors are logged with 'errorLog'.
apiClose :: AMQPConnection -> IO ()
apiClose (AMQPConnection _ them _ st _) =
  either errorLog return <=< tryAMQP $ uninterruptibleMask_ $ do
    join $ modifyMVar st $ \case
      AMQPConnectionValid (ValidAMQPConnection _ _ idx) -> do
        -- The notification runs after the connection state MVar is released.
        return (AMQPConnectionClosed, do
          modifyMVar_ (remoteState them) $ \case
            v@RemoteEndPointValid{}     -> notify idx v
            v@(RemoteEndPointPending p) -> modifyIORef p (\xs -> notify idx : xs) >> return v
            v -> return v
          )
      _ -> return (AMQPConnectionClosed, return ())
  where
    notify _ RemoteEndPointFailed        = return RemoteEndPointFailed
    notify _ (RemoteEndPointClosing x)   = return $ RemoteEndPointClosing x
    notify _ RemoteEndPointClosed        = return RemoteEndPointClosed
    notify _ RemoteEndPointPending{}     =
      throwM $ InvariantViolated (RemoteEndPointMustBeValid (remoteAddress them))
    notify idx w@(RemoteEndPointValid (ValidRemoteEndPointState exg ch _ _ _)) = do
      publish ch exg (MessageCloseConnection idx)
      return w

--------------------------------------------------------------------------------
-- | Close all endpoint connections, return previous state in case
-- if it was alive, for further cleanup actions.
--
-- The remote end point is always switched to 'RemoteEndPointFailed' when the
-- local end point is valid; the optional action runs only if the previous
-- remote state was valid.
cleanupRemoteEndPoint :: LocalEndPoint
                      -> RemoteEndPoint
                      -> (Maybe (ValidRemoteEndPointState -> IO ()))
                      -> IO (Maybe RemoteEndPointState)
cleanupRemoteEndPoint lep rep actions = do
  modifyMVar (localState lep) $ \case
    LocalEndPointValid v -> do
      writeIORef (remoteOpened rep) False
      oldState <- swapMVar (remoteState rep) newState
      case oldState of
        RemoteEndPointValid w -> do
          let cn = w ^. (remotePendingConnections . cntValue)
          traverse_ (\c -> void $ swapMVar (_connectionState c) AMQPConnectionFailed) cn
          -- NOTE(review): incoming ids are looked up in the remote's
          -- pending-connection map @cn@, not in the local connection map
          -- they are deleted from -- confirm this is intended.
          cn' <- foldM
                   (\c' idx -> case idx `Map.lookup` cn of
                       Nothing -> return c'
                       Just c  -> do
                         void $ swapMVar (_connectionState c) AMQPConnectionFailed
                         return $ over cntValue (Map.delete idx) c'
                   )
                   (v ^. localConnections)
                   (Set.toList (_remoteIncomingConnections w))
          case actions of
            Nothing -> return ()
            Just f  -> f w
          return ( LocalEndPointValid (set localConnections cn' v)
                 , Just oldState)
        _ -> return (LocalEndPointValid v, Nothing)
    c -> return (c, Nothing)
  where
    newState = RemoteEndPointFailed

--------------------------------------------------------------------------------
-- | Move a remote end point to the closing state and run the shutdown
-- sequence. Unless @silent@, the peer is sent 'MessageEndPointClose' and we
-- wait up to one second for the closing lock to be filled.
remoteEndPointClose :: Bool -> LocalEndPoint -> RemoteEndPoint -> IO ()
remoteEndPointClose silent lep rep = do
  writeIORef (remoteOpened rep) False
  join $ modifyMVar (remoteState rep) $ \o -> case o of
    RemoteEndPointFailed -> return (o, return ())
    RemoteEndPointClosed -> return (o, return ())
    RemoteEndPointClosing (ClosingRemoteEndPoint _ _ l) -> return (o, void $ readMVar l)
    RemoteEndPointPending _ -> do
      let err = (error "Pending actions should not be executed")
      -- TODO: store socket, or delay
      closing err err o
    RemoteEndPointValid v -> closing (_remoteExchange v) (_remoteChannel v) o
  where
    closing exg ch old = do
      lock <- newEmptyMVar
      return (RemoteEndPointClosing (ClosingRemoteEndPoint exg ch lock), go lock old)

    go lock old@(RemoteEndPointValid (ValidRemoteEndPointState c amqpCh _ s i)) = do
      -- close all connections
      void $ cleanupRemoteEndPoint lep rep Nothing
      withMVar (localState lep) $ \case
        LocalEndPointClosed -> return ()
        LocalEndPointValid ValidLocalEndPointState{..} -> do
          -- notify about all connections close (?) do we really want it?
          traverse_ (atomically . writeTMChan _localChan . ConnectionClosed) (Set.toList s)
          -- if we have outgoing connections, then we have connection error
          when (i > 0) $ do
            let msg = "Remote end point closed, but " <> show i <> " outgoing connection(s)"
            atomically $ writeTMChan _localChan $ ErrorEvent $ TransportError (EventConnectionLost (remoteAddress rep)) msg
      unless silent $ do
        publish amqpCh c (MessageEndPointClose (localAddress lep) True)
        yield
        -- Wait for the lock to be filled, but for at most one second.
        void $ Async.race (readMVar lock) (threadDelay 1000000)
        void $ tryPutMVar lock ()
        closeRemoteEndPoint lep rep old
    go _ _ = return ()

--------------------------------------------------------------------------------
-- | Build a 'Transport' on top of the AMQP connection carried by the
-- parameters. The transport starts with no end points.
createTransport :: AMQPParameters -> IO Transport
createTransport params@AMQPParameters{..} = do
  let validTState = ValidTransportState transportConnection Map.empty
  tState <- newMVar (TransportValid validTState)
  let iState = AMQPInternalState params tState
  return Transport
    { newEndPoint = apiNewEndPoint iState
    , closeTransport = apiCloseTransport iState
    }

--------------------------------------------------------------------------------
-- Do not close the externally-passed AMQP connection,
-- or it will compromise users sharing it!
-- | Mark the transport closed and close every end point it still tracked.
apiCloseTransport :: AMQPInternalState -> IO ()
apiCloseTransport is = do
  old <- swapMVar (istate_tstate is) TransportClosed
  case old of
    TransportClosed -> return ()
    TransportValid (ValidTransportState _ mp) -> either errorLog return <=< tryAMQP $ do
      traverse_ (apiCloseEndPoint is) mp

--------------------------------------------------------------------------------
-- | Wrap a value as an already-computed deferred action, for use with the
-- @join $ withMVar ...@ idiom.
afterP :: a -> IO (IO a)
afterP = return . return

--------------------------------------------------------------------------------
-- | Print error to standard error; this function should be used for
-- errors that could not be handled in a normal way.
errorLog :: Show a => a -> IO ()
-- The message goes to stderr, prefixed with the package name.
errorLog s = hPutStrLn stderr (printf "[network-transport-amqp] Unhandled error: %s" $ show s)

--------------------------------------------------------------------------------
-- | Rewrap driver exceptions as 'DriverError'.
--
-- This is 'mapException', so it only affects exceptions raised while the
-- argument is being /evaluated/; exceptions thrown while an 'IO' action is
-- being /run/ are not touched by it.
promoteAMQPException :: a -> a
promoteAMQPException = mapException DriverError

--------------------------------------------------------------------------------
-- | Run an action and return any 'AMQPError' as a 'Left'.
--
-- Driver exceptions thrown while the action runs are caught and rewrapped
-- with 'DriverError' so that they are returned as well; 'promoteAMQPException'
-- alone cannot do that because it only sees exceptions raised during
-- evaluation.
tryAMQP :: (MonadCatch m) => m a -> m (Either AMQPError a)
tryAMQP action =
  try (promoteAMQPException action `catch` (throwM . DriverError))

--------------------------------------------------------------------------------
-- | Map an 'AMQPError' raised while evaluating the argument to another
-- exception type.
--
-- NOTE(review): like 'promoteAMQPException' this is plain 'mapException', so
-- an 'AMQPError' thrown while running an 'IO' action passed here is
-- presumably not mapped -- confirm against the callers' expectations.
mapAMQPException :: (Exception e) => (AMQPError -> e) -> a -> a
mapAMQPException = mapException

#if ! MIN_VERSION_base(4,7,0)
--------------------------------------------------------------------------------
-- | Non-blocking read of an 'MVar' that leaves its contents in place;
-- only compiled for base versions older than 4.7.
tryReadMVar :: MVar a -> IO (Maybe a)
tryReadMVar (MVar m) = IO $ \ s ->
    case tryReadMVar# m s of
        (# s', 0#, _ #) -> (# s', Nothing #)      -- MVar is empty
        (# s', _,  a #) -> (# s', Just a  #)      -- MVar is full
#endif