--
-- Copyright (C) 2012 Noriyuki OHKAWA
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--      http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

-- | Fluent Logger for Haskell
module Network.Fluent.Logger
    ( -- * Logger
      FluentLogger
    , withFluentLogger
    , newFluentLogger
    , closeFluentLogger
      -- * Settings
    , FluentSettings(..)
    , defaultFluentSettings
      -- * Post
    , post
    , postWithTime
    ) where

import qualified Data.ByteString.Char8 as BS ( ByteString, pack, unpack, empty, null)
import qualified Data.ByteString.Lazy as LBS ( ByteString, length )
import Data.Monoid ( mconcat )
import Data.Maybe ( isJust )
import qualified Network.Socket as NS
import Network.Socket.Options ( setRecvTimeout, setSendTimeout )
import Network.Socket.ByteString.Lazy ( sendAll )
import Control.Monad ( void, forever, when )
import Control.Applicative ( (<$>) )
import Control.Concurrent ( ThreadId, forkIO, forkIOWithUnmask, killThread, threadDelay )
import Control.Concurrent.STM ( atomically
                              , TChan, newTChanIO, readTChan, peekTChan, writeTChan
                              , TVar, newTVarIO, readTVar, modifyTVar, modifyTVar' )
import Control.Exception ( SomeException, AsyncException
                         , handle, bracket, bracketOnError
                         , fromException, throwIO, try )
import Data.MessagePack
import Data.Serialize hiding (label)
import Data.Int ( Int64 )
import Data.Time.Clock.POSIX ( getPOSIXTime )
import System.Random ( randomRIO )

import Network.Fluent.Logger.Packable

-- | Fluent logger settings
--
-- Since 0.1.0.0
--
data FluentSettings =
    FluentSettings
    { fluentSettingsTag :: BS.ByteString
      -- ^ Tag placed in front of the label of every posted message.
    , fluentSettingsHost :: BS.ByteString
      -- ^ Host name handed to address resolution when connecting.
    , fluentSettingsPort :: Int
      -- ^ Port handed to address resolution when connecting.
    , fluentSettingsTimeout :: Double
      -- ^ Socket send/receive timeout. The value is multiplied by
      -- 1,000,000 before it is applied to the socket, so it is
      -- presumably expressed in seconds.
    , fluentSettingsBufferLimit :: Int64
      -- ^ Upper bound, in bytes of encoded data, on the messages that
      -- may wait in the queue. A message that would exceed it is dropped.
    }

-- | Default fluent logger settings
--
-- Since 0.1.0.0
--
defaultFluentSettings :: FluentSettings
defaultFluentSettings =
    FluentSettings
    { fluentSettingsTag = BS.empty
    , fluentSettingsHost = BS.pack "localhost"
    , fluentSettingsPort = 24224
    , fluentSettingsTimeout = 3.0
    , fluentSettingsBufferLimit = 1024*1024
    }

-- | Fluent logger
--
-- Since 0.1.0.0
--
data FluentLogger =
    FluentLogger
    { fluentLoggerSender :: FluentLoggerSender
      -- ^ Queue and settings shared with the background sender thread.
    , fluentLoggerThread :: ThreadId
      -- ^ Background thread that delivers queued messages.
    }

-- | State shared between the posting side and the sender thread.
data FluentLoggerSender =
    FluentLoggerSender
    { fluentLoggerSenderChan :: TChan LBS.ByteString
      -- ^ Encoded messages waiting to be sent, oldest first.
    , fluentLoggerSenderBuffered :: TVar Int64
      -- ^ Total byte length of the messages currently in the queue.
    , fluentLoggerSenderSettings :: FluentSettings
    }

-- | Run an action and capture any synchronous exception it throws.
--
-- Asynchronous exceptions (most importantly the 'ThreadKilled' sent by
-- 'closeFluentLogger') are re-thrown instead of being captured, so the
-- sender thread can always be stopped.
trySync :: IO a -> IO (Either SomeException a)
trySync action = try action >>= either rethrowAsync (return . Right)
    where
      rethrowAsync e
          | isAsync e = throwIO e
          | otherwise = return (Left e)
      isAsync e = isJust (fromException e :: Maybe AsyncException)

-- | Resolve the host and port and open a connected stream socket with
-- the given send/receive timeout applied.
--
-- The socket is closed again if anything fails after it was created,
-- and the exception is propagated to the caller.
getSocket :: BS.ByteString -> Int -> Int64 -> IO NS.Socket
getSocket host port timeout = do
    let hints = NS.defaultHints
                { NS.addrFlags = [NS.AI_ADDRCONFIG]
                , NS.addrSocketType = NS.Stream
                }
    addrs <- NS.getAddrInfo (Just hints) (Just $ BS.unpack host) (Just $ show port)
    case addrs of
      [] -> ioError $ userError $ "getSocket: no address found for " ++ BS.unpack host
      (addr:_) ->
          -- bracketOnError also covers the timeout setup, so a failure
          -- there cannot leak the freshly created socket.
          bracketOnError (open addr) NS.sClose $ \sock -> do
            setRecvTimeout sock timeout
            setSendTimeout sock timeout
            NS.connect sock $ NS.addrAddress addr
            return sock
    where
      open addr = NS.socket (NS.addrFamily addr) (NS.addrSocketType addr) (NS.addrProtocol addr)

-- | Sender thread body: connect, send until the connection fails, then
-- reconnect, forever.
--
-- 'bracket' guarantees the socket is closed both when sending fails and
-- when the thread is killed.
runSender :: FluentLoggerSender -> IO ()
runSender logger =
    forever $ bracket (connectFluent logger) NS.sClose (sendFluent logger)

-- | Connect to the configured host and port, retrying with back-off
-- until a connection is established.
connectFluent :: FluentLoggerSender -> IO NS.Socket
connectFluent logger = exponentialBackoff $ getSocket host port timeout
    where
      set = fluentLoggerSenderSettings logger
      host = fluentSettingsHost set
      port = fluentSettingsPort set
      timeout = round $ fluentSettingsTimeout set * 1000000

-- | Repeat an action until it succeeds.
--
-- The first attempt is made immediately. After each failure the thread
-- sleeps for a random time between 0.5x and 1.5x the current interval,
-- and the interval then grows by a factor of 1.5, starting at 100,000
-- and capped at 60,000,000 (values passed to 'threadDelay').
--
-- Synchronous failures are deliberately discarded (best-effort
-- reconnect); asynchronous exceptions abort the loop.
exponentialBackoff :: IO a -> IO a
exponentialBackoff action = attempt 100000
    where
      attempt interval =
          trySync action >>= either (const $ backoff interval) return
      backoff interval = do
        delay <- randomRIO (interval `div` 2, interval * 3 `div` 2)
        threadDelay delay
        attempt $ min 60000000 $ interval * 3 `div` 2

-- | Send queued messages over the socket until a synchronous exception
-- occurs, then return so the caller can reconnect.
--
-- A message is only removed from the queue after it was sent
-- successfully, so a message whose send failed is retried on the next
-- connection. The socket itself is owned and closed by the caller.
sendFluent :: FluentLoggerSender -> NS.Socket -> IO ()
sendFluent logger sock = void $ trySync $ forever sendOne
    where
      chan = fluentLoggerSenderChan logger
      buffered = fluentLoggerSenderBuffered logger
      sendOne = do
        -- Peek first: the entry must stay queued if sendAll throws.
        entry <- atomically $ peekTChan chan
        sendAll sock entry
        atomically $ do
          void $ readTChan chan
          modifyTVar' buffered (subtract $ LBS.length entry)

-- | Create a fluent logger
--
-- Starts a background thread that connects using the given settings
-- and sends posted messages. Use 'closeFluentLogger' to stop it.
--
-- Since 0.1.0.0
--
newFluentLogger :: FluentSettings -> IO FluentLogger
newFluentLogger set = do
    tchan <- newTChanIO
    tvar <- newTVarIO 0
    let sender = FluentLoggerSender
                 { fluentLoggerSenderChan = tchan
                 , fluentLoggerSenderBuffered = tvar
                 , fluentLoggerSenderSettings = set
                 }
    -- Unmask explicitly: when called from 'withFluentLogger' this runs
    -- inside the masked acquire step of 'bracket', and a forked thread
    -- inherits the masking state of its parent.
    tid <- forkIOWithUnmask (\unmask -> unmask (runSender sender))
    return FluentLogger
           { fluentLoggerSender = sender
           , fluentLoggerThread = tid
           }

-- | Close logger
--
-- Kills the sender thread. Messages still waiting in the queue are not
-- flushed.
--
-- Since 0.1.0.0
--
closeFluentLogger :: FluentLogger -> IO ()
closeFluentLogger = killThread . fluentLoggerThread

-- | Create a fluent logger and run given action.
--
-- The logger is closed when the action finishes or throws.
--
-- Since 0.1.0.0
--
withFluentLogger :: FluentSettings -> (FluentLogger -> IO a) -> IO a
withFluentLogger set = bracket (newFluentLogger set) closeFluentLogger

-- | Current POSIX time rounded to whole seconds.
getCurrentEpochTime :: IO Int
getCurrentEpochTime = round <$> getPOSIXTime

-- | Post a message.
--
-- The message is stamped with the current time; see 'postWithTime'.
--
-- Since 0.2.0.0
--
post :: Packable a => FluentLogger -> BS.ByteString -> a -> IO ()
post logger label obj = do
  time <- getCurrentEpochTime
  postWithTime logger label time obj

-- | Post a message with given time.
--
-- The message tag is the configured tag when the label is empty,
-- otherwise the configured tag, a dot and the label. The message is
-- queued for the sender thread; if queueing it would push the buffered
-- size above 'fluentSettingsBufferLimit' it is silently dropped.
--
-- Since 0.2.0.0
--
postWithTime :: Packable a => FluentLogger -> BS.ByteString -> Int -> a -> IO ()
postWithTime logger label time obj = atomically send
    where
      sender = fluentLoggerSender logger
      set = fluentLoggerSenderSettings sender
      tag = fluentSettingsTag set
      lbl = if BS.null label then tag else mconcat [ tag, BS.pack ".", label ]
      entry = encodeLazy $ ObjectArray [ObjectBinary lbl, ObjectInt (fromIntegral time), pack obj]
      len = LBS.length entry
      chan = fluentLoggerSenderChan sender
      buffered = fluentLoggerSenderBuffered sender
      limit = fluentSettingsBufferLimit set
      send = do
        s <- readTVar buffered
        when (s + len <= limit) $ do
          writeTChan chan entry
          -- Strict update keeps the counter from accumulating thunks.
          modifyTVar' buffered (+ len)