-- | Top-level client module.
--
-- Exposes the connection 'Handle', the functions that send an expression
-- and read back its result ('run', 'nextChunk', 'collect'), the supporting
-- types, and re-exports everything from "Database.RethinkDB.Terms".
module Database.RethinkDB
( Handle
, newHandle
, run, nextChunk, collect
, Error(..)
, Exp
, Array, Object, Datum(..)
, Sequence
, constant
, Table, Database, SingleSelection
, Res
, emptyOptions
, Any, IsDatum, IsObject, IsSequence
, module Database.RethinkDB.Terms
) where
import Data.Monoid ((<>))
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified Data.Aeson.Types as A
import Network.Socket (Socket)
import Data.IORef
import Database.RethinkDB.Types
import Database.RethinkDB.Terms
import Database.RethinkDB.Messages
-- | Connection state used by every operation in this module.
data Handle = Handle
    { hSocket :: !Socket
      -- ^ Socket that all messages are sent to and received from.
    , hTokenRef :: !(IORef Token)
      -- ^ Counter that hands out query tokens: 'start' returns the current
      -- value and stores its successor.
    }
-- | Open a connection and perform the handshake.
--
-- Creates a socket with 'createSocket', sends 'handshakeMessage', reads one
-- reply with 'handshakeReplyParser', and returns a 'Handle' whose token
-- counter starts at 1.
--
-- NOTE(review): the handshake reply is read but never inspected, so a
-- rejected handshake is not reported from here -- confirm this is intended.
newHandle :: IO Handle
newHandle = do
    sock <- createSocket
    sendMessage sock handshakeMessage
    -- The reply is consumed so it does not linger on the socket; its
    -- content is deliberately dropped.
    _ <- recvMessage sock handshakeReplyParser
    fmap (Handle sock) (newIORef 1)
-- | Send an expression to the server and decode its response.
--
-- The expression is submitted via 'start', then the next message available
-- on the handle is read with 'getResponse' and decoded with 'parseResponse'.
-- A transport-level error is passed through unchanged; a decoding failure is
-- reported as a 'ProtocolError' carrying the parser's message.
--
-- The token returned by 'start' is not used to select the response: whatever
-- message 'getResponse' yields next is taken as the answer.
run :: (Any a, FromResponse (Result a))
    => Handle -> Exp a -> IO (Res a)
run handle expr = do
    _ <- start handle expr
    reply <- getResponse handle
    return $ reply >>= \response ->
        either (Left . ProtocolError . T.pack) Right
            (A.parseEither parseResponse response)
-- | Drain a sequence into a single vector.
--
-- While the sequence is 'Partial', further chunks are requested with
-- 'nextChunk'; once a 'Done' chunk is reached, the elements of all chunks
-- are returned concatenated in arrival order. The first error reported by
-- 'nextChunk' aborts the loop and is returned as-is.
--
-- Chunks are accumulated in a list and joined once at the end, so each
-- element is copied a single time instead of once per remaining chunk.
collect :: (FromResponse (Sequence a))
        => Handle -> Sequence a -> IO (Either Error (V.Vector a))
collect handle = go []
  where
    -- @acc@ holds the chunks seen so far, most recent first.
    go acc (Done x) = return $ Right $ V.concat $ reverse (x : acc)
    go acc s@(Partial _ x) = do
        chunk <- nextChunk handle s
        case chunk of
            Left e -> return $ Left e
            Right r -> go (x : acc) r
-- | Submit a new query for the given term and return the token assigned
-- to it.
--
-- The token is taken from the handle's counter, which is advanced by one in
-- the same atomic step. The strict 'atomicModifyIORef'' is used so the
-- counter always holds an evaluated value rather than a deferred @x + 1@.
--
-- NOTE(review): the empty list passed to 'Start' is presumably the query's
-- options -- confirm against the definition of 'Start'.
start :: (Any a) => Handle -> Exp a -> IO Token
start handle term = do
    token <- atomicModifyIORef' (hTokenRef handle) (\x -> (x + 1, x))
    sendMessage (hSocket handle) (queryMessage token (Start term []))
    return token
-- | Wrap an integer in a one-element JSON array: @n@ becomes @[n]@.
singleElementArray :: Int -> A.Value
singleElementArray = A.Array . V.singleton . A.Number . fromIntegral
-- | Send the single-element query @[2]@ for the given token.
--
-- NOTE(review): 2 is presumably the CONTINUE query type of the RethinkDB
-- wire protocol -- confirm against the protocol definition.
continue :: Handle -> Token -> IO ()
continue handle token = sendMessage (hSocket handle) message
  where
    message = queryMessage token (singleElementArray 2)
-- | Send the single-element query @[3]@ for the given token.
--
-- NOTE(review): 3 is presumably the STOP query type of the RethinkDB wire
-- protocol -- confirm against the protocol definition.
stop :: Handle -> Token -> IO ()
stop handle token = sendMessage (hSocket handle) message
  where
    message = queryMessage token (singleElementArray 3)
-- | Send the single-element query @[4]@ for the given token.
--
-- NOTE(review): 4 is presumably the NOREPLY_WAIT query type of the RethinkDB
-- wire protocol -- confirm against the protocol definition.
wait :: Handle -> Token -> IO ()
wait handle token = sendMessage (hSocket handle) message
  where
    message = queryMessage token (singleElementArray 4)
-- | Request the next chunk of a partially delivered sequence.
--
-- For a 'Partial' sequence, a continuation query is sent for the sequence's
-- token via 'continue', and the next message read from the handle is decoded
-- with 'parseResponse'. Transport errors are passed through; decoding
-- failures become a 'ProtocolError' carrying the parser's message.
--
-- Calling this on a 'Done' sequence is a caller error and yields a
-- 'ProtocolError' saying so, without touching the connection.
nextChunk :: (FromResponse (Sequence a))
          => Handle -> Sequence a -> IO (Either Error (Sequence a))
nextChunk _ (Done _) =
    return $ Left $ ProtocolError "nextChunk: sequence is already complete"
nextChunk handle (Partial token _) = do
    continue handle token
    reply <- getResponse handle
    return $ case reply of
        Left e -> Left e
        Right response -> case A.parseEither parseResponse response of
            Left e -> Left $ ProtocolError $ T.pack e
            Right r -> Right r
-- | Read the next message from the handle's socket, parsed with
-- 'responseMessageParser'.
getResponse :: Handle -> IO (Either Error Response)
getResponse handle = recvMessage (hSocket handle) responseMessageParser