{-# LANGUAGE OverloadedStrings #-}
module Pulsar.Core where
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception ( throwIO )
import Control.Monad.Catch ( MonadThrow )
import Control.Monad.IO.Class
import Data.Text ( Text )
import Lens.Family
import Proto.PulsarApi hiding ( Subscription )
import qualified Proto.PulsarApi_Fields as F
import Pulsar.AppState
import Pulsar.Connection
import Pulsar.Internal.Logger
import qualified Pulsar.Protocol.Commands as P
import Pulsar.Protocol.Frame ( Payload(..)
, Response(..)
, getCommand
)
import Pulsar.Types
-- | Send the command built by 'P.lookup' for the given request id and topic,
-- then wait on the response variable.
--
-- The command is logged with 'logRequest' and written to the connection's
-- socket with 'sendSimpleCmd'. The value subsequently read from @var@ is
-- logged with 'logResponse' and otherwise discarded.
lookup :: Connection -> MVar Response -> ReqId -> Topic -> IO ()
lookup (Conn s) var (ReqId req) topic = do
  -- Build the command once so the logged value is exactly what is sent.
  let cmd = P.lookup req topic
  logRequest cmd
  sendSimpleCmd s cmd
  -- 'readMVar' blocks until @var@ is full and leaves its contents in place.
  readMVar var >>= logResponse
-- | Send the command built by 'P.producer' and return the producer name
-- carried by the response.
--
-- The command is logged and written to the connection's socket, then the
-- response is read from @var@ and logged. If the response command has its
-- @producerSuccess@ field set, that field's @producerName@ is returned;
-- otherwise the empty text @""@ is returned (no exception is raised).
newProducer
  :: Connection -> MVar Response -> ReqId -> ProducerId -> Topic -> IO Text
newProducer (Conn s) var (ReqId req) (PId pid) topic = do
  -- Build the command once so the logged value is exactly what is sent.
  let cmd = P.producer req pid topic
  logRequest cmd
  sendSimpleCmd s cmd
  -- 'readMVar' blocks until @var@ is full and leaves its contents in place.
  resp <- readMVar var
  logResponse resp
  pure $ case getCommand resp ^. F.maybe'producerSuccess of
    Just ps -> ps ^. F.producerName
    Nothing -> ""
-- | Send the command built by 'P.closeProducer' for the given producer and
-- request ids, then wait on the response variable.
--
-- The command is logged with 'logRequest' and written to the connection's
-- socket with 'sendSimpleCmd'. The value subsequently read from @var@ is
-- logged with 'logResponse' and otherwise discarded.
closeProducer :: Connection -> MVar Response -> ProducerId -> ReqId -> IO ()
closeProducer (Conn s) var (PId pid) (ReqId req) = do
  -- Note the argument order: 'P.closeProducer' takes the request id first.
  let cmd = P.closeProducer req pid
  logRequest cmd
  sendSimpleCmd s cmd
  -- 'readMVar' blocks until @var@ is full and leaves its contents in place.
  readMVar var >>= logResponse
-- | Send the command built by 'P.subscribe' for the given consumer, topic and
-- subscription, then wait on the response variable.
--
-- The subscription is unpacked into its type and name, which are passed to
-- 'P.subscribe' together with the request id, consumer id and topic. The
-- command is logged and written to the connection's socket; the value then
-- read from @var@ is logged with 'logResponse' and otherwise discarded.
newSubscriber
  :: Connection
  -> MVar Response
  -> ReqId
  -> ConsumerId
  -> Topic
  -> Subscription
  -> IO ()
newSubscriber (Conn s) var (ReqId req) (CId cid) topic (Subscription stype sname)
  = do
    -- Build the command once so the logged value is exactly what is sent.
    let cmd = P.subscribe req cid topic stype sname
    logRequest cmd
    sendSimpleCmd s cmd
    -- 'readMVar' blocks until @var@ is full and leaves its contents in place.
    readMVar var >>= logResponse
-- | Send the command built by 'P.flow' for the given consumer id and number
-- of permits.
--
-- The command is logged with 'logRequest' and written to the connection's
-- socket with 'sendSimpleCmd'. This function does not wait for a response.
flow :: Connection -> ConsumerId -> Permits -> IO ()
flow (Conn s) (CId cid) (Permits p) = do
  -- Build the command once so the logged value is exactly what is sent.
  let cmd = P.flow cid p
  logRequest cmd
  sendSimpleCmd s cmd
-- | Send the command built by 'P.ack' for the given consumer id and message
-- id.
--
-- The command is logged with 'logRequest' and written to the connection's
-- socket with 'sendSimpleCmd'. This function does not wait for a response.
ack :: MonadIO m => Connection -> ConsumerId -> MessageIdData -> m ()
ack (Conn s) (CId cid) msgId = do
  -- Build the command once so the logged value is exactly what is sent.
  let cmd = P.ack cid msgId
  logRequest cmd
  sendSimpleCmd s cmd
-- | Send the command built by 'P.closeConsumer' for the given consumer and
-- request ids, then wait on the response variable.
--
-- The command is logged with 'logRequest' and written to the connection's
-- socket with 'sendSimpleCmd'. The value subsequently read from @var@ is
-- logged with 'logResponse' and otherwise discarded.
closeConsumer :: Connection -> MVar Response -> ConsumerId -> ReqId -> IO ()
closeConsumer (Conn s) var (CId cid) (ReqId req) = do
  -- Note the argument order: 'P.closeConsumer' takes the request id first.
  let cmd = P.closeConsumer req cid
  logRequest cmd
  sendSimpleCmd s cmd
  -- 'readMVar' blocks until @var@ is full and leaves its contents in place.
  readMVar var >>= logResponse
-- | Send 'P.ping' and check that the next response on the channel is a pong.
--
-- The ping command is logged and written to the connection's socket. The next
-- 'Response' is then taken from @chan@ ('readChan' blocks until one is
-- available). If its command has the @pong@ field set, that value is logged
-- with 'logResponse'; otherwise an 'IOError' built with
-- @userError "Failed to get PONG"@ is thrown in 'IO' via 'throwIO'.
--
-- The 'MonadThrow' constraint is part of the published signature and is kept
-- for compatibility with existing callers.
ping :: (MonadThrow m, MonadIO m) => Connection -> Chan Response -> m ()
ping (Conn s) chan = do
  logRequest P.ping
  sendSimpleCmd s P.ping
  cmd <- getCommand <$> liftIO (readChan chan)
  case cmd ^. F.maybe'pong of
    Just p  -> logResponse p
    Nothing -> liftIO . throwIO $ userError "Failed to get PONG"
-- | Send 'P.pong' over the connection.
--
-- The command is logged with 'logRequest' and written to the connection's
-- socket with 'sendSimpleCmd'. This function does not wait for a response.
pong :: MonadIO m => Connection -> m ()
pong (Conn s) = do
  logRequest P.pong
  sendSimpleCmd s P.pong
-- | Send a message payload for the given producer and sequence id, then wait
-- on the response variable.
--
-- The command built by 'P.send' is logged with 'logRequest' and written to
-- the connection's socket with 'sendPayloadCmd', together with
-- 'P.messageMetadata' and the message bytes wrapped in 'Payload'. The value
-- subsequently read from @var@ is logged with 'logResponse' and otherwise
-- discarded.
send
  :: Connection
  -> MVar Response
  -> ProducerId
  -> SeqId
  -> PulsarMessage
  -> IO ()
send (Conn s) var (PId pid) (SeqId sid) (PulsarMessage msg) = do
  -- Build the command once so the logged value is exactly what is sent.
  let cmd = P.send pid sid
  logRequest cmd
  sendPayloadCmd s cmd P.messageMetadata (Just $ Payload msg)
  -- 'readMVar' blocks until @var@ is full and leaves its contents in place.
  readMVar var >>= logResponse