module ZooKeeper.Recipe.Election
  ( election
  ) where

import           Control.Monad
import qualified Z.Data.Builder         as B
import           Z.Data.CBytes          (CBytes)
import qualified Z.IO.Logger            as Log

import           ZooKeeper
import           ZooKeeper.Recipe.Utils (SequenceNumWithGUID (..),
                                         createSeqEphemeralZNode,
                                         mkSequenceNumWithGUID)
import           ZooKeeper.Types

-- | Run a leader election process.
-- __IMPORTANT__: This function may run endlessly until it is selected
-- as the leader.
election :: ZHandle
         -- ^ The zookeeper handle obtained by a call to 'zookeeperResInit'
         -> CBytes
         -- ^ The path to start the election from. Ephemeral znodes will be
         -- put on it
         -> CBytes
         -- ^ The GUID for this zookeeper session. To handle recoverable execptions
         -- correctly, it should be distinct from different sessions.
         -> IO ()
         -- ^ The action to be executed when an leader is elected.
         -> (DataCompletion -> IO ())
         -- ^ The action to be executed when a watcher is set. It can be used to
         -- remind the user that one 'step' is finished.
         -> IO ()
-- TODO: Use user-configurable logger instead
election :: ZHandle
-> CBytes -> CBytes -> IO () -> (DataCompletion -> IO ()) -> IO ()
election ZHandle
zk CBytes
electionPath CBytes
guid IO ()
leaderApp DataCompletion -> IO ()
watchSetApp = IO () -> IO ()
Log.withDefaultLogger (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  let electionSeqPath :: CBytes
electionSeqPath = CBytes
electionPath CBytes -> CBytes -> CBytes
forall a. Semigroup a => a -> a -> a
<> CBytes
"/" CBytes -> CBytes -> CBytes
forall a. Semigroup a => a -> a -> a
<> CBytes
guid CBytes -> CBytes -> CBytes
forall a. Semigroup a => a -> a -> a
<> CBytes
"_"

  -- Check persistent paths
  Maybe StatCompletion
electionExists <- HasCallStack => ZHandle -> CBytes -> IO (Maybe StatCompletion)
ZHandle -> CBytes -> IO (Maybe StatCompletion)
zooExists ZHandle
zk CBytes
electionPath
  case Maybe StatCompletion
electionExists of
    Just StatCompletion
_  -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    Maybe StatCompletion
Nothing -> IO StringCompletion -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO StringCompletion -> IO ()) -> IO StringCompletion -> IO ()
forall a b. (a -> b) -> a -> b
$ HasCallStack =>
ZHandle
-> CBytes
-> Maybe Bytes
-> AclVector
-> CreateMode
-> IO StringCompletion
ZHandle
-> CBytes
-> Maybe Bytes
-> AclVector
-> CreateMode
-> IO StringCompletion
zooCreate ZHandle
zk CBytes
electionPath Maybe Bytes
forall a. Maybe a
Nothing AclVector
zooOpenAclUnsafe CreateMode
ZooPersistent

  -- Create Ephemeral and Sequece znode, and get the seq number i
  (StringCompletion CBytes
this) <- ZHandle -> CBytes -> CBytes -> IO StringCompletion
createSeqEphemeralZNode ZHandle
zk CBytes
electionPath CBytes
guid
  let thisSeqNumWithGUID :: SequenceNumWithGUID
thisSeqNumWithGUID = CBytes -> SequenceNumWithGUID
mkSequenceNumWithGUID CBytes
this
  HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.debug (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Created SEQUENTIAL|EPHEMERAL ZNode " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> SequenceNumWithGUID -> String
forall a. Show a => a -> String
show SequenceNumWithGUID
thisSeqNumWithGUID

  -- Get the child that has the max seq number j < i
  (StringsCompletion (StringVector [CBytes]
children)) <- HasCallStack => ZHandle -> CBytes -> IO StringsCompletion
ZHandle -> CBytes -> IO StringsCompletion
zooGetChildren ZHandle
zk CBytes
electionPath
  let childrenSeqNumWithGUID :: [SequenceNumWithGUID]
childrenSeqNumWithGUID = CBytes -> SequenceNumWithGUID
mkSequenceNumWithGUID (CBytes -> SequenceNumWithGUID)
-> [CBytes] -> [SequenceNumWithGUID]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [CBytes]
children
  HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.debug (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Children now: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> [SequenceNumWithGUID] -> String
forall a. Show a => a -> String
show [SequenceNumWithGUID]
childrenSeqNumWithGUID

  -- find max j < i
  case (SequenceNumWithGUID -> Bool)
-> [SequenceNumWithGUID] -> [SequenceNumWithGUID]
forall a. (a -> Bool) -> [a] -> [a]
filter (SequenceNumWithGUID -> SequenceNumWithGUID -> Bool
forall a. Ord a => a -> a -> Bool
< SequenceNumWithGUID
thisSeqNumWithGUID) [SequenceNumWithGUID]
childrenSeqNumWithGUID of
    [] -> do
      let smallest :: SequenceNumWithGUID
smallest = [SequenceNumWithGUID] -> SequenceNumWithGUID
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
minimum [SequenceNumWithGUID]
childrenSeqNumWithGUID
      HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.debug (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Leader elected: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> SequenceNumWithGUID -> String
forall a. Show a => a -> String
show SequenceNumWithGUID
smallest
      IO ()
leaderApp
    [SequenceNumWithGUID]
xs -> do
      let toWatch :: CBytes
toWatch = CBytes
electionPath CBytes -> CBytes -> CBytes
forall a. Semigroup a => a -> a -> a
<> CBytes
"/" CBytes -> CBytes -> CBytes
forall a. Semigroup a => a -> a -> a
<> SequenceNumWithGUID -> CBytes
unSequenceNumWithGUID ([SequenceNumWithGUID] -> SequenceNumWithGUID
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [SequenceNumWithGUID]
xs)
      HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.debug (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Now watching: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> CBytes -> String
forall a. Show a => a -> String
show CBytes
toWatch
      -- add watch
      HasCallStack =>
ZHandle
-> CBytes
-> (HsWatcherCtx -> IO ())
-> (DataCompletion -> IO ())
-> IO ()
ZHandle
-> CBytes
-> (HsWatcherCtx -> IO ())
-> (DataCompletion -> IO ())
-> IO ()
zooWatchGet ZHandle
zk CBytes
toWatch (CBytes -> SequenceNumWithGUID -> HsWatcherCtx -> IO ()
forall t. t -> SequenceNumWithGUID -> HsWatcherCtx -> IO ()
callback CBytes
electionSeqPath SequenceNumWithGUID
thisSeqNumWithGUID) DataCompletion -> IO ()
watchSetApp
  where
    callback :: t -> SequenceNumWithGUID -> HsWatcherCtx -> IO ()
callback t
electionSeqPath SequenceNumWithGUID
thisSeqNumWithGUID HsWatcherCtx{Maybe CBytes
ZooEvent
ZooState
ZHandle
watcherCtxPath :: HsWatcherCtx -> Maybe CBytes
watcherCtxState :: HsWatcherCtx -> ZooState
watcherCtxType :: HsWatcherCtx -> ZooEvent
watcherCtxZHandle :: HsWatcherCtx -> ZHandle
watcherCtxPath :: Maybe CBytes
watcherCtxState :: ZooState
watcherCtxType :: ZooEvent
watcherCtxZHandle :: ZHandle
..} = do
      HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.debug (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Watch triggered, some node failed."
      (StringsCompletion (StringVector [CBytes]
children)) <- HasCallStack => ZHandle -> CBytes -> IO StringsCompletion
ZHandle -> CBytes -> IO StringsCompletion
zooGetChildren ZHandle
watcherCtxZHandle CBytes
electionPath
      let childrenSeqNumWithGUID :: [SequenceNumWithGUID]
childrenSeqNumWithGUID = CBytes -> SequenceNumWithGUID
mkSequenceNumWithGUID (CBytes -> SequenceNumWithGUID)
-> [CBytes] -> [SequenceNumWithGUID]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [CBytes]
children
      let smallest :: SequenceNumWithGUID
smallest = [SequenceNumWithGUID] -> SequenceNumWithGUID
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
minimum [SequenceNumWithGUID]
childrenSeqNumWithGUID
      case SequenceNumWithGUID
smallest SequenceNumWithGUID -> SequenceNumWithGUID -> Bool
forall a. Eq a => a -> a -> Bool
== SequenceNumWithGUID
thisSeqNumWithGUID of
        Bool
True  -> do
          HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.debug (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Leader elected: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> SequenceNumWithGUID -> String
forall a. Show a => a -> String
show SequenceNumWithGUID
smallest
          IO ()
leaderApp
        Bool
False -> do
          -- find max j < i
          case (SequenceNumWithGUID -> Bool)
-> [SequenceNumWithGUID] -> [SequenceNumWithGUID]
forall a. (a -> Bool) -> [a] -> [a]
filter (SequenceNumWithGUID -> SequenceNumWithGUID -> Bool
forall a. Ord a => a -> a -> Bool
< SequenceNumWithGUID
thisSeqNumWithGUID) [SequenceNumWithGUID]
childrenSeqNumWithGUID of
            [] -> HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.fatal (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"The 'impossible' happened!"
            [SequenceNumWithGUID]
xs -> do
              let toWatch :: CBytes
toWatch = CBytes
electionPath CBytes -> CBytes -> CBytes
forall a. Semigroup a => a -> a -> a
<> CBytes
"/" CBytes -> CBytes -> CBytes
forall a. Semigroup a => a -> a -> a
<> SequenceNumWithGUID -> CBytes
unSequenceNumWithGUID ([SequenceNumWithGUID] -> SequenceNumWithGUID
forall (t :: * -> *) a. (Foldable t, Ord a) => t a -> a
maximum [SequenceNumWithGUID]
xs )
              HasCallStack => Builder () -> IO ()
Builder () -> IO ()
Log.debug (Builder () -> IO ()) -> (String -> Builder ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Builder ()
B.stringUTF8 (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Now watching: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> CBytes -> String
forall a. Show a => a -> String
show CBytes
toWatch
              -- add watch
              HasCallStack =>
ZHandle
-> CBytes
-> (HsWatcherCtx -> IO ())
-> (DataCompletion -> IO ())
-> IO ()
ZHandle
-> CBytes
-> (HsWatcherCtx -> IO ())
-> (DataCompletion -> IO ())
-> IO ()
zooWatchGet ZHandle
zk CBytes
toWatch (t -> SequenceNumWithGUID -> HsWatcherCtx -> IO ()
callback t
electionSeqPath SequenceNumWithGUID
thisSeqNumWithGUID) DataCompletion -> IO ()
watchSetApp