{-# LANGUAGE OverloadedStrings #-}
module Network.NSQ.Connection
    ( defaultConfig
    , establish

    ) where

import Control.Monad.Reader
import Control.Monad.State.Strict
import Data.List
import Data.Maybe
import Prelude hiding (log)
import Network.HostName

import qualified Data.Text as T
import qualified Data.ByteString as BS

import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Attoparsec as PA
import qualified Pipes.Prelude as PP
import Pipes

import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM

import System.Log.Logger (debugM, errorM, warningM, infoM)

import Network.NSQ.Types (NSQConnection(..), Message, Command, server, port, logName, LogName, identConf)
import qualified Network.NSQ.Types as NSQ
import qualified Network.NSQ.Identify as NSQ
import qualified Network.NSQ.Parser as NSQ


--
-- Default Config
--
-- TODO: standardize on some sort of logger hierchary (nsq server/topic?)
-- NSQ.[subsystem].[topic].[connection] - message
-- NSQ.[subsystem].[custom] ....
--
defaultConfig :: String -> IO NSQConnection
defaultConfig serverHost = do
    localHost <- T.pack <$> getHostName
    let localClientId = T.takeWhile (/= '.') localHost
    let localIdent = NSQ.defaultIdentify localClientId localHost

    return $ NSQConnection serverHost 4150 "NSQ.Connection." localIdent

--
-- Establish a session with this server
--
-- Detail:
--  * Support connecting to a particular nsqd and doing the needful to establish identification and so forth
--  * Auto-handle heartbeat and all related stuff
--  * A higher layer will handle the message reading/balancing between multiplex nsqd connection for a particular topic/channel
--
establish :: NSQConnection -> TQueue Message -> TQueue Command -> IO ()
establish conn topicQueue reply = PNT.withSocketsDo $
    -- Establish stocket
    PNT.connect (server conn) (show $ port conn) (\(sock, _) -> do

        -- TODO: maybe consider PNT.fromSocketN so that we can adjust fetch size if needed downstream
        let send = (log "send" $ logName conn) >-> PNT.toSocket sock
        let recv = PNT.fromSocket sock 8192 >-> (log "recv" $ logName conn)
        race_
            (handleNSQ conn recv send topicQueue)
            (runEffect $ handleReply reply >-> showCommand >-> send) -- Handles user replies
    )

--
-- NSQ Reply handler
--
handleReply :: (Monad m, MonadIO m) => TQueue Command -> Producer NSQ.Command m ()
handleReply queue = forever $ do
    cmd <- liftIO $ atomically $ readTQueue queue
    yield cmd

--
-- The NSQ handler and protocol
--
handleNSQ :: (Monad m, MonadIO m) => NSQConnection -> Producer BS.ByteString m () -> Consumer BS.ByteString m () -> TQueue Message -> m ()
handleNSQ sc recv send topicQueue = do
    -- Initial handshake to kick off the handshake
    runEffect $ (initialHandshake $ identConf sc) >-> showCommand >-> send

    -- Rest of the handshake process (parsing and dealing with identification)
    runEffect $ (nsqParserErrorLogging (logName sc) recv) >-> identReply

    -- Setup the topic/channel/rdy
    runEffect $ setupTopic >-> showCommand >-> send

    -- Regular nsq streaming
    runEffect $ (nsqParserErrorLogging (logName sc) recv) >-> (command (logName sc) topicQueue) >-> showCommand >-> send

    return ()

    where
        -- Initial Handshake
        initialHandshake im = do
            yield $ NSQ.Protocol
            yield $ NSQ.Identify im
            return ()

        -- TODO; replace it with actual logic
        setupTopic = do
            yield $ NSQ.Sub "glc-gamestate" "netheril.elder.lan." False
            yield $ NSQ.Rdy 1
            return ()

        -- Process the ident reply
        identReply = do
            identR <- await

            -- TODO: do stuff with it
            liftIO $ debugM (logName sc) ("IDENT: " ++ show identR)

            return ()

--
-- Parses incoming nsq messages and emits any errors to a log and keep going
--
nsqParserErrorLogging :: MonadIO m => LogName -> Producer BS.ByteString m () -> Producer NSQ.Message m ()
nsqParserErrorLogging l producer = do
    (result, rest) <- lift $ runStateT (PA.parse NSQ.message) producer

    case result of
        Nothing -> liftIO $ errorM l "Pipe is exhausted for nsq parser\n"
        Just y  -> do
            case y of
                Right x -> (liftIO $ debugM l ("msg: " ++ show x)) >> yield x
                Left x  -> liftIO $ errorM l (show x)
            nsqParserErrorLogging l rest

--
-- Format outbound NSQ Commands
--
showCommand :: Monad m => Pipe NSQ.Command BS.ByteString m ()
showCommand = PP.map NSQ.encode

--
-- Log anything that passes through this stream to a logfile
--
log :: MonadIO m => String -> LogName -> Pipe BS.ByteString BS.ByteString m r
log w l = forever $ do
    x <- await
    liftIO $ debugM l (w ++ ": " ++ show x) -- TODO: need a better way to log raw protocol messages
    yield x

--
-- Do something with the inbound message
--
command :: (Monad m, MonadIO m) => LogName -> TQueue Message -> Pipe NSQ.Message NSQ.Command m ()
command l topicQueue = forever $ do
    msg <- await

    case msg of
        -- TODO: currently no-op
        NSQ.OK                  -> return ()
        NSQ.Heartbeat           -> yield $ NSQ.NOP
        -- TODO: Implement a way to close our connection gracefully
        NSQ.CloseWait           -> liftIO $ infoM l ("Error: Server closed queue")
        -- TODO: should pass it onto the client or have a callback
        NSQ.Error e             -> liftIO $ errorM l ("Error: " ++ show e)
        NSQ.Message _ _ _ _     -> liftIO $ atomically $ writeTQueue topicQueue msg
        -- TODO: should pass it onto the client or have a callback
        NSQ.CatchAllMessage f m -> liftIO $ warningM l ("Error: Frame - " ++ show f ++ " - Msg - " ++ show m)