-- | Low-level messaging between this client and the MongoDB server, see Mongo
-- Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
--
-- This module is not intended for direct use. Use the high-level interface at
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.

{-# LANGUAGE RecordWildCards, OverloadedStrings #-}
{-# LANGUAGE CPP, FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE BangPatterns #-}

{-# LANGUAGE NamedFieldPuns, ScopedTypeVariables #-}

#if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-}
#else
{-# LANGUAGE DoRec #-}
#endif

module Database.MongoDB.Internal.Protocol (
    FullCollection,
    -- * Pipe
    Pipe,  newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg,
    -- ** Notice
    Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
    -- ** Request
    Request(..), QueryOption(..), Cmd (..), KillC(..),
    -- ** Reply
    Reply(..), ResponseFlag(..), FlagBit(..),
    -- * Authentication
    Username, Password, Nonce, pwHash, pwKey,
    isClosed, close, ServerData(..), Pipeline(..), putOpMsg,
    bitOpMsg
) where

#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad ( forM, replicateM, unless, forever )
import Data.Binary.Get (Get, runGet, getInt8)
import Data.Binary.Put (Put, runPut, putInt8)
import Data.Bits (bit, testBit, zeroBits)
import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle)
import System.IO.Error (doesNotExistErrorType, mkIOError)
import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList, fromJust)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad.STM (atomically)
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)

import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try)

import qualified Data.ByteString.Lazy as L

import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson (Document, (=:), merge, cast, valueAt, look)
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
                         putInt64, putCString)
import Data.Text (Text)

import qualified Crypto.Hash.MD5 as MD5
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

import Database.MongoDB.Internal.Util (bitOr, byteStringHex)

import Database.MongoDB.Transport (Transport)
import qualified Database.MongoDB.Transport as Tr


#if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
                                       putMVar, readMVar, mkWeakMVar, isEmptyMVar)
import GHC.List (foldl1')
import Conduit (repeatWhileMC, (.|), runConduit, foldlC)
#else
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
                                         putMVar, readMVar, addMVarFinalizer)
#endif

#if !MIN_VERSION_base(4,6,0)
mkWeakMVar :: MVar a -> IO () -> IO ()
mkWeakMVar = addMVarFinalizer
#endif


-- * Pipeline

-- | Thread-safe and pipelined connection
data Pipeline = Pipeline
    { Pipeline -> MVar Transport
vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
    , Pipeline -> TChan (MVar (Either IOError Response))
responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrives we pop the next thread and give it the response.
    , Pipeline -> ThreadId
listenThread :: ThreadId
    , Pipeline -> MVar ()
finished :: MVar ()
    , Pipeline -> ServerData
serverData :: ServerData
    }

data ServerData = ServerData
                { ServerData -> Bool
isMaster            :: Bool
                , ServerData -> Int
minWireVersion      :: Int
                , ServerData -> Int
maxWireVersion      :: Int
                , ServerData -> Int
maxMessageSizeBytes :: Int
                , ServerData -> Int
maxBsonObjectSize   :: Int
                , ServerData -> Int
maxWriteBatchSize   :: Int
                }
                deriving Int -> ServerData -> ShowS
[ServerData] -> ShowS
ServerData -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [ServerData] -> ShowS
$cshowList :: [ServerData] -> ShowS
show :: ServerData -> [Char]
$cshow :: ServerData -> [Char]
showsPrec :: Int -> ServerData -> ShowS
$cshowsPrec :: Int -> ServerData -> ShowS
Show

-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally :: forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally IO a
action Either SomeException a -> IO ()
and_then =
  forall (m :: * -> *) a. MonadBaseControl IO m => m a -> m a
mask_ forall a b. (a -> b) -> a -> b
$ ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask ->
    forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> m (Either e a)
try (forall a. IO a -> IO a
unmask IO a
action) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either SomeException a -> IO ()
and_then

-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline ServerData
serverData Transport
stream = do
    MVar Transport
vStream <- forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar Transport
stream
    TChan (MVar (Either IOError Response))
responseQueue <- forall a. STM a -> IO a
atomically forall a. STM (TChan a)
newTChan
    MVar ()
finished <- forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
    let drainReplies :: IO ()
drainReplies = do
          Bool
chanEmpty <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM Bool
isEmptyTChan TChan (MVar (Either IOError Response))
responseQueue
          if Bool
chanEmpty
            then forall (m :: * -> *) a. Monad m => a -> m a
return ()
            else do
              MVar (Either IOError Response)
var <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM a
readTChan TChan (MVar (Either IOError Response))
responseQueue
              forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m ()
putMVar MVar (Either IOError Response)
var forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ IOErrorType -> [Char] -> Maybe Handle -> Maybe [Char] -> IOError
mkIOError
                                        IOErrorType
doesNotExistErrorType
                                        [Char]
"Handle has been closed"
                                        forall a. Maybe a
Nothing
                                        forall a. Maybe a
Nothing
              IO ()
drainReplies

    rec
        let pipe :: Pipeline
pipe = Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
listenThread :: ThreadId
finished :: MVar ()
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
serverData :: ServerData
serverData :: ServerData
finished :: MVar ()
listenThread :: ThreadId
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
..}
        ThreadId
listenThread <- forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally (Pipeline -> IO ()
listen Pipeline
pipe) forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
_ -> do
                                                              forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m ()
putMVar MVar ()
finished ()
                                                              IO ()
drainReplies

    Weak (MVar Transport)
_ <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
MVar a -> m () -> m (Weak (MVar a))
mkWeakMVar MVar Transport
vStream forall a b. (a -> b) -> a -> b
$ do
        ThreadId -> IO ()
killThread ThreadId
listenThread
        Transport -> IO ()
Tr.close Transport
stream
    forall (m :: * -> *) a. Monad m => a -> m a
return Pipeline
pipe

isFinished :: Pipeline -> IO Bool
isFinished :: Pipeline -> IO Bool
isFinished Pipeline {MVar ()
finished :: MVar ()
finished :: Pipeline -> MVar ()
finished} = do
  Bool
empty <- forall (m :: * -> *) a. MonadBase IO m => MVar a -> m Bool
isEmptyMVar MVar ()
finished
  forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool -> Bool
not Bool
empty

close :: Pipeline -> IO ()
-- ^ Close pipe and underlying connection
close :: Pipeline -> IO ()
close Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
serverData :: ServerData
finished :: MVar ()
listenThread :: ThreadId
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
serverData :: Pipeline -> ServerData
finished :: Pipeline -> MVar ()
listenThread :: Pipeline -> ThreadId
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
vStream :: Pipeline -> MVar Transport
..} = do
    ThreadId -> IO ()
killThread ThreadId
listenThread
    Transport -> IO ()
Tr.close forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar Transport
vStream

isClosed :: Pipeline -> IO Bool
isClosed :: Pipeline -> IO Bool
isClosed Pipeline{ThreadId
listenThread :: ThreadId
listenThread :: Pipeline -> ThreadId
listenThread} = do
    ThreadStatus
status <- ThreadId -> IO ThreadStatus
threadStatus ThreadId
listenThread
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case ThreadStatus
status of
        ThreadStatus
ThreadRunning -> Bool
False
        ThreadStatus
ThreadFinished -> Bool
True
        ThreadBlocked BlockReason
_ -> Bool
False
        ThreadStatus
ThreadDied -> Bool
True

--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle  -- isClosed hangs while listen loop is waiting on read

listen :: Pipeline -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order
listen :: Pipeline -> IO ()
listen Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
serverData :: ServerData
finished :: MVar ()
listenThread :: ThreadId
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
serverData :: Pipeline -> ServerData
finished :: Pipeline -> MVar ()
listenThread :: Pipeline -> ThreadId
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
vStream :: Pipeline -> MVar Transport
..} = do
    Transport
stream <- forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar Transport
vStream
    forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
        Either IOError Response
e <- forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> m (Either e a)
try forall a b. (a -> b) -> a -> b
$ Transport -> IO Response
readMessage Transport
stream
        MVar (Either IOError Response)
var <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM a
readTChan TChan (MVar (Either IOError Response))
responseQueue
        forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m ()
putMVar MVar (Either IOError Response)
var Either IOError Response
e
        case Either IOError Response
e of
            Left IOError
err -> Transport -> IO ()
Tr.close Transport
stream forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. IOError -> IO a
ioError IOError
err  -- close and stop looping
            Right Response
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()

psend :: Pipeline -> Message -> IO ()
-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own).
-- Throw IOError and close pipeline if send fails
psend :: Pipeline -> Message -> IO ()
psend p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
serverData :: ServerData
finished :: MVar ()
listenThread :: ThreadId
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
serverData :: Pipeline -> ServerData
finished :: Pipeline -> MVar ()
listenThread :: Pipeline -> ThreadId
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
vStream :: Pipeline -> MVar Transport
..} !Message
message = forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream (forall a b c. (a -> b -> c) -> b -> a -> c
flip Transport -> Message -> IO ()
writeMessage Message
message) forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p

psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()-- IO (IO Response)
psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
psendOpMsg p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
serverData :: ServerData
finished :: MVar ()
listenThread :: ThreadId
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
serverData :: Pipeline -> ServerData
finished :: Pipeline -> MVar ()
listenThread :: Pipeline -> ThreadId
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
vStream :: Pipeline -> MVar Transport
..} [Cmd]
commands Maybe FlagBit
flagBit Document
params =
  case Maybe FlagBit
flagBit of
    Just FlagBit
f -> case FlagBit
f of
               FlagBit
MoreToCome -> forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream (\Transport
t -> Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
writeOpMsgMessage Transport
t ([Cmd]
commands, forall a. Maybe a
Nothing) Maybe FlagBit
flagBit Document
params) forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p -- >> return (return (0, ReplyEmpty))
               FlagBit
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"moreToCome has to be set if no response is expected"
    Maybe FlagBit
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"moreToCome has to be set if no response is expected"

pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcall :: Pipeline -> Message -> IO (IO Response)
pcall p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
serverData :: ServerData
finished :: MVar ()
listenThread :: ThreadId
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
serverData :: Pipeline -> ServerData
finished :: Pipeline -> MVar ()
listenThread :: Pipeline -> ThreadId
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
vStream :: Pipeline -> MVar Transport
..} Message
message = do
  Bool
listenerStopped <- Pipeline -> IO Bool
isFinished Pipeline
p
  if Bool
listenerStopped
    then forall a. IOError -> IO a
ioError forall a b. (a -> b) -> a -> b
$ IOErrorType -> [Char] -> Maybe Handle -> Maybe [Char] -> IOError
mkIOError IOErrorType
doesNotExistErrorType [Char]
"Handle has been closed" forall a. Maybe a
Nothing forall a. Maybe a
Nothing
    else forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream forall {m :: * -> *}.
MonadBase IO m =>
Transport -> IO (m Response)
doCall forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p
  where
    doCall :: Transport -> IO (m Response)
doCall Transport
stream = do
        Transport -> Message -> IO ()
writeMessage Transport
stream Message
message
        MVar (Either IOError Response)
var <- forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan TChan (MVar (Either IOError Response))
responseQueue MVar (Either IOError Response)
var
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar (Either IOError Response)
var forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
throwIO forall (m :: * -> *) a. Monad m => a -> m a
return -- return promise

pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcallOpMsg :: Pipeline
-> Maybe (Request, Int32)
-> Maybe FlagBit
-> Document
-> IO (IO Response)
pcallOpMsg p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
serverData :: ServerData
finished :: MVar ()
listenThread :: ThreadId
responseQueue :: TChan (MVar (Either IOError Response))
vStream :: MVar Transport
serverData :: Pipeline -> ServerData
finished :: Pipeline -> MVar ()
listenThread :: Pipeline -> ThreadId
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
vStream :: Pipeline -> MVar Transport
..} Maybe (Request, Int32)
message Maybe FlagBit
flagbit Document
params = do
  Bool
listenerStopped <- Pipeline -> IO Bool
isFinished Pipeline
p
  if Bool
listenerStopped
    then forall a. IOError -> IO a
ioError forall a b. (a -> b) -> a -> b
$ IOErrorType -> [Char] -> Maybe Handle -> Maybe [Char] -> IOError
mkIOError IOErrorType
doesNotExistErrorType [Char]
"Handle has been closed" forall a. Maybe a
Nothing forall a. Maybe a
Nothing
    else forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream forall {m :: * -> *}.
MonadBase IO m =>
Transport -> IO (m Response)
doCall forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p
  where
    doCall :: Transport -> IO (m Response)
doCall Transport
stream = do
        Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
writeOpMsgMessage Transport
stream ([], Maybe (Request, Int32)
message) Maybe FlagBit
flagbit Document
params
        MVar (Either IOError Response)
var <- forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
        -- put var into the response-queue so that it can
        -- fetch the latest response
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan TChan (MVar (Either IOError Response))
responseQueue MVar (Either IOError Response)
var
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar (Either IOError Response)
var forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
throwIO forall (m :: * -> *) a. Monad m => a -> m a
return -- return promise

-- * Pipe

type Pipe = Pipeline
-- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe`
-- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary.

newPipe :: ServerData -> Handle -> IO Pipe
-- ^ Create pipe over handle
newPipe :: ServerData -> Handle -> IO Pipeline
newPipe ServerData
sd Handle
handle = Handle -> IO Transport
Tr.fromHandle Handle
handle forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (ServerData -> Transport -> IO Pipeline
newPipeWith ServerData
sd)

newPipeWith :: ServerData -> Transport -> IO Pipe
-- ^ Create pipe over connection
newPipeWith :: ServerData -> Transport -> IO Pipeline
newPipeWith ServerData
sd Transport
conn = ServerData -> Transport -> IO Pipeline
newPipeline ServerData
sd Transport
conn

send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send :: Pipeline -> [Notice] -> IO ()
send Pipeline
pipe [Notice]
notices = Pipeline -> Message -> IO ()
psend Pipeline
pipe ([Notice]
notices, forall a. Maybe a
Nothing)

sendOpMsg :: Pipe -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
sendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
sendOpMsg Pipeline
pipe commands :: [Cmd]
commands@(Nc Notice
_ : [Cmd]
_) Maybe FlagBit
flagBit Document
params =  Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
psendOpMsg Pipeline
pipe [Cmd]
commands Maybe FlagBit
flagBit Document
params
sendOpMsg Pipeline
pipe commands :: [Cmd]
commands@(Kc KillC
_ : [Cmd]
_) Maybe FlagBit
flagBit Document
params =  Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
psendOpMsg Pipeline
pipe [Cmd]
commands Maybe FlagBit
flagBit Document
params
sendOpMsg Pipeline
_ [Cmd]
_ Maybe FlagBit
_ Document
_ =  forall a. HasCallStack => [Char] -> a
error [Char]
"This function only supports Cmd types wrapped in Nc or Kc type constructors"

call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call :: Pipeline -> [Notice] -> Request -> IO (IO Reply)
call Pipeline
pipe [Notice]
notices Request
request = do
    Int32
requestId <- forall (m :: * -> *). MonadIO m => m Int32
genRequestId
    IO Response
promise <- Pipeline -> Message -> IO (IO Response)
pcall Pipeline
pipe ([Notice]
notices, forall a. a -> Maybe a
Just (Request
request, Int32
requestId))
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall {a} {b}. (Eq a, Show a) => a -> (a, b) -> b
check Int32
requestId forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Response
promise
 where
    check :: a -> (a, b) -> b
check a
requestId (a
responseTo, b
reply) = if a
requestId forall a. Eq a => a -> a -> Bool
== a
responseTo then b
reply else
        forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"expected response id (" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
responseTo forall a. [a] -> [a] -> [a]
++ [Char]
") to match request id (" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
requestId forall a. [a] -> [a] -> [a]
++ [Char]
")"

callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
callOpMsg :: Pipeline -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
callOpMsg Pipeline
pipe Request
request Maybe FlagBit
flagBit Document
params = do
    Int32
requestId <- forall (m :: * -> *). MonadIO m => m Int32
genRequestId
    IO Response
promise <- Pipeline
-> Maybe (Request, Int32)
-> Maybe FlagBit
-> Document
-> IO (IO Response)
pcallOpMsg Pipeline
pipe (forall a. a -> Maybe a
Just (Request
request, Int32
requestId)) Maybe FlagBit
flagBit Document
params
    Response
promise' <- IO Response
promise :: IO Response
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. (a, b) -> b
snd forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int32 -> Response -> IO Response
produce Int32
requestId Response
promise'
 where
   -- We need to perform streaming here as within the OP_MSG protocol mongoDB expects
   -- our client to keep receiving messages after the MoreToCome flagbit was
   -- set by the server until our client receives an empty flagbit. After the
   -- first MoreToCome flagbit was set the responseTo field in the following
   -- headers will reference the cursorId that was set in the previous message.
   -- see:
   -- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses
    checkFlagBit :: (a, Reply) -> Bool
checkFlagBit (a, Reply)
p =
      case (a, Reply)
p of
        (a
_, Reply
r) ->
          case Reply
r of
            ReplyOpMsg{[Document]
[FlagBit]
Maybe Int32
checksum :: Reply -> Maybe Int32
sections :: Reply -> [Document]
flagBits :: Reply -> [FlagBit]
checksum :: Maybe Int32
sections :: [Document]
flagBits :: [FlagBit]
..} -> [FlagBit]
flagBits forall a. Eq a => a -> a -> Bool
== [FlagBit
MoreToCome]
             -- This is called by functions using the OP_MSG protocol,
             -- so this has to be ReplyOpMsg
            Reply
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible"
    produce :: Int32 -> Response -> IO Response
produce Int32
reqId Response
p = forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$
      case Response
p of
        (Int32
rt, Reply
r) ->
          case Reply
r of
              ReplyOpMsg{[Document]
[FlagBit]
Maybe Int32
checksum :: Maybe Int32
sections :: [Document]
flagBits :: [FlagBit]
checksum :: Reply -> Maybe Int32
sections :: Reply -> [Document]
flagBits :: Reply -> [FlagBit]
..} ->
                if [FlagBit]
flagBits forall a. Eq a => a -> a -> Bool
== [FlagBit
MoreToCome]
                  then forall {i}. ConduitT i Response IO ()
yieldResponses forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *) a b o.
Monad m =>
(a -> b -> a) -> a -> ConduitT b o m a
foldlC forall {a}. Response -> (a, Reply) -> Response
mergeResponses Response
p
                  else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (Int32
rt, forall {a} {b}. (Eq a, Show a) => a -> (a, b) -> b
check Int32
reqId Response
p)
              Reply
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible" -- see comment above
    yieldResponses :: ConduitT i Response IO ()
yieldResponses = forall (m :: * -> *) a i.
Monad m =>
m a -> (a -> Bool) -> ConduitT i a m ()
repeatWhileMC
          (do
             MVar (Either IOError Response)
var <- forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
             forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan (Pipeline -> TChan (MVar (Either IOError Response))
responseQueue Pipeline
pipe) MVar (Either IOError Response)
var
             forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar (Either IOError Response)
var forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
throwIO forall (m :: * -> *) a. Monad m => a -> m a
return :: IO Response
          )
          forall {a}. (a, Reply) -> Bool
checkFlagBit
    mergeResponses :: Response -> (a, Reply) -> Response
mergeResponses p :: Response
p@(Int32
rt,Reply
rep) (a, Reply)
p' =
      case (Response
p, (a, Reply)
p') of
          ((Int32
_, Reply
r), (a
_, Reply
r')) ->
            case (Reply
r, Reply
r') of
                (ReplyOpMsg [FlagBit]
_ [Document]
sec Maybe Int32
_, ReplyOpMsg [FlagBit]
_ [Document]
sec' Maybe Int32
_) -> do
                    let (Document
section, Document
section') = (forall a. [a] -> a
head [Document]
sec, forall a. [a] -> a
head [Document]
sec')
                        (Maybe Document
cur, Maybe Document
cur') = (forall b a. b -> (a -> b) -> Maybe a -> b
maybe forall a. Maybe a
Nothing forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadFail m => Text -> Document -> m Value
look Text
"cursor" Document
section,
                                      forall b a. b -> (a -> b) -> Maybe a -> b
maybe forall a. Maybe a
Nothing forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadFail m => Text -> Document -> m Value
look Text
"cursor" Document
section')
                    case (Maybe Document
cur, Maybe Document
cur') of
                      (Just Document
doc, Just Document
doc') -> do
                        let ([Document]
docs, [Document]
docs') =
                              ( forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast forall a b. (a -> b) -> a -> b
$ Text -> Document -> Value
valueAt Text
"nextBatch" Document
doc :: [Document]
                              , forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast forall a b. (a -> b) -> a -> b
$ Text -> Document -> Value
valueAt Text
"nextBatch" Document
doc' :: [Document])
                            id' :: Int32
id' = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast forall a b. (a -> b) -> a -> b
$ Text -> Document -> Value
valueAt Text
"id" Document
doc' :: Int32
                        (Int32
rt, forall {a} {b}. (Eq a, Show a) => a -> (a, b) -> b
check Int32
id' (Int32
rt, Reply
rep{ sections :: [Document]
sections = [Document]
docs' forall a. [a] -> [a] -> [a]
++ [Document]
docs })) -- todo: avoid (++)
                        -- Since we use this to process moreToCome messages, we
                        -- know that there will be a nextBatch key in the document
                      (Maybe Document, Maybe Document)
_ ->  forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible"
                (Reply, Reply)
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible" -- see comment above
    check :: a -> (a, b) -> b
check a
requestId (a
responseTo, b
reply) = if a
requestId forall a. Eq a => a -> a -> Bool
== a
responseTo then b
reply else
        forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"expected response id (" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
responseTo forall a. [a] -> [a] -> [a]
++ [Char]
") to match request id (" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
requestId forall a. [a] -> [a] -> [a]
++ [Char]
")"

-- * Message

type Message = ([Notice], Maybe (Request, RequestId))
-- ^ A write notice(s) with getLastError request, or just query request.
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
type OpMsgMessage = ([Cmd], Maybe (Request, RequestId))

writeMessage :: Transport -> Message -> IO ()
-- ^ Write message to connection
writeMessage :: Transport -> Message -> IO ()
writeMessage Transport
conn ([Notice]
notices, Maybe (Request, Int32)
mRequest) = do
    [ByteString]
noticeStrings <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Notice]
notices forall a b. (a -> b) -> a -> b
$ \Notice
n -> do
          Int32
requestId <- forall (m :: * -> *). MonadIO m => m Int32
genRequestId
          let s :: ByteString
s = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Notice -> Int32 -> Put
putNotice Notice
n Int32
requestId
          forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    let requestString :: Maybe ByteString
requestString = do
          (Request
request, Int32
requestId) <- Maybe (Request, Int32)
mRequest
          let s :: ByteString
s = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Request -> Int32 -> Put
putRequest Request
request Int32
requestId
          forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    Transport -> ByteString -> IO ()
Tr.write Transport
conn forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
L.toStrict forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
L.concat forall a b. (a -> b) -> a -> b
$ [ByteString]
noticeStrings forall a. [a] -> [a] -> [a]
++ (forall a. Maybe a -> [a]
maybeToList Maybe ByteString
requestString)
    Transport -> IO ()
Tr.flush Transport
conn
 where
    lenBytes :: ByteString -> ByteString
lenBytes ByteString
bytes = Int32 -> ByteString
encodeSize forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Enum a => Int -> a
toEnum forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Enum a => a -> Int
fromEnum forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
L.length ByteString
bytes
    encodeSize :: Int32 -> ByteString
encodeSize = Put -> ByteString
runPut forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int32 -> Put
putInt32 forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a. Num a => a -> a -> a
+ Int32
4)

writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
-- ^ Write message to connection
writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
writeOpMsgMessage Transport
conn ([Cmd]
notices, Maybe (Request, Int32)
mRequest) Maybe FlagBit
flagBit Document
params = do
    [ByteString]
noticeStrings <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Cmd]
notices forall a b. (a -> b) -> a -> b
$ \Cmd
n -> do
          Int32
requestId <- forall (m :: * -> *). MonadIO m => m Int32
genRequestId
          let s :: ByteString
s = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Cmd -> Int32 -> Maybe FlagBit -> Document -> Put
putOpMsg Cmd
n Int32
requestId Maybe FlagBit
flagBit Document
params
          forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    let requestString :: Maybe ByteString
requestString = do
           (Request
request, Int32
requestId) <- Maybe (Request, Int32)
mRequest
           let s :: ByteString
s = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Cmd -> Int32 -> Maybe FlagBit -> Document -> Put
putOpMsg (Request -> Cmd
Req Request
request) Int32
requestId Maybe FlagBit
flagBit Document
params
           forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    Transport -> ByteString -> IO ()
Tr.write Transport
conn forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
L.toStrict forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
L.concat forall a b. (a -> b) -> a -> b
$ [ByteString]
noticeStrings forall a. [a] -> [a] -> [a]
++ (forall a. Maybe a -> [a]
maybeToList Maybe ByteString
requestString)
    Transport -> IO ()
Tr.flush Transport
conn
 where
    lenBytes :: ByteString -> ByteString
lenBytes ByteString
bytes = Int32 -> ByteString
encodeSize forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Enum a => Int -> a
toEnum forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Enum a => a -> Int
fromEnum forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
L.length ByteString
bytes
    encodeSize :: Int32 -> ByteString
encodeSize = Put -> ByteString
runPut forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int32 -> Put
putInt32 forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a. Num a => a -> a -> a
+ Int32
4)

type Response = (ResponseTo, Reply)
-- ^ Message received from a Mongo server in response to a Request

readMessage :: Transport -> IO Response
-- ^ read response from a connection
readMessage :: Transport -> IO Response
readMessage Transport
conn = IO Response
readResp  where
    readResp :: IO Response
readResp = do
        Int
len <- forall a. Enum a => a -> Int
fromEnum forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int32
decodeSize forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
L.fromStrict forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Transport -> Int -> IO ByteString
Tr.read Transport
conn Int
4
        forall a. Get a -> ByteString -> a
runGet Get Response
getReply forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
L.fromStrict forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Transport -> Int -> IO ByteString
Tr.read Transport
conn Int
len
    decodeSize :: ByteString -> Int32
decodeSize = forall a. Num a => a -> a -> a
subtract Int32
4 forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Get a -> ByteString -> a
runGet Get Int32
getInt32

type FullCollection = Text
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"

-- ** Header

type Opcode = Int32

type RequestId = Int32
-- ^ A fresh request id is generated for every message

type ResponseTo = RequestId

genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id
{-# NOINLINE genRequestId #-}
genRequestId :: forall (m :: * -> *). MonadIO m => m Int32
genRequestId = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef Int32
counter forall a b. (a -> b) -> a -> b
$ \Int32
n -> (Int32
n forall a. Num a => a -> a -> a
+ Int32
1, Int32
n) where
    counter :: IORef RequestId
    counter :: IORef Int32
counter = forall a. IO a -> a
unsafePerformIO (forall a. a -> IO (IORef a)
newIORef Int32
0)
    {-# NOINLINE counter #-}

-- *** Binary format

putHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putHeader :: Int32 -> Int32 -> Put
putHeader Int32
opcode Int32
requestId = do
    Int32 -> Put
putInt32 Int32
requestId
    Int32 -> Put
putInt32 Int32
0
    Int32 -> Put
putInt32 Int32
opcode

putOpMsgHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putOpMsgHeader :: Int32 -> Int32 -> Put
putOpMsgHeader Int32
opcode Int32
requestId = do
    Int32 -> Put
putInt32 Int32
requestId
    Int32 -> Put
putInt32 Int32
0
    Int32 -> Put
putInt32 Int32
opcode

getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read
getHeader :: Get (Int32, Int32)
getHeader = do
    Int32
_requestId <- Get Int32
getInt32
    Int32
responseTo <- Get Int32
getInt32
    Int32
opcode <- Get Int32
getInt32
    forall (m :: * -> *) a. Monad m => a -> m a
return (Int32
opcode, Int32
responseTo)

-- ** Notice

-- | A notice is a message that is sent with no reply
data Notice =
      Insert {
        Notice -> Text
iFullCollection :: FullCollection,
        Notice -> [InsertOption]
iOptions :: [InsertOption],
        Notice -> [Document]
iDocuments :: [Document]}
    | Update {
        Notice -> Text
uFullCollection :: FullCollection,
        Notice -> [UpdateOption]
uOptions :: [UpdateOption],
        Notice -> Document
uSelector :: Document,
        Notice -> Document
uUpdater :: Document}
    | Delete {
        Notice -> Text
dFullCollection :: FullCollection,
        Notice -> [DeleteOption]
dOptions :: [DeleteOption],
        Notice -> Document
dSelector :: Document}
    | KillCursors {
        Notice -> [Int64]
kCursorIds :: [CursorId]}
    deriving (Int -> Notice -> ShowS
[Notice] -> ShowS
Notice -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Notice] -> ShowS
$cshowList :: [Notice] -> ShowS
show :: Notice -> [Char]
$cshow :: Notice -> [Char]
showsPrec :: Int -> Notice -> ShowS
$cshowsPrec :: Int -> Notice -> ShowS
Show, Notice -> Notice -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Notice -> Notice -> Bool
$c/= :: Notice -> Notice -> Bool
== :: Notice -> Notice -> Bool
$c== :: Notice -> Notice -> Bool
Eq)

data InsertOption = KeepGoing  -- ^ If set, the database will not stop processing a bulk insert if one fails (eg due to duplicate IDs). This makes bulk insert behave similarly to a series of single inserts, except lastError will be set if any insert fails, not just the last one. (new in 1.9.1)
    deriving (Int -> InsertOption -> ShowS
[InsertOption] -> ShowS
InsertOption -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [InsertOption] -> ShowS
$cshowList :: [InsertOption] -> ShowS
show :: InsertOption -> [Char]
$cshow :: InsertOption -> [Char]
showsPrec :: Int -> InsertOption -> ShowS
$cshowsPrec :: Int -> InsertOption -> ShowS
Show, InsertOption -> InsertOption -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: InsertOption -> InsertOption -> Bool
$c/= :: InsertOption -> InsertOption -> Bool
== :: InsertOption -> InsertOption -> Bool
$c== :: InsertOption -> InsertOption -> Bool
Eq)

data UpdateOption =
      Upsert  -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
    | MultiUpdate  -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
    deriving (Int -> UpdateOption -> ShowS
[UpdateOption] -> ShowS
UpdateOption -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [UpdateOption] -> ShowS
$cshowList :: [UpdateOption] -> ShowS
show :: UpdateOption -> [Char]
$cshow :: UpdateOption -> [Char]
showsPrec :: Int -> UpdateOption -> ShowS
$cshowsPrec :: Int -> UpdateOption -> ShowS
Show, UpdateOption -> UpdateOption -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UpdateOption -> UpdateOption -> Bool
$c/= :: UpdateOption -> UpdateOption -> Bool
== :: UpdateOption -> UpdateOption -> Bool
$c== :: UpdateOption -> UpdateOption -> Bool
Eq)

data DeleteOption = SingleRemove  -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
    deriving (Int -> DeleteOption -> ShowS
[DeleteOption] -> ShowS
DeleteOption -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [DeleteOption] -> ShowS
$cshowList :: [DeleteOption] -> ShowS
show :: DeleteOption -> [Char]
$cshow :: DeleteOption -> [Char]
showsPrec :: Int -> DeleteOption -> ShowS
$cshowsPrec :: Int -> DeleteOption -> ShowS
Show, DeleteOption -> DeleteOption -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: DeleteOption -> DeleteOption -> Bool
$c/= :: DeleteOption -> DeleteOption -> Bool
== :: DeleteOption -> DeleteOption -> Bool
$c== :: DeleteOption -> DeleteOption -> Bool
Eq)

type CursorId = Int64

-- *** Binary format

nOpcode :: Notice -> Opcode
nOpcode :: Notice -> Int32
nOpcode Update{} = Int32
2001
nOpcode Insert{} = Int32
2002
nOpcode Delete{} = Int32
2006
nOpcode KillCursors{} = Int32
2007

putNotice :: Notice -> RequestId -> Put
putNotice :: Notice -> Int32 -> Put
putNotice Notice
notice Int32
requestId = do
    Int32 -> Int32 -> Put
putHeader (Notice -> Int32
nOpcode Notice
notice) Int32
requestId
    case Notice
notice of
        Insert{[Document]
[InsertOption]
Text
iDocuments :: [Document]
iOptions :: [InsertOption]
iFullCollection :: Text
iDocuments :: Notice -> [Document]
iOptions :: Notice -> [InsertOption]
iFullCollection :: Notice -> Text
..} -> do
            Int32 -> Put
putInt32 ([InsertOption] -> Int32
iBits [InsertOption]
iOptions)
            Text -> Put
putCString Text
iFullCollection
            forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Document -> Put
putDocument [Document]
iDocuments
        Update{Document
[UpdateOption]
Text
uUpdater :: Document
uSelector :: Document
uOptions :: [UpdateOption]
uFullCollection :: Text
uUpdater :: Notice -> Document
uSelector :: Notice -> Document
uOptions :: Notice -> [UpdateOption]
uFullCollection :: Notice -> Text
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Text -> Put
putCString Text
uFullCollection
            Int32 -> Put
putInt32 ([UpdateOption] -> Int32
uBits [UpdateOption]
uOptions)
            Document -> Put
putDocument Document
uSelector
            Document -> Put
putDocument Document
uUpdater
        Delete{Document
[DeleteOption]
Text
dSelector :: Document
dOptions :: [DeleteOption]
dFullCollection :: Text
dSelector :: Notice -> Document
dOptions :: Notice -> [DeleteOption]
dFullCollection :: Notice -> Text
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Text -> Put
putCString Text
dFullCollection
            Int32 -> Put
putInt32 ([DeleteOption] -> Int32
dBits [DeleteOption]
dOptions)
            Document -> Put
putDocument Document
dSelector
        KillCursors{[Int64]
kCursorIds :: [Int64]
kCursorIds :: Notice -> [Int64]
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Int32 -> Put
putInt32 forall a b. (a -> b) -> a -> b
$ forall a. Enum a => Int -> a
toEnum (forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int64]
kCursorIds)
            forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Int64 -> Put
putInt64 [Int64]
kCursorIds

data KillC = KillC { KillC -> Notice
killCursor :: Notice, KillC -> Text
kFullCollection:: FullCollection} deriving Int -> KillC -> ShowS
[KillC] -> ShowS
KillC -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [KillC] -> ShowS
$cshowList :: [KillC] -> ShowS
show :: KillC -> [Char]
$cshow :: KillC -> [Char]
showsPrec :: Int -> KillC -> ShowS
$cshowsPrec :: Int -> KillC -> ShowS
Show

data Cmd = Nc Notice | Req Request | Kc KillC deriving Int -> Cmd -> ShowS
[Cmd] -> ShowS
Cmd -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Cmd] -> ShowS
$cshowList :: [Cmd] -> ShowS
show :: Cmd -> [Char]
$cshow :: Cmd -> [Char]
showsPrec :: Int -> Cmd -> ShowS
$cshowsPrec :: Int -> Cmd -> ShowS
Show

data FlagBit =
      ChecksumPresent  -- ^ The message ends with 4 bytes containing a CRC-32C checksum
    | MoreToCome  -- ^ Another message will follow this one without further action from the receiver.
    | ExhaustAllowed  -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
    deriving (Int -> FlagBit -> ShowS
[FlagBit] -> ShowS
FlagBit -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [FlagBit] -> ShowS
$cshowList :: [FlagBit] -> ShowS
show :: FlagBit -> [Char]
$cshow :: FlagBit -> [Char]
showsPrec :: Int -> FlagBit -> ShowS
$cshowsPrec :: Int -> FlagBit -> ShowS
Show, FlagBit -> FlagBit -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FlagBit -> FlagBit -> Bool
$c/= :: FlagBit -> FlagBit -> Bool
== :: FlagBit -> FlagBit -> Bool
$c== :: FlagBit -> FlagBit -> Bool
Eq, Int -> FlagBit
FlagBit -> Int
FlagBit -> [FlagBit]
FlagBit -> FlagBit
FlagBit -> FlagBit -> [FlagBit]
FlagBit -> FlagBit -> FlagBit -> [FlagBit]
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: FlagBit -> FlagBit -> FlagBit -> [FlagBit]
$cenumFromThenTo :: FlagBit -> FlagBit -> FlagBit -> [FlagBit]
enumFromTo :: FlagBit -> FlagBit -> [FlagBit]
$cenumFromTo :: FlagBit -> FlagBit -> [FlagBit]
enumFromThen :: FlagBit -> FlagBit -> [FlagBit]
$cenumFromThen :: FlagBit -> FlagBit -> [FlagBit]
enumFrom :: FlagBit -> [FlagBit]
$cenumFrom :: FlagBit -> [FlagBit]
fromEnum :: FlagBit -> Int
$cfromEnum :: FlagBit -> Int
toEnum :: Int -> FlagBit
$ctoEnum :: Int -> FlagBit
pred :: FlagBit -> FlagBit
$cpred :: FlagBit -> FlagBit
succ :: FlagBit -> FlagBit
$csucc :: FlagBit -> FlagBit
Enum)


{-
  OP_MSG header == 16 byte
  + 4 bytes flagBits
  + 1 byte payload type = 1
  + 1 byte payload type = 2
  + 4 byte size of payload
  == 26 bytes opcode overhead
  + X Full command document {insert: "test", writeConcern: {...}}
  + Y command identifier ("documents", "deletes", "updates") ( + \0)
-}
putOpMsg :: Cmd -> RequestId -> Maybe FlagBit -> Document -> Put
putOpMsg :: Cmd -> Int32 -> Maybe FlagBit -> Document -> Put
putOpMsg Cmd
cmd Int32
requestId Maybe FlagBit
flagBit Document
params = do
    let biT :: Int32
biT = forall b a. b -> (a -> b) -> Maybe a -> b
maybe forall a. Bits a => a
zeroBits (forall a. Bits a => Int -> a
bit forall b c a. (b -> c) -> (a -> b) -> a -> c
. FlagBit -> Int
bitOpMsg) Maybe FlagBit
flagBit:: Int32
    Int32 -> Int32 -> Put
putOpMsgHeader Int32
opMsgOpcode Int32
requestId -- header
    case Cmd
cmd of
        Nc Notice
n -> case Notice
n of
            Insert{[Document]
[InsertOption]
Text
iDocuments :: [Document]
iOptions :: [InsertOption]
iFullCollection :: Text
iDocuments :: Notice -> [Document]
iOptions :: Notice -> [InsertOption]
iFullCollection :: Notice -> Text
..} -> do
                let (Document
sec0, Int32
sec1Size) =
                      Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo
                          Text
iFullCollection
                          (forall a. a -> Maybe a
Just ([Document]
iDocuments:: [Document]))
                          (forall a. Maybe a
Nothing:: Maybe Document)
                          (Text
"insert":: Text)
                          (Text
"documents":: Text)
                          Document
params
                Int32 -> Put
putInt32 Int32
biT                         -- flagBit
                Int8 -> Put
putInt8 Int8
0                            -- payload type 0
                Document -> Put
putDocument Document
sec0                     -- payload
                Int8 -> Put
putInt8 Int8
1                            -- payload type 1
                Int32 -> Put
putInt32 Int32
sec1Size                    -- size of section
                Text -> Put
putCString Text
"documents"               -- identifier
                forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Document -> Put
putDocument [Document]
iDocuments         -- payload
            Update{Document
[UpdateOption]
Text
uUpdater :: Document
uSelector :: Document
uOptions :: [UpdateOption]
uFullCollection :: Text
uUpdater :: Notice -> Document
uSelector :: Notice -> Document
uOptions :: Notice -> [UpdateOption]
uFullCollection :: Notice -> Text
..} -> do
                let doc :: Document
doc = [Text
"q" forall v. Val v => Text -> v -> Field
=: Document
uSelector, Text
"u" forall v. Val v => Text -> v -> Field
=: Document
uUpdater]
                    (Document
sec0, Int32
sec1Size) =
                      Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo
                          Text
uFullCollection
                          (forall a. Maybe a
Nothing:: Maybe [Document])
                          (forall a. a -> Maybe a
Just Document
doc)
                          (Text
"update":: Text)
                          (Text
"updates":: Text)
                          Document
params
                Int32 -> Put
putInt32 Int32
biT
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
sec0
                Int8 -> Put
putInt8 Int8
1
                Int32 -> Put
putInt32 Int32
sec1Size
                Text -> Put
putCString Text
"updates"
                Document -> Put
putDocument Document
doc
            Delete{Document
[DeleteOption]
Text
dSelector :: Document
dOptions :: [DeleteOption]
dFullCollection :: Text
dSelector :: Notice -> Document
dOptions :: Notice -> [DeleteOption]
dFullCollection :: Notice -> Text
..} -> do
                -- Setting limit to 1 here is ok, since this is only used by deleteOne
                let doc :: Document
doc = [Text
"q" forall v. Val v => Text -> v -> Field
=: Document
dSelector, Text
"limit" forall v. Val v => Text -> v -> Field
=: (Int32
1 :: Int32)]
                    (Document
sec0, Int32
sec1Size) =
                      Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo
                          Text
dFullCollection
                          (forall a. Maybe a
Nothing:: Maybe [Document])
                          (forall a. a -> Maybe a
Just Document
doc)
                          (Text
"delete":: Text)
                          (Text
"deletes":: Text)
                          Document
params
                Int32 -> Put
putInt32 Int32
biT
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
sec0
                Int8 -> Put
putInt8 Int8
1
                Int32 -> Put
putInt32 Int32
sec1Size
                Text -> Put
putCString Text
"deletes"
                Document -> Put
putDocument Document
doc
            Notice
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"The KillCursors command cannot be wrapped into a Nc type constructor. Please use the Kc type constructor"
        Req Request
r -> case Request
r of
            Query{Int32
Document
[QueryOption]
Text
qProjector :: Request -> Document
qSelector :: Request -> Document
qBatchSize :: Request -> Int32
qSkip :: Request -> Int32
qFullCollection :: Request -> Text
qOptions :: Request -> [QueryOption]
qProjector :: Document
qSelector :: Document
qBatchSize :: Int32
qSkip :: Int32
qFullCollection :: Text
qOptions :: [QueryOption]
..} -> do
                let n :: [Text]
n = Text -> Text -> [Text]
T.splitOn Text
"." Text
qFullCollection
                    db :: Text
db = forall a. [a] -> a
head [Text]
n
                    sec0 :: Document
sec0 = forall a. (a -> a -> a) -> [a] -> a
foldl1' Document -> Document -> Document
merge [Document
qProjector, [ Text
"$db" forall v. Val v => Text -> v -> Field
=: Text
db ], Document
qSelector]
                Int32 -> Put
putInt32 Int32
biT
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
sec0
            GetMore{Int32
Int64
Text
gCursorId :: Request -> Int64
gBatchSize :: Request -> Int32
gFullCollection :: Request -> Text
gCursorId :: Int64
gBatchSize :: Int32
gFullCollection :: Text
..} -> do
                let n :: [Text]
n = Text -> Text -> [Text]
T.splitOn Text
"." Text
gFullCollection
                    (Text
db, Text
coll) = (forall a. [a] -> a
head [Text]
n, forall a. [a] -> a
last [Text]
n)
                    pre :: Document
pre = [Text
"getMore" forall v. Val v => Text -> v -> Field
=: Int64
gCursorId, Text
"collection" forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"$db" forall v. Val v => Text -> v -> Field
=: Text
db, Text
"batchSize" forall v. Val v => Text -> v -> Field
=: Int32
gBatchSize]
                Int32 -> Put
putInt32 (forall a. Bits a => Int -> a
bit forall a b. (a -> b) -> a -> b
$ FlagBit -> Int
bitOpMsg forall a b. (a -> b) -> a -> b
$ FlagBit
ExhaustAllowed)
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
pre
        Kc KillC
k -> case KillC
k of
            KillC{Text
Notice
kFullCollection :: Text
killCursor :: Notice
kFullCollection :: KillC -> Text
killCursor :: KillC -> Notice
..} -> do
                let n :: [Text]
n = Text -> Text -> [Text]
T.splitOn Text
"." Text
kFullCollection
                    (Text
db, Text
coll) = (forall a. [a] -> a
head [Text]
n, forall a. [a] -> a
last [Text]
n)
                case Notice
killCursor of
                  KillCursors{[Int64]
kCursorIds :: [Int64]
kCursorIds :: Notice -> [Int64]
..} -> do
                      let doc :: Document
doc = [Text
"killCursors" forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"cursors" forall v. Val v => Text -> v -> Field
=: [Int64]
kCursorIds, Text
"$db" forall v. Val v => Text -> v -> Field
=: Text
db]
                      Int32 -> Put
putInt32 Int32
biT
                      Int8 -> Put
putInt8 Int8
0
                      Document -> Put
putDocument Document
doc
                  -- Notices are already captured at the beginning, so all
                  -- other cases are impossible
                  Notice
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"impossible"
 where
    lenBytes :: ByteString -> Int32
lenBytes ByteString
bytes = forall a. Enum a => Int -> a
toEnum forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Enum a => a -> Int
fromEnum forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
L.length ByteString
bytes:: Int32
    prepSectionInfo :: Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo Text
fullCollection Maybe [Document]
documents Maybe Document
document Text
command Text
identifier Document
ps =
      let n :: [Text]
n = Text -> Text -> [Text]
T.splitOn Text
"." Text
fullCollection
          (Text
db, Text
coll) = (forall a. [a] -> a
head [Text]
n, forall a. [a] -> a
last [Text]
n)
      in
      case Maybe [Document]
documents of
        Just [Document]
ds ->
            let
                sec0 :: Document
sec0 = Document -> Document -> Document
merge Document
ps [Text
command forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"$db" forall v. Val v => Text -> v -> Field
=: Text
db]
                s :: Int32
s = forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map (ByteString -> Int32
lenBytes forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
runPut forall b c a. (b -> c) -> (a -> b) -> a -> c
. Document -> Put
putDocument) [Document]
ds
                i :: ByteString
i = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Text -> Put
putCString Text
identifier
                -- +4 bytes for the type 1 section size that has to be
                -- transported in addition to the type 1 section document
                sec1Size :: Int32
sec1Size = Int32
s forall a. Num a => a -> a -> a
+ ByteString -> Int32
lenBytes ByteString
i forall a. Num a => a -> a -> a
+ Int32
4
            in (Document
sec0, Int32
sec1Size)
        Maybe [Document]
Nothing ->
            let
                sec0 :: Document
sec0 = Document -> Document -> Document
merge Document
ps [Text
command forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"$db" forall v. Val v => Text -> v -> Field
=: Text
db]
                s :: ByteString
s = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Document -> Put
putDocument forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => Maybe a -> a
fromJust Maybe Document
document
                i :: ByteString
i = Put -> ByteString
runPut forall a b. (a -> b) -> a -> b
$ Text -> Put
putCString Text
identifier
                sec1Size :: Int32
sec1Size = ByteString -> Int32
lenBytes ByteString
s forall a. Num a => a -> a -> a
+ ByteString -> Int32
lenBytes ByteString
i forall a. Num a => a -> a -> a
+ Int32
4
            in (Document
sec0, Int32
sec1Size)

iBit :: InsertOption -> Int32
iBit :: InsertOption -> Int32
iBit InsertOption
KeepGoing = forall a. Bits a => Int -> a
bit Int
0

iBits :: [InsertOption] -> Int32
iBits :: [InsertOption] -> Int32
iBits = forall a. (Num a, Bits a) => [a] -> a
bitOr forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a -> b) -> [a] -> [b]
map InsertOption -> Int32
iBit

uBit :: UpdateOption -> Int32
uBit :: UpdateOption -> Int32
uBit UpdateOption
Upsert = forall a. Bits a => Int -> a
bit Int
0
uBit UpdateOption
MultiUpdate = forall a. Bits a => Int -> a
bit Int
1

uBits :: [UpdateOption] -> Int32
uBits :: [UpdateOption] -> Int32
uBits = forall a. (Num a, Bits a) => [a] -> a
bitOr forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a -> b) -> [a] -> [b]
map UpdateOption -> Int32
uBit

dBit :: DeleteOption -> Int32
dBit :: DeleteOption -> Int32
dBit DeleteOption
SingleRemove = forall a. Bits a => Int -> a
bit Int
0

dBits :: [DeleteOption] -> Int32
dBits :: [DeleteOption] -> Int32
dBits = forall a. (Num a, Bits a) => [a] -> a
bitOr forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a -> b) -> [a] -> [b]
map DeleteOption -> Int32
dBit

bitOpMsg :: FlagBit -> Int
bitOpMsg :: FlagBit -> Int
bitOpMsg FlagBit
ChecksumPresent = Int
0
bitOpMsg FlagBit
MoreToCome = Int
1
bitOpMsg FlagBit
ExhaustAllowed = Int
16

-- ** Request

-- | A request is a message that is sent with a 'Reply' expected in return
data Request =
      Query {
        Request -> [QueryOption]
qOptions :: [QueryOption],
        Request -> Text
qFullCollection :: FullCollection,
        Request -> Int32
qSkip :: Int32,  -- ^ Number of initial matching documents to skip
        Request -> Int32
qBatchSize :: Int32,  -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
        Request -> Document
qSelector :: Document,  -- ^ @[]@ = return all documents in collection
        Request -> Document
qProjector :: Document  -- ^ @[]@ = return whole document
    } | GetMore {
        Request -> Text
gFullCollection :: FullCollection,
        Request -> Int32
gBatchSize :: Int32,
        Request -> Int64
gCursorId :: CursorId}
    deriving (Int -> Request -> ShowS
[Request] -> ShowS
Request -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Request] -> ShowS
$cshowList :: [Request] -> ShowS
show :: Request -> [Char]
$cshow :: Request -> [Char]
showsPrec :: Int -> Request -> ShowS
$cshowsPrec :: Int -> Request -> ShowS
Show, Request -> Request -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Request -> Request -> Bool
$c/= :: Request -> Request -> Bool
== :: Request -> Request -> Bool
$c== :: Request -> Request -> Bool
Eq)

data QueryOption =
      TailableCursor  -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point – for example if the final object it references were deleted. Thus, you should be prepared to requery on @CursorNotFound@ exception.
    | SlaveOK  -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
    | NoCursorTimeout  -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
    | AwaitData  -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.

--  | Exhaust  -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
-- Exhaust commented out because not compatible with current `Pipeline` implementation

    | Partial  -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error.
    deriving (Int -> QueryOption -> ShowS
[QueryOption] -> ShowS
QueryOption -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [QueryOption] -> ShowS
$cshowList :: [QueryOption] -> ShowS
show :: QueryOption -> [Char]
$cshow :: QueryOption -> [Char]
showsPrec :: Int -> QueryOption -> ShowS
$cshowsPrec :: Int -> QueryOption -> ShowS
Show, QueryOption -> QueryOption -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: QueryOption -> QueryOption -> Bool
$c/= :: QueryOption -> QueryOption -> Bool
== :: QueryOption -> QueryOption -> Bool
$c== :: QueryOption -> QueryOption -> Bool
Eq)

-- *** Binary format

qOpcode :: Request -> Opcode
qOpcode :: Request -> Int32
qOpcode Query{} = Int32
2004
qOpcode GetMore{} = Int32
2005

opMsgOpcode :: Opcode
opMsgOpcode :: Int32
opMsgOpcode = Int32
2013

putRequest :: Request -> RequestId -> Put
putRequest :: Request -> Int32 -> Put
putRequest Request
request Int32
requestId = do
    Int32 -> Int32 -> Put
putHeader (Request -> Int32
qOpcode Request
request) Int32
requestId
    case Request
request of
        Query{Int32
Document
[QueryOption]
Text
qProjector :: Document
qSelector :: Document
qBatchSize :: Int32
qSkip :: Int32
qFullCollection :: Text
qOptions :: [QueryOption]
qProjector :: Request -> Document
qSelector :: Request -> Document
qBatchSize :: Request -> Int32
qSkip :: Request -> Int32
qFullCollection :: Request -> Text
qOptions :: Request -> [QueryOption]
..} -> do
            Int32 -> Put
putInt32 ([QueryOption] -> Int32
qBits [QueryOption]
qOptions)
            Text -> Put
putCString Text
qFullCollection
            Int32 -> Put
putInt32 Int32
qSkip
            Int32 -> Put
putInt32 Int32
qBatchSize
            Document -> Put
putDocument Document
qSelector
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall (t :: * -> *) a. Foldable t => t a -> Bool
null Document
qProjector) (Document -> Put
putDocument Document
qProjector)
        GetMore{Int32
Int64
Text
gCursorId :: Int64
gBatchSize :: Int32
gFullCollection :: Text
gCursorId :: Request -> Int64
gBatchSize :: Request -> Int32
gFullCollection :: Request -> Text
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Text -> Put
putCString Text
gFullCollection
            Int32 -> Put
putInt32 Int32
gBatchSize
            Int64 -> Put
putInt64 Int64
gCursorId

qBit :: QueryOption -> Int32
qBit :: QueryOption -> Int32
qBit QueryOption
TailableCursor = forall a. Bits a => Int -> a
bit Int
1
qBit QueryOption
SlaveOK = forall a. Bits a => Int -> a
bit Int
2
qBit QueryOption
NoCursorTimeout = forall a. Bits a => Int -> a
bit Int
4
qBit QueryOption
AwaitData = forall a. Bits a => Int -> a
bit Int
5
--qBit Exhaust = bit 6
qBit QueryOption
Database.MongoDB.Internal.Protocol.Partial = forall a. Bits a => Int -> a
bit Int
7

qBits :: [QueryOption] -> Int32
qBits :: [QueryOption] -> Int32
qBits = forall a. (Num a, Bits a) => [a] -> a
bitOr forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a -> b) -> [a] -> [b]
map QueryOption -> Int32
qBit

-- ** Reply

-- | A reply is a message received in response to a 'Request'
data Reply = Reply {
    Reply -> [ResponseFlag]
rResponseFlags :: [ResponseFlag],
    Reply -> Int64
rCursorId :: CursorId,  -- ^ 0 = cursor finished
    Reply -> Int32
rStartingFrom :: Int32,
    Reply -> [Document]
rDocuments :: [Document]
    }
   | ReplyOpMsg {
        Reply -> [FlagBit]
flagBits :: [FlagBit],
        Reply -> [Document]
sections :: [Document],
        Reply -> Maybe Int32
checksum :: Maybe Int32
    }
    deriving (Int -> Reply -> ShowS
[Reply] -> ShowS
Reply -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Reply] -> ShowS
$cshowList :: [Reply] -> ShowS
show :: Reply -> [Char]
$cshow :: Reply -> [Char]
showsPrec :: Int -> Reply -> ShowS
$cshowsPrec :: Int -> Reply -> ShowS
Show, Reply -> Reply -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Reply -> Reply -> Bool
$c/= :: Reply -> Reply -> Bool
== :: Reply -> Reply -> Bool
$c== :: Reply -> Reply -> Bool
Eq)

data ResponseFlag =
      CursorNotFound  -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
    | QueryError  -- ^ Query error. Returned with one document containing an "$err" field holding the error message.
    | AwaitCapable  -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's
    deriving (Int -> ResponseFlag -> ShowS
[ResponseFlag] -> ShowS
ResponseFlag -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [ResponseFlag] -> ShowS
$cshowList :: [ResponseFlag] -> ShowS
show :: ResponseFlag -> [Char]
$cshow :: ResponseFlag -> [Char]
showsPrec :: Int -> ResponseFlag -> ShowS
$cshowsPrec :: Int -> ResponseFlag -> ShowS
Show, ResponseFlag -> ResponseFlag -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ResponseFlag -> ResponseFlag -> Bool
$c/= :: ResponseFlag -> ResponseFlag -> Bool
== :: ResponseFlag -> ResponseFlag -> Bool
$c== :: ResponseFlag -> ResponseFlag -> Bool
Eq, Int -> ResponseFlag
ResponseFlag -> Int
ResponseFlag -> [ResponseFlag]
ResponseFlag -> ResponseFlag
ResponseFlag -> ResponseFlag -> [ResponseFlag]
ResponseFlag -> ResponseFlag -> ResponseFlag -> [ResponseFlag]
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: ResponseFlag -> ResponseFlag -> ResponseFlag -> [ResponseFlag]
$cenumFromThenTo :: ResponseFlag -> ResponseFlag -> ResponseFlag -> [ResponseFlag]
enumFromTo :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
$cenumFromTo :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
enumFromThen :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
$cenumFromThen :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
enumFrom :: ResponseFlag -> [ResponseFlag]
$cenumFrom :: ResponseFlag -> [ResponseFlag]
fromEnum :: ResponseFlag -> Int
$cfromEnum :: ResponseFlag -> Int
toEnum :: Int -> ResponseFlag
$ctoEnum :: Int -> ResponseFlag
pred :: ResponseFlag -> ResponseFlag
$cpred :: ResponseFlag -> ResponseFlag
succ :: ResponseFlag -> ResponseFlag
$csucc :: ResponseFlag -> ResponseFlag
Enum)

-- * Binary format

replyOpcode :: Opcode
replyOpcode :: Int32
replyOpcode = Int32
1

getReply :: Get (ResponseTo, Reply)
getReply :: Get Response
getReply = do
    (Int32
opcode, Int32
responseTo) <- Get (Int32, Int32)
getHeader
    if Int32
opcode forall a. Eq a => a -> a -> Bool
== Int32
2013
      then do
            -- Notes:
            -- Checksum bits that are set by the server don't seem to be supported by official drivers.
            -- See: https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/message.py#L1423
            [FlagBit]
flagBits <-  Int32 -> [FlagBit]
rFlagsOpMsg forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int32
getInt32
            Int8
_ <- Get Int8
getInt8
            Document
sec0 <- Get Document
getDocument
            let sections :: [Document]
sections = [Document
sec0]
                checksum :: Maybe a
checksum = forall a. Maybe a
Nothing
            forall (m :: * -> *) a. Monad m => a -> m a
return (Int32
responseTo, ReplyOpMsg{[Document]
[FlagBit]
forall a. Maybe a
checksum :: forall a. Maybe a
sections :: [Document]
flagBits :: [FlagBit]
checksum :: Maybe Int32
sections :: [Document]
flagBits :: [FlagBit]
..})
      else do
          forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Int32
opcode forall a. Eq a => a -> a -> Bool
== Int32
replyOpcode) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail forall a b. (a -> b) -> a -> b
$ [Char]
"expected reply opcode (1) but got " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Int32
opcode
          [ResponseFlag]
rResponseFlags <-  Int32 -> [ResponseFlag]
rFlags forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int32
getInt32
          Int64
rCursorId <- Get Int64
getInt64
          Int32
rStartingFrom <- Get Int32
getInt32
          Int
numDocs <- forall a b. (Integral a, Num b) => a -> b
fromIntegral forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int32
getInt32
          [Document]
rDocuments <- forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numDocs Get Document
getDocument
          forall (m :: * -> *) a. Monad m => a -> m a
return (Int32
responseTo, Reply{Int32
Int64
[Document]
[ResponseFlag]
rDocuments :: [Document]
rStartingFrom :: Int32
rCursorId :: Int64
rResponseFlags :: [ResponseFlag]
rDocuments :: [Document]
rStartingFrom :: Int32
rCursorId :: Int64
rResponseFlags :: [ResponseFlag]
..})

rFlags :: Int32 -> [ResponseFlag]
rFlags :: Int32 -> [ResponseFlag]
rFlags Int32
bits = forall a. (a -> Bool) -> [a] -> [a]
filter (forall a. Bits a => a -> Int -> Bool
testBit Int32
bits forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResponseFlag -> Int
rBit) [ResponseFlag
CursorNotFound ..]

-- See https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#flagbits
rFlagsOpMsg :: Int32 -> [FlagBit]
rFlagsOpMsg :: Int32 -> [FlagBit]
rFlagsOpMsg Int32
bits = forall {a}. Bits a => a -> [FlagBit]
isValidFlag Int32
bits
  where isValidFlag :: a -> [FlagBit]
isValidFlag a
bt =
          let setBits :: [Int32]
setBits = forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> a
fst forall a b. (a -> b) -> a -> b
$ forall a. (a -> Bool) -> [a] -> [a]
filter (\(Int32
_,Bool
b) -> Bool
b forall a. Eq a => a -> a -> Bool
== Bool
True) forall a b. (a -> b) -> a -> b
$ forall a b. [a] -> [b] -> [(a, b)]
zip ([Int32
0..Int32
31] :: [Int32]) forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map (forall a. Bits a => a -> Int -> Bool
testBit a
bt) [Int
0 .. Int
31]
          in if forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (\Int32
n -> Bool -> Bool
not forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Int32
n [Int32
0,Int32
1,Int32
16]) [Int32]
setBits
               then forall a. HasCallStack => [Char] -> a
error [Char]
"Unsopported bit was set"
               else forall a. (a -> Bool) -> [a] -> [a]
filter (forall a. Bits a => a -> Int -> Bool
testBit a
bt forall b c a. (b -> c) -> (a -> b) -> a -> c
. FlagBit -> Int
bitOpMsg) [FlagBit
ChecksumPresent ..]

rBit :: ResponseFlag -> Int
rBit :: ResponseFlag -> Int
rBit ResponseFlag
CursorNotFound = Int
0
rBit ResponseFlag
QueryError = Int
1
rBit ResponseFlag
AwaitCapable = Int
3

-- * Authentication

type Username = Text
type Password = Text
type Nonce = Text

pwHash :: Username -> Password -> Text
pwHash :: Text -> Text -> Text
pwHash Text
u Text
p = [Char] -> Text
T.pack forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [Char]
byteStringHex forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
MD5.hash forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8 forall a b. (a -> b) -> a -> b
$ Text
u Text -> Text -> Text
`T.append` Text
":mongo:" Text -> Text -> Text
`T.append` Text
p

pwKey :: Nonce -> Username -> Password -> Text
pwKey :: Text -> Text -> Text -> Text
pwKey Text
n Text
u Text
p = [Char] -> Text
T.pack forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [Char]
byteStringHex forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
MD5.hash forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8 forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text -> Text
T.append Text
n forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text -> Text
T.append Text
u forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
pwHash Text
u Text
p


{- Authors: Tony Hannan <tony@10gen.com>
   Copyright 2011 10gen Inc.
   Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}