#if __GLASGOW_HASKELL__ < 706
#else
#endif
module Network.Fluent.Logger
(
FluentLogger
, withFluentLogger
, newFluentLogger
, closeFluentLogger
, FluentSettings(..)
, defaultFluentSettings
, post
, postWithTime
) where
import qualified Data.ByteString as BS
import Data.ByteString.Char8 ( unpack )
import qualified Data.ByteString.Lazy as LBS
import Data.Monoid ( mconcat )
import qualified Network.Socket as NS
import Network.Socket.Options ( setRecvTimeout, setSendTimeout )
import Network.Socket.ByteString.Lazy ( sendAll )
import Control.Monad ( void, forever, when )
import Control.Applicative ( (<$>) )
import Control.Concurrent ( ThreadId, forkIO, killThread )
import Control.Concurrent.STM ( atomically
, TChan, newTChanIO, readTChan, peekTChan, writeTChan
, TVar, newTVarIO, readTVar, modifyTVar )
import Control.Exception ( SomeException, handle, bracket, throwIO )
import Data.MessagePack ( Packable, pack )
import Data.Int ( Int64 )
import Data.Time.Clock.POSIX ( getPOSIXTime )
-- | Configuration for a 'FluentLogger': where to connect and how much
-- unsent data may be held in memory.
data FluentSettings =
    FluentSettings
    { fluentSettingsTag :: BS.ByteString
      -- ^ Tag prefix that 'postWithTime' combines with each entry's label.
    , fluentSettingsHost :: BS.ByteString
      -- ^ Host name passed to address resolution when connecting.
    , fluentSettingsPort :: Int
      -- ^ Port (service) to connect to.
    , fluentSettingsTimeout :: Double
      -- ^ Socket timeout. It is multiplied by 1,000,000, rounded, and used
      -- as both the receive and the send timeout of the connection
      -- (presumably seconds converted to microseconds -- confirm against
      -- "Network.Socket.Options").
    , fluentSettingsBufferLimit :: Int64
      -- ^ Maximum total size, in bytes of serialized entries, that may be
      -- queued at once. 'postWithTime' drops an entry that would exceed it.
    }
-- | Baseline settings: no tag prefix, @localhost:24224@, a timeout value
-- of 3.0 and a 1 MiB buffer limit. Override individual fields with record
-- update syntax.
defaultFluentSettings :: FluentSettings
defaultFluentSettings = FluentSettings
    { fluentSettingsHost        = "localhost"
    , fluentSettingsPort        = 24224
    , fluentSettingsTag         = BS.empty
    , fluentSettingsTimeout     = 3.0
    , fluentSettingsBufferLimit = 2 ^ (20 :: Int)  -- 1024 * 1024 bytes
    }
-- | Handle to a running logger: a queue of serialized entries plus the
-- background thread that sends them.
data FluentLogger =
    FluentLogger
    { fluentLoggerChan :: TChan LBS.ByteString
      -- ^ Queue of serialized entries waiting to be sent.
    , fluentLoggerBuffered :: TVar Int64
      -- ^ Total byte length of the entries currently in the queue;
      -- increased by 'postWithTime', decreased after a successful send.
    , fluentLoggerSettings :: FluentSettings
      -- ^ Settings the logger was created with.
    , fluentLoggerThread :: ThreadId
      -- ^ Background sender thread; the target of 'closeFluentLogger'.
    }
-- | Resolve @host@ and @port@, create a stream socket for the first
-- address returned, apply @timeout@ as both receive and send timeout, and
-- connect.
--
-- @timeout@ is passed unchanged to 'setRecvTimeout' and 'setSendTimeout'.
--
-- Any exception raised after the socket has been created (while setting
-- the timeouts or connecting) closes the socket before being re-thrown,
-- so a failed attempt never leaks a file descriptor. An empty resolution
-- result is reported as an 'IOError'.
getSocket :: BS.ByteString -> Int -> Int64 -> IO NS.Socket
getSocket host port timeout = do
    addrs <- NS.getAddrInfo (Just hints) (Just $ unpack host) (Just $ show port)
    addr <- case addrs of
        (a:_) -> return a
        []    -> throwIO . userError $
                     "getSocket: no address found for " ++ unpack host
    sock <- NS.socket (NS.addrFamily addr) (NS.addrSocketType addr) (NS.addrProtocol addr)
    -- Everything that can fail once the descriptor exists runs under the
    -- cleanup handler, not just the connect call.
    handle (closeAndRethrow sock) $ do
        setRecvTimeout sock timeout
        setSendTimeout sock timeout
        NS.connect sock $ NS.addrAddress addr
        return sock
  where
    hints = NS.defaultHints { NS.addrFlags = [NS.AI_ADDRCONFIG]
                            , NS.addrSocketType = NS.Stream
                            }

    closeAndRethrow :: NS.Socket -> SomeException -> IO a
    closeAndRethrow s e = NS.sClose s >> throwIO e
-- | Body of the background thread: obtain a connection, hand it to
-- 'sendFluent', and start over whenever 'sendFluent' returns.
sender :: FluentLogger -> IO ()
sender logger = forever $ do
    sock <- connectFluent logger
    sendFluent logger sock
-- | Connect to the host and port from the logger's settings, retrying
-- until a connection is established.
--
-- Only 'IOError's (resolution and socket failures) trigger a retry. Any
-- other exception -- in particular the 'ThreadKilled' delivered by
-- 'killThread' -- propagates, so the thread stays killable while the
-- server is unreachable. Each retry starts outside the exception handler,
-- so attempts neither run with asynchronous exceptions masked nor pile up
-- nested handlers.
--
-- NOTE(review): retries happen immediately, with no delay or backoff.
connectFluent :: FluentLogger -> IO NS.Socket
connectFluent logger = attempt
  where
    settings = fluentLoggerSettings logger
    host = fluentSettingsHost settings
    port = fluentSettingsPort settings
    timeout = round $ fluentSettingsTimeout settings * 1000000

    attempt :: IO NS.Socket
    attempt = do
        result <- handle connectFailed (Just <$> getSocket host port timeout)
        maybe attempt return result

    connectFailed :: IOError -> IO (Maybe NS.Socket)
    connectFailed _ = return Nothing
-- | Send queued entries over @sock@ until something goes wrong.
--
-- Each entry is peeked, sent, and only then removed from the queue (with
-- the buffered byte count reduced), so an entry whose send fails stays
-- queued for the next connection.
--
-- The socket is closed on every exit path. An 'IOError' (lost connection,
-- timeout) makes the function return normally so the caller can
-- reconnect. Any other exception -- notably the 'ThreadKilled' from
-- 'killThread' -- is re-thrown after the socket is closed, so the thread
-- really terminates instead of reconnecting.
--
-- The handlers are installed once around the whole loop, not once per
-- entry, so a long-lived connection does not accumulate handler frames.
sendFluent :: FluentLogger -> NS.Socket -> IO ()
sendFluent logger sock =
    handle connectionLost . handle closeAndRethrow . forever $ do
        entry <- atomically $ peekTChan chan
        sendAll sock entry
        atomically $ do
            void $ readTChan chan
            modifyTVar buffered (subtract $ LBS.length entry)
  where
    chan = fluentLoggerChan logger
    buffered = fluentLoggerBuffered logger

    -- Inner handler: release the socket whatever the failure was, then
    -- let the exception continue outwards.
    closeAndRethrow :: SomeException -> IO ()
    closeAndRethrow e = NS.sClose sock >> throwIO e

    -- Outer handler: I/O failures just end this connection.
    connectionLost :: IOError -> IO ()
    connectionLost _ = return ()
-- | Create a logger with an empty queue and fork its background sender
-- thread.
--
-- The logger record contains the id of the thread that itself receives
-- the record, so the two are tied together with a @rec@ block. The record
-- is built lazily from the not-yet-known thread id *inside* the block, so
-- the forked thread gets an ordinary constructor application whose thread
-- field is the only deferred part. The sender never reads that field and
-- so cannot force the recursive knot before it has been tied.
newFluentLogger :: FluentSettings -> IO FluentLogger
newFluentLogger set = do
    tchan <- newTChanIO
    tvar <- newTVarIO 0
    let mkLogger threadId = FluentLogger
            { fluentLoggerChan = tchan
            , fluentLoggerBuffered = tvar
            , fluentLoggerSettings = set
            , fluentLoggerThread = threadId
            }
    rec let logger = mkLogger tid
        tid <- forkIO (sender logger)
    return logger
-- | Shut the logger down by calling 'killThread' on its background sender
-- thread. No attempt is made to flush entries that are still queued.
closeFluentLogger :: FluentLogger -> IO ()
closeFluentLogger logger = killThread (fluentLoggerThread logger)
-- | Run an action with a freshly created logger, closing the logger when
-- the action finishes, whether it returns or throws.
withFluentLogger :: FluentSettings -> (FluentLogger -> IO a) -> IO a
withFluentLogger set action =
    bracket (newFluentLogger set) closeFluentLogger action
-- | Current POSIX time, rounded to the nearest whole second.
getCurrentEpochTime :: IO Int
getCurrentEpochTime = fmap round getPOSIXTime
-- | Queue @obj@ under @label@, timestamped with the current time as
-- reported by 'getCurrentEpochTime'. See 'postWithTime' for how the entry
-- is tagged and buffered.
post :: Packable a => FluentLogger -> BS.ByteString -> a -> IO ()
post logger label obj =
    getCurrentEpochTime >>= \now -> postWithTime logger label now obj
-- | Queue @obj@ for sending, timestamped with the caller-supplied @time@.
--
-- The entry is serialized as the triple @(tag, time, obj)@. The tag is
-- built from the logger's configured tag and @label@:
--
-- * empty label: the configured tag alone;
-- * empty configured tag: the label alone (no stray leading @.@);
-- * otherwise: @tag.label@.
--
-- The entry is enqueued only if its serialized length still fits within
-- 'fluentSettingsBufferLimit' together with what is already buffered;
-- otherwise it is silently dropped. The size check and the enqueue happen
-- in a single STM transaction.
postWithTime :: Packable a => FluentLogger -> BS.ByteString -> Int -> a -> IO ()
postWithTime logger label time obj = atomically $ do
    queued <- readTVar buffered
    when (queued + len <= limit) $ do
        writeTChan chan entry
        modifyTVar buffered (+ len)
  where
    settings = fluentLoggerSettings logger
    tag = fluentSettingsTag settings
    lbl | BS.null label = tag
        | BS.null tag   = label
        | otherwise     = mconcat [ tag, ".", label ]
    entry = pack ( lbl, time, obj )
    len = LBS.length entry
    chan = fluentLoggerChan logger
    buffered = fluentLoggerBuffered logger
    limit = fluentSettingsBufferLimit settings