--
-- Copyright (C) 2012 Noriyuki OHKAWA
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE CPP #-}
#if __GLASGOW_HASKELL__ < 706
{-# LANGUAGE DoRec #-}
#else
{-# LANGUAGE RecursiveDo #-}
#endif

-- | Fluent Logger for Haskell
module Network.Fluent.Logger
    ( -- * Logger
      FluentLogger
    , withFluentLogger
    , newFluentLogger
    , closeFluentLogger
      -- * Settings
    , FluentSettings(..)
    , defaultFluentSettings
      -- * Post
    , post
    , postWithTime
    ) where

import qualified Data.ByteString as BS
import Data.ByteString.Char8 ( unpack )
import qualified Data.ByteString.Lazy as LBS
import Data.Monoid ( mconcat )
import qualified Network.Socket as NS
import Network.Socket.Options ( setRecvTimeout, setSendTimeout )
import Network.Socket.ByteString.Lazy ( sendAll )
import Control.Monad ( void, forever, when )
import Control.Applicative ( (<$>) )
import Control.Concurrent ( ThreadId, forkIO, killThread )
import Control.Concurrent.STM ( atomically
                              , TChan, newTChanIO, readTChan, peekTChan, writeTChan
                              , TVar, newTVarIO, readTVar, modifyTVar )
import Control.Exception ( SomeException, handle, bracket, throwIO )
import Data.MessagePack ( Packable, pack )
import Data.Int ( Int64 )
import Data.Time.Clock.POSIX ( getPOSIXTime )

-- | Fluent logger settings
--
-- Since 0.1.0.0
--
data FluentSettings =
    FluentSettings
    { fluentSettingsTag :: BS.ByteString
    , fluentSettingsHost :: BS.ByteString
    , fluentSettingsPort :: Int
    , fluentSettingsTimeout :: Double
    , fluentSettingsBufferLimit :: Int64
    }

-- | Default fluent logger settings
--
-- Since 0.1.0.0
--
defaultFluentSettings :: FluentSettings
defaultFluentSettings =
    FluentSettings
    { fluentSettingsTag = BS.empty
    , fluentSettingsHost = "localhost"
    , fluentSettingsPort = 24224
    , fluentSettingsTimeout = 3.0
    , fluentSettingsBufferLimit = 1024*1024
    }

-- | Fluent logger
--
-- Since 0.1.0.0
--
data FluentLogger =
    FluentLogger
    { fluentLoggerChan :: TChan LBS.ByteString
    , fluentLoggerBuffered :: TVar Int64
    , fluentLoggerSettings :: FluentSettings
    , fluentLoggerThread :: ThreadId
    }

getSocket :: BS.ByteString -> Int -> Int64 -> IO NS.Socket
getSocket host port timeout = do
  let hints = NS.defaultHints { NS.addrFlags = [NS.AI_ADDRCONFIG]
                              , NS.addrSocketType = NS.Stream
                              }
  (addr:_) <- NS.getAddrInfo (Just hints) (Just $ unpack host) (Just $ show port)
  sock <- NS.socket (NS.addrFamily addr) (NS.addrSocketType addr) (NS.addrProtocol addr)
  setRecvTimeout sock timeout
  setSendTimeout sock timeout
  let onErr :: SomeException -> IO a
      onErr e = NS.sClose sock >> throwIO e
  handle onErr $ do
    NS.connect sock $ NS.addrAddress addr
    return sock

sender :: FluentLogger -> IO ()
sender logger = forever $ connectFluent logger >>= sendFluent logger

connectFluent :: FluentLogger -> IO NS.Socket
connectFluent logger = handle (retry logger) (getSocket host port timeout)
    where
      host = fluentSettingsHost $ fluentLoggerSettings logger
      port = fluentSettingsPort $ fluentLoggerSettings logger
      timeout = round $ fluentSettingsTimeout (fluentLoggerSettings logger) * 1000000
      retry :: FluentLogger -> SomeException -> IO NS.Socket
      retry = const . connectFluent

sendFluent :: FluentLogger -> NS.Socket -> IO ()
sendFluent logger sock = handle (done sock) $ do
                           entry <- atomically $ peekTChan chan
                           sendAll sock entry
                           atomically $ do
                             void $ readTChan chan
                             modifyTVar buffered (subtract $ LBS.length entry)
                           sendFluent logger sock
    where
      chan = fluentLoggerChan logger
      buffered = fluentLoggerBuffered logger
      done :: NS.Socket -> SomeException -> IO ()
      done = const . NS.sClose

-- | Create a fluent logger
--
-- Since 0.1.0.0
--
newFluentLogger :: FluentSettings -> IO FluentLogger
newFluentLogger set = do
  tchan <- newTChanIO
  tvar <- newTVarIO 0
  let mkLogger tid = FluentLogger { fluentLoggerChan = tchan
                                  , fluentLoggerBuffered = tvar
                                  , fluentLoggerSettings = set
                                  , fluentLoggerThread = tid
                                  }
  rec logger <- mkLogger <$> forkIO (sender logger)
  return logger

-- | Close logger
--
-- Since 0.1.0.0
--
closeFluentLogger :: FluentLogger -> IO ()
closeFluentLogger = killThread . fluentLoggerThread

-- | Create a fluent logger and run given action.
--
-- Since 0.1.0.0
--
withFluentLogger :: FluentSettings -> (FluentLogger -> IO a) -> IO a
withFluentLogger set = bracket (newFluentLogger set) closeFluentLogger

getCurrentEpochTime :: IO Int
getCurrentEpochTime = round <$> getPOSIXTime

-- | Post a message.
--
-- Since 0.1.0.0
--
post :: Packable a => FluentLogger -> BS.ByteString -> a -> IO ()
post logger label obj = do
  time <- getCurrentEpochTime
  postWithTime logger label time obj

-- | Post a message with given time.
--
-- Since 0.1.0.0
--
postWithTime :: Packable a => FluentLogger -> BS.ByteString -> Int -> a -> IO ()
postWithTime logger label time obj = atomically $ do
                                       s <- readTVar buffered
                                       when (s + len <= limit) $ do
                                         writeTChan chan entry
                                         modifyTVar buffered (+ len)
  where
    tag = fluentSettingsTag $ fluentLoggerSettings logger
    lbl = if BS.null label then tag else mconcat [ tag, ".", label ]
    entry = pack ( lbl, time, obj )
    len = LBS.length entry
    chan = fluentLoggerChan logger
    buffered = fluentLoggerBuffered logger
    limit = fluentSettingsBufferLimit $ fluentLoggerSettings logger