-- | Low-level messaging between this client and the MongoDB server, see Mongo
-- Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
--
-- This module is not intended for direct use. Use the high-level interface at
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.

{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
{-# LANGUAGE CPP, FlexibleContexts, TupleSections, TypeSynonymInstances #-}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE BangPatterns #-}

{-# LANGUAGE NamedFieldPuns, ScopedTypeVariables #-}

#if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-}
#else
{-# LANGUAGE DoRec #-}
#endif

module Database.MongoDB.Internal.Protocol (
    FullCollection,
    -- * Pipe
    Pipe, newPipe, newPipeWith, send, call,
    -- ** Notice
    Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
    -- ** Request
    Request(..), QueryOption(..),
    -- ** Reply
    Reply(..), ResponseFlag(..),
    -- * Authentication
    Username, Password, Nonce, pwHash, pwKey,
    isClosed, close, ServerData(..), Pipeline(..)
) where

#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad (forM, replicateM, unless)
import Data.Binary.Get (Get, runGet)
import Data.Binary.Put (Put, runPut)
import Data.Bits (bit, testBit)
import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle)
import System.IO.Error (doesNotExistErrorType, mkIOError)
import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad (forever)
import Control.Monad.STM (atomically)
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)

import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try)

import qualified Data.ByteString.Lazy as L

import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson (Document)
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
                         putInt64, putCString)
import Data.Text (Text)

import qualified Crypto.Hash.MD5 as MD5
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

import Database.MongoDB.Internal.Util (bitOr, byteStringHex)

import Database.MongoDB.Transport (Transport)
import qualified Database.MongoDB.Transport as Tr

#if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
                                       putMVar, readMVar, mkWeakMVar, isEmptyMVar)
#else
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
                                         putMVar, readMVar, addMVarFinalizer)
#endif

#if !MIN_VERSION_base(4,6,0)
-- | Compatibility shim used only when the surrounding CPP guard selects
-- base < 4.6: in that configuration 'addMVarFinalizer' is imported instead of
-- 'mkWeakMVar', so the newer name is defined here in terms of the older one.
-- Note that this variant returns @()@.
mkWeakMVar :: MVar a -> IO () -> IO ()
mkWeakMVar = addMVarFinalizer
#endif

-- * Pipeline

-- | Thread-safe and pipelined connection.
--
-- The fields are deliberately left lazy: 'newPipeline' builds this record
-- recursively (the record is handed to the listener thread before that
-- thread's 'ThreadId' is known), so forcing 'listenThread' at construction
-- time would diverge.
data Pipeline = Pipeline
    { vStream :: MVar Transport
      -- ^ Mutex on the transport, so only one thread at a time can write to it
    , responseQueue :: TChan (MVar (Either IOError Response))
      -- ^ Queue of threads waiting for responses. Every time a response
      -- arrives we pop the next waiter and give it the response.
    , listenThread :: ThreadId
      -- ^ Thread running 'listen' for this pipeline
    , finished :: MVar ()
      -- ^ Filled once the listener thread has terminated (see 'isFinished')
    , serverData :: ServerData
      -- ^ The 'ServerData' supplied when the pipeline was created
    }

-- | Description of the server a 'Pipeline' is connected to.
--
-- NOTE(review): the meaning of the individual fields is not established
-- anywhere in this module; the names suggest they mirror the limits and
-- wire-version range reported by the server's handshake reply — confirm
-- against the code that constructs this record.
data ServerData = ServerData
                { isMaster            :: Bool
                , minWireVersion      :: Int
                , maxWireVersion      :: Int
                , maxMessageSizeBytes :: Int
                , maxBsonObjectSize   :: Int
                , maxWriteBatchSize   :: Int
                }

-- | @'forkUnmaskedFinally' action and_then@ behaves the same as
-- @'forkFinally' action and_then@, except that @action@ is run completely
-- unmasked, whereas with 'forkFinally', @action@ is run with the same mask as
-- the parent thread.
--
-- The fork itself happens under 'mask_', so the child starts masked and only
-- @action@ is unmasked; @and_then@ therefore runs masked and receives either
-- the exception that ended @action@ or its result.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally action and_then =
  mask_ $ forkIOWithUnmask $ \unmask ->
    try (unmask action) >>= and_then

-- | Create new Pipeline over given handle. You should 'close' pipeline when
-- finished, which will also close handle. If pipeline is not closed but
-- eventually garbage collected, it will be closed along with handle.
--
-- A listener thread running 'listen' is forked for the pipeline. When that
-- thread terminates (for any reason) the 'finished' flag is set and every
-- caller still waiting in the response queue is handed an 'IOError', so that
-- outstanding promises fail instead of blocking forever.
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline serverData stream = do
    vStream <- newMVar stream
    responseQueue <- atomically newTChan
    finished <- newEmptyMVar
    let closedError :: IOError
        closedError = mkIOError
                        doesNotExistErrorType
                        "Handle has been closed"
                        Nothing
                        Nothing
        -- Fail every waiter that is still queued, one at a time, until the
        -- queue is observed empty.
        drainReplies :: IO ()
        drainReplies = do
          chanEmpty <- atomically $ isEmptyTChan responseQueue
          unless chanEmpty $ do
              var <- atomically $ readTChan responseQueue
              putMVar var $ Left closedError
              drainReplies

    -- The record needs the listener's ThreadId, and the listener needs the
    -- record: tie the knot with recursive do (the Pipeline fields are lazy).
    rec
        let pipe = Pipeline{..}
        listenThread <- forkUnmaskedFinally (listen pipe) $ \_ -> do
            putMVar finished ()
            drainReplies

    -- Finalizer: stop the listener and close the transport once the MVar
    -- guarding the transport is garbage collected.
    _ <- mkWeakMVar vStream $ do
        killThread listenThread
        Tr.close stream
    return pipe

-- | 'True' once the pipeline's 'finished' flag has been filled, i.e. once the
-- listener thread forked by 'newPipeline' has terminated.
isFinished :: Pipeline -> IO Bool
isFinished Pipeline {finished} = not <$> isEmptyMVar finished

close :: Pipeline -> IO ()
-- ^ Close pipe and underlying connection: the listener thread is killed
-- first, then the transport held in 'vStream' is closed.
close Pipeline{..} = do
    killThread listenThread
    Tr.close =<< readMVar vStream

-- | Report whether the pipeline is closed, judged solely by the state of its
-- listener thread: a finished or died listener means closed, a running or
-- blocked listener means open.
isClosed :: Pipeline -> IO Bool
isClosed Pipeline{listenThread} = listenerGone <$> threadStatus listenThread
  where
    listenerGone :: ThreadStatus -> Bool
    listenerGone ThreadRunning     = False
    listenerGone ThreadFinished    = True
    listenerGone (ThreadBlocked _) = False
    listenerGone ThreadDied        = True
-- The thread status is inspected rather than the transport itself because of
-- the following, kept from the original author:
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle  -- isClosed hangs while listen loop is waiting on read

listen :: Pipeline -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order.
--
-- Each iteration reads one message from the transport, takes the oldest
-- waiter from 'responseQueue' (blocking until there is one) and hands it the
-- outcome. A read failure is delivered to that waiter as well; afterwards the
-- transport is closed and the error is rethrown, which ends the loop.
listen Pipeline{..} = do
    stream <- readMVar vStream
    forever $ do
        e <- try $ readMessage stream
        var <- atomically $ readTChan responseQueue
        putMVar var e
        case e of
            Left err -> Tr.close stream >> ioError err  -- close and stop looping
            Right _ -> return ()

psend :: Pipeline -> Message -> IO ()
-- ^ Send message to destination; the destination must not respond (otherwise future 'call's will get these responses instead of their own).
-- Throw IOError and close pipeline if send fails.
--
-- The message is forced (bang pattern) before the transport lock is taken,
-- and the write happens while holding 'vStream' so writes from different
-- threads do not interleave.
psend p@Pipeline{..} !message =
    withMVar vStream (flip writeMessage message) `onException` close p

pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
--
-- If the listener thread has already stopped, an 'IOError' is thrown
-- immediately and nothing is written.
pcall p@Pipeline{..} message = do
  listenerStopped <- isFinished p
  if listenerStopped
    then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
    else withMVar vStream doCall `onException` close p
  where
    -- Runs while holding the transport lock, so the order of writes matches
    -- the order in which waiters are enqueued on the response queue.
    doCall stream = do
        writeMessage stream message
        var <- newEmptyMVar
        atomically $ writeTChan responseQueue var
        return $ readMVar var >>= either throwIO return -- return promise

-- * Pipe

type Pipe = Pipeline
-- ^ Thread-safe TCP connection with pipelined requests. This is an alias for 'Pipeline'.

newPipe :: ServerData -> Handle -> IO Pipe
-- ^ Create pipe over handle. The handle is first wrapped with
-- 'Tr.fromHandle' and the result passed to 'newPipeWith'.
newPipe sd handle = Tr.fromHandle handle >>= newPipeWith sd

newPipeWith :: ServerData -> Transport -> IO Pipe
-- ^ Create pipe over connection. This is the exported name for
-- 'newPipeline'.
newPipeWith sd conn = newPipeline sd conn

send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send pipe notices = psend pipe (notices, Nothing)

call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
--
-- The reply is checked against the request id generated here; a mismatch is
-- reported with 'error'. Because the check is applied with 'fmap', that
-- failure surfaces when the resulting 'Reply' is evaluated.
call pipe notices request = do
    requestId <- genRequestId
    promise <- pcall pipe (notices, Just (request, requestId))
    return $ check requestId <$> promise
 where
    -- Unwrap the reply, insisting that it answers our request.
    check requestId (responseTo, reply) = if requestId == responseTo then reply else
        error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"

-- * Message

type Message = ([Notice], Maybe (Request, RequestId))
-- ^ Write notice(s) followed by a getLastError request, or just a query request on its own.
-- Note that request ids will be out of order, because the ids for the notices are generated after the supplied request id was generated. This is ok because the mongo server does not care about order, just uniqueness.

writeMessage :: Transport -> Message -> IO ()
-- ^ Write message to connection.
--
-- Every notice is serialized with a freshly generated request id; the
-- optional request is serialized with the id supplied by the caller. Each
-- serialized message is prefixed with its int32 length (which counts the
-- four length bytes themselves). Everything is concatenated and handed to
-- the transport in a single write, followed by a flush.
writeMessage conn (notices, mRequest) = do
    noticeStrings <- forM notices $ \n -> do
          requestId <- genRequestId
          return $ frame $ runPut $ putNotice n requestId

    let requestString = do
          (request, requestId) <- mRequest
          return $ frame $ runPut $ putRequest request requestId

    Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ maybeToList requestString
    Tr.flush conn
 where
    -- Prepend the length prefix to an already serialized message body.
    frame s = lenBytes s `L.append` s
    lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
    -- The length on the wire includes the 4 bytes of the length field itself.
    encodeSize = runPut . putInt32 . (+ 4)

type Response = (ResponseTo, Reply)
-- ^ Message received from a Mongo server in response to a Request, paired with the id of the request it responds to

readMessage :: Transport -> IO Response
-- ^ Read response from a connection.
--
-- First the 4-byte length prefix is read; since that length counts the
-- prefix itself, 4 is subtracted to get the number of remaining bytes, which
-- are then read and decoded with 'getReply'.
readMessage conn = readResp  where
    readResp = do
        len <- fromEnum . decodeSize . L.fromStrict <$> Tr.read conn 4
        runGet getReply . L.fromStrict <$> Tr.read conn len
    -- Decode the int32 length prefix and discount the prefix's own 4 bytes.
    decodeSize = subtract 4 . runGet getInt32

type FullCollection = Text
-- ^ Database name and collection name with period (.) in between. E.g. \"myDb.myCollection\"

-- ** Header

type Opcode = Int32
-- ^ Operation code stored in the message header (see 'putHeader' and 'getHeader')

type RequestId = Int32
-- ^ A fresh request id is generated for every message

type ResponseTo = RequestId
-- ^ Id of the request that a response answers; 'call' compares it with the id it sent

genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id.
--
-- Ids come from a single process-wide counter that starts at 0 and is
-- incremented atomically; the value returned is the counter's value before
-- the increment.
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
    -- Global mutable counter. 'unsafePerformIO' is acceptable here only
    -- because the NOINLINE pragma below keeps this a single shared IORef
    -- instead of letting the allocation be duplicated at use sites.
    counter :: IORef RequestId
    counter = unsafePerformIO (newIORef 0)
    {-# NOINLINE counter #-}

-- *** Binary format

putHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it.
--
-- Writes three int32 values in order: the request id, a zero, and the
-- opcode. The zero occupies the slot that 'getHeader' reads back as the
-- response-to id.
putHeader opcode requestId = do
    putInt32 requestId
    putInt32 0
    putInt32 opcode

getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read.
--
-- Reads three int32 values in order: the message's own request id (which is
-- discarded), the response-to id, and the opcode.
getHeader = do
    _requestId <- getInt32
    responseTo <- getInt32
    opcode <- getInt32
    return (opcode, responseTo)

-- ** Notice

-- | A notice is a message that is sent with no reply
data Notice =
      Insert {
        iFullCollection :: FullCollection,
        iOptions :: [InsertOption],
        iDocuments :: [Document]}
    | Update {
        uFullCollection :: FullCollection,
        uOptions :: [UpdateOption],
        uSelector :: Document,
        uUpdater :: Document}
    | Delete {
        dFullCollection :: FullCollection,
        dOptions :: [DeleteOption],
        dSelector :: Document}
    | KillCursors {
        kCursorIds :: [CursorId]}
    deriving (Show, Eq)

-- | Options for an 'Insert' notice
data InsertOption = KeepGoing  -- ^ If set, the database will not stop processing a bulk insert if one fails (eg due to duplicate IDs). This makes bulk insert behave similarly to a series of single inserts, except lastError will be set if any insert fails, not just the last one. (new in 1.9.1)
    deriving (Show, Eq)

-- | Options for an 'Update' notice
data UpdateOption =
      Upsert  -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
    | MultiUpdate  -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
    deriving (Show, Eq)

-- | Options for a 'Delete' notice
data DeleteOption = SingleRemove  -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
    deriving (Show, Eq)

-- | Identifier of a cursor, as carried by 'GetMore' requests ('gCursorId'),
-- 'KillCursors' notices and 'Reply' messages ('rCursorId').
type CursorId = Int64

-- *** Binary format

-- | Opcode written into the message header for each kind of 'Notice'.
nOpcode :: Notice -> Opcode
nOpcode Update{} = 2001
nOpcode Insert{} = 2002
nOpcode Delete{} = 2006
nOpcode KillCursors{} = 2007

-- | Serialize a 'Notice' with the given request id: first the header
-- (opcode from 'nOpcode' plus the request id), then the body whose layout
-- depends on the constructor.
putNotice :: Notice -> RequestId -> Put
putNotice notice requestId = do
    putHeader (nOpcode notice) requestId
    case notice of
        -- Insert: option bits, collection name, then every document in order.
        Insert{..} -> do
            putInt32 (iBits iOptions)
            putCString iFullCollection
            mapM_ putDocument iDocuments
        -- Update: a leading 0, collection name, option bits, selector, updater.
        Update{..} -> do
            putInt32 0
            putCString uFullCollection
            putInt32 (uBits uOptions)
            putDocument uSelector
            putDocument uUpdater
        -- Delete: a leading 0, collection name, option bits, selector.
        Delete{..} -> do
            putInt32 0
            putCString dFullCollection
            putInt32 (dBits dOptions)
            putDocument dSelector
        -- KillCursors: a leading 0, the number of cursor ids, then the ids.
        KillCursors{..} -> do
            putInt32 0
            putInt32 $ toEnum (length kCursorIds)
            mapM_ putInt64 kCursorIds

-- | Flag value of a single 'InsertOption': 'KeepGoing' is bit 0.
iBit :: InsertOption -> Int32
iBit KeepGoing = bit 0

-- | Combine a list of insert options into one flags word by mapping each
-- through 'iBit' and folding the results with 'bitOr'.
iBits :: [InsertOption] -> Int32
iBits = bitOr . map iBit

-- | Flag value of a single 'UpdateOption': 'Upsert' is bit 0 and
-- 'MultiUpdate' is bit 1.
uBit :: UpdateOption -> Int32
uBit Upsert = bit 0
uBit MultiUpdate = bit 1

-- | Combine a list of update options into one flags word by mapping each
-- through 'uBit' and folding the results with 'bitOr'.
uBits :: [UpdateOption] -> Int32
uBits = bitOr . map uBit

-- | Flag value of a single 'DeleteOption': 'SingleRemove' is bit 0.
dBit :: DeleteOption -> Int32
dBit SingleRemove = bit 0

-- | Combine a list of delete options into one flags word by mapping each
-- through 'dBit' and folding the results with 'bitOr'.
dBits :: [DeleteOption] -> Int32
dBits = bitOr . map dBit

-- ** Request

-- | A request is a message that is sent with a 'Reply' expected in return
data Request =
      Query {
        qOptions :: [QueryOption],
        qFullCollection :: FullCollection,
        qSkip :: Int32,  -- ^ Number of initial matching documents to skip
        qBatchSize :: Int32,  -- ^ The number of documents to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
        qSelector :: Document,  -- ^ @[]@ = return all documents in collection
        qProjector :: Document  -- ^ @[]@ = return whole document
    } | GetMore {
        gFullCollection :: FullCollection,
        gBatchSize :: Int32,
        gCursorId :: CursorId}
    deriving (Show, Eq)

-- | Options that can be set on a 'Query' request (see 'qBits' for the wire encoding).
data QueryOption =
      TailableCursor  -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point – for example if the final object it references were deleted. Thus, you should be prepared to requery on @CursorNotFound@ exception.
    | SlaveOK  -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
    | NoCursorTimeout  -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
    | AwaitData  -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.

--  | Exhaust  -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
-- Exhaust commented out because not compatible with current `Pipeline` implementation

    | Partial  -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error.
    deriving (Show, Eq)

-- *** Binary format

-- | Opcode written into the message header for each kind of 'Request'.
qOpcode :: Request -> Opcode
qOpcode Query{} = 2004
qOpcode GetMore{} = 2005

-- | Serialize a 'Request' with the given request id: first the header
-- (opcode from 'qOpcode' plus the request id), then the body whose layout
-- depends on the constructor.
putRequest :: Request -> RequestId -> Put
putRequest request requestId = do
    putHeader (qOpcode request) requestId
    case request of
        -- Query: option bits, collection name, skip, batch size, selector,
        -- and the projector only when it is non-empty.
        Query{..} -> do
            putInt32 (qBits qOptions)
            putCString qFullCollection
            putInt32 qSkip
            putInt32 qBatchSize
            putDocument qSelector
            unless (null qProjector) (putDocument qProjector)
        -- GetMore: a leading 0, collection name, batch size, cursor id.
        GetMore{..} -> do
            putInt32 0
            putCString gFullCollection
            putInt32 gBatchSize
            putInt64 gCursorId

-- | Flag value of a single 'QueryOption'. Bit positions 0, 3 and 6 are not
-- assigned here (6 belongs to the commented-out @Exhaust@ option).
qBit :: QueryOption -> Int32
qBit TailableCursor = bit 1
qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4
qBit AwaitData = bit 5
--qBit Exhaust = bit 6
qBit Partial = bit 7

-- | Combine a list of query options into one flags word by mapping each
-- through 'qBit' and folding the results with 'bitOr'.
qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit

-- ** Reply

-- | A reply is a message received in response to a 'Request'
data Reply = Reply {
    rResponseFlags :: [ResponseFlag],
    rCursorId :: CursorId,  -- ^ 0 = cursor finished
    rStartingFrom :: Int32,
    rDocuments :: [Document]
    } deriving (Show, Eq)

-- | Flags decoded from the flags word of a 'Reply' (see 'rFlags' and 'rBit').
data ResponseFlag =
      CursorNotFound  -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
    | QueryError  -- ^ Query error. Returned with one document containing an "$err" field holding the error message.
    | AwaitCapable  -- ^ For backward compatibility: Set when the server supports the AwaitData query option. If it doesn't, a replica slave client should sleep a little between getMore's
    deriving (Show, Eq, Enum)

-- * Binary format

-- | Opcode that 'getReply' requires in the header of an incoming reply.
replyOpcode :: Opcode
replyOpcode = 1

-- | Parse a reply message: the header, then the flags word, cursor id,
-- starting offset, document count and that many documents.
--
-- Fails in the 'Get' monad when the header opcode is not 'replyOpcode'.
-- Returns the id of the request this reply answers, paired with the 'Reply'.
getReply :: Get (ResponseTo, Reply)
getReply = do
    (opcode, responseTo) <- getHeader
    unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
    rResponseFlags <- rFlags <$> getInt32
    rCursorId <- getInt64
    rStartingFrom <- getInt32
    -- the document count precedes the documents themselves
    numDocs <- fromIntegral <$> getInt32
    rDocuments <- replicateM numDocs getDocument
    -- RecordWildCards picks up the r* bindings above
    return (responseTo, Reply{..})

-- | Decode a reply's flags word: keep every 'ResponseFlag' whose bit
-- position (given by 'rBit') is set in @bits@.
rFlags :: Int32 -> [ResponseFlag]
rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]

-- | Bit position of each 'ResponseFlag' within the reply's flags word.
-- Position 2 is not mapped to any flag here.
rBit :: ResponseFlag -> Int
rBit CursorNotFound = 0
rBit QueryError = 1
rBit AwaitCapable = 3

-- * Authentication

-- | User name, as consumed by 'pwHash' and 'pwKey'.
type Username = Text
-- | Password, as consumed by 'pwHash' and 'pwKey'.
type Password = Text
-- | Nonce that 'pwKey' prepends to the user name and password hash.
type Nonce = Text

-- | Password hash: the MD5 digest of the UTF-8 encoding of
-- @\<user\>:mongo:\<password\>@, rendered to 'Text' via 'byteStringHex'.
pwHash :: Username -> Password -> Text
pwHash u p = T.pack . byteStringHex . MD5.hash . TE.encodeUtf8 $ u `T.append` ":mongo:" `T.append` p

-- | Authentication key: the MD5 digest of the UTF-8 encoding of
-- @nonce ++ user ++ 'pwHash' user password@, rendered to 'Text' via
-- 'byteStringHex'.
pwKey :: Nonce -> Username -> Password -> Text
pwKey n u p = T.pack . byteStringHex . MD5.hash . TE.encodeUtf8 . T.append n . T.append u $ pwHash u p


{- Authors: Tony Hannan <tony@10gen.com>
   Copyright 2011 10gen Inc.
   Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}