module Network.Fluent.Logger
(
FluentLogger
, withFluentLogger
, newFluentLogger
, closeFluentLogger
, FluentSettings(..)
, defaultFluentSettings
, post
, postWithTime
) where
import qualified Data.ByteString.Char8 as BS ( ByteString, pack, unpack, empty, null)
import qualified Data.ByteString.Lazy as LBS ( ByteString, length )
#if MIN_VERSION_base(4,8,0)
#else
import Control.Applicative ( (<$>) )
import Data.Monoid ( mconcat )
#endif
import qualified Network.Socket as NS
import Network.Socket.Options ( setRecvTimeout, setSendTimeout )
import Network.Socket.ByteString.Lazy ( sendAll, recv )
import Control.Monad ( void, forever, when )
import Control.Concurrent ( ThreadId, killThread, threadDelay )
import Control.Concurrent.STM ( atomically, orElse
, TChan, newTChanIO, readTChan, peekTChan, writeTChan
, TVar, newTVarIO, readTVar, modifyTVar
, TMVar, tryPutTMVar, newEmptyTMVarIO, takeTMVar )
import Control.Exception ( SomeException, AsyncException, Handler(Handler), handle, bracket, throwIO, catches, onException )
import Data.MessagePack
import Data.Serialize hiding (label)
import Data.Int ( Int64 )
import Data.Time.Clock.POSIX ( getPOSIXTime )
import System.Random ( randomRIO )
import Network.Fluent.Logger.Packable
import Network.Fluent.Logger.ForkWrapper (forkIOUnmasked)
-- | Close a socket using whichever function the installed @network@
-- package provides: 'NS.close' from version 2.4.0 on, 'NS.sClose' before.
close :: NS.Socket -> IO ()
#if MIN_VERSION_network(2,4,0)
close sock = NS.close sock
#else
close sock = NS.sClose sock
#endif
-- | Settings consumed when a 'FluentLogger' is created.
data FluentSettings = FluentSettings
  { fluentSettingsTag :: BS.ByteString
    -- ^ Tag prefix. 'postWithTime' uses it alone when the label is empty,
    -- otherwise joins it to the label with a @.@.
  , fluentSettingsHost :: BS.ByteString
    -- ^ Host name passed to address resolution when connecting.
  , fluentSettingsPort :: Int
    -- ^ Port passed to address resolution when connecting.
  , fluentSettingsTimeout :: Double
    -- ^ Socket send/receive timeout. It is multiplied by 1,000,000 and
    -- rounded before being applied to the socket, so presumably seconds.
  , fluentSettingsBufferLimit :: Int64
    -- ^ Upper bound on the total encoded length of entries that may be
    -- queued and not yet sent; entries that would exceed it are dropped.
  }
-- | Default settings: empty tag, host @localhost@, port 24224,
-- timeout 3.0 and a buffer limit of 1024 * 1024.
defaultFluentSettings :: FluentSettings
defaultFluentSettings = FluentSettings
  { fluentSettingsBufferLimit = defaultBufferLimit
  , fluentSettingsTimeout = 3.0
  , fluentSettingsPort = 24224
  , fluentSettingsHost = BS.pack "localhost"
  , fluentSettingsTag = BS.empty
  }
  where
    defaultBufferLimit = 1024 * 1024
-- | Handle to a running logger: the shared sender state plus the
-- background thread that runs the sender loop.
data FluentLogger = FluentLogger
  { fluentLoggerSender :: FluentLoggerSender
    -- ^ Queue, buffered-size counter and settings shared with the thread.
  , fluentLoggerThread :: ThreadId
    -- ^ Thread started by 'newFluentLogger'; killed by 'closeFluentLogger'.
  }
-- | State shared between the posting functions and the sender thread.
data FluentLoggerSender = FluentLoggerSender
  { fluentLoggerSenderChan :: TChan LBS.ByteString
    -- ^ Queue of encoded entries waiting to be written to the socket.
  , fluentLoggerSenderBuffered :: TVar Int64
    -- ^ Total encoded length of the entries currently in the queue.
  , fluentLoggerSenderSettings :: FluentSettings
    -- ^ Settings the logger was created with.
  }
-- | Resolve @host@ and @port@, create a stream socket for the first
-- resolved address, apply @timeout@ as both the receive and the send
-- timeout, and connect.
--
-- The socket is closed again if configuring the timeouts or connecting
-- throws, so a failed attempt does not leak a descriptor. The exception
-- is rethrown to the caller.
getSocket :: BS.ByteString -> Int -> Int64 -> IO NS.Socket
getSocket host port timeout = do
  addrs <- NS.getAddrInfo (Just hints) (Just $ BS.unpack host) (Just $ show port)
  case addrs of
    -- Explicit match instead of a partial @(addr:_) <-@ bind, so an empty
    -- result fails with a meaningful message.
    [] -> ioError $ userError $
      "getSocket: no address found for " ++ BS.unpack host ++ ":" ++ show port
    (addr:_) -> do
      sock <- NS.socket (NS.addrFamily addr) (NS.addrSocketType addr) (NS.addrProtocol addr)
      -- Everything after socket creation is covered by the cleanup,
      -- including the timeout configuration.
      configureAndConnect sock addr `onException` close sock
      return sock
  where
    hints = NS.defaultHints
      { NS.addrFlags = [NS.AI_ADDRCONFIG]
      , NS.addrSocketType = NS.Stream
      }
    configureAndConnect sock addr = do
      setRecvTimeout sock timeout
      setSendTimeout sock timeout
      NS.connect sock $ NS.addrAddress addr
-- | One-shot signal shared between the socket watcher and the sender loop.
-- 'setFlagWhenClose' fills it when a read on the socket returns no data or
-- throws; 'sendFluent' takes it and stops sending on that socket.
type CloseFlag = TMVar ()
-- | Keep reading from @sock@ (up to 256 bytes at a time) and discard the
-- data. As soon as a read returns nothing, or throws, fill @flag@ so the
-- sender loop learns the socket is no longer usable.
setFlagWhenClose :: NS.Socket -> CloseFlag -> IO ()
setFlagWhenClose sock flag = watch
  where
    -- Filling an already-full flag is a no-op, hence 'tryPutTMVar'.
    raiseFlag = atomically . void $ tryPutTMVar flag ()
    watch = do
      chunk <- recv sock 256 `onException` raiseFlag
      case LBS.length chunk of
        0 -> raiseFlag
        _ -> watch
-- | Sender thread body. Forever: connect, serve the connection until it
-- ends or fails, close the socket, and start over. 'AsyncException's are
-- rethrown (so the thread can be killed); every other exception ends the
-- current connection and is discarded.
runSender :: FluentLoggerSender -> IO ()
runSender logger = forever oneConnection
  where
    oneConnection = session `catches` [Handler rethrowAsync, Handler discard]

    -- The socket is closed when the session ends, normally or not.
    session = bracket (connectFluent logger) close serve

    -- Run the sender loop next to a watcher thread that reports socket
    -- closure; the watcher is killed when the loop finishes.
    serve sock = do
      closed <- newEmptyTMVarIO
      bracket (forkIOUnmasked (setFlagWhenClose sock closed)) killThread $ \_ ->
        sendFluent logger sock closed

    rethrowAsync :: AsyncException -> IO a
    rethrowAsync = throwIO

    discard :: SomeException -> IO ()
    discard _ = return ()
-- | Connect to the host and port in the logger's settings, retrying with
-- 'exponentialBackoff' until a connection succeeds. The configured timeout
-- is scaled by 1,000,000 and rounded before being handed to 'getSocket'.
connectFluent :: FluentLoggerSender -> IO NS.Socket
connectFluent logger =
  let settings = fluentLoggerSenderSettings logger
      scaledTimeout = round (fluentSettingsTimeout settings * 1000000)
  in exponentialBackoff $
       getSocket (fluentSettingsHost settings) (fluentSettingsPort settings) scaledTimeout
-- | Run @action@ until it succeeds, sleeping between attempts.
--
-- The nominal interval starts at 100000 and grows by a factor of 3/2 after
-- every failure, capped at 60000000 (the values are passed to
-- 'threadDelay'). The actual sleep is drawn uniformly from half to one and
-- a half times the nominal interval.
--
-- 'AsyncException's (e.g. the 'ThreadKilled' sent by 'killThread') are
-- rethrown immediately instead of being treated as a failed attempt, so
-- the calling thread can still be stopped while an attempt is in
-- progress. This matches the exception filtering done by 'runSender'.
-- Every other exception triggers a retry.
exponentialBackoff :: IO a -> IO a
exponentialBackoff action = attempt 100000
  where
    -- The outcome is captured as an 'Either' so the retry happens after the
    -- handler has returned, rather than nesting each retry inside the
    -- previous attempt's exception handler.
    attempt interval = do
      outcome <- fmap Right action `catches`
        [ Handler rethrowAsync
        , Handler (\e -> return (Left (e :: SomeException)))
        ]
      case outcome of
        Right value -> return value
        Left _ -> do
          delay <- randomRIO (interval `div` 2, interval * 3 `div` 2)
          threadDelay delay
          attempt (min 60000000 (interval * 3 `div` 2))

    rethrowAsync :: AsyncException -> IO b
    rethrowAsync = throwIO
-- | What the sender loop woke up for.
data SenderEvent
  = SenderNewData LBS.ByteString
    -- ^ The entry at the head of the queue, ready to be written.
  | SenderSocketClose
    -- ^ The close flag for the current socket was set.
-- | Sender loop for one connection. Waits for either the close flag or a
-- queued entry (the flag is checked first). On an entry: write it to the
-- socket, then remove it from the queue and subtract its length from the
-- buffered counter. Returns when the close flag is taken.
sendFluent :: FluentLoggerSender -> NS.Socket -> CloseFlag -> IO ()
sendFluent logger sock flag = loop
  where
    queue = fluentLoggerSenderChan logger
    pending = fluentLoggerSenderBuffered logger

    socketClosed = takeTMVar flag >> return SenderSocketClose
    -- Only peek here: the entry stays queued until it has been written, so
    -- a failed write leaves it in place.
    nextEntry = SenderNewData `fmap` peekTChan queue

    loop = do
      event <- atomically (socketClosed `orElse` nextEntry)
      case event of
        SenderSocketClose -> return ()
        SenderNewData payload -> do
          sendAll sock payload
          atomically $ do
            _ <- readTChan queue
            modifyTVar pending (subtract (LBS.length payload))
          loop
-- | Create a logger: allocate an empty queue and a zeroed buffered-size
-- counter, then start the sender thread on them. Release the logger with
-- 'closeFluentLogger'.
newFluentLogger :: FluentSettings -> IO FluentLogger
newFluentLogger set = do
  sender <- mkSender
  worker <- forkIOUnmasked (runSender sender)
  return FluentLogger
    { fluentLoggerSender = sender
    , fluentLoggerThread = worker
    }
  where
    mkSender = do
      queue <- newTChanIO
      counter <- newTVarIO 0
      return FluentLoggerSender
        { fluentLoggerSenderChan = queue
        , fluentLoggerSenderBuffered = counter
        , fluentLoggerSenderSettings = set
        }
-- | Stop a logger by killing its sender thread.
closeFluentLogger :: FluentLogger -> IO ()
closeFluentLogger logger = killThread (fluentLoggerThread logger)
-- | Create a logger, run the given action with it, and close the logger
-- afterwards, whether the action returns or throws.
withFluentLogger :: FluentSettings -> (FluentLogger -> IO a) -> IO a
withFluentLogger set action =
  bracket (newFluentLogger set) closeFluentLogger action
-- | Current POSIX time, rounded to the nearest whole number.
getCurrentEpochTime :: IO Int
getCurrentEpochTime = fmap round getPOSIXTime
-- | Queue an entry stamped with the current time. See 'postWithTime'.
post :: Packable a => FluentLogger -> BS.ByteString -> a -> IO ()
post logger label obj =
  getCurrentEpochTime >>= \now -> postWithTime logger label now obj
-- | Queue an entry with an explicit timestamp.
--
-- The entry is encoded as the array @[tag, time, packed object]@, where the
-- tag is the configured tag alone when @label@ is empty and
-- @tag ++ "." ++ label@ otherwise. If adding the encoded entry would push
-- the buffered size over the configured limit, the entry is silently
-- dropped; otherwise it is queued and the buffered size is increased.
postWithTime :: Packable a => FluentLogger -> BS.ByteString -> Int -> a -> IO ()
postWithTime logger label time obj =
  atomically $ do
    used <- readTVar counter
    when (used + size <= fluentSettingsBufferLimit settings) $ do
      writeTChan (fluentLoggerSenderChan sender) payload
      modifyTVar counter (+ size)
  where
    sender = fluentLoggerSender logger
    settings = fluentLoggerSenderSettings sender
    counter = fluentLoggerSenderBuffered sender
    prefix = fluentSettingsTag settings
    -- NOTE(review): with an empty configured tag and a non-empty label this
    -- yields a tag with a leading "." -- confirm that is intended.
    fullTag
      | BS.null label = prefix
      | otherwise = mconcat [ prefix, BS.pack ".", label ]
    payload = encodeLazy $
      ObjectArray [ObjectBinary fullTag, ObjectInt (fromIntegral time), pack obj]
    size = LBS.length payload