--
-- Copyright (C) 2012 Noriyuki OHKAWA
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
{-# LANGUAGE CPP #-}
-- | Fluent Logger for Haskell
module Network.Fluent.Logger
    ( -- * Logger
      FluentLogger
    , withFluentLogger
    , newFluentLogger
    , closeFluentLogger
      -- * Settings
    , FluentSettings(..)
    , defaultFluentSettings
      -- * Post
    , post
    , postWithTime
    ) where

import qualified Data.ByteString.Char8 as BS ( ByteString, pack, unpack, empty, null)
import qualified Data.ByteString.Lazy as LBS ( ByteString, length )
#if MIN_VERSION_base(4,8,0)
#else
import Control.Applicative ( (<$>) )
import Data.Monoid ( mconcat )
#endif
import qualified Network.Socket as NS
import Network.Socket.Options ( setRecvTimeout, setSendTimeout )
import Network.Socket.ByteString.Lazy ( sendAll, recv )
import Control.Monad ( void, forever, when )
import Control.Concurrent ( ThreadId, killThread, threadDelay )
import Control.Concurrent.STM ( atomically, orElse
                              , TChan, newTChanIO, readTChan, peekTChan, writeTChan
                              , TVar, newTVarIO, readTVar, modifyTVar
                              , TMVar, tryPutTMVar, newEmptyTMVarIO, takeTMVar )
import Control.Exception ( SomeException, AsyncException, Handler(Handler), handle, bracket, throwIO, catches, onException )
import Data.MessagePack
import Data.Serialize hiding (label)
import Data.Int ( Int64 )
import Data.Time.Clock.POSIX ( getPOSIXTime )
import System.Random ( randomRIO )

import Network.Fluent.Logger.Packable
import Network.Fluent.Logger.ForkWrapper (forkIOUnmasked)

-- | Close a socket with whichever function the installed @network@
-- package offers: 'NS.close' from version 2.4.0 on, the older (since
-- deprecated) 'NS.sClose' before that.
close :: NS.Socket -> IO ()
#if MIN_VERSION_network(2,4,0)
close sock = NS.close sock
#else
close sock = NS.sClose sock
#endif

-- | Fluent logger settings
--
-- Start from 'defaultFluentSettings' and override the fields you need.
--
-- Since 0.1.0.0
--
data FluentSettings =
    FluentSettings
    { fluentSettingsTag :: BS.ByteString
      -- ^ Base tag that 'postWithTime' combines with the per-message
      -- label to form the tag of every record.
    , fluentSettingsHost :: BS.ByteString
      -- ^ Host name handed to address resolution when connecting.
    , fluentSettingsPort :: Int
      -- ^ Port number to connect to.
    , fluentSettingsTimeout :: Double
      -- ^ Socket timeout. The value is multiplied by 1,000,000 and
      -- applied as both the receive and the send timeout of the
      -- connection (so it is presumably expressed in seconds).
    , fluentSettingsBufferLimit :: Int64
      -- ^ Upper bound on the summed length of the encoded messages that
      -- are queued but not yet sent. A post that would exceed it is
      -- discarded.
    }

-- | Settings used when nothing is overridden: an empty tag, host
-- @localhost@, port 24224, a timeout of 3.0 and a buffer limit of
-- 1024 * 1024.
--
-- Since 0.1.0.0
--
defaultFluentSettings :: FluentSettings
defaultFluentSettings = FluentSettings
    { fluentSettingsTag = noTag
    , fluentSettingsHost = localHost
    , fluentSettingsPort = forwardPort
    , fluentSettingsTimeout = ioTimeout
    , fluentSettingsBufferLimit = bufferBytes
    }
  where
    noTag, localHost :: BS.ByteString
    noTag = BS.empty
    localHost = BS.pack "localhost"

    forwardPort :: Int
    forwardPort = 24224

    ioTimeout :: Double
    ioTimeout = 3.0

    bufferBytes :: Int64
    bufferBytes = 1024 * 1024

-- | Fluent logger
--
-- A handle made of the shared sender state and the identifier of the
-- background thread that transmits queued messages. Create one with
-- 'newFluentLogger' or 'withFluentLogger'.
--
-- Since 0.1.0.0
--
data FluentLogger =
    FluentLogger
    { fluentLoggerSender :: FluentLoggerSender
      -- ^ Queue, buffered-size counter and settings shared with the
      -- sender thread.
    , fluentLoggerThread :: ThreadId
      -- ^ The thread running 'runSender'; 'closeFluentLogger' kills it.
    }

-- | State shared between the posting side ('postWithTime') and the
-- background sender thread ('runSender').
data FluentLoggerSender =
    FluentLoggerSender
    { fluentLoggerSenderChan :: TChan LBS.ByteString
      -- ^ Queue of already-encoded messages waiting to be sent.
    , fluentLoggerSenderBuffered :: TVar Int64
      -- ^ Summed length of the messages currently in the queue;
      -- increased on post, decreased after a successful send.
    , fluentLoggerSenderSettings :: FluentSettings
      -- ^ The settings the logger was created with.
    }

-- | Resolve the given host and port, create a stream socket for the
-- first address returned and connect it.
--
-- The third argument is applied as both the receive and the send timeout
-- of the socket before connecting.
--
-- Every step after the socket has been created (setting the timeouts as
-- well as connecting) is guarded: if any of them throws, the socket is
-- closed and the exception is re-thrown, so no descriptor is leaked.
-- An empty resolution result is reported as an 'IOError'.
getSocket :: BS.ByteString -> Int -> Int64 -> IO NS.Socket
getSocket host port timeout = do
  let hints = NS.defaultHints { NS.addrFlags = [NS.AI_ADDRCONFIG]
                              , NS.addrSocketType = NS.Stream
                              }
  addrs <- NS.getAddrInfo (Just hints) (Just $ BS.unpack host) (Just $ show port)
  -- Match explicitly instead of relying on a refutable pattern in a bind,
  -- so the failure carries a meaningful message.
  addr <- case addrs of
    (a:_) -> return a
    [] -> throwIO $ userError $
            "getSocket: no address found for " ++ BS.unpack host ++ ":" ++ show port
  sock <- NS.socket (NS.addrFamily addr) (NS.addrSocketType addr) (NS.addrProtocol addr)
  let setup = do
        setRecvTimeout sock timeout
        setSendTimeout sock timeout
        NS.connect sock $ NS.addrAddress addr
        return sock
  -- Close the socket on any failure during setup, then let the exception
  -- propagate to the caller.
  setup `onException` close sock

-- | One-shot signal: the variable is filled once the socket watcher
-- ('setFlagWhenClose') sees the connection end or fail, and is taken by
-- 'sendFluent' to stop sending on that socket.
type CloseFlag = TMVar ()

-- | Keep reading from the socket (up to 256 bytes at a time) and discard
-- whatever arrives. As soon as a read returns no data, or throws, the
-- close flag is filled; an exception from the read is still propagated
-- after the flag has been set.
setFlagWhenClose :: NS.Socket -> CloseFlag -> IO ()
setFlagWhenClose sock flag = watch
  where
    -- Filling an already full flag is a no-op, hence 'tryPutTMVar'.
    raise = void $ atomically $ tryPutTMVar flag ()
    watch = do
      chunk <- recv sock 256 `onException` raise
      case LBS.length chunk of
        0 -> raise
        _ -> watch

-- | Body of the background sender thread.
--
-- Repeats forever: connect (with back-off, see 'connectFluent'), start a
-- watcher thread that raises a flag when the socket ends, and send queued
-- messages until that flag is raised or something throws. The socket is
-- closed and the watcher killed when a session ends.
--
-- An 'AsyncException' terminates the loop; any other exception is
-- discarded and a new session is started.
runSender :: FluentLoggerSender -> IO ()
runSender logger = forever oneSession where
  oneSession :: IO ()
  -- Handlers are tried in order, so the asynchronous case must come first.
  oneSession = withConnection `catches` [Handler rethrowAsync, Handler ignoreOthers]
  withConnection :: IO ()
  withConnection = bracket (connectFluent logger) close serve
  rethrowAsync :: AsyncException -> IO a
  rethrowAsync = throwIO
  ignoreOthers :: SomeException -> IO ()
  ignoreOthers = const $ return ()
  serve :: NS.Socket -> IO ()
  serve sock = do
    closed <- newEmptyTMVarIO
    bracket (forkIOUnmasked $ setFlagWhenClose sock closed) killThread $ \_ ->
      sendFluent logger sock closed

-- | Open a connection to the host and port found in the sender's
-- settings, retrying through 'exponentialBackoff' until it succeeds.
-- The configured timeout is multiplied by 1,000,000 and rounded before
-- it is handed to 'getSocket'.
connectFluent :: FluentLoggerSender -> IO NS.Socket
connectFluent logger =
    exponentialBackoff $
      getSocket (fluentSettingsHost settings) (fluentSettingsPort settings) scaledTimeout
  where
    settings = fluentLoggerSenderSettings logger
    scaledTimeout = round (fluentSettingsTimeout settings * 1000000)

-- | Run an action, retrying it until it succeeds.
--
-- After each failure the thread sleeps for a random delay (as passed to
-- 'threadDelay') drawn from @[interval / 2, interval * 3 / 2]@, and the
-- interval for the next round grows by a factor of 3/2, capped at
-- 60,000,000. The first interval is 100,000.
--
-- An 'AsyncException' (for example the 'ThreadKilled' delivered by
-- 'killThread') is never treated as a failed attempt: it is re-thrown
-- immediately, so the thread running the retry loop can still be stopped
-- while an attempt is in progress. Every other exception triggers a retry.
exponentialBackoff :: IO a -> IO a
exponentialBackoff action = attempt initialInterval where
    initialInterval, maxInterval :: Int
    initialInterval = 100000
    maxInterval = 60000000
    -- Handlers are tried in order, so the asynchronous case must be listed
    -- before the catch-all.
    attempt interval = action `catches`
        [ Handler rethrowAsync
        , Handler (\e -> backOff interval (e :: SomeException))
        ]
    rethrowAsync :: AsyncException -> IO a
    rethrowAsync = throwIO
    backOff interval _ = do
      delay <- randomRIO (interval `div` 2, interval * 3 `div` 2)
      threadDelay delay
      attempt $ min maxInterval $ interval * 3 `div` 2


-- | What 'sendFluent' waits for in each round of its loop.
data SenderEvent = SenderNewData LBS.ByteString
                   -- ^ An encoded message is at the head of the queue.
                 | SenderSocketClose
                   -- ^ The close flag for the current socket was raised.

-- | Send queued messages over the given socket until the close flag is
-- raised.
--
-- Each round waits for either the flag or a message at the head of the
-- queue, the flag taking precedence. A message is only peeked at first;
-- it is removed from the queue, and its length subtracted from the
-- buffered counter, after 'sendAll' has returned. If 'sendAll' throws,
-- the message therefore stays queued.
sendFluent :: FluentLoggerSender -> NS.Socket -> CloseFlag -> IO ()
sendFluent logger sock flag = loop
  where
    queue = fluentLoggerSenderChan logger
    pending = fluentLoggerSenderBuffered logger
    socketClosed = takeTMVar flag >> return SenderSocketClose
    nextEntry = SenderNewData `fmap` peekTChan queue
    loop = do
      event <- atomically $ socketClosed `orElse` nextEntry
      case event of
        SenderSocketClose -> return ()
        SenderNewData entry -> do
          sendAll sock entry
          -- Dequeue and adjust the counter in one transaction.
          atomically $ do
            _ <- readTChan queue
            modifyTVar pending (subtract $ LBS.length entry)
          loop

-- | Create a fluent logger.
--
-- Allocates an empty message queue and a buffered-size counter starting
-- at zero, then forks the background thread that connects and sends.
-- Release the logger with 'closeFluentLogger'.
--
-- Since 0.1.0.0
--
newFluentLogger :: FluentSettings -> IO FluentLogger
newFluentLogger set = do
  queue <- newTChanIO
  pending <- newTVarIO 0
  let sender = FluentLoggerSender
               { fluentLoggerSenderChan = queue
               , fluentLoggerSenderBuffered = pending
               , fluentLoggerSenderSettings = set
               }
  worker <- forkIOUnmasked (runSender sender)
  return FluentLogger
         { fluentLoggerSender = sender
         , fluentLoggerThread = worker
         }

-- | Close the logger by killing its background sender thread.
--
-- Since 0.1.0.0
--
closeFluentLogger :: FluentLogger -> IO ()
closeFluentLogger logger = killThread (fluentLoggerThread logger)

-- | Create a fluent logger, run the given action with it, and close the
-- logger afterwards, whether the action returns or throws.
--
-- Since 0.1.0.0
--
withFluentLogger :: FluentSettings -> (FluentLogger -> IO a) -> IO a
withFluentLogger set action =
  bracket (newFluentLogger set) closeFluentLogger action

-- | Current POSIX time, rounded to the nearest whole number.
getCurrentEpochTime :: IO Int
getCurrentEpochTime = fmap round getPOSIXTime

-- | Post a message, stamped with the current time.
--
-- Equivalent to 'postWithTime' called with the time obtained at the
-- moment of the call.
--
-- Since 0.2.0.0
--
post :: Packable a => FluentLogger -> BS.ByteString -> a -> IO ()
post logger label obj =
  getCurrentEpochTime >>= \now -> postWithTime logger label now obj

-- | Post a message with given time.
--
-- The record's tag is built from the tag configured in the settings and
-- the given label:
--
-- * empty label: the configured tag alone;
--
-- * empty configured tag: the label alone (no leading @.@);
--
-- * otherwise: @tag.label@.
--
-- The message is encoded and put on the logger's queue for the
-- background thread to send. If adding it would push the buffered size
-- above 'fluentSettingsBufferLimit', the message is discarded and the
-- call returns normally.
--
-- Since 0.2.0.0
--
postWithTime :: Packable a => FluentLogger -> BS.ByteString -> Int -> a -> IO ()
postWithTime logger label time obj = atomically send where
    sender = fluentLoggerSender logger
    set = fluentLoggerSenderSettings sender
    tag = fluentSettingsTag set
    -- Only insert the separator when both parts are present, so that an
    -- empty configured tag does not yield a tag starting with ".".
    lbl
      | BS.null label = tag
      | BS.null tag = label
      | otherwise = mconcat [ tag, BS.pack ".", label ]
    entry = encodeLazy $ ObjectArray [ObjectBinary lbl, ObjectInt (fromIntegral time), pack obj]
    len = LBS.length entry
    chan = fluentLoggerSenderChan sender
    buffered = fluentLoggerSenderBuffered sender
    limit = fluentSettingsBufferLimit set
    -- Check the limit and enqueue in a single transaction so the counter
    -- and the queue cannot get out of step.
    send = do
      s <- readTVar buffered
      when (s + len <= limit) $ do
        writeTChan chan entry
        modifyTVar buffered (+ len)