{-# LANGUAGE FlexibleContexts #-}

{- |
Module      : Pulsar.Producer
Description : Apache Pulsar client
License     : Apache-2.0
Maintainer  : gabriel.volpe@chatroulette.com
Stability   : experimental

The basic producer interaction looks as follows: http://pulsar.apache.org/docs/en/develop-binary-protocol/#producer

>>> LOOKUP
<<< LOOKUP_RESPONSE
>>> PRODUCER
<<< SUCCESS
>>> SEND 1
>>> SEND 2
<<< SEND_RECEIPT 1
<<< SEND_RECEIPT 2

When the program finishes, either succesfully or due to a failure, we close the producer.

>>> CLOSE_PRODUCER
<<< SUCCESS
-}
module Pulsar.Producer where

import           Control.Concurrent.Async       ( async )
import           Control.Monad.Catch            ( bracket_ )
import           Control.Concurrent.MVar
import           Control.Monad.IO.Class         ( MonadIO
                                                , liftIO
                                                )
import           Control.Monad.Managed          ( managed_
                                                , runManaged
                                                )
import           Control.Monad.Reader           ( MonadReader
                                                , ask
                                                )
import           Data.IORef
import           Data.Text                      ( Text )
import           Pulsar.AppState
import qualified Pulsar.Core                   as C
import           Pulsar.Connection              ( PulsarCtx(..) )
import           Pulsar.Types

{- | An abstract 'Producer' able to 'send' messages of type 'PulsarMessage'. -}
newtype Producer m = Producer
  { Producer m -> PulsarMessage -> m ()
send :: PulsarMessage -> m () -- ^ Produces a single message.
  }

data ProducerState = ProducerState
  { ProducerState -> SeqId
stSeqId :: SeqId -- an incremental message sequence counter
  , ProducerState -> Text
stName :: Text   -- a unique name
  }

mkSeqId :: MonadIO m => IORef ProducerState -> m SeqId
mkSeqId :: IORef ProducerState -> m SeqId
mkSeqId ref :: IORef ProducerState
ref = IO SeqId -> m SeqId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SeqId -> m SeqId) -> IO SeqId -> m SeqId
forall a b. (a -> b) -> a -> b
$ IORef ProducerState
-> (ProducerState -> (ProducerState, SeqId)) -> IO SeqId
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef
  IORef ProducerState
ref
  (\(ProducerState s :: SeqId
s n :: Text
n) -> let s' :: SeqId
s' = SeqId
s SeqId -> SeqId -> SeqId
forall a. Num a => a -> a -> a
+ 1 in (SeqId -> Text -> ProducerState
ProducerState SeqId
s' Text
n, SeqId
s))

{- | Create a new 'Producer' by supplying a 'PulsarCtx' (returned by 'Pulsar.connect') and a 'Topic'. -}
newProducer
  :: (MonadIO m, MonadReader PulsarCtx m, MonadIO f) => Topic -> m (Producer f)
newProducer :: Topic -> m (Producer f)
newProducer topic :: Topic
topic = do
  (Ctx conn :: Connection
conn app :: IORef AppState
app _) <- m PulsarCtx
forall r (m :: * -> *). MonadReader r m => m r
ask
  ProducerId
pid              <- IORef AppState -> m ProducerId
forall (m :: * -> *). MonadIO m => IORef AppState -> m ProducerId
mkProducerId IORef AppState
app
  Text
pname            <- IO Text -> m Text
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Text -> m Text) -> IO Text -> m Text
forall a b. (a -> b) -> a -> b
$ Connection -> ProducerId -> IORef AppState -> IO Text
mkProducer Connection
conn ProducerId
pid IORef AppState
app
  IORef ProducerState
pst              <- IO (IORef ProducerState) -> m (IORef ProducerState)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ProducerState) -> m (IORef ProducerState))
-> IO (IORef ProducerState) -> m (IORef ProducerState)
forall a b. (a -> b) -> a -> b
$ ProducerState -> IO (IORef ProducerState)
forall a. a -> IO (IORef a)
newIORef (SeqId -> Text -> ProducerState
ProducerState 0 Text
pname)
  MVar ()
var              <- IO (MVar ()) -> m (MVar ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  let release :: IO ()
release = IORef AppState -> IO (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
newReq IORef AppState
app IO (ReqId, MVar Response)
-> ((ReqId, MVar Response) -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \(r :: ReqId
r, v :: MVar Response
v) -> Connection -> MVar Response -> ProducerId -> ReqId -> IO ()
C.closeProducer Connection
conn MVar Response
v ProducerId
pid ReqId
r
      handler :: Managed ()
handler = (forall r. IO r -> IO r) -> Managed ()
managed_ (IO () -> IO () -> IO r -> IO r
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) IO ()
release) Managed () -> Managed () -> Managed ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> Managed ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
var)
  Async ()
worker <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (Managed () -> IO ()
runManaged Managed ()
handler)
  IORef AppState -> (Async (), MVar ()) -> m ()
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> (Async (), MVar ()) -> m ()
addWorker IORef AppState
app (Async ()
worker, MVar ()
var)
  Producer f -> m (Producer f)
forall (m :: * -> *) a. Monad m => a -> m a
return (Producer f -> m (Producer f)) -> Producer f -> m (Producer f)
forall a b. (a -> b) -> a -> b
$ (PulsarMessage -> f ()) -> Producer f
forall (m :: * -> *). (PulsarMessage -> m ()) -> Producer m
Producer (Connection
-> ProducerId
-> IORef AppState
-> IORef ProducerState
-> PulsarMessage
-> f ()
forall (m :: * -> *).
MonadIO m =>
Connection
-> ProducerId
-> IORef AppState
-> IORef ProducerState
-> PulsarMessage
-> m ()
dispatch Connection
conn ProducerId
pid IORef AppState
app IORef ProducerState
pst)
 where
  newReq :: IORef AppState -> m (ReqId, MVar Response)
newReq app :: IORef AppState
app = IORef AppState -> m (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
mkRequestId IORef AppState
app
  dispatch :: Connection
-> ProducerId
-> IORef AppState
-> IORef ProducerState
-> PulsarMessage
-> m ()
dispatch conn :: Connection
conn pid :: ProducerId
pid app :: IORef AppState
app pst :: IORef ProducerState
pst msg :: PulsarMessage
msg = do
    SeqId
sid <- IORef ProducerState -> m SeqId
forall (m :: * -> *). MonadIO m => IORef ProducerState -> m SeqId
mkSeqId IORef ProducerState
pst
    MVar Response
var <- IORef AppState -> ProducerId -> SeqId -> m (MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> ProducerId -> SeqId -> m (MVar Response)
registerSeqId IORef AppState
app ProducerId
pid SeqId
sid
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Connection
-> MVar Response -> ProducerId -> SeqId -> PulsarMessage -> IO ()
C.send Connection
conn MVar Response
var ProducerId
pid SeqId
sid PulsarMessage
msg
  mkProducer :: Connection -> ProducerId -> IORef AppState -> IO Text
mkProducer conn :: Connection
conn pid :: ProducerId
pid app :: IORef AppState
app = do
    (req1 :: ReqId
req1, var1 :: MVar Response
var1) <- IORef AppState -> IO (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
newReq IORef AppState
app
    Connection -> MVar Response -> ReqId -> Topic -> IO ()
C.lookup Connection
conn MVar Response
var1 ReqId
req1 Topic
topic
    (req2 :: ReqId
req2, var2 :: MVar Response
var2) <- IORef AppState -> IO (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
newReq IORef AppState
app
    Connection
-> MVar Response -> ReqId -> ProducerId -> Topic -> IO Text
C.newProducer Connection
conn MVar Response
var2 ReqId
req2 ProducerId
pid Topic
topic