-- |
-- A small priority job queue stored in a Redis sorted set.
--
-- Jobs are added with 'push' / 'pushJson' under a numeric 'Priority' (the
-- sorted-set score) and removed with 'pop' / 'popJson'. A 'JobQueue' handle
-- is obtained through 'withJobQueue' or 'withJobQueue''.
module RedisJobQueue (
withJobQueue,
withJobQueue',
JobQueue,
push,
pop,
pushJson,
popJson,
QueueName,
Job,
Priority
) where
import Control.Exception (bracket)

import qualified Data.Aeson as A
import Data.ByteString
import Data.ByteString.Char8 as BS8
import qualified Data.ByteString.Lazy as LBS
import Database.Redis
-- | Redis key under which a queue's sorted set is stored.
type QueueName = ByteString
-- | Raw job payload, stored as a member of the queue's sorted set.
type Job = ByteString
-- | Sorted-set score attached to a job by 'push'. 'pop' takes the first
-- element in ascending score order, so smaller values are popped first.
type Priority = Double
-- | Handle to a queue: the queue's Redis key paired with the connection
-- used to reach it. The constructor is not exported; obtain a handle via
-- 'withJobQueue' or 'withJobQueue''.
data JobQueue = JobQueue QueueName Connection
-- | Run an action against the named queue, connecting to Redis with
-- 'defaultConnectInfo'. See 'withJobQueue'' to supply custom settings.
withJobQueue :: QueueName -> (JobQueue -> IO ()) -> IO ()
withJobQueue qn f = withJobQueue' defaultConnectInfo qn f
-- | Run an action against the named queue using the given connection
-- settings.
--
-- The connection is opened before the action runs and is released with
-- 'disconnect' once the action finishes, whether it returns normally or
-- throws. The 'JobQueue' handle must therefore not be used after the
-- action has returned.
withJobQueue' :: ConnectInfo -> QueueName -> (JobQueue -> IO ()) -> IO ()
withJobQueue' connInfo qn f =
  -- bracket guarantees the release step runs even if @f@ throws.
  bracket (connect connInfo) disconnect (f . JobQueue qn)
-- | Add a job to the queue with the given priority, via @ZADD@ on the
-- queue's key.
--
-- The result is the reply to @ZADD@: 'Left' carries an error reply,
-- 'Right' the integer Redis returned. Because the queue is a sorted set,
-- pushing a payload that is already queued updates its score rather than
-- adding a second copy.
push :: JobQueue -> Priority -> Job -> IO (Either Reply Integer)
push (JobQueue queue connection) priority job =
  runRedis connection (zadd queue [(priority, job)])
-- | Remove and return the first job of the queue in ascending score order.
--
-- Results:
--
-- * @Right (Just job)@ – a job was removed from the queue.
-- * @Right Nothing@    – the queue was empty.
-- * @Left reply@       – Redis returned an error, or the removal
--   transaction was aborted or failed (reported as an 'Error' reply).
--
-- The key is put under @WATCH@ before the head is read, so the
-- @MULTI@/@EXEC@ that removes it is aborted if the queue changes in
-- between. An abort is reported as @Left (Error "aborted")@; it is not
-- retried here.
pop :: JobQueue -> IO (Either Reply (Maybe Job))
pop (JobQueue qn conn) = runRedis conn $ do
  _ <- watch [qn]
  js <- zrange qn 0 0
  case js of
    Right (j:_) -> do
      -- EXEC clears the WATCH regardless of the transaction's outcome.
      r <- multiExec $ zrem qn [j]
      return $ case r of
        TxSuccess _ -> Right (Just j)
        TxAborted -> Left $ Error "aborted"
        TxError err -> Left $ Error $ BS8.pack err
    Right [] -> do
      _ <- unwatch
      return $ Right Nothing
    Left e -> do
      -- No transaction will run on this path, so drop the WATCH explicitly
      -- instead of leaving it registered on the connection.
      _ <- unwatch
      return $ Left e
-- | Encode a value as JSON and 'push' the resulting bytes as a job with
-- the given priority. The result is whatever 'push' returns.
pushJson :: A.ToJSON a => JobQueue -> Priority -> a -> IO (Either Reply Integer)
pushJson jq p = push jq p . LBS.toStrict . A.encode
-- | 'pop' a job and decode its payload from JSON.
--
-- Results:
--
-- * @Right (Just x)@ – a job was removed and decoded successfully.
-- * @Right Nothing@  – the queue was empty.
-- * @Left msg@       – either 'pop' failed (@msg@ is the shown reply) or
--   the payload was not valid JSON for the requested type.
--
-- Note that a job whose payload fails to decode has already been removed
-- from the queue by the time the error is reported.
popJson :: A.FromJSON a => JobQueue -> IO (Either String (Maybe a))
popJson jq = do
  mj <- pop jq
  return $ case mj of
    -- Decode at type @a@ and wrap in Just. Decoding straight into
    -- @Maybe a@ would turn a JSON @null@ payload into Nothing, which
    -- callers could not tell apart from an empty queue.
    Right (Just j) -> fmap Just $ A.eitherDecode $ LBS.fromStrict j
    Right Nothing -> Right Nothing
    Left l -> Left (show l)