{-# LANGUAGE LambdaCase, OverloadedStrings #-}

module Pulsar.Connection where

import           Control.Applicative            ( (<|>) )
import           Control.Concurrent             ( forkIO
                                                , killThread
                                                , threadDelay
                                                )
import           Control.Concurrent.Async       ( async
                                                , concurrently_
                                                )
import           Control.Concurrent.Chan
import           Control.Concurrent.MVar
import           Control.Exception              ( throwIO )
import           Control.Monad                  ( forever
                                                , when
                                                )
import           Control.Monad.Catch            ( MonadThrow
                                                , bracket
                                                )
import           Control.Monad.IO.Class
import           Control.Monad.Managed          ( MonadManaged
                                                , managed
                                                , runManaged
                                                )
import           Data.Foldable                  ( traverse_ )
import           Data.Functor                   ( void )
import           Data.IORef
import           Lens.Family
import qualified Network.Socket                as NS
import qualified Network.Socket.ByteString.Lazy
                                               as SBL
import           Proto.PulsarApi                ( BaseCommand
                                                , MessageMetadata
                                                )
import qualified Proto.PulsarApi_Fields        as F
import           Pulsar.AppState
import           Pulsar.Internal.Logger
import           Pulsar.Internal.TCPClient      ( acquireSocket )
import qualified Pulsar.Protocol.Commands      as P
import           Pulsar.Protocol.Decoder        ( decodeBaseCommand )
import           Pulsar.Protocol.Encoder        ( encodeBaseCommand )
import           Pulsar.Protocol.Frame          ( Payload
                                                , Response(..)
                                                , frameMaxSize
                                                , getCommand
                                                )
import           System.Timeout                 ( timeout )

newtype Connection = Conn NS.Socket

{- | Connection details: host and port. -}
data ConnectData = ConnData
    { ConnectData -> HostName
connHost :: NS.HostName
    , ConnectData -> HostName
connPort :: NS.ServiceName
    } deriving Int -> ConnectData -> ShowS
[ConnectData] -> ShowS
ConnectData -> HostName
(Int -> ConnectData -> ShowS)
-> (ConnectData -> HostName)
-> ([ConnectData] -> ShowS)
-> Show ConnectData
forall a.
(Int -> a -> ShowS) -> (a -> HostName) -> ([a] -> ShowS) -> Show a
showList :: [ConnectData] -> ShowS
$cshowList :: [ConnectData] -> ShowS
show :: ConnectData -> HostName
$cshow :: ConnectData -> HostName
showsPrec :: Int -> ConnectData -> ShowS
$cshowsPrec :: Int -> ConnectData -> ShowS
Show

{- | Internal Pulsar context. You will never need to access its content (not exported) but might need to take it as argument. -}
data PulsarCtx = Ctx
  { PulsarCtx -> Connection
ctxConn :: Connection
  , PulsarCtx -> IORef AppState
ctxState :: IORef AppState
  , PulsarCtx -> Worker
ctxConnWorker :: Worker
  }

{- | Default connection data: "127.0.0.1:6650" -}
defaultConnectData :: ConnectData
defaultConnectData :: ConnectData
defaultConnectData = ConnData :: HostName -> HostName -> ConnectData
ConnData { connHost :: HostName
connHost = "127.0.0.1", connPort :: HostName
connPort = "6650" }

{- | Starts a Pulsar connection with the supplied 'ConnectData' -}
connect
  :: (MonadIO m, MonadThrow m, MonadManaged m) => ConnectData -> m PulsarCtx
connect :: ConnectData -> m PulsarCtx
connect (ConnData h :: HostName
h p :: HostName
p) = do
  Socket
socket <- HostName -> HostName -> m Socket
forall (m :: * -> *).
(MonadIO m, MonadManaged m) =>
HostName -> HostName -> m Socket
acquireSocket HostName
h HostName
p
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Socket -> BaseCommand -> IO ()
forall (m :: * -> *). MonadIO m => Socket -> BaseCommand -> m ()
sendSimpleCmd Socket
socket BaseCommand
P.connect
  Socket -> m ()
forall (m :: * -> *). (MonadIO m, MonadThrow m) => Socket -> m ()
checkConnection Socket
socket
  IORef AppState
app   <- IO (IORef AppState) -> m (IORef AppState)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (IORef AppState)
forall (m :: * -> *). MonadIO m => m (IORef AppState)
initAppState
  Chan BaseCommand
kchan <- IO (Chan BaseCommand) -> m (Chan BaseCommand)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Chan BaseCommand)
forall a. IO (Chan a)
newChan
  MVar ()
var   <- IO (MVar ()) -> m (MVar ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  let
    dispatcher :: IO ()
dispatcher = Socket -> IORef AppState -> Chan BaseCommand -> IO ()
recvDispatch Socket
socket IORef AppState
app Chan BaseCommand
kchan
    task :: IO ()
task       = IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
concurrently_ IO ()
dispatcher (Socket -> Chan BaseCommand -> IO ()
keepAlive Socket
socket Chan BaseCommand
kchan)
    handler :: Managed ThreadId
handler =
      (forall r. (ThreadId -> IO r) -> IO r) -> Managed ThreadId
forall a. (forall r. (a -> IO r) -> IO r) -> Managed a
managed (IO ThreadId -> (ThreadId -> IO ()) -> (ThreadId -> IO r) -> IO r
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket (IO () -> IO ThreadId
forkIO IO ()
task) (\i :: ThreadId
i -> MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
var IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ThreadId -> IO ()
killThread ThreadId
i))
  Async ()
worker <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (Managed () -> IO ()
runManaged (Managed () -> IO ()) -> Managed () -> IO ()
forall a b. (a -> b) -> a -> b
$ Managed ThreadId -> Managed ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void Managed ThreadId
handler)
  PulsarCtx -> m PulsarCtx
forall (m :: * -> *) a. Monad m => a -> m a
return (PulsarCtx -> m PulsarCtx) -> PulsarCtx -> m PulsarCtx
forall a b. (a -> b) -> a -> b
$ Connection -> IORef AppState -> Worker -> PulsarCtx
Ctx (Socket -> Connection
Conn Socket
socket) IORef AppState
app (Async ()
worker, MVar ()
var)

checkConnection :: (MonadIO m, MonadThrow m) => NS.Socket -> m ()
checkConnection :: Socket -> m ()
checkConnection socket :: Socket
socket = do
  Response
resp <- Socket -> m Response
forall (m :: * -> *). MonadIO m => Socket -> m Response
receive Socket
socket
  case Response -> BaseCommand
getCommand Response
resp BaseCommand
-> FoldLike
     (Maybe CommandConnected)
     BaseCommand
     BaseCommand
     (Maybe CommandConnected)
     (Maybe CommandConnected)
-> Maybe CommandConnected
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandConnected)
  BaseCommand
  BaseCommand
  (Maybe CommandConnected)
  (Maybe CommandConnected)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'connected" a) =>
LensLike' f s a
F.maybe'connected of
    Just _  -> Response -> m ()
forall (m :: * -> *) a. (MonadIO m, Show a) => a -> m ()
logResponse Response
resp
    Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (IOError -> IO ()) -> IOError -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IOError -> IO ()
forall e a. Exception e => e -> IO a
throwIO (IOError -> m ()) -> IOError -> m ()
forall a b. (a -> b) -> a -> b
$ HostName -> IOError
userError "Could not connect"

initAppState :: MonadIO m => m (IORef AppState)
initAppState :: m (IORef AppState)
initAppState = IO (IORef AppState) -> m (IORef AppState)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef AppState) -> m (IORef AppState))
-> (AppState -> IO (IORef AppState))
-> AppState
-> m (IORef AppState)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AppState -> IO (IORef AppState)
forall a. a -> IO (IORef a)
newIORef (AppState -> m (IORef AppState)) -> AppState -> m (IORef AppState)
forall a b. (a -> b) -> a -> b
$ [(ConsumerId, Chan Response)]
-> ConsumerId
-> ProducerId
-> ReqId
-> [Worker]
-> [(ReqId, MVar Response)]
-> [(ProducerId, ProducerSeqs)]
-> AppState
AppState [] 0 0 0 [] [] []

responseForRequest :: BaseCommand -> Maybe ReqId
responseForRequest :: BaseCommand -> Maybe ReqId
responseForRequest cmd :: BaseCommand
cmd =
  let cmd1 :: Maybe Word64
cmd1 = FoldLike Word64 CommandSuccess CommandSuccess Word64 Word64
-> CommandSuccess -> Word64
forall a s t b. FoldLike a s t a b -> s -> a
view FoldLike Word64 CommandSuccess CommandSuccess Word64 Word64
forall (f :: * -> *) s a.
(Functor f, HasField s "requestId" a) =>
LensLike' f s a
F.requestId (CommandSuccess -> Word64) -> Maybe CommandSuccess -> Maybe Word64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BaseCommand
cmd BaseCommand
-> FoldLike
     (Maybe CommandSuccess)
     BaseCommand
     BaseCommand
     (Maybe CommandSuccess)
     (Maybe CommandSuccess)
-> Maybe CommandSuccess
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandSuccess)
  BaseCommand
  BaseCommand
  (Maybe CommandSuccess)
  (Maybe CommandSuccess)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'success" a) =>
LensLike' f s a
F.maybe'success
      cmd2 :: Maybe Word64
cmd2 = FoldLike
  Word64 CommandProducerSuccess CommandProducerSuccess Word64 Word64
-> CommandProducerSuccess -> Word64
forall a s t b. FoldLike a s t a b -> s -> a
view FoldLike
  Word64 CommandProducerSuccess CommandProducerSuccess Word64 Word64
forall (f :: * -> *) s a.
(Functor f, HasField s "requestId" a) =>
LensLike' f s a
F.requestId (CommandProducerSuccess -> Word64)
-> Maybe CommandProducerSuccess -> Maybe Word64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BaseCommand
cmd BaseCommand
-> FoldLike
     (Maybe CommandProducerSuccess)
     BaseCommand
     BaseCommand
     (Maybe CommandProducerSuccess)
     (Maybe CommandProducerSuccess)
-> Maybe CommandProducerSuccess
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandProducerSuccess)
  BaseCommand
  BaseCommand
  (Maybe CommandProducerSuccess)
  (Maybe CommandProducerSuccess)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'producerSuccess" a) =>
LensLike' f s a
F.maybe'producerSuccess
      cmd3 :: Maybe Word64
cmd3 = FoldLike
  Word64
  CommandLookupTopicResponse
  CommandLookupTopicResponse
  Word64
  Word64
-> CommandLookupTopicResponse -> Word64
forall a s t b. FoldLike a s t a b -> s -> a
view FoldLike
  Word64
  CommandLookupTopicResponse
  CommandLookupTopicResponse
  Word64
  Word64
forall (f :: * -> *) s a.
(Functor f, HasField s "requestId" a) =>
LensLike' f s a
F.requestId (CommandLookupTopicResponse -> Word64)
-> Maybe CommandLookupTopicResponse -> Maybe Word64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BaseCommand
cmd BaseCommand
-> FoldLike
     (Maybe CommandLookupTopicResponse)
     BaseCommand
     BaseCommand
     (Maybe CommandLookupTopicResponse)
     (Maybe CommandLookupTopicResponse)
-> Maybe CommandLookupTopicResponse
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandLookupTopicResponse)
  BaseCommand
  BaseCommand
  (Maybe CommandLookupTopicResponse)
  (Maybe CommandLookupTopicResponse)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'lookupTopicResponse" a) =>
LensLike' f s a
F.maybe'lookupTopicResponse
  in  Word64 -> ReqId
ReqId (Word64 -> ReqId) -> Maybe Word64 -> Maybe ReqId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Maybe Word64
cmd1 Maybe Word64 -> Maybe Word64 -> Maybe Word64
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Maybe Word64
cmd2 Maybe Word64 -> Maybe Word64 -> Maybe Word64
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Maybe Word64
cmd3)

responseForSendReceipt :: BaseCommand -> Maybe (ProducerId, SeqId)
responseForSendReceipt :: BaseCommand -> Maybe (ProducerId, SeqId)
responseForSendReceipt cmd :: BaseCommand
cmd =
  let cmd' :: Maybe CommandSendReceipt
cmd' = BaseCommand
cmd BaseCommand
-> FoldLike
     (Maybe CommandSendReceipt)
     BaseCommand
     BaseCommand
     (Maybe CommandSendReceipt)
     (Maybe CommandSendReceipt)
-> Maybe CommandSendReceipt
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandSendReceipt)
  BaseCommand
  BaseCommand
  (Maybe CommandSendReceipt)
  (Maybe CommandSendReceipt)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'sendReceipt" a) =>
LensLike' f s a
F.maybe'sendReceipt
      pid :: Maybe ProducerId
pid  = Word64 -> ProducerId
PId (Word64 -> ProducerId)
-> (CommandSendReceipt -> Word64)
-> CommandSendReceipt
-> ProducerId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FoldLike Word64 CommandSendReceipt CommandSendReceipt Word64 Word64
-> CommandSendReceipt -> Word64
forall a s t b. FoldLike a s t a b -> s -> a
view FoldLike Word64 CommandSendReceipt CommandSendReceipt Word64 Word64
forall (f :: * -> *) s a.
(Functor f, HasField s "producerId" a) =>
LensLike' f s a
F.producerId (CommandSendReceipt -> ProducerId)
-> Maybe CommandSendReceipt -> Maybe ProducerId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe CommandSendReceipt
cmd'
      sid :: Maybe SeqId
sid  = Word64 -> SeqId
SeqId (Word64 -> SeqId)
-> (CommandSendReceipt -> Word64) -> CommandSendReceipt -> SeqId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FoldLike Word64 CommandSendReceipt CommandSendReceipt Word64 Word64
-> CommandSendReceipt -> Word64
forall a s t b. FoldLike a s t a b -> s -> a
view FoldLike Word64 CommandSendReceipt CommandSendReceipt Word64 Word64
forall (f :: * -> *) s a.
(Functor f, HasField s "sequenceId" a) =>
LensLike' f s a
F.sequenceId (CommandSendReceipt -> SeqId)
-> Maybe CommandSendReceipt -> Maybe SeqId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe CommandSendReceipt
cmd'
  in  (,) (ProducerId -> SeqId -> (ProducerId, SeqId))
-> Maybe ProducerId -> Maybe (SeqId -> (ProducerId, SeqId))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ProducerId
pid Maybe (SeqId -> (ProducerId, SeqId))
-> Maybe SeqId -> Maybe (ProducerId, SeqId)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe SeqId
sid

pongResponse :: BaseCommand -> Chan BaseCommand -> IO (Maybe ())
pongResponse :: BaseCommand -> Chan BaseCommand -> IO (Maybe ())
pongResponse cmd :: BaseCommand
cmd chan :: Chan BaseCommand
chan =
  (CommandPong -> IO ()) -> Maybe CommandPong -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (IO () -> CommandPong -> IO ()
forall a b. a -> b -> a
const (IO () -> CommandPong -> IO ()) -> IO () -> CommandPong -> IO ()
forall a b. (a -> b) -> a -> b
$ Chan BaseCommand -> BaseCommand -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan BaseCommand
chan BaseCommand
cmd) (BaseCommand
cmd BaseCommand
-> FoldLike
     (Maybe CommandPong)
     BaseCommand
     BaseCommand
     (Maybe CommandPong)
     (Maybe CommandPong)
-> Maybe CommandPong
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandPong)
  BaseCommand
  BaseCommand
  (Maybe CommandPong)
  (Maybe CommandPong)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'pong" a) =>
LensLike' f s a
F.maybe'pong)

messageResponse :: BaseCommand -> Maybe ConsumerId
messageResponse :: BaseCommand -> Maybe ConsumerId
messageResponse cmd :: BaseCommand
cmd =
  let cmd' :: Maybe CommandMessage
cmd' = BaseCommand
cmd BaseCommand
-> FoldLike
     (Maybe CommandMessage)
     BaseCommand
     BaseCommand
     (Maybe CommandMessage)
     (Maybe CommandMessage)
-> Maybe CommandMessage
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandMessage)
  BaseCommand
  BaseCommand
  (Maybe CommandMessage)
  (Maybe CommandMessage)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'message" a) =>
LensLike' f s a
F.maybe'message
      cid :: Maybe Word64
cid  = FoldLike Word64 CommandMessage CommandMessage Word64 Word64
-> CommandMessage -> Word64
forall a s t b. FoldLike a s t a b -> s -> a
view FoldLike Word64 CommandMessage CommandMessage Word64 Word64
forall (f :: * -> *) s a.
(Functor f, HasField s "consumerId" a) =>
LensLike' f s a
F.consumerId (CommandMessage -> Word64) -> Maybe CommandMessage -> Maybe Word64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe CommandMessage
cmd'
  in  Word64 -> ConsumerId
CId (Word64 -> ConsumerId) -> Maybe Word64 -> Maybe ConsumerId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Word64
cid

{- | It listens to incoming messages directly from the network socket and it writes them to all the
 - consumers and producers' communication channels. -}
recvDispatch :: NS.Socket -> IORef AppState -> Chan BaseCommand -> IO ()
recvDispatch :: Socket -> IORef AppState -> Chan BaseCommand -> IO ()
recvDispatch s :: Socket
s ref :: IORef AppState
ref chan :: Chan BaseCommand
chan = IO (Maybe ()) -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO (Maybe ()) -> IO ()) -> IO (Maybe ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  Response
resp <- Socket -> IO Response
forall (m :: * -> *). MonadIO m => Socket -> m Response
receive Socket
s
  [(ConsumerId, Chan Response)]
cs   <- AppState -> [(ConsumerId, Chan Response)]
_appConsumers (AppState -> [(ConsumerId, Chan Response)])
-> IO AppState -> IO [(ConsumerId, Chan Response)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef AppState -> IO AppState
forall a. IORef a -> IO a
readIORef IORef AppState
ref
  let
    f :: ReqId -> IO ()
f = \rid :: ReqId
rid -> IORef AppState -> ReqId -> Response -> IO ()
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> ReqId -> Response -> m ()
registerReqResponse IORef AppState
ref ReqId
rid Response
resp
    g :: (ProducerId, SeqId) -> IO ()
g = (\(pid :: ProducerId
pid, sid :: SeqId
sid) -> IORef AppState -> ProducerId -> SeqId -> Response -> IO ()
forall (m :: * -> *).
MonadIO m =>
IORef AppState -> ProducerId -> SeqId -> Response -> m ()
registerSendReceipt IORef AppState
ref ProducerId
pid SeqId
sid Response
resp)
    h :: ConsumerId -> IO [()]
h = \cid :: ConsumerId
cid ->
      ((ConsumerId, Chan Response) -> IO ())
-> [(ConsumerId, Chan Response)] -> IO [()]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (\(cid' :: ConsumerId
cid', cn :: Chan Response
cn) -> Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConsumerId
cid ConsumerId -> ConsumerId -> Bool
forall a. Eq a => a -> a -> Bool
== ConsumerId
cid') (Chan Response -> Response -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan Response
cn Response
resp)) [(ConsumerId, Chan Response)]
cs
    cmd :: BaseCommand
cmd = Response -> BaseCommand
getCommand Response
resp
  (ReqId -> IO ()) -> Maybe ReqId -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ ReqId -> IO ()
f (BaseCommand -> Maybe ReqId
responseForRequest BaseCommand
cmd)
  ((ProducerId, SeqId) -> IO ())
-> Maybe (ProducerId, SeqId) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (ProducerId, SeqId) -> IO ()
g (BaseCommand -> Maybe (ProducerId, SeqId)
responseForSendReceipt BaseCommand
cmd)
  (ConsumerId -> IO [()]) -> Maybe ConsumerId -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ ConsumerId -> IO [()]
h (BaseCommand -> Maybe ConsumerId
messageResponse BaseCommand
cmd)
  BaseCommand -> Chan BaseCommand -> IO (Maybe ())
pongResponse BaseCommand
cmd Chan BaseCommand
chan

{- Emit a PING and expect a PONG every 29 seconds. If a PONG is not received, interrupt connection -}
keepAlive :: NS.Socket -> Chan BaseCommand -> IO ()
keepAlive :: Socket -> Chan BaseCommand -> IO ()
keepAlive s :: Socket
s chan :: Chan BaseCommand
chan = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  Int -> IO ()
threadDelay (29 Int -> Int -> Int
forall a. Num a => a -> a -> a
* 1000000)
  BaseCommand -> IO ()
forall (m :: * -> *) a. (MonadIO m, Show a) => a -> m ()
logRequest BaseCommand
P.ping
  Socket -> BaseCommand -> IO ()
forall (m :: * -> *). MonadIO m => Socket -> BaseCommand -> m ()
sendSimpleCmd Socket
s BaseCommand
P.ping
  Int -> IO BaseCommand -> IO (Maybe BaseCommand)
forall a. Int -> IO a -> IO (Maybe a)
timeout (2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* 1000000) (Chan BaseCommand -> IO BaseCommand
forall a. Chan a -> IO a
readChan Chan BaseCommand
chan) IO (Maybe BaseCommand) -> (Maybe BaseCommand -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just cmd :: BaseCommand
cmd -> BaseCommand -> IO ()
forall (m :: * -> *) a. (MonadIO m, Show a) => a -> m ()
logResponse BaseCommand
cmd
    Nothing  -> IOError -> IO ()
forall e a. Exception e => e -> IO a
throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName -> IOError
userError "Keep Alive interruption"

sendSimpleCmd :: MonadIO m => NS.Socket -> BaseCommand -> m ()
sendSimpleCmd :: Socket -> BaseCommand -> m ()
sendSimpleCmd s :: Socket
s cmd :: BaseCommand
cmd =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (ByteString -> IO ()) -> ByteString -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> ByteString -> IO ()
SBL.sendAll Socket
s (ByteString -> m ()) -> ByteString -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe MessageMetadata -> Maybe Payload -> BaseCommand -> ByteString
encodeBaseCommand Maybe MessageMetadata
forall a. Maybe a
Nothing Maybe Payload
forall a. Maybe a
Nothing BaseCommand
cmd

sendPayloadCmd
  :: MonadIO m
  => NS.Socket
  -> BaseCommand
  -> MessageMetadata
  -> Maybe Payload
  -> m ()
sendPayloadCmd :: Socket -> BaseCommand -> MessageMetadata -> Maybe Payload -> m ()
sendPayloadCmd s :: Socket
s cmd :: BaseCommand
cmd meta :: MessageMetadata
meta payload :: Maybe Payload
payload =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (ByteString -> IO ()) -> ByteString -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> ByteString -> IO ()
SBL.sendAll Socket
s (ByteString -> m ()) -> ByteString -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe MessageMetadata -> Maybe Payload -> BaseCommand -> ByteString
encodeBaseCommand (MessageMetadata -> Maybe MessageMetadata
forall a. a -> Maybe a
Just MessageMetadata
meta) Maybe Payload
payload BaseCommand
cmd

receive :: MonadIO m => NS.Socket -> m Response
receive :: Socket -> m Response
receive s :: Socket
s = IO Response -> m Response
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Response -> m Response) -> IO Response -> m Response
forall a b. (a -> b) -> a -> b
$ do
  ByteString
msg <- Socket -> Int64 -> IO ByteString
SBL.recv Socket
s (Int64 -> IO ByteString) -> Int64 -> IO ByteString
forall a b. (a -> b) -> a -> b
$ Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
frameMaxSize
  case ByteString -> Either HostName Response
decodeBaseCommand ByteString
msg of
    Left  e :: HostName
e    -> HostName -> IO Response
forall (m :: * -> *) a. MonadFail m => HostName -> m a
fail (HostName -> IO Response) -> HostName -> IO Response
forall a b. (a -> b) -> a -> b
$ "Decoding error: " HostName -> ShowS
forall a. Semigroup a => a -> a -> a
<> HostName
e
    Right resp :: Response
resp -> case Response -> BaseCommand
getCommand Response
resp BaseCommand
-> FoldLike
     (Maybe CommandPing)
     BaseCommand
     BaseCommand
     (Maybe CommandPing)
     (Maybe CommandPing)
-> Maybe CommandPing
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  (Maybe CommandPing)
  BaseCommand
  BaseCommand
  (Maybe CommandPing)
  (Maybe CommandPing)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'ping" a) =>
LensLike' f s a
F.maybe'ping of
      Just _ -> do
        BaseCommand -> IO ()
forall (m :: * -> *) a. (MonadIO m, Show a) => a -> m ()
logResponse (BaseCommand -> IO ()) -> BaseCommand -> IO ()
forall a b. (a -> b) -> a -> b
$ Response -> BaseCommand
getCommand Response
resp
        BaseCommand -> IO ()
forall (m :: * -> *) a. (MonadIO m, Show a) => a -> m ()
logRequest BaseCommand
P.pong
        Socket -> BaseCommand -> IO ()
forall (m :: * -> *). MonadIO m => Socket -> BaseCommand -> m ()
sendSimpleCmd Socket
s BaseCommand
P.pong
        Response -> IO Response
forall (m :: * -> *) a. Monad m => a -> m a
return Response
resp
      Nothing -> Response -> IO Response
forall (m :: * -> *) a. Monad m => a -> m a
return Response
resp