{-# LANGUAGE DeriveDataTypeable, OverloadedStrings #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Network.Beanstalk
-- Copyright   :  (c) Greg Heartsfield 2010
-- License     :  BSD3
--
-- Client API to beanstalkd work queue.
-----------------------------------------------------------------------------

module Network.Beanstalk (
  -- * Connecting and Disconnecting
  connectBeanstalk, disconnectBeanstalk,
  -- * Beanstalk Job Commands
  putJob, releaseJob, reserveJob, reserveJobWithTimeout, deleteJob, buryJob,
  peekJob, peekReadyJob, peekDelayedJob, peekBuriedJob, kickJobs, touchJob,
  -- * Beanstalk Tube Commands
  useTube, watchTube, ignoreTube, pauseTube, listTubes, listTubesWatched,
  listTubeUsed,
  -- * Beanstalk Stats Commands
  statsJob, statsTube, statsServer, jobCountWithState,
  -- * Pretty-Printing Stats and Lists
  printStats, printList,
  -- * Exception Predicates
  isNotFoundException, isBadFormatException, isTimedOutException,
  isOutOfMemoryException, isInternalErrorException, isJobTooBigException,
  isDeadlineSoonException, isNotIgnoredException, isDrainingException,
  -- * Data Types
  Job(..), BeanstalkServer, JobState(..), BeanstalkException(..)
  ) where

import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Data.Typeable
import qualified Data.Map as M
import qualified Control.Exception as E
import Data.Maybe
import Control.Concurrent.MVar
import Control.Applicative
import Data.Attoparsec as P
import qualified Data.Attoparsec.Char8 as P8
import qualified Data.ByteString.Char8 as B
import Blaze.ByteString.Builder
import Blaze.ByteString.Builder.Char8
import Data.Monoid (mappend)

-- | Handle to a beanstalkd server session: the connected 'Socket',
--   wrapped in an 'MVar' so that access to the server socket is
--   synchronized (every command in this module runs inside
--   'withMVar').  As many of these can be created as are needed, but
--   jobs are associated to a single server session and must be
--   released/deleted with the same session that reserved them.
type BeanstalkServer = MVar Socket

-- | The essential pieces of a job: its identifier, used to refer to
--   it in later commands, and the payload to be worked on.
data Job = Job
    { -- | Numeric identifier of the job
      job_id :: Int
    , -- | Payload (body) of the job
      job_body :: B.ByteString
    }
    deriving (Show, Read, Eq)

-- | The states a job passes through during its lifecycle.
data JobState
    = -- | Ready, retrievable with 'reserveJob'
      READY
    | -- | Reserved by a worker
      RESERVED
    | -- | Delayed, waiting to be put in ready queue
      DELAYED
    | -- | Buried, can be resurrected with 'kickJobs'
      BURIED
    deriving (Show, Read, Eq)

-- | Error conditions reported by the beanstalkd server, thrown as
--   Haskell exceptions by the commands in this module.
data BeanstalkException
    = -- | Job does not exist, or is not reserved by this client.
      NotFoundException
    | -- | The server did not have enough memory available to create the job.
      OutOfMemoryException
    | -- | The server detected an internal error.  If this happens, please report to
      --   <http://groups.google.com/group/beanstalk-talk>.
      InternalErrorException
    | -- | The server is in drain mode, and is not accepting new jobs.
      DrainingException
    | -- | Client sent a command that was not understood.  May indicate
      --   a bad argument list or other format violation.
      BadFormatException
    | -- | The server did not recognize a command.  Should never occur,
      --   this is either a bug in the hbeanstalk library or an
      --   incompatible server version.
      UnknownCommandException
    | -- | A 'putJob' call included a body larger than the server's
      --   @max-job-size@ setting allows.
      JobTooBigException
    | -- | This library failed to terminate a job body with a CR-LF
      --   terminator.  Should never occur, if it does it is a bug in
      --   hbeanstalk.
      ExpectedCRLFException
    | -- | Not strictly an error condition, this indicates a job this
      --   client has reserved is about to expire.  See
      --   <http://groups.google.com/group/beanstalk-talk/browse_thread/thread/232d0cac5bebe30f>
      --   for a detailed explanation.
      DeadlineSoonException
    | -- | Timeout for @reserveJobWithTimeout@ expired before a job became available.
      TimedOutException
    | -- | Client attempted to ignore the only tube in its watch list (clients
      --   must always watch one or more tubes).
      NotIgnoredException
    deriving (Show, Typeable, Eq)

instance E.Exception BeanstalkException

-- | 'True' exactly when the exception is 'NotFoundException'.
isNotFoundException :: BeanstalkException -> Bool
isNotFoundException NotFoundException = True
isNotFoundException _                 = False

-- | 'True' exactly when the exception is 'OutOfMemoryException'.
isOutOfMemoryException :: BeanstalkException -> Bool
isOutOfMemoryException OutOfMemoryException = True
isOutOfMemoryException _                    = False

-- | 'True' exactly when the exception is 'InternalErrorException'.
isInternalErrorException :: BeanstalkException -> Bool
isInternalErrorException InternalErrorException = True
isInternalErrorException _                      = False

-- | 'True' exactly when the exception is 'DrainingException'.
isDrainingException :: BeanstalkException -> Bool
isDrainingException DrainingException = True
isDrainingException _                 = False

-- | 'True' exactly when the exception is 'BadFormatException'.
isBadFormatException :: BeanstalkException -> Bool
isBadFormatException BadFormatException = True
isBadFormatException _                  = False

-- | 'True' exactly when the exception is 'JobTooBigException'.
isJobTooBigException :: BeanstalkException -> Bool
isJobTooBigException JobTooBigException = True
isJobTooBigException _                  = False

-- | 'True' exactly when the exception is 'DeadlineSoonException'.
isDeadlineSoonException :: BeanstalkException -> Bool
isDeadlineSoonException DeadlineSoonException = True
isDeadlineSoonException _                     = False

-- | 'True' exactly when the exception is 'TimedOutException'.
isTimedOutException :: BeanstalkException -> Bool
isTimedOutException TimedOutException = True
isTimedOutException _                 = False

-- | 'True' exactly when the exception is 'NotIgnoredException'.
isNotIgnoredException :: BeanstalkException -> Bool
isNotIgnoredException NotIgnoredException = True
isNotIgnoredException _                   = False

-- | Connect to a beanstalkd server.  The first address returned by
--   name resolution is used.  If configuring or connecting the socket
--   fails, the socket is closed before the exception propagates, so a
--   failed connection attempt does not leak a file descriptor.
connectBeanstalk :: HostName -- ^ Hostname of beanstalkd server
                 -> String -- ^ Port number of server
                 -> IO BeanstalkServer -- ^ Server object for commands to operate on
connectBeanstalk hostname port =
    do addrinfos <- getAddrInfo Nothing (Just hostname) (Just port)
       -- Pattern match rather than use 'head', so an empty result
       -- produces a meaningful error instead of a Prelude crash.
       serveraddr <- case addrinfos of
                       (addr:_) -> return addr
                       []       -> ioError $ userError $
                                     "No address found for beanstalkd server " ++
                                     hostname ++ ":" ++ port
       E.bracketOnError
           -- Establish a socket for communication
           (socket (addrFamily serveraddr) Stream defaultProtocol)
           -- Only runs when one of the steps below throws
           sClose
           (\sock ->
                do -- Mark the socket for keep-alive handling since it
                   -- may be idle for long periods of time
                   setSocketOption sock KeepAlive 1
                   -- Connect to server
                   connect sock (addrAddress serveraddr)
                   newMVar sock)

-- | Close the connection to a beanstalkd server.  Any jobs reserved from this
--   connection will be released.
disconnectBeanstalk :: BeanstalkServer -- ^ Beanstalk server
                    -> IO ()
disconnectBeanstalk bs = withMVar bs sClose

-- | Create a job on the tube currently selected with 'useTube', given
-- a numeric priority, a delay before it becomes active, a
-- time-to-run limit, and the job body.  Returns job state and ID.
putJob :: BeanstalkServer -- ^ Beanstalk server
       -> Int -- ^ Job priority: an integer less than 2**32. Jobs with smaller
             --   priorities are scheduled before jobs with
             --   larger priorities.  The most urgent priority is 0; the
             --   least urgent priority is 4294967295.
       -> Int -- ^ Number of seconds to delay putting the job into the
             --   ready queue.  Until that time expires, the job will
             --   have a state of 'DELAYED'.
       -> Int -- ^ Maximum time-to-run in seconds for a reserved job.
             --   This timer starts when a job is reserved.  If it
             --   expires without a 'deleteJob', 'releaseJob',
             --   'buryJob', or 'touchJob' command being run, the job
             --   will be placed back on the ready queue.  The minimum
             --   value is 1.
       -> B.ByteString -- ^ Job body.
       -> IO (JobState, Int) -- ^ State of the newly created job and its ID
putJob bs priority delay ttr jobBody = withMVar bs $ \sock -> do
    sendAll sock (toByteString request)
    reply <- readLine sock
    checkForBeanstalkErrors reply
    return (parsePut reply)
  where
    space = fromChar ' '
    crlf  = fromByteString "\r\n"
    -- Command line "put <pri> <delay> <ttr> <bytes>", followed by the
    -- body; both are CR-LF terminated.
    request = foldr1 mappend
        [ fromByteString "put "
        , fromShow priority, space
        , fromShow delay, space
        , fromShow ttr, space
        , fromShow (B.length jobBody), crlf
        , fromByteString jobBody, crlf
        ]

-- | Reserve a job from the watched tube list, blocking until one is
--   available.  A 'DeadlineSoonException' may be thrown if a job
--   already reserved by this client is about to expire.
reserveJob :: BeanstalkServer -- ^ Beanstalk server
           -> IO Job -- ^ Job reserved by this client
reserveJob bs = withMVar bs $ \sock -> do
    sendAll sock "reserve\r\n"
    header <- readLine sock
    checkForBeanstalkErrors header
    let (jobid, size) = parseReserve header
    -- Read the body, then discard the CR-LF that follows it.
    Job jobid <$> recvBytes sock size <* recvBytes sock 2

-- | Reserve a job from the watched tube list, waiting at most the given number
-- of seconds for one to become available.  A 'TimedOutException' is thrown if
-- no job arrives within the timeout, and a 'DeadlineSoonException' if another
-- job reserved by this client is about to exceed its time-to-run.
reserveJobWithTimeout :: BeanstalkServer -- ^ Beanstalk server
                      -> Int -- ^ Time in seconds to wait for a job
                            --   to become available.  Once this time
                            --   passes, a 'TimedOutException' will be
                            --   thrown.
                      -> IO Job -- ^ Job reserved by this client
reserveJobWithTimeout bs seconds = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "reserve-with-timeout "
        , fromShow seconds
        , fromByteString "\r\n"
        ]
    header <- readLine sock
    checkForBeanstalkErrors header
    let (jobid, size) = parseReserve header
    -- Read the body, then discard the CR-LF that follows it.
    Job jobid <$> recvBytes sock size <* recvBytes sock 2

-- | Delete a job to indicate that it has been completed.  A
--   'NotFoundException' will be thrown if the job does not exist, or
--   is not either reserved by this client, 'READY', or 'BURIED'.
deleteJob :: BeanstalkServer -- ^ Beanstalk server
          -> Int -- ^ ID of the job to delete
          -> IO ()
deleteJob bs jobid = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "delete "
        , fromShow jobid
        , fromByteString "\r\n"
        ]
    readLine sock >>= checkForBeanstalkErrors

-- | Release a job back to its tube so that another consumer can pick it up.
releaseJob :: BeanstalkServer -- ^ Beanstalk server
           -> Int -- ^ ID of the job to release
           -> Int -- ^ New priority to assign the job
           -> Int -- ^ Delay before the job is placed on the ready queue
           -> IO ()
releaseJob bs jobid priority delay = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "release "
        , fromShow jobid, fromChar ' '
        , fromShow priority, fromChar ' '
        , fromShow delay
        , fromByteString "\r\n"
        ]
    readLine sock >>= checkForBeanstalkErrors

-- | Put a job into the buried state, where it cannot be reserved.
buryJob :: BeanstalkServer -- ^ Beanstalk server
        -> Int -- ^ ID of the job to bury
        -> Int -- ^ New priority to assign the job
        -> IO ()
buryJob bs jobid pri = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "bury "
        , fromShow jobid, fromChar ' '
        , fromShow pri
        , fromByteString "\r\n"
        ]
    readLine sock >>= checkForBeanstalkErrors

-- Inspect a server response line and throw the matching
-- 'BeanstalkException' if it begins with one of the known error
-- replies.  Does nothing for any other response.
checkForBeanstalkErrors :: B.ByteString -- ^ beanstalkd server response
                        -> IO ()
checkForBeanstalkErrors input = mapM_ throwIfReply errorReplies
    where
      throwIfReply (exception, reply)
          | reply `B.isPrefixOf` input = E.throwIO exception
          | otherwise                  = return ()

      -- Checked in order, first match wins.
      errorReplies :: [(BeanstalkException, B.ByteString)]
      errorReplies =
          [ (OutOfMemoryException,    "OUT_OF_MEMORY\r\n")
          , (InternalErrorException,  "INTERNAL_ERROR\r\n")
          , (DrainingException,       "DRAINING\r\n")
          , (BadFormatException,      "BAD_FORMAT\r\n")
          , (UnknownCommandException, "UNKNOWN_COMMAND\r\n")
          , (NotFoundException,       "NOT_FOUND\r\n")
          , (JobTooBigException,      "JOB_TOO_BIG\r\n")
          , (ExpectedCRLFException,   "EXPECTED_CRLF\r\n")
          , (DeadlineSoonException,   "DEADLINE_SOON\r\n")
          , (TimedOutException,       "TIMED_OUT\r\n")
          , (NotIgnoredException,     "NOT_IGNORED\r\n")
          ]

-- | Select the tube that subsequent 'putJob' calls place jobs on.  If
--   the tube does not already exist, it will be created.  Initially,
--   all sessions will use the tube named \"default\".
useTube :: BeanstalkServer -- ^ Beanstalk server
        -> B.ByteString -- ^ Name of tube to use
        -> IO ()
useTube bs name = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "use "
        , fromByteString name
        , fromByteString "\r\n"
        ]
    readLine sock >>= checkForBeanstalkErrors

-- | Add a named tube to the watch list, the set of tubes that
--   'reserveJob' requests jobs from.
watchTube :: BeanstalkServer -- ^ Beanstalk server
          -> B.ByteString -- ^ Name of tube to watch
          -> IO Int -- ^ Number of tubes currently being watched
watchTube bs name = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "watch "
        , fromByteString name
        , fromByteString "\r\n"
        ]
    reply <- readLine sock
    checkForBeanstalkErrors reply
    return (parseWatching reply)

-- | Remove the named tube from the watch list.  If the tube being
--   ignored is the only one currently being watched, a
--   'NotIgnoredException' is thrown.
ignoreTube :: BeanstalkServer -- ^ Beanstalk server
           -> B.ByteString -- ^ Name of tube to ignore
           -> IO Int -- ^ Number of tubes currently being watched
ignoreTube bs name = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "ignore "
        , fromByteString name
        , fromByteString "\r\n"
        ]
    reply <- readLine sock
    checkForBeanstalkErrors reply
    return (parseWatching reply)

-- | Look at a specific job in the system, identified by its ID.
peekJob :: BeanstalkServer -- ^ Beanstalk server
        -> Int -- ^ ID of job to get information about
        -> IO Job -- ^ Job definition
peekJob bs jobid = genericPeek bs command
    where command = foldr1 mappend [fromByteString "peek ", fromShow jobid]

-- | Look at the next ready job on the currently used tube.
peekReadyJob :: BeanstalkServer -- ^ Beanstalk server
             -> IO Job -- ^ Job definition
peekReadyJob = (`genericPeek` fromByteString "peek-ready")

-- | Look at the delayed job with the shortest delay remaining on the currently used tube.
peekDelayedJob :: BeanstalkServer -- ^ Beanstalk server
               -> IO Job -- ^ Job definition
peekDelayedJob = (`genericPeek` fromByteString "peek-delayed")

-- | Look at the next buried job on the currently used tube.
peekBuriedJob :: BeanstalkServer -- ^ Beanstalk server
              -> IO Job -- ^ Job definition
peekBuriedJob = (`genericPeek` fromByteString "peek-buried")

-- Shared implementation of the peek family (peek, peek-ready,
-- peek-delayed, peek-buried).  Callers supply the command text
-- without its line terminator; this function sends it, checks the
-- reply for errors, and reads back the job the server found.
genericPeek :: BeanstalkServer -> Builder -> IO Job
genericPeek bs cmd = withMVar bs $ \sock -> do
    sendAll sock (toByteString (foldr1 mappend [cmd, fromByteString "\r\n"]))
    header <- readLine sock
    checkForBeanstalkErrors header
    let (jobid, size) = parseFoundIdLen header
    -- Read the body, then discard the CR-LF that follows it.
    Job jobid <$> recvBytes sock size <* recvBytes sock 2

-- | Refresh the Time-To-Run (TTR) of a job, giving a worker more time before job expiry.
touchJob :: BeanstalkServer -- ^ Beanstalk server
         -> Int -- ^ ID of job
         -> IO ()
touchJob bs jobid = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "touch "
        , fromShow jobid
        , fromByteString "\r\n"
        ]
    readLine sock >>= checkForBeanstalkErrors

-- | Move jobs on the current tube into the ready queue.  If buried
--   jobs exist, only those will be moved, otherwise delayed jobs will
--   be made ready.
kickJobs :: BeanstalkServer -- ^ Beanstalk server
         -> Int -- ^ Number of jobs to kick
         -> IO Int -- ^ Number of jobs actually kicked
kickJobs bs maxcount = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "kick "
        , fromShow maxcount
        , fromByteString "\r\n"
        ]
    reply <- readLine sock
    checkForBeanstalkErrors reply
    return (parseKicked reply)

-- Extract the count from a "KICKED <count>" response.  Yields 0 when
-- the response does not parse.
parseKicked :: B.ByteString -> Int
parseKicked input = count (parse kickedParser input)
    where kickedParser = P.string "KICKED " *> P8.decimal
          count (Done _ n) = n
          count _          = 0 -- Error

-- Shared implementation of the stats commands.  Callers supply the
-- command text without its line terminator; this function sends it,
-- checks the header line of the reply for errors, and parses the
-- YAML dictionary that follows into a Map.
genericStats :: BeanstalkServer -> Builder -> IO (M.Map B.ByteString B.ByteString)
genericStats bs cmd = withMVar bs $ \sock -> do
    sendAll sock (toByteString (foldr1 mappend [cmd, fromByteString "\r\n"]))
    readLine sock >>= checkForBeanstalkErrors
    readYamlDict sock

-- | Return statistical information about a job.  Keys that can be
--   expected to be returned are the following:
--
--   [@id@] ID of the job.
--
--   [@tube@] The tube that contains this job
--
--   [@state@] State of the job, either \"ready\", \"delayed\", \"reserved\", or \"buried\"
--
--   [@pri@] Priority of the job
--
--   [@age@] Time in seconds since the 'putJob' command created this job
--
--   [@time-left@] Time in seconds until this job is placed in the
--   ready queue, if it is currently reserved or delayed
--
--   [@reserves@] Number of times this job has been reserved
--
--   [@timeouts@] Number of times this job has timed out after a reservation
--
--   [@releases@] Number of times this job has been released
--
--   [@buries@] Number of times this job has been buried
--
--   [@kicks@] Number of times this job has been kicked
--
--   See the Beanstalk protocol docs for the definitive list and definitions.
statsJob :: BeanstalkServer -- ^ Beanstalk server
         -> Int -- ^ ID of job
         -> IO (M.Map B.ByteString B.ByteString) -- ^ Key-value map of job statistics
statsJob bs jobid = genericStats bs command
    where -- "stats-job <id>"; genericStats appends the line terminator
          command = foldr1 mappend [fromByteString "stats-job ", fromShow jobid]

-- | Return statistical information about a tube.  Keys that can be
--   expected to be returned are the following:
--
--   [@name@] Name of the tube
--
--   [@current-jobs-urgent@] Number of jobs in this tube with state
--   'READY', with priority less than 1024
--
--   [@current-jobs-ready@] Number of jobs in this tube with state
--   'READY'
--
--   [@current-jobs-reserved@] Number of jobs in this tube with state
--   'RESERVED'
--
--   [@current-jobs-delayed@] Number of jobs in this tube with state
--   'DELAYED'
--
--   [@current-jobs-buried@] Number of jobs in this tube with state
--   'BURIED'
--
--   [@total-jobs@] Number of jobs that have been created in this tube
--   since it was created
--
--   [@current-waiting@] Number of clients that have issued a reserve
--   command for this tube, and are still blocking waiting on a
--   response
--
--   [@pause@] Number of seconds this tube has been paused
--
--   [@cmd-pause-tube@] Cumulative number of 'pauseTube' commands issued for this tube
--
--   [@pause-time-left@] Seconds remaining until this tube accepts job
--   reservations
--
--   See the Beanstalk protocol docs for the definitive list and definitions.
statsTube :: BeanstalkServer -- ^ Beanstalk server
          -> B.ByteString -- ^ Name of tube
          -> IO (M.Map B.ByteString B.ByteString) -- ^ Key-value map of tube statistics
statsTube bs tube = genericStats bs command
    where -- "stats-tube <name>"; genericStats appends the line terminator
          command = foldr1 mappend [fromByteString "stats-tube ", fromByteString tube]

-- | Write statistics to standard output, one \"key => value\" pair per line.
printStats :: M.Map B.ByteString B.ByteString -- ^ Key-value map of statistic names and values
           -> IO () -- ^ Screen output showing all \"key => value\" pairs
printStats = mapM_ showPair . M.assocs
    where showPair (key, value) =
              do B.putStr key
                 B.putStr " => "
                 B.putStrLn value

-- | Write a list to standard output, one numbered entry per line.
printList :: [B.ByteString] -- ^ List of names
          -> IO () -- ^ Screen output showing results with prefixed counter
printList list = sequence_ (zipWith printItem [(1::Int)..] list)
    where printItem n name = putStr (" " ++ show n ++ ". ") >> B.putStrLn name

-- | Return statistical information about the server, across all
--   clients.  Keys that can be expected to be returned are the
--   following:
--
--   [@current-jobs-urgent@] Number of 'READY' jobs with priority less
--   than 1024
--
--   [@current-jobs-ready@] Number of jobs in the ready queue
--
--   [@current-jobs-reserved@] Number of jobs reserved
--
--   [@current-jobs-delayed@] Number of delayed jobs
--
--   [@current-jobs-buried@] Number of buried jobs
--
--   [@cmd-put@] Cumulative number of 'putJob' commands issued
--
--   [@cmd-peek@] Cumulative number of 'peekJob' commands issued
--
--   [@cmd-peek-ready@] Cumulative number of 'peekReadyJob' commands
--   issued
--
--   [@cmd-peek-delayed@] Cumulative number of 'peekDelayedJob'
--   commands issued
--
--   [@cmd-peek-buried@] Cumulative number of 'peekBuriedJob' commands
--   issued
--
--   [@cmd-reserve@] Cumulative number of 'reserveJob' commands issued
--
--   [@cmd-use@] Cumulative number of 'useTube' commands issued
--
--   [@cmd-watch@] Cumulative number of 'watchTube' commands issued
--
--   [@cmd-ignore@] Cumulative number of 'ignoreTube' commands issued
--
--   [@cmd-delete@] Cumulative number of 'deleteJob' commands issued
--
--   [@cmd-release@] Cumulative number of 'releaseJob' commands issued
--
--   [@cmd-bury@] Cumulative number of 'buryJob' commands issued
--
--   [@cmd-kick@] Cumulative number of 'kickJobs' commands issued
--
--   [@cmd-stats@] Cumulative number of 'statsServer' commands issued
--
--   [@cmd-stats-job@] Cumulative number of 'statsJob' commands issued
--
--   [@cmd-stats-tube@] Cumulative number of 'statsTube' commands
--   issued
--
--   [@cmd-list-tubes@] Cumulative number of 'listTubes' commands
--   issued
--
--   [@cmd-list-tube-used@] Cumulative number of 'listTubeUsed'
--   commands issued
--
--   [@cmd-list-tubes-watched@] Cumulative number of
--   'listTubesWatched' commands issued
--
--   [@cmd-pause-tube@] Cumulative number of 'pauseTube' commands
--   issued
--
--   [@job-timeouts@] Cumulative number of times a job has timed out
--
--   [@total-jobs@] Total count of jobs created
--
--   [@max-job-size@] Maximum number of bytes for a job body
--
--   [@current-tubes@] Current number of existing tubes
--
--   [@current-connections@] Number of currently open connections
--
--   [@current-producers@] Number of currently open connections that
--   have issued at least one 'putJob' command
--
--   [@current-workers@] Number of currently open connections that
--   have issued at least one 'reserveJob' command
--
--   [@current-waiting@] Number of currently open connections that are
--   blocking on a 'reserveJob' or 'reserveJobWithTimeout' command
--
--   [@total-connections@] Cumulative count of connections
--
--   [@pid@] Process ID of the server
--
--   [@version@] Server's version string
--
--   [@rusage-utime@] The accumulated user CPU time of the server
--   process in seconds and microseconds
--
--   [@rusage-stime@] The accumulated system CPU time of the server
--   process in seconds and microseconds
--
--   [@uptime@] The number of seconds since the server started
--
--   [@binlog-oldest-index@] The index of the oldest binlog file
--   needed to store the current jobs
--
--   [@binlog-current-index@] The index of the current binlog file
--   being written to.  If the binlog is not active, this is zero
--
--   [@binlog-max-size@] The maximum number of bytes for a binlog file
--   before a new binlog file is opened
--
--   See the Beanstalk protocol docs for the definitive list and definitions.
statsServer :: BeanstalkServer -- ^ Beanstalk server
            -> IO (M.Map B.ByteString B.ByteString) -- ^ Key-value map of server statistics
statsServer = (`genericStats` fromByteString "stats")

-- | Pause a tube for the given time, so that reservations are no longer accepted.
pauseTube :: BeanstalkServer -- ^ Beanstalk server
          -> B.ByteString -- ^ Name of tube to pause
          -> Int -- ^ Number of seconds before reservations are accepted again
          -> IO ()
pauseTube bs tube delay = withMVar bs $ \sock -> do
    sendAll sock . toByteString $ foldr1 mappend
        [ fromByteString "pause-tube "
        , fromByteString tube
        , fromChar ' '
        , fromShow delay
        , fromByteString "\r\n"
        ]
    readLine sock >>= checkForBeanstalkErrors

-- | Retrieve the names of all tubes that exist on the server.
listTubes :: BeanstalkServer -- ^ Beanstalk server
          -> IO [B.ByteString] -- ^ Names of all tubes on the server
listTubes = (`genericList` "list-tubes")

-- | Retrieve the names of all tubes on this session's watch list.
listTubesWatched :: BeanstalkServer -- ^ Beanstalk server
                 -> IO [B.ByteString] -- ^ Names of all currently watched tubes
listTubesWatched = (`genericList` "list-tubes-watched")

-- | Retrieve the name of the tube this session is currently using.
listTubeUsed :: BeanstalkServer -- ^ Beanstalk server
             -> IO B.ByteString -- ^ Name of current used tube
listTubeUsed bs = withMVar bs $ \sock -> do
    sendAll sock "list-tube-used\r\n"
    reply <- readLine sock
    checkForBeanstalkErrors reply
    return (parseUsedTube reply)

-- Extract the tube name from a "USING <tube>" response line.  Yields
-- the empty string when the response does not parse.
parseUsedTube :: B.ByteString -> B.ByteString
parseUsedTube input = tubeName (parse usedTubeParser input)
    where tubeName (Done _ name) = name
          tubeName _             = ""

-- Parser for a tube name: one leading character that may not be a
-- hyphen, followed by any number of name characters (hyphen allowed).
nameParser :: Parser B.ByteString
nameParser = do
    firstChar <- P8.satisfy (P8.inClass "a-zA-Z0-9+/;.$_()")
    rest      <- P.takeWhile (inClass "-a-zA-Z0-9+/;.$_()")
    return (B.cons firstChar rest)

-- Parser for a complete "USING <tube>" response line, CR-LF included.
usedTubeParser :: Parser B.ByteString
usedTubeParser = do
    _    <- P.string "USING "
    name <- nameParser
    _    <- P.string "\r\n"
    return name

-- Shared implementation of the list commands that return YAML lists.
-- Callers supply the command text without its line terminator; this
-- function sends it, checks the header line of the reply for errors,
-- and parses the list that follows.
genericList :: BeanstalkServer -> B.ByteString -> IO [B.ByteString]
genericList bs cmd = withMVar bs $ \sock -> do
    sendAll sock (B.concat [cmd, "\r\n"])
    readLine sock >>= checkForBeanstalkErrors
    readYamlList sock

-- | Count number of jobs in a tube with a state in a given list.
--   This is not part of the beanstalk protocol spec; the count is
--   derived from the tube statistics returned by 'statsTube', so it
--   is only as consistent as that snapshot.
--
--   Each state is counted at most once, even if it appears several
--   times in the list.  An 'IOError' is raised if the tube statistics
--   lack a counter that is needed for one of the requested states.
jobCountWithState :: BeanstalkServer -- ^ Beanstalk server
                   -> B.ByteString -- ^ Name of tube to inspect
                   -> [JobState] -- ^ List of valid states for count
                   -> IO Int -- ^ Number of jobs with a state in the valid list
jobCountWithState bs tube validStatuses =
    do ts <- statsTube bs tube
       counts <- mapM (countFor ts) wantedKeys
       -- Force the total so that a malformed counter value is reported
       -- here rather than wherever the caller first inspects the result.
       return $! sum counts
    where
      -- Statistic holding the current job count for each state.
      stateKeys :: [(JobState, B.ByteString)]
      stateKeys =
          [ (READY,    "current-jobs-ready")
          , (RESERVED, "current-jobs-reserved")
          , (DELAYED,  "current-jobs-delayed")
          , (BURIED,   "current-jobs-buried")
          ]

      wantedKeys :: [B.ByteString]
      wantedKeys = [ key | (state, key) <- stateKeys, state `elem` validStatuses ]

      countFor :: M.Map B.ByteString B.ByteString -> B.ByteString -> IO Int
      countFor ts key =
          case M.lookup key ts of
            Just value -> return (parseIntBS value)
            Nothing    -> ioError $ userError $
                            "Beanstalk.jobCountWithState: statistic " ++
                            B.unpack key ++ " missing from stats-tube response"

-- Read from the socket one byte at a time until a newline has been
-- received; the returned line includes that newline.
readLine :: Socket -> IO B.ByteString
readLine s = toByteString `fmap` collect (fromByteString B.empty)
    where collect acc =
              do byte <- recvBytes s 1
                 let acc' = acc `mappend` fromByteString byte
                 if B.head byte == '\n'
                   then return acc'
                   else collect acc'

-- Extract the watched-tube count from the "WATCHING <count>" response
-- to a watch/ignore command.  Yields 0 when the response does not
-- parse.
parseWatching :: B.ByteString -> Int
parseWatching input = watched (parse watchingParser input)
    where watchingParser = P.string "WATCHING " *> P8.decimal
          watched (Done _ n) = n
          watched _          = 0

-- Extract the job state and ID from the response to a put command.
-- Yields (READY, 0) when the response does not parse.
parsePut :: B.ByteString -> (JobState, Int)
parsePut input = fromResult (parse putParser input)
    where fromResult (Done _ parsed) = parsed
          fromResult _               = (READY, 0)  -- Error

-- Parser for a put response: a status word, a space, and the job ID.
-- The status word "BURIED" maps to 'BURIED'; any other word is
-- treated as 'READY'.
putParser :: Parser (JobState, Int)
putParser = do
    statusWord <- takeTill (== 32) -- up to the first space (byte 32)
    _          <- P8.space
    jobid      <- P8.decimal
    return (toState statusWord, jobid)
  where
    toState word
        | word == "BURIED" = BURIED
        | otherwise        = READY

-- Extract the job ID and body size from a "RESERVED <id> <bytes>"
-- response.  Yields (-1, 0) when the response does not parse.
parseReserve :: B.ByteString -> (Int,Int)
parseReserve input = fromResult (parse reservedParser input)
    where fromResult (Done _ idAndSize) = idAndSize
          fromResult _                  = (-1, 0)  -- Error

-- Parser for "RESERVED <id> <bytes>", producing (id, bytes).
reservedParser :: Parser (Int, Int)
reservedParser = do
    _     <- P.string "RESERVED "
    jobid <- P8.decimal
    _     <- P8.space
    size  <- P8.decimal
    return (jobid, size)

-- Extract the job ID and body size from a "FOUND <id> <bytes>"
-- response.  Yields (0, 0) when the response does not parse.
parseFoundIdLen :: B.ByteString -> (Int,Int)
parseFoundIdLen input = fromResult (parse foundIdLenParser input)
    where fromResult (Done _ idAndSize) = idAndSize
          fromResult _                  = (0,0)

-- Parser for "FOUND <id> <bytes>", producing (id, bytes).
foundIdLenParser :: Parser (Int, Int)
foundIdLenParser = do
    _     <- P.string "FOUND "
    jobid <- P8.decimal
    _     <- P8.space
    size  <- P8.decimal
    return (jobid, size)

-- Read a YAML list from the socket, pulling up to 1024 bytes at a
-- time as the parser asks for more input.  Yields the empty list when
-- the data does not parse.
readYamlList :: Socket -> IO [B.ByteString]
readYamlList s = items `fmap` parseWith (recv s 1024) yamlListParser ""
    where items (Done _ xs) = xs
          items _           = []

-- Parser for a YAML list as sent by the server: a "---" line, zero or
-- more "- item" lines, and a closing CR-LF.
yamlListParser :: Parser [B.ByteString]
yamlListParser = do
    _     <- P.string "---" *> P8.endOfLine
    items <- many listItem
    _     <- P.string "\r\n"
    return items
  where
    listItem = P.string "- " *> takeTill P8.isEndOfLine <* P8.endOfLine

-- Read a YAML dictionary from the socket, pulling up to 1024 bytes at
-- a time as the parser asks for more input, and return it as a Map.
-- Yields the empty Map when the data does not parse.
readYamlDict :: Socket -> IO (M.Map B.ByteString B.ByteString)
readYamlDict s = entries `fmap` parseWith (recv s 1024) yamlDictParser ""
    where entries (Done _ dict) = dict
          entries _             = M.empty

-- Parser for a YAML dictionary as sent by the server: a "---" line,
-- zero or more "key: value" lines, and a closing CR-LF.
yamlDictParser :: Parser (M.Map B.ByteString B.ByteString)
yamlDictParser = do
    _     <- P.string "---" *> P8.endOfLine
    pairs <- many entry
    _     <- P.string "\r\n"
    return (M.fromList pairs)
  where
    entry = do
        key   <- takeTill endsKey <* P.string ": "
        value <- takeTill P8.isEndOfLine <* P8.endOfLine
        return (key, value)
    -- A key stops at a colon (byte 58) or at an end-of-line byte.
    endsKey byte = byte == 58 || P8.isEndOfLine byte

-- Parse a ByteString holding a decimal integer.  Calls 'error' if the
-- input does not start with a decimal number.
parseIntBS :: B.ByteString -> Int
parseIntBS input = finish (parse P8.decimal input `feed` "")
    where -- Feeding "" marks end of input so that a number running up
          -- to the end of the string completes the parse.
          finish (Done _ n) = n
          finish _          = error "Beanstalk.parseIntBS: no parse"

-- Receive exactly the requested number of bytes from a socket,
-- blocking until they have all arrived.  Data is requested in chunks
-- of at most 1024 bytes; an empty chunk is treated as a server
-- disconnect and raises an IOError.
recvBytes :: Socket -> Int -> IO B.ByteString
recvBytes s bytes = go (fromByteString "") bytes
    where go acc 0 = return (toByteString acc)
          go acc remaining =
              do chunk <- recv s (min remaining 1024)
                 if B.null chunk
                   then ioError . userError $
                            "Could not read " ++ show bytes ++
                            " bytes from beanstalkd; server disconnect."
                   else go (acc `mappend` fromByteString chunk)
                           (remaining - B.length chunk)