-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Apache Pulsar client for Haskell -- -- Supernova is an Apache Pulsar client that implements the specified TCP -- protocol. @package supernova @version 0.0.3 -- | In the following example, we will create a quick example showcasing a -- consumer and producer running concurrently, step by step. -- -- Consider the following imports (needs the async library). -- --
-- import Control.Concurrent ( threadDelay ) -- import Control.Concurrent.Async ( concurrently_ ) -- import Pulsar ---- -- Firstly, we create a connection to Pulsar, defined as -- PulsarConnection. -- --
-- conn :: PulsarConnection -- conn = connect defaultConnectData ---- -- Then a consumer and a producer, which operate in the Pulsar -- monad. -- --
-- pulsar :: Pulsar () -- pulsar = do -- c <- newConsumer topic sub -- p <- newProducer topic -- liftIO $ program c p -- where -- topic = defaultTopic "app" -- sub = Subscription Exclusive "test-sub" ---- -- And the main user program that consumes and produces messages -- concurrently, running in IO. -- --
-- program :: Consumer IO -> Producer IO -> IO ()
-- program Consumer {..} Producer {..} =
-- let c = fetch >>= \(Message i m) -> print m >> ack i >> c
-- p = threadDelay (3 * 1000000) >> send "Hello World!" >> p
-- in concurrently_ c p
--
--
-- We have a delay of 3 seconds before publishing to make sure the
-- consumer is already running. Otherwise, it might miss some messages.
--
-- Finally, we put it all together and call runPulsar with the
-- connection and the program in the Pulsar monad.
--
-- -- main :: IO () -- main = runPulsar conn pulsar ---- -- Since a Pulsar connection, consumers, and producers are long-lived -- resources, Supernova manages them accordingly for you. Once the -- program exits, the resources will be released in the respective order -- (always opposite to the order of acquisition). module Pulsar -- | Starts a Pulsar connection with the supplied ConnectData connect :: (MonadIO m, MonadThrow m, MonadManaged m) => ConnectData -> m PulsarCtx -- | Default connection data: "127.0.0.1:6650" defaultConnectData :: ConnectData -- | Create a new Consumer by supplying a PulsarCtx (returned -- by connect), a Topic and a Subscription. newConsumer :: (MonadIO m, MonadIO f, MonadReader PulsarCtx m) => Topic -> Subscription -> m (Consumer f) -- | Create a new Producer by supplying a PulsarCtx (returned -- by connect) and a Topic. newProducer :: (MonadIO m, MonadReader PulsarCtx m, MonadIO f) => Topic -> m (Producer f) -- | Runs a Pulsar computation with default logging to standard output runPulsar :: PulsarConnection -> Pulsar a -> IO () -- | Runs a Pulsar computation with the supplied logging options runPulsar' :: LogOptions -> PulsarConnection -> Pulsar a -> IO () -- | An abstract Consumer able to fetch messages and -- acknowledge them. data Consumer m Consumer :: m Message -> (MsgId -> m ()) -> Consumer m -- | Fetches a single message. Blocks if no messages are available. [fetch] :: Consumer m -> m Message -- | Acknowledges a single message. [ack] :: Consumer m -> MsgId -> m () -- | An abstract Producer able to send messages of type -- PulsarMessage. newtype Producer m Producer :: (PulsarMessage -> m ()) -> Producer m -- | Produces a single message. [send] :: Producer m -> PulsarMessage -> m () -- | The main Pulsar monad, which abstracts over a ReaderT monad. data Pulsar a -- | Alias for Connection PulsarCtx. type PulsarConnection = Connection PulsarCtx -- | Connection details: host and port. 
data ConnectData ConnData :: HostName -> ServiceName -> ConnectData [connHost] :: ConnectData -> HostName [connPort] :: ConnectData -> ServiceName -- | Internal logging level, part of LogOptions. Can be used -- together with runPulsar'. data LogLevel Error :: LogLevel Warn :: LogLevel Info :: LogLevel Debug :: LogLevel -- | Internal logging options. Can be used together with runPulsar'. data LogOptions LogOptions :: LogLevel -> LogOutput -> LogOptions [logLevel] :: LogOptions -> LogLevel [logOutput] :: LogOptions -> LogOutput -- | Internal logging output, part of LogOptions. Can be used -- together with runPulsar'. data LogOutput StdOut :: LogOutput File :: FilePath -> LogOutput -- | A Topic is in the form "type://tenant/namespace/topic-name", which is -- what the Show instance produces. data Topic Topic :: TopicType -> Tenant -> NameSpace -> TopicName -> Topic [type'] :: Topic -> TopicType [tenant] :: Topic -> Tenant [namespace] :: Topic -> NameSpace [name] :: Topic -> TopicName -- | A default Topic: "non-persistent://public/default/my-topic". defaultTopic :: TopicName -> Topic -- | A topic can be either Persistent or NonPersistent. data TopicType Persistent :: TopicType NonPersistent :: TopicType -- | A tenant can be any string value. Default value is "public". newtype Tenant Tenant :: Text -> Tenant -- | A namespace can be any string value. Default value is "default". newtype NameSpace NameSpace :: Text -> NameSpace -- | A topic name can be any string value. newtype TopicName TopicName :: Text -> TopicName -- | A message id, needed for acknowledging messages. See ack. newtype MsgId MsgId :: MessageIdData -> MsgId -- | A consumed message, containing both MsgId and payload as -- bytestring. data Message Message :: MsgId -> ByteString -> Message -- | A produced message, containing just a payload as bytestring. newtype PulsarMessage PulsarMessage :: ByteString -> PulsarMessage -- | A subscription name can be any string value. 
newtype SubName SubName :: Text -> SubName -- | A subscription type. See -- https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions -- to learn more. data SubType Exclusive :: SubType Failover :: SubType Shared :: SubType KeyShared :: SubType -- | A subscription with a type and a name. data Subscription Subscription :: SubType -> SubName -> Subscription