module Database.MongoDB.Queue (
work
, emit
, peek
, createEmitter
, createWorker
) where
import Prelude hiding (lookup)
import Control.Exception.Base (throwIO, Exception)
import Data.Typeable (Typeable)
import Database.MongoDB
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative (Applicative)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Data.Text (Text)
import Network.BSD (getHostName, HostName)
import Control.Monad (void)
queueCollection, handled, dataField, _id, hostField, versionField :: Text
queueCollection = "queue"
handled = "handled"
dataField = "data"
hostField = "host"
versionField = "version"
_id = "_id"
findAndModify :: (Applicative m, MonadIO m)
=> Query
-> Document
-> Action m (Either String Document)
findAndModify (Query {
selection = Select sel collection
, project = project
, sort = sort
}) updates = do
result <- runCommand [
"findAndModify" := String collection
, "new" := Bool True
, "query" := Doc sel
, "update" := Doc updates
, "fields" := Doc project
, "sort" := Doc sort
]
return $ case findErr result of
Nothing -> case lookup "value" result of
Nothing -> Left "findAndModify: no document found (value field was empty)"
Just doc -> Right doc
Just e -> Left e
where
findErr result = lookup "err" (at "lastErrorObject" result)
type DBRunner = MonadIO m => Action m a -> m a
data QueueEmitter = QueueEmitter {
qeVersion :: Int
, qeHost :: HostName
, qeRunDB :: DBRunner
, qeCollection :: Collection
}
data EmitterOpts = EmitterOpts {
emitterVersion :: Int
, emitterRunner :: DBRunner
, emitterCollection :: Collection
}
createEmitter :: DBRunner -> IO QueueEmitter
createEmitter runEmitter = mkEmitter $ EmitterOpts {
emitterVersion = 1
, emitterRunner = runEmitter
, emitterCollection = queueCollection
}
mkEmitter :: EmitterOpts -> IO QueueEmitter
mkEmitter EmitterOpts {..} = do
name <- getHostName
return $ QueueEmitter emitterVersion name emitterRunner emitterCollection
emit :: QueueEmitter -> Document -> IO ()
emit QueueEmitter {..} doc =
qeRunDB $ insert_ qeCollection [
versionField =: qeVersion
, dataField =: doc
, handled =: False
, hostField =: qeHost
]
data QueueWorker = QueueWorker {
qwRunDB :: DBRunner
, qwCursor :: Cursor
, qwCollection :: Collection
}
data WorkerOpts = WorkerOpts {
workerRunner :: DBRunner
, workerMaxByteSize :: Int
, workerCollection :: Collection
}
createWorker :: DBRunner -> IO QueueWorker
createWorker runWorker = mkWorker $ WorkerOpts {
workerRunner = runWorker
, workerMaxByteSize = 100000
, workerCollection = queueCollection
}
mkWorker :: WorkerOpts -> IO QueueWorker
mkWorker WorkerOpts {..} = do
_<- workerRunner $
createCollection [Capped, MaxByteSize workerMaxByteSize] workerCollection
cursor <- getCursor workerRunner workerCollection
return $ QueueWorker workerRunner cursor workerCollection
getCursor :: DBRunner -> Collection -> IO Cursor
getCursor runDB collection =
runDB $ find (select [ handled =: False ] collection) {
options = [TailableCursor, AwaitData, NoCursorTimeout]
}
peek :: QueueWorker -> IO Document
peek QueueWorker {..} = qwRunDB $
fmap (at dataField) (nextDoc qwCursor)
nextDoc :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m Document
nextDoc cursor = do
n <- next cursor
case n of
Nothing -> liftIO $ throwIO $ TailableCursorError "tailable cursor ended"
(Just doc) -> return doc
data MongoQueueException = FindAndModifyError String
| TailableCursorError String
deriving (Show, Typeable)
instance Exception MongoQueueException
work :: QueueWorker -> (Document -> Action IO ()) -> IO ()
work QueueWorker {..} handler = qwRunDB $ do
void $ tailableCursorApply qwCursor $ \origDoc -> do
eDoc <- findAndModify (select [_id := (valueAt _id origDoc)] queueCollection) {
sort = ["$natural" =: (1 :: Int)]
} [ handled =: True ]
case eDoc of
Left err -> liftIO $ throwIO $ FindAndModifyError err
Right doc -> handler (at dataField doc)
where
tailableCursorApply :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> (Document -> Action m a) -> Action m a
tailableCursorApply cursor f = do
x <- nextDoc cursor
f x >> tailableCursorApply cursor f