{-# LANGUAGE LambdaCase, OverloadedStrings #-}
module Pulsar.Connection where
import Control.Applicative ( (<|>) )
import Control.Concurrent ( forkIO
, killThread
, threadDelay
)
import Control.Concurrent.Async ( async
, concurrently_
)
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception ( throwIO )
import Control.Monad ( forever
, when
)
import Control.Monad.Catch ( MonadThrow
, bracket
)
import Control.Monad.IO.Class
import Control.Monad.Managed ( MonadManaged
, managed
, runManaged
)
import Data.Foldable ( traverse_ )
import Data.Functor ( void )
import Data.IORef
import Lens.Family
import qualified Network.Socket as NS
import qualified Network.Socket.ByteString.Lazy
as SBL
import Proto.PulsarApi ( BaseCommand
, MessageMetadata
)
import qualified Proto.PulsarApi_Fields as F
import Pulsar.AppState
import Pulsar.Internal.Logger
import Pulsar.Internal.TCPClient ( acquireSocket )
import qualified Pulsar.Protocol.Commands as P
import Pulsar.Protocol.Decoder ( decodeBaseCommand )
import Pulsar.Protocol.Encoder ( encodeBaseCommand )
import Pulsar.Protocol.Frame ( Payload
, Response(..)
, frameMaxSize
, getCommand
)
import System.Timeout ( timeout )
-- | Handle to a broker connection; a thin wrapper around the underlying
-- network socket.
newtype Connection = Conn NS.Socket
-- | Address of the broker to connect to.
data ConnectData = ConnData
    { connHost :: NS.HostName    -- ^ host name or IP address of the broker
    , connPort :: NS.ServiceName -- ^ port number or service name of the broker
    } deriving Show
-- | Everything 'connect' hands back to the caller: the connection itself, the
-- shared mutable application state and the background connection worker.
data PulsarCtx = Ctx
    { ctxConn       :: Connection     -- ^ the established connection
    , ctxState      :: IORef AppState -- ^ shared, mutable application state
    , ctxConnWorker :: Worker
      -- ^ background worker created by 'connect'; there it is built as a pair
      -- of the worker's @Async ()@ and the @MVar ()@ it waits on before
      -- killing the connection threads
    }
-- | Connection settings pointing at @127.0.0.1:6650@.
defaultConnectData :: ConnectData
defaultConnectData = ConnData
  { connHost = "127.0.0.1"
  , connPort = "6650"
  }
-- | Open a connection to the broker described by the given 'ConnectData'.
--
-- Steps, in order:
--
-- 1. acquire a socket for the host and port,
-- 2. send the @connect@ command and verify the reply via 'checkConnection'
--    (which throws if the broker does not answer with @connected@),
-- 3. create the shared application state, the channel that carries @pong@
--    replies to the keep-alive loop, and an empty shutdown 'MVar',
-- 4. start the background worker that runs 'recvDispatch' and 'keepAlive'
--    concurrently.
--
-- The returned 'PulsarCtx' carries the connection, the state and the worker
-- together with its shutdown 'MVar'.
connect
  :: (MonadIO m, MonadThrow m, MonadManaged m) => ConnectData -> m PulsarCtx
connect (ConnData host port) = do
  socket <- acquireSocket host port
  sendSimpleCmd socket P.connect
  checkConnection socket
  app   <- initAppState
  kchan <- liftIO newChan
  var   <- liftIO newEmptyMVar
  let
    dispatcher = recvDispatch socket app kchan
    task       = concurrently_ dispatcher (keepAlive socket kchan)
    -- The bracket forks the connection task and, on release, blocks until
    -- the shutdown MVar is filled before killing the forked thread. Because
    -- the managed resource is run with an empty continuation below, the
    -- release action starts right away and simply waits for that signal.
    handler =
      managed (bracket (forkIO task) (\tid -> readMVar var >> killThread tid))
    -- NOTE(review): exceptions raised inside the forked task are not
    -- propagated to the worker 'Async'; confirm this is intended.
  worker <- liftIO . async . runManaged $ void handler
  return $ Ctx (Conn socket) app (worker, var)
-- | Read one response from the socket and make sure it is a @connected@
-- command.
--
-- On success the response is logged. Otherwise an 'IOError' carrying the
-- message @"Could not connect"@ is thrown.
checkConnection :: (MonadIO m, MonadThrow m) => NS.Socket -> m ()
checkConnection socket = do
  resp <- receive socket
  case getCommand resp ^. F.maybe'connected of
    Just _  -> logResponse resp
    Nothing -> liftIO . throwIO $ userError "Could not connect"
-- | Allocate a fresh 'IORef' holding the initial 'AppState': every list field
-- empty and every counter field set to zero.
initAppState :: MonadIO m => m (IORef AppState)
initAppState = liftIO (newIORef emptyState)
 where
  -- Field order follows the 'AppState' constructor.
  emptyState = AppState [] 0 0 0 [] [] []
-- | Extract the request id a command answers, if it is one of the
-- request/response style replies handled here: @success@,
-- @producerSuccess@ or @lookupTopicResponse@ (tried in that order).
--
-- Returns 'Nothing' for any other command.
responseForRequest :: BaseCommand -> Maybe ReqId
responseForRequest cmd =
  ReqId <$> (fromSuccess <|> fromProducerSuccess <|> fromLookup)
 where
  fromSuccess         = view F.requestId <$> cmd ^. F.maybe'success
  fromProducerSuccess = view F.requestId <$> cmd ^. F.maybe'producerSuccess
  fromLookup          = view F.requestId <$> cmd ^. F.maybe'lookupTopicResponse
-- | If the command is a @sendReceipt@, return the producer id and sequence id
-- it acknowledges; otherwise 'Nothing'.
responseForSendReceipt :: BaseCommand -> Maybe (ProducerId, SeqId)
responseForSendReceipt cmd = ids <$> cmd ^. F.maybe'sendReceipt
 where
  -- Both ids come from the same receipt, so a single map over the optional
  -- receipt is enough.
  ids receipt =
    (PId (receipt ^. F.producerId), SeqId (receipt ^. F.sequenceId))
-- | When the command is a @pong@, write the whole command to the given
-- channel and return @Just ()@; for any other command do nothing and return
-- 'Nothing'.
pongResponse :: BaseCommand -> Chan BaseCommand -> IO (Maybe ())
pongResponse cmd chan = traverse forward (cmd ^. F.maybe'pong)
 where
  -- The pong payload itself is ignored; the full command is forwarded.
  forward _ = writeChan chan cmd
-- | If the command is a @message@, return the id of the consumer it is
-- addressed to; otherwise 'Nothing'.
messageResponse :: BaseCommand -> Maybe ConsumerId
messageResponse cmd = CId . view F.consumerId <$> cmd ^. F.maybe'message
-- | Receive loop: read responses from the socket forever and route each one.
--
-- For every response, all of the following are attempted (each is a no-op
-- when the command is of a different kind):
--
-- * request replies are registered via 'registerReqResponse',
-- * send receipts are registered via 'registerSendReceipt',
-- * messages are written to the channel of every consumer whose id matches,
-- * pongs are forwarded to the given channel (see 'pongResponse').
--
-- The consumer list is re-read from the state on every iteration, after the
-- response has been received.
recvDispatch :: NS.Socket -> IORef AppState -> Chan BaseCommand -> IO ()
recvDispatch s ref chan = forever $ do
  resp <- receive s
  cs   <- _appConsumers <$> readIORef ref
  let
    cmd = getCommand resp
    onRequest rid        = registerReqResponse ref rid resp
    onReceipt (pid, sid) = registerSendReceipt ref pid sid resp
    -- traverse_ rather than traverse: the per-consumer results are units and
    -- nobody looks at the resulting list.
    onMessage cid =
      traverse_ (\(cid', cn) -> when (cid == cid') (writeChan cn resp)) cs
  traverse_ onRequest (responseForRequest cmd)
  traverse_ onReceipt (responseForSendReceipt cmd)
  traverse_ onMessage (messageResponse cmd)
  pongResponse cmd chan
-- | Keep-alive loop: every 29 seconds send a @ping@ and wait up to 2 seconds
-- for a value to arrive on the given channel.
--
-- A value that arrives in time is logged as the response. If nothing arrives
-- within the timeout, an 'IOError' carrying the message
-- @"Keep Alive interruption"@ is thrown, which ends the loop.
keepAlive :: NS.Socket -> Chan BaseCommand -> IO ()
keepAlive s chan = forever $ do
  threadDelay pingInterval
  logRequest P.ping
  sendSimpleCmd s P.ping
  timeout pongTimeout (readChan chan) >>= \case
    Just cmd -> logResponse cmd
    Nothing  -> throwIO $ userError "Keep Alive interruption"
 where
  -- Both values are in microseconds, as expected by threadDelay and timeout.
  pingInterval = 29 * 1000000
  pongTimeout  = 2 * 1000000
-- | Encode a command without metadata or payload and write all of it to the
-- socket.
sendSimpleCmd :: MonadIO m => NS.Socket -> BaseCommand -> m ()
sendSimpleCmd s cmd = liftIO (SBL.sendAll s frame)
  where frame = encodeBaseCommand Nothing Nothing cmd
-- | Encode a command together with its message metadata and an optional
-- payload, and write all of it to the socket.
sendPayloadCmd
  :: MonadIO m
  => NS.Socket       -- ^ socket to write to
  -> BaseCommand     -- ^ command to send
  -> MessageMetadata -- ^ metadata that accompanies the command
  -> Maybe Payload   -- ^ optional payload
  -> m ()
sendPayloadCmd s cmd meta payload = liftIO (SBL.sendAll s frame)
  where frame = encodeBaseCommand (Just meta) payload cmd
-- | Read and decode one response from the socket.
--
-- At most 'frameMaxSize' bytes are requested in a single @recv@ call. If the
-- bytes cannot be decoded, the action fails with
-- @"Decoding error: "@ followed by the decoder's message.
--
-- A @ping@ from the peer is answered here with a @pong@ (both are logged);
-- the ping response is still returned to the caller.
receive :: MonadIO m => NS.Socket -> m Response
receive s = liftIO $ do
  -- NOTE(review): this assumes one recv call yields exactly one complete
  -- frame; confirm how partial or coalesced reads are meant to be handled.
  msg <- SBL.recv s (fromIntegral frameMaxSize)
  case decodeBaseCommand msg of
    Left  e    -> fail $ "Decoding error: " <> e
    Right resp -> do
      let cmd = getCommand resp
      traverse_ (const $ answerPing cmd) (cmd ^. F.maybe'ping)
      pure resp
 where
  -- Log the incoming ping, then log and send the pong reply.
  answerPing cmd = do
    logResponse cmd
    logRequest P.pong
    sendSimpleCmd s P.pong