License | Apache-2.0 |
---|---|
Maintainer | gabriel.volpe@chatroulette.com |
Stability | experimental |
Safe Haskell | None |
Language | Haskell2010 |
In the following example, we will create a quick example showcasing a consumer and producer running concurrently, step by step.
Consider the following imports (needs the async library).
import Control.Concurrent ( threadDelay ) import Control.Concurrent.Async ( concurrently_ ) import Pulsar
Firstly, we create a connection to Pulsar, defined as PulsarConnection
.
conn :: PulsarConnection conn = connect defaultConnectData
Then a consumer and a producer, which operate in the Pulsar
monad.
pulsar :: Pulsar () pulsar = do c <- newConsumer topic sub p <- newProducer topic liftIO $ program c p where topic = defaultTopic "app" sub = Subscription Exclusive "test-sub"
And the main user program that consume and produce messages concurrently, running in IO
.
program :: Consumer IO -> Producer IO -> IO () program Consumer {..} Producer {..} = let c = fetch >>= (Message i m) -> print m >> ack i >> c p = threadDelay (3 * 1000000) >> send "Hello World!" >> p in concurrently_ c p
We have a delay of 3 seconds before publishing to make sure the consumer is already running. Otherwise, it might miss some messages.
Finally, we put it all together and call runPulsar
with the connection and the program in the Pulsar
monad.
main :: IO () main = runPulsar conn pulsar
Since a Pulsar connection, consumers, and producers are long-lived resources, Supernova manages them accordingly for you. Once the program exits, the resources will be released in the respective order (always opposite to the order of acquisition).
Synopsis
- connect :: (MonadIO m, MonadThrow m, MonadManaged m) => ConnectData -> m PulsarCtx
- defaultConnectData :: ConnectData
- newConsumer :: (MonadIO m, MonadIO f, MonadReader PulsarCtx m) => Topic -> Subscription -> m (Consumer f)
- newProducer :: (MonadIO m, MonadReader PulsarCtx m, MonadIO f) => Topic -> m (Producer f)
- runPulsar :: PulsarConnection -> Pulsar a -> IO ()
- runPulsar' :: LogOptions -> PulsarConnection -> Pulsar a -> IO ()
- data Consumer m = Consumer {}
- newtype Producer m = Producer {
- send :: PulsarMessage -> m ()
- data Pulsar a
- type PulsarConnection = Connection PulsarCtx
- data ConnectData = ConnData {}
- data LogLevel
- data LogOptions = LogOptions {}
- data LogOutput
- data Topic = Topic {}
- defaultTopic :: TopicName -> Topic
- data TopicType
- newtype Tenant = Tenant Text
- newtype NameSpace = NameSpace Text
- newtype TopicName = TopicName Text
- newtype MsgId = MsgId MessageIdData
- data Message = Message MsgId ByteString
- newtype PulsarMessage = PulsarMessage ByteString
- newtype SubName = SubName Text
- data SubType
- data Subscription = Subscription SubType SubName
Documentation
connect :: (MonadIO m, MonadThrow m, MonadManaged m) => ConnectData -> m PulsarCtx Source #
Starts a Pulsar connection with the supplied ConnectData
defaultConnectData :: ConnectData Source #
Default connection data: "127.0.0.1:6650"
newConsumer :: (MonadIO m, MonadIO f, MonadReader PulsarCtx m) => Topic -> Subscription -> m (Consumer f) Source #
newProducer :: (MonadIO m, MonadReader PulsarCtx m, MonadIO f) => Topic -> m (Producer f) Source #
runPulsar :: PulsarConnection -> Pulsar a -> IO () Source #
Runs a Pulsar computation with default logging to standard output
runPulsar' :: LogOptions -> PulsarConnection -> Pulsar a -> IO () Source #
Runs a Pulsar computation with the supplied logging options
An abstract Producer
able to send
messages of type PulsarMessage
.
Producer | |
|
The main Pulsar monad, which abstracts over a ReaderT
monad.
type PulsarConnection = Connection PulsarCtx Source #
Alias for Connection PulsarCtx.
data ConnectData Source #
Connection details: host and port.
Instances
Show ConnectData Source # | |
Defined in Pulsar.Connection showsPrec :: Int -> ConnectData -> ShowS # show :: ConnectData -> String # showList :: [ConnectData] -> ShowS # |
Internal logging level, part of LogOptions
. Can be used together with runPulsar
`.
data LogOptions Source #
Internal logging options. Can be used together with runPulsar
`.
Instances
Show LogOptions Source # | |
Defined in Pulsar.Internal.Core showsPrec :: Int -> LogOptions -> ShowS # show :: LogOptions -> String # showList :: [LogOptions] -> ShowS # |
Internal logging output, part of LogOptions
. Can be used together with runPulsar
`.
A Topic is in the form "type://tenant/namespace/topic-name", which is what the Show
instance does.
defaultTopic :: TopicName -> Topic Source #
A default Topic
: "non-persistent://public/default/my-topic".
A topic can be either Persistent
or NonPersistent
.
A tenant can be any string value. Default value is "public".
A namespace can be any string value. Default value is "default".
A topic name can be any string value.
A consumed message, containing both MsgId
and payload as bytestring.
newtype PulsarMessage Source #
A produced message, containing just a payload as bytestring.
Instances
Show PulsarMessage Source # | |
Defined in Pulsar.Types showsPrec :: Int -> PulsarMessage -> ShowS # show :: PulsarMessage -> String # showList :: [PulsarMessage] -> ShowS # | |
IsString PulsarMessage Source # | |
Defined in Pulsar.Types fromString :: String -> PulsarMessage # |
A subscription name can be any string value.
A subscription type. See https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions to learn more.
data Subscription Source #
A subscription with a type and a name.
Instances
Show Subscription Source # | |
Defined in Pulsar.Types showsPrec :: Int -> Subscription -> ShowS # show :: Subscription -> String # showList :: [Subscription] -> ShowS # |