{-# LANGUAGE CPP #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Registry.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: non-portable
-- 
-- Backoffice for majordomo broker
-------------------------------------------------------------------------------
module Registry 
#ifndef TEST
       (Registry, newReg,
        insert, remove, 
        checkWorker, getWorker, freeWorker, 
        getServiceName,
        updWorkerHb, lookupService, 
        clean, setHbPeriod, 
        size, stat, statPerService,
        printQ, getQ)
#endif
where

  import           Heartbeat
  import           Network.Mom.Patterns.Types (Msec, Identity)

  import           Control.Concurrent
  import           Control.Applicative ((<$>))

  import qualified Data.ByteString.Char8 as B
  import           Data.Map   (Map)
  import qualified Data.Map   as   M -- filter, keys
  import qualified Data.Sequence as  S
  import           Data.Sequence (Seq, (|>), (<|), (><), ViewL(..), ViewR(..)) 
  import           Data.Foldable (toList)
  import           Data.Time.Clock
                 
  ------------------------------------------------------------------------
  -- Registry 
  ------------------------------------------------------------------------
  type Registry = MVar Reg
  data Reg      = NewReg {
                     rgHb  :: Msec,
                     rgSrv :: SrvTree,
                     rgWrk :: WrkTree,
                     rgS   :: Seq B.ByteString
                  }

  newReg :: Msec -> IO Registry
  newReg ms = newMVar NewReg {
                    rgHb  = ms,
                    rgSrv = M.empty,
                    rgWrk = M.empty,
                    rgS   = S.empty}

  modifyReg :: Registry -> (Reg -> IO (Reg, r)) -> IO r
  modifyReg = modifyMVar 

  modifyReg_ :: Registry -> (Reg -> IO Reg) -> IO ()
  modifyReg_ = modifyMVar_ 

  withReg :: Registry -> (Reg -> IO r) -> IO r
  withReg = withMVar 

  ------------------------------------------------------------------------
  -- Worker may be free or busy
  ------------------------------------------------------------------------
  data State = Free | Busy 
    deriving (Show, Eq)
                 
  ------------------------------------------------------------------------
  -- Worker 
  ------------------------------------------------------------------------
  data Worker = Worker {
                  wrkId    :: Identity,
                  wrkHB    :: Heartbeat,
                  wrkQ     :: MVar Queue
                }

  -------------------------------------------------------------------------
  -- Set common heartbeat period
  -------------------------------------------------------------------------
  setHbPeriod :: Registry -> Msec -> IO ()
  setHbPeriod r hb = modifyReg_ r $ \reg -> 
                       return reg{rgHb = hb}

  -------------------------------------------------------------------------
  -- Update worker's heartbeat descriptor according to function
  -------------------------------------------------------------------------
  updHb :: (UTCTime -> Heartbeat -> Heartbeat) -> 
            UTCTime -> WrkNode -> WrkNode
  updHb upd t (i, w) = (i, w {wrkHB = upd t $ wrkHB w})

  ------------------------------------------------------------------------
  -- A Worker Node is an Identity paired with a worker (lookup)
  -- A Worker Tree is a map of identities and services,
  -- where a service is basically a queue of free and busy workers;
  -- the worker tree, hence, does not point directly to the worker,
  -- but to the queue where the worker can be found:
  --
  -- tree
  --  |
  --  ----> (i,srv)
  --            |
  --            ----> (sn,srvQ)
  --                       |
  --                       -----> Free Seq (i,wrk)
  --                       -----> Busy Seq (i,wrk)
  -------------------------------------------------------------------------
  type WrkNode = (Identity, Worker)
  type WrkTree = Map Identity Service

  -------------------------------------------------------------------------
  -- A service is a queue of free and busy workers
  -------------------------------------------------------------------------
  data Service = Service {
                   srvName :: B.ByteString,
                   srvQ    :: MVar Queue
                 }

  -------------------------------------------------------------------------
  -- A service node is a pair of (ServiceName, Service)
  -- A service tree is a map  of ServiceName and Service
  -------------------------------------------------------------------------
  type SrvNode = (B.ByteString, Service)
  type SrvTree = Map B.ByteString Service

  -------------------------------------------------------------------------
  -- Register new worker for service
  -------------------------------------------------------------------------
  insert :: Registry -> Identity -> B.ByteString -> IO ()
  insert r i s = modifyReg_ r $ \reg -> 
    case M.lookup i (rgWrk reg) of
      Just _  -> return reg -- identity already known
      Nothing -> do
        (reg2, sn) <- case M.lookup s (rgSrv reg) of
                        Just x  -> return (reg, x) -- service exists
                        Nothing -> do              -- new service
                          q <- newMVar $ Q S.empty S.empty
                          let x  = Service {
                                    srvName = s,
                                    srvQ    = q}
                          return (reg {rgSrv = M.insert s x (rgSrv reg),
                                       rgS   = rgS reg |> s}, x)
        hb <- newHeartbeat (rgHb reg2)
        initW  i hb sn                  -- add worker to service queue
        return reg2 {rgWrk =            -- insert worker into wrk tree
          M.insert i sn $ rgWrk reg2}

  -------------------------------------------------------------------------
  -- Remove worker 
  -------------------------------------------------------------------------
  remove :: Registry -> Identity -> IO ()
  remove r i = modifyReg_ r $ \reg -> 
    case M.lookup i (rgWrk reg) of
      Nothing -> return reg                        -- worker does not exist
      Just sn -> do
        let reg2 = reg {rgWrk = M.delete i (rgWrk reg)} -- delete from tree
        e <- modifyMVar (srvQ sn) $ \q ->               -- remove from queue
               let q' = removeQ i q
                   e  = emptyQ q'
                in return (q', e)
        if e then return reg2{rgSrv =                -- remove empty service
                    M.delete (srvName sn) (rgSrv reg2)}
             else return reg2

  -------------------------------------------------------------------------
  -- Free a worker that has been busy
  -- updating his heatbeat
  -------------------------------------------------------------------------
  freeWorker :: Registry -> Identity -> IO ()
  freeWorker r i = getCurrentTime >>= \now ->
                     freeWorkerWithUpd r i (updHb updHim now)

  -------------------------------------------------------------------------
  -- Free a worker that has been busy
  -- updating heartbeat (because we apparently have feedback )
  -------------------------------------------------------------------------
  freeWorkerWithUpd :: Registry -> Identity -> (WrkNode -> WrkNode) -> IO ()
  freeWorkerWithUpd r i upd = withReg r $ \reg -> 
    case M.lookup i (rgWrk reg) of
      Nothing -> return ()                       -- worker does not exist
      Just sn -> modifyMVar_ (srvQ sn) $ \q -> 
                   return $ setStateQ i Free upd q -- set free and upd hb 

  -------------------------------------------------------------------------
  -- Update heartbeat
  -------------------------------------------------------------------------
  updWorkerHb :: Registry -> Identity -> IO ()
  updWorkerHb r i = getCurrentTime >>= \now -> 
                      updWorker r i (updHb updHim now)

  -------------------------------------------------------------------------
  -- Generic worker update
  -------------------------------------------------------------------------
  updWorker :: Registry -> Identity -> (WrkNode -> WrkNode) -> IO ()
  updWorker r i f = withReg r $ \reg ->
    case M.lookup i (rgWrk reg) of
      Nothing -> return ()        -- worker does not exist
      Just sn -> modifyMVar_ (srvQ sn) $ \q -> 
                   return $ updateQ i f q -- update worker

  -------------------------------------------------------------------------
  -- Get service by worker identity
  -------------------------------------------------------------------------
  getServiceName :: Registry -> Identity -> IO (Maybe B.ByteString)
  getServiceName r i = withReg r $ \reg -> 
    case M.lookup i (rgWrk reg) of
      Nothing -> return Nothing
      Just s  -> return $ Just (srvName s)

  -------------------------------------------------------------------------
  -- Check whether service exists
  -------------------------------------------------------------------------
  lookupService :: Registry -> B.ByteString -> IO Bool
  lookupService r s = do
    mbS <- lookupS r s
    case mbS of
      Nothing -> return False
      Just  _ -> return True

  -------------------------------------------------------------------------
  -- Get worker for service:
  --   - removing unresponsive workers on the way 
  --   - get first of free q, put it to the busy queue 
  --                          and update its heartbeat
  --   - if free q is emtpy, take the first of the busy queue
  -------------------------------------------------------------------------
  getWorker :: Registry -> B.ByteString -> IO (Maybe Identity)
  getWorker r s = do
    now <- getCurrentTime
    mbS <- lookupS r s
    case mbS of
      Nothing -> return Nothing -- service does not exist
      Just sn -> do
        bury r now sn    -- remove non-responsive workers
        modifyMVar (srvQ sn) $ \q ->
          case firstFreeQ q of -- get first of free
            Just (i,_) ->
              return (setStateQ i Busy (updHb updMe now) q, Just i)
            Nothing    -> 
              case firstBusyQ q of
                Nothing    -> return (q, Nothing)
                Just (i,_) -> return (
                      setStateQ i Busy (updHb updMe now) q, Just i)
          
  -------------------------------------------------------------------------
  -- Remove non-responsive workers
  -- and heartbeat those that have been inactive 
  --     for at least one heartbeat period
  -------------------------------------------------------------------------
  checkWorker :: Registry -> IO [Identity]
  checkWorker r = do
    now <- getCurrentTime
    checkService r >>= \mbS -> case mbS of
                                 Nothing -> return ()
                                 Just s  -> bury r now s
    srv <- M.elems <$> withReg r (return . rgSrv) 
    concat <$> mapM (getWorkers now) srv

    where getWorkers now s = do
            is <- withMVar (srvQ s) ( 
                    return . map fst . takeIfQ Free tst)
            mapM_ (\i -> updWorker r i updSt) is
            return is
            where updSt (i,w) = let hb  = updMe now $ wrkHB w
                                 in (i, w{wrkHB = hb}) 
                  tst = checkMe now . wrkHB . snd

  -------------------------------------------------------------------------
  -- Get next service to check
  -- check on the way if service still exists
  -- if not, remove service and try next.
  -- Note: there is no upper bound;
  --       if a lot of services have been removed since our last visit
  --          we will go through this many times
  -------------------------------------------------------------------------
  checkService :: Registry -> IO (Maybe Service)
  checkService r = do 
    mbS <- modifyReg r $ \reg -> do
      let (s, mbS) = getS (rgS reg)
      return (reg {rgS = s}, mbS)
    case mbS of 
      Nothing -> return Nothing
      Just s  -> do
        mbSrv <- lookupS r s
        case mbSrv of
          Nothing  -> rm >> checkService r
          Just srv -> return $ Just srv
    where getS s = 
            case S.viewl s of
              EmptyL  -> (s, Nothing)
              x :< xs -> (xs |> x, Just x)
          rmTail s = 
            case S.viewr s of
              EmptyR  -> s
              xs :> _ -> xs
          rm = modifyReg_ r $ \reg -> 
                 return reg {rgS = rmTail (rgS reg)}

  -------------------------------------------------------------------------
  -- Remove unresponsive workers
  -------------------------------------------------------------------------
  bury :: Registry -> UTCTime -> Service -> IO ()
  bury r now sn = do 
    q <- readMVar $ srvQ sn -- we use remove on registry, so don't block
    mapM_ (remove r . fst) $ findDead $ qFree q
    mapM_ (remove r . fst) $ findDead $ qBusy q
    where findDead = filter (not . alive now . wrkHB . snd) . toList

  -------------------------------------------------------------------------
  -- Clean up registry
  -------------------------------------------------------------------------
  clean :: Registry -> IO ()
  clean r = modifyReg_ r $ \reg -> return reg {
                                     rgWrk = M.empty,
                                     rgSrv = M.empty,
                                     rgS   = S.empty}

  -------------------------------------------------------------------------
  -- Number of workers
  -------------------------------------------------------------------------
  size :: Registry -> IO Int
  size r = withReg r $ \reg -> return $ M.size (rgWrk reg)

  -------------------------------------------------------------------------
  -- Per service:
  --     number of free workers
  --     number of busy workers
  -------------------------------------------------------------------------
  statPerService :: Registry -> B.ByteString -> IO (B.ByteString, Int, Int)
  statPerService r s = do
    mbS <- lookupS r s
    case mbS of
      Nothing -> return (s, 0, 0)
      Just sn -> perService (s, sn)

  -------------------------------------------------------------------------
  -- For all services:
  --     number of free workers
  --     number of busy workers
  -------------------------------------------------------------------------
  stat :: Registry -> IO [(B.ByteString, Int, Int)]
  stat r = withReg r $ \reg -> 
             mapM perService $ M.assocs (rgSrv reg)

  -------------------------------------------------------------------------
  -- Get stats per service
  -------------------------------------------------------------------------
  perService :: SrvNode -> IO (B.ByteString, Int, Int)
  perService (s, sn) = withMVar (srvQ sn) $ \q ->
                         return (s, S.length $ qFree q,
                                    S.length $ qBusy q)

  -------------------------------------------------------------------------
  -- Debug: print q
  -------------------------------------------------------------------------
  printQ :: Registry -> B.ByteString -> IO ()
  printQ r s = do
    mbS <- lookupS r s
    case mbS of
      Nothing -> return ()
      Just x  -> do q <- readMVar (srvQ x)
                    putStr "Free: "
                    print $ map fst $ toList $ qFree q
                    putStrLn ""
                    putStr "Busy: "
                    print $ map fst $ toList $ qBusy q 

  -------------------------------------------------------------------------
  -- Create worker
  --        initialising heartbeat
  --        inserting    into service queue
  -------------------------------------------------------------------------
  initW :: Identity -> Heartbeat -> Service -> IO ()
  initW i hb sn = modifyMVar_ (srvQ sn) $ \q -> do
                    let w = Worker {
                              wrkId    = i,
                              wrkHB    = hb,
                              wrkQ     = srvQ sn}
                    return $ insertQ (i,w) q
                    
  -------------------------------------------------------------------------
  -- Lookup service
  -------------------------------------------------------------------------
  lookupS :: Registry -> B.ByteString -> IO (Maybe Service)
  lookupS r sn = withMVar r $ \reg -> 
                   return $ M.lookup sn (rgSrv reg)

  -------------------------------------------------------------------------
  -- Queue:
  --    Seq free workers
  --    Seq busy workers
  -------------------------------------------------------------------------
  data Queue = Q {
                 qFree :: Seq WrkNode,
                 qBusy :: Seq WrkNode 
               }

  -------------------------------------------------------------------------
  -- Get a worker (either from free or busy)
  -------------------------------------------------------------------------
  getQ :: Identity -> Queue -> Maybe WrkNode
  getQ i q = lookupQ i Free q ~> lookupQ i Busy q

  -------------------------------------------------------------------------
  -- lookup one seq, either
  --    free or
  --    busy
  -------------------------------------------------------------------------
  lookupQ :: Identity -> State -> Queue -> Maybe WrkNode
  lookupQ i s q = 
      let r = toList $ snd $ S.breakl (eq i) l
       in if null r then Nothing else Just $ head r
    where l  = getList q s
          
  -------------------------------------------------------------------------
  -- Insert worker into q (always free)
  -------------------------------------------------------------------------
  insertQ :: WrkNode -> Queue -> Queue
  insertQ w q = q{qFree = qFree q |> w}

  -------------------------------------------------------------------------
  -- Remove worker from q (either free or busy)
  -------------------------------------------------------------------------
  removeQ :: Identity -> Queue -> Queue
  removeQ i = removeWithStateQ i Free . removeWithStateQ i Busy

  -------------------------------------------------------------------------
  -- Remove worker from q depending on state
  -------------------------------------------------------------------------
  removeWithStateQ :: Identity -> State -> Queue -> Queue
  removeWithStateQ i s q = 
    case S.viewl t of
      EmptyL    -> q
      (_ :< xs) -> case s of
                     Free -> q{qFree = h >< xs}
                     Busy -> q{qBusy = h >< xs}
    where (h,t) = getWithStateQ i s q

  -------------------------------------------------------------------------
  -- Generic update of a worker in a queue
  -------------------------------------------------------------------------
  updateQ :: Identity -> (WrkNode -> WrkNode) -> Queue -> Queue
  updateQ i f = updateWithStateQ i Free f . updateWithStateQ i Busy f 

  -------------------------------------------------------------------------
  -- Generic update of a worker in a queue depending on state
  -------------------------------------------------------------------------
  updateWithStateQ :: Identity -> State    -> 
                      (WrkNode -> WrkNode) -> Queue -> Queue
  updateWithStateQ i s f q =
    case S.viewl t of
      EmptyL    -> q
      (x :< xs) -> case s of
                     Free -> q{qFree = h >< f x <| xs}
                     Busy -> q{qBusy = h >< f x <| xs}
    where (h,t) = getWithStateQ i s q

  -------------------------------------------------------------------------
  -- Remove worker from Seq with state x and
  -- Add    it     to   Seq with state y
  -------------------------------------------------------------------------
  setStateQ :: Identity -> State -> (WrkNode -> WrkNode) -> Queue -> Queue
  setStateQ i s f q = case S.viewl t of
                        EmptyL    -> q
                        (x :< xs) -> case s of 
                                       Free -> q{qFree = qFree q |> f x,
                                                 qBusy = h >< xs}
                                       Busy -> q{qBusy = qBusy q |> f x,
                                                 qFree = h >< xs}
    where (h,t) = let s' = case s of
                             Free -> Busy
                             Busy -> Free
                   in getWithStateQ i s' q 

  -------------------------------------------------------------------------
  -- Get view of Seq, breaking on worker
  -- either free or busy
  -------------------------------------------------------------------------
  getWithStateQ :: Identity -> State -> Queue -> (Seq WrkNode, Seq WrkNode)
  getWithStateQ i s q = case s of
                          Free -> S.breakl (eq i) $ qFree q
                          Busy -> S.breakl (eq i) $ qBusy q

  -------------------------------------------------------------------------
  -- Queue is empty
  -------------------------------------------------------------------------
  emptyQ :: Queue -> Bool
  emptyQ q = S.null (qFree q) && S.null (qBusy q)

  -------------------------------------------------------------------------
  -- Head of Seq free
  -------------------------------------------------------------------------
  firstFreeQ :: Queue -> Maybe WrkNode
  firstFreeQ q = firstQ (qFree q)

  -------------------------------------------------------------------------
  -- Head of Seq busy
  -------------------------------------------------------------------------
  firstBusyQ :: Queue -> Maybe WrkNode
  firstBusyQ q = firstQ (qBusy q)

  -------------------------------------------------------------------------
  -- Head
  -------------------------------------------------------------------------
  firstQ :: Seq WrkNode -> Maybe WrkNode
  firstQ f = case S.viewl f of
               EmptyL   -> Nothing
               (w :< _) -> Just w

  -------------------------------------------------------------------------
  -- Take those that fulfil f with state s
  -------------------------------------------------------------------------
  takeIfQ :: State -> (WrkNode -> Bool) -> Queue -> [WrkNode]
  takeIfQ s f q = case s of
                      Free -> takeIf f $ qFree q
                      Busy -> takeIf f $ qBusy q

  -------------------------------------------------------------------------
  -- Take n that fulfil f from Seq
  -------------------------------------------------------------------------
  takeIf :: (WrkNode -> Bool) -> Seq WrkNode -> [WrkNode]
  takeIf f s = case S.viewl s of
                 EmptyL    -> []
                 (w :< ws) | f w       -> w : takeIf f ws
                           | otherwise ->     takeIf f ws

  eq :: Identity -> WrkNode -> Bool
  eq i = (== i) . fst 

  getList :: Queue -> State -> Seq WrkNode
  getList q s = case s of
                  Free -> qFree q
                  Busy -> qBusy q

  -------------------------------------------------------------------------
  -- Simple Maybe combinator
  -------------------------------------------------------------------------
  infixl 9 ~>
  (~>) :: Maybe a -> Maybe a -> Maybe a
  (~>) f g = case f of
               Nothing -> g
               Just x  -> Just x