{-# LANGUAGE OverloadedStrings #-}
module System.Network.ZMQ.MDP.Worker where

-- libraries
import Data.ByteString.Char8
import Data.Int
import Prelude hiding (putStr, putStrLn)
import qualified Prelude
import qualified System.ZMQ as Z
import System.ZMQ hiding(receive)
import Control.Applicative

import Control.Exception
import Control.Monad
import Data.Time.Clock
import Data.Time.Format
import System.Locale
import Control.Concurrent
import System.Timeout

-- friends
import System.Network.ZMQ.MDP.Util

data Protocol = WORKER_PROTOCOL
instance Show Protocol where
  show WORKER_PROTOCOL = "MDPW01"

type Address = ByteString -- cheaty.

data Response = Response { envelope :: ! [Address],
                           body     :: ! [ByteString] }

data ResponseCode = REPLY | READY | WORKER_HEARTBEAT
instance Show ResponseCode where
  show REPLY      = "\003"
  show READY      = "\001"
  show WORKER_HEARTBEAT  = "\004"

data CommandCode = REQUEST | HEARTBEAT | DISCONNECT
instance Show CommandCode where
  show REQUEST    = "\002"
  show HEARTBEAT  = "\004"
  show DISCONNECT = "\005"

parseCommand ::  ByteString -> Maybe CommandCode
parseCommand "\002" = Just REQUEST
parseCommand "\004" = Just HEARTBEAT
parseCommand "\005" = Just DISCONNECT
parseCommand _ = Nothing

type MDError = ByteString

sendToBroker :: Socket a -> ResponseCode -> [ByteString] ->
                  [ByteString] -> IO ()
sendToBroker sock cmd option message =
  sendAll sock $ ["",
                   pack $ show WORKER_PROTOCOL,
                   pack $ show cmd] ++ option ++ message

{-
Frame 0: Empty frame
Frame 1: "MDPW01" (six bytes, representing MDP/Worker v0.1)
Frame 2: 0x03 (one byte, representing REPLY)
Frame 3: Client address (envelope stack)
Frame 4: Empty (zero bytes, envelope delimiter)
Frames 5+: Reply body (opaque binary)
-}

-- FIXME might we ever want to send a multi-part body?
--
sendResponse :: Socket a -> Response -> IO ()
sendResponse sock resp = sendToBroker sock REPLY (envelope resp) (body resp)

--  sendToBroker sock REPLY Nothing [reply])

whileJust :: Monad m => (b -> m (Maybe b)) -> b -> m b
whileJust action seed = action seed >>=  maybe (return seed) (whileJust action)

start :: WorkerState a -> IO ()
start worker = forever (withBroker readTillDrop worker)

readTillDrop :: Socket a -> WorkerState a1 -> IO (WorkerState a1)
readTillDrop sock worker = whileJust (receive sock) worker


data WorkerState a = WorkerState { heartbeat_at :: ! UTCTime,
                                   liveness     :: ! Int,
                                   heartbeat    :: ! Int64,
                                   reconnect    :: ! Int,
                                   broker       :: String,
                                   context      :: System.ZMQ.Context,
                                   svc          :: ByteString,
                                   handler      :: [ByteString] -> IO [ByteString]
                                 }

epoch :: UTCTime
epoch = buildTime defaultTimeLocale []

lIVENESS :: Int
lIVENESS = 3


withWorker :: String -> ByteString -> ([ByteString] -> IO [ByteString]) -> IO ()
withWorker  broker_ service_ io =
 withContext 1 $ \c ->
 start WorkerState { broker = broker_,
                     context = c,
                     svc = service_,
                     handler = io,
                     liveness = 1,
                     heartbeat_at = epoch,
                     heartbeat = 2,
                     reconnect = 2
                   }

withBroker :: (Socket XReq -> WorkerState a -> IO b) -> WorkerState t -> IO b
withBroker go worker =
  withSocket (context worker) XReq $ \sock -> do
    loggedPut ( "connecting to broker " ++ broker worker)
    connect sock (broker worker)
    sendToBroker sock READY [svc worker] []
    now <- getCurrentTime
    let time = addUTCTime (fromIntegral $ heartbeat worker) now
    loggedPut ("beat at:" ++ show time)
    go sock worker { liveness     = lIVENESS,
                     heartbeat_at = time
                   }

loggedPut :: String -> IO ()
loggedPut _res = return () -- do
--  Prelude.putStr . show =<< getCurrentTime
--  Prelude.putStrLn (": " ++ res)

receive :: Socket a -> WorkerState a1 -> IO (Maybe (WorkerState a2))
receive sock worker = do loggedPut "polling"
                         next <- getMessage
                         case next of
                           Nothing -> loggedPut "no message" >> return Nothing
                           Just w -> loggedPut "message!" >> postCheck w
  where

  getMessage = do
    -- this timeout is different in 3.1
    -- loggedPut $ "Polling socket: should finish in " ++ (show (heartbeat worker)) ++ "seconds"

    [S _ polled] <- poll [S sock In] $ 1000000 * heartbeat worker
    -- use this in 3.1
    -- polled <- timeout (1000000 * fromIntegral (heartbeat worker)) $ Z.receive sock []

    case polled of
      None -> noMessage
      In   -> Z.receive sock [] >>= handleEvent
    -- case polled of
    --   Nothing -> noMessage
    --   Just s   -> handleEvent s

  noMessage :: IO (Maybe (WorkerState b))
  noMessage = do
    let live = liveness worker - 1
    if liveness worker == 0
       then loggedPut "reconnecting" >> threadDelay (1000000 * reconnect worker) >> return Nothing
       else return $ Just worker { liveness = live }
  postCheck :: WorkerState a -> IO (Maybe (WorkerState a))
  postCheck worker = do
      --loggedPut "postcheck"
      time <- {-# SCC "getCurrentTime" #-} getCurrentTime
      -- loggedPut $ "beat at " ++ show (heartbeat_at worker)
      if {-# SCC "time_comparison" #-} time > heartbeat_at worker
        then do --loggedPut "sending heartbeat"
                {-# SCC "postcheck_send" #-} sendToBroker sock WORKER_HEARTBEAT [] []
                --loggedPut "sent heartbeat!"
                {-# SCC "postcheck_return" #-} return $ Just $ updateWorkerTime worker time
        else return $ Just worker

  updateWorkerTime w time =
    w { heartbeat_at = {-# SCC "addtime" #-} addUTCTime (fromIntegral $! heartbeat w) time}

  handleEvent header = do
      let zrecv = Z.receive sock []
      -- loggedPut "handling"
      assert (header == "") (return ())
      prot <- zrecv
      assert (prot == "MDPW01") (return ())
      -- ideally, we'd encapsulate the process of reading
      -- the whole thing in in the parser. this will do for now though.
      command <- parseCommand <$> zrecv
      let new_worker = worker { liveness = lIVENESS }
      case command of
        Just REQUEST -> do
          addresses   <- receiveUntilEmpty sock
          msgs        <- receiveUntilEnd   sock
          replyString <- handler worker msgs
          sendResponse sock Response { envelope = addresses,
                                        body = replyString }
          return $ Just new_worker
        Just HEARTBEAT -> do
          -- loggedPut "handling a heartbeat"
          return $ Just new_worker
        Just DISCONNECT -> do
          -- loggedPut "handling a disconnect"
          return Nothing
        Nothing -> error "borked"