-- Copyright (c) Gree, Inc. 2013
-- License: MIT-style

module Network.JobQueue.Backend.Zookeeper (
    openZookeeperBackend
  , newZookeeperBackend
  ) where

import qualified Database.Zookeeper as Z
import Control.Concurrent
import Control.Concurrent.STM
import Network.JobQueue.Backend.Types
import Network.JobQueue.Backend.Zookeeper.ZookeeperQueue

-- | Connect to the ZooKeeper service at @endpoint@ and return a 'Backend'
-- bound to that connection.
--
-- The call returns immediately: the connection is established and owned by
-- a background thread running inside 'Z.withZookeeper'.  'bOpenQueue' blocks
-- until that thread has published a handle.  'bClose' tells the thread to
-- leave 'Z.withZookeeper', which is what releases the session.
--
-- Shutdown is tracked by a dedicated flag rather than by the handle variable
-- alone.  Otherwise a 'bClose' issued before the handle was published would
-- be overwritten by the later publish, and the background thread would wait
-- forever for a close request that had already happened.
--
-- NOTE(review): 'bOpenQueue' called after 'bClose' still blocks in STM, since
-- no handle will ever be published again; confirm whether callers would
-- prefer an explicit error here.
openZookeeperBackend :: String -> IO Backend
openZookeeperBackend endpoint = do
  Z.setDebugLevel Z.ZLogError
  zvar <- newTVarIO Nothing
  closedVar <- newTVarIO False
  stateVar <- newTVarIO Z.ConnectingState
  -- 100000 is the timeout argument of withZookeeper
  -- (presumably milliseconds -- TODO confirm against the hzk documentation).
  _ <- forkIO $ Z.withZookeeper endpoint 100000 (Just $ watcher stateVar) Nothing $ \z -> do
    -- Publish the handle once the session has left ConnectingState, unless
    -- the backend was already closed in the meantime.
    atomically $ do
      closed <- readTVar closedVar
      if closed
        then return ()
        else do
          state <- readTVar stateVar
          case state of
            Z.ConnectingState -> retry
            _ -> writeTVar zvar (Just z)
    -- Stay inside withZookeeper until bClose has been called.
    atomically $ do
      closed <- readTVar closedVar
      if closed then return () else retry
  return $ Backend {
      bOpenQueue = openQueue zvar
    , bClose = atomically $ do
        -- Set the flag and withdraw the handle in one transaction so the
        -- background thread can never observe a half-closed backend.
        writeTVar closedVar True
        writeTVar zvar Nothing
    }
  where
    -- Wait (STM retry) until a handle is available, then initialise the
    -- queue rooted at the normalised queue path.
    openQueue :: TVar (Maybe Z.Zookeeper) -> String -> IO (ZookeeperQueue)
    openQueue zvar queueName = do
      z <- atomically $ readTVar zvar >>= maybe retry return
      initZQueue z (basePath queueName) Z.OpenAclUnsafe

    -- Record the state carried by session events; all other events are ignored.
    watcher :: TVar Z.State -> Z.Watcher
    watcher stateVar _z event state _mZnode = do
      case event of
        Z.SessionEvent -> atomically $ writeTVar stateVar state
        _ -> return ()

-- | Build a 'Backend' on top of an existing ZooKeeper handle.
--
-- Queues are initialised on @zh@ with 'Z.OpenAclUnsafe', at the path given
-- by 'basePath'.  'bClose' does nothing here.
-- NOTE(review): presumably the caller that created @zh@ is responsible for
-- closing it -- confirm against call sites.
newZookeeperBackend :: Z.Zookeeper -> Backend
newZookeeperBackend zh = Backend {
      bOpenQueue = openOn
    , bClose = return ()
    }
  where
    -- Initialise the queue rooted at the normalised queue path.
    openOn name = initZQueue zh (basePath name) Z.OpenAclUnsafe

-- | Turn a queue name into an absolute path: a name that already starts
-- with @\/@ is returned unchanged, anything else (including the empty
-- string) gets a single @\/@ prepended.
basePath :: String -> String
basePath absolute@('/' : _) = absolute
basePath relative = '/' : relative