module Database.MongoDB.Queue (
emit
, nextFromQueue
, createEmitter, mkEmitter, EmitterOpts (..)
, createWorker, mkWorker, WorkerOpts (..)
) where
import Prelude hiding (lookup)
import Control.Concurrent (threadDelay)
import Control.Exception.Lifted (catch, throwIO, Exception, SomeException)
import Data.Default (Default (..))
import Data.Typeable (Typeable)
import Database.MongoDB
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Data.Text (Text)
import Network.BSD (getHostName, HostName)
import Control.Monad (void)
queueCollection, handled, dataField, _id, hostField, versionField :: Text
queueCollection = "queue"
handled = "handled"
dataField = "data"
hostField = "host"
versionField = "version"
_id = "_id"
type DBRunner = (MonadIO m, MonadBaseControl IO m) => Action m a -> m a
data QueueEmitter = QueueEmitter {
qeVersion :: Int
, qeHost :: HostName
, qeRunDB :: DBRunner
, qeCollection :: Collection
}
data EmitterOpts = EmitterOpts
{ emitterVersion :: Int
, emitterCollection :: Collection
, emitterMaxByteSize :: Int
}
instance Default EmitterOpts where
def = EmitterOpts 1 queueCollection 100000
createEmitter :: DBRunner -> IO QueueEmitter
createEmitter = mkEmitter def
mkEmitter :: EmitterOpts -> DBRunner -> IO QueueEmitter
mkEmitter EmitterOpts {..} emitterRunner = do
name <- getHostName
void $ emitterRunner $ createCollection [Capped, MaxByteSize emitterMaxByteSize] emitterCollection
return $ QueueEmitter emitterVersion name emitterRunner emitterCollection
emit :: QueueEmitter -> Document -> IO ()
emit QueueEmitter {..} doc =
qeRunDB $ insert_ qeCollection [
versionField =: qeVersion
, dataField =: doc
, handled =: False
, hostField =: qeHost
]
data QueueWorker = QueueWorker {
qwRunDB :: DBRunner
, qwGetCursor :: IO Cursor
, qwCollection :: Collection
}
data WorkerOpts = WorkerOpts
{ workerMaxByteSize :: Int
, workerCollection :: Collection
}
instance Default WorkerOpts where
def = WorkerOpts 100000 queueCollection
createWorker :: DBRunner -> IO QueueWorker
createWorker = mkWorker def
mkWorker :: WorkerOpts -> DBRunner -> IO QueueWorker
mkWorker WorkerOpts {..} workerRunner = do
_<- workerRunner $
createCollection [Capped, MaxByteSize workerMaxByteSize] workerCollection
return $ QueueWorker
workerRunner
(getCursor workerRunner workerCollection)
workerCollection
getCursor :: DBRunner -> Collection -> IO Cursor
getCursor runDB collection =
runDB $ do
_<- insert collection [ "tailableCursorFix" =: ("helps when there are no docs" :: Text) ]
find (select [ handled =: False ] collection) {
options = [TailableCursor, AwaitData, NoCursorTimeout]
}
nextDoc :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m Document
nextDoc cursor = do
n <- next cursor
case n of
Nothing -> nextDoc cursor
(Just doc) -> return doc
data MongoQueueException = FindAndModifyError String
deriving (Show, Typeable)
instance Exception MongoQueueException
nextFromQueue :: QueueWorker -> IO Document
nextFromQueue QueueWorker {..} = do
cursor <- qwGetCursor
qwRunDB $ do
origDoc <- nextDoc cursor `catch` handleDroppedCursor
eDoc <- findAndModify (select [_id := (valueAt _id origDoc)] qwCollection) {
sort = ["$natural" =: (1 :: Int)]
} [ "$set" =: [handled =: True] ]
case eDoc of
Left err -> liftIO $ throwIO $ FindAndModifyError err
Right doc -> return (at dataField doc)
where
handleDroppedCursor :: (MonadIO m, MonadBaseControl IO m, Functor m) => SomeException -> Action m Document
handleDroppedCursor _ = nextDoc =<< liftIO (
threadDelay (1000 * 1000) >> qwGetCursor
)