{-# LANGUAGE FlexibleContexts, LambdaCase, OverloadedStrings #-}
module Pulsar.Consumer
( Consumer(..)
, newConsumer
)
where
import Control.Concurrent ( forkIO
, killThread
)
import Control.Concurrent.Async ( async )
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Monad ( forever
, when
)
import Control.Monad.Catch ( bracket )
import Control.Monad.IO.Class ( MonadIO
, liftIO
)
import Control.Monad.Managed ( managed
, runManaged
)
import Control.Monad.Reader ( MonadReader
, ask
)
import Data.Foldable ( for_ )
import Data.IORef
import Data.Functor ( void )
import Lens.Family hiding ( reset )
import qualified Proto.PulsarApi_Fields as F
import qualified Pulsar.Core as C
import Pulsar.AppState
import Pulsar.Connection ( PulsarCtx(..) )
import Pulsar.Internal.Logger ( logResponse )
import Pulsar.Protocol.Frame ( Payload(..)
, Response(..)
)
import Pulsar.Types
-- | A handle for consuming messages, parameterised over the monad @m@ in
-- which its operations run.
--
-- Values are built by 'newConsumer'.
data Consumer m = Consumer
  { fetch :: m Message
    -- ^ Obtain the next message. As built by 'newConsumer' this reads from
    -- the consumer's internal message channel, so it blocks until the
    -- background fetcher has delivered one.
  , ack :: MsgId -> m ()
    -- ^ Acknowledge the message with the given id. As built by
    -- 'newConsumer' this forwards the id to @C.ack@ on the consumer's
    -- connection.
  }
-- | Number of permits requested when a subscription is first opened.
-- Half of this value is the threshold at which the fetcher asks for more.
defaultQueueSize :: Int
defaultQueueSize = 1000
-- | Atomically apply @f@ to the counter stored in @ref@.
--
-- Uses the strict 'atomicModifyIORef'' so the new value is evaluated at
-- update time and the reference does not accumulate a chain of unevaluated
-- thunks.
updateQueueSize :: IORef Int -> (Int -> Int) -> IO ()
updateQueueSize ref f = atomicModifyIORef' ref (\x -> (f x, ()))
-- | Create a 'Consumer' for the given topic and subscription.
--
-- The connection and application state are read from the 'PulsarCtx' in the
-- environment. A background worker is started with 'async' and registered
-- through 'addWorker' together with the 'MVar' it waits on. The worker:
--
--   1. subscribes (lookup, subscribe, initial flow of 'defaultQueueSize'
--      permits) and forks a 'fetcher' thread that feeds an internal
--      message channel;
--   2. blocks on 'readMVar' of that 'MVar';
--   3. on leaving the bracket (the wait returned or the worker was
--      interrupted), kills the fetcher thread and issues a close-consumer
--      request.
--
-- The returned consumer's 'fetch' reads from the internal message channel
-- and its 'ack' forwards to @C.ack@.
newConsumer
  :: (MonadIO m, MonadIO f, MonadReader PulsarCtx m)
  => Topic
  -> Subscription
  -> m (Consumer f)
newConsumer topic sub = do
  (Ctx conn app _) <- ask
  -- Channel of raw responses for this consumer.
  -- NOTE(review): presumably 'mkConsumerId' registers it in the app state so
  -- responses get routed here; confirm in Pulsar.AppState.
  chan  <- liftIO newChan
  cid   <- mkConsumerId chan app
  -- Channel of decoded messages handed out by 'fetch'.
  fchan <- liftIO newChan
  -- Count of messages received since permits were last requested.
  ref   <- liftIO $ newIORef 0
  -- The worker blocks on this MVar; it is handed to 'addWorker' below.
  var   <- liftIO newEmptyMVar
  let permits = issuePermits conn cid
      -- Subscribe first, then start the thread that drains responses.
      acquire = do
        mkSubscriber conn cid app
        forkIO (fetcher chan fchan ref permits)
      -- Stop the fetcher thread, then ask for the consumer to be closed.
      release i =
        killThread i >> newReq app >>= \(r, v) -> C.closeConsumer conn v cid r
      handler = managed (bracket acquire release) >> liftIO (readMVar var)
  worker <- liftIO $ async (runManaged $ void handler)
  addWorker app (worker, var)
  return $ Consumer (liftIO $ readChan fchan) (acker conn cid)
  where
    -- Allocate a fresh request id and the MVar associated with it.
    newReq app = mkRequestId app

    -- Acknowledge a single message on this consumer.
    acker conn cid (MsgId mid) = liftIO $ C.ack conn cid mid

    -- Request another half queue's worth of permits.
    issuePermits conn cid =
      C.flow conn cid (Permits $ fromIntegral (defaultQueueSize `div` 2))

    -- Look the topic up, subscribe to it, and request the initial permits.
    mkSubscriber conn cid app = do
      (req1, var1) <- newReq app
      C.lookup conn var1 req1 topic
      (req2, var2) <- newReq app
      C.newSubscriber conn var2 req2 cid topic sub
      C.flow conn cid (Permits $ fromIntegral defaultQueueSize)
-- | Loop forever, turning payload responses read from @chan@ into
-- 'Message's written to @fc@.
--
-- For every 'PayloadResponse' whose command carries a message, the response
-- is logged, the counter in @ref@ is incremented, and the message is written
-- to @fc@ (an absent payload becomes the empty string). Once the counter
-- reaches half of 'defaultQueueSize', the action @f@ is run and the counter
-- is reduced by that same amount. Any other response is ignored.
--
-- The function never returns normally; it is meant to be run in its own
-- thread and stopped by killing that thread.
fetcher :: Chan Response -> Chan Message -> IORef Int -> IO a -> IO b
fetcher chan fc ref f = forever $ readChan chan >>= \case
  PayloadResponse cmd _ p -> for_ (cmd ^. F.maybe'message) $ \msg -> do
    let msgId = msg ^. F.messageId
        pm    = Message (MsgId msgId) $ maybe "" (\(Payload x) -> x) p
        -- Take the consumed half off the counter. The previous form used the
        -- section @(halfQueue -)@, i.e. @halfQueue - x@, which only agrees
        -- with this when the counter is exactly 'halfQueue'.
        reset = updateQueueSize ref (subtract halfQueue)
    logResponse cmd
    updateQueueSize ref (+ 1)
    size <- readIORef ref
    -- Run the caller-supplied action before the counter is lowered.
    when (size >= halfQueue) (f >> reset)
    writeChan fc pm
  _ -> return ()
  where
    -- Threshold at which @f@ is run.
    halfQueue = defaultQueueSize `div` 2