{-# LANGUAGE OverloadedStrings #-} -- | -- Module: Network.Nats.Api -- Copyright: (c) 2016 Patrik Sandahl -- License: MIT -- Maintainer: Patrik Sandahl -- Stability: experimental -- Portability: portable -- -- NATS base API as provided by this library. A thin JSON layer is -- added in "Network.Nats.JsonApi". module Network.Nats.Api ( Nats , initNats , termNats , publish , subscribe , subscribeAsync , request , unsubscribe , nextMsg ) where import Control.Concurrent.STM (atomically , newTQueueIO, readTQueue) import Control.Exception (bracket) import Network.Nats.Conduit (Downstream, Upstream, upstreamMessage) import Network.Nats.ConnectionManager ( ConnectionManager , ManagerSettings , startConnectionManager , stopConnectionManager ) import Network.Nats.Dispatcher (Dispatcher, startDispatcher, stopDispatcher) import Network.Nats.Types ( MsgQueue (..), Msg, Sid , Topic, Payload, QueueGroup ) import Network.Nats.Subscriber ( SubscriberMap, newSubscriberMap , addSubscriber, addAsyncSubscriber , removeSubscriber ) import Network.Nats.Message.Message (Message (..)) import Network.URI (URI) import System.Random (randomRIO) import qualified Data.ByteString.Char8 as BS -- | The type of the handle used by the API. To the user this -- type is opaque. The Nats handle is only valid within the scope of -- 'Network.Nats.withNats' function. data Nats = Nats { subscriberMap :: SubscriberMap -- ^ A map to hold 'Topic' subscribers. , connectionManager :: !ConnectionManager -- ^ The 'ConnectionManager'. , downstream :: !Downstream -- ^ The stream of messages from the NATS server to the client. , upstream :: !Upstream -- ^ The stream of messages from the client to the NATS server. , dispatcher :: !Dispatcher -- ^ The 'Dispatcher'. } -- | Setting up of all the necessary resources needed by 'Nats'. -- Suited for use with the style of resource management given by -- 'Control.Exception.bracket'. initNats :: ManagerSettings -> [URI] -> IO Nats initNats config uris = do subscriberMap' <- newSubscriberMap downstream' <- newTQueueIO upstream' <- newTQueueIO manager <- startConnectionManager config upstream' downstream' subscriberMap' uris dispatcher' <- startDispatcher downstream' upstream' subscriberMap' return Nats { subscriberMap = subscriberMap' , connectionManager = manager , downstream = downstream' , upstream = upstream' , dispatcher = dispatcher' } -- | Clean up 'Nats' resource. Used to clean up after 'initNats'. termNats :: Nats -> IO () termNats nats = do stopDispatcher $ dispatcher nats stopConnectionManager $ connectionManager nats -- | Publish some 'Payload' message to a 'Topic'. The NATS server will -- distribute the message to subscribers of the 'Topic'. -- -- > publish nats "GREETINGS" Nothing "Hello, there!" -- -- Will publish the string Hello, there! to subscribers of GREETINGS. No -- reply-to 'Topic' is provided. To request a reply, provide a 'Topic' -- where the subscriber can publish a reply. -- -- > publish nats "GREETINGS" (Just "THANKS") "Hello, there!" publish :: Nats -> Topic -> Maybe Topic -> Payload -> IO () publish nats topic replyTo payload = upstreamMessage (upstream nats) $ PUB topic replyTo payload {-# INLINE publish #-} -- | Subscribe to a 'Topic'. Optionally a subscription can be part of -- a 'QueueGroup'. The function will immediately return with a tuple of -- a 'Sid' for the subscription, and a 'SubQueue' from where messages can -- be fetched using 'nextMsg'. -- -- > (sid, queue) <- subscribe nats "do.stuff" Nothing -- -- Or -- -- > (sid, queue) <- subscribe nats "do.stuff" (Just "stuffworkers") subscribe :: Nats -> Topic -> Maybe QueueGroup -> IO (Sid, MsgQueue) subscribe nats topic queueGroup = do sid <- newSid let msg = SUB topic queueGroup sid subQueue <- addSubscriber (subscriberMap nats) sid msg upstreamMessage (upstream nats) msg return (sid, subQueue) -- | Subscribe to a 'Topic'. Optionally a subscription can be part of -- a 'QueueGroup'. -- -- Subscriptions using this function will be asynchronous, and each -- message will be handled in its own thread. A message handler is -- an IO action taking a 'Msg' as its argument. The function return -- the 'Sid' for the subscription. -- -- > sid <- subscribeAsync nats "do.stuff" Nothing $ \msg -> do -- > -- Do stuff with the msg -- -- Or -- -- > sid <- subscribeAsync nats "do.stuff" Nothing messageHandler -- > -- > messageHandler :: Msg -> IO () -- > messageHandler msg = do -- > -- Do stuff with the msg subscribeAsync :: Nats -> Topic -> Maybe QueueGroup -> (Msg -> IO ()) -> IO Sid subscribeAsync nats topic queueGroup action = do sid <- newSid let msg = SUB topic queueGroup sid addAsyncSubscriber (subscriberMap nats) sid msg action upstreamMessage (upstream nats) msg return sid -- | Request is publishing a 'Payload' to a 'Topic' and waiting for a -- 'Msg'. Request is a blocking operation, but can be interrupted -- by 'System.Timeout.timeout'. -- -- > msg <- request nats "do.stuff" "A little payload" -- -- Or -- -- > maybeMsg <- timeout tmo $ request nats "do.stuff" "A little payload" request :: Nats -> Topic -> Payload -> IO Msg request nats topic payload = do replyTo <- randomReplyTo bracket (subscribe nats replyTo Nothing) (\(sid, _) -> unsubscribe nats sid Nothing) (\(_, queue) -> do publish nats topic (Just replyTo) payload nextMsg queue ) -- | Unsubscribe from a subscription using its 'Sid'. Optionally a limit -- for automatic unsubscription can be given. Unsubscription will happen -- once the number of messages - the limit - has been reached. -- -- > unsubscribe nats sid Nothing -- -- Or -- -- > unsubscribe nats sid (Just 100) unsubscribe :: Nats -> Sid -> Maybe Int -> IO () unsubscribe nats sid limit = do let msg = UNSUB sid limit removeSubscriber (subscriberMap nats) sid upstreamMessage (upstream nats) msg -- | Fetch a new 'Msg' from the 'SubQueue'. Fetching a message is a -- blocking operation, but can be interrupted by 'System.Timeout.timeout'. -- -- > msg <- nextMsg queue -- -- Or -- -- > maybeMsg <- timeout tmo $ nextMsg queue nextMsg :: MsgQueue -> IO Msg nextMsg (MsgQueue queue) = atomically $ readTQueue queue {-# INLINE nextMsg #-} newSid :: IO Sid newSid = randomRIO (0, maxBound) {-# INLINE newSid #-} randomReplyTo :: IO Topic randomReplyTo = do value <- BS.pack . show <$> randomRIO (0, maxBound :: Int) return $ "INBOX." `BS.append` value {-# INLINE randomReplyTo #-}