module Network.Fluent.Logger
(
FluentLogger
, withFluentLogger
, newFluentLogger
, closeFluentLogger
, FluentSettings(..)
, defaultFluentSettings
, post
, postWithTime
) where
import qualified Data.ByteString.Char8 as BS ( ByteString, pack, unpack, empty, null)
import qualified Data.ByteString.Lazy as LBS ( ByteString, length )
import Data.Monoid ( mconcat )
import qualified Network.Socket as NS
import Network.Socket.Options ( setRecvTimeout, setSendTimeout )
import Network.Socket.ByteString.Lazy ( sendAll )
import Control.Monad ( void, forever, when )
import Control.Applicative ( (<$>) )
import Control.Concurrent ( ThreadId, forkIO, killThread, threadDelay )
import Control.Concurrent.STM ( atomically
, TChan, newTChanIO, readTChan, peekTChan, writeTChan
, TVar, newTVarIO, readTVar, modifyTVar )
import Control.Exception ( SomeException, handle, bracket, throwIO )
import Data.MessagePack
import Data.Serialize hiding (label)
import Data.Int ( Int64 )
import Data.Time.Clock.POSIX ( getPOSIXTime )
import System.Random ( randomRIO )
import Network.Fluent.Logger.Packable
data FluentSettings =
FluentSettings
{ fluentSettingsTag :: BS.ByteString
, fluentSettingsHost :: BS.ByteString
, fluentSettingsPort :: Int
, fluentSettingsTimeout :: Double
, fluentSettingsBufferLimit :: Int64
}
defaultFluentSettings :: FluentSettings
defaultFluentSettings =
FluentSettings
{ fluentSettingsTag = BS.empty
, fluentSettingsHost = BS.pack "localhost"
, fluentSettingsPort = 24224
, fluentSettingsTimeout = 3.0
, fluentSettingsBufferLimit = 1024*1024
}
data FluentLogger =
FluentLogger
{ fluentLoggerSender :: FluentLoggerSender
, fluentLoggerThread :: ThreadId
}
data FluentLoggerSender =
FluentLoggerSender
{ fluentLoggerSenderChan :: TChan LBS.ByteString
, fluentLoggerSenderBuffered :: TVar Int64
, fluentLoggerSenderSettings :: FluentSettings
}
getSocket :: BS.ByteString -> Int -> Int64 -> IO NS.Socket
getSocket host port timeout = do
let hints = NS.defaultHints { NS.addrFlags = [NS.AI_ADDRCONFIG]
, NS.addrSocketType = NS.Stream
}
(addr:_) <- NS.getAddrInfo (Just hints) (Just $ BS.unpack host) (Just $ show port)
sock <- NS.socket (NS.addrFamily addr) (NS.addrSocketType addr) (NS.addrProtocol addr)
setRecvTimeout sock timeout
setSendTimeout sock timeout
let onErr :: SomeException -> IO a
onErr e = NS.sClose sock >> throwIO e
handle onErr $ do
NS.connect sock $ NS.addrAddress addr
return sock
runSender :: FluentLoggerSender -> IO ()
runSender logger = forever $ connectFluent logger >>= sendFluent logger
connectFluent :: FluentLoggerSender -> IO NS.Socket
connectFluent logger = exponentialBackoff $ getSocket host port timeout where
set = fluentLoggerSenderSettings logger
host = fluentSettingsHost set
port = fluentSettingsPort set
timeout = round $ fluentSettingsTimeout set * 1000000
exponentialBackoff :: IO a -> IO a
exponentialBackoff action = handle (retry 100000) action where
retry failCount exception =
let _ = exception :: SomeException
in exponentialBackoff' failCount
exponentialBackoff' interval = do
delay <- randomRIO (interval `div` 2, interval * 3 `div` 2)
threadDelay delay
handle (retry $ min 60000000 $ interval * 3 `div` 2) action
sendFluent :: FluentLoggerSender -> NS.Socket -> IO ()
sendFluent logger sock = handle (done sock) toSender where
chan = fluentLoggerSenderChan logger
buffered = fluentLoggerSenderBuffered logger
done :: NS.Socket -> SomeException -> IO ()
done = const . NS.sClose
toSender = do
entry <- atomically $ peekTChan chan
sendAll sock entry
atomically $ do
void $ readTChan chan
modifyTVar buffered (subtract $ LBS.length entry)
sendFluent logger sock
newFluentLogger :: FluentSettings -> IO FluentLogger
newFluentLogger set = do
tchan <- newTChanIO
tvar <- newTVarIO 0
let sender = FluentLoggerSender
{ fluentLoggerSenderChan = tchan
, fluentLoggerSenderBuffered = tvar
, fluentLoggerSenderSettings = set
}
tid <- forkIO $ runSender sender
let logger = FluentLogger
{ fluentLoggerSender = sender
, fluentLoggerThread = tid
}
return logger
closeFluentLogger :: FluentLogger -> IO ()
closeFluentLogger = killThread . fluentLoggerThread
withFluentLogger :: FluentSettings -> (FluentLogger -> IO a) -> IO a
withFluentLogger set = bracket (newFluentLogger set) closeFluentLogger
getCurrentEpochTime :: IO Int
getCurrentEpochTime = round <$> getPOSIXTime
post :: Packable a => FluentLogger -> BS.ByteString -> a -> IO ()
post logger label obj = do
time <- getCurrentEpochTime
postWithTime logger label time obj
postWithTime :: Packable a => FluentLogger -> BS.ByteString -> Int -> a -> IO ()
postWithTime logger label time obj = atomically send where
sender = fluentLoggerSender logger
set = fluentLoggerSenderSettings sender
tag = fluentSettingsTag set
lbl = if BS.null label then tag else mconcat [ tag, BS.pack ".", label ]
entry = encodeLazy $ ObjectArray [ObjectBinary lbl, ObjectInt (fromIntegral time), pack obj]
len = LBS.length entry
chan = fluentLoggerSenderChan sender
buffered = fluentLoggerSenderBuffered sender
limit = fluentSettingsBufferLimit set
send = do
s <- readTVar buffered
when (s + len <= limit) $ do
writeTChan chan entry
modifyTVar buffered (+ len)