{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE PatternGuards              #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TemplateHaskell            #-}

module Network.Nats (
    -- * How to use this module
    -- |
    -- @
    --
    -- {-\# LANGUAGE OverloadedStrings \#-}
    --
    -- import qualified Data.ByteString.Lazy as BL
    --
    -- nats <- 'connect' \"nats:\/\/user:password\@localhost:4222\"
    --
    -- sid \<- 'subscribe' nats \"news\" Nothing $ \\_ _ msg _ -\> putStrLn $ show msg
    --
    -- 'publish' nats \"news\" \"I got news for you\"
    --
    -- 'unsubscribe' nats sid
    --
    -- 'subscribe' nats \"gift\" Nothing $ \\_ _ msg mreply -> do
    --     putStrLn $ show msg
    --     case mreply of
    --        Nothing -> return ()
    --        Just reply -> 'publish' nats reply \"I've got a gift for you.\"
    --
    -- reply <- 'request' nats \"gift\" \"Do you have anything for me?\"
    --
    -- putStrLn $ show reply
    -- @
    --
    -- The 'connect' call connects to the NATS server and creates a receiver thread. The
    -- callbacks are run synchronously on this thread when a server message arrives.
    -- Client commands are generally acknowledged by the server with an +OK message,
    -- the library waits for acknowledgment only for the 'subscribe' command. The NATS
    -- server usually closes the connection when there is an error.

    -- * Comparison to API in other languages
    -- |Compared to API in other languages, the Haskell binding does
    -- not implement timeouts and automatic unsubscribing, the 'request' call is implemented
    -- as a synchronous call.
    --
    -- The timeouts can be easily implemented using 'System.Timeout' module, automatic unsubscribing
    -- can be done in the callback function.

    -- * Error behaviour
    -- |The 'connect' function tries to connect to the NATS server. In case of failure it immediately fails.
    -- If there is an error during operations, the NATS module tries to reconnect to the server.
    -- When there are more servers, the client immediately tries to connect to the next server. If
    -- that fails, it waits 1s before trying the next server in the NatsSettings list.
    --
    -- During the reconnection, the calls 'subscribe' and 'request' will block. The calls
    -- 'publish' and 'unsubscribe' silently fail (unsubscribe is handled locally, NATS is a messaging
    -- system without guarantees, 'publish' is not guaranteed to succeed anyway).
    -- After reconnecting to the server, the module automatically resubscribes to previously subscribed channels.
    --
    -- If there is a network failure, the nats commands 'subscribe' and 'request'
    -- may fail on an IOException or NatsException. The 'subscribe'
    -- command is synchronous, it waits until the server responds with +OK. The commands 'publish'
    -- and 'unsubscribe' are asynchronous, no confirmation from server is required and they
    -- should not raise an exception.
    --
    Nats
    , NatsSID
    , connect
    , connectSettings
    , NatsHost(..)
    , NatsSettings(..)
    , defaultSettings
    -- * Exceptions
    , NatsException
    -- * Access
    , MsgCallback
    , subscribe
    , unsubscribe
    , publish
    , request
    , requestMany
    -- * Termination
    , disconnect
) where

import           Control.Applicative        ((<$>), (<*>))
import           Control.Concurrent
import           Control.Concurrent.Async   (concurrently)
import           Control.Exception          (AsyncException, Exception,
                                             Handler (..), IOException,
                                             SomeException, bracket,
                                             bracketOnError, catch, catches,
                                             throwIO)
import           Control.Monad              (forever, replicateM, unless, void, when, mzero)
import           Data.Dequeue               as D
import qualified Data.Foldable              as FOLD
import           Data.IORef
import           Data.Maybe                 (fromMaybe)
import           Data.Typeable
import           Network.Socket             (SockAddr (..),
                                             SocketOption (KeepAlive, NoDelay),
                                             getAddrInfo, setSocketOption)
import qualified Network.Socket             as S
import           System.IO
import           System.Random              (randomRIO)
import           System.Timeout

import qualified Data.ByteString.Char8      as BS
import qualified Data.ByteString.Lazy.Char8 as BL
import           Data.Char                  (isUpper, toLower)
import qualified Data.Map.Strict            as Map
import qualified Data.Text                  as T
import           Data.Text.Encoding         (decodeUtf8)

import qualified Data.Aeson                 as AE
import           Data.Aeson                 ((.:), (.:?), (.!=))
import           Data.Aeson.TH              (defaultOptions, deriveJSON, fieldLabelModifier)
import qualified Network.URI                as URI

-- | How often should we ping the server (microseconds, passed to 'threadDelay')
pingInterval :: Int
pingInterval = 3000000

-- | Timeout interval for connect and send operations (microseconds)
timeoutInterval :: Int
timeoutInterval = 1000000

-- | NATS communication error
data NatsException = NatsException String
    deriving (Show, Typeable)
instance Exception NatsException

-- | Options serialized to JSON and sent with the CONNECT command
data NatsConnectionOptions = NatsConnectionOptions {
        natsConnUser :: String
      , natsConnPass :: String
      , natsConnVerbose :: Bool
      , natsConnPedantic :: Bool
      , natsConnSslRequired :: Bool
    } deriving (Show)

-- | Connection options used as a base; 'authenticate' overrides user and password
defaultConnectionOptions :: NatsConnectionOptions
defaultConnectionOptions = NatsConnectionOptions{natsConnUser="nats",natsConnPass="nats", natsConnVerbose=True,
                                                natsConnPedantic=True, natsConnSslRequired=False}

-- JSON keys are the field names with the @natsConn@ prefix dropped and the
-- remainder converted from camelCase to snake_case (natsConnSslRequired -> ssl_required).
-- The modifier is written inline because a helper defined in this module could
-- not be used inside the splice (Template Haskell stage restriction).
$(deriveJSON defaultOptions{fieldLabelModifier =
            let insertUnderscore acc chr
                    | isUpper chr = chr : '_' : acc
                    | otherwise   = chr : acc
            in map toLower . drop 1 . reverse . foldl insertUnderscore [] . drop 8
        } ''NatsConnectionOptions)

-- | Server information sent usually upon opening a connection to NATS server
data NatsServerInfo = NatsServerInfo {
    -- natsSvrServerId :: T.Text
    -- , natsSvrVersion :: T.Text
    -- , natsSvrMaxPayload :: Int
    natsSvrAuthRequired :: Bool
} deriving (Show)

-- Same key conversion as above, dropping the @natsSvr@ prefix
-- (natsSvrAuthRequired -> auth_required).
$(deriveJSON defaultOptions{fieldLabelModifier =
            let insertUnderscore acc chr
                    | isUpper chr = chr : '_' : acc
                    | otherwise   = chr : acc
            in map toLower . drop 1 . reverse . foldl insertUnderscore [] . drop 7
        } ''NatsServerInfo)

-- | Subscription identifier; shown and read as a plain integer
newtype NatsSID = NatsSID Int deriving (Num, Ord, Eq)
instance Show NatsSID where
    show (NatsSID num) = show num
instance Read NatsSID where
    readsPrec x1 x2 = map (\(a,rest) -> (NatsSID a, rest)) $ readsPrec x1 x2

-- | Callback invoked for every message delivered to a subscription
type MsgCallback = NatsSID -- ^ SID of subscription
    -> String -- ^ Subject
    -> BL.ByteString -- ^ Message
    -> Maybe String -- ^ Reply subject
    -> IO ()

-- | Locally stored subscription; used for dispatching messages and for
-- resubscribing after a reconnect
data NatsSubscription = NatsSubscription {
        subSubject :: Subject
      , subQueue :: Maybe Subject
      , subCallback :: MsgCallback
      , subSid :: NatsSID
    }

-- | Queue of callbacks waiting for +OK (called with Nothing) or -ERR
-- (called with the error text) from the server
type FifoQueue = D.BankersDequeue (Maybe T.Text -> IO ())

-- | Control structure representing a connection to NATS server
data Nats = Nats {
          natsSettings :: NatsSettings
        , natsRuntime :: MVar (Handle, -- Network socket
                               FifoQueue, -- FIFO of sent commands waiting for ack
                               Bool, -- False if we are disconnected
                               MVar () -- Empty mvar that gets full in the moment we connect
                               )
        , natsThreadId :: MVar ThreadId
        , natsNextSid :: IORef NatsSID
        , natsSubMap :: IORef (Map.Map NatsSID NatsSubscription)
    }

-- | Host settings; may have different username/password for each host
data NatsHost = NatsHost {
        natsHHost :: String
      , natsHPort :: Int
      , natsHUser :: String -- ^ Username for authentication
      , natsHPass :: String -- ^ Password for authentication
    } deriving (Show)

-- | Only @host@ is mandatory; @port@, @user@ and @pass@ fall back to
-- defaults when the key is missing or null.
instance AE.FromJSON NatsHost where
    parseJSON (AE.Object v) =
        NatsHost <$>
            v .: "host" <*>
            v .:? "port" .!= 4222 <*>
            v .:? "user" .!= "nats" <*>
            v .:? "pass" .!= "pass"
    parseJSON _ = mzero

-- | Advanced settings for connecting to NATS server
data NatsSettings = NatsSettings {
        natsHosts :: [NatsHost]
      , natsOnReconnect :: Nats -> (String, Int) -> IO ()
        -- ^ Called when a client has successfully re-connected. This callback is called synchronously
        --   before the processing of incoming messages begins. It is not called when the client
        --   connects the first time, as such connection is synchronous.
      , natsOnDisconnect :: Nats -> String -> IO ()
        -- ^ Called when a client is disconnected.
    }

-- | Single host @localhost:4222@ with credentials nats/nats and no-op callbacks
defaultSettings :: NatsSettings
defaultSettings = NatsSettings {
        natsHosts = [NatsHost "localhost" 4222 "nats" "nats"]
      , natsOnReconnect = \_ _ -> (return ())
      , natsOnDisconnect = \_ _ -> (return ())
    }

-- | Message received by the client from the server
data NatsSvrMessage =
    NatsSvrMsg { msgSubject::String, msgSid::NatsSID, msgText::BS.ByteString, msgReply::Maybe String}
    | NatsSvrOK
    | NatsSvrError T.Text
    | NatsSvrPing
    | NatsSvrPong
    | NatsSvrInfo NatsServerInfo
    deriving (Show)

-- | Validated subject name; construct it with 'makeSubject'
newtype Subject = Subject String deriving (Show)

subjectToStr :: Subject -> String
subjectToStr (Subject str) = str

-- | Validate a subject; calls 'error' on an empty string or when the string
-- contains a space or any character below it (control characters)
makeSubject :: String -> Subject
makeSubject ""  = error "Empty subject"
makeSubject str
    | any (<=' ') str = error $ "Subject contains incorrect characters: " ++ str
    | otherwise = Subject str

-- | Message sent from the client to server
data NatsClntMessage =
    NatsClntPing
    | NatsClntPong
    | NatsClntSubscribe Subject NatsSID (Maybe Subject)
    | NatsClntUnsubscribe NatsSID
    | NatsClntPublish Subject (Maybe Subject) BL.ByteString
    | NatsClntConnect NatsConnectionOptions

-- | Encode NATS client message; the terminating CRLF is NOT included
makeClntMsg :: NatsClntMessage -> BL.ByteString
makeClntMsg = BL.fromChunks . _makeClntMsg
    where
        _makeClntMsg :: NatsClntMessage -> [BS.ByteString]
        _makeClntMsg NatsClntPing = ["PING"]
        _makeClntMsg NatsClntPong = ["PONG"]
        _makeClntMsg (NatsClntSubscribe subject sid (Just queue)) =
            [BS.pack $ "SUB " ++ subjectToStr subject ++ " " ++ subjectToStr queue ++ " " ++ show sid]
        _makeClntMsg (NatsClntSubscribe subject sid Nothing) =
            [BS.pack $ "SUB " ++ subjectToStr subject ++ " " ++ show sid]
        _makeClntMsg (NatsClntUnsubscribe sid) = [ BS.pack $ "UNSUB " ++ show sid ]
        _makeClntMsg (NatsClntPublish subj Nothing msg) =
            BS.pack ("PUB " ++ subjectToStr subj ++ " " ++ show (BL.length msg) ++ "\r\n") : BL.toChunks msg
        _makeClntMsg (NatsClntPublish subj (Just reply) msg) =
            BS.pack ("PUB " ++ subjectToStr subj ++ " " ++ subjectToStr reply ++ " " ++ show (BL.length msg) ++ "\r\n")
                : BL.toChunks msg
        _makeClntMsg (NatsClntConnect info) = "CONNECT " : BL.toChunks (AE.encode info)

-- | Decode one protocol line received from the server.
--
-- The result is the decoded message together with the length of the payload
-- that follows the line (only for MSG). The 'msgText' of a returned
-- 'NatsSvrMsg' is an empty placeholder; the caller reads the payload and
-- fills it in. Returns Nothing for unknown commands and for MSG lines whose
-- SID or length is not a number (or whose length is negative).
decodeMessage :: BS.ByteString -> Maybe (NatsSvrMessage, Maybe Int)
decodeMessage line = decodeMessage_ mid (BS.drop 1 mrest)
    where
        -- Command name is everything up to the first space or CR
        (mid, mrest) = BS.span (\x -> x/=' ' && x/='\r') line

        -- Parse a token that consists of a complete integer and nothing else
        readNumber :: BS.ByteString -> Maybe Int
        readNumber str = case BS.readInt str of
            Just (num, rest) | BS.null rest -> Just num
            _ -> Nothing

        mkMsg :: BS.ByteString -> BS.ByteString -> Maybe BS.ByteString -> BS.ByteString
              -> Maybe (NatsSvrMessage, Maybe Int)
        mkMsg subj sid mreply len = do
            nsid <- readNumber sid
            paylen <- readNumber len
            if paylen < 0
                then Nothing
                else return ( NatsSvrMsg (BS.unpack subj) (NatsSID nsid) BS.empty (BS.unpack <$> mreply)
                            , Just paylen )

        decodeMessage_ :: BS.ByteString -> BS.ByteString -> Maybe (NatsSvrMessage, Maybe Int)
        decodeMessage_ "PING" _ = Just (NatsSvrPing, Nothing)
        decodeMessage_ "PONG" _ = Just (NatsSvrPong, Nothing)
        decodeMessage_ "+OK" _ = Just (NatsSvrOK, Nothing)
        decodeMessage_ "-ERR" msg = Just (NatsSvrError (decodeUtf8 msg), Nothing)
        decodeMessage_ "INFO" msg = do
            info <- AE.decode $ BL.fromChunks [msg]
            return (NatsSvrInfo info, Nothing)
        decodeMessage_ "MSG" msg = case BS.words msg of
            [subj, sid, len] -> mkMsg subj sid Nothing len
            [subj, sid, reply, len] -> mkMsg subj sid (Just reply) len
            _ -> Nothing
        decodeMessage_ _ _ = Nothing

-- | Returns next sid and increments the counter stored in the IORef
newNatsSid :: Nats -> IO NatsSID
newNatsSid nats = atomicModifyIORef' (natsNextSid nats) $ \sid -> (sid + 1, sid)

-- | Generates a new INBOX name for request/response communication
newInbox :: IO String
newInbox = do
    rnd <- replicateM 13 (randomRIO ('a', 'z'))
    return $ "_INBOX." ++ rnd

-- | Create a TCP connection to the server.
--
-- Uses the first address returned by the resolver and returns an unbuffered
-- read/write handle. The socket is closed if anything fails before the
-- handle is created.
connectToServer :: String -> Int -> IO Handle
connectToServer hostname port = do
    addrinfos <- getAddrInfo Nothing (Just hostname) Nothing
    serveraddr <- case addrinfos of
        (addr:_) -> return addr
        [] -> throwIO $ NatsException $ "Cannot resolve host: " ++ hostname
    -- Create a socket
    bracketOnError
        (S.socket (S.addrFamily serveraddr) S.Stream S.defaultProtocol)
        S.sClose
        (\sock -> do
            setSocketOption sock KeepAlive 1
            setSocketOption sock NoDelay 1
            -- The resolver was called without a service name, patch in the port
            let connaddr = case S.addrAddress serveraddr of
                    SockAddrInet _ haddr -> SockAddrInet (fromIntegral port) haddr
                    SockAddrInet6 _ finfo haddr scopeid -> SockAddrInet6 (fromIntegral port) finfo haddr scopeid
                    other -> other
            S.connect sock connaddr
            h <- S.socketToHandle sock ReadWriteMode
            hSetBuffering h NoBuffering
            return h
        )

-- | Run an action with exclusive access to the connection handle and the
-- callback queue; the queue returned by the action replaces the stored one.
-- If the action throws, the stored state is left unchanged.
ensureConnection :: Nats
    -> Bool -- ^ If true, wait for the connection to become available
    -> ((Handle, FifoQueue) -> IO FifoQueue) -- ^ Action to do when the connection is available
    -> IO ()
-- Block if we are disconnected
ensureConnection nats True f = do
    mwait <- modifyMVar (natsRuntime nats) runAction
    case mwait of
        Nothing -> return ()
        Just csig -> do
            -- The wait happens outside of the critical section, the runtime
            -- state is already back in its MVar; an exception raised while
            -- waiting therefore cannot corrupt or double-fill natsRuntime
            readMVar csig -- Wait for connection to become available
            ensureConnection nats True f
    where
        -- Connected
        runAction (handle, queue, True, csig) = do
            nqueue <- f (handle, queue)
            return ((handle, nqueue, True, csig), Nothing)
        -- Disconnected, hand the connect signal to the caller
        runAction r@(_, _, False, csig) = return (r, Just csig)
-- Do not block if we are disconnected
ensureConnection nats False f = modifyMVarMasked_ (natsRuntime nats) runAction
    where
        -- Connected, try to send
        runAction (handle, queue, True, csig) = do
            nqueue <- f (handle, queue)
            return (handle, nqueue, True, csig)
        -- Disconnected, ignore
        runAction (handle, queue, False, csig) = return (handle, queue, False, csig)

-- | Send a message and register callback if possible.
--
-- For commands acknowledged by the server (CONNECT, PUB, SUB, UNSUB) a
-- callback is always queued (a no-op one when none is given) so that the
-- queue stays in step with the +OK/-ERR replies. Passing a callback with
-- any other command is a programming error.
sendMessage :: Nats
    -> Bool -- ^ Block while disconnected (otherwise the message is dropped)
    -> NatsClntMessage
    -> Maybe (Maybe T.Text -> IO ())
    -> IO ()
sendMessage nats blockIfDisconnected msg mcb
    | Just cb <- mcb, supportsCallback msg =
        ensureConnection nats blockIfDisconnected $ \(handle, queue) -> do
            _sendMessage handle msg
            return $ D.pushBack queue cb -- Append callback on the callback queue
    | supportsCallback msg =
        sendMessage nats blockIfDisconnected msg (Just $ \_ -> return ())
    | Just _ <- mcb, not (supportsCallback msg) = error "Callback not supported"
    | otherwise =
        ensureConnection nats blockIfDisconnected $ \(handle, queue) -> do
            _sendMessage handle msg
            return queue
    where
        supportsCallback (NatsClntConnect {}) = True
        supportsCallback (NatsClntPublish {}) = True
        supportsCallback (NatsClntSubscribe {}) = True
        supportsCallback (NatsClntUnsubscribe {}) = True
        supportsCallback _ = False

-- | Throw 'NatsException' if an action does not end in specified time (microseconds)
timeoutThrow :: Int -> IO a -> IO a
timeoutThrow t f = do
    res <- timeout t f
    case res of
        Just x -> return x
        Nothing -> throwIO $ NatsException "Reached timeout"

-- | Write an encoded message followed by CRLF; limited by 'timeoutInterval'
_sendMessage :: Handle -> NatsClntMessage -> IO ()
_sendMessage handle cmsg = timeoutThrow timeoutInterval $ do
    let msg = makeClntMsg cmsg
    case () of
        -- Short messages are concatenated and written with a single hPut
        _ | BL.length msg < 1024 ->
                BS.hPut handle $ BS.concat $ BL.toChunks msg ++ ["\r\n"]
          | otherwise -> do
                BL.hPut handle msg
                BS.hPut handle "\r\n"

-- | Do the authentication handshake if necessary.
--
-- Reads the INFO line; CONNECT is sent (and its reply awaited) only when
-- the server reports that authentication is required. Throws
-- 'NatsException' on an error reply or unexpected input.
authenticate :: Handle -> String -> String -> IO ()
authenticate handle user password = do
    info <- BS.hGetLine handle
    case decodeMessage info of
        Just (NatsSvrInfo (NatsServerInfo {natsSvrAuthRequired=True}), Nothing) -> do
            let coptions = defaultConnectionOptions{natsConnUser=user, natsConnPass=password}
            BL.hPut handle $ makeClntMsg (NatsClntConnect coptions)
            BS.hPut handle "\r\n"
            response <- BS.hGetLine handle
            case decodeMessage response of
                Just (NatsSvrOK, Nothing) -> return ()
                Just (NatsSvrError err, Nothing)-> throwIO $ NatsException $ "Authentication error: " ++ show err
                _ -> throwIO $ NatsException "Incorrect server response"
        Just (NatsSvrInfo _, Nothing) -> return ()
        _ -> throwIO $ NatsException "Incorrect input from server"

-- | Open and authenticate a connection, store it as the current connection
-- and wake up everybody waiting for it. The whole operation is limited by
-- 'timeoutInterval'; the handle is closed on failure.
prepareConnection :: Nats -> NatsHost -> IO ()
prepareConnection nats nhost = timeoutThrow timeoutInterval $
    bracketOnError
        (connectToServer (natsHHost nhost) (natsHPort nhost))
        hClose
        (\handle -> do
            authenticate handle (natsHUser nhost) (natsHPass nhost)
            -- Keep the signal in the state instead of a bottom value
            csig <- modifyMVar (natsRuntime nats) $ \(_,_,_, csig) ->
                return ((handle, D.empty, True, csig), csig)
            putMVar csig ()
        )

-- | Main thread that reads events from NATS server and reconnects if necessary
connectionThread :: Bool
                    -> Nats
                    -> [NatsHost] -- ^ infinite list of connections to try
                    -> IO ()
connectionThread _ _ [] = error "Empty list of connections"
connectionThread firstTime nats (thisconn:nextconn) = do
    mnewconnlist <- (connectionHandler firstTime nats thisconn >> return Nothing) -- connectionHandler never returns...
                        `catches` [Handler (\(e :: IOException) -> Just <$> errorHandler e),
                                   Handler (\(e :: NatsException) -> Just <$> errorHandler e),
                                   Handler (\e -> finalHandler e >> return Nothing)]
    case mnewconnlist of
        Nothing -> return () -- Reached only when the thread is being terminated
        Just newconnlist -> connectionThread False nats newconnlist
    where
        finalize :: (Show e) => e -> IO ()
        finalize e = do
            -- Hide existing connection
            (handle, queue, _, _) <- takeMVar (natsRuntime nats)
            finsignal <- newEmptyMVar
            putMVar (natsRuntime nats) (undefined, undefined, False, finsignal)
            -- Close network socket
            hClose handle
            -- Call appropriate callbacks on unfinished calls
            FOLD.mapM_ (\f -> f $ Just (T.pack $ show e)) queue
            -- Call user supplied disconnect
            (natsOnDisconnect $ natsSettings nats) nats (show e)

        -- Tear down the connection and keep trying hosts until one connects;
        -- returns the host list starting with the connected host
        errorHandler :: (Show e) => e -> IO [NatsHost]
        errorHandler e = do
            finalize e
            tryToConnect nextconn
            where
                tryToConnect connlist@(conn:rest) = do
                    res <- (prepareConnection nats conn >> return (Just connlist))
                                `catches` [ Handler (\(_ :: IOException) -> return Nothing),
                                            Handler (\(_ :: NatsException) -> return Nothing) ]
                    case res of
                        Just restlist -> return restlist
                        Nothing -> threadDelay timeoutInterval >> tryToConnect rest
                tryToConnect [] = error "Empty list of connections"

        -- Handler for exiting the thread
        finalHandler :: AsyncException -> IO ()
        finalHandler = finalize

-- | Periodically send PING; throws 'NatsException' when two or more PINGs
-- are unanswered at the time the next one is due
pingerThread :: Nats -> IORef (Int, Int) -> IO ()
pingerThread nats pingStatus = forever $ do
    threadDelay pingInterval
    -- Check the ping status (counted before the new PING is added)
    ok <- atomicModifyIORef' pingStatus $ \(pings, pongs) -> ((pings+1, pongs), pings - pongs < 2)
    unless ok $ throwIO (NatsException "Ping timeouted")
    sendMessage nats True NatsClntPing Nothing

-- | Forever read input from a connection and process it
connectionHandler :: Bool -> Nats -> NatsHost -> IO ()
connectionHandler firstTime nats (NatsHost host port _ _) = do
    (handle, _, _, _) <- readMVar (natsRuntime nats)
    -- Subscribe channels that are supposed to be subscribed
    subscriptions <- readIORef (natsSubMap nats)
    FOLD.forM_ subscriptions $ \(NatsSubscription subject queue _ sid) ->
        sendMessage nats True (NatsClntSubscribe subject sid queue) Nothing
    -- Call user function that we are successfully connected
    unless firstTime $ (natsOnReconnect $ natsSettings nats) nats (host, port)
    -- Allocate structures for PING, IORef is probably easiest to manage
    pingStatus <- newIORef (0, 0)
    -- Perform the job
    void $ concurrently
        (pingerThread nats pingStatus)
        (connectionHandler' handle nats pingStatus)

-- | Reader loop: decode each line from the server and dispatch it
connectionHandler' :: Handle -> Nats -> IORef (Int, Int) -> IO ()
connectionHandler' handle nats pingStatus = forever $ do
    line <- BS.hGetLine handle
    case decodeMessage line of
        Just (msg, Nothing) ->
            handleMessage msg
        Just (msg@(NatsSvrMsg {}), Just paylen) -> do
            payload <- BS.hGet handle (paylen + 2) -- +2 = CRLF
            handleMessage msg{msgText=BS.take paylen payload}
        _ ->
            putStrLn $ "Incorrect message: " ++ show line
    where
        -- | Pull callback for OK/ERR status from FIFO queue; when the queue
        -- is empty a default callback that prints the error is used
        popCb (h, queue, x1, x2) = return ((h, newq, x1, x2), item)
            where
                (item, newq) = case D.popFront queue of
                    Just inq -> inq
                    Nothing -> (maybe (return ()) print, D.empty)

        handleMessage NatsSvrPing = sendMessage nats True NatsClntPong Nothing
        handleMessage NatsSvrPong = atomicModifyIORef' pingStatus $ \(pings, pongs) -> ((pings, pongs + 1), ())
        handleMessage NatsSvrOK = do
            cb <- modifyMVar (natsRuntime nats) popCb
            cb Nothing
        handleMessage (NatsSvrError txt) = do
            cb <- modifyMVar (natsRuntime nats) popCb
            cb $ Just txt
        handleMessage (NatsSvrInfo _) = return ()
        handleMessage (NatsSvrMsg {..}) = do
            msubscription <- Map.lookup msgSid <$> readIORef (natsSubMap nats)
            case msubscription of
                Just subscription ->
                    -- An exception in the user callback must not kill the reader
                    subCallback subscription msgSid msgSubject (BL.fromChunks [msgText]) msgReply
                        `catch` (\(e :: SomeException) -> print e)
                -- SID not found in map, force unsubscribe
                Nothing -> sendMessage nats True (NatsClntUnsubscribe msgSid) Nothing

-- | Connect to a NATS server.
--
-- The port defaults to 4222 when the URI does not contain one; the password
-- part of the user info is optional.
connect :: String -- ^ URI with format: nats:\/\/user:password\@localhost:4222
    -> IO Nats
connect uri = do
    let parsedUri = fromMaybe (error ("Error parsing NATS url: " ++ uri))
                              (URI.parseURI uri)
    when (URI.uriScheme parsedUri /= "nats:") $ error "Incorrect URL scheme"
    let (host, port, user, password) = case URI.uriAuthority parsedUri of
            Just (URI.URIAuth {..}) ->
                let -- uriUserInfo has the form "user:password@", "user@" or ""
                    creds = takeWhile (/= '@') uriUserInfo
                    (usr, rest) = break (== ':') creds
                in (uriRegName, parsePort uriPort, usr, drop 1 rest)
            Nothing -> error "Missing hostname section"
    connectSettings defaultSettings{
            natsHosts=[NatsHost host port user password]
        }
    where
        -- uriPort is either empty or starts with a colon
        parsePort :: String -> Int
        parsePort str
            | null digits = 4222
            | [(p, "")] <- reads digits = p
            | otherwise = error ("Incorrect port in NATS url: " ++ str)
            where
                digits = drop 1 str

-- | Connect to NATS server using custom settings.
--
-- Hosts are tried in order until one succeeds; the exception from the last
-- host is propagated when all of them fail.
connectSettings :: NatsSettings -> IO Nats
connectSettings settings = do
    csig <- newEmptyMVar
    -- Handle and queue are placeholders until prepareConnection fills them in
    mruntime <- newMVar (undefined, undefined, False, csig)
    mthreadid <- newEmptyMVar
    nextsid <- newIORef 1
    submap <- newIORef Map.empty
    let nats = Nats{
              natsSettings=settings
            , natsRuntime=mruntime
            , natsThreadId=mthreadid
            , natsNextSid=nextsid
            , natsSubMap=submap
        }
        hosts = natsHosts settings
    -- Try to connect to all until one succeeds
    connhost <- tryUntilSuccess hosts $ prepareConnection nats
    threadid <- forkIO $ connectionThread True nats (connhost : cycle hosts)
    putMVar mthreadid threadid
    return nats
    where
        tryUntilSuccess [a] f = f a >> return a
        tryUntilSuccess (a:rest) f = (f a >> return a) `catch` (\(_ :: SomeException) -> tryUntilSuccess rest f)
        tryUntilSuccess [] _ = error "Empty list"

-- | Subscribe to a channel, optionally specifying queue group.
--
-- Blocks until the server acknowledges the subscription; throws
-- 'NatsException' when the server answers with an error or the connection
-- is lost before the acknowledgment.
subscribe :: Nats
    -> String -- ^ Subject
    -> Maybe String -- ^ Queue
    -> MsgCallback -- ^ Callback
    -> IO NatsSID -- ^ SID of subscription
subscribe nats subject queue cb =
    let
        ssubject = makeSubject subject
        squeue = makeSubject `fmap` queue
        addToSubTable sid = atomicModifyIORef' (natsSubMap nats) $ \submap ->
            (Map.insert sid NatsSubscription{subSubject=ssubject, subQueue=squeue, subCallback=cb, subSid=sid} submap, ())
    in do
        mvar <- newEmptyMVar :: IO (MVar (Maybe T.Text))
        sid <- newNatsSid nats
        sendMessage nats True (NatsClntSubscribe ssubject sid squeue) $ Just $ \err -> do
            -- Register the subscription only after the server accepted it
            case err of
                Nothing -> addToSubTable sid
                Just _ -> return ()
            putMVar mvar err
        merr <- takeMVar mvar
        case merr of
            Just err -> throwIO $ NatsException $ T.unpack err
            Nothing -> return sid

-- | Unsubscribe from a channel
unsubscribe :: Nats
    -> NatsSID
    -> IO ()
unsubscribe nats sid = do
    -- Remove from internal tables
    atomicModifyIORef' (natsSubMap nats) $ \ioref -> (Map.delete sid ioref, ())
    -- Unsubscribe from server, ignore errors
    sendMessage nats False (NatsClntUnsubscribe sid) Nothing
        `catches` [ Handler (\(_ :: IOException) -> return ()),
                    Handler (\(_ :: NatsException) -> return ()) ]

-- | Synchronous request/response communication to obtain one message
request :: Nats
    -> String -- ^ Subject
    -> BL.ByteString -- ^ Request
    -> IO BL.ByteString -- ^ Response
request nats subject body = do
    mvar <- newEmptyMVar :: IO (MVar (Either String BL.ByteString))
    inbox <- newInbox
    -- The temporary inbox subscription is removed even when the request fails
    bracket
        (subscribe nats inbox Nothing $ \_ _ response _ ->
            void $ tryPutMVar mvar (Right response))
        (unsubscribe nats)
        (\_ -> do
            sendMessage nats True (NatsClntPublish (makeSubject subject) (Just $ makeSubject inbox) body) $ Just $ \merr ->
                case merr of
                    Nothing -> return ()
                    Just err -> void $ tryPutMVar mvar (Left $ T.unpack err)
            result <- takeMVar mvar
            case result of
                Left err -> throwIO $ NatsException err
                Right res -> return res
        )

-- | Synchronous request/response for obtaining many messages in certain timespan.
-- Responses are returned in the order in which they arrived.
requestMany :: Nats
    -> String -- ^ Subject
    -> BL.ByteString -- ^ Body
    -> Int -- ^ Timeout in microseconds
    -> IO [BL.ByteString]
requestMany nats subject body time = do
    result <- newIORef []
    inbox <- newInbox
    bracket
        (subscribe nats inbox Nothing $ \_ _ response _ ->
            atomicModifyIORef' result $ \old -> (response:old, ()))
        (unsubscribe nats)
        (\_ -> do
            publish' nats subject (Just inbox) body
            threadDelay time
        )
    -- Responses were consed to the front, restore arrival order
    reverse <$> readIORef result

-- | Publish a message
publish :: Nats
    -> String -- ^ Subject
    -> BL.ByteString -- ^ Data
    -> IO ()
publish nats subject = publish' nats subject Nothing

-- | Publish a message with an optional reply subject; the message is
-- dropped when the client is disconnected
publish' :: Nats
    -> String -- ^ Subject
    -> Maybe String -- ^ Reply subject
    -> BL.ByteString -- ^ Data
    -> IO ()
publish' nats subject inbox body =
    -- Ignore errors - messages can get lost
    sendMessage nats False (NatsClntPublish (makeSubject subject) (makeSubject <$> inbox) body) Nothing
        `catches` [ Handler (\(_ :: IOException) -> return ()),
                    Handler (\(_ :: NatsException) -> return ()) ]

-- | Disconnect from a NATS server (kills the connection thread)
disconnect :: Nats -> IO ()
disconnect nats = do
    threadid <- readMVar (natsThreadId nats)
    killThread threadid