module Network.NSQ.Connection
( defaultConfig
, establish
) where
import Control.Monad.Reader
import Control.Monad.State.Strict
import Data.List
import Data.Maybe
import Prelude hiding (log)
import Network.HostName
import qualified Data.Text as T
import qualified Data.ByteString as BS
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Attoparsec as PA
import qualified Pipes.Prelude as PP
import Pipes
import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM
import System.Log.Logger (debugM, errorM, warningM, infoM)
import Network.NSQ.Types (NSQConnection(..), Message, Command, server, port, logName, LogName, identConf)
import qualified Network.NSQ.Types as NSQ
import qualified Network.NSQ.Identify as NSQ
import qualified Network.NSQ.Parser as NSQ
-- | Build an 'NSQConnection' for the given server host using local defaults.
--
-- The local host name (from 'getHostName') is passed unchanged as the second
-- argument of 'NSQ.defaultIdentify'; the portion before the first @.@ is
-- passed as the first argument. The remaining constructor arguments are the
-- literals @4150@ and @"NSQ.Connection."@.
--
-- NOTE(review): going by the accessors imported from "Network.NSQ.Types",
-- those two literals are presumably the port and the log name -- confirm
-- against the 'NSQConnection' definition.
defaultConfig :: String -> IO NSQConnection
defaultConfig serverHost = do
    fqdn <- fmap T.pack getHostName
    let shortName = T.takeWhile (/= '.') fqdn
        identity  = NSQ.defaultIdentify shortName fqdn
    pure (NSQConnection serverHost 4150 "NSQ.Connection." identity)
-- | Open a TCP connection to the configured server and run the NSQ session.
--
-- Connects to @server conn@ on @show (port conn)@ and then races two
-- actions with 'race_':
--
--   * 'handleNSQ', which drives the protocol and pushes incoming messages
--     onto @topicQueue@;
--   * a loop that takes commands from @reply@, encodes them and writes them
--     to the socket.
--
-- Every chunk read from or written to the socket is passed through 'log'
-- with the connection's log name.
--
-- NOTE(review): both raced actions write through the same socket consumer;
-- whether concurrent writes can interleave on the wire should be confirmed.
establish :: NSQConnection -> TQueue Message -> TQueue Command -> IO ()
establish conn topicQueue reply =
    PNT.withSocketsDo $ PNT.connect host service session
  where
    host    = server conn
    service = show (port conn)
    name    = logName conn

    -- Runs once the socket is connected; the peer address is ignored.
    session (sock, _) = do
        let outbound = log "send" name >-> PNT.toSocket sock
            inbound  = PNT.fromSocket sock 8192 >-> log "recv" name
        race_
            (handleNSQ conn inbound outbound topicQueue)
            (runEffect (handleReply reply >-> showCommand >-> outbound))
-- | Endless producer that blocks on the given queue and yields each command
-- read from it downstream, in the order they are dequeued.
handleReply :: (Monad m, MonadIO m) => TQueue Command -> Producer NSQ.Command m ()
handleReply queue =
    forever (liftIO (atomically (readTQueue queue)) >>= yield)
-- | Drive one NSQ session over the given byte producer / consumer pair.
--
-- The session runs four pipelines in sequence:
--
--   1. send 'NSQ.Protocol' followed by 'NSQ.Identify' (with @identConf sc@);
--   2. parse one frame from @recv@ and log it at debug level as the
--      identify reply;
--   3. send a 'NSQ.Sub' for the hard-coded topic/channel and @'NSQ.Rdy' 1@;
--   4. parse frames from @recv@, hand them to 'command', and send whatever
--      commands it yields back to the server.
--
-- NOTE(review): steps 2 and 4 each start parsing from @recv@ afresh, so any
-- bytes the parser had buffered but not consumed in step 2 are presumably
-- not visible to step 4 -- confirm whether this can drop data.
--
-- NOTE(review): the topic and channel names in step 3 are literals; they
-- look environment-specific and probably belong in the configuration.
handleNSQ :: (Monad m, MonadIO m) => NSQConnection -> Producer BS.ByteString m () -> Consumer BS.ByteString m () -> TQueue Message -> m ()
handleNSQ sc recv send topicQueue = do
    runEffect $ handshake >-> toWire
    runEffect $ frames >-> awaitIdentReply
    runEffect $ subscribe >-> toWire
    runEffect $ frames >-> command name topicQueue >-> toWire
  where
    name = logName sc

    -- Encode commands and write them to the socket.
    toWire = showCommand >-> send

    -- Parsed frames read from the socket, with parser errors logged.
    frames = nsqParserErrorLogging name recv

    handshake = mapM_ yield [NSQ.Protocol, NSQ.Identify (identConf sc)]

    subscribe = mapM_ yield
        [ NSQ.Sub "glc-gamestate" "netheril.elder.lan." False
        , NSQ.Rdy 1
        ]

    -- Consume exactly one frame and log it.
    awaitIdentReply =
        await >>= \identR -> liftIO (debugM name ("IDENT: " ++ show identR))
-- | Turn a stream of raw bytes into a stream of parsed NSQ frames.
--
-- Repeatedly runs the 'NSQ.message' parser against the byte producer:
--
--   * a successful parse is logged at debug level and yielded downstream;
--   * a parse error is logged at error level and parsing continues with the
--     producer returned by the parser;
--   * when the parser reports that the input is exhausted, an error is
--     logged and the producer finishes.
--
-- NOTE(review): after a parse error the loop simply retries on the returned
-- producer; if that producer still starts with the offending bytes this
-- would repeat the same failure indefinitely -- confirm against the
-- pipes-attoparsec leftover semantics.
nsqParserErrorLogging :: MonadIO m => LogName -> Producer BS.ByteString m () -> Producer NSQ.Message m ()
nsqParserErrorLogging l producer = do
    (outcome, leftover) <- lift (runStateT (PA.parse NSQ.message) producer)
    case outcome of
        Just parsed -> report parsed >> nsqParserErrorLogging l leftover
        Nothing     -> liftIO (errorM l "Pipe is exhausted for nsq parser\n")
  where
    -- Log the parse result; forward it downstream only on success.
    report (Right frame) = do
        liftIO (debugM l ("msg: " ++ show frame))
        yield frame
    report (Left failure) = liftIO (errorM l (show failure))
-- | Pipe that replaces every incoming command with the 'BS.ByteString'
-- produced by applying 'NSQ.encode' to it.
showCommand :: Monad m => Pipe NSQ.Command BS.ByteString m ()
showCommand = for cat (yield . NSQ.encode)
-- | Pass-through pipe that logs every chunk at debug level.
--
-- Each chunk is logged to logger @l@ as @w ++ ": " ++ show chunk@ and then
-- forwarded downstream unchanged.
log :: MonadIO m => String -> LogName -> Pipe BS.ByteString BS.ByteString m r
log w l = forever (await >>= tap)
  where
    tap chunk = do
        liftIO (debugM l (w ++ ": " ++ show chunk))
        yield chunk
-- | React to frames received from the server.
--
-- For every incoming frame:
--
--   * 'NSQ.OK' is ignored;
--   * 'NSQ.Heartbeat' is answered by yielding 'NSQ.NOP';
--   * 'NSQ.CloseWait' is logged at info level;
--   * 'NSQ.Error' is logged at error level;
--   * 'NSQ.Message' is written, unchanged, to @topicQueue@;
--   * 'NSQ.CatchAllMessage' is logged at warning level.
--
-- Only the heartbeat case yields a command downstream.
command :: (Monad m, MonadIO m) => LogName -> TQueue Message -> Pipe NSQ.Message NSQ.Command m ()
command l topicQueue = forever (await >>= dispatch)
  where
    dispatch NSQ.OK        = return ()
    dispatch NSQ.Heartbeat = yield NSQ.NOP
    dispatch NSQ.CloseWait = liftIO (infoM l "Error: Server closed queue")
    dispatch (NSQ.Error e) = liftIO (errorM l ("Error: " ++ show e))
    dispatch msg@(NSQ.Message _ _ _ _) =
        liftIO (atomically (writeTQueue topicQueue msg))
    dispatch (NSQ.CatchAllMessage f m) =
        liftIO (warningM l ("Error: Frame - " ++ show f ++ " - Msg - " ++ show m))