{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZGossip (
    zgossipServer
  , zgossipClient
  , zgossipZRE) where


import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Monad.IO.Class
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM

import Data.UUID
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as BL

import Data.ZGossip
import Network.ZRE.Types (API(DoDiscover))
import Network.ZRE.Utils (bshow)

import Network.ZGossip.ZMQ
import Network.ZGossip.Types

import System.ZMQ4.Endpoint

zgossipClient :: Key -> Endpoint -> Endpoint -> (ZGSMsg -> IO ()) -> IO ()
zgossipClient uuid endpoint ourEndpoint handler = do
  gossipQ <- atomically $ newTBQueue 10
  atomically $ mapM_ (writeTBQueue gossipQ) [Hello]
  pa <- async $ forever $ do
    atomically $ writeTBQueue gossipQ $ Publish uuid (pEndpoint ourEndpoint) 600
    -- publish every 50s
    threadDelay $ 1000000*50

  link pa
  zgossipDealer endpoint uuid gossipQ handler

zgossipServer :: Endpoint -> IO ()
zgossipServer endpoint = do
  gossipS <- atomically $ newTVar emptyGossipState

  let expire = forever $ do
        atomically $ modifyTVar gossipS $ \x -> x { gossipPairs = M.mapMaybe ttlUpdate (gossipPairs x) }
        threadDelay 1000000
        where
           ttlUpdate (_, 0)   = Nothing
           ttlUpdate (v, ttl) = Just (v, ttl - 1)

  ea <- async expire
  link ea

  zgossipRouter endpoint (serverHandle gossipS)

serverHandle :: TVar ZGossipState -> Peer -> ZGSCmd -> IO [(Peer, ZGSCmd)]
serverHandle s from Hello = do
  atomically $ modifyTVar s $ \x -> x { gossipPeers = S.insert from (gossipPeers x) }
  st <- atomically $ readTVar s
  dbg ["Hello from", tryUUID from]
  -- send all the k,v pairs to this client
  return [(from, cvtPub pub) | pub <- M.toList $ gossipPairs st ]
serverHandle s from pub@(Publish k v ttl) = do
  atomically $ modifyTVar s $ \x -> x { gossipPairs = M.insert k (v, ttl) (gossipPairs x) }
  st <- atomically $ readTVar s
  dbg ["Publish from", tryUUID from, tryUUID k, "=", v, "( ttl", bshow ttl, ")"]

  -- republish this to all other clients
  return [(to, pub) | to <- M.keys $ gossipPairs st, to /= from ]
serverHandle _ from Ping = do
  dbg ["Ping from", tryUUID from]
  return [(from, PingOk)]
serverHandle _ _ PingOk = return []
serverHandle _ _ Invalid = return []

tryUUID :: B.ByteString -> B.ByteString
tryUUID x = maybe x toASCIIBytes (fromByteString $ BL.fromStrict x)

dbg :: [B.ByteString] -> IO ()
dbg = B.putStrLn . (B.intercalate " ")

-- send DoDiscover ZRE API messages on new Publish message
zgossipZRE :: TBQueue API -> ZGSMsg -> IO ()
zgossipZRE q ZGSMsg{..} = handlePublish zgsCmd
  where handlePublish (Publish k v _) = do
          case fromByteString $ BL.fromStrict k of
            Nothing ->  liftIO $ B.putStrLn "Can't parse zgossip uuid"
            Just uuid -> do
             case parseAttoEndpoint v of
               (Left _err) -> liftIO $ B.putStrLn "Can't parse zgossip endpoint"
               (Right endpoint) -> atomically $ writeTBQueue q (DoDiscover uuid endpoint)
        handlePublish _ = return ()