module Network.JobQueue.Backend.Zookeeper.ZookeeperQueue (
ZookeeperQueue
, initZQueue
, readZQueue
, peekZQueue
, updateZQueue
, deleteZQueue
, writeZQueue
, destroyZQueue
, listZQueue
, itemsZQueue
, countZQueue
) where
import qualified Database.Zookeeper as Z
import qualified Data.ByteString.Char8 as C
import Control.Exception hiding (handle)
import Data.List
import Control.Monad
import Data.Maybe
import Data.List.Split
import Network.JobQueue.Backend.Class
import Network.JobQueue.Backend.Types
-- | A queue whose entries are stored as child znodes of a single base path.
data ZookeeperQueue = ZookeeperQueue
  { zqHandle   :: Z.Zookeeper
    -- ^ ZooKeeper handle passed to every @Z.*@ call made for this queue.
  , zqBasePath :: String
    -- ^ Path of the znode whose children are the queue entries.
  , zqNodeName :: String
    -- ^ Name prefix handed to 'nodePrefix' when 'writeZQueue' creates an entry.
  , zqAcls     :: Z.AclList
    -- ^ ACL list supplied to @Z.create@ when 'writeZQueue' creates an entry.
  }
-- | 'BackendQueue' operations for 'ZookeeperQueue'; each method delegates
-- directly to the corresponding @*ZQueue@ function of this module.
instance BackendQueue ZookeeperQueue where
  -- Consuming and inspecting entries.
  readQueue   = readZQueue
  peekQueue   = peekZQueue
  -- Modifying entries.
  updateQueue = updateZQueue
  deleteQueue = deleteZQueue
  writeQueue  = writeZQueue
  -- Enumerating entries.
  listQueue   = listZQueue
  itemsQueue  = itemsZQueue
  countQueue  = countZQueue
-- | Upper bound for priorities; 'nodePrefix' clamps larger values to it.
maxPrio :: Int
maxPrio = 999
-- | Lower bound for priorities; 'nodePrefix' clamps smaller values to it.
--
-- This must be the negation of 'maxPrio': 'nodePrefix' encodes a negative
-- priority @p@ as @maxPrio + 1 + p@, which only fits the three-digit field
-- for @p@ in @[-maxPrio, -1]@.  (A positive value here would make the clamp
-- collapse every priority onto a single value.)
minPrio :: Int
minPrio = -999
-- | Name prefix of queue entries: 'initZQueue' stores it as the queue's node
-- name and 'sortChildren' keeps only children that start with it.
qnPrefix :: String
qnPrefix = "qn-"
-- | Ensure the base path exists (creating missing znodes along the way) and
-- return a queue value bound to it.
--
-- Any error reported while creating the path is raised via 'throwZKError'.
initZQueue :: Z.Zookeeper -> String -> Z.AclList -> IO ZookeeperQueue
initZQueue z path acls =
  createZnodeRecursively z path Nothing acls [] >>= either failed ready
  where
    failed = throwZKError "initZQueue"
    ready _created =
      return
        ZookeeperQueue
          { zqHandle = z
          , zqBasePath = path
          , zqNodeName = qnPrefix
          , zqAcls = acls
          }
-- | Remove the first entry (in 'sortChildren' order) and return it.
--
-- Returns @Just (payload, nodeName)@ for the entry that was deleted, or
-- 'Nothing' when no candidate could be taken.  ZooKeeper errors other than
-- the ones handled below are raised via 'throwZKError'.
readZQueue :: ZookeeperQueue -> IO (Maybe (C.ByteString, String))
readZQueue zkQueue = getChildren zkQueue >>= takeHead . sortChildren
  where
    zh = zqHandle zkQueue

    takeHead [] = return Nothing
    takeHead candidates@(nodeName : rest) = do
      let path = zqBasePath zkQueue ++ "/" ++ nodeName
      fetched <- Z.get zh path Nothing
      case fetched of
        Right (Just value, _stat) -> do
          deleted <- Z.delete zh path Nothing
          case deleted of
            Right () -> return (Just (value, nodeName))
            Left _zkerr -> do
              -- The delete failed: probe the node to decide whether to retry
              -- the same candidate or move on to the next one.
              probe <- Z.exists zh path Nothing
              case probe of
                Right _stat -> takeHead candidates
                Left Z.NoNodeError -> takeHead rest
                Left zkerr -> throwZKError "readZQueue" zkerr
        -- A node without data cannot be returned; skip it.
        Right (Nothing, _stat) -> takeHead rest
        -- The node vanished after the children were listed.  Keep going with
        -- the remaining candidates instead of reporting an empty queue, the
        -- same way the failed-delete branch above does.
        Left Z.NoNodeError -> takeHead rest
        Left zkerr -> throwZKError "readZQueue" zkerr
-- | Look at the first entry (in 'sortChildren' order) without removing it.
--
-- Returns @Just (payload, nodeName, idSuffix, version)@ where @idSuffix@ is
-- the last 'idSuffixLen' characters of the node name (presumably the sequence
-- number ZooKeeper appends to nodes created with @Z.Sequence@) and @version@
-- is the node's stat version.  Returns 'Nothing' when there is no entry
-- carrying data.  Unexpected ZooKeeper errors are raised via 'throwZKError'.
peekZQueue :: ZookeeperQueue -> IO (Maybe (C.ByteString, String, String, Int))
peekZQueue zkQueue = getChildren zkQueue >>= getHead . sortChildren
  where
    idSuffixLen :: Int
    idSuffixLen = 10

    getHead :: [String] -> IO (Maybe (C.ByteString, String, String, Int))
    getHead [] = return Nothing
    getHead (x : xs) = do
      e <- Z.get (zqHandle zkQueue) (fullPath zkQueue x) Nothing
      case e of
        Right (Just v, stat) ->
          return $
            Just
              ( v
              , x
              , -- Keep only the trailing idSuffixLen characters of the name.
                drop (length x - idSuffixLen) x
              , fromIntegral (Z.statVersion stat)
              )
        Right (Nothing, _stat) -> getHead xs
        -- The node disappeared after listing: start over from a fresh listing.
        Left Z.NoNodeError -> peekZQueue zkQueue
        Left zkerr -> throwZKError "peekZQueue" zkerr
-- | Overwrite the payload of an entry if its version still matches.
--
-- Returns 'True' when the write succeeded and 'False' when ZooKeeper reports
-- a version mismatch or a missing node.  Any other error is raised via
-- 'throwZKError'.
updateZQueue :: ZookeeperQueue -> String -> C.ByteString -> Int -> IO Bool
updateZQueue zkQueue znodeName value version =
  Z.set (zqHandle zkQueue) (fullPath zkQueue znodeName) (Just value) (Just (fromIntegral version))
    >>= outcome
  where
    outcome (Right _stat) = return True
    outcome (Left Z.BadVersionError) = return False
    outcome (Left Z.NoNodeError) = return False
    outcome (Left zkerr) = throwZKError "updateZQueue" zkerr
-- | Delete the named entry regardless of its version.
--
-- Returns 'True' on success.  Throws 'NotFound' (carrying the entry name)
-- when the node does not exist; any other error is raised via
-- 'throwZKError' with the full node path in the message.
deleteZQueue :: ZookeeperQueue -> String -> IO Bool
deleteZQueue zkQueue nodeName = do
  result <- Z.delete (zqHandle zkQueue) target Nothing
  case result of
    Right () -> return True
    Left Z.NoNodeError -> throwIO (NotFound nodeName)
    Left zkerr -> throwZKError ("deleteZQueue(nodeName=" ++ target ++ ")") zkerr
  where
    target = zqBasePath zkQueue ++ "/" ++ nodeName
-- | Append a payload to the queue with the given priority.
--
-- The entry is created with the @Z.Sequence@ flag under a name built by
-- 'nodePrefix'.  Returns the last path component of the created znode;
-- a creation failure is raised via 'throwZKError'.
writeZQueue :: ZookeeperQueue -> C.ByteString -> Int -> IO String
writeZQueue zkQueue value prio = do
  let target = zqBasePath zkQueue ++ "/" ++ nodePrefix (zqNodeName zkQueue) prio
  created <- Z.create (zqHandle zkQueue) target (Just value) (zqAcls zkQueue) [Z.Sequence]
  -- 'splitOn' never yields an empty list, so 'last' is safe here.
  either (throwZKError "writeZQueue") (return . last . splitOn "/") created
-- | Release the queue.  Currently a no-op: the argument is ignored and
-- nothing is removed or closed.
destroyZQueue :: ZookeeperQueue -> IO ()
destroyZQueue _ = return ()
-- | Fetch the payloads of all entries in 'sortChildren' order.
--
-- Entries that have no data, or that vanish before they can be read, are
-- left out of the result.  Other errors are raised via 'throwZKError'.
listZQueue :: ZookeeperQueue -> IO [C.ByteString]
listZQueue zkQueue = do
  names <- getChildren zkQueue
  fmap catMaybes (mapM getItem (sortChildren names))
  where
    getItem name = do
      fetched <- Z.get (zqHandle zkQueue) (zqBasePath zkQueue ++ "/" ++ name) Nothing
      case fetched of
        Right (mValue, _stat) -> return mValue
        Left Z.NoNodeError -> return Nothing
        Left zkerr -> throwZKError "listZQueue" zkerr
-- | Names of the queue entries, filtered and ordered by 'sortChildren'.
itemsZQueue :: ZookeeperQueue -> IO [String]
itemsZQueue = fmap sortChildren . getChildren
-- | Number of children under the queue's base path.
--
-- NOTE(review): unlike 'itemsZQueue' this counts the raw child list, without
-- the prefix filter applied by 'sortChildren' -- confirm that is intended.
countZQueue :: ZookeeperQueue -> IO Int
countZQueue = fmap length . getChildren
-- | List the raw child names of the queue's base path, raising any
-- ZooKeeper error via 'throwZKError'.
getChildren :: ZookeeperQueue -> IO [String]
getChildren zkQueue =
  Z.getChildren (zqHandle zkQueue) (zqBasePath zkQueue) Nothing
    >>= either (throwZKError "getChildren") return
-- | Keep only the names that start with 'qnPrefix' and sort them ascending.
sortChildren :: [String] -> [String]
sortChildren names = sort [name | name <- names, qnPrefix `isPrefixOf` name]
-- | Absolute znode path of a child of the queue's base path.
fullPath :: ZookeeperQueue -> String -> String
fullPath zkQueue x = concat [zqBasePath zkQueue, "/", x]
-- | Build the node-name prefix for an entry of the given priority.
--
-- The priority is first clamped to @['minPrio', 'maxPrio']@.  The result is
-- @base@, a one-character sign marker, a three-digit zero-padded number and
-- a trailing @"-"@:
--
-- * a non-negative priority @p@ is written as @"0"@ followed by @p@
--   (e.g. a clamped priority of 5 gives @base ++ "0005-"@);
-- * a negative priority @p@ is written as @"-"@ followed by
--   @maxPrio + 1 + p@.
nodePrefix :: String -> Int -> String
nodePrefix base prio = base ++ sign ++ padding ++ digits ++ "-"
  where
    priority = max minPrio (min maxPrio prio)
    plus = priority >= 0
    sign = if plus then "0" else "-"
    digits = show (if plus then priority else maxPrio + 1 + priority)
    -- Left-pad to three digits so that names compare correctly as strings.
    padding = replicate (3 - length digits) '0'
-- | Throw a 'SessionError' whose message is the given label, a colon and the
-- shown ZooKeeper error.
throwZKError :: String -> Z.ZKError -> IO a
throwZKError func zkerr = throwIO (SessionError message)
  where
    message = func ++ ": " ++ show zkerr
-- | Create the znode at @path@, creating missing ancestors first.
--
-- The path is split on @"/"@ and handed, deepest component first, to
-- 'createZnodeRecursively''.  The result is either the ZooKeeper error that
-- stopped the process or the path reported for the final node.
createZnodeRecursively :: Z.Zookeeper -> String -> Maybe C.ByteString -> Z.AclList -> [Z.CreateFlag] -> IO (Either Z.ZKError String)
createZnodeRecursively z path =
  createZnodeRecursively' z (reverse (splitOn "/" path))
-- | Worker for 'createZnodeRecursively'.
--
-- Takes the path components in reverse order (deepest first).  If the node
-- already exists its path is returned unchanged.  Otherwise the parent is
-- created first (without data, with the same ACLs and flags) and then the
-- node itself is created with the supplied data.  The first ZooKeeper error
-- encountered is returned as 'Left'.
createZnodeRecursively' :: Z.Zookeeper -> [String] -> Maybe C.ByteString -> Z.AclList -> [Z.CreateFlag] -> IO (Either Z.ZKError String)
createZnodeRecursively' _ [] _ _ _ = return (Right "/")
-- A single empty component is what remains of the root of an absolute path.
createZnodeRecursively' _ [""] _ _ _ = return (Right "/")
createZnodeRecursively' z revZnodes@(_ : parents) value acls cflags = do
  existing <- Z.exists z path Nothing
  case existing of
    Right _stat -> return (Right path)
    Left Z.NoNodeError -> do
      parent <- createZnodeRecursively' z parents Nothing acls cflags
      case parent of
        Left zkerr -> return (Left zkerr)
        Right _ -> Z.create z path value acls cflags
    Left zkerr -> return (Left zkerr)
  where
    path = intercalate "/" (reverse revZnodes)