{-# LANGUAGE FlexibleContexts, RecordWildCards, Rank2Types, DeriveDataTypeable #-}
module Database.MongoDB.Queue (
    work
  , emit
  , peek
  , createEmitter
  , createWorker
) where

import Prelude hiding (lookup)
import Control.Exception.Base (throwIO, Exception)
import Data.Typeable (Typeable)
import Database.MongoDB
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative (Applicative)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Data.Text (Text)
import Network.BSD (getHostName, HostName)
import Control.Monad (void)

-- | Collection used by 'createEmitter' and 'createWorker'.
queueCollection :: Text
queueCollection = "queue"

-- | Field holding the flag that says whether a message was already handled.
handled :: Text
handled = "handled"

-- | Field holding the message payload document.
dataField :: Text
dataField = "data"

-- | Field holding the host name of the emitter that produced the message.
hostField :: Text
hostField = "host"

-- | Field holding the emitter's version number.
versionField :: Text
versionField = "version"

-- | Field used to select a single queue document.
_id :: Text
_id = "_id"

-- | Run the @findAndModify@ database command for the given query and return
-- the updated document (the command is sent with @new = true@).
--
-- Adapted from Database.Persist.MongoDB, pending a move into the driver.
--
-- Only the selector, collection, projection and sort order of the 'Query'
-- are used.
--
-- Returns 'Left' with the server-reported error when the reply's
-- @lastErrorObject@ carries a string @err@ field, 'Left' with a fixed
-- message when the reply has no document under @value@, and 'Right' with
-- that document otherwise.
findAndModify :: (Applicative m, MonadIO m)
              => Query
              -> Document -- ^ updates
              -> Action m (Either String Document)
findAndModify (Query {
    selection = Select sel collection
  , project = fields
  , sort = order
  }) updates = do
  result <- runCommand [
     "findAndModify" := String collection
   , "new"    := Bool True -- return updated document, not original document
   , "query"  := Doc sel
   , "update" := Doc updates
   , "fields" := Doc fields
   , "sort"   := Doc order
   ]
  return $ case findErr result of
    Just e  -> Left e
    Nothing -> maybe (Left "findAndModify: no document found (value field was empty)")
                     Right
                     (lookup "value" result)
  where
    -- Total lookup: a reply without "lastErrorObject" (or without a string
    -- "err" inside it) counts as "no error" instead of raising an exception
    -- from pure code, as the partial 'at' would.
    findErr :: Document -> Maybe String
    findErr reply = lookup "lastErrorObject" reply >>= lookup "err"


-- | A function that runs a database 'Action' in any 'MonadIO' monad.
--
-- The quantifier is written out explicitly rather than relying on implicit
-- quantification of @m@ and @a@ in the synonym's right-hand side; every use
-- of 'DBRunner' as an argument or record field is therefore rank-2.
type DBRunner = forall m a. MonadIO m => Action m a -> m a
-- | Handle used by 'emit' to put messages onto the queue.
data QueueEmitter = QueueEmitter
  { qeVersion    :: Int        -- ^ version, stored with every emitted message
  , qeHost       :: HostName   -- ^ host name, stored with every emitted message
  , qeRunDB      :: DBRunner   -- ^ runs the insert performed by 'emit'
  , qeCollection :: Collection -- ^ collection the messages are inserted into
  }

-- | Settings consumed by 'mkEmitter' when building a 'QueueEmitter'.
data EmitterOpts = EmitterOpts
  { emitterVersion    :: Int        -- ^ becomes 'qeVersion'
  , emitterRunner     :: DBRunner   -- ^ becomes 'qeRunDB'
  , emitterCollection :: Collection -- ^ becomes 'qeCollection'
  }

-- | Build a 'QueueEmitter' with the default settings: version 1 and the
-- 'queueCollection' collection, using the supplied runner for database access.
createEmitter :: DBRunner -> IO QueueEmitter
createEmitter runEmitter = mkEmitter defaults
  where
    defaults :: EmitterOpts
    defaults = EmitterOpts
      { emitterVersion    = 1
      , emitterRunner     = runEmitter
      , emitterCollection = queueCollection
      }

-- | Turn 'EmitterOpts' into a 'QueueEmitter', filling in the host name
-- obtained from 'getHostName'.
mkEmitter :: EmitterOpts -> IO QueueEmitter
mkEmitter (EmitterOpts version runner collection) = do
  host <- getHostName
  return QueueEmitter
    { qeVersion    = version
    , qeHost       = host
    , qeRunDB      = runner
    , qeCollection = collection
    }

-- | Emit a message for a worker.
--
-- Inserts one document into the emitter's collection containing the
-- emitter's version, the given payload, the emitter's host name, and the
-- handled flag set to 'False'.
--
-- TODO: add a timestamp (although the _id already carries one). Fields
-- sketched for this so far:
--   localTime: dt,
--   globalTime: new Date(dt-self.serverTimeOffset),
--   pickedTime: new Date(dt-self.serverTimeOffset),
emit :: QueueEmitter -> Document -> IO ()
emit (QueueEmitter version host runDB collection) doc =
  runDB (insert_ collection message)
  where
    message :: Document
    message =
      [ versionField =: version
      , dataField =: doc
      , handled =: False
      , hostField =: host
      ]

-- | Handle used by 'work' and 'peek' to read messages from the queue.
data QueueWorker = QueueWorker
  { qwRunDB      :: DBRunner   -- ^ runs the worker's database actions
  , qwCursor     :: Cursor     -- ^ cursor the messages are read from
  , qwCollection :: Collection -- ^ collection the worker was created for
  }
-- | Settings consumed by 'mkWorker' when building a 'QueueWorker'.
data WorkerOpts = WorkerOpts
  { workerRunner      :: DBRunner   -- ^ becomes 'qwRunDB'
  , workerMaxByteSize :: Int        -- ^ passed as 'MaxByteSize' when creating the collection
  , workerCollection  :: Collection -- ^ becomes 'qwCollection'
  }

-- | Build a 'QueueWorker' with the default settings: the 'queueCollection'
-- collection and a maximum byte size of 100000.
--
-- Do not 'work' multiple times against the same QueueWorker
createWorker :: DBRunner -> IO QueueWorker
createWorker runWorker = mkWorker defaults
  where
    defaults :: WorkerOpts
    defaults = WorkerOpts
      { workerRunner      = runWorker
      , workerMaxByteSize = 100000
      , workerCollection  = queueCollection
      }

-- | Turn 'WorkerOpts' into a 'QueueWorker'.
--
-- Issues a 'createCollection' for a capped collection of the configured
-- maximum byte size (its result is discarded), then opens the cursor the
-- worker will read from.
mkWorker :: WorkerOpts -> IO QueueWorker
mkWorker (WorkerOpts runner maxBytes collection) = do
    void $ runner $ createCollection [Capped, MaxByteSize maxBytes] collection
    cursor <- getCursor runner collection
    return (QueueWorker runner cursor collection)

-- | Open a cursor over the documents of the given collection whose handled
-- flag is 'False', using the 'TailableCursor', 'AwaitData' and
-- 'NoCursorTimeout' query options.
getCursor :: DBRunner -> Collection -> IO Cursor
getCursor runDB collection = runDB (find unhandled)
  where
    unhandled :: Query
    unhandled = (select [ handled =: False ] collection)
      { options = [TailableCursor, AwaitData, NoCursorTimeout] }


-- | Used for testing.
-- Fetch the next document from the worker's cursor and return its data
-- field; the document is not marked as handled.
-- Do not peek/work multiple times against the same QueueWorker
peek :: QueueWorker -> IO Document
peek (QueueWorker runDB cursor _) = runDB $ do
  doc <- nextDoc cursor
  return (at dataField doc)

-- | Fetch the next document from the cursor.
--
-- Throws 'TailableCursorError' (in 'IO') when the cursor yields no document.
nextDoc :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m Document
nextDoc cursor =
  next cursor >>=
    maybe (liftIO (throwIO (TailableCursorError "tailable cursor ended")))
          return

-- | Exceptions thrown by this module.
data MongoQueueException
  = FindAndModifyError String  -- ^ marking a message as handled failed
  | TailableCursorError String -- ^ the tailable cursor yielded no document
  deriving (Show, Typeable)

instance Exception MongoQueueException

-- | Perform the action every time there is a new message.
--
-- For each document read from the worker's cursor, the document is first
-- marked as handled with 'findAndModify', and only then is the handler run
-- on the data field of the updated document. A failing handler therefore
-- leaves the message marked as handled.
--
-- Does not call ForkIO, blocks the program: the loop only ends by an
-- exception. 'FindAndModifyError' is thrown when marking a message fails,
-- 'TailableCursorError' when the cursor yields no further document.
--
-- Do not call this multiple times against the same QueueWorker
work :: QueueWorker -> (Document -> Action IO ()) -> IO ()
work QueueWorker {..} handler = qwRunDB $ do
    void $ tailableCursorApply qwCursor $ \origDoc -> do
      -- "$set" changes only the handled flag. A bare [handled =: True] is a
      -- replacement document: it would wipe the payload, so the updated
      -- document returned by findAndModify would have no data field.
      -- The worker's own collection is used rather than the global default.
      eDoc <- findAndModify (select [_id := (valueAt _id origDoc)] qwCollection) {
          sort = ["$natural" =: (-1 :: Int)]
        } ["$set" =: [handled =: True]]
      case eDoc of
        Left err  -> liftIO $ throwIO $ FindAndModifyError err
        Right doc -> handler (at dataField doc)
  where
    -- Apply the function to every document the cursor produces, forever.
    tailableCursorApply :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> (Document ->  Action m a) ->  Action m a
    tailableCursorApply cursor f = do
      x <- nextDoc cursor
      f x >> tailableCursorApply cursor f