-- | Client for a MongoDB server: connections, database and collection
-- commands, queries, and cursors over query results.
module Database.MongoDB
(
-- * Connection
Connection,
connect, connectOnPort, conClose, disconnect,
-- * Database
dropDatabase,
Database, MongoDBCollectionInvalid,
ColCreateOpt(..),
collectionNames, createCollection, dropCollection, validateCollection,
-- * Collection
Collection, FieldSelector, NumToSkip, NumToReturn, Selector,
QueryOpt(..),
UpdateFlag(..),
count, countMatching, delete, insert, insertMany, query, remove, update,
-- * Convenience queries
find, findOne, quickFind, quickFind',
-- * Cursor
Cursor,
allDocs, allDocs', finish, nextDoc,
)
where
import Control.Exception
import Control.Monad
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put
import Data.Bits
import Data.ByteString.Char8 hiding (count, find)
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString.Lazy.UTF8 as L8
import Data.Int
import Data.IORef
import qualified Data.List as List
import Data.Maybe
import Data.Typeable
import Database.MongoDB.BSON as BSON
import Database.MongoDB.Util
import qualified Network
import Network.Socket hiding (connect, send, sendTo, recv, recvFrom)
import Prelude hiding (getContents)
import System.IO
import System.IO.Unsafe
import System.Random
-- | An open connection to a MongoDB server.
data Connection = Connection
  { cHandle :: Handle       -- ^ handle all messages are written to and read from
  , cRand   :: IORef [Int]  -- ^ remaining supply of random numbers used as request ids
  }
-- | Connect to the MongoDB server on the given host, using port 27017.
connect :: HostName -> IO Connection
connect host = connectOnPort host (Network.PortNumber 27017)
-- | Connect to the MongoDB server on the given host and port.
--
-- The handle is put into unbuffered mode, and the connection is given an
-- infinite supply of random numbers spanning the 'Int32' range, from which
-- request ids are later drawn.
connectOnPort :: HostName -> Network.PortID -> IO Connection
connectOnPort host port = do
  hdl <- Network.connectTo host port
  hSetBuffering hdl NoBuffering
  gen <- newStdGen
  let lo = fromIntegral (minBound :: Int32)
      hi = fromIntegral (maxBound :: Int32)
  supply <- newIORef (randomRs (lo, hi) gen)
  return (Connection hdl supply)
-- | Close the connection by closing its underlying handle.
conClose :: Connection -> IO ()
conClose conn = hClose (cHandle conn)
-- | Synonym for 'conClose'.
disconnect :: Connection -> IO ()
disconnect conn = conClose conn
-- | Issue the @dropDatabase@ command against the given database, discarding
-- the server's reply document.  Any failure surfaces through 'dbCmd'.
dropDatabase :: Connection -> Database -> IO ()
dropDatabase c db =
  dbCmd c db (toBsonDoc [("dropDatabase", toBson (1 :: Int))]) >> return ()
-- | List the collections of a database.
--
-- Reads every document of @\<db\>.system.namespaces@, takes each
-- document's @name@ field, and drops names containing a @$@.
collectionNames :: Connection -> Database -> IO [Collection]
collectionNames c db = do
  docs <- quickFind' c (db ++ ".system.namespaces") BSON.empty
  return
    [ name
    | doc <- docs
    , let name = fromBson (fromJust (BSON.lookup "name" doc))
    , '$' `List.notElem` name
    ]
-- | Options accepted by 'createCollection'.  Each option becomes one field
-- of the @create@ command document.
data ColCreateOpt
  = CCOSize Int64   -- ^ sent as the @size@ field
  | CCOCapped Bool  -- ^ sent as the @capped@ field
  | CCOMax Int64    -- ^ sent as the @max@ field
  deriving (Show, Eq)
-- | Turn a creation option into the (field name, value) pair placed in the
-- @create@ command document.
colCreateOptToBson :: ColCreateOpt -> (String, BsonValue)
colCreateOptToBson opt = case opt of
  CCOSize sz  -> ("size", toBson sz)
  CCOCapped b -> ("capped", toBson b)
  CCOMax m    -> ("max", toBson m)
-- | Create the collection @col@ (a full @db.collection@ name) with the
-- given options.
--
-- Throws 'MongoDBCollectionInvalid' when the collection already exists or
-- when the name is malformed: it contains @..@, contains @$@ (unless the
-- part after the database name starts with @oplog.$main@ or @$cmd@), or
-- starts or ends with @.@.  A failing @create@ command surfaces through
-- 'dbCmd'.
createCollection :: Connection -> Collection -> [ColCreateOpt] -> IO ()
createCollection c col opts = do
  let db = dbFromCol col
      col' = colMinusDB col
  dbcols <- collectionNames c db
  when (col `List.elem` dbcols) $
    throwColInvalid $ "Collection already exists: " ++ show col
  -- isInfixOf also rejects runs of three or more dots, which a comparison
  -- against the groups of equal characters lets through.
  when (".." `List.isInfixOf` col) $
    throwColInvalid $ "Collection can't contain \"..\": " ++ show col
  when ('$' `List.elem` col && not (isSpecial col')) $
    throwColInvalid $ "Collection can't contain '$': " ++ show col
  -- Prefix/suffix tests are total, so an empty name cannot crash here.
  when ("." `List.isPrefixOf` col || "." `List.isSuffixOf` col) $
    throwColInvalid $
      "Collection can't start or end with '.': " ++ show col
  let cmd = ("create", toBson col') : List.map colCreateOptToBson opts
  _ <- dbCmd c db $ toBsonDoc cmd
  return ()
  where
    -- Names that may legitimately contain a '$'.
    isSpecial name =
      "oplog.$main" `List.isPrefixOf` name || "$cmd" `List.isPrefixOf` name
-- | Issue the @drop@ command for the collection @col@ (a full
-- @db.collection@ name), discarding the server's reply document.
dropCollection :: Connection -> Collection -> IO ()
dropCollection c col =
  dbCmd c (dbFromCol col) (toBsonDoc [("drop", toBson (colMinusDB col))])
    >> return ()
-- | Issue the @validate@ command for the collection @col@ and return the
-- @result@ field of the server's reply.
validateCollection :: Connection -> Collection -> IO String
validateCollection c col = do
  res <- dbCmd c (dbFromCol col) $
    toBsonDoc [("validate", toBson (colMinusDB col))]
  return (fromBson (fromJust (BSON.lookup "result" res)))
-- | The database part of a full collection name: everything before the
-- first @.@ (the whole name when it contains no @.@).
dbFromCol :: Collection -> Database
dbFromCol col = fst (List.break (== '.') col)
-- | The collection part of a full collection name: everything after the
-- first @.@.
--
-- Throws 'MongoDBCollectionInvalid' when the name contains no @.@, rather
-- than failing with an empty-list error.
colMinusDB :: Collection -> Collection
colMinusDB col = case List.dropWhile (/= '.') col of
  _ : rest -> rest
  [] -> throwColInvalid $
    "Collection name has no database prefix: " ++ show col
-- | Run a command against @\<db\>.$cmd@ and return the reply document.
--
-- Throws 'MongoDBOperationFailure' when no reply document comes back, or
-- when the reply's @ok@ field is missing or not 1.  The reply's @errmsg@
-- field is included in the message when present.
dbCmd :: Connection -> Database -> BsonDoc -> IO BsonDoc
dbCmd c db cmd = do
  mres <- findOne c (db ++ ".$cmd") cmd
  res <- case mres of
    Just doc -> return doc
    Nothing -> throwOpFailure $
      "command \"" ++ show cmd ++ "\" returned no reply document"
  let ok = fmap fromBson (BSON.lookup "ok" res) :: Maybe Int
      -- A failed reply without an errmsg must still produce a failure
      -- report instead of crashing while building the message.
      errmsg = maybe "(no errmsg in reply)" fromBson (BSON.lookup "errmsg" res)
  case ok of
    Just 1 -> return res
    _ -> throwOpFailure $
      "command \"" ++ show cmd ++ "\" failed: " ++ errmsg
-- | Client-side state of a query cursor.
data Cursor = Cursor
  { curCon      :: Connection          -- ^ connection the query was sent on
  , curID       :: IORef Int64         -- ^ cursor id from the server's reply; 0 is treated as "no more batches"
  , curNumToRet :: Int32               -- ^ number-to-return value re-sent when fetching more documents
  , curCol      :: Collection          -- ^ full name of the queried collection
  , curDocBytes :: IORef L.ByteString  -- ^ not-yet-decoded documents from the latest reply
  , curClosed   :: IORef Bool          -- ^ set once the cursor is exhausted or finished
  }
-- | Message kinds of the wire protocol.  The numeric codes are given by
-- 'fromOpcode' and 'toOpcode'.
data Opcode
  = OP_REPLY
  | OP_MSG
  | OP_UPDATE
  | OP_INSERT
  | OP_GET_BY_OID
  | OP_QUERY
  | OP_GET_MORE
  | OP_DELETE
  | OP_KILL_CURSORS
  deriving (Show, Eq)
-- | Exception carrying a message about something the driver could not
-- interpret (thrown, for example, by 'toOpcode' on an unknown code).
data MongoDBInternalError
  = MongoDBInternalError String
  deriving (Eq, Show, Read)

-- | Type constructor representation used by the 'Typeable' instance.
mongoDBInternalError :: TyCon
mongoDBInternalError = mkTyCon "Database.MongoDB.MongoDBInternalError"

instance Typeable MongoDBInternalError where
  typeOf = const (mkTyConApp mongoDBInternalError [])

instance Exception MongoDBInternalError
-- | Exception thrown when a collection name is rejected or the collection
-- already exists (see 'createCollection').
data MongoDBCollectionInvalid
  = MongoDBCollectionInvalid String
  deriving (Eq, Show, Read)

-- | Type constructor representation used by the 'Typeable' instance.  The
-- string is the fully qualified name of the type, spelled exactly like it.
mongoDBCollectionInvalid :: TyCon
mongoDBCollectionInvalid = mkTyCon "Database.MongoDB.MongoDBCollectionInvalid"

instance Typeable MongoDBCollectionInvalid where
  typeOf _ = mkTyConApp mongoDBCollectionInvalid []

instance Exception MongoDBCollectionInvalid
-- | Throw a 'MongoDBCollectionInvalid' carrying the given message.
throwColInvalid :: String -> a
throwColInvalid = throw . MongoDBCollectionInvalid
-- | Exception thrown when a database command does not succeed (see
-- 'dbCmd').
data MongoDBOperationFailure
  = MongoDBOperationFailure String
  deriving (Eq, Show, Read)

-- | Type constructor representation used by the 'Typeable' instance.  The
-- string is the fully qualified name of the type, spelled exactly like it.
mongoDBOperationFailure :: TyCon
mongoDBOperationFailure = mkTyCon "Database.MongoDB.MongoDBOperationFailure"

instance Typeable MongoDBOperationFailure where
  typeOf _ = mkTyConApp mongoDBOperationFailure []

instance Exception MongoDBOperationFailure
-- | Throw a 'MongoDBOperationFailure' carrying the given message.
throwOpFailure :: String -> a
throwOpFailure = throw . MongoDBOperationFailure
-- | The numeric code written into a message header for an 'Opcode'.
fromOpcode :: Opcode -> Int32
fromOpcode op = case op of
  OP_REPLY        -> 1
  OP_MSG          -> 1000
  OP_UPDATE       -> 2001
  OP_INSERT       -> 2002
  OP_GET_BY_OID   -> 2003
  OP_QUERY        -> 2004
  OP_GET_MORE     -> 2005
  OP_DELETE       -> 2006
  OP_KILL_CURSORS -> 2007
-- | Decode a numeric code from a message header into an 'Opcode'.
--
-- Throws 'MongoDBInternalError' for a code that matches no 'Opcode'.
toOpcode :: Int32 -> Opcode
toOpcode code = case code of
  1    -> OP_REPLY
  1000 -> OP_MSG
  2001 -> OP_UPDATE
  2002 -> OP_INSERT
  2003 -> OP_GET_BY_OID
  2004 -> OP_QUERY
  2005 -> OP_GET_MORE
  2006 -> OP_DELETE
  2007 -> OP_KILL_CURSORS
  _    -> throw $ MongoDBInternalError $
            "Got unexpected Opcode: " ++ show code
-- | Name of a database.
type Database = String
-- | Full collection name of the form @db.collection@; the part before the
-- first @.@ is taken as the database name.
type Collection = String
-- | Document sent as the selector of a query, delete, update or count.
type Selector = BsonDoc
-- | Names of the fields to return from a query; an empty list sends no
-- field selector at all.
type FieldSelector = [L8.ByteString]
-- | Identifier placed in the header of an outgoing message.
type RequestID = Int32
-- | Value written into the skip field of a query message.
type NumToSkip = Int32
-- | Value written into the number-to-return field of a query message.
type NumToReturn = Int32
-- | Options that can be set on a query.
data QueryOpt
  = QO_TailableCursor
  | QO_SlaveOK
  | QO_OpLogReplay
  | QO_NoCursorTimeout
  deriving (Show)

-- | Combine query options into the bit mask written at the start of a
-- query message.  Duplicate options are harmless; no options yields 0.
fromQueryOpts :: [QueryOpt] -> Int32
fromQueryOpts opts = List.foldl' (.|.) 0 (List.map toVal opts)
  where
    -- Bit value of each option.
    toVal QO_TailableCursor  = 2
    toVal QO_SlaveOK         = 4
    toVal QO_OpLogReplay     = 8
    toVal QO_NoCursorTimeout = 16
-- | Flags that can be set on an update.
data UpdateFlag
  = UF_Upsert
  | UF_Multiupdate
  deriving (Show, Enum)

-- | Combine update flags into the bit mask written into an update message.
-- Each flag sets the bit whose position is the flag's 'fromEnum' value.
fromUpdateFlags :: [UpdateFlag] -> Int32
fromUpdateFlags flags =
  List.foldl' (.|.) 0 [1 `shiftL` fromEnum flag | flag <- flags]
-- | Count all documents of a collection: 'countMatching' with an empty
-- selector.
count :: Connection -> Collection -> IO Int64
count conn coll = countMatching conn coll matchAll
  where
    matchAll = BSON.empty
-- | Run the @count@ command with the selector as its @query@ and return
-- the @n@ field of the reply.
countMatching :: Connection -> Collection -> Selector -> IO Int64
countMatching c col sel = do
  res <- dbCmd c (dbFromCol col) $
    toBsonDoc
      [ ("count", toBson (colMinusDB col))
      , ("query", BsonObject sel)
      ]
  return . fromBson . fromJust $ BSON.lookup "n" res
-- | Send a delete message for the documents of @col@ matching @sel@.
-- Returns the request id of the message; no reply is read.
delete :: Connection -> Collection -> Selector -> IO RequestID
delete c col sel = do
  (reqID, msg) <- packMsg c OP_DELETE body
  L.hPut (cHandle c) msg
  return reqID
  where
    -- Two zero words surround the collection name, then the selector.
    body = runPut $ putI32 0 >> putCol col >> putI32 0 >> put sel
-- | Synonym for 'delete'.
remove :: Connection -> Collection -> Selector -> IO RequestID
remove conn coll sel = delete conn coll sel
-- | Send an insert message adding one document to @col@.  Returns the
-- request id of the message; no reply is read.
insert :: Connection -> Collection -> BsonDoc -> IO RequestID
insert c col doc = do
  (reqID, msg) <- packMsg c OP_INSERT body
  L.hPut (cHandle c) msg
  return reqID
  where
    body = runPut $ putI32 0 >> putCol col >> put doc
-- | Send a single insert message adding all the given documents to @col@.
-- Returns the request id of the message; no reply is read.
insertMany :: Connection -> Collection -> [BsonDoc] -> IO RequestID
insertMany c col docs = do
  (reqID, msg) <- packMsg c OP_INSERT body
  L.hPut (cHandle c) msg
  return reqID
  where
    body = runPut $ putI32 0 >> putCol col >> mapM_ put docs
-- | Open a cursor over the documents of @col@ matching @sel@: 'query'
-- with no options, nothing skipped, a number-to-return of 0, and no field
-- selector.
find :: Connection -> Collection -> Selector -> IO Cursor
find c col sel = query c col noOpts 0 0 sel noFields
  where
    noOpts = []
    noFields = []
-- | Fetch the first document of @col@ matching @sel@, if any.
--
-- Queries with a number-to-return of 1, takes the first document from the
-- cursor, and finishes the cursor before returning.
findOne :: Connection -> Collection -> Selector -> IO (Maybe BsonDoc)
findOne c col sel =
  query c col [] 0 1 sel [] >>= \cur ->
    nextDoc cur >>= \el ->
      finish cur >> return el
-- | 'find' followed by 'allDocs': the matching documents as a list that is
-- produced on demand.
quickFind :: Connection -> Collection -> Selector -> IO [BsonDoc]
quickFind c col sel = do
  cur <- find c col sel
  allDocs cur
-- | 'find' followed by 'allDocs'': the matching documents, all fetched
-- before the action returns.
quickFind' :: Connection -> Collection -> Selector -> IO [BsonDoc]
quickFind' c col sel = do
  cur <- find c col sel
  allDocs' cur
-- | Send a query and return a 'Cursor' over its results.
--
-- The message carries the option mask, the collection name, the skip and
-- number-to-return values, the selector and, when @fsel@ is non-empty, a
-- document mapping each listed field to 1.  The reply header and reply
-- prefix are read immediately; the documents that follow are stored
-- undecoded in the cursor.
--
-- The checks on the reply are 'assert's, so they only fire when
-- assertions are enabled.
query :: Connection -> Collection -> [QueryOpt] -> NumToSkip -> NumToReturn ->
         Selector -> FieldSelector -> IO Cursor
query c col opts nskip ret sel fsel = do
  let h = cHandle c
      body = runPut $ do
        putI32 $ fromQueryOpts opts
        putCol col
        putI32 nskip
        putI32 ret
        put sel
        case fsel of
          [] -> putNothing
          _ -> put $ toBsonDoc $ List.zip fsel $ repeat $ BsonInt32 1
  (reqID, msg) <- packMsg c OP_QUERY body
  L.hPut h msg
  hdr <- getHeader h
  assert (OP_REPLY == hOp hdr) $ return ()
  assert (hRespTo hdr == reqID) $ return ()
  reply <- getReply h
  assert (rRespFlags reply == 0) $ return ()
  -- The message length includes the 16 header bytes and the 20 reply-prefix
  -- bytes already consumed above; what remains is the documents.
  docBytes <- (L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20) >>= newIORef
  closed <- newIORef False
  cid <- newIORef $ rCursorID reply
  return $ Cursor
    { curCon = c
    , curID = cid
    , curNumToRet = ret
    , curCol = col
    , curDocBytes = docBytes
    , curClosed = closed
    }
-- | Send an update message: documents of @col@ matching @sel@ are updated
-- with @obj@, under the given flags.  Returns the request id of the
-- message; no reply is read.
update :: Connection -> Collection ->
          [UpdateFlag] -> Selector -> BsonDoc -> IO RequestID
update c col flags sel obj = do
  (reqID, msg) <- packMsg c OP_UPDATE body
  L.hPut (cHandle c) msg
  return reqID
  where
    body = runPut $
      putI32 0
        >> putCol col
        >> putI32 (fromUpdateFlags flags)
        >> put sel
        >> put obj
-- | The parts of a 16-byte message header that the driver keeps.
data Hdr = Hdr
  { hMsgLen :: Int32   -- ^ first word of the header: the message length
  , hRespTo :: Int32   -- ^ third word: id of the request being answered
  , hOp     :: Opcode  -- ^ fourth word, decoded with 'toOpcode'
  } deriving (Show)
-- | The parts of the 20-byte reply prefix that the driver keeps.
data Reply = Reply
  { rRespFlags :: Int32  -- ^ first word of the reply prefix
  , rCursorID  :: Int64  -- ^ the 64-bit cursor id that follows it
  } deriving (Show)
-- | Read the 16-byte message header from the handle and decode it.
getHeader :: Handle -> IO Hdr
getHeader h = do
  hdrBytes <- L.hGet h 16
  return (runGet parseHdr hdrBytes)
  where
    parseHdr = do
      msgLen <- getI32
      skip 4  -- second header word is not used
      respTo <- getI32
      op <- getI32
      return (Hdr msgLen respTo (toOpcode op))
-- | Read the 20-byte reply prefix from the handle and decode the response
-- flags and cursor id from it.
getReply :: Handle -> IO Reply
getReply h = do
  replyBytes <- L.hGet h 20
  return (runGet parseReply replyBytes)
  where
    parseReply = do
      respFlags <- getI32
      cursorID <- getI64
      -- Two trailing words are ignored (presumably startingFrom and
      -- numberReturned of the wire protocol -- TODO confirm).
      skip 4
      skip 4
      return (Reply respFlags cursorID)
-- | Return the next document of the cursor, or 'Nothing' once it is
-- exhausted.
--
-- Buffered documents are decoded one at a time.  When the buffer is empty
-- the cursor is marked closed if its id is 0; otherwise another batch is
-- requested through 'getMore'.
nextDoc :: Cursor -> IO (Maybe BsonDoc)
nextDoc cur = do
  closed <- readIORef (curClosed cur)
  if closed
    then return Nothing
    else do
      buffered <- readIORef (curDocBytes cur)
      cid <- readIORef (curID cur)
      if L.null buffered
        then
          if cid == 0
            then writeIORef (curClosed cur) True >> return Nothing
            else getMore cur
        else do
          let (doc, rest) = getFirstDoc buffered
          writeIORef (curDocBytes cur) rest
          return (Just doc)
-- | All remaining documents of the cursor as a list.  Each step is
-- deferred with 'unsafeInterleaveIO', so documents are fetched only as the
-- list is consumed.
allDocs :: Cursor -> IO [BsonDoc]
allDocs cur = unsafeInterleaveIO $
  nextDoc cur >>= maybe (return []) (\d -> liftM (d :) (allDocs cur))
-- | All remaining documents of the cursor as a list, fetched in full
-- before the action returns.
allDocs' :: Cursor -> IO [BsonDoc]
allDocs' cur =
  nextDoc cur >>= maybe (return []) (\d -> liftM (d :) (allDocs' cur))
-- | Decode one document from the front of the bytes and return it together
-- with the bytes that follow it.
getFirstDoc :: L.ByteString -> (BsonDoc, L.ByteString)
getFirstDoc = runGet (liftM2 (,) get getRemainingLazyByteString)
-- | Request the next batch of documents for the cursor and return its
-- first document.
--
-- A reply with cursor id 0 resets the stored id to 0.  A reply without any
-- documents marks the cursor closed and yields 'Nothing'; otherwise the
-- documents after the first are buffered in the cursor.
--
-- The checks on the reply are 'assert's, so they only fire when
-- assertions are enabled.
getMore :: Cursor -> IO (Maybe BsonDoc)
getMore cur = do
  let h = cHandle $ curCon cur
  cid <- readIORef $ curID cur
  let body = runPut $ do
        putI32 0
        putCol $ curCol cur
        putI32 $ curNumToRet cur
        putI64 cid
  (reqID, msg) <- packMsg (curCon cur) OP_GET_MORE body
  L.hPut h msg
  hdr <- getHeader h
  assert (OP_REPLY == hOp hdr) $ return ()
  assert (hRespTo hdr == reqID) $ return ()
  reply <- getReply h
  assert (rRespFlags reply == 0) $ return ()
  case rCursorID reply of
    0 -> writeIORef (curID cur) 0
    ncid -> assert (ncid == cid) $ return ()
  -- The message length includes the 16 header bytes and the 20 reply-prefix
  -- bytes already consumed above; what remains is the documents.
  docBytes <- L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20
  if L.null docBytes
    then writeIORef (curClosed cur) True >> return Nothing
    else do
      let (doc, docBytes') = getFirstDoc docBytes
      writeIORef (curDocBytes cur) docBytes'
      return $ Just doc
-- | Close the cursor: ask the server to kill it and mark it closed so that
-- 'nextDoc' returns 'Nothing' from now on.
--
-- The kill message is only sent while the stored cursor id is non-zero.
-- This module treats id 0 as "nothing left on the server", and the id is
-- reset to 0 after the message is sent so that finishing twice does not
-- send it again.
finish :: Cursor -> IO ()
finish cur = do
  let h = cHandle $ curCon cur
  cid <- readIORef $ curID cur
  unless (cid == 0) $ do
    let body = runPut $ do
          putI32 0
          putI32 1  -- number of cursor ids that follow
          putI64 cid
    (_reqID, msg) <- packMsg (curCon cur) OP_KILL_CURSORS body
    L.hPut h msg
    writeIORef (curID cur) 0
  writeIORef (curClosed cur) True
-- | Write a collection name followed by a terminating null.
--
-- The name is encoded as UTF-8, so characters outside ASCII are written
-- in full instead of being truncated to a single byte each.  For ASCII
-- names the bytes are the same as before.
putCol :: Collection -> Put
putCol col = putLazyByteString (L8.fromString col) >> putNull
-- | Wrap a message body in a header and return it with the freshly drawn
-- request id.
--
-- The header holds the total length (body plus 16 header bytes), the
-- request id, a zero word, and the numeric opcode.
packMsg :: Connection -> Opcode -> L.ByteString -> IO (RequestID, L.ByteString)
packMsg c op body = do
  reqID <- randNum c
  return (reqID, runPut (header reqID >> putLazyByteString body))
  where
    totalLen = fromIntegral (L.length body + 16)
    header reqID = do
      putI32 totalLen
      putI32 reqID
      putI32 0
      putI32 (fromOpcode op)
-- | Take the next number from the connection's random supply, atomically
-- leaving the rest of the supply in place.
randNum :: Connection -> IO Int32
randNum conn = atomicModifyIORef (cRand conn) $ \supply ->
  let next = List.head supply
      remaining = List.tail supply
  in (remaining, fromIntegral next)