module Database.MongoDB.Queue (
emit
, nextFromQueue
, createEmitter
, createWorker
) where
import Prelude hiding (lookup)
import Control.Exception.Base (throwIO, Exception)
import Data.Typeable (Typeable)
import Database.MongoDB
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative (Applicative)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Data.Text (Text)
import Network.BSD (getHostName, HostName)
queueCollection, handled, dataField, _id, hostField, versionField :: Text
queueCollection = "queue"
handled = "handled"
dataField = "data"
hostField = "host"
versionField = "version"
_id = "_id"
findAndModify :: (Applicative m, MonadIO m)
=> Query
-> Document
-> Action m (Either String Document)
findAndModify (Query {
selection = Select sel collection
, project = project
, sort = sort
}) updates = do
result <- runCommand [
"findAndModify" := String collection
, "new" := Bool True
, "query" := Doc sel
, "update" := Doc updates
, "fields" := Doc project
, "sort" := Doc sort
]
return $ case findErr result of
Nothing -> case lookup "value" result of
Nothing -> Left "findAndModify: no document found (value field was empty)"
Just doc -> Right doc
Just e -> Left e
where
findErr result = lookup "err" (at "lastErrorObject" result)
type DBRunner = (MonadIO m, MonadBaseControl IO m) => Action m a -> m a
data QueueEmitter = QueueEmitter {
qeVersion :: Int
, qeHost :: HostName
, qeRunDB :: DBRunner
, qeCollection :: Collection
}
data EmitterOpts = EmitterOpts {
emitterVersion :: Int
, emitterRunner :: DBRunner
, emitterCollection :: Collection
}
createEmitter :: DBRunner -> IO QueueEmitter
createEmitter runEmitter = mkEmitter $ EmitterOpts {
emitterVersion = 1
, emitterRunner = runEmitter
, emitterCollection = queueCollection
}
mkEmitter :: EmitterOpts -> IO QueueEmitter
mkEmitter EmitterOpts {..} = do
name <- getHostName
return $ QueueEmitter emitterVersion name emitterRunner emitterCollection
emit :: QueueEmitter -> Document -> IO ()
emit QueueEmitter {..} doc =
qeRunDB $ insert_ qeCollection [
versionField =: qeVersion
, dataField =: doc
, handled =: False
, hostField =: qeHost
]
data QueueWorker = QueueWorker {
qwRunDB :: DBRunner
, qwCursor :: Cursor
, qwCollection :: Collection
}
data WorkerOpts = WorkerOpts {
workerRunner :: DBRunner
, workerMaxByteSize :: Int
, workerCollection :: Collection
}
createWorker :: DBRunner -> IO QueueWorker
createWorker runWorker = mkWorker $ WorkerOpts {
workerRunner = runWorker
, workerMaxByteSize = 100000
, workerCollection = queueCollection
}
mkWorker :: WorkerOpts -> IO QueueWorker
mkWorker WorkerOpts {..} = do
_<- workerRunner $
createCollection [Capped, MaxByteSize workerMaxByteSize] workerCollection
cursor <- getCursor workerRunner workerCollection
return $ QueueWorker workerRunner cursor workerCollection
getCursor :: DBRunner -> Collection -> IO Cursor
getCursor runDB collection =
runDB $ do
_<- insert collection [ "tailableCursorFix" =: ("helps when there are no docs" :: Text) ]
find (select [ handled =: False ] collection) {
options = [TailableCursor, AwaitData, NoCursorTimeout]
}
nextDoc :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m Document
nextDoc cursor = do
n <- next cursor
case n of
Nothing -> nextDoc cursor
(Just doc) -> return doc
data MongoQueueException = FindAndModifyError String
deriving (Show, Typeable)
instance Exception MongoQueueException
nextFromQueue :: QueueWorker -> IO Document
nextFromQueue QueueWorker {..} = qwRunDB $ do
origDoc <- nextDoc qwCursor
liftIO $ print origDoc
eDoc <- findAndModify (select [_id := (valueAt _id origDoc)] queueCollection) {
sort = ["$natural" =: (1 :: Int)]
} [ "$set" =: [handled =: True] ]
case eDoc of
Left err -> liftIO $ throwIO $ FindAndModifyError err
Right doc -> do
liftIO $ print doc
return (at dataField doc)