{-# LANGUAGE FlexibleContexts #-}
module Pulsar.Producer where
import Control.Concurrent.Async ( async )
import Control.Monad.Catch ( bracket_ )
import Control.Concurrent.MVar
import Control.Monad.IO.Class ( MonadIO
, liftIO
)
import Control.Monad.Managed ( managed_
, runManaged
)
import Control.Monad.Reader ( MonadReader
, ask
)
import Data.IORef
import Data.Text ( Text )
import Pulsar.AppState
import qualified Pulsar.Core as C
import Pulsar.Connection ( PulsarCtx(..) )
import Pulsar.Types
newtype Producer m = Producer
{ Producer m -> PulsarMessage -> m ()
send :: PulsarMessage -> m ()
}
data ProducerState = ProducerState
{ ProducerState -> SeqId
stSeqId :: SeqId
, ProducerState -> Text
stName :: Text
}
mkSeqId :: MonadIO m => IORef ProducerState -> m SeqId
mkSeqId :: IORef ProducerState -> m SeqId
mkSeqId ref :: IORef ProducerState
ref = IO SeqId -> m SeqId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SeqId -> m SeqId) -> IO SeqId -> m SeqId
forall a b. (a -> b) -> a -> b
$ IORef ProducerState
-> (ProducerState -> (ProducerState, SeqId)) -> IO SeqId
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef
IORef ProducerState
ref
(\(ProducerState s :: SeqId
s n :: Text
n) -> let s' :: SeqId
s' = SeqId
s SeqId -> SeqId -> SeqId
forall a. Num a => a -> a -> a
+ 1 in (SeqId -> Text -> ProducerState
ProducerState SeqId
s' Text
n, SeqId
s))
newProducer
:: (MonadIO m, MonadReader PulsarCtx m, MonadIO f) => Topic -> m (Producer f)
newProducer :: Topic -> m (Producer f)
newProducer topic :: Topic
topic = do
(Ctx conn :: Connection
conn app :: IORef AppState
app _) <- m PulsarCtx
forall r (m :: * -> *). MonadReader r m => m r
ask
ProducerId
pid <- IORef AppState -> m ProducerId
forall (m :: * -> *). MonadIO m => IORef AppState -> m ProducerId
mkProducerId IORef AppState
app
Text
pname <- IO Text -> m Text
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Text -> m Text) -> IO Text -> m Text
forall a b. (a -> b) -> a -> b
$ Connection -> ProducerId -> IORef AppState -> IO Text
mkProducer Connection
conn ProducerId
pid IORef AppState
app
IORef ProducerState
pst <- IO (IORef ProducerState) -> m (IORef ProducerState)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ProducerState) -> m (IORef ProducerState))
-> IO (IORef ProducerState) -> m (IORef ProducerState)
forall a b. (a -> b) -> a -> b
$ ProducerState -> IO (IORef ProducerState)
forall a. a -> IO (IORef a)
newIORef (SeqId -> Text -> ProducerState
ProducerState 0 Text
pname)
MVar ()
var <- IO (MVar ()) -> m (MVar ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
let release :: IO ()
release = IORef AppState -> IO (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
newReq IORef AppState
app IO (ReqId, MVar Response)
-> ((ReqId, MVar Response) -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \(r :: ReqId
r, v :: MVar Response
v) -> Connection -> MVar Response -> ProducerId -> ReqId -> IO ()
C.closeProducer Connection
conn MVar Response
v ProducerId
pid ReqId
r
handler :: Managed ()
handler = (forall r. IO r -> IO r) -> Managed ()
managed_ (IO () -> IO () -> IO r -> IO r
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) IO ()
release) Managed () -> Managed () -> Managed ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> Managed ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
var)
Async ()
worker <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (Managed () -> IO ()
runManaged Managed ()
handler)
IORef AppState -> (Async (), MVar ()) -> m ()
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> (Async (), MVar ()) -> m ()
addWorker IORef AppState
app (Async ()
worker, MVar ()
var)
Producer f -> m (Producer f)
forall (m :: * -> *) a. Monad m => a -> m a
return (Producer f -> m (Producer f)) -> Producer f -> m (Producer f)
forall a b. (a -> b) -> a -> b
$ (PulsarMessage -> f ()) -> Producer f
forall (m :: * -> *). (PulsarMessage -> m ()) -> Producer m
Producer (Connection
-> ProducerId
-> IORef AppState
-> IORef ProducerState
-> PulsarMessage
-> f ()
forall (m :: * -> *).
MonadIO m =>
Connection
-> ProducerId
-> IORef AppState
-> IORef ProducerState
-> PulsarMessage
-> m ()
dispatch Connection
conn ProducerId
pid IORef AppState
app IORef ProducerState
pst)
where
newReq :: IORef AppState -> m (ReqId, MVar Response)
newReq app :: IORef AppState
app = IORef AppState -> m (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
mkRequestId IORef AppState
app
dispatch :: Connection
-> ProducerId
-> IORef AppState
-> IORef ProducerState
-> PulsarMessage
-> m ()
dispatch conn :: Connection
conn pid :: ProducerId
pid app :: IORef AppState
app pst :: IORef ProducerState
pst msg :: PulsarMessage
msg = do
SeqId
sid <- IORef ProducerState -> m SeqId
forall (m :: * -> *). MonadIO m => IORef ProducerState -> m SeqId
mkSeqId IORef ProducerState
pst
MVar Response
var <- IORef AppState -> ProducerId -> SeqId -> m (MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> ProducerId -> SeqId -> m (MVar Response)
registerSeqId IORef AppState
app ProducerId
pid SeqId
sid
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Connection
-> MVar Response -> ProducerId -> SeqId -> PulsarMessage -> IO ()
C.send Connection
conn MVar Response
var ProducerId
pid SeqId
sid PulsarMessage
msg
mkProducer :: Connection -> ProducerId -> IORef AppState -> IO Text
mkProducer conn :: Connection
conn pid :: ProducerId
pid app :: IORef AppState
app = do
(req1 :: ReqId
req1, var1 :: MVar Response
var1) <- IORef AppState -> IO (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
newReq IORef AppState
app
Connection -> MVar Response -> ReqId -> Topic -> IO ()
C.lookup Connection
conn MVar Response
var1 ReqId
req1 Topic
topic
(req2 :: ReqId
req2, var2 :: MVar Response
var2) <- IORef AppState -> IO (ReqId, MVar Response)
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> m (ReqId, MVar Response)
newReq IORef AppState
app
Connection
-> MVar Response -> ReqId -> ProducerId -> Topic -> IO Text
C.newProducer Connection
conn MVar Response
var2 ReqId
req2 ProducerId
pid Topic
topic