module Network.Nats.ConnectionManager
( ConnectionManager
, ManagerSettings (..)
, SockAddr
, startConnectionManager
, stopConnectionManager
, defaultSettings
, randomSelect
, roundRobinSelect
) where
import Control.Concurrent (ThreadId, forkIO, killThread, myThreadId)
import Control.Concurrent.STM ( TVar, atomically, newTVarIO
, readTVarIO, writeTVar
)
import Control.Exception (handle, throwIO, throwTo)
import Network.URI (URI)
import Network.Socket (SockAddr)
import System.Random (randomRIO)
import Network.Nats.Connection ( Connection, Downstream
, Upstream, makeConnection
, clientShutdown, waitForShutdown, sockAddr
)
import Network.Nats.Subscriber (SubscriberMap)
import Network.Nats.Types (NatsException (..))
data ConnectionManager = ConnectionManager
{ connection :: TVar (Maybe Connection)
, managerThread :: ThreadId
}
data ManagerContext = ManagerContext
{ upstream :: !Upstream
, downstream :: !Downstream
, subscriberMap :: !SubscriberMap
, uris :: ![URI]
, currUriIdx :: !Int
, currConn :: TVar (Maybe Connection)
, callerThread :: !ThreadId
}
data ManagerSettings = ManagerSettings
{ reconnectionAttempts :: !Int
, maxWaitTimeMS :: !Int
, serverSelect :: ([URI], Int) -> IO (URI, Int)
, connectedTo :: SockAddr -> IO ()
, disconnectedFrom :: SockAddr -> IO ()
}
startConnectionManager :: ManagerSettings
-> Upstream
-> Downstream
-> SubscriberMap
-> [URI]
-> IO ConnectionManager
startConnectionManager settings upstream' downstream'
subscriberMap' uris' = do
conn <- newTVarIO Nothing
caller <- myThreadId
let context = ManagerContext { upstream = upstream'
, downstream = downstream'
, subscriberMap = subscriberMap'
, uris = uris'
, currUriIdx = 1
, currConn = conn
, callerThread = caller
}
ConnectionManager <$> pure conn
<*> forkIO (managerLoop settings context)
stopConnectionManager :: ConnectionManager -> IO ()
stopConnectionManager mgr = do
killThread $ managerThread mgr
mapM_ clientShutdown =<< readTVarIO (connection mgr)
defaultSettings :: ManagerSettings
defaultSettings =
ManagerSettings
{ reconnectionAttempts = 5
, maxWaitTimeMS = 2000
, serverSelect = roundRobinSelect
, connectedTo = const (return ())
, disconnectedFrom = const (return ())
}
randomSelect :: ([URI], Int) -> IO (URI, Int)
randomSelect (xs, _) = do
idx <- randomRIO (0, length xs 1)
return (xs !! idx, idx)
roundRobinSelect :: ([URI], Int) -> IO (URI, Int)
roundRobinSelect (xs, currIdx)
| currIdx == length xs 1 = return (head xs, 0)
| otherwise = return (xs !! (currIdx + 1), currIdx + 1)
managerLoop :: ManagerSettings -> ManagerContext -> IO ()
managerLoop mgr@ManagerSettings {..} ctx@ManagerContext {..} =
exceptionForward callerThread `handle` do
(newIdx, conn) <- tryConnect mgr ctx reconnectionAttempts
atomically $ writeTVar currConn (Just conn)
connectedTo $ sockAddr conn
waitForShutdown conn
atomically $ writeTVar currConn Nothing
disconnectedFrom $ sockAddr conn
managerLoop mgr $ ctx { currUriIdx = newIdx }
tryConnect :: ManagerSettings -> ManagerContext -> Int
-> IO (Int, Connection)
tryConnect _ _ 0 = throwIO ConnectionGiveUpException
tryConnect mgr@ManagerSettings {..} ctx@ManagerContext {..} attempts = do
(uri, uriIdx) <- serverSelect (uris, currUriIdx)
maybe (tryConnect mgr ctx (attempts 1))
(\conn -> return (uriIdx, conn))
=<< makeConnection (toUS maxWaitTimeMS) uri
upstream downstream subscriberMap
exceptionForward :: ThreadId -> NatsException -> IO ()
exceptionForward = throwTo
toUS :: Int -> Int
toUS n = n * 1000