-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Patterns/Broker/Server.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: non-portable
-- 
-- Majordomo Server
-------------------------------------------------------------------------------
module Network.Mom.Patterns.Broker.Server (withServer)
where

  import           Prelude hiding (catch)
  import           Control.Exception (throwIO, finally)
  import           Control.Monad.Trans (liftIO)
  import           Control.Monad (when)
  import           Control.Concurrent.MVar
  import           Data.Conduit ((=$), (=$=), ($$))
  import qualified Data.Conduit    as C
  import qualified Data.ByteString as B

  import           Network.Mom.Patterns.Types
  import           Network.Mom.Patterns.Streams
  import           Network.Mom.Patterns.Broker.Common
  import           Heartbeat -- time arithmetics
  import           Data.Time.Clock 

  ------------------------------------------------------------------------
  -- | Start a server as a background process
  -- 
  --   * 'Context'   - The zeromq context
  --  
  --   * 'Service'   - Service name; 
  --                   the service name is used to register
  --                   at the broker.
  -- 
  --   * 'Msec'      - Heartbeat in Milliseconds;
  --                   must be synchronised with the broker heartbeat
  --
  --   * 'String'    - The address to link to
  --
  --   * 'OnError_'  - Error handler
  --  
  --   * 'Conduit_'  - The application-defined stream transformer;
  --                   the conduit receives the request as input stream
  --                   and should create the output stream that is
  --                   internally sent back to the client
  --
  --   * 'Control' a - Control action
  ------------------------------------------------------------------------
  withServer :: Context     ->
                Service     -> 
                Msec        ->
                String      ->
                OnError_    ->
                Conduit_    ->
                (Control a) -> IO a
  withServer ctx srv tmo add onErr serve act | tmo <= 0  =
    throwIO $ MDPExc "Heartbeat is mandatory"
                                             | otherwise = do
    t <- getCurrentTime
    m <- newMVar t                             -- my  heartbeat 
    h <- newMVar (timeAdd t (tolerance * tmo)) -- his heartbeat
    withStreams ctx srv (1000 * fromIntegral tmo)
                [Poll "client" add DealerT Connect [] []]
                (handleTmo m h tmo) onErr
                (job       m h) $ \c ->
      -- connect message, main loop, disconnect message ------------------
      finally (send c ["client"] (mdpWConnect srv) >> act c)
              (send c ["client"]  mdpWDisconnect)

          -- receiv message -----------------------------------------------    
    where job m h s 
            | getSource s == "client" = mdpServe m h s 
            | otherwise               = return ()
          mdpServe m h s = do
            f <- mdpWRcvReq
            case f of
              WRequest is -> liftIO (updBeat h) >>
                             serve =$= mdpWSndRep is =$ passAll s ["client"]
              WBeat    _  -> liftIO (updBeat h)
              WDisc    _  -> liftIO (throwIO $ BrokerExc 
                                           "Broker disconnects")
              _           -> liftIO (throwIO $ Ouch 
                                   "Unknown frame from Broker!")
            liftIO (handleTmo m h tmo s) -- send heartbeat if it's time

          -- update his heartbeat -----------------------------------------
          updBeat h = modifyMVar_ h $ \_ -> do
                        now <- getCurrentTime
                        return (now `timeAdd` (tolerance * tmo))

  ------------------------------------------------------------------------
  -- Send heartbeat if it's time and check broker's state
  ------------------------------------------------------------------------
  handleTmo :: MVar UTCTime -> MVar UTCTime -> Msec -> Streamer -> IO ()
  handleTmo m h tmo s = do hbPeriodReached m tmo >>= \x -> when x sndHb
                           hbDelay               >>= \x -> when x $ throwIO $
                                                     BrokerExc $ "Missing heartbeat"
    where sndHb   = C.runResourceT $ 
                      streamList [B.empty, mdpW01, xHeartBeat] $$  
                      passAll s ["client"]
          hbDelay = getCurrentTime >>= \now -> 
                        readMVar h >>= \t   -> return (now > t)