-- | Top-level module of the RethinkDB client.
--
-- It exports the connection 'Handle' together with the functions that open
-- and close it ('newHandle', 'close'), send queries ('run'), and work with
-- partially delivered sequences ('nextChunk', 'collect', 'stop', 'wait').
-- The remaining exports (datum types, expression types, option helpers) are
-- not defined in this module; they come from the modules it imports.
module Database.RethinkDB
( Handle
, defaultPort, newHandle, close
, run, nextChunk, collect, stop, wait
, Error(..)
, Datum(..), Array, Object, ToDatum(..), FromDatum(..)
, (.=), (.:), (.:?), object
, Exp(..), SomeExp(..)
, Bound(..), Order(..)
, Sequence
, Table, Database, SingleSelection
, Res, Result, FromResponse
, ConflictResolutionStrategy(..)
, emptyOptions
, lift
, call1, call2
, IsDatum, IsObject, IsSequence
) where
import Control.Exception (bracketOnError)
import Data.IORef
import Data.Monoid ((<>))

import qualified Data.Aeson.Types as A
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector as V
import Network.Socket (Socket)

import Database.RethinkDB.Messages
import Database.RethinkDB.Types
import Database.RethinkDB.Types.Datum
-- | State of one open connection to the server.
data Handle = Handle
    { hSocket :: !Socket
      -- ^ Socket over which every message is sent and received.
    , hTokenRef :: !(IORef Token)
      -- ^ Counter from which 'start' draws the token of each new query.
    , hDatabase :: !(Exp Database)
      -- ^ Database expression handed to 'compileTerm' when a query is
      -- compiled in 'start'.
    }
-- | Port number offered as the default for 'newHandle': 28015.
-- NOTE(review): presumably the standard RethinkDB client-driver port;
-- confirm against the server configuration.
defaultPort :: Int
defaultPort = 28015
-- | Connect to the server at the given host and port, perform the handshake
-- (sending the optional auth key) and return a fresh 'Handle'.
--
-- The 'Exp' 'Database' argument is stored in the handle and passed to
-- 'compileTerm' for every query started on it. The token counter starts
-- at 1.
--
-- If the handshake or any later setup step throws, the newly created socket
-- is closed before the exception propagates, so a failed connection attempt
-- does not leak the socket. On success the socket stays open until 'close'
-- is called.
--
-- NOTE(review): the handshake reply is received but not inspected, so a
-- rejected handshake is not reported here. Confirm what 'recvMessage'
-- yields for 'handshakeReplyParser' before adding a check.
newHandle :: Text -> Int -> Maybe Text -> Exp Database -> IO Handle
newHandle host port mbAuth db =
    bracketOnError (createSocket host port) closeSocket $ \sock -> do
        sendMessage sock (handshakeMessage mbAuth)
        _reply <- recvMessage sock handshakeReplyParser
        ref <- newIORef 1
        return $ Handle sock ref db
-- | Close the socket that backs the given handle.
close :: Handle -> IO ()
close = closeSocket . hSocket
-- | Send the expression to the server as a new query and decode the next
-- response read from the connection.
--
-- A failure reported by 'getResponse' is returned unchanged. Responses
-- tagged as client, compile or runtime errors are converted with 'mkError';
-- every other response type is decoded with 'parseResponse'.
run :: (FromResponse (Result a))
    => Handle -> Exp a -> IO (Res a)
run handle expr = do
    _token <- start handle expr
    getResponse handle >>= either (return . Left) dispatch
  where
    -- Route a successfully received response by its type tag.
    dispatch response = case responseType response of
        ClientErrorType  -> mkError response ClientError
        CompileErrorType -> mkError response CompileError
        RuntimeErrorType -> mkError response RuntimeError
        _                -> return $ parseMessage parseResponse response Right
-- | Run an aeson parser on a value and feed the parsed result to the
-- continuation. A parser failure becomes a 'ProtocolError' carrying the
-- parser's message.
parseMessage :: (a -> A.Parser b) -> a -> (b -> Either Error c) -> Either Error c
parseMessage parser value f =
    either (Left . ProtocolError . T.pack) f (A.parseEither parser value)
-- | Turn an error response into a 'Left'.
--
-- The response's result vector is expected to hold exactly one element,
-- which is parsed with 'A.parseJSON' and wrapped by the supplied error
-- constructor. Any other shape (or a parse failure, via 'parseMessage')
-- yields a 'ProtocolError'; the fallback message includes the shown result
-- vector, separated from the text by ": " so the two do not run together.
mkError :: Response -> (T.Text -> Error) -> IO (Either Error a)
mkError r e = return $ case V.toList results of
    [a] -> parseMessage A.parseJSON a (Left . e)
    _   -> Left $ ProtocolError $
        "mkError: Could not parse error: " <> T.pack (show results)
  where
    results = responseResult r
-- | Fetch all remaining chunks of a sequence and return every element in
-- one vector, in the order the chunks were delivered.
--
-- A 'Done' sequence is returned as is. For a 'Partial' sequence,
-- 'nextChunk' is called repeatedly until a 'Done' chunk arrives. The first
-- error from 'nextChunk' aborts the collection and is returned.
--
-- Chunks are accumulated in a list and concatenated once at the end, which
-- avoids re-copying the already collected elements for every new chunk.
collect :: (FromDatum a) => Handle -> Sequence a -> IO (Either Error (V.Vector a))
collect handle = go []
  where
    -- @chunks@ holds the chunks seen so far, most recent first.
    go chunks (Done x) =
        return $ Right $ V.concat $ reverse (x : chunks)
    go chunks s@(Partial _ x) = do
        next <- nextChunk handle s
        case next of
            Left e  -> return $ Left e
            Right r -> go (x : chunks) r
-- | Allocate a fresh token, send the term to the server as a new query and
-- return the token.
--
-- The token counter is advanced with the strict 'atomicModifyIORef'' so
-- that no chain of unevaluated @(+ 1)@ thunks builds up inside the 'IORef'
-- on a long-lived handle.
--
-- The message is a three-element JSON array: the number 1, the compiled
-- term, and the compiled 'emptyOptions'.
-- NOTE(review): the leading 1 is presumably the START query type of the
-- RethinkDB wire protocol; confirm against the protocol definition.
start :: Handle -> Exp a -> IO Token
start handle term = do
    token <- atomicModifyIORef' (hTokenRef handle) (\x -> (x + 1, x))
    sendMessage (hSocket handle) (queryMessage token msg)
    return token
  where
    msg = compileTerm (hDatabase handle) $ do
        term'    <- toTerm term
        options' <- toTerm emptyOptions
        return $ A.Array $ V.fromList
            [ A.Number 1
            , term'
            , A.toJSON options'
            ]
-- | Build a JSON array whose only element is the given integer, encoded as
-- a JSON number.
singleElementArray :: Int -> A.Value
singleElementArray x = A.Array (V.singleton (A.Number (fromIntegral x)))
-- | Send the one-element message @[2]@ for the given token over the
-- handle's socket. 'nextChunk' uses this to request the next chunk.
-- NOTE(review): 2 is presumably the CONTINUE query type of the RethinkDB
-- wire protocol; confirm against the protocol definition.
continue :: Handle -> Token -> IO ()
continue handle token = sendMessage sock msg
  where
    sock = hSocket handle
    msg  = queryMessage token (singleElementArray 2)
-- | Send the one-element message @[3]@ for the given token over the
-- handle's socket. No response is read here.
-- NOTE(review): 3 is presumably the STOP query type of the RethinkDB wire
-- protocol; confirm against the protocol definition.
stop :: Handle -> Token -> IO ()
stop handle token = sendMessage sock msg
  where
    sock = hSocket handle
    msg  = queryMessage token (singleElementArray 3)
-- | Send the one-element message @[4]@ for the given token over the
-- handle's socket. No response is read here.
-- NOTE(review): 4 is presumably the NOREPLY_WAIT query type of the
-- RethinkDB wire protocol; confirm against the protocol definition.
wait :: Handle -> Token -> IO ()
wait handle token = sendMessage sock msg
  where
    sock = hSocket handle
    msg  = queryMessage token (singleElementArray 4)
-- | Request the next chunk of a partially delivered sequence.
--
-- For a 'Partial' sequence, a continue message is sent for its token and
-- the next response read from the connection is decoded with
-- 'parseResponse'. Failures from 'getResponse' are returned unchanged.
--
-- A 'Done' sequence has nothing left to fetch; this is reported as a
-- 'ProtocolError' with a descriptive message instead of an empty one.
nextChunk :: (FromResponse (Sequence a))
          => Handle -> Sequence a -> IO (Either Error (Sequence a))
nextChunk _ (Done _) =
    return $ Left $ ProtocolError "nextChunk: sequence is already done"
nextChunk handle (Partial token _) = do
    continue handle token
    reply <- getResponse handle
    case reply of
        Left e         -> return $ Left e
        Right response -> return $ parseMessage parseResponse response Right
-- | Read the next message from the handle's socket and parse it with
-- 'responseMessageParser'.
getResponse :: Handle -> IO (Either Error Response)
getResponse handle = recvMessage (hSocket handle) responseMessageParser