{-# LANGUAGE FlexibleContexts, LambdaCase, OverloadedStrings #-}

{- |
Module      : Pulsar.Consumer
Description : Apache Pulsar client
License     : Apache-2.0
Maintainer  : gabriel.volpe@chatroulette.com
Stability   : experimental

The basic consumer interaction looks as follows: http://pulsar.apache.org/docs/en/develop-binary-protocol/#consumer

>>> LOOKUP
<<< LOOKUP_RESPONSE
>>> SUBSCRIBE
<<< SUCCESS
>>> FLOW 1000
<<< MESSAGE 1
<<< MESSAGE 2
>>> ACK 1
>>> ACK 2

When half of the messages have been consumed from our internal queue (Chan), we ask the broker to send more events and continue processing events.

>>> FLOW 500

When the program finishes, either successfully or due to a failure, we unsubscribe and close the consumer.

>>> CLOSE_CONSUMER
<<< SUCCESS
-}
module Pulsar.Consumer
  ( Consumer(..)
  , newConsumer
  )
where

import           Control.Concurrent             ( forkIO
                                                , killThread
                                                )
import           Control.Concurrent.Async       ( async )
import           Control.Concurrent.Chan
import           Control.Concurrent.MVar
import           Control.Monad                  ( forever
                                                , when
                                                )
import           Control.Monad.Catch            ( bracket )
import           Control.Monad.IO.Class         ( MonadIO
                                                , liftIO
                                                )
import           Control.Monad.Managed          ( managed
                                                , runManaged
                                                )
import           Control.Monad.Reader           ( MonadReader
                                                , ask
                                                )
import           Data.Foldable                  ( for_ )
import           Data.IORef
import           Data.Functor                   ( void )
import           Lens.Family             hiding ( reset )
import qualified Proto.PulsarApi_Fields        as F
import qualified Pulsar.Core                   as C
import           Pulsar.AppState
import           Pulsar.Connection              ( PulsarCtx(..) )
import           Pulsar.Internal.Logger         ( logResponse )
import           Pulsar.Protocol.Frame          ( Payload(..)
                                                , Response(..)
                                                )
import           Pulsar.Types

{- | An abstract 'Consumer' able to 'fetch' messages and 'ack'nowledge them.

The type parameter @m@ is the monad in which both operations run.
-}
data Consumer m = Consumer
  { fetch :: m Message   -- ^ Fetches a single message. Blocks if no messages are available.
  , ack :: MsgId -> m () -- ^ Acknowledges a single message.
  }

{- | The protocol expects the implementation to use some kind of queue to store events sent by the broker.

This is the number of permits requested from the broker right after subscribing (see 'newConsumer').
Every subsequent FLOW command requests half of this amount.
-}
defaultQueueSize :: Int
defaultQueueSize = 1000

{- | Applies a function to the counter that keeps track of the size of our internal message queue.

The update is atomic. It is also strict: 'atomicModifyIORef'' forces the new value before
returning, so repeated updates do not pile up unevaluated thunks inside the 'IORef'.
-}
updateQueueSize :: IORef Int -> (Int -> Int) -> IO ()
updateQueueSize ref f = atomicModifyIORef' ref (\x -> (f x, ()))

{- | Create a new 'Consumer' for the given 'Topic' and 'Subscription', using the 'PulsarCtx'
(returned by 'Pulsar.connect') found in the reader environment.

It subscribes to the topic, forks a 'fetcher' thread that feeds an internal message channel, and
registers a background worker together with an 'MVar' in the application state via 'addWorker'.
The worker blocks on that 'MVar'; once it stops blocking (or is interrupted) the fetcher thread is
killed and a CLOSE_CONSUMER command is sent.

The returned 'Consumer' runs in any 'MonadIO' @f@, independent of the monad @m@ used to create it.
-}
newConsumer
  :: (MonadIO m, MonadIO f, MonadReader PulsarCtx m)
  => Topic
  -> Subscription
  -> m (Consumer f)
newConsumer topic sub = do
  (Ctx conn app _) <- ask
  -- Channel of raw broker responses; handed to 'mkConsumerId' together with the app state.
  -- NOTE(review): presumably this registers the channel under the new consumer id - confirm
  -- against Pulsar.AppState.
  chan  <- liftIO newChan
  cid   <- mkConsumerId chan app
  -- Channel of decoded messages: written by 'fetcher', read by 'fetch'.
  fchan <- liftIO newChan
  -- Counter maintained by 'fetcher' to decide when to request more permits.
  ref   <- liftIO $ newIORef 0
  -- The worker blocks on this MVar; it is stored next to the worker via 'addWorker'.
  var   <- liftIO newEmptyMVar
  let permits = issuePermits conn cid
      -- Subscribe first, then start the thread that drains the response channel.
      acquire = do
        mkSubscriber conn cid app
        forkIO (fetcher chan fchan ref permits)
      -- Stop the fetcher thread and tell the broker we are closing this consumer.
      release tid = do
        killThread tid
        (req, resp) <- newReq app
        C.closeConsumer conn resp cid req
      -- Hold the resource for as long as reading the MVar blocks.
      handler = managed (bracket acquire release) >> liftIO (readMVar var)
  worker <- liftIO $ async (runManaged $ void handler)
  addWorker app (worker, var)
  return $ Consumer (liftIO $ readChan fchan) (acker conn cid)
 where
  -- Obtains a fresh request id together with the MVar paired with it by 'mkRequestId'.
  newReq app = mkRequestId app
  -- Acknowledges a single message by unwrapping its id and delegating to 'C.ack'.
  acker conn cid (MsgId mid) = liftIO $ C.ack conn cid mid
  -- Sends a FLOW command asking the broker for @n@ more messages.
  requestPermits conn cid n = C.flow conn cid (Permits (fromIntegral n))
  -- Top-up request issued by 'fetcher': half of the queue size.
  issuePermits conn cid = requestPermits conn cid (defaultQueueSize `div` 2)
  -- LOOKUP, then SUBSCRIBE, then the initial FLOW for the full queue size.
  mkSubscriber conn cid app = do
    (req1, var1) <- newReq app
    C.lookup conn var1 req1 topic
    (req2, var2) <- newReq app
    C.newSubscriber conn var2 req2 cid topic sub
    requestPermits conn cid defaultQueueSize

{- | It reads responses from the main communication channel and whenever it corresponds to a
 - 'PayloadResponse' carrying a message command, it creates a 'Message' and writes it to the
 - fetcher channel, which is the one the 'fetch' function is listening on. Any other response
 - is ignored.
 -
 - It also counts the messages received since the last permit request. Once that count reaches
 - half of 'defaultQueueSize', it runs the supplied action (used by 'newConsumer' to send FLOW)
 - and deducts that half from the counter.
 -
 - Arguments, in order: the response channel to read from, the message channel to write to, the
 - counter, and the action that requests more permits. This function loops forever and only
 - terminates by exception (e.g. when its thread is killed).
 -}
fetcher :: Chan Response -> Chan Message -> IORef Int -> IO a -> IO b
fetcher chan fc ref f = forever $ readChan chan >>= \case
  PayloadResponse cmd _ p -> for_ (cmd ^. F.maybe'message) $ \msg -> do
    let msgId      = msg ^. F.messageId
        -- A missing payload is delivered as an empty body.
        pm         = Message (MsgId msgId) $ maybe "" (\(Payload x) -> x) p
        half       = defaultQueueSize `div` 2
        -- Deduct the permits we have just re-requested. The counter equals 'half' whenever
        -- this runs, so it goes back to zero; 'subtract' keeps it correct for larger values.
        deductHalf = updateQueueSize ref (subtract half)
    logResponse cmd
    updateQueueSize ref (+ 1)
    size <- readIORef ref
    -- Request more permits before handing the message over to the consumer.
    when (size >= half) (f >> deductHalf)
    writeChan fc pm
  _ -> return ()