{-# LANGUAGE OverloadedStrings #-}

{- Defines a set of transactional commands, communicating via internal channels -}
module Pulsar.Core where

import           Control.Concurrent.Chan
import           Control.Concurrent.MVar
import           Control.Exception              ( throwIO )
import           Control.Monad.Catch            ( MonadThrow )
import           Control.Monad.IO.Class
import           Data.Text                      ( Text )
import           Lens.Family
import           Proto.PulsarApi         hiding ( Subscription )
import qualified Proto.PulsarApi_Fields        as F
import           Pulsar.AppState
import           Pulsar.Connection
import           Pulsar.Internal.Logger
import qualified Pulsar.Protocol.Commands      as P
import           Pulsar.Protocol.Frame          ( Payload(..)
                                                , Response(..)
                                                , getCommand
                                                )
import           Pulsar.Types

------ Simple commands ------

-- | Issue a topic lookup.
--
-- Builds the lookup command from the request id and topic, logs it, sends it
-- over the connection's socket with 'sendSimpleCmd', then waits on the given
-- 'MVar' for a response and logs that response. 'readMVar' is used, so the
-- response is read without being taken out of the 'MVar'.
lookup :: Connection -> MVar Response -> ReqId -> Topic -> IO ()
lookup (Conn s) var (ReqId req) topic = do
  -- Build the command once so the logged and the sent command cannot diverge.
  let cmd = P.lookup req topic
  logRequest cmd
  sendSimpleCmd s cmd
  -- TODO: we need to analyze it and might need to re-issue another lookup
  readMVar var >>= logResponse

-- | Register a new producer on a topic and return the producer name found in
-- the response.
--
-- The producer command is logged and sent over the connection's socket; the
-- response is then read from the given 'MVar' (via 'readMVar', so it is not
-- removed) and logged. If the response carries a @producerSuccess@ command,
-- its @producerName@ field is returned.
--
-- NOTE(review): when the response is not a @producerSuccess@ the function
-- returns the empty text rather than signalling an error. Callers cannot tell
-- this apart from a genuinely empty name; consider whether this should fail.
newProducer
  :: Connection -> MVar Response -> ReqId -> ProducerId -> Topic -> IO Text
newProducer (Conn s) var (ReqId req) (PId pid) topic = do
  -- Build the command once so the logged and the sent command cannot diverge.
  let cmd = P.producer req pid topic
  logRequest cmd
  sendSimpleCmd s cmd
  resp <- readMVar var
  logResponse resp
  case getCommand resp ^. F.maybe'producerSuccess of
    Just ps -> return $ ps ^. F.producerName
    Nothing -> return ""

-- | Close a producer.
--
-- Builds the close-producer command from the request id and producer id (in
-- that order, as 'P.closeProducer' is called), logs it, sends it over the
-- connection's socket, then reads the response from the given 'MVar' with
-- 'readMVar' and logs it.
closeProducer :: Connection -> MVar Response -> ProducerId -> ReqId -> IO ()
closeProducer (Conn s) var (PId pid) (ReqId req) = do
  -- Build the command once so the logged and the sent command cannot diverge.
  let cmd = P.closeProducer req pid
  logRequest cmd
  sendSimpleCmd s cmd
  readMVar var >>= logResponse

-- | Subscribe a consumer to a topic.
--
-- The 'Subscription' is unpacked into its subscription type and name, which
-- are passed to 'P.subscribe' together with the request id, consumer id and
-- topic. The resulting command is logged and sent over the connection's
-- socket; the response is then read from the given 'MVar' with 'readMVar'
-- and logged.
newSubscriber
  :: Connection
  -> MVar Response
  -> ReqId
  -> ConsumerId
  -> Topic
  -> Subscription
  -> IO ()
newSubscriber (Conn s) var (ReqId req) (CId cid) topic sub = do
  let Subscription stype sname = sub
      -- Build the command once so the logged and the sent command cannot
      -- diverge.
      cmd = P.subscribe req cid topic stype sname
  logRequest cmd
  sendSimpleCmd s cmd
  readMVar var >>= logResponse

-- | Send a flow command for the given consumer with the given number of
-- permits.
--
-- The command is logged and then sent over the connection's socket. No
-- response is read here.
flow :: Connection -> ConsumerId -> Permits -> IO ()
flow (Conn s) (CId cid) (Permits p) = do
  -- Build the command once so the logged and the sent command cannot diverge.
  let cmd = P.flow cid p
  logRequest cmd
  sendSimpleCmd s cmd

-- | Acknowledge a message for the given consumer.
--
-- Builds the ack command from the consumer id and message id, logs it and
-- sends it over the connection's socket. No response is read here.
ack :: MonadIO m => Connection -> ConsumerId -> MessageIdData -> m ()
ack (Conn s) (CId cid) msgId = do
  -- Build the command once so the logged and the sent command cannot diverge.
  let cmd = P.ack cid msgId
  logRequest cmd
  sendSimpleCmd s cmd

-- | Close a consumer.
--
-- Builds the close-consumer command from the request id and consumer id (in
-- that order, as 'P.closeConsumer' is called), logs it, sends it over the
-- connection's socket, then reads the response from the given 'MVar' with
-- 'readMVar' and logs it.
closeConsumer :: Connection -> MVar Response -> ConsumerId -> ReqId -> IO ()
closeConsumer (Conn s) var (CId cid) (ReqId req) = do
  -- Build the command once so the logged and the sent command cannot diverge.
  let cmd = P.closeConsumer req cid
  logRequest cmd
  sendSimpleCmd s cmd
  readMVar var >>= logResponse

------ Keep Alive -------

-- | Send a ping and wait for the matching pong.
--
-- The ping command is logged and sent over the connection's socket. The next
-- 'Response' is then read from the channel (blocking in 'readChan') and its
-- command is inspected: a pong is logged, anything else raises an 'IOError'
-- built with 'userError' via 'throwIO'.
ping :: (MonadThrow m, MonadIO m) => Connection -> Chan Response -> m ()
ping (Conn s) chan = do
  logRequest P.ping
  sendSimpleCmd s P.ping
  resp <- liftIO (readChan chan)
  case getCommand resp ^. F.maybe'pong of
    Just p  -> logResponse p
    -- Any non-pong response on the channel is treated as a failed keep-alive.
    Nothing -> liftIO . throwIO $ userError "Failed to get PONG"

-- | Send a pong command.
--
-- The command is logged and then sent over the connection's socket. No
-- response is read here.
pong :: MonadIO m => Connection -> m ()
pong (Conn s) = do
  logRequest P.pong
  sendSimpleCmd s P.pong

------ Payload commands ------

-- | Send a message payload on behalf of a producer.
--
-- Builds the send command from the producer id and sequence id and logs it.
-- The command is then sent with 'sendPayloadCmd' together with
-- 'P.messageMetadata' and the message bytes wrapped in a 'Payload'. Finally
-- the response is read from the given 'MVar' with 'readMVar' and logged.
send
  :: Connection
  -> MVar Response
  -> ProducerId
  -> SeqId
  -> PulsarMessage
  -> IO ()
send (Conn s) var (PId pid) (SeqId sid) (PulsarMessage msg) = do
  -- Build the command once so the logged and the sent command cannot diverge.
  let cmd = P.send pid sid
  logRequest cmd
  sendPayloadCmd s cmd P.messageMetadata (Just $ Payload msg)
  readMVar var >>= logResponse