module Network.Nats.Connection
( Connection (sockAddr)
, Upstream
, Downstream
, makeConnection
, clientShutdown
, waitForShutdown
) where
import Control.Concurrent.Async (Async, async, waitAnyCatchCancel)
import Control.Exception (SomeException, fromException, throwIO, handle)
import Control.Monad (void, when)
import Data.Conduit (($$), (=$=))
import Data.Conduit.Attoparsec (sinkParser)
import Data.Conduit.List (sourceList)
import Data.List
import Data.Maybe (fromJust, isNothing)
import Network.Socket ( AddrInfo (..), HostName, PortNumber
, SockAddr, defaultHints, getAddrInfo
)
import Network.URI (URI, uriAuthority, uriRegName, uriPort, uriUserInfo)
import System.Timeout (timeout)
import Network.Nats.Types (NatsException (..))
import Network.Nats.Conduit ( Upstream, Downstream, connectionSource
, connectionSink, streamSource
, streamSink, messageChunker
)
import Network.Nats.Subscriber (SubscriberMap, subscribeMessages)
import Network.Nats.Message.Message (Message (..))
import Network.Nats.Message.Parser (parseMessage)
import Network.Nats.Message.Writer (writeMessage)
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Network.Connection as NC
type Tmo = Int
data Connection = Connection
{ connection :: !NC.Connection
, sockAddr :: !SockAddr
, fromNet :: !(Async ())
, toNet :: !(Async ())
}
makeConnection :: Tmo -> URI -> Upstream -> Downstream -> SubscriberMap
-> IO (Maybe Connection)
makeConnection tmo uri fromApp toApp subscriberMap =
connectionError `handle` (Just <$> makeConnection' tmo uri fromApp
toApp subscriberMap)
where
connectionError :: SomeException -> IO (Maybe Connection)
connectionError e
| isConnectionRefused e = return Nothing
| isResolvError e = return Nothing
| otherwise =
case fromException e of
(Just HandshakeException) -> return Nothing
_ -> throwIO e
makeConnection' :: Tmo -> URI -> Upstream -> Downstream -> SubscriberMap
-> IO Connection
makeConnection' tmo uri fromApp toApp subscriberMap = do
let host = hostFromUri uri
port = portFromUri uri
ctx <- NC.initConnectionContext
conn <- NC.connectTo ctx
NC.ConnectionParams
{ NC.connectionHostname = host
, NC.connectionPort = port
, NC.connectionUseSocks = Nothing
, NC.connectionUseSecure = Nothing
}
msg <- timeout tmo $ getSingleMessage conn
when (isNothing msg) $ do
NC.connectionClose conn
throwIO HandshakeException
handshake uri conn $ fromJust msg
msgs <- subscribeMessages subscriberMap
Connection conn <$> toSockAddr host port
<*> async (recvPipe conn toApp)
<*> async (do
replaySubscriptions conn msgs
sendPipe fromApp conn)
recvPipe :: NC.Connection -> Downstream -> IO ()
recvPipe conn toApp = connectionSource conn $$ streamSink toApp
sendPipe :: Upstream -> NC.Connection -> IO ()
sendPipe fromApp conn = streamSource fromApp $$ connectionSink conn
replaySubscriptions :: NC.Connection -> [Message] -> IO ()
replaySubscriptions conn msgs =
sourceList msgs =$= messageChunker $$ connectionSink conn
clientShutdown :: Connection -> IO ()
clientShutdown conn = do
NC.connectionClose $ connection conn
void $ waitAnyCatchCancel [ fromNet conn, toNet conn ]
waitForShutdown :: Connection -> IO ()
waitForShutdown conn = do
void $ waitAnyCatchCancel [ fromNet conn, toNet conn ]
NC.connectionClose $ connection conn
handshake :: URI -> NC.Connection -> Message -> IO ()
handshake uri conn INFO {..} = do
let (user, pass) = credentialsFromUri uri
connect = CONNECT { clientVerbose = Just False
, clientPedantic = Just False
, clientSslRequired = Just False
, clientAuthToken = Nothing
, clientUser = user
, clientPass = pass
, clientName = Just "hats"
, clientLang = Just "Haskell"
, clientVersion = Just "0.1.0.0"
}
mapM_ (NC.connectionPut conn) $ LBS.toChunks (writeMessage connect)
handshake _ _ _ = throwIO HandshakeException
hostFromUri :: URI -> HostName
hostFromUri = uriRegName . fromJust . uriAuthority
portFromUri :: URI -> PortNumber
portFromUri = fromIntegral . extractPort . uriPort . fromJust . uriAuthority
credentialsFromUri :: URI -> (Maybe BS.ByteString, Maybe BS.ByteString)
credentialsFromUri =
toBS . extractCredentials . uriUserInfo. fromJust . uriAuthority
where
toBS (user, pass) = (BS.pack <$> user, BS.pack <$> pass)
toSockAddr :: HostName -> PortNumber -> IO SockAddr
toSockAddr host port =
addrAddress . head <$> getAddrInfo (Just defaultHints)
(Just host)
(Just $ show port)
extractPort :: String -> Int
extractPort [] = 4222
extractPort ":" = 4222
extractPort (':':str) = read str
extractPort _ = error "This is no valid port, ehh?"
extractCredentials :: String -> (Maybe String, Maybe String)
extractCredentials "" = (Nothing, Nothing)
extractCredentials str =
let str' = takeWhile (/= '@') str
colon = elemIndex ':' str'
in
if isNothing colon
then (Just str', Nothing)
else
let (user, _:pass) = splitAt (fromJust colon) str'
in (Just user, Just pass)
isConnectionRefused :: SomeException -> Bool
isConnectionRefused e =
"connect: does not exist (Connection refused)" `isInfixOf` show e
isResolvError :: SomeException -> Bool
isResolvError e =
show e == "getAddrInfo: does not exist (Name or service not known)"
getSingleMessage :: NC.Connection -> IO Message
getSingleMessage c = connectionSource c $$ sinkParser parseMessage