module Database.MongoDB.Queue (
emit
, nextFromQueue
, createEmitter, mkEmitter, EmitterOpts (..)
, createWorker, mkWorker, WorkerOpts (..)
) where
import Prelude hiding (lookup)
import Control.Concurrent (threadDelay)
import Control.Exception.Lifted (catch, throwIO, Exception, SomeException)
import Data.Default (Default (..))
import Data.Typeable (Typeable)
import Database.MongoDB
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Data.Text (Text)
import Network.BSD (getHostName, HostName)
import Control.Monad (void)
import Control.Applicative
default (Int)
queueCollection, handled, dataField, _id, hostField, versionField :: Text
queueCollection = "queue"
handled = "handled"
dataField = "data"
hostField = "host"
versionField = "version"
_id = "_id"
data QueueEmitter = QueueEmitter {
qeVersion :: Int
, qeHost :: HostName
, qeCollection :: Collection
}
data EmitterOpts = EmitterOpts
{ emitterVersion :: Int
, emitterCollection :: Collection
, emitterMaxByteSize :: Int
}
instance Default EmitterOpts where
def = EmitterOpts 1 queueCollection 100000
createEmitter :: (Applicative m, MonadIO m) => Action m QueueEmitter
createEmitter = mkEmitter def
mkEmitter :: (Applicative m, MonadIO m) => EmitterOpts -> Action m QueueEmitter
mkEmitter EmitterOpts {..} = do
name <- liftIO getHostName
void $ createCollection [Capped, MaxByteSize emitterMaxByteSize] emitterCollection
return $ QueueEmitter emitterVersion name emitterCollection
emit :: (MonadIO m, Applicative m) => QueueEmitter -> Document -> Action m ()
emit QueueEmitter {..} doc =
insert_ qeCollection [
versionField =: qeVersion
, dataField =: doc
, handled =: False
, hostField =: qeHost
]
data QueueWorker = QueueWorker { qwCollection :: Collection }
data WorkerOpts = WorkerOpts
{ workerMaxByteSize :: Int
, workerCollection :: Collection
}
instance Default WorkerOpts where
def = WorkerOpts 100000 queueCollection
createWorker :: (MonadIO m, Applicative m) => Action m QueueWorker
createWorker = mkWorker def
mkWorker :: (MonadIO m, Applicative m) => WorkerOpts -> Action m QueueWorker
mkWorker WorkerOpts {..} = do
_<- createCollection [Capped, MaxByteSize workerMaxByteSize] workerCollection
return $ QueueWorker workerCollection
getCursor :: (MonadIO m, MonadBaseControl IO m) => QueueWorker -> Action m Cursor
getCursor QueueWorker{..} = do
_<- insert qwCollection [ "tailableCursorFix" =: ("helps when there are no docs" :: Text) ]
find (select [ handled =: False ] qwCollection) {
options = [TailableCursor, AwaitData, NoCursorTimeout]
}
nextDoc :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m Document
nextDoc cursor = do
n <- next cursor
case n of
Nothing -> nextDoc cursor
(Just doc) -> return doc
data MongoQueueException = FindAndModifyError String
deriving (Show, Typeable)
instance Exception MongoQueueException
nextFromQueue :: (MonadIO m, MonadBaseControl IO m) => QueueWorker -> Action m Document
nextFromQueue qw@QueueWorker {..} =
getCursor qw >>= processNext
where
processNext cursor = do
origDoc <- nextDoc cursor `catch` handleDroppedCursor
let idQuery = [_id := valueAt _id origDoc]
eDoc <- findAndModify (selectQuery $ idQuery ++ [handled =: False])
["$set" =: [handled =: True]]
case eDoc of
Right doc -> return (at dataField doc)
Left err -> do
mDoc <- findOne (selectQuery idQuery)
case mDoc of
Nothing -> liftIO $ throwIO $ FindAndModifyError err
Just _ -> processNext cursor
selectQuery query = (select query qwCollection) {
sort = ["$natural" =: 1]
}
handleDroppedCursor :: (MonadIO m, MonadBaseControl IO m, Functor m) => SomeException -> Action m Document
handleDroppedCursor _ =
liftIO ( threadDelay (1000 * 1000) ) >> (getCursor qw >>= nextDoc)