module Database.RethinkDB.Messages where
import Control.Applicative
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Aeson as A hiding (Result, Object)
import Data.Aeson.Types as A hiding (Result, Object)
import qualified Data.ByteString as SBS
import Data.ByteString.Lazy (ByteString, toStrict)
import qualified Data.ByteString.Lazy as BS
import Data.Binary
import Data.Binary.Put
import Data.Binary.Get as Get
import Network.Socket (Socket, AddrInfo(..), AddrInfoFlag(..), SocketType(..))
import Network.Socket (getAddrInfo, socket, connect, close, defaultHints)
import Network.Socket.ByteString (recv)
import Network.Socket.ByteString.Lazy (sendAll)
import Database.RethinkDB.Types
import Prelude
createSocket :: Text -> Int -> IO Socket
createSocket host port = do
ai:_ <- getAddrInfo (Just hints) (Just $ T.unpack host) (Just $ show port)
sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai)
connect sock (addrAddress ai)
return sock
where
hints = defaultHints { addrSocketType = Stream, addrFlags = [ AI_NUMERICSERV ] }
closeSocket :: Socket -> IO ()
closeSocket = close
sendMessage :: Socket -> ByteString -> IO ()
sendMessage sock buf = sendAll sock buf
recvMessage :: Socket -> Get a -> IO (Either Error a)
recvMessage sock parser = go (runGetIncremental parser)
where
go (Get.Done _ _ r) = return $ Right r
go (Get.Partial c) = recv sock (4 * 1024) >>= go . c . Just
go (Get.Fail _ _ e) = return $ Left $ ProtocolError $ T.pack e
handshakeMessage :: Maybe Text -> ByteString
handshakeMessage mbAuth = runPut $ do
putWord32le 0x400c2d20
case mbAuth of
Nothing -> putWord32le 0
Just auth -> do
let key = T.encodeUtf8 auth
putWord32le $ fromIntegral $ SBS.length key
putByteString key
putWord32le 0x7e6970c7
handshakeReplyParser :: Get Text
handshakeReplyParser = do
(T.decodeUtf8 . toStrict) <$> getLazyByteStringNul
queryMessage :: Token -> A.Value -> ByteString
queryMessage token msg = runPut $ do
putWord64host token
putWord32le (fromIntegral $ BS.length buf)
putLazyByteString buf
where
buf = A.encode msg
responseMessageParser :: Get (Either (Token, String) Response)
responseMessageParser = do
token <- getWord64host
len <- getWord32le
buf <- getByteString (fromIntegral len)
case A.eitherDecodeStrict buf of
Left e -> pure $ Left (token, "responseMessageParser: response is not a JSON value (" ++ e ++ ")")
Right value -> case A.parseEither (responseParser token) value of
Left e -> pure $ Left (token, "responseMessageParser: could not parse response (" ++ e ++ ")")
Right x -> pure $ Right x