module Database.MongoDB.Queue (
emit
, nextFromQueuePoll, nextFromQueueTail
, createEmitter, mkEmitter, EmitterOpts (..)
, createPollBroker, createTailBroker, mkTailBroker, mkPollBroker, WorkerOpts (..)
) where
import Prelude hiding (lookup)

import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Exception (SomeAsyncException, fromException)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (atomicWriteIORef, IORef, newIORef, readIORef)
import Data.Typeable (Typeable)

import Control.Exception.Lifted (catch, throwIO, Exception, SomeException)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Data.Default (Default (..))
import Data.Text (Text)
import Database.MongoDB
import Network.BSD (getHostName, HostName)
-- Ambiguous numeric literals in this module default to Int.
default (Int)

-- | Collection name used by the 'Default' option instances.
queueCollection :: Text
queueCollection = "queue"

-- | Field holding the flag that marks a message as already consumed.
handled :: Text
handled = "handled"

-- | Field holding the payload document given to 'emit'.
dataField :: Text
dataField = "data"

-- | Field holding the host name of the emitter.
hostField :: Text
hostField = "host"

-- | Field holding the emitter's version number.
versionField :: Text
versionField = "version"

-- | Name of the document identifier field.
_id :: Text
_id = "_id"
-- | Handle used by 'emit' to publish messages.
data QueueEmitter = QueueEmitter
    { qeVersion    :: Int         -- ^ written to the @version@ field of every message
    , qeHost       :: HostName    -- ^ written to the @host@ field of every message
    , qeCollection :: Collection  -- ^ collection that messages are inserted into
    }
-- | Settings consumed by 'mkEmitter'.
data EmitterOpts = EmitterOpts
    { emitterVersion     :: Int         -- ^ version number stamped on emitted messages
    , emitterCollection  :: Collection  -- ^ collection to create and emit into
    , emitterMaxByteSize :: Int         -- ^ passed as 'MaxByteSize' when creating the collection
    }
-- | Version 1, the @queue@ collection, and a size cap of 100000 bytes.
instance Default EmitterOpts where
    def = EmitterOpts
        { emitterVersion     = 1
        , emitterCollection  = queueCollection
        , emitterMaxByteSize = 100000
        }
-- | Build a 'QueueEmitter' from the default 'EmitterOpts'.
createEmitter :: (Applicative m, MonadIO m) => Action m QueueEmitter
createEmitter = mkEmitter (def :: EmitterOpts)
-- | Build a 'QueueEmitter' from explicit options.
--
-- Looks up the local host name, issues 'createCollection' for a capped
-- collection limited to 'emitterMaxByteSize' (the command's result is
-- discarded), and returns the emitter handle.
mkEmitter :: (Applicative m, MonadIO m) => EmitterOpts -> Action m QueueEmitter
mkEmitter opts = do
    host <- liftIO getHostName
    _ <- createCollection
        [Capped, MaxByteSize (emitterMaxByteSize opts)]
        (emitterCollection opts)
    return QueueEmitter
        { qeVersion    = emitterVersion opts
        , qeHost       = host
        , qeCollection = emitterCollection opts
        }
-- | Publish a message: insert a document that wraps the payload together
-- with the emitter's version and host, flagged as not yet handled.
emit :: (MonadIO m, Applicative m) => QueueEmitter -> Document -> Action m ()
emit emitter doc = insert_ (qeCollection emitter) message
  where
    message =
        [ versionField =: qeVersion emitter
        , dataField    =: doc
        , handled      =: False
        , hostField    =: qeHost emitter
        ]
-- | Consumer handle for 'nextFromQueueTail'.
data TailBroker = TailBroker
    { tbCollection :: Collection  -- ^ collection that messages are read from
    }
-- | Consumer handle for 'nextFromQueuePoll'.
data PollBroker = PollBroker
    { pbCollection   :: Collection      -- ^ collection that messages are read from
    , pbPollInterval :: Int             -- ^ interval value supplied to 'mkPollBroker'
    , pbLastId       :: IORef ObjectId  -- ^ polling queries only match @_id@ values greater than this
    }
-- | Settings consumed by 'mkTailBroker' and 'mkPollBroker'.
data WorkerOpts = WorkerOpts
    { workerMaxByteSize :: Int         -- ^ passed as 'MaxByteSize' when creating the collection
    , workerCollection  :: Collection  -- ^ collection to create and consume from
    }
-- | A size cap of 100000 bytes on the @queue@ collection.
instance Default WorkerOpts where
    def = WorkerOpts
        { workerMaxByteSize = 100000
        , workerCollection  = queueCollection
        }
-- | Build a 'TailBroker' from the default 'WorkerOpts'.
createTailBroker :: (MonadIO m, Applicative m) => Action m TailBroker
createTailBroker = mkTailBroker (def :: WorkerOpts)
-- | Build a 'PollBroker' from the default 'WorkerOpts' and an interval
-- value of 100000.
createPollBroker :: (MonadIO m, Applicative m) => Action m PollBroker
createPollBroker = mkPollBroker def defaultInterval
  where
    -- NOTE(review): presumably microseconds (100 ms) -- confirm against the
    -- code that consumes 'pbPollInterval'.
    defaultInterval = 10 * 10000
-- | Build a 'TailBroker' from explicit options.
--
-- Issues 'createCollection' for a capped collection limited to
-- 'workerMaxByteSize', then inserts a marker document so the collection is
-- not empty (per the marker's own text, this helps when there are no docs).
-- The results of both operations are discarded.
mkTailBroker :: (MonadIO m, Applicative m) => WorkerOpts -> Action m TailBroker
mkTailBroker opts = do
    void $ createCollection [Capped, MaxByteSize (workerMaxByteSize opts)] coll
    void $ insert coll marker
    return (TailBroker coll)
  where
    coll   = workerCollection opts
    marker = [ "tailableCursorFix" =: ("helps when there are no docs" :: Text) ]
-- | Build a 'PollBroker' from explicit options and a poll interval.
--
-- Issues 'createCollection' for a capped collection limited to
-- 'workerMaxByteSize', inserts a marker document, and seeds 'pbLastId' with
-- the marker's id so that polling queries only match documents whose @_id@
-- is greater than it.
--
-- Raises an 'IOError' (via 'userError') if the id returned for the marker
-- is not an 'ObjectId'.
mkPollBroker :: (MonadIO m, Applicative m)
             => WorkerOpts
             -> Int
             -> Action m PollBroker
mkPollBroker WorkerOpts {..} interval = do
    _ <- createCollection [Capped, MaxByteSize workerMaxByteSize] workerCollection
    inserted <- insert workerCollection
        [ "tailableCursorFix" =: ("helps when there are no docs" :: Text)
        ]
    -- Match totally rather than with a refutable pattern in the bind: the
    -- latter desugars to 'fail', which demands a MonadFail instance that
    -- this signature does not provide on newer compilers.
    insertId <- case inserted of
        ObjId oid -> return oid
        other     -> liftIO . ioError . userError $
            "mkPollBroker: expected an ObjectId for the inserted document, got "
                ++ show other
    lastId <- liftIO $ newIORef insertId
    return PollBroker
        { pbCollection   = workerCollection
        , pbPollInterval = interval
        , pbLastId       = lastId
        }
-- | Error raised by the queue consumer. 'FindAndModifyError' carries the
-- message returned by a failed 'findAndModify' call.
data MongoQueueException
    = FindAndModifyError String
    deriving (Show, Typeable)

instance Exception MongoQueueException
-- | Fetch the next unhandled message by polling.
--
-- Queries for documents that are not yet handled and whose @_id@ is greater
-- than the id stored in 'pbLastId'. When a query yields nothing, sleeps for
-- 'pbPollInterval' microseconds and queries again. After a message has been
-- claimed, its id is written to 'pbLastId' and its payload is returned.
nextFromQueuePoll :: (MonadIO m, MonadBaseControl IO m) => PollBroker -> Action m Document
nextFromQueuePoll pb =
    nextFromQueue (pbCollection pb) getCursor pollNext recordLastId
  where
    recordLastId doc = atomicWriteIORef (pbLastId pb) (at _id doc)

    getCursor = getCursorPoll pb

    -- The wait uses the interval configured on the broker. The module-level
    -- nextDocPoll helper sleeps a fixed 100 ms and has no access to the
    -- broker, so the loop is defined here instead.
    pollNext cursor = do
        n <- next cursor
        case n of
            Just doc -> return doc
            Nothing  -> do
                liftIO $ threadDelay (pbPollInterval pb)
                getCursor >>= pollNext

    getCursorPoll :: (MonadIO m, MonadBaseControl IO m) => PollBroker -> Action m Cursor
    getCursorPoll PollBroker{..} = do
        lastId <- liftIO $ readIORef pbLastId
        find (select [ handled =: False, _id =: ["$gt" =: lastId ] ] pbCollection)
-- | Fetch the next unhandled message through a tailable cursor.
--
-- Opens a cursor over the unhandled documents with the 'TailableCursor',
-- 'AwaitData' and 'NoCursorTimeout' options and hands it to 'nextFromQueue'
-- together with 'nextDocTail'. Nothing is recorded after a message is
-- claimed.
nextFromQueueTail :: (MonadIO m, MonadBaseControl IO m) => TailBroker -> Action m Document
nextFromQueueTail tb =
    nextFromQueue (tbCollection tb) tailCursor nextDocTail ignoreDoc
  where
    ignoreDoc :: Document -> IO ()
    ignoreDoc _ = return ()

    tailCursor :: (MonadIO m, MonadBaseControl IO m) => Action m Cursor
    tailCursor = find (unhandled { options = [TailableCursor, AwaitData, NoCursorTimeout] })

    unhandled = select [ handled =: False ] (tbCollection tb)
-- | Shared consumer loop: obtain a cursor, read a candidate document from
-- it, and try to claim that document by atomically flipping its handled
-- flag from False to True.
--
-- * On success the callback is run with the claimed document and the
--   document's payload field is returned.
-- * If the claim fails but the document still exists, the next candidate is
--   read from the same cursor.
-- * If the claim fails and the document can no longer be found, a
--   'FindAndModifyError' carrying the failure message is thrown.
--
-- Exceptions raised while reading from the cursor are passed to
-- 'handleDroppedCursor'.
nextFromQueue :: (MonadIO m, MonadBaseControl IO m)
              => Collection
              -> Action m Cursor
              -> (Cursor -> Action m Document)
              -> (Document -> IO ())
              -> Action m Document
nextFromQueue collection getCursor nextDoc successCb = getCursor >>= claimNext
  where
    -- Every query against the queue collection is issued in natural order.
    inCollection selector = (select selector collection)
        { sort = ["$natural" =: 1]
        }

    markHandled = ["$set" =: [handled =: True]]

    claimNext cursor = do
        candidate <- nextDoc cursor `catch` handleDroppedCursor getCursor nextDoc
        let byId = [_id := valueAt _id candidate]
        outcome <- findAndModify (inCollection (byId ++ [handled =: False])) markHandled
        case outcome of
            Left reason -> do
                stillThere <- findOne (inCollection byId)
                case stillThere of
                    Just _  -> claimNext cursor
                    Nothing -> liftIO $ throwIO $ FindAndModifyError reason
            Right claimed -> do
                liftIO (successCb claimed)
                return (at dataField claimed)
-- | Recovery handler for failures while reading from a cursor.
--
-- Asynchronous exceptions (for example those delivered by @killThread@ or
-- @timeout@) are rethrown so a blocked consumer can still be cancelled.
-- For any other exception the handler waits one second, obtains a fresh
-- cursor and reads the next document from it.
handleDroppedCursor :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m Cursor -> (Cursor -> Action m Document) -> SomeException -> Action m Document
handleDroppedCursor getCursor nextDoc err =
    case (fromException err :: Maybe SomeAsyncException) of
        Just _  -> liftIO (throwIO err)
        Nothing -> do
            liftIO (threadDelay (1000 * 1000))
            getCursor >>= nextDoc
-- | Read the next document from the cursor, asking the same cursor again
-- (without any delay in this function) for as long as it yields nothing.
nextDocTail :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m Document
nextDocTail cursor = next cursor >>= maybe (nextDocTail cursor) return
-- | Read the next document from the cursor. Whenever the current cursor
-- yields nothing, sleep for 100 ms, obtain a new cursor from the supplied
-- action and try again.
nextDocPoll :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m Cursor -> Cursor -> Action m Document
nextDocPoll getCursor = go
  where
    go cursor = next cursor >>= maybe retry return

    retry = liftIO (threadDelay (1000 * 100)) >> getCursor >>= go