-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Patterns/Broker/Broker.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: non-portable
-- 
-- Majordomo Broker
-------------------------------------------------------------------------------
module Network.Mom.Patterns.Broker.Broker (withBroker)
where

  import           Network.Mom.Patterns.Types
  import           Network.Mom.Patterns.Streams
  import           Network.Mom.Patterns.Broker.Common

  import qualified Registry  as R
  import           Heartbeat (hbPeriodReached)     

  import qualified Data.ByteString        as B
  import qualified Data.ByteString.Char8  as BC
  import qualified Data.Conduit           as C
  import           Data.Conduit ((=$), ($$))
  import           Data.Time.Clock

  import           Control.Applicative ((<$>))
  import           Control.Monad.Trans (liftIO)
  import           Control.Monad       (when)
  import           Prelude hiding (catch)
  import           Control.Exception (throwIO)
  import           Control.Concurrent.MVar

  ------------------------------------------------------------------------
  -- | Start a broker as a background process and run a control action
  --   while it is active.
  --   The broker binds two router sockets, polled under the names
  --   @servers@ and @clients@, and hands them to 'withStreams'
  --   together with the heartbeat handler and the stream handler.
  --
  --   * 'Context'    - The zeromq context
  --
  --   * 'Service'    - Service name;
  --                    it is used for debugging only and is unrelated
  --                    to the services of the Majordomo Protocol.
  --
  --   * 'Msec'       - The heartbeat interval in milliseconds,
  --                    which should be the same for all workers
  --                    and the broker. It must be greater than zero;
  --                    otherwise an 'MDPExc' is thrown.
  --
  --   * 'String'     - The address clients connect to
  --
  --   * 'String'     - The address servers connect to
  --
  --   * 'OnError_'   - Error handler
  --
  --   * 'Controller' @-> IO r@ - Control action
  ------------------------------------------------------------------------
  withBroker :: Context  -> Service -> Msec -> String -> String -> 
                OnError_ -> (Controller -> IO r)                -> IO r
  withBroker ctx srv tmo aClients aServers onerr ctrl
    | tmo > 0   = do
        reg   <- R.newReg tmo
        -- shared timestamp consulted by the heartbeat check
        stamp <- getCurrentTime >>= newMVar
        withStreams ctx srv period polls
                    (handleTmo    reg stamp tmo) onerr
                    (handleStream reg stamp tmo) ctrl
    | otherwise = throwIO (MDPExc "Heartbeat is mandatory")
    where -- poll timeout: the heartbeat interval scaled by 1000
          -- (presumably microseconds - TODO confirm against withStreams)
          period = 1000 * fromIntegral tmo
          polls  = [Poll "servers" aServers RouterT Bind [] [],
                    Poll "clients" aClients RouterT Bind [] []]
  ------------------------------------------------------------------------
  -- Heartbeat handling:
  -- once 'hbPeriodReached' reports that a heartbeat period has passed,
  -- - let the registry check its workers ('R.checkWorker'),
  --   which removes the unresponsive ones, and
  -- - send a heartbeat frame sequence to every worker it returns,
  --   i.e. those inactive for at least one heartbeat period
  ------------------------------------------------------------------------
  handleTmo :: R.Registry -> MVar UTCTime -> Msec -> Streamer -> IO ()
  handleTmo r m tmo s = do
      due <- hbPeriodReached m tmo
      when due $ do
        idle <- R.checkWorker r
        mapM_ beat idle
    where -- heartbeat message addressed to worker w
          frames w = [w, B.empty, mdpW01, xHeartBeat]
          -- stream the heartbeat out through the "servers" socket
          beat   w = C.runResourceT
                       (streamList (frames w) $$ passAll s ["servers"])
 
  ------------------------------------------------------------------------
  -- Handle incoming streams:
  -- dispatch on the name of the source the stream arrived on
  -- (streams from any other source are ignored)
  -- and run the heartbeat check afterwards
  ------------------------------------------------------------------------
  handleStream :: R.Registry -> MVar UTCTime -> Msec -> StreamSink
  handleStream r m x s = do
      route
      liftIO (handleTmo r m x s)
    where src = getSource s
          route | src == "servers" = recvWorker r s
                | src == "clients" = recvClient r s
                | otherwise        = return ()

  ------------------------------------------------------------------------
  -- Receive stream from client:
  -- - a service name whose first four bytes equal the mmi header
  --   is answered by the broker itself;
  -- - any other request is passed on to a worker obtained
  --   from the registry; if there is none, a BrokerExc is thrown
  ------------------------------------------------------------------------
  recvClient :: R.Registry -> StreamSink
  recvClient r s = mdpCRcvReq >>= uncurry dispatch
    where
      dispatch client sn
        | B.take 4 sn == mmiHdr = mmiRequest client sn
        | otherwise             = forward    client sn

      -- ordinary request: hand it to a worker of that service
      forward client sn =
        liftIO (R.getWorker r sn) >>= maybe (noWorker sn) toWorker
        where toWorker w = sendRequest w [client] s

      -- mmi request: only the service named by mmiSrv is implemented;
      -- it expects the service name to look up as the next frame
      mmiRequest client sn
        | B.drop 4 sn == mmiSrv = C.await >>= maybe noName answer
        | otherwise             = reply mmiNimpl
        where reply frame = C.yield frame =$ sendReply sn [client] s
              answer name = do
                known <- liftIO (R.lookupService r name)
                reply (if known then mmiFound else mmiNotFound)
              noName      = liftIO (throwIO $ MMIExc
                                     "No ServiceName in mmi.service request")

      noWorker sn = liftIO (throwIO $ BrokerExc $
                              "No Worker for service " ++ BC.unpack sn)

  ------------------------------------------------------------------------
  -- Receive stream from worker:
  -- read one worker frame and react according to its kind;
  -- any frame kind not listed below raises an Ouch exception
  ------------------------------------------------------------------------
  recvWorker :: R.Registry -> StreamSink 
  recvWorker r s = mdpWRcvRep >>= react
    where -- reply: pass it on to the clients
          react (WReply w is) = handleReply r w is s
          -- ready: register the new worker for its service
          react (WReady w sn) = liftIO (R.insert r w sn)
          -- heartbeat: update the worker's heartbeat in the registry
          react (WBeat  w)    = liftIO (R.updWorkerHb r w)
          -- disconnect: remove the worker from the registry
          react (WDisc  w)    = liftIO (R.remove r w)
          react _             = liftIO (throwIO $
                                  Ouch "Unexpected Frame from Worker!")

  ------------------------------------------------------------------------
  -- Handle reply:
  -- look up the service name of the replying worker,
  -- send the reply to the clients and free the worker afterwards;
  -- a worker without a service name in the registry
  -- raises a ServerExc
  ------------------------------------------------------------------------
  handleReply :: R.Registry -> Identity -> [Identity] -> StreamSink
  handleReply r w is s =
      liftIO (R.getServiceName r w) >>= maybe unknown deliver
    where unknown    = liftIO (throwIO $ ServerExc "Unknown Worker")
          deliver sn = do
            sendReply sn is s
            liftIO (R.freeWorker r w)

  ------------------------------------------------------------------------
  -- Send request to worker:
  -- wrap the incoming stream with 'mdpWSndReq' for worker w
  -- and pass the result out through the "servers" socket
  ------------------------------------------------------------------------
  sendRequest :: Identity -> [Identity] -> StreamSink
  sendRequest w is s = request =$ toServers
    where request   = mdpWSndReq w is
          toServers = passAll s ["servers"]

  ------------------------------------------------------------------------
  -- Send reply to client:
  -- wrap the incoming stream with 'mdpCSndRep' for service sn
  -- and pass the result out through the "clients" socket
  ------------------------------------------------------------------------
  sendReply :: B.ByteString -> [Identity] -> StreamSink
  sendReply sn is s = reply =$ toClients
    where reply     = mdpCSndRep sn is
          toClients = passAll s ["clients"]