{-# LANGUAGE FlexibleContexts, RecordWildCards, Rank2Types, DeriveDataTypeable, ExtendedDefaultRules #-} {-# OPTIONS_GHC -fno-warn-type-defaults #-} module Database.MongoDB.Queue ( emit , nextFromQueuePoll, nextFromQueueTail -- * queue emitters , createEmitter, mkEmitter, EmitterOpts (..) -- * queue consumers , createPollBroker, createTailBroker, mkTailBroker, mkPollBroker, WorkerOpts (..) ) where import Prelude hiding (lookup) import Control.Concurrent (threadDelay) import Data.IORef (atomicWriteIORef, IORef, newIORef, readIORef) import Control.Exception.Lifted (catch, throwIO, Exception, SomeException) import Data.Default (Default (..)) import Data.Typeable (Typeable) import Database.MongoDB import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Trans.Control (MonadBaseControl(..)) import Data.Text (Text) import Network.BSD (getHostName, HostName) import Control.Monad (void) import Control.Applicative default (Int) queueCollection, handled, dataField, _id, hostField, versionField :: Text queueCollection = "queue" handled = "handled" dataField = "data" hostField = "host" versionField = "version" _id = "_id" data QueueEmitter = QueueEmitter { qeVersion :: Int -- ^ version , qeHost :: HostName , qeCollection :: Collection } data EmitterOpts = EmitterOpts { emitterVersion :: Int , emitterCollection :: Collection , emitterMaxByteSize :: Int } instance Default EmitterOpts where def = EmitterOpts 1 queueCollection 100000 -- | create a QueueEmitter createEmitter :: (Applicative m, MonadIO m) => Action m QueueEmitter createEmitter = mkEmitter def -- | create an emitter with non-default configuration mkEmitter :: (Applicative m, MonadIO m) => EmitterOpts -> Action m QueueEmitter mkEmitter EmitterOpts {..} = do name <- liftIO getHostName void $ createCollection [Capped, MaxByteSize emitterMaxByteSize] emitterCollection return $ QueueEmitter emitterVersion name emitterCollection -- | emit a message for a worker emit :: (MonadIO m, Applicative m) => QueueEmitter -> Document -> Action m () emit QueueEmitter {..} doc = insert_ qeCollection [ versionField =: qeVersion , dataField =: doc , handled =: False , hostField =: qeHost ] -- TODO: add timestamp -- but actually the _id will already have a timestamp -- localTime: dt, -- globalTime: new Date(dt-self.serverTimeOffset), -- pickedTime: new Date(dt-self.serverTimeOffset), -- | A worker that uses a tailable cursor data TailBroker = TailBroker { tbCollection :: Collection } -- | A worker that uses polling data PollBroker = PollBroker { pbCollection :: Collection , pbPollInterval :: Int , pbLastId :: IORef ObjectId } data WorkerOpts = WorkerOpts { workerMaxByteSize :: Int , workerCollection :: Collection } instance Default WorkerOpts where def = WorkerOpts 100000 queueCollection -- | creates a TailBroker -- create a single TailBroker per process (per queue collection) -- call nextFromQueueTail with the TailBroker to get the next message -- -- TailBroker is designed to have 1 instance per process (and 1 process per machine) -- To handle multiple messages at once immediately hand off messages from nextFromQueueTail to worker threads (this library does not help you create worker threads) createTailBroker :: (MonadIO m, Applicative m) => Action m TailBroker createTailBroker = mkTailBroker def -- | same as createTailBroker, but uses a polling technique instead of tailable cursors -- Use default settings and poll with a 10 ms delay createPollBroker :: (MonadIO m, Applicative m) => Action m PollBroker createPollBroker = mkPollBroker def (10 * 10000) -- | create a tailable cursor worker with non-default configuration mkTailBroker :: (MonadIO m, Applicative m) => WorkerOpts -> Action m TailBroker mkTailBroker WorkerOpts {..} = do _<- createCollection [Capped, MaxByteSize workerMaxByteSize] workerCollection _ <- insert workerCollection [ "tailableCursorFix" =: ("helps when there are no docs" :: Text) ] return TailBroker { tbCollection = workerCollection } -- | create an worker with non-default configuration mkPollBroker :: (MonadIO m, Applicative m) => WorkerOpts -> Int -- ^ polling interval in us (uses threadDelay) -> Action m PollBroker mkPollBroker WorkerOpts {..} interval = do _<- createCollection [Capped, MaxByteSize workerMaxByteSize] workerCollection (ObjId insertId) <- insert workerCollection [ "tailableCursorFix" =: ("helps when there are no docs" :: Text) ] lastId <- liftIO $ newIORef insertId return PollBroker { pbCollection = workerCollection , pbPollInterval = interval , pbLastId = lastId } data MongoQueueException = FindAndModifyError String deriving (Show, Typeable) instance Exception MongoQueueException -- | Get the next message from the queue. -- First marks the message as handled. -- -- Uses polling rather than a tailable cursor -- -- Do not call this from multiple threads against the same PollBroker nextFromQueuePoll :: (MonadIO m, MonadBaseControl IO m) => PollBroker -> Action m Document nextFromQueuePoll pb = nextFromQueue (pbCollection pb) getCursor (nextDocPoll getCursor) $ \doc -> atomicWriteIORef (pbLastId pb) (at _id doc) where getCursor = getCursorPoll pb getCursorPoll :: (MonadIO m, MonadBaseControl IO m) => PollBroker -> Action m Cursor getCursorPoll PollBroker{..} = do lastId <- liftIO $ readIORef pbLastId find (select [ handled =: False, _id =: ["$gt" =: lastId ] ] pbCollection) -- | Get the next message from the queue. -- First marks the message as handled. -- -- Uses a tailable cursor rather than polling -- -- Do not call this from multiple threads against the same TailBroker nextFromQueueTail :: (MonadIO m, MonadBaseControl IO m) => TailBroker -> Action m Document nextFromQueueTail tb = nextFromQueue (tbCollection tb) (getCursorTail tb) nextDocTail (const (return ())) where getCursorTail :: (MonadIO m, MonadBaseControl IO m) => TailBroker -> Action m Cursor getCursorTail TailBroker{..} = find (select [ handled =: False ] tbCollection) { options = [TailableCursor, AwaitData, NoCursorTimeout] } nextFromQueue :: (MonadIO m, MonadBaseControl IO m) => Collection -> Action m Cursor -> (Cursor -> Action m Document) -> (Document -> IO ()) -> Action m Document nextFromQueue collection getCursor nextDoc successCb = getCursor >>= processNext where processNext cursor = do origDoc <- nextDoc cursor `catch` handleDroppedCursor getCursor nextDoc let idQuery = [_id := valueAt _id origDoc] eDoc <- findAndModify (selectQuery $ idQuery ++ [handled =: False]) ["$set" =: [handled =: True]] case eDoc of Right doc -> do liftIO $ successCb doc return (at dataField doc) Left err -> do -- a different cursor can lock this first by setting handled to True -- verify that this is what happened mDoc <- findOne (selectQuery idQuery) case mDoc of Nothing -> liftIO $ throwIO $ FindAndModifyError err Just _ -> processNext cursor selectQuery query = (select query collection) { sort = ["$natural" =: -1] } handleDroppedCursor :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m Cursor -> (Cursor -> Action m Document) -> SomeException -> Action m Document handleDroppedCursor getCursor nextDoc _ = do liftIO ( threadDelay (1000 * 1000) ) >> (getCursor >>= nextDoc) nextDocTail :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m Document nextDocTail cursor = do n <- next cursor case n of Nothing -> nextDocTail cursor (Just doc) -> return doc nextDocPoll :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m Cursor -> Cursor -> Action m Document nextDocPoll getCursor cursor = do n <- next cursor case n of Nothing -> do liftIO $ threadDelay (1000 * 100) -- 100 ms getCursor >>= nextDocPoll getCursor (Just doc) -> return doc {- -- | Perform the action every time there is a new message. -- And then marks the message as handled. -- Does not call ForkIO, blocks the program -- -- Do not call this multiple times against the same TailBroker work :: TailBroker -> (Document -> Action IO ()) -> IO () work qw handler = loop where loop = do doc <- nextFromQueue qw handler doc loop -}