-- | Low-level messaging between this client and the MongoDB server, see Mongo
-- Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
--
-- This module is not intended for direct use. Use the high-level interface at
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.

{-# LANGUAGE RecordWildCards, OverloadedStrings #-}
{-# LANGUAGE CPP, FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE BangPatterns #-}

{-# LANGUAGE NamedFieldPuns, ScopedTypeVariables #-}

#if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-}
#else
{-# LANGUAGE DoRec #-}
#endif

module Database.MongoDB.Internal.Protocol (
    FullCollection,
    -- * Pipe
    Pipe,  newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg,
    -- ** Notice
    Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
    -- ** Request
    Request(..), QueryOption(..), Cmd (..), KillC(..),
    -- ** Reply
    Reply(..), ResponseFlag(..), FlagBit(..),
    -- * Authentication
    Username, Password, Nonce, pwHash, pwKey,
    isClosed, close, ServerData(..), Pipeline(..), putOpMsg,
    bitOpMsg
) where

#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad ( forM, replicateM, unless, forever )
import Data.Binary.Get (Get, runGet, getInt8)
import Data.Binary.Put (Put, runPut, putInt8)
import Data.Bits (bit, testBit, zeroBits)
import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle)
import System.IO.Error (doesNotExistErrorType, mkIOError)
import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList, fromJust)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad.STM (atomically)
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)

import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try)

import qualified Data.ByteString.Lazy as L

import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson (Document, (=:), merge, cast, valueAt, look)
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
                         putInt64, putCString)
import Data.Text (Text)

import qualified Crypto.Hash.MD5 as MD5
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

import Database.MongoDB.Internal.Util (bitOr, byteStringHex)

import Database.MongoDB.Transport (Transport)
import qualified Database.MongoDB.Transport as Tr


#if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
                                       putMVar, readMVar, mkWeakMVar, isEmptyMVar)
import GHC.List (foldl1')
import Conduit (repeatWhileMC, (.|), runConduit, foldlC)
#else
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
                                         putMVar, readMVar, addMVarFinalizer)
#endif

#if !MIN_VERSION_base(4,6,0)
mkWeakMVar :: MVar a -> IO () -> IO ()
mkWeakMVar = addMVarFinalizer
#endif


-- * Pipeline

-- | Thread-safe and pipelined connection
data Pipeline = Pipeline
    { Pipeline -> MVar Transport
vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
    , Pipeline -> TChan (MVar (Either IOError Response))
responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrives we pop the next thread and give it the response.
    , Pipeline -> ThreadId
listenThread :: ThreadId
    , Pipeline -> MVar ()
finished :: MVar ()
    , Pipeline -> ServerData
serverData :: ServerData
    }

data ServerData = ServerData
                { ServerData -> Bool
isMaster            :: Bool
                , ServerData -> Int
minWireVersion      :: Int
                , ServerData -> Int
maxWireVersion      :: Int
                , ServerData -> Int
maxMessageSizeBytes :: Int
                , ServerData -> Int
maxBsonObjectSize   :: Int
                , ServerData -> Int
maxWriteBatchSize   :: Int
                }
                deriving Int -> ServerData -> ShowS
[ServerData] -> ShowS
ServerData -> [Char]
(Int -> ServerData -> ShowS)
-> (ServerData -> [Char])
-> ([ServerData] -> ShowS)
-> Show ServerData
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ServerData -> ShowS
showsPrec :: Int -> ServerData -> ShowS
$cshow :: ServerData -> [Char]
show :: ServerData -> [Char]
$cshowList :: [ServerData] -> ShowS
showList :: [ServerData] -> ShowS
Show

-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally :: forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally IO a
action Either SomeException a -> IO ()
and_then =
  IO ThreadId -> IO ThreadId
forall (m :: * -> *) a. MonadBaseControl IO m => m a -> m a
mask_ (IO ThreadId -> IO ThreadId) -> IO ThreadId -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask (((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask ->
    IO a -> IO (Either SomeException a)
forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> m (Either e a)
try (IO a -> IO a
forall a. IO a -> IO a
unmask IO a
action) IO (Either SomeException a)
-> (Either SomeException a -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either SomeException a -> IO ()
and_then

-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline ServerData
serverData Transport
stream = do
    MVar Transport
vStream <- Transport -> IO (MVar Transport)
forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar Transport
stream
    TChan (MVar (Either IOError Response))
responseQueue <- STM (TChan (MVar (Either IOError Response)))
-> IO (TChan (MVar (Either IOError Response)))
forall a. STM a -> IO a
atomically STM (TChan (MVar (Either IOError Response)))
forall a. STM (TChan a)
newTChan
    MVar ()
finished <- IO (MVar ())
forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
    let drainReplies :: IO ()
drainReplies = do
          Bool
chanEmpty <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TChan (MVar (Either IOError Response)) -> STM Bool
forall a. TChan a -> STM Bool
isEmptyTChan TChan (MVar (Either IOError Response))
responseQueue
          if Bool
chanEmpty
            then () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            else do
              MVar (Either IOError Response)
var <- STM (MVar (Either IOError Response))
-> IO (MVar (Either IOError Response))
forall a. STM a -> IO a
atomically (STM (MVar (Either IOError Response))
 -> IO (MVar (Either IOError Response)))
-> STM (MVar (Either IOError Response))
-> IO (MVar (Either IOError Response))
forall a b. (a -> b) -> a -> b
$ TChan (MVar (Either IOError Response))
-> STM (MVar (Either IOError Response))
forall a. TChan a -> STM a
readTChan TChan (MVar (Either IOError Response))
responseQueue
              MVar (Either IOError Response) -> Either IOError Response -> IO ()
forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m ()
putMVar MVar (Either IOError Response)
var (Either IOError Response -> IO ())
-> Either IOError Response -> IO ()
forall a b. (a -> b) -> a -> b
$ IOError -> Either IOError Response
forall a b. a -> Either a b
Left (IOError -> Either IOError Response)
-> IOError -> Either IOError Response
forall a b. (a -> b) -> a -> b
$ IOErrorType -> [Char] -> Maybe Handle -> Maybe [Char] -> IOError
mkIOError
                                        IOErrorType
doesNotExistErrorType
                                        [Char]
"Handle has been closed"
                                        Maybe Handle
forall a. Maybe a
Nothing
                                        Maybe [Char]
forall a. Maybe a
Nothing
              IO ()
drainReplies

    rec
        let pipe :: Pipeline
pipe = Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
listenThread :: ThreadId
finished :: MVar ()
serverData :: ServerData
serverData :: ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
finished :: MVar ()
listenThread :: ThreadId
..}
        ThreadId
listenThread <- IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally (Pipeline -> IO ()
listen Pipeline
pipe) ((Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
_ -> do
                                                              MVar () -> () -> IO ()
forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m ()
putMVar MVar ()
finished ()
                                                              IO ()
drainReplies

    Weak (MVar Transport)
_ <- MVar Transport -> IO () -> IO (Weak (MVar Transport))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
MVar a -> m () -> m (Weak (MVar a))
mkWeakMVar MVar Transport
vStream (IO () -> IO (Weak (MVar Transport)))
-> IO () -> IO (Weak (MVar Transport))
forall a b. (a -> b) -> a -> b
$ do
        ThreadId -> IO ()
killThread ThreadId
listenThread
        Transport -> IO ()
Tr.close Transport
stream
    Pipeline -> IO Pipeline
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Pipeline
pipe

isFinished :: Pipeline -> IO Bool
isFinished :: Pipeline -> IO Bool
isFinished Pipeline {MVar ()
finished :: Pipeline -> MVar ()
finished :: MVar ()
finished} = do
  Bool
empty <- MVar () -> IO Bool
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m Bool
isEmptyMVar MVar ()
finished
  Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool -> Bool
not Bool
empty

close :: Pipeline -> IO ()
-- ^ Close pipe and underlying connection
close :: Pipeline -> IO ()
close Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
vStream :: Pipeline -> MVar Transport
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
listenThread :: Pipeline -> ThreadId
finished :: Pipeline -> MVar ()
serverData :: Pipeline -> ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
listenThread :: ThreadId
finished :: MVar ()
serverData :: ServerData
..} = do
    ThreadId -> IO ()
killThread ThreadId
listenThread
    Transport -> IO ()
Tr.close (Transport -> IO ()) -> IO Transport -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MVar Transport -> IO Transport
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar Transport
vStream

isClosed :: Pipeline -> IO Bool
isClosed :: Pipeline -> IO Bool
isClosed Pipeline{ThreadId
listenThread :: Pipeline -> ThreadId
listenThread :: ThreadId
listenThread} = do
    ThreadStatus
status <- ThreadId -> IO ThreadStatus
threadStatus ThreadId
listenThread
    Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ case ThreadStatus
status of
        ThreadStatus
ThreadRunning -> Bool
False
        ThreadStatus
ThreadFinished -> Bool
True
        ThreadBlocked BlockReason
_ -> Bool
False
        ThreadStatus
ThreadDied -> Bool
True

--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle  -- isClosed hangs while listen loop is waiting on read

listen :: Pipeline -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order
listen :: Pipeline -> IO ()
listen Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
vStream :: Pipeline -> MVar Transport
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
listenThread :: Pipeline -> ThreadId
finished :: Pipeline -> MVar ()
serverData :: Pipeline -> ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
listenThread :: ThreadId
finished :: MVar ()
serverData :: ServerData
..} = do
    Transport
stream <- MVar Transport -> IO Transport
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar Transport
vStream
    IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        Either IOError Response
e <- IO Response -> IO (Either IOError Response)
forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> m (Either e a)
try (IO Response -> IO (Either IOError Response))
-> IO Response -> IO (Either IOError Response)
forall a b. (a -> b) -> a -> b
$ Transport -> IO Response
readMessage Transport
stream
        MVar (Either IOError Response)
var <- STM (MVar (Either IOError Response))
-> IO (MVar (Either IOError Response))
forall a. STM a -> IO a
atomically (STM (MVar (Either IOError Response))
 -> IO (MVar (Either IOError Response)))
-> STM (MVar (Either IOError Response))
-> IO (MVar (Either IOError Response))
forall a b. (a -> b) -> a -> b
$ TChan (MVar (Either IOError Response))
-> STM (MVar (Either IOError Response))
forall a. TChan a -> STM a
readTChan TChan (MVar (Either IOError Response))
responseQueue
        MVar (Either IOError Response) -> Either IOError Response -> IO ()
forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m ()
putMVar MVar (Either IOError Response)
var Either IOError Response
e
        case Either IOError Response
e of
            Left IOError
err -> Transport -> IO ()
Tr.close Transport
stream IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IOError -> IO ()
forall a. IOError -> IO a
ioError IOError
err  -- close and stop looping
            Right Response
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

psend :: Pipeline -> Message -> IO ()
-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own).
-- Throw IOError and close pipeline if send fails
psend :: Pipeline -> Message -> IO ()
psend p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
vStream :: Pipeline -> MVar Transport
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
listenThread :: Pipeline -> ThreadId
finished :: Pipeline -> MVar ()
serverData :: Pipeline -> ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
listenThread :: ThreadId
finished :: MVar ()
serverData :: ServerData
..} !Message
message = MVar Transport -> (Transport -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream ((Transport -> Message -> IO ()) -> Message -> Transport -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip Transport -> Message -> IO ()
writeMessage Message
message) IO () -> IO () -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p

psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()-- IO (IO Response)
psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
psendOpMsg p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
vStream :: Pipeline -> MVar Transport
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
listenThread :: Pipeline -> ThreadId
finished :: Pipeline -> MVar ()
serverData :: Pipeline -> ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
listenThread :: ThreadId
finished :: MVar ()
serverData :: ServerData
..} [Cmd]
commands Maybe FlagBit
flagBit Document
params =
  case Maybe FlagBit
flagBit of
    Just FlagBit
f -> case FlagBit
f of
               FlagBit
MoreToCome -> MVar Transport -> (Transport -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream (\Transport
t -> Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
writeOpMsgMessage Transport
t ([Cmd]
commands, Maybe (Request, Int32)
forall a. Maybe a
Nothing) Maybe FlagBit
flagBit Document
params) IO () -> IO () -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p -- >> return (return (0, ReplyEmpty))
               FlagBit
_ -> [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"moreToCome has to be set if no response is expected"
    Maybe FlagBit
_ -> [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"moreToCome has to be set if no response is expected"

pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcall :: Pipeline -> Message -> IO (IO Response)
pcall p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
vStream :: Pipeline -> MVar Transport
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
listenThread :: Pipeline -> ThreadId
finished :: Pipeline -> MVar ()
serverData :: Pipeline -> ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
listenThread :: ThreadId
finished :: MVar ()
serverData :: ServerData
..} Message
message = do
  Bool
listenerStopped <- Pipeline -> IO Bool
isFinished Pipeline
p
  if Bool
listenerStopped
    then IOError -> IO (IO Response)
forall a. IOError -> IO a
ioError (IOError -> IO (IO Response)) -> IOError -> IO (IO Response)
forall a b. (a -> b) -> a -> b
$ IOErrorType -> [Char] -> Maybe Handle -> Maybe [Char] -> IOError
mkIOError IOErrorType
doesNotExistErrorType [Char]
"Handle has been closed" Maybe Handle
forall a. Maybe a
Nothing Maybe [Char]
forall a. Maybe a
Nothing
    else MVar Transport
-> (Transport -> IO (IO Response)) -> IO (IO Response)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream Transport -> IO (IO Response)
forall {m :: * -> *}.
MonadBase IO m =>
Transport -> IO (m Response)
doCall IO (IO Response) -> IO () -> IO (IO Response)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p
  where
    doCall :: Transport -> IO (m Response)
doCall Transport
stream = do
        Transport -> Message -> IO ()
writeMessage Transport
stream Message
message
        MVar (Either IOError Response)
var <- IO (MVar (Either IOError Response))
forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
        IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan (MVar (Either IOError Response))
-> MVar (Either IOError Response) -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan (MVar (Either IOError Response))
responseQueue MVar (Either IOError Response)
var
        m Response -> IO (m Response)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (m Response -> IO (m Response)) -> m Response -> IO (m Response)
forall a b. (a -> b) -> a -> b
$ MVar (Either IOError Response) -> m (Either IOError Response)
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar (Either IOError Response)
var m (Either IOError Response)
-> (Either IOError Response -> m Response) -> m Response
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (IOError -> m Response)
-> (Response -> m Response)
-> Either IOError Response
-> m Response
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either IOError -> m Response
forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
throwIO Response -> m Response
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return -- return promise

pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcallOpMsg :: Pipeline
-> Maybe (Request, Int32)
-> Maybe FlagBit
-> Document
-> IO (IO Response)
pcallOpMsg p :: Pipeline
p@Pipeline{ThreadId
MVar ()
MVar Transport
TChan (MVar (Either IOError Response))
ServerData
vStream :: Pipeline -> MVar Transport
responseQueue :: Pipeline -> TChan (MVar (Either IOError Response))
listenThread :: Pipeline -> ThreadId
finished :: Pipeline -> MVar ()
serverData :: Pipeline -> ServerData
vStream :: MVar Transport
responseQueue :: TChan (MVar (Either IOError Response))
listenThread :: ThreadId
finished :: MVar ()
serverData :: ServerData
..} Maybe (Request, Int32)
message Maybe FlagBit
flagbit Document
params = do
  Bool
listenerStopped <- Pipeline -> IO Bool
isFinished Pipeline
p
  if Bool
listenerStopped
    then IOError -> IO (IO Response)
forall a. IOError -> IO a
ioError (IOError -> IO (IO Response)) -> IOError -> IO (IO Response)
forall a b. (a -> b) -> a -> b
$ IOErrorType -> [Char] -> Maybe Handle -> Maybe [Char] -> IOError
mkIOError IOErrorType
doesNotExistErrorType [Char]
"Handle has been closed" Maybe Handle
forall a. Maybe a
Nothing Maybe [Char]
forall a. Maybe a
Nothing
    else MVar Transport
-> (Transport -> IO (IO Response)) -> IO (IO Response)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Transport
vStream Transport -> IO (IO Response)
forall {m :: * -> *}.
MonadBase IO m =>
Transport -> IO (m Response)
doCall IO (IO Response) -> IO () -> IO (IO Response)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`onException` Pipeline -> IO ()
close Pipeline
p
  where
    doCall :: Transport -> IO (m Response)
doCall Transport
stream = do
        Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
writeOpMsgMessage Transport
stream ([], Maybe (Request, Int32)
message) Maybe FlagBit
flagbit Document
params
        MVar (Either IOError Response)
var <- IO (MVar (Either IOError Response))
forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
        -- put var into the response-queue so that it can
        -- fetch the latest response
        IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan (MVar (Either IOError Response))
-> MVar (Either IOError Response) -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan (MVar (Either IOError Response))
responseQueue MVar (Either IOError Response)
var
        m Response -> IO (m Response)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (m Response -> IO (m Response)) -> m Response -> IO (m Response)
forall a b. (a -> b) -> a -> b
$ MVar (Either IOError Response) -> m (Either IOError Response)
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar (Either IOError Response)
var m (Either IOError Response)
-> (Either IOError Response -> m Response) -> m Response
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (IOError -> m Response)
-> (Response -> m Response)
-> Either IOError Response
-> m Response
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either IOError -> m Response
forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
throwIO Response -> m Response
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return -- return promise

-- * Pipe

type Pipe = Pipeline
-- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe`
-- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary.

newPipe :: ServerData -> Handle -> IO Pipe
-- ^ Create pipe over handle
newPipe :: ServerData -> Handle -> IO Pipeline
newPipe ServerData
sd Handle
handle = Handle -> IO Transport
Tr.fromHandle Handle
handle IO Transport -> (Transport -> IO Pipeline) -> IO Pipeline
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (ServerData -> Transport -> IO Pipeline
newPipeWith ServerData
sd)

newPipeWith :: ServerData -> Transport -> IO Pipe
-- ^ Create pipe over connection
newPipeWith :: ServerData -> Transport -> IO Pipeline
newPipeWith ServerData
sd Transport
conn = ServerData -> Transport -> IO Pipeline
newPipeline ServerData
sd Transport
conn

send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send :: Pipeline -> [Notice] -> IO ()
send Pipeline
pipe [Notice]
notices = Pipeline -> Message -> IO ()
psend Pipeline
pipe ([Notice]
notices, Maybe (Request, Int32)
forall a. Maybe a
Nothing)

sendOpMsg :: Pipe -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
sendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
sendOpMsg Pipeline
pipe commands :: [Cmd]
commands@(Nc Notice
_ : [Cmd]
_) Maybe FlagBit
flagBit Document
params =  Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
psendOpMsg Pipeline
pipe [Cmd]
commands Maybe FlagBit
flagBit Document
params
sendOpMsg Pipeline
pipe commands :: [Cmd]
commands@(Kc KillC
_ : [Cmd]
_) Maybe FlagBit
flagBit Document
params =  Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
psendOpMsg Pipeline
pipe [Cmd]
commands Maybe FlagBit
flagBit Document
params
sendOpMsg Pipeline
_ [Cmd]
_ Maybe FlagBit
_ Document
_ =  [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"This function only supports Cmd types wrapped in Nc or Kc type constructors"

call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call :: Pipeline -> [Notice] -> Request -> IO (IO Reply)
call Pipeline
pipe [Notice]
notices Request
request = do
    Int32
requestId <- IO Int32
forall (m :: * -> *). MonadIO m => m Int32
genRequestId
    IO Response
promise <- Pipeline -> Message -> IO (IO Response)
pcall Pipeline
pipe ([Notice]
notices, (Request, Int32) -> Maybe (Request, Int32)
forall a. a -> Maybe a
Just (Request
request, Int32
requestId))
    IO Reply -> IO (IO Reply)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO Reply -> IO (IO Reply)) -> IO Reply -> IO (IO Reply)
forall a b. (a -> b) -> a -> b
$ Int32 -> Response -> Reply
forall {a} {b}. (Eq a, Show a) => a -> (a, b) -> b
check Int32
requestId (Response -> Reply) -> IO Response -> IO Reply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Response
promise
 where
    check :: a -> (a, b) -> b
check a
requestId (a
responseTo, b
reply) = if a
requestId a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
responseTo then b
reply else
        [Char] -> b
forall a. HasCallStack => [Char] -> a
error ([Char] -> b) -> [Char] -> b
forall a b. (a -> b) -> a -> b
$ [Char]
"expected response id (" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
responseTo [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
") to match request id (" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
requestId [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
")"

callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
callOpMsg :: Pipeline -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
callOpMsg Pipeline
pipe Request
request Maybe FlagBit
flagBit Document
params = do
    Int32
requestId <- IO Int32
forall (m :: * -> *). MonadIO m => m Int32
genRequestId
    IO Response
promise <- Pipeline
-> Maybe (Request, Int32)
-> Maybe FlagBit
-> Document
-> IO (IO Response)
pcallOpMsg Pipeline
pipe ((Request, Int32) -> Maybe (Request, Int32)
forall a. a -> Maybe a
Just (Request
request, Int32
requestId)) Maybe FlagBit
flagBit Document
params
    Response
promise' <- IO Response
promise :: IO Response
    IO Reply -> IO (IO Reply)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO Reply -> IO (IO Reply)) -> IO Reply -> IO (IO Reply)
forall a b. (a -> b) -> a -> b
$ Response -> Reply
forall a b. (a, b) -> b
snd (Response -> Reply) -> IO Response -> IO Reply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int32 -> Response -> IO Response
produce Int32
requestId Response
promise'
 where
   -- We need to perform streaming here as within the OP_MSG protocol mongoDB expects
   -- our client to keep receiving messages after the MoreToCome flagbit was
   -- set by the server until our client receives an empty flagbit. After the
   -- first MoreToCome flagbit was set the responseTo field in the following
   -- headers will reference the cursorId that was set in the previous message.
   -- see:
   -- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses
    checkFlagBit :: (a, Reply) -> Bool
checkFlagBit (a, Reply)
p =
      case (a, Reply)
p of
        (a
_, Reply
r) ->
          case Reply
r of
            ReplyOpMsg{[Document]
[FlagBit]
Maybe Int32
flagBits :: [FlagBit]
sections :: [Document]
checksum :: Maybe Int32
flagBits :: Reply -> [FlagBit]
sections :: Reply -> [Document]
checksum :: Reply -> Maybe Int32
..} -> [FlagBit]
flagBits [FlagBit] -> [FlagBit] -> Bool
forall a. Eq a => a -> a -> Bool
== [FlagBit
MoreToCome]
             -- This is called by functions using the OP_MSG protocol,
             -- so this has to be ReplyOpMsg
            Reply
_ -> [Char] -> Bool
forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible"
    produce :: Int32 -> Response -> IO Response
produce Int32
reqId Response
p = ConduitT () Void IO Response -> IO Response
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO Response -> IO Response)
-> ConduitT () Void IO Response -> IO Response
forall a b. (a -> b) -> a -> b
$
      case Response
p of
        (Int32
rt, Reply
r) ->
          case Reply
r of
              ReplyOpMsg{[Document]
[FlagBit]
Maybe Int32
flagBits :: Reply -> [FlagBit]
sections :: Reply -> [Document]
checksum :: Reply -> Maybe Int32
flagBits :: [FlagBit]
sections :: [Document]
checksum :: Maybe Int32
..} ->
                if [FlagBit]
flagBits [FlagBit] -> [FlagBit] -> Bool
forall a. Eq a => a -> a -> Bool
== [FlagBit
MoreToCome]
                  then ConduitT () Response IO ()
forall {i}. ConduitT i Response IO ()
yieldResponses ConduitT () Response IO ()
-> ConduitT Response Void IO Response
-> ConduitT () Void IO Response
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| (Response -> Response -> Response)
-> Response -> ConduitT Response Void IO Response
forall (m :: * -> *) a b o.
Monad m =>
(a -> b -> a) -> a -> ConduitT b o m a
foldlC Response -> Response -> Response
forall {a}. Response -> (a, Reply) -> Response
mergeResponses Response
p
                  else Response -> ConduitT () Void IO Response
forall a. a -> ConduitT () Void IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Response -> ConduitT () Void IO Response)
-> Response -> ConduitT () Void IO Response
forall a b. (a -> b) -> a -> b
$ (Int32
rt, Int32 -> Response -> Reply
forall {a} {b}. (Eq a, Show a) => a -> (a, b) -> b
check Int32
reqId Response
p)
              Reply
_ -> [Char] -> ConduitT () Void IO Response
forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible" -- see comment above
    yieldResponses :: ConduitT i Response IO ()
yieldResponses = IO Response -> (Response -> Bool) -> ConduitT i Response IO ()
forall (m :: * -> *) a i.
Monad m =>
m a -> (a -> Bool) -> ConduitT i a m ()
repeatWhileMC
          (do
             MVar (Either IOError Response)
var <- IO (MVar (Either IOError Response))
forall (m :: * -> *) a. MonadBase IO m => m (MVar a)
newEmptyMVar
             IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan (MVar (Either IOError Response))
-> MVar (Either IOError Response) -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan (Pipeline -> TChan (MVar (Either IOError Response))
responseQueue Pipeline
pipe) MVar (Either IOError Response)
var
             MVar (Either IOError Response) -> IO (Either IOError Response)
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar (Either IOError Response)
var IO (Either IOError Response)
-> (Either IOError Response -> IO Response) -> IO Response
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (IOError -> IO Response)
-> (Response -> IO Response)
-> Either IOError Response
-> IO Response
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either IOError -> IO Response
forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
throwIO Response -> IO Response
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return :: IO Response
          )
          Response -> Bool
forall {a}. (a, Reply) -> Bool
checkFlagBit
    mergeResponses :: Response -> (a, Reply) -> Response
mergeResponses p :: Response
p@(Int32
rt,Reply
rep) (a, Reply)
p' =
      case (Response
p, (a, Reply)
p') of
          ((Int32
_, Reply
r), (a
_, Reply
r')) ->
            case (Reply
r, Reply
r') of
                (ReplyOpMsg [FlagBit]
_ [Document]
sec Maybe Int32
_, ReplyOpMsg [FlagBit]
_ [Document]
sec' Maybe Int32
_) -> do
                    let (Document
section, Document
section') = ([Document] -> Document
forall a. HasCallStack => [a] -> a
head [Document]
sec, [Document] -> Document
forall a. HasCallStack => [a] -> a
head [Document]
sec')
                        (Maybe Document
cur, Maybe Document
cur') = (Maybe Document
-> (Value -> Maybe Document) -> Maybe Value -> Maybe Document
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Maybe Document
forall a. Maybe a
Nothing Value -> Maybe Document
forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast (Maybe Value -> Maybe Document) -> Maybe Value -> Maybe Document
forall a b. (a -> b) -> a -> b
$ Text -> Document -> Maybe Value
forall (m :: * -> *). MonadFail m => Text -> Document -> m Value
look Text
"cursor" Document
section,
                                      Maybe Document
-> (Value -> Maybe Document) -> Maybe Value -> Maybe Document
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Maybe Document
forall a. Maybe a
Nothing Value -> Maybe Document
forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast (Maybe Value -> Maybe Document) -> Maybe Value -> Maybe Document
forall a b. (a -> b) -> a -> b
$ Text -> Document -> Maybe Value
forall (m :: * -> *). MonadFail m => Text -> Document -> m Value
look Text
"cursor" Document
section')
                    case (Maybe Document
cur, Maybe Document
cur') of
                      (Just Document
doc, Just Document
doc') -> do
                        let ([Document]
docs, [Document]
docs') =
                              ( Maybe [Document] -> [Document]
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe [Document] -> [Document]) -> Maybe [Document] -> [Document]
forall a b. (a -> b) -> a -> b
$ Value -> Maybe [Document]
forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast (Value -> Maybe [Document]) -> Value -> Maybe [Document]
forall a b. (a -> b) -> a -> b
$ Text -> Document -> Value
valueAt Text
"nextBatch" Document
doc :: [Document]
                              , Maybe [Document] -> [Document]
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe [Document] -> [Document]) -> Maybe [Document] -> [Document]
forall a b. (a -> b) -> a -> b
$ Value -> Maybe [Document]
forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast (Value -> Maybe [Document]) -> Value -> Maybe [Document]
forall a b. (a -> b) -> a -> b
$ Text -> Document -> Value
valueAt Text
"nextBatch" Document
doc' :: [Document])
                            id' :: Int32
id' = Maybe Int32 -> Int32
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe Int32 -> Int32) -> Maybe Int32 -> Int32
forall a b. (a -> b) -> a -> b
$ Value -> Maybe Int32
forall a (m :: * -> *). (Val a, MonadFail m) => Value -> m a
cast (Value -> Maybe Int32) -> Value -> Maybe Int32
forall a b. (a -> b) -> a -> b
$ Text -> Document -> Value
valueAt Text
"id" Document
doc' :: Int32
                        (Int32
rt, Int32 -> Response -> Reply
forall {a} {b}. (Eq a, Show a) => a -> (a, b) -> b
check Int32
id' (Int32
rt, Reply
rep{ sections = docs' ++ docs })) -- todo: avoid (++)
                        -- Since we use this to process moreToCome messages, we
                        -- know that there will be a nextBatch key in the document
                      (Maybe Document, Maybe Document)
_ ->  [Char] -> Response
forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible"
                (Reply, Reply)
_ -> [Char] -> Response
forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible" -- see comment above
    check :: a -> (a, b) -> b
check a
requestId (a
responseTo, b
reply) = if a
requestId a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
responseTo then b
reply else
        [Char] -> b
forall a. HasCallStack => [Char] -> a
error ([Char] -> b) -> [Char] -> b
forall a b. (a -> b) -> a -> b
$ [Char]
"expected response id (" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
responseTo [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
") to match request id (" [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ a -> [Char]
forall a. Show a => a -> [Char]
show a
requestId [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ [Char]
")"

-- * Message

type Message = ([Notice], Maybe (Request, RequestId))
-- ^ A write notice(s) with getLastError request, or just query request.
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
type OpMsgMessage = ([Cmd], Maybe (Request, RequestId))

writeMessage :: Transport -> Message -> IO ()
-- ^ Write message to connection
writeMessage :: Transport -> Message -> IO ()
writeMessage Transport
conn ([Notice]
notices, Maybe (Request, Int32)
mRequest) = do
    [ByteString]
noticeStrings <- [Notice] -> (Notice -> IO ByteString) -> IO [ByteString]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Notice]
notices ((Notice -> IO ByteString) -> IO [ByteString])
-> (Notice -> IO ByteString) -> IO [ByteString]
forall a b. (a -> b) -> a -> b
$ \Notice
n -> do
          Int32
requestId <- IO Int32
forall (m :: * -> *). MonadIO m => m Int32
genRequestId
          let s :: ByteString
s = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Notice -> Int32 -> Put
putNotice Notice
n Int32
requestId
          ByteString -> IO ByteString
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    let requestString :: Maybe ByteString
requestString = do
          (Request
request, Int32
requestId) <- Maybe (Request, Int32)
mRequest
          let s :: ByteString
s = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Request -> Int32 -> Put
putRequest Request
request Int32
requestId
          ByteString -> Maybe ByteString
forall a. a -> Maybe a
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> Maybe ByteString) -> ByteString -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    Transport -> ByteString -> IO ()
Tr.write Transport
conn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
L.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
L.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString]
noticeStrings [ByteString] -> [ByteString] -> [ByteString]
forall a. [a] -> [a] -> [a]
++ (Maybe ByteString -> [ByteString]
forall a. Maybe a -> [a]
maybeToList Maybe ByteString
requestString)
    Transport -> IO ()
Tr.flush Transport
conn
 where
    lenBytes :: ByteString -> ByteString
lenBytes ByteString
bytes = Int32 -> ByteString
encodeSize (Int32 -> ByteString) -> (Int64 -> Int32) -> Int64 -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Int32
forall a. Enum a => Int -> a
toEnum (Int -> Int32) -> (Int64 -> Int) -> Int64 -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> Int
forall a. Enum a => a -> Int
fromEnum (Int64 -> ByteString) -> Int64 -> ByteString
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
L.length ByteString
bytes
    encodeSize :: Int32 -> ByteString
encodeSize = Put -> ByteString
runPut (Put -> ByteString) -> (Int32 -> Put) -> Int32 -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int32 -> Put
putInt32 (Int32 -> Put) -> (Int32 -> Int32) -> Int32 -> Put
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
4)

writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
-- ^ Write message to connection
writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
writeOpMsgMessage Transport
conn ([Cmd]
notices, Maybe (Request, Int32)
mRequest) Maybe FlagBit
flagBit Document
params = do
    [ByteString]
noticeStrings <- [Cmd] -> (Cmd -> IO ByteString) -> IO [ByteString]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Cmd]
notices ((Cmd -> IO ByteString) -> IO [ByteString])
-> (Cmd -> IO ByteString) -> IO [ByteString]
forall a b. (a -> b) -> a -> b
$ \Cmd
n -> do
          Int32
requestId <- IO Int32
forall (m :: * -> *). MonadIO m => m Int32
genRequestId
          let s :: ByteString
s = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Cmd -> Int32 -> Maybe FlagBit -> Document -> Put
putOpMsg Cmd
n Int32
requestId Maybe FlagBit
flagBit Document
params
          ByteString -> IO ByteString
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    let requestString :: Maybe ByteString
requestString = do
           (Request
request, Int32
requestId) <- Maybe (Request, Int32)
mRequest
           let s :: ByteString
s = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Cmd -> Int32 -> Maybe FlagBit -> Document -> Put
putOpMsg (Request -> Cmd
Req Request
request) Int32
requestId Maybe FlagBit
flagBit Document
params
           ByteString -> Maybe ByteString
forall a. a -> Maybe a
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> Maybe ByteString) -> ByteString -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$ (ByteString -> ByteString
lenBytes ByteString
s) ByteString -> ByteString -> ByteString
`L.append` ByteString
s

    Transport -> ByteString -> IO ()
Tr.write Transport
conn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
L.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
L.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString]
noticeStrings [ByteString] -> [ByteString] -> [ByteString]
forall a. [a] -> [a] -> [a]
++ (Maybe ByteString -> [ByteString]
forall a. Maybe a -> [a]
maybeToList Maybe ByteString
requestString)
    Transport -> IO ()
Tr.flush Transport
conn
 where
    lenBytes :: ByteString -> ByteString
lenBytes ByteString
bytes = Int32 -> ByteString
encodeSize (Int32 -> ByteString) -> (Int64 -> Int32) -> Int64 -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Int32
forall a. Enum a => Int -> a
toEnum (Int -> Int32) -> (Int64 -> Int) -> Int64 -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> Int
forall a. Enum a => a -> Int
fromEnum (Int64 -> ByteString) -> Int64 -> ByteString
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
L.length ByteString
bytes
    encodeSize :: Int32 -> ByteString
encodeSize = Put -> ByteString
runPut (Put -> ByteString) -> (Int32 -> Put) -> Int32 -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int32 -> Put
putInt32 (Int32 -> Put) -> (Int32 -> Int32) -> Int32 -> Put
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
4)

type Response = (ResponseTo, Reply)
-- ^ Message received from a Mongo server in response to a Request

readMessage :: Transport -> IO Response
-- ^ read response from a connection
readMessage :: Transport -> IO Response
readMessage Transport
conn = IO Response
readResp  where
    readResp :: IO Response
readResp = do
        Int
len <- Int32 -> Int
forall a. Enum a => a -> Int
fromEnum (Int32 -> Int) -> (ByteString -> Int32) -> ByteString -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int32
decodeSize (ByteString -> Int32)
-> (ByteString -> ByteString) -> ByteString -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
L.fromStrict (ByteString -> Int) -> IO ByteString -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Transport -> Int -> IO ByteString
Tr.read Transport
conn Int
4
        Get Response -> ByteString -> Response
forall a. Get a -> ByteString -> a
runGet Get Response
getReply (ByteString -> Response)
-> (ByteString -> ByteString) -> ByteString -> Response
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
L.fromStrict (ByteString -> Response) -> IO ByteString -> IO Response
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Transport -> Int -> IO ByteString
Tr.read Transport
conn Int
len
    decodeSize :: ByteString -> Int32
decodeSize = Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
subtract Int32
4 (Int32 -> Int32) -> (ByteString -> Int32) -> ByteString -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Get Int32 -> ByteString -> Int32
forall a. Get a -> ByteString -> a
runGet Get Int32
getInt32

type FullCollection = Text
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"

-- ** Header

type Opcode = Int32

type RequestId = Int32
-- ^ A fresh request id is generated for every message

type ResponseTo = RequestId

genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id
{-# NOINLINE genRequestId #-}
genRequestId :: forall (m :: * -> *). MonadIO m => m Int32
genRequestId = IO Int32 -> m Int32
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int32 -> m Int32) -> IO Int32 -> m Int32
forall a b. (a -> b) -> a -> b
$ IORef Int32 -> (Int32 -> (Int32, Int32)) -> IO Int32
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef Int32
counter ((Int32 -> (Int32, Int32)) -> IO Int32)
-> (Int32 -> (Int32, Int32)) -> IO Int32
forall a b. (a -> b) -> a -> b
$ \Int32
n -> (Int32
n Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1, Int32
n) where
    counter :: IORef RequestId
    counter :: IORef Int32
counter = IO (IORef Int32) -> IORef Int32
forall a. IO a -> a
unsafePerformIO (Int32 -> IO (IORef Int32)
forall a. a -> IO (IORef a)
newIORef Int32
0)
    {-# NOINLINE counter #-}

-- *** Binary format

putHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putHeader :: Int32 -> Int32 -> Put
putHeader Int32
opcode Int32
requestId = do
    Int32 -> Put
putInt32 Int32
requestId
    Int32 -> Put
putInt32 Int32
0
    Int32 -> Put
putInt32 Int32
opcode

putOpMsgHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putOpMsgHeader :: Int32 -> Int32 -> Put
putOpMsgHeader Int32
opcode Int32
requestId = do
    Int32 -> Put
putInt32 Int32
requestId
    Int32 -> Put
putInt32 Int32
0
    Int32 -> Put
putInt32 Int32
opcode

getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read
getHeader :: Get (Int32, Int32)
getHeader = do
    Int32
_requestId <- Get Int32
getInt32
    Int32
responseTo <- Get Int32
getInt32
    Int32
opcode <- Get Int32
getInt32
    (Int32, Int32) -> Get (Int32, Int32)
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int32
opcode, Int32
responseTo)

-- ** Notice

-- | A notice is a message that is sent with no reply
data Notice =
      Insert {
        Notice -> Text
iFullCollection :: FullCollection,
        Notice -> [InsertOption]
iOptions :: [InsertOption],
        Notice -> [Document]
iDocuments :: [Document]}
    | Update {
        Notice -> Text
uFullCollection :: FullCollection,
        Notice -> [UpdateOption]
uOptions :: [UpdateOption],
        Notice -> Document
uSelector :: Document,
        Notice -> Document
uUpdater :: Document}
    | Delete {
        Notice -> Text
dFullCollection :: FullCollection,
        Notice -> [DeleteOption]
dOptions :: [DeleteOption],
        Notice -> Document
dSelector :: Document}
    | KillCursors {
        Notice -> [Int64]
kCursorIds :: [CursorId]}
    deriving (Int -> Notice -> ShowS
[Notice] -> ShowS
Notice -> [Char]
(Int -> Notice -> ShowS)
-> (Notice -> [Char]) -> ([Notice] -> ShowS) -> Show Notice
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Notice -> ShowS
showsPrec :: Int -> Notice -> ShowS
$cshow :: Notice -> [Char]
show :: Notice -> [Char]
$cshowList :: [Notice] -> ShowS
showList :: [Notice] -> ShowS
Show, Notice -> Notice -> Bool
(Notice -> Notice -> Bool)
-> (Notice -> Notice -> Bool) -> Eq Notice
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Notice -> Notice -> Bool
== :: Notice -> Notice -> Bool
$c/= :: Notice -> Notice -> Bool
/= :: Notice -> Notice -> Bool
Eq)

data InsertOption = KeepGoing  -- ^ If set, the database will not stop processing a bulk insert if one fails (eg due to duplicate IDs). This makes bulk insert behave similarly to a series of single inserts, except lastError will be set if any insert fails, not just the last one. (new in 1.9.1)
    deriving (Int -> InsertOption -> ShowS
[InsertOption] -> ShowS
InsertOption -> [Char]
(Int -> InsertOption -> ShowS)
-> (InsertOption -> [Char])
-> ([InsertOption] -> ShowS)
-> Show InsertOption
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> InsertOption -> ShowS
showsPrec :: Int -> InsertOption -> ShowS
$cshow :: InsertOption -> [Char]
show :: InsertOption -> [Char]
$cshowList :: [InsertOption] -> ShowS
showList :: [InsertOption] -> ShowS
Show, InsertOption -> InsertOption -> Bool
(InsertOption -> InsertOption -> Bool)
-> (InsertOption -> InsertOption -> Bool) -> Eq InsertOption
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: InsertOption -> InsertOption -> Bool
== :: InsertOption -> InsertOption -> Bool
$c/= :: InsertOption -> InsertOption -> Bool
/= :: InsertOption -> InsertOption -> Bool
Eq)

data UpdateOption =
      Upsert  -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
    | MultiUpdate  -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
    deriving (Int -> UpdateOption -> ShowS
[UpdateOption] -> ShowS
UpdateOption -> [Char]
(Int -> UpdateOption -> ShowS)
-> (UpdateOption -> [Char])
-> ([UpdateOption] -> ShowS)
-> Show UpdateOption
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UpdateOption -> ShowS
showsPrec :: Int -> UpdateOption -> ShowS
$cshow :: UpdateOption -> [Char]
show :: UpdateOption -> [Char]
$cshowList :: [UpdateOption] -> ShowS
showList :: [UpdateOption] -> ShowS
Show, UpdateOption -> UpdateOption -> Bool
(UpdateOption -> UpdateOption -> Bool)
-> (UpdateOption -> UpdateOption -> Bool) -> Eq UpdateOption
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UpdateOption -> UpdateOption -> Bool
== :: UpdateOption -> UpdateOption -> Bool
$c/= :: UpdateOption -> UpdateOption -> Bool
/= :: UpdateOption -> UpdateOption -> Bool
Eq)

data DeleteOption = SingleRemove  -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
    deriving (Int -> DeleteOption -> ShowS
[DeleteOption] -> ShowS
DeleteOption -> [Char]
(Int -> DeleteOption -> ShowS)
-> (DeleteOption -> [Char])
-> ([DeleteOption] -> ShowS)
-> Show DeleteOption
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DeleteOption -> ShowS
showsPrec :: Int -> DeleteOption -> ShowS
$cshow :: DeleteOption -> [Char]
show :: DeleteOption -> [Char]
$cshowList :: [DeleteOption] -> ShowS
showList :: [DeleteOption] -> ShowS
Show, DeleteOption -> DeleteOption -> Bool
(DeleteOption -> DeleteOption -> Bool)
-> (DeleteOption -> DeleteOption -> Bool) -> Eq DeleteOption
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DeleteOption -> DeleteOption -> Bool
== :: DeleteOption -> DeleteOption -> Bool
$c/= :: DeleteOption -> DeleteOption -> Bool
/= :: DeleteOption -> DeleteOption -> Bool
Eq)

type CursorId = Int64

-- *** Binary format

nOpcode :: Notice -> Opcode
nOpcode :: Notice -> Int32
nOpcode Update{} = Int32
2001
nOpcode Insert{} = Int32
2002
nOpcode Delete{} = Int32
2006
nOpcode KillCursors{} = Int32
2007

putNotice :: Notice -> RequestId -> Put
putNotice :: Notice -> Int32 -> Put
putNotice Notice
notice Int32
requestId = do
    Int32 -> Int32 -> Put
putHeader (Notice -> Int32
nOpcode Notice
notice) Int32
requestId
    case Notice
notice of
        Insert{[Document]
[InsertOption]
Text
iFullCollection :: Notice -> Text
iOptions :: Notice -> [InsertOption]
iDocuments :: Notice -> [Document]
iFullCollection :: Text
iOptions :: [InsertOption]
iDocuments :: [Document]
..} -> do
            Int32 -> Put
putInt32 ([InsertOption] -> Int32
iBits [InsertOption]
iOptions)
            Text -> Put
putCString Text
iFullCollection
            (Document -> Put) -> [Document] -> Put
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Document -> Put
putDocument [Document]
iDocuments
        Update{Document
[UpdateOption]
Text
uFullCollection :: Notice -> Text
uOptions :: Notice -> [UpdateOption]
uSelector :: Notice -> Document
uUpdater :: Notice -> Document
uFullCollection :: Text
uOptions :: [UpdateOption]
uSelector :: Document
uUpdater :: Document
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Text -> Put
putCString Text
uFullCollection
            Int32 -> Put
putInt32 ([UpdateOption] -> Int32
uBits [UpdateOption]
uOptions)
            Document -> Put
putDocument Document
uSelector
            Document -> Put
putDocument Document
uUpdater
        Delete{Document
[DeleteOption]
Text
dFullCollection :: Notice -> Text
dOptions :: Notice -> [DeleteOption]
dSelector :: Notice -> Document
dFullCollection :: Text
dOptions :: [DeleteOption]
dSelector :: Document
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Text -> Put
putCString Text
dFullCollection
            Int32 -> Put
putInt32 ([DeleteOption] -> Int32
dBits [DeleteOption]
dOptions)
            Document -> Put
putDocument Document
dSelector
        KillCursors{[Int64]
kCursorIds :: Notice -> [Int64]
kCursorIds :: [Int64]
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Int32 -> Put
putInt32 (Int32 -> Put) -> Int32 -> Put
forall a b. (a -> b) -> a -> b
$ Int -> Int32
forall a. Enum a => Int -> a
toEnum ([Int64] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int64]
kCursorIds)
            (Int64 -> Put) -> [Int64] -> Put
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Int64 -> Put
putInt64 [Int64]
kCursorIds

data KillC = KillC { KillC -> Notice
killCursor :: Notice, KillC -> Text
kFullCollection:: FullCollection} deriving Int -> KillC -> ShowS
[KillC] -> ShowS
KillC -> [Char]
(Int -> KillC -> ShowS)
-> (KillC -> [Char]) -> ([KillC] -> ShowS) -> Show KillC
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> KillC -> ShowS
showsPrec :: Int -> KillC -> ShowS
$cshow :: KillC -> [Char]
show :: KillC -> [Char]
$cshowList :: [KillC] -> ShowS
showList :: [KillC] -> ShowS
Show

data Cmd = Nc Notice | Req Request | Kc KillC deriving Int -> Cmd -> ShowS
[Cmd] -> ShowS
Cmd -> [Char]
(Int -> Cmd -> ShowS)
-> (Cmd -> [Char]) -> ([Cmd] -> ShowS) -> Show Cmd
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Cmd -> ShowS
showsPrec :: Int -> Cmd -> ShowS
$cshow :: Cmd -> [Char]
show :: Cmd -> [Char]
$cshowList :: [Cmd] -> ShowS
showList :: [Cmd] -> ShowS
Show

data FlagBit =
      ChecksumPresent  -- ^ The message ends with 4 bytes containing a CRC-32C checksum
    | MoreToCome  -- ^ Another message will follow this one without further action from the receiver.
    | ExhaustAllowed  -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
    deriving (Int -> FlagBit -> ShowS
[FlagBit] -> ShowS
FlagBit -> [Char]
(Int -> FlagBit -> ShowS)
-> (FlagBit -> [Char]) -> ([FlagBit] -> ShowS) -> Show FlagBit
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> FlagBit -> ShowS
showsPrec :: Int -> FlagBit -> ShowS
$cshow :: FlagBit -> [Char]
show :: FlagBit -> [Char]
$cshowList :: [FlagBit] -> ShowS
showList :: [FlagBit] -> ShowS
Show, FlagBit -> FlagBit -> Bool
(FlagBit -> FlagBit -> Bool)
-> (FlagBit -> FlagBit -> Bool) -> Eq FlagBit
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: FlagBit -> FlagBit -> Bool
== :: FlagBit -> FlagBit -> Bool
$c/= :: FlagBit -> FlagBit -> Bool
/= :: FlagBit -> FlagBit -> Bool
Eq, Int -> FlagBit
FlagBit -> Int
FlagBit -> [FlagBit]
FlagBit -> FlagBit
FlagBit -> FlagBit -> [FlagBit]
FlagBit -> FlagBit -> FlagBit -> [FlagBit]
(FlagBit -> FlagBit)
-> (FlagBit -> FlagBit)
-> (Int -> FlagBit)
-> (FlagBit -> Int)
-> (FlagBit -> [FlagBit])
-> (FlagBit -> FlagBit -> [FlagBit])
-> (FlagBit -> FlagBit -> [FlagBit])
-> (FlagBit -> FlagBit -> FlagBit -> [FlagBit])
-> Enum FlagBit
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
$csucc :: FlagBit -> FlagBit
succ :: FlagBit -> FlagBit
$cpred :: FlagBit -> FlagBit
pred :: FlagBit -> FlagBit
$ctoEnum :: Int -> FlagBit
toEnum :: Int -> FlagBit
$cfromEnum :: FlagBit -> Int
fromEnum :: FlagBit -> Int
$cenumFrom :: FlagBit -> [FlagBit]
enumFrom :: FlagBit -> [FlagBit]
$cenumFromThen :: FlagBit -> FlagBit -> [FlagBit]
enumFromThen :: FlagBit -> FlagBit -> [FlagBit]
$cenumFromTo :: FlagBit -> FlagBit -> [FlagBit]
enumFromTo :: FlagBit -> FlagBit -> [FlagBit]
$cenumFromThenTo :: FlagBit -> FlagBit -> FlagBit -> [FlagBit]
enumFromThenTo :: FlagBit -> FlagBit -> FlagBit -> [FlagBit]
Enum)

uOptDoc :: UpdateOption -> Document
uOptDoc :: UpdateOption -> Document
uOptDoc UpdateOption
Upsert = [Text
"upsert" Text -> Bool -> Field
forall v. Val v => Text -> v -> Field
=: Bool
True]
uOptDoc UpdateOption
MultiUpdate = [Text
"multi" Text -> Bool -> Field
forall v. Val v => Text -> v -> Field
=: Bool
True]

{-
  OP_MSG header == 16 byte
  + 4 bytes flagBits
  + 1 byte payload type = 1
  + 1 byte payload type = 2
  + 4 byte size of payload
  == 26 bytes opcode overhead
  + X Full command document {insert: "test", writeConcern: {...}}
  + Y command identifier ("documents", "deletes", "updates") ( + \0)
-}
putOpMsg :: Cmd -> RequestId -> Maybe FlagBit -> Document -> Put
putOpMsg :: Cmd -> Int32 -> Maybe FlagBit -> Document -> Put
putOpMsg Cmd
cmd Int32
requestId Maybe FlagBit
flagBit Document
params = do
    let biT :: Int32
biT = Int32 -> (FlagBit -> Int32) -> Maybe FlagBit -> Int32
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int32
forall a. Bits a => a
zeroBits (Int -> Int32
forall a. Bits a => Int -> a
bit (Int -> Int32) -> (FlagBit -> Int) -> FlagBit -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FlagBit -> Int
bitOpMsg) Maybe FlagBit
flagBit:: Int32
    Int32 -> Int32 -> Put
putOpMsgHeader Int32
opMsgOpcode Int32
requestId -- header
    case Cmd
cmd of
        Nc Notice
n -> case Notice
n of
            Insert{[Document]
[InsertOption]
Text
iFullCollection :: Notice -> Text
iOptions :: Notice -> [InsertOption]
iDocuments :: Notice -> [Document]
iFullCollection :: Text
iOptions :: [InsertOption]
iDocuments :: [Document]
..} -> do
                let (Document
sec0, Int32
sec1Size) =
                      Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo
                          Text
iFullCollection
                          ([Document] -> Maybe [Document]
forall a. a -> Maybe a
Just ([Document]
iDocuments:: [Document]))
                          (Maybe Document
forall a. Maybe a
Nothing:: Maybe Document)
                          (Text
"insert":: Text)
                          (Text
"documents":: Text)
                          Document
params
                Int32 -> Put
putInt32 Int32
biT                         -- flagBit
                Int8 -> Put
putInt8 Int8
0                            -- payload type 0
                Document -> Put
putDocument Document
sec0                     -- payload
                Int8 -> Put
putInt8 Int8
1                            -- payload type 1
                Int32 -> Put
putInt32 Int32
sec1Size                    -- size of section
                Text -> Put
putCString Text
"documents"               -- identifier
                (Document -> Put) -> [Document] -> Put
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Document -> Put
putDocument [Document]
iDocuments         -- payload
            Update{Document
[UpdateOption]
Text
uFullCollection :: Notice -> Text
uOptions :: Notice -> [UpdateOption]
uSelector :: Notice -> Document
uUpdater :: Notice -> Document
uFullCollection :: Text
uOptions :: [UpdateOption]
uSelector :: Document
uUpdater :: Document
..} -> do
                let doc :: Document
doc = [Text
"q" Text -> Document -> Field
forall v. Val v => Text -> v -> Field
=: Document
uSelector, Text
"u" Text -> Document -> Field
forall v. Val v => Text -> v -> Field
=: Document
uUpdater] Document -> Document -> Document
forall a. Semigroup a => a -> a -> a
<> (UpdateOption -> Document) -> [UpdateOption] -> Document
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap UpdateOption -> Document
uOptDoc [UpdateOption]
uOptions
                    (Document
sec0, Int32
sec1Size) =
                      Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo
                          Text
uFullCollection
                          (Maybe [Document]
forall a. Maybe a
Nothing:: Maybe [Document])
                          (Document -> Maybe Document
forall a. a -> Maybe a
Just Document
doc)
                          (Text
"update":: Text)
                          (Text
"updates":: Text)
                          Document
params
                Int32 -> Put
putInt32 Int32
biT
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
sec0
                Int8 -> Put
putInt8 Int8
1
                Int32 -> Put
putInt32 Int32
sec1Size
                Text -> Put
putCString Text
"updates"
                Document -> Put
putDocument Document
doc
            Delete{Document
[DeleteOption]
Text
dFullCollection :: Notice -> Text
dOptions :: Notice -> [DeleteOption]
dSelector :: Notice -> Document
dFullCollection :: Text
dOptions :: [DeleteOption]
dSelector :: Document
..} -> do
                -- Setting limit to 1 here is ok, since this is only used by deleteOne
                let doc :: Document
doc = [Text
"q" Text -> Document -> Field
forall v. Val v => Text -> v -> Field
=: Document
dSelector, Text
"limit" Text -> Int32 -> Field
forall v. Val v => Text -> v -> Field
=: (Int32
1 :: Int32)]
                    (Document
sec0, Int32
sec1Size) =
                      Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo
                          Text
dFullCollection
                          (Maybe [Document]
forall a. Maybe a
Nothing:: Maybe [Document])
                          (Document -> Maybe Document
forall a. a -> Maybe a
Just Document
doc)
                          (Text
"delete":: Text)
                          (Text
"deletes":: Text)
                          Document
params
                Int32 -> Put
putInt32 Int32
biT
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
sec0
                Int8 -> Put
putInt8 Int8
1
                Int32 -> Put
putInt32 Int32
sec1Size
                Text -> Put
putCString Text
"deletes"
                Document -> Put
putDocument Document
doc
            Notice
_ -> [Char] -> Put
forall a. HasCallStack => [Char] -> a
error [Char]
"The KillCursors command cannot be wrapped into a Nc type constructor. Please use the Kc type constructor"
        Req Request
r -> case Request
r of
            Query{Int32
Document
[QueryOption]
Text
qOptions :: [QueryOption]
qFullCollection :: Text
qSkip :: Int32
qBatchSize :: Int32
qSelector :: Document
qProjector :: Document
qOptions :: Request -> [QueryOption]
qFullCollection :: Request -> Text
qSkip :: Request -> Int32
qBatchSize :: Request -> Int32
qSelector :: Request -> Document
qProjector :: Request -> Document
..} -> do
                let n :: [Text]
n = HasCallStack => Text -> Text -> [Text]
Text -> Text -> [Text]
T.splitOn Text
"." Text
qFullCollection
                    db :: Text
db = [Text] -> Text
forall a. HasCallStack => [a] -> a
head [Text]
n
                    sec0 :: Document
sec0 = (Document -> Document -> Document) -> [Document] -> Document
forall a. HasCallStack => (a -> a -> a) -> [a] -> a
foldl1' Document -> Document -> Document
merge [Document
qProjector, [ Text
"$db" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
db ], Document
qSelector]
                Int32 -> Put
putInt32 Int32
biT
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
sec0
            GetMore{Int32
Int64
Text
gFullCollection :: Text
gBatchSize :: Int32
gCursorId :: Int64
gFullCollection :: Request -> Text
gBatchSize :: Request -> Int32
gCursorId :: Request -> Int64
..} -> do
                let n :: [Text]
n = HasCallStack => Text -> Text -> [Text]
Text -> Text -> [Text]
T.splitOn Text
"." Text
gFullCollection
                    (Text
db, Text
coll) = ([Text] -> Text
forall a. HasCallStack => [a] -> a
head [Text]
n, [Text] -> Text
forall a. HasCallStack => [a] -> a
last [Text]
n)
                    pre :: Document
pre = [Text
"getMore" Text -> Int64 -> Field
forall v. Val v => Text -> v -> Field
=: Int64
gCursorId, Text
"collection" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"$db" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
db, Text
"batchSize" Text -> Int32 -> Field
forall v. Val v => Text -> v -> Field
=: Int32
gBatchSize]
                Int32 -> Put
putInt32 (Int -> Int32
forall a. Bits a => Int -> a
bit (Int -> Int32) -> Int -> Int32
forall a b. (a -> b) -> a -> b
$ FlagBit -> Int
bitOpMsg (FlagBit -> Int) -> FlagBit -> Int
forall a b. (a -> b) -> a -> b
$ FlagBit
ExhaustAllowed)
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument Document
pre
            Message{Document
Text
mDatabase :: Text
mParams :: Document
mDatabase :: Request -> Text
mParams :: Request -> Document
..} -> do
                Int32 -> Put
putInt32 Int32
biT
                Int8 -> Put
putInt8 Int8
0
                Document -> Put
putDocument (Document -> Put) -> Document -> Put
forall a b. (a -> b) -> a -> b
$ Document -> Document -> Document
merge [ Text
"$db" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
mDatabase ] Document
mParams
        Kc KillC
k -> case KillC
k of
            KillC{Text
Notice
killCursor :: KillC -> Notice
kFullCollection :: KillC -> Text
killCursor :: Notice
kFullCollection :: Text
..} -> do
                let n :: [Text]
n = HasCallStack => Text -> Text -> [Text]
Text -> Text -> [Text]
T.splitOn Text
"." Text
kFullCollection
                    (Text
db, Text
coll) = ([Text] -> Text
forall a. HasCallStack => [a] -> a
head [Text]
n, [Text] -> Text
forall a. HasCallStack => [a] -> a
last [Text]
n)
                case Notice
killCursor of
                  KillCursors{[Int64]
kCursorIds :: Notice -> [Int64]
kCursorIds :: [Int64]
..} -> do
                      let doc :: Document
doc = [Text
"killCursors" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"cursors" Text -> [Int64] -> Field
forall v. Val v => Text -> v -> Field
=: [Int64]
kCursorIds, Text
"$db" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
db]
                      Int32 -> Put
putInt32 Int32
biT
                      Int8 -> Put
putInt8 Int8
0
                      Document -> Put
putDocument Document
doc
                  -- Notices are already captured at the beginning, so all
                  -- other cases are impossible
                  Notice
_ -> [Char] -> Put
forall a. HasCallStack => [Char] -> a
error [Char]
"impossible"
 where
    lenBytes :: ByteString -> Int32
lenBytes ByteString
bytes = Int -> Int32
forall a. Enum a => Int -> a
toEnum (Int -> Int32) -> (Int64 -> Int) -> Int64 -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> Int
forall a. Enum a => a -> Int
fromEnum (Int64 -> Int32) -> Int64 -> Int32
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
L.length ByteString
bytes:: Int32
    prepSectionInfo :: Text
-> Maybe [Document]
-> Maybe Document
-> Text
-> Text
-> Document
-> (Document, Int32)
prepSectionInfo Text
fullCollection Maybe [Document]
documents Maybe Document
document Text
command Text
identifier Document
ps =
      let n :: [Text]
n = HasCallStack => Text -> Text -> [Text]
Text -> Text -> [Text]
T.splitOn Text
"." Text
fullCollection
          (Text
db, Text
coll) = ([Text] -> Text
forall a. HasCallStack => [a] -> a
head [Text]
n, [Text] -> Text
forall a. HasCallStack => [a] -> a
last [Text]
n)
      in
      case Maybe [Document]
documents of
        Just [Document]
ds ->
            let
                sec0 :: Document
sec0 = Document -> Document -> Document
merge Document
ps [Text
command Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"$db" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
db]
                s :: Int32
s = [Int32] -> Int32
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int32] -> Int32) -> [Int32] -> Int32
forall a b. (a -> b) -> a -> b
$ (Document -> Int32) -> [Document] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map (ByteString -> Int32
lenBytes (ByteString -> Int32)
-> (Document -> ByteString) -> Document -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
runPut (Put -> ByteString) -> (Document -> Put) -> Document -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Document -> Put
putDocument) [Document]
ds
                i :: ByteString
i = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> Put
putCString Text
identifier
                -- +4 bytes for the type 1 section size that has to be
                -- transported in addition to the type 1 section document
                sec1Size :: Int32
sec1Size = Int32
s Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ ByteString -> Int32
lenBytes ByteString
i Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
4
            in (Document
sec0, Int32
sec1Size)
        Maybe [Document]
Nothing ->
            let
                sec0 :: Document
sec0 = Document -> Document -> Document
merge Document
ps [Text
command Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
coll, Text
"$db" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
db]
                s :: ByteString
s = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Document -> Put
putDocument (Document -> Put) -> Document -> Put
forall a b. (a -> b) -> a -> b
$ Maybe Document -> Document
forall a. HasCallStack => Maybe a -> a
fromJust Maybe Document
document
                i :: ByteString
i = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> Put
putCString Text
identifier
                sec1Size :: Int32
sec1Size = ByteString -> Int32
lenBytes ByteString
s Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ ByteString -> Int32
lenBytes ByteString
i Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
4
            in (Document
sec0, Int32
sec1Size)

iBit :: InsertOption -> Int32
iBit :: InsertOption -> Int32
iBit InsertOption
KeepGoing = Int -> Int32
forall a. Bits a => Int -> a
bit Int
0

iBits :: [InsertOption] -> Int32
iBits :: [InsertOption] -> Int32
iBits = [Int32] -> Int32
forall a. (Num a, Bits a) => [a] -> a
bitOr ([Int32] -> Int32)
-> ([InsertOption] -> [Int32]) -> [InsertOption] -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (InsertOption -> Int32) -> [InsertOption] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map InsertOption -> Int32
iBit

uBit :: UpdateOption -> Int32
uBit :: UpdateOption -> Int32
uBit UpdateOption
Upsert = Int -> Int32
forall a. Bits a => Int -> a
bit Int
0
uBit UpdateOption
MultiUpdate = Int -> Int32
forall a. Bits a => Int -> a
bit Int
1

uBits :: [UpdateOption] -> Int32
uBits :: [UpdateOption] -> Int32
uBits = [Int32] -> Int32
forall a. (Num a, Bits a) => [a] -> a
bitOr ([Int32] -> Int32)
-> ([UpdateOption] -> [Int32]) -> [UpdateOption] -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (UpdateOption -> Int32) -> [UpdateOption] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map UpdateOption -> Int32
uBit

dBit :: DeleteOption -> Int32
dBit :: DeleteOption -> Int32
dBit DeleteOption
SingleRemove = Int -> Int32
forall a. Bits a => Int -> a
bit Int
0

dBits :: [DeleteOption] -> Int32
dBits :: [DeleteOption] -> Int32
dBits = [Int32] -> Int32
forall a. (Num a, Bits a) => [a] -> a
bitOr ([Int32] -> Int32)
-> ([DeleteOption] -> [Int32]) -> [DeleteOption] -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (DeleteOption -> Int32) -> [DeleteOption] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map DeleteOption -> Int32
dBit

bitOpMsg :: FlagBit -> Int
bitOpMsg :: FlagBit -> Int
bitOpMsg FlagBit
ChecksumPresent = Int
0
bitOpMsg FlagBit
MoreToCome = Int
1
bitOpMsg FlagBit
ExhaustAllowed = Int
16

-- ** Request

-- | A request is a message that is sent with a 'Reply' expected in return
data Request =
      Query {
        Request -> [QueryOption]
qOptions :: [QueryOption],
        Request -> Text
qFullCollection :: FullCollection,
        Request -> Int32
qSkip :: Int32,  -- ^ Number of initial matching documents to skip
        Request -> Int32
qBatchSize :: Int32,  -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
        Request -> Document
qSelector :: Document,  -- ^ @[]@ = return all documents in collection
        Request -> Document
qProjector :: Document  -- ^ @[]@ = return whole document
    } | GetMore {
        Request -> Text
gFullCollection :: FullCollection,
        Request -> Int32
gBatchSize :: Int32,
        Request -> Int64
gCursorId :: CursorId
    } | Message {
        Request -> Text
mDatabase :: Text,
        Request -> Document
mParams :: Document
    }
    deriving (Int -> Request -> ShowS
[Request] -> ShowS
Request -> [Char]
(Int -> Request -> ShowS)
-> (Request -> [Char]) -> ([Request] -> ShowS) -> Show Request
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Request -> ShowS
showsPrec :: Int -> Request -> ShowS
$cshow :: Request -> [Char]
show :: Request -> [Char]
$cshowList :: [Request] -> ShowS
showList :: [Request] -> ShowS
Show, Request -> Request -> Bool
(Request -> Request -> Bool)
-> (Request -> Request -> Bool) -> Eq Request
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Request -> Request -> Bool
== :: Request -> Request -> Bool
$c/= :: Request -> Request -> Bool
/= :: Request -> Request -> Bool
Eq)

data QueryOption =
      TailableCursor  -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point – for example if the final object it references were deleted. Thus, you should be prepared to requery on @CursorNotFound@ exception.
    | SlaveOK  -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
    | NoCursorTimeout  -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
    | AwaitData  -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.

--  | Exhaust  -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
-- Exhaust commented out because not compatible with current `Pipeline` implementation

    | Partial  -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error.
    deriving (Int -> QueryOption -> ShowS
[QueryOption] -> ShowS
QueryOption -> [Char]
(Int -> QueryOption -> ShowS)
-> (QueryOption -> [Char])
-> ([QueryOption] -> ShowS)
-> Show QueryOption
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> QueryOption -> ShowS
showsPrec :: Int -> QueryOption -> ShowS
$cshow :: QueryOption -> [Char]
show :: QueryOption -> [Char]
$cshowList :: [QueryOption] -> ShowS
showList :: [QueryOption] -> ShowS
Show, QueryOption -> QueryOption -> Bool
(QueryOption -> QueryOption -> Bool)
-> (QueryOption -> QueryOption -> Bool) -> Eq QueryOption
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: QueryOption -> QueryOption -> Bool
== :: QueryOption -> QueryOption -> Bool
$c/= :: QueryOption -> QueryOption -> Bool
/= :: QueryOption -> QueryOption -> Bool
Eq)

-- *** Binary format

qOpcode :: Request -> Opcode
qOpcode :: Request -> Int32
qOpcode Query{} = Int32
2004
qOpcode GetMore{} = Int32
2005
qOpcode Message{} = Int32
2013

opMsgOpcode :: Opcode
opMsgOpcode :: Int32
opMsgOpcode = Int32
2013

putRequest :: Request -> RequestId -> Put
putRequest :: Request -> Int32 -> Put
putRequest Request
request Int32
requestId = do
    Int32 -> Int32 -> Put
putHeader (Request -> Int32
qOpcode Request
request) Int32
requestId
    case Request
request of
        Query{Int32
Document
[QueryOption]
Text
qOptions :: Request -> [QueryOption]
qFullCollection :: Request -> Text
qSkip :: Request -> Int32
qBatchSize :: Request -> Int32
qSelector :: Request -> Document
qProjector :: Request -> Document
qOptions :: [QueryOption]
qFullCollection :: Text
qSkip :: Int32
qBatchSize :: Int32
qSelector :: Document
qProjector :: Document
..} -> do
            Int32 -> Put
putInt32 ([QueryOption] -> Int32
qBits [QueryOption]
qOptions)
            Text -> Put
putCString Text
qFullCollection
            Int32 -> Put
putInt32 Int32
qSkip
            Int32 -> Put
putInt32 Int32
qBatchSize
            Document -> Put
putDocument Document
qSelector
            Bool -> Put -> Put
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Document -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null Document
qProjector) (Document -> Put
putDocument Document
qProjector)
        GetMore{Int32
Int64
Text
gFullCollection :: Request -> Text
gBatchSize :: Request -> Int32
gCursorId :: Request -> Int64
gFullCollection :: Text
gBatchSize :: Int32
gCursorId :: Int64
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Text -> Put
putCString Text
gFullCollection
            Int32 -> Put
putInt32 Int32
gBatchSize
            Int64 -> Put
putInt64 Int64
gCursorId
        Message{Document
Text
mDatabase :: Request -> Text
mParams :: Request -> Document
mDatabase :: Text
mParams :: Document
..} -> do
            Int32 -> Put
putInt32 Int32
0
            Int8 -> Put
putInt8 Int8
0
            Document -> Put
putDocument (Document -> Put) -> Document -> Put
forall a b. (a -> b) -> a -> b
$ Document -> Document -> Document
merge [ Text
"$db" Text -> Text -> Field
forall v. Val v => Text -> v -> Field
=: Text
mDatabase ] Document
mParams

qBit :: QueryOption -> Int32
qBit :: QueryOption -> Int32
qBit QueryOption
TailableCursor = Int -> Int32
forall a. Bits a => Int -> a
bit Int
1
qBit QueryOption
SlaveOK = Int -> Int32
forall a. Bits a => Int -> a
bit Int
2
qBit QueryOption
NoCursorTimeout = Int -> Int32
forall a. Bits a => Int -> a
bit Int
4
qBit QueryOption
AwaitData = Int -> Int32
forall a. Bits a => Int -> a
bit Int
5
--qBit Exhaust = bit 6
qBit QueryOption
Database.MongoDB.Internal.Protocol.Partial = Int -> Int32
forall a. Bits a => Int -> a
bit Int
7

qBits :: [QueryOption] -> Int32
qBits :: [QueryOption] -> Int32
qBits = [Int32] -> Int32
forall a. (Num a, Bits a) => [a] -> a
bitOr ([Int32] -> Int32)
-> ([QueryOption] -> [Int32]) -> [QueryOption] -> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (QueryOption -> Int32) -> [QueryOption] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map QueryOption -> Int32
qBit

-- ** Reply

-- | A reply is a message received in response to a 'Request'
data Reply = Reply {
    Reply -> [ResponseFlag]
rResponseFlags :: [ResponseFlag],
    Reply -> Int64
rCursorId :: CursorId,  -- ^ 0 = cursor finished
    Reply -> Int32
rStartingFrom :: Int32,
    Reply -> [Document]
rDocuments :: [Document]
    }
   | ReplyOpMsg {
        Reply -> [FlagBit]
flagBits :: [FlagBit],
        Reply -> [Document]
sections :: [Document],
        Reply -> Maybe Int32
checksum :: Maybe Int32
    }
    deriving (Int -> Reply -> ShowS
[Reply] -> ShowS
Reply -> [Char]
(Int -> Reply -> ShowS)
-> (Reply -> [Char]) -> ([Reply] -> ShowS) -> Show Reply
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Reply -> ShowS
showsPrec :: Int -> Reply -> ShowS
$cshow :: Reply -> [Char]
show :: Reply -> [Char]
$cshowList :: [Reply] -> ShowS
showList :: [Reply] -> ShowS
Show, Reply -> Reply -> Bool
(Reply -> Reply -> Bool) -> (Reply -> Reply -> Bool) -> Eq Reply
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Reply -> Reply -> Bool
== :: Reply -> Reply -> Bool
$c/= :: Reply -> Reply -> Bool
/= :: Reply -> Reply -> Bool
Eq)

data ResponseFlag =
      CursorNotFound  -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
    | QueryError  -- ^ Query error. Returned with one document containing an "$err" field holding the error message.
    | AwaitCapable  -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's
    deriving (Int -> ResponseFlag -> ShowS
[ResponseFlag] -> ShowS
ResponseFlag -> [Char]
(Int -> ResponseFlag -> ShowS)
-> (ResponseFlag -> [Char])
-> ([ResponseFlag] -> ShowS)
-> Show ResponseFlag
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ResponseFlag -> ShowS
showsPrec :: Int -> ResponseFlag -> ShowS
$cshow :: ResponseFlag -> [Char]
show :: ResponseFlag -> [Char]
$cshowList :: [ResponseFlag] -> ShowS
showList :: [ResponseFlag] -> ShowS
Show, ResponseFlag -> ResponseFlag -> Bool
(ResponseFlag -> ResponseFlag -> Bool)
-> (ResponseFlag -> ResponseFlag -> Bool) -> Eq ResponseFlag
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ResponseFlag -> ResponseFlag -> Bool
== :: ResponseFlag -> ResponseFlag -> Bool
$c/= :: ResponseFlag -> ResponseFlag -> Bool
/= :: ResponseFlag -> ResponseFlag -> Bool
Eq, Int -> ResponseFlag
ResponseFlag -> Int
ResponseFlag -> [ResponseFlag]
ResponseFlag -> ResponseFlag
ResponseFlag -> ResponseFlag -> [ResponseFlag]
ResponseFlag -> ResponseFlag -> ResponseFlag -> [ResponseFlag]
(ResponseFlag -> ResponseFlag)
-> (ResponseFlag -> ResponseFlag)
-> (Int -> ResponseFlag)
-> (ResponseFlag -> Int)
-> (ResponseFlag -> [ResponseFlag])
-> (ResponseFlag -> ResponseFlag -> [ResponseFlag])
-> (ResponseFlag -> ResponseFlag -> [ResponseFlag])
-> (ResponseFlag -> ResponseFlag -> ResponseFlag -> [ResponseFlag])
-> Enum ResponseFlag
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
$csucc :: ResponseFlag -> ResponseFlag
succ :: ResponseFlag -> ResponseFlag
$cpred :: ResponseFlag -> ResponseFlag
pred :: ResponseFlag -> ResponseFlag
$ctoEnum :: Int -> ResponseFlag
toEnum :: Int -> ResponseFlag
$cfromEnum :: ResponseFlag -> Int
fromEnum :: ResponseFlag -> Int
$cenumFrom :: ResponseFlag -> [ResponseFlag]
enumFrom :: ResponseFlag -> [ResponseFlag]
$cenumFromThen :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
enumFromThen :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
$cenumFromTo :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
enumFromTo :: ResponseFlag -> ResponseFlag -> [ResponseFlag]
$cenumFromThenTo :: ResponseFlag -> ResponseFlag -> ResponseFlag -> [ResponseFlag]
enumFromThenTo :: ResponseFlag -> ResponseFlag -> ResponseFlag -> [ResponseFlag]
Enum)

-- * Binary format

replyOpcode :: Opcode
replyOpcode :: Int32
replyOpcode = Int32
1

getReply :: Get (ResponseTo, Reply)
getReply :: Get Response
getReply = do
    (Int32
opcode, Int32
responseTo) <- Get (Int32, Int32)
getHeader
    if Int32
opcode Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
== Int32
2013
      then do
            -- Notes:
            -- Checksum bits that are set by the server don't seem to be supported by official drivers.
            -- See: https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/message.py#L1423
            [FlagBit]
flagBits <-  Int32 -> [FlagBit]
rFlagsOpMsg (Int32 -> [FlagBit]) -> Get Int32 -> Get [FlagBit]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int32
getInt32
            Int8
_ <- Get Int8
getInt8
            Document
sec0 <- Get Document
getDocument
            let sections :: [Document]
sections = [Document
sec0]
                checksum :: Maybe a
checksum = Maybe a
forall a. Maybe a
Nothing
            Response -> Get Response
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int32
responseTo, ReplyOpMsg{[Document]
[FlagBit]
Maybe Int32
forall a. Maybe a
flagBits :: [FlagBit]
sections :: [Document]
checksum :: Maybe Int32
flagBits :: [FlagBit]
sections :: [Document]
checksum :: forall a. Maybe a
..})
      else do
          Bool -> Get () -> Get ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Int32
opcode Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
== Int32
replyOpcode) (Get () -> Get ()) -> Get () -> Get ()
forall a b. (a -> b) -> a -> b
$ [Char] -> Get ()
forall a. [Char] -> Get a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail ([Char] -> Get ()) -> [Char] -> Get ()
forall a b. (a -> b) -> a -> b
$ [Char]
"expected reply opcode (1) but got " [Char] -> ShowS
forall a. [a] -> [a] -> [a]
++ Int32 -> [Char]
forall a. Show a => a -> [Char]
show Int32
opcode
          [ResponseFlag]
rResponseFlags <-  Int32 -> [ResponseFlag]
rFlags (Int32 -> [ResponseFlag]) -> Get Int32 -> Get [ResponseFlag]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int32
getInt32
          Int64
rCursorId <- Get Int64
getInt64
          Int32
rStartingFrom <- Get Int32
getInt32
          Int
numDocs <- Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32 -> Int) -> Get Int32 -> Get Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int32
getInt32
          [Document]
rDocuments <- Int -> Get Document -> Get [Document]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numDocs Get Document
getDocument
          Response -> Get Response
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int32
responseTo, Reply{Int32
Int64
[Document]
[ResponseFlag]
rResponseFlags :: [ResponseFlag]
rCursorId :: Int64
rStartingFrom :: Int32
rDocuments :: [Document]
rResponseFlags :: [ResponseFlag]
rCursorId :: Int64
rStartingFrom :: Int32
rDocuments :: [Document]
..})

rFlags :: Int32 -> [ResponseFlag]
rFlags :: Int32 -> [ResponseFlag]
rFlags Int32
bits = (ResponseFlag -> Bool) -> [ResponseFlag] -> [ResponseFlag]
forall a. (a -> Bool) -> [a] -> [a]
filter (Int32 -> Int -> Bool
forall a. Bits a => a -> Int -> Bool
testBit Int32
bits (Int -> Bool) -> (ResponseFlag -> Int) -> ResponseFlag -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResponseFlag -> Int
rBit) [ResponseFlag
CursorNotFound ..]

-- See https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#flagbits
rFlagsOpMsg :: Int32 -> [FlagBit]
rFlagsOpMsg :: Int32 -> [FlagBit]
rFlagsOpMsg Int32
bits = Int32 -> [FlagBit]
forall {a}. Bits a => a -> [FlagBit]
isValidFlag Int32
bits
  where isValidFlag :: a -> [FlagBit]
isValidFlag a
bt =
          let setBits :: [Int32]
setBits = ((Int32, Bool) -> Int32) -> [(Int32, Bool)] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map (Int32, Bool) -> Int32
forall a b. (a, b) -> a
fst ([(Int32, Bool)] -> [Int32]) -> [(Int32, Bool)] -> [Int32]
forall a b. (a -> b) -> a -> b
$ ((Int32, Bool) -> Bool) -> [(Int32, Bool)] -> [(Int32, Bool)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Int32
_,Bool
b) -> Bool
b Bool -> Bool -> Bool
forall a. Eq a => a -> a -> Bool
== Bool
True) ([(Int32, Bool)] -> [(Int32, Bool)])
-> [(Int32, Bool)] -> [(Int32, Bool)]
forall a b. (a -> b) -> a -> b
$ [Int32] -> [Bool] -> [(Int32, Bool)]
forall a b. [a] -> [b] -> [(a, b)]
zip ([Int32
0..Int32
31] :: [Int32]) ([Bool] -> [(Int32, Bool)]) -> [Bool] -> [(Int32, Bool)]
forall a b. (a -> b) -> a -> b
$ (Int -> Bool) -> [Int] -> [Bool]
forall a b. (a -> b) -> [a] -> [b]
map (a -> Int -> Bool
forall a. Bits a => a -> Int -> Bool
testBit a
bt) [Int
0 .. Int
31]
          in if (Int32 -> Bool) -> [Int32] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (\Int32
n -> Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Int32 -> [Int32] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Int32
n [Int32
0,Int32
1,Int32
16]) [Int32]
setBits
               then [Char] -> [FlagBit]
forall a. HasCallStack => [Char] -> a
error [Char]
"Unsopported bit was set"
               else (FlagBit -> Bool) -> [FlagBit] -> [FlagBit]
forall a. (a -> Bool) -> [a] -> [a]
filter (a -> Int -> Bool
forall a. Bits a => a -> Int -> Bool
testBit a
bt (Int -> Bool) -> (FlagBit -> Int) -> FlagBit -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FlagBit -> Int
bitOpMsg) [FlagBit
ChecksumPresent ..]

rBit :: ResponseFlag -> Int
rBit :: ResponseFlag -> Int
rBit ResponseFlag
CursorNotFound = Int
0
rBit ResponseFlag
QueryError = Int
1
rBit ResponseFlag
AwaitCapable = Int
3

-- * Authentication

type Username = Text
type Password = Text
type Nonce = Text

pwHash :: Username -> Password -> Text
pwHash :: Text -> Text -> Text
pwHash Text
u Text
p = [Char] -> Text
T.pack ([Char] -> Text) -> (Text -> [Char]) -> Text -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [Char]
byteStringHex (ByteString -> [Char]) -> (Text -> ByteString) -> Text -> [Char]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
MD5.hash (ByteString -> ByteString)
-> (Text -> ByteString) -> Text -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8 (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ Text
u Text -> Text -> Text
`T.append` Text
":mongo:" Text -> Text -> Text
`T.append` Text
p

pwKey :: Nonce -> Username -> Password -> Text
pwKey :: Text -> Text -> Text -> Text
pwKey Text
n Text
u Text
p = [Char] -> Text
T.pack ([Char] -> Text) -> (Text -> [Char]) -> Text -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [Char]
byteStringHex (ByteString -> [Char]) -> (Text -> ByteString) -> Text -> [Char]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
MD5.hash (ByteString -> ByteString)
-> (Text -> ByteString) -> Text -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8 (Text -> ByteString) -> (Text -> Text) -> Text -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text -> Text
T.append Text
n (Text -> Text) -> (Text -> Text) -> Text -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text -> Text
T.append Text
u (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
pwHash Text
u Text
p


{- Authors: Tony Hannan <tony@10gen.com>
   Copyright 2011 10gen Inc.
   Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}