module Database.Disque
(
Disque
, runDisque
, Connection
, ConnectInfo(..)
, Reply(..)
, Job(..)
, JobId
, disqueConnectInfo
, connect
, addjob
, getjob
, getjobs
, ackjob
, fastack
, working
, nack
, info'
, hello
, qlen
, qstat
, qpeek
, enqueue
, dequeue
, deljob
, jshow
, qscan
, jscan
, withGetJobs
) where
import qualified Data.ByteString.Char8 as BS8
#if !MIN_VERSION_base(4,8,0)
import Data.Monoid
import Control.Applicative
#endif
import Control.Monad.IO.Class
import Data.ByteString (ByteString)
import Database.Redis as R
import Data.String.Conversions (cs)
-- | Monad in which Disque commands are issued.
--
-- It is a newtype around the 'Redis' monad from "Database.Redis"; every
-- instance listed below is derived from the wrapped type.
newtype Disque a = Disque (Redis a)
  deriving (Functor, Applicative, Monad, MonadIO, MonadRedis)
-- | Execute a 'Disque' action on the given connection.
--
-- The action is unwrapped to the underlying 'Redis' computation and handed
-- to 'runRedis'.
runDisque :: Connection -> Disque a -> IO a
runDisque conn disque = runRedis conn (unwrap disque)
  where
    unwrap (Disque redis) = redis
-- | Job identifier, represented as a 'ByteString'.
type JobId = ByteString
-- | Queue name, represented as a 'ByteString'.
type Queue = ByteString
-- | Job payload, represented as a 'ByteString'.
type Data = ByteString
-- | A job as decoded from a server reply.
data Job = Job
  { queue   :: Queue  -- ^ Queue component of the reply.
  , jobid   :: JobId  -- ^ Job identifier component of the reply.
  , jobdata :: Data   -- ^ Payload component of the reply.
  }
  deriving (Show)
-- | Decode a 'Job' from a multi-bulk reply.
--
-- The first three elements are decoded, in order, as queue, job id and
-- payload; any further elements are ignored.  Every other reply shape is
-- handed back unchanged as 'Left'.
instance RedisResult Job where
  decode reply =
    case reply of
      MultiBulk (Just (rawQueue : rawId : rawBody : _)) ->
        Job <$> decode rawQueue <*> decode rawId <*> decode rawBody
      _ -> Left reply
-- | Connection settings for a local Disque node.
--
-- Starts from 'defaultConnectInfo' and overrides only the host
-- (@127.0.0.1@) and the port (@7711@).
disqueConnectInfo :: ConnectInfo
disqueConnectInfo =
  defaultConnectInfo
    { connectHost = "127.0.0.1"
    , connectPort = PortNumber 7711
    }
-- | Render a value with 'show' and pack the resulting string into a
-- 'BS8.ByteString'.
bshow :: Show a => a -> BS8.ByteString
bshow value = BS8.pack (show value)
-- | Issue @ADDJOB@ with a queue name, a job body and a timeout.
--
-- The timeout is rendered with 'show' before being sent.
-- NOTE(review): presumably the timeout is in milliseconds, as in Disque's
-- ADDJOB command -- confirm against the server documentation.
addjob :: ByteString -> ByteString -> Int -> Disque (Either Reply ByteString)
addjob q payload timeoutArg = Disque (sendRequest request)
  where
    request = ["ADDJOB", q, payload, cs (show timeoutArg)]
-- | Fetch a single job from the given queues with @GETJOB FROM q1 q2 ...@.
--
-- Disque's documented GETJOB reply is an array of jobs, each job being its
-- own three-element array, so the first nested entry is what gets decoded
-- into a 'Job'.  A reply that is not nested is still passed to 'decode'
-- unchanged, which keeps the previous behaviour for every other shape.
getjob :: [ByteString] -> Disque (Either Reply Job)
getjob qs = Disque $ firstJob <$> sendRequest ("GETJOB" : "FROM" : qs)
  where
    firstJob :: Either Reply Reply -> Either Reply Job
    -- Nested form: the head of the outer array is the job itself.
    firstJob (Right (MultiBulk (Just (job@(MultiBulk _) : _)))) = decode job
    -- Anything else (flat array, nil, error reply) goes through the 'Job'
    -- decoder as-is, which yields 'Left' when the shape does not fit.
    firstJob (Right reply) = decode reply
    firstJob (Left reply) = Left reply
-- | Issue @GETJOB COUNT n FROM q1 q2 ...@ and decode the reply as a list
-- of 'Job's.
--
-- The count is rendered with 'bshow' and the queue names are appended
-- after @FROM@ in the order given.
getjobs :: [ByteString] -> Int -> Disque (Either Reply [Job])
getjobs qs cnt =
  Disque . sendRequest $ "GETJOB" : "COUNT" : bshow cnt : "FROM" : qs
-- | Issue @ACKJOB@ followed by the given job ids.
ackjob :: [ByteString] -> Disque (Either Reply ByteString)
ackjob = Disque . sendRequest . ("ACKJOB" :)
-- | Issue @FASTACK@ followed by the given job ids.
fastack :: [ByteString] -> Disque (Either Reply ByteString)
fastack js = Disque (sendRequest ("FASTACK" : js))
-- | Issue @WORKING@ for a single job id.
working :: ByteString -> Disque (Either Reply ByteString)
working jid = Disque (sendRequest command)
  where
    command = ["WORKING", jid]
-- | Issue @NACK@ followed by the given job ids.
nack :: [ByteString] -> Disque (Either Reply ByteString)
nack = Disque . sendRequest . ("NACK" :)
-- | Issue the argument-less @INFO@ command.
info' :: Disque (Either Reply ByteString)
info' = Disque (sendRequest ["INFO"])
-- | Issue the argument-less @HELLO@ command.
hello :: Disque (Either Reply ByteString)
hello = Disque (sendRequest ["HELLO"])
-- | Issue @QLEN@ for the named queue.
--
-- NOTE(review): Disque documents QLEN as replying with an integer; confirm
-- that decoding the reply as a 'ByteString' is what callers expect.
qlen :: ByteString -> Disque (Either Reply ByteString)
qlen qname = Disque (sendRequest command)
  where
    command = ["QLEN", qname]
-- | Issue @QSTAT@ for the named queue.
qstat :: ByteString -> Disque (Either Reply ByteString)
qstat qname = Disque . sendRequest $ "QSTAT" : [qname]
-- | Issue @QPEEK@ with a queue name and a count.
--
-- The count is passed through verbatim as already-encoded bytes.
qpeek :: ByteString -> ByteString -> Disque (Either Reply ByteString)
qpeek qname cnt = Disque (sendRequest command)
  where
    command = ["QPEEK", qname, cnt]
-- | Issue @ENQUEUE@ followed by the given job ids.
enqueue :: [ByteString] -> Disque (Either Reply ByteString)
enqueue = Disque . sendRequest . ("ENQUEUE" :)
-- | Issue @DEQUEUE@ followed by the given job ids.
dequeue :: [ByteString] -> Disque (Either Reply ByteString)
dequeue jids = Disque (sendRequest ("DEQUEUE" : jids))
-- | Issue @DELJOB@ followed by the given job ids.
--
-- The command name used to be @ENQUEUE@ (copied from 'enqueue'), so calling
-- this function re-queued the jobs instead of deleting them.
deljob :: [ByteString] -> Disque (Either Reply ByteString)
deljob jids = Disque $ sendRequest $ "DELJOB" : jids
-- | Issue @SHOW@ for a single job id.
jshow :: ByteString -> Disque (Either Reply ByteString)
jshow jid = Disque . sendRequest $ "SHOW" : [jid]
-- | Issue a bare @QSCAN@ command.
--
-- The 'ByteString' argument is discarded; nothing but the command name is
-- sent.
-- NOTE(review): presumably the argument was meant to carry a cursor or
-- options -- confirm the intended use before relying on it.
qscan :: ByteString -> Disque (Either Reply ByteString)
qscan _unused = Disque (sendRequest ["QSCAN"])
-- | Issue a bare @JSCAN@ command.
--
-- The 'ByteString' argument is discarded; nothing but the command name is
-- sent.
-- NOTE(review): presumably the argument was meant to carry a cursor or
-- options -- confirm the intended use before relying on it.
jscan :: ByteString -> Disque (Either Reply ByteString)
jscan _unused = Disque (sendRequest ["JSCAN"])
-- | Fetch up to @cnt@ jobs from the given queues via 'getjobs' and run the
-- handler on each one, collecting the results in order.
--
-- If 'getjobs' yields 'Left', an 'IOError' carrying the offending reply is
-- thrown.  The failure is matched explicitly rather than through a failable
-- @Right jobs <-@ bind, because 'Disque' has no 'MonadFail' instance and
-- such a bind produces an uninformative pattern-match error where it
-- compiles at all.
withGetJobs :: [ByteString] -> Int -> (Job -> Disque a) -> Disque [a]
withGetJobs qs cnt f = do
  result <- getjobs qs cnt
  case result of
    Right jobs -> mapM f jobs
    Left err ->
      liftIO . ioError . userError $
        "withGetJobs: GETJOB failed: " ++ show err