{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, ScopedTypeVariables, FlexibleContexts, MultiWayIf #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

-- |
-- Module:      Network.Riak.Connection.Internal
-- Copyright:   (c) 2011 MailRank, Inc.
-- License:     Apache
-- Maintainer:  Mark Hibberd <mark@hibberd.id.au>, Nathan Hunter <nhunter@janrain.com>
-- Stability:   experimental
-- Portability: portable
--
-- Low-level network connection management.

module Network.Riak.Connection.Internal
    (
    -- * Connection management
      Network.Riak.Connection.Internal.connect
    , disconnect
    , setClientID
    -- * Client configuration
    , defaultClient
    , makeClientID
    -- * Requests and responses
    -- ** Sending and receiving requests and responses
    , exchange
    , exchangeMaybe
    , exchange_
    -- ** Pipelining many requests
    , pipeline
    , pipelineMaybe
    , pipeline_
    -- * Low-level protocol operations
    -- ** Sending and receiving
    , sendRequest
    , recvResponse
    , recvMaybeResponse
    , recvResponse_
    ) where

import Control.Concurrent.Async (async, waitBoth)
import Control.Exception (Exception, IOException, throwIO, bracketOnError)
import Control.Monad (forM_, replicateM)
import Data.Binary.Get (Get, Decoder(..), getWord32be, runGetIncremental)
import Data.Binary.Put (Put, putWord32be, runPut, putLazyByteString)
import Data.ByteString (ByteString)
import Data.IORef (newIORef, readIORef, writeIORef)
import Network.Riak.Connection.NoPush (setNoPush)
import Network.Riak.Debug as Debug
import Network.Riak.Lens
import Network.Riak.Tag (getTag, putTag)
import Network.Riak.Types.Internal hiding (MessageTag(..))
import Network.Socket as Socket
import Numeric (showHex)
import Data.ProtoLens (buildMessage)
import System.Random (randomIO)
import qualified Control.Exception as E
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy.Char8 as L
import qualified Data.Riak.Proto as Proto
import qualified Network.Riak.Types.Internal as T
import qualified Network.Socket.ByteString as B
import qualified Network.Socket.ByteString.Lazy as L

-- | Default client configuration.  Talks to localhost, port 8087.
--
-- The 'clientID' field is left empty here; 'connect' replaces an empty
-- ID with a randomly generated one before talking to the server.
defaultClient :: Client
defaultClient = Client
    { host     = "127.0.0.1"
    , port     = "8087"
    , clientID = B.empty
    }

-- | Tell the server our client ID.
--
-- Sends an @RpbSetClientIdReq@ carrying the given ID, then waits for a
-- 'T.SetClientIDResponse' and discards its payload.
setClientID :: Connection -> ClientID -> IO ()
setClientID conn i = do
  let req :: Proto.RpbSetClientIdReq
      req = Proto.defMessage & Proto.clientId .~ i
  sendRequest conn req
  recvResponse_ conn T.SetClientIDResponse

-- | Generate a random client ID.
--
-- The result is the ASCII prefix @hs_@ followed by the hexadecimal
-- rendering of a random machine word.
makeClientID :: IO ClientID
makeClientID = do
  r <- randomIO :: IO Int
  -- 'showHex' raises an error on negative input, and @abs minBound@ is
  -- still negative, so reinterpret the random bits as an unsigned 'Word'
  -- instead of taking the absolute value.
  return . B.append "hs_" . B8.pack $ showHex (fromIntegral r :: Word) ""

-- | Add a random 'ClientID' to a 'Client' if the 'Client' doesn't
-- already have one.  A client whose ID is non-empty is returned
-- unchanged.
addClientID :: Client -> IO Client
addClientID client
  | B.null (clientID client) = do
      i <- makeClientID
      return client { clientID = i }
  | otherwise = return client

-- | Connect to a server.
--
-- An empty client ID is first replaced by a random one.  The host and
-- port are resolved with 'getAddrInfo' and the first result is used; a
-- stream socket is opened and connected to it, and the client ID is
-- announced to the server before the 'Connection' is returned.
--
-- If anything fails after the socket has been created, the socket is
-- closed again ('bracketOnError') before the exception propagates.
connect :: Client -> IO Connection
connect cli0 = do
  client@Client{..} <- addClientID cli0
  let hints = defaultHints
                { addrFlags      = [AI_ADDRCONFIG]
                , addrSocketType = Stream
                }
  debug "connect" $ "server " ++ host ++ ":" ++ port ++ ", client ID " ++
                    B8.unpack clientID
  ais <- getAddrInfo (Just hints) (Just host) (Just port)
  -- Bound lazily: the lookup failure is only raised once 'ai' is first
  -- demanded, i.e. while creating the socket below.
  let ai = case ais of
             (a:_) -> a
             _     -> moduleError "connect" $
                      "could not look up server " ++ host ++ ":" ++ port
  onIOException "connect" $
    bracketOnError
      (socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai))
      close $
      \sock -> do
          Socket.connect sock (addrAddress ai)
          -- The receive buffer starts out empty.
          buf <- newIORef B.empty
          let conn = Connection sock client buf
          setClientID conn clientID
          return conn

-- | Disconnect from a server.
--
-- Closes the socket and then empties the connection's receive buffer,
-- so no stale bytes remain in it.
disconnect :: Connection -> IO ()
disconnect Connection{..} = onIOException "disconnect" $ do
  debug "disconnect" $ "server " ++ host connClient ++ ":" ++ port connClient ++
                       ", client ID " ++ B8.unpack (clientID connClient)
  close connSock
  writeIORef connBuffer B.empty

-- | Number of bytes requested from the socket per @recv@ call.
--
-- We use a larger receive buffer than we usually need, and generally
-- ask to receive more data than we know we'll need, in the hope that
-- we'll be able to buffer some of it and avoid future recv system calls.
recvBufferSize :: Integral a => a
recvBufferSize = 16384
{-# INLINE recvBufferSize #-}

-- | Return exactly @n0@ bytes from the connection.
--
-- Bytes already held in the connection's buffer are used first; only if
-- they do not suffice is the socket read.  Any bytes received beyond the
-- requested amount are stored back into the buffer for later reads.
--
-- A non-positive @n0@ yields an empty string without touching the
-- buffer or the socket.  If the peer closes the connection before
-- enough bytes arrive, a module error (\"short read from network\") is
-- raised.
recvExactly :: Connection -> Int -> IO ByteString
recvExactly Connection{..} n0
    | n0 <= 0   = return B.empty
    | otherwise = do
        buffered <- readIORef connBuffer
        let (h, t) = B.splitAt n0 buffered
            len    = B.length h
        if len == n0
          then writeIORef connBuffer t >> return h
          else go [h] (n0 - len)
  where
    -- @go chunks n@: @chunks@ holds the pieces gathered so far, most
    -- recent first; @n@ is the number of bytes still missing.  A negative
    -- @n@ means the latest chunk overshot by @-n@ bytes, which are split
    -- off and pushed back into the buffer.
    go (s:acc) n
      | n < 0 = do
          let (h, t) = B.splitAt (B.length s + n) s
          writeIORef connBuffer $! t
          return $ B.concat (reverse (h:acc))
    go acc n
      | n == 0 = do
          writeIORef connBuffer B.empty
          return $ B.concat (reverse acc)
      | otherwise = do
          -- Always ask for at least a full buffer's worth so surplus
          -- bytes can serve later reads.
          bs <- B.recv connSock (max recvBufferSize n)
          let len = B.length bs
          if len == 0
            then moduleError "recvExactly" "short read from network"
            else go (bs:acc) (n - len)

-- | Run a 'Get' decoder against the connection's input.
--
-- The decoder is first fed whatever is sitting in the connection's
-- buffer (or one fresh read from the socket if the buffer is empty) and
-- then further socket reads until it finishes.  Input left over after a
-- successful decode is stored back into the buffer.
--
-- Raises a module error if the decoder fails, or if the socket is
-- already closed when the first chunk is needed.
recvGet :: Connection -> Get a -> IO a
recvGet Connection{..} get = do
  let -- Read one more chunk from the socket.  An empty read means the
      -- peer has closed; shut down our receiving side and report end of
      -- input.
      refill :: IO (Maybe ByteString)
      refill = do
        bs <- B.recv connSock recvBufferSize
        if B.null bs
          then shutdown connSock ShutdownReceive >> return Nothing
          else return (Just bs)
      -- Drive the incremental decoder to completion.
      step (Fail _ _ err) = moduleError "recvGet" err
      step (Done bs _ r)  = writeIORef connBuffer bs >> return r
      step (Partial k)    = (step . k) =<< refill
  mbs <- do
    buf <- readIORef connBuffer
    if B.null buf
      then refill
      else return (Just buf)
  case mbs of
    Just bs ->
      case runGetIncremental get of
        Fail _ _ err  -> moduleError "recvGet" err
        -- The decoder finished without being fed @bs@, so @bs@ is still
        -- unread input: keep it in the buffer rather than replacing it
        -- with the decoder's leftover.
        Done rest _ r -> writeIORef connBuffer (rest `B.append` bs) >> return r
        Partial k     -> step (k (Just bs))
    Nothing -> moduleError "recvGet" "socket closed"

-- | Read exactly @n@ bytes from the connection and decode them as a
-- protocol-buffer message.  A decoding failure is raised as a module
-- error carrying the decoder's message.
recvGetN :: Proto.Message a => Connection -> Int -> IO a
recvGetN conn n = do
  bs <- recvExactly conn n
  either (moduleError "recvGetN") return (Proto.decodeMessage bs)

-- | Serialise a request into its wire frame: a big-endian 32-bit
-- length, the message tag, then the encoded message body.  The length
-- is one more than the body's size, i.e. it also counts the tag.
putRequest :: (Request req) => req -> Put
putRequest req = do
  putWord32be (fromIntegral (1 + L.length bytes))
  putTag (messageTag req)
  putLazyByteString bytes
  where
    bytes :: L.ByteString
    bytes = BB.toLazyByteString (buildMessage req)

-- Orphan instance (hence the @-fno-warn-orphans@ option at the top of
-- this file): makes a server error reply throwable as an exception.
instance Exception Proto.RpbErrorResp

-- | Raise a server error reply as an exception in 'IO'.
throwError :: Proto.RpbErrorResp -> IO a
throwError = throwIO

-- | Read the tag and body of a response whose 32-bit length prefix has
-- already been consumed.
--
-- @len@ is the value of that prefix; since it also counts the tag, the
-- body is @len - 1@ bytes.  The third argument is never inspected and
-- only fixes the result type.
--
-- * If the tag is the expected one, the body is decoded and returned.
-- * If the tag is 'T.ErrorResponse', the body is decoded as an error
--   reply and thrown.
-- * Any other tag raises a module error naming both tags.
getResponse :: Response a => Connection -> Int -> a -> T.MessageTag -> IO a
getResponse conn len _ expected = do
  tag <- recvGet conn getTag
  if | tag == expected        -> recvGetN conn (len - 1)
     | tag == T.ErrorResponse -> throwError =<< recvGetN conn (len - 1)
     | otherwise ->
         moduleError "getResponse" $ "received unexpected response: expected " ++
                                     show expected ++ ", received " ++ show tag

-- | Send a request to the server, and receive its response.
--
-- The network round trip runs under 'onIOException', labelled with
-- @exchange@ and the request's message tag.
exchange :: Exchange req resp => Connection -> req -> IO resp
exchange conn req = do
  debug "exchange" $ ">>> " ++ showM req
  onIOException ("exchange " ++ show (messageTag req)) $ do
    sendRequest conn req
    recvResponse conn

-- | Send a request to the server, and receive its response (which may
-- be empty).  An empty response is reported as 'Nothing'.
--
-- The network round trip runs under 'onIOException', labelled with
-- @exchangeMaybe@ and the request's message tag.
exchangeMaybe :: Exchange req resp => Connection -> req -> IO (Maybe resp)
exchangeMaybe conn req = do
  debug "exchangeMaybe" $ ">>> " ++ showM req
  onIOException ("exchangeMaybe " ++ show (messageTag req)) $ do
    sendRequest conn req
    recvMaybeResponse conn

-- | Send a request to the server, and receive its response, but do
-- not decode it.  The response's tag is still checked against the one
-- the request expects.
exchange_ :: Request req => Connection -> req -> IO ()
exchange_ conn req = do
  debug "exchange_" $ ">>> " ++ showM req
  onIOException ("exchange_ " ++ show (messageTag req)) $ do
    sendRequest conn req
    recvResponse_ conn (expectedResponse req)

-- | Write a lazy 'L.ByteString' to the socket in full, with the
-- socket's no-push option switched on for the duration of the write and
-- off again afterwards.
sendAll :: Socket -> L.ByteString -> IO ()
sendAll sock bs = do
  setNoPush sock True
  L.sendAll sock bs
  setNoPush sock False

-- | Serialise a request with 'putRequest' and write the resulting frame
-- to the connection's socket.  Does not wait for a response.
sendRequest :: (Request req) => Connection -> req -> IO ()
sendRequest Connection{..} = sendAll connSock . runPut . putRequest

-- | Receive one response from the server and decode it.
--
-- Reads the 32-bit length prefix and hands the rest of the frame to
-- 'getResponse', expecting the message tag that belongs to the result
-- type.
recvResponse :: (Response a) => Connection -> IO a
recvResponse conn = debugRecv showM $ go undefined where
  -- The argument is a type witness only: it is used to look up the
  -- expected tag via 'messageTag' and is otherwise passed along
  -- untouched.
  go :: Response b => b -> IO b
  go dummy = do
    len <- fromIntegral `fmap` recvGet conn getWord32be
    getResponse conn len dummy (messageTag dummy)

-- | Receive one response from the server, check that it carries the
-- expected tag, and discard its body without decoding it.
recvResponse_ :: Connection -> T.MessageTag -> IO ()
recvResponse_ conn expected = debugRecv show $ do
  len <- fromIntegral `fmap` recvGet conn getWord32be
  -- Pass the full frame length; 'recvCorrectTag' accounts for the tag
  -- byte itself.
  recvCorrectTag "recvResponse_" conn expected len ()

-- | Receive one response from the server.  A frame that consists of
-- nothing but the tag (length prefix of 1) is reported as 'Nothing';
-- otherwise the body is decoded and returned in 'Just'.
recvMaybeResponse :: (Response a) => Connection -> IO (Maybe a)
recvMaybeResponse conn = debugRecv (maybe "Nothing" (("Just " ++) . showM)) $
                         go undefined where
  -- The argument is a type witness only, used to look up the expected
  -- tag via 'messageTag'.
  go :: Response b => b -> IO (Maybe b)
  go dummy = do
    len <- fromIntegral `fmap` recvGet conn getWord32be
    let tag = messageTag dummy
    if len == 1
      then recvCorrectTag "recvMaybeResponse" conn tag len Nothing
      else Just `fmap` getResponse conn len dummy tag

-- | Read a response tag, verify it, and skip the response body.
--
-- @len@ is the frame length taken from the 32-bit prefix.  As in
-- 'getResponse' it counts the tag as well, so in every branch the body
-- that follows the tag is @len - 1@ bytes long.
--
-- * Expected tag: the body is read and dropped, and @v@ is returned.
-- * 'T.ErrorResponse': the body is decoded as an error reply and thrown.
-- * Any other tag: a module error attributed to @func@ is raised.
recvCorrectTag :: String -> Connection -> T.MessageTag -> Int -> a -> IO a
recvCorrectTag func conn expected len v = do
  tag <- recvGet conn getTag
  let bodyLen = len - 1
  if | tag == expected        -> recvExactly conn bodyLen >> return v
     | tag == T.ErrorResponse -> throwError =<< recvGetN conn bodyLen
     | otherwise -> moduleError func $
                    "received unexpected response: expected " ++
                    show expected ++ ", received " ++ show tag

-- | Wrap a receive action with debug logging.
--
-- When the module is compiled with @DEBUG@ defined, the action's result
-- is rendered with the given function and logged, prefixed with
-- @\<\<\< @, before being returned.  Otherwise the action is run as is
-- and the rendering function is ignored.
debugRecv :: (a -> String) -> IO a -> IO a
#ifdef DEBUG
debugRecv f act = do
  r <- act
  debug "recv" $ "<<< " ++ f r
  return r
#else
debugRecv _ act = act
{-# INLINE debugRecv #-}
#endif

-- | Shared implementation of the pipelining functions.
--
-- All requests are serialised with 'putRequest' into a single payload
-- and written to the connection's socket in one asynchronous task,
-- while a second asynchronous task runs the supplied receive action
-- once per request.  The collected responses are returned in the
-- order the receive action produced them.
--
-- An empty request list returns an empty response list without
-- touching the connection.  An 'IOException' surfacing while waiting
-- for the two tasks goes through 'onIOException'.
pipe :: (Request req) =>
        (Connection -> IO resp) -> Connection -> [req] -> IO [resp]
pipe _ _ [] = return []
pipe receive conn@Connection{connSock = sock} reqs@(firstReq : _) = do
  let count   = length reqs
      tagName = show (messageTag firstReq)
  -- Verbose debugging lists every request; otherwise only a summary
  -- of the batch (count and tag of the first request) is logged.
  if Debug.level > 1
    then mapM_ (\r -> debug "pipe" (">>> " ++ showM r)) reqs
    else debug "pipe" (">>> " ++ show count ++ "x " ++ tagName)
  -- The receiver is started before the sender, as in the original.
  receiver <- async (replicateM count (receive conn))
  sender   <- async (sendAll sock (runPut (mapM_ putRequest reqs)))
  (_, resps) <- onIOException ("pipe " ++ tagName) (waitBoth sender receiver)
  return resps

-- | Send a batch of requests to the server back to back and collect
-- one decoded response per request.  Sending and receiving are
-- overlapped where possible, which improves concurrency and reduces
-- latency.
pipeline :: (Exchange req resp) => Connection -> [req] -> IO [resp]
pipeline conn reqs = pipe recvResponse conn reqs

-- | Send a batch of requests to the server back to back and collect
-- one response per request, where any individual response may be
-- empty ('Nothing').  Sending and receiving are overlapped where
-- possible, which improves concurrency and reduces latency.
pipelineMaybe :: (Exchange req resp) => Connection -> [req] -> IO [Maybe resp]
pipelineMaybe conn reqs = pipe recvMaybeResponse conn reqs

-- | Send a batch of requests to the server back to back and consume
-- (without decoding) one response per request.  Sending and receiving
-- are overlapped where possible, which improves concurrency and
-- reduces latency.
--
-- An empty request list is a no-op.  An 'IOException' surfacing while
-- waiting for the send and receive tasks goes through 'onIOException'.
pipeline_ :: (Request req) => Connection -> [req] -> IO ()
pipeline_ _ [] = return ()
pipeline_ conn@Connection{connSock = sock} reqs@(firstReq : _) = do
  -- The receiver is spawned first; it expects, for each request in
  -- turn, the response tag given by 'expectedResponse'.
  receiver <- async (mapM_ (recvResponse_ conn . expectedResponse) reqs)
  -- Verbose debugging lists every request; otherwise only a summary
  -- of the batch (count and tag of the first request) is logged.
  if Debug.level > 1
    then mapM_ (\r -> debug "pipe" (">>> " ++ showM r)) reqs
    else debug "pipe" (">>> " ++ show (length reqs) ++ "x " ++
                       show (messageTag firstReq))
  sender <- async (sendAll sock (runPut (mapM_ putRequest reqs)))
  _ <- onIOException "pipeline_" (waitBoth sender receiver)
  return ()

-- | Run an action, intercepting any 'IOException' it throws.
--
-- The caught exception is rendered with 'show', reported through
-- 'debug' under the given function name, and then handed to
-- 'moduleError' together with that same name.  Other exception types
-- are not intercepted.
onIOException :: String -> IO a -> IO a
onIOException func act = act `E.catch` handler
  where
    handler :: IOException -> IO b
    handler ioErr = do
      let rendered = show ioErr
      debug func ("caught IO exception: " ++ rendered)
      moduleError func rendered

-- | Report an error on behalf of this module: forwards the function
-- name and message to 'netError', supplying this module's name as the
-- first argument.
moduleError :: String -> String -> a
moduleError func msg = netError "Network.Riak.Connection.Internal" func msg