supernova-0.0.3: Apache Pulsar client for Haskell
LicenseApache-2.0
Maintainergabriel.volpe@chatroulette.com
Stabilityexperimental
Safe HaskellNone
LanguageHaskell2010

Pulsar

Description

In the following example, we will create a quick example showcasing a consumer and producer running concurrently, step by step.

Consider the following imports (needs the async library).

import           Control.Concurrent             ( threadDelay )
import           Control.Concurrent.Async       ( concurrently_ )
import           Pulsar

Firstly, we create a connection to Pulsar, defined as PulsarConnection.

conn :: PulsarConnection
conn = connect defaultConnectData

Then a consumer and a producer, which operate in the Pulsar monad.

pulsar :: Pulsar ()
pulsar = do
  c <- newConsumer topic sub
  p <- newProducer topic
  liftIO $ program c p
 where
  topic = defaultTopic "app"
  sub   = Subscription Exclusive "test-sub"

And the main user program that consume and produce messages concurrently, running in IO.

program :: Consumer IO -> Producer IO -> IO ()
program Consumer {..} Producer {..} =
  let c = fetch >>= (Message i m) -> print m >> ack i >> c
      p = threadDelay (3 * 1000000) >> send "Hello World!" >> p
  in  concurrently_ c p

We have a delay of 3 seconds before publishing to make sure the consumer is already running. Otherwise, it might miss some messages.

Finally, we put it all together and call runPulsar with the connection and the program in the Pulsar monad.

main :: IO ()
main = runPulsar conn pulsar

Since a Pulsar connection, consumers, and producers are long-lived resources, Supernova manages them accordingly for you. Once the program exits, the resources will be released in the respective order (always opposite to the order of acquisition).

Synopsis

Documentation

connect :: (MonadIO m, MonadThrow m, MonadManaged m) => ConnectData -> m PulsarCtx Source #

Starts a Pulsar connection with the supplied ConnectData

defaultConnectData :: ConnectData Source #

Default connection data: "127.0.0.1:6650"

newConsumer :: (MonadIO m, MonadIO f, MonadReader PulsarCtx m) => Topic -> Subscription -> m (Consumer f) Source #

Create a new Consumer by supplying a PulsarCtx (returned by connect), a Topic and a SubscriptionName.

newProducer :: (MonadIO m, MonadReader PulsarCtx m, MonadIO f) => Topic -> m (Producer f) Source #

Create a new Producer by supplying a PulsarCtx (returned by connect) and a Topic.

runPulsar :: PulsarConnection -> Pulsar a -> IO () Source #

Runs a Pulsar computation with default logging to standard output

runPulsar' :: LogOptions -> PulsarConnection -> Pulsar a -> IO () Source #

Runs a Pulsar computation with the supplied logging options

data Consumer m Source #

An abstract Consumer able to fetch messages and acknowledge them.

Constructors

Consumer 

Fields

  • fetch :: m Message

    Fetches a single message. Blocks if no messages are available.

  • ack :: MsgId -> m ()

    Acknowledges a single message.

newtype Producer m Source #

An abstract Producer able to send messages of type PulsarMessage.

Constructors

Producer 

Fields

data Pulsar a Source #

The main Pulsar monad, which abstracts over a ReaderT monad.

Instances

Instances details
Monad Pulsar Source # 
Instance details

Defined in Pulsar.Internal.Core

Methods

(>>=) :: Pulsar a -> (a -> Pulsar b) -> Pulsar b #

(>>) :: Pulsar a -> Pulsar b -> Pulsar b #

return :: a -> Pulsar a #

Functor Pulsar Source # 
Instance details

Defined in Pulsar.Internal.Core

Methods

fmap :: (a -> b) -> Pulsar a -> Pulsar b #

(<$) :: a -> Pulsar b -> Pulsar a #

Applicative Pulsar Source # 
Instance details

Defined in Pulsar.Internal.Core

Methods

pure :: a -> Pulsar a #

(<*>) :: Pulsar (a -> b) -> Pulsar a -> Pulsar b #

liftA2 :: (a -> b -> c) -> Pulsar a -> Pulsar b -> Pulsar c #

(*>) :: Pulsar a -> Pulsar b -> Pulsar b #

(<*) :: Pulsar a -> Pulsar b -> Pulsar a #

MonadIO Pulsar Source # 
Instance details

Defined in Pulsar.Internal.Core

Methods

liftIO :: IO a -> Pulsar a #

type PulsarConnection = Connection PulsarCtx Source #

Alias for Connection PulsarCtx.

data ConnectData Source #

Connection details: host and port.

Constructors

ConnData 

Instances

Instances details
Show ConnectData Source # 
Instance details

Defined in Pulsar.Connection

data LogLevel Source #

Internal logging level, part of LogOptions. Can be used together with runPulsar`.

Constructors

Error 
Warn 
Info 
Debug 

Instances

Instances details
Show LogLevel Source # 
Instance details

Defined in Pulsar.Internal.Core

data LogOptions Source #

Internal logging options. Can be used together with runPulsar`.

Constructors

LogOptions 

Instances

Instances details
Show LogOptions Source # 
Instance details

Defined in Pulsar.Internal.Core

data LogOutput Source #

Internal logging output, part of LogOptions. Can be used together with runPulsar`.

Constructors

StdOut 
File FilePath 

Instances

Instances details
Show LogOutput Source # 
Instance details

Defined in Pulsar.Internal.Core

data Topic Source #

A Topic is in the form "type://tenant/namespace/topic-name", which is what the Show instance does.

Constructors

Topic 

Instances

Instances details
Show Topic Source # 
Instance details

Defined in Pulsar.Types

Methods

showsPrec :: Int -> Topic -> ShowS #

show :: Topic -> String #

showList :: [Topic] -> ShowS #

defaultTopic :: TopicName -> Topic Source #

A default Topic: "non-persistent://public/default/my-topic".

data TopicType Source #

A topic can be either Persistent or NonPersistent.

Constructors

Persistent 
NonPersistent 

Instances

Instances details
Show TopicType Source # 
Instance details

Defined in Pulsar.Types

newtype Tenant Source #

A tenant can be any string value. Default value is "public".

Constructors

Tenant Text 

Instances

Instances details
Show Tenant Source # 
Instance details

Defined in Pulsar.Types

IsString Tenant Source # 
Instance details

Defined in Pulsar.Types

Methods

fromString :: String -> Tenant #

newtype NameSpace Source #

A namespace can be any string value. Default value is "default".

Constructors

NameSpace Text 

Instances

Instances details
Show NameSpace Source # 
Instance details

Defined in Pulsar.Types

IsString NameSpace Source # 
Instance details

Defined in Pulsar.Types

newtype TopicName Source #

A topic name can be any string value.

Constructors

TopicName Text 

Instances

Instances details
Show TopicName Source # 
Instance details

Defined in Pulsar.Types

IsString TopicName Source # 
Instance details

Defined in Pulsar.Types

newtype MsgId Source #

A message id, needed for acknowledging messages. See ack.

Constructors

MsgId MessageIdData 

data Message Source #

A consumed message, containing both MsgId and payload as bytestring.

Constructors

Message MsgId ByteString 

newtype PulsarMessage Source #

A produced message, containing just a payload as bytestring.

Instances

Instances details
Show PulsarMessage Source # 
Instance details

Defined in Pulsar.Types

IsString PulsarMessage Source # 
Instance details

Defined in Pulsar.Types

newtype SubName Source #

A subscription name can be any string value.

Constructors

SubName Text 

Instances

Instances details
Show SubName Source # 
Instance details

Defined in Pulsar.Types

IsString SubName Source # 
Instance details

Defined in Pulsar.Types

Methods

fromString :: String -> SubName #

data SubType Source #

Instances

Instances details
Show SubType Source # 
Instance details

Defined in Pulsar.Types

data Subscription Source #

A subscription with a type and a name.

Instances

Instances details
Show Subscription Source # 
Instance details

Defined in Pulsar.Types