module System.Log.Greg (
Configuration(..)
,logMessage
,withGregDo
,defaultConfiguration
) where
import System.Log.PreciseClock
import System.Posix.Clock
import Data.ByteString.Unsafe
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as L
import Data.Binary
import Data.Binary.Put
import Network
import Network.HostName (getHostName)
import System.UUID.V4
import System.IO
import Foreign
#ifdef DEBUG
import Debug.Trace
#endif
import qualified Control.Exception as E
import Control.Concurrent
import Control.Concurrent.STM
import GHC.Conc
import Control.Monad
data Record = Record {
timestamp :: TimeSpec,
message :: B.ByteString
}
data GregState = GregState {
configuration :: Configuration,
records :: TChan Record,
numRecords :: TVar Int,
isDropping :: TVar Bool,
packet :: TMVar [Record]
}
data Configuration = Configuration {
server :: String
,port :: Int
,calibrationPort :: Int
,flushPeriodMs :: Int
,clientId :: String
,maxBufferedRecords :: Int
,useCompression :: Bool
,calibrationPeriodSec :: Int
}
hostname, ourUuid :: B.ByteString
hostname = B.pack $ unsafePerformIO getHostName
ourUuid = repack . runPut . put $ unsafePerformIO uuid
defaultConfiguration :: Configuration
defaultConfiguration = Configuration {
server = "localhost",
port = 5676,
calibrationPort = 5677,
flushPeriodMs = 1000,
clientId = "unknown",
maxBufferedRecords = 100000,
useCompression = True,
calibrationPeriodSec = 10
}
withGregDo :: Configuration -> IO () -> IO ()
withGregDo conf realMain = withSocketsDo $ do
st <- atomically $ do st <- readTVar state
let st' = st{configuration = conf}
writeTVar state $ st'
return st'
let everyMs ms action = forkIO $ forever (action >> threadDelay (1000 * ms))
let safely action label = action `E.catch` \e -> putStrLnT ("Error in " ++ label ++ ": " ++ show (e::E.SomeException))
let safelyEveryMs ms action label = everyMs ms (safely action label)
calTID <- safelyEveryMs (1000*calibrationPeriodSec conf) (initiateCalibrationOnce st) "calibrator"
packTID <- safelyEveryMs ( flushPeriodMs conf) (packRecordsOnce st) "packer"
checkTID <- safelyEveryMs ( flushPeriodMs conf) (checkQueueSize st) "queue size checker"
sendTID <- safelyEveryMs ( flushPeriodMs conf) (sendPacketOnce st) "sender"
realMain
putStrLnT "Flushing remaining messages"
killThread checkTID
atomically $ writeTVar (isDropping st) True
let waitFlush = do
numrs <- atomically $ readTVar (numRecords st)
unless (numrs == 0) $ threadDelay (1000*flushPeriodMs conf) >> waitFlush
waitFlush
killThread packTID
atomically $ putTMVar (packet st) []
let waitSend = do
sent <- atomically $ isEmptyTMVar (packet st)
unless sent $ threadDelay (1000*flushPeriodMs conf) >> waitSend
waitSend
killThread sendTID
killThread calTID
putStrLnT "Shutdown finished."
checkQueueSize :: GregState -> IO ()
checkQueueSize st = do
currsize <- atomically $ readTVar (numRecords st)
let maxrs = maxBufferedRecords (configuration st)
droppingNow <- atomically $ readTVar (isDropping st)
case (droppingNow, currsize > maxrs) of
(True , True) -> putStrLnT ("Still dropping (queue " ++ show currsize ++ ")")
(False, True) -> do putStrLnT ("Started to drop (queue " ++ show currsize ++ ")")
atomically $ writeTVar (isDropping st) True
(True, False) -> do putStrLnT ("Stopped dropping (queue " ++ show currsize ++ ")")
atomically $ writeTVar (isDropping st) False
(False, False) -> return ()
packRecordsOnce :: GregState -> IO ()
packRecordsOnce st = atomically $ do
putStrLnT $ "Packing: reading all messages ..."
rs <- readAtMost (10000::Int)
putStrLnT $ "Packing: reading all messages done (" ++ show (length rs) ++ ")"
unless (null rs) $ do
putStrLnT $ "Packing " ++ show (length rs) ++ " records"
atomModTVar (numRecords st) (\x -> x length rs)
senderAccepted <- tryPutTMVar (packet st) rs
unless senderAccepted retry
putStrLnT "Packing done"
where
readAtMost 0 = return []
readAtMost n = do
empty <- isEmptyTChan (records st)
if empty then return []
else do r <- readTChan (records st)
rest <- readAtMost (n1)
return (r:rest)
sendPacketOnce :: GregState -> IO ()
sendPacketOnce st = atomically $ withWarning "Failed to pack/send records" $ do
rs <- takeTMVar $ packet st
unless (null rs) $ do
let conf = configuration st
putStrLnT "Pushing records"
unsafeIOToSTM $ E.bracket (connectTo (server conf) (PortNumber $ fromIntegral $ port conf)) hClose $ \hdl -> do
putStrLnT "Pushing records - connected"
let msg = formatRecords (configuration st) rs
putStrLnT $ "Snapshotted " ++ show (length rs) ++ " records --> " ++ show (B.length msg) ++ " bytes"
unsafeUseAsCStringLen msg $ \(ptr, len) -> hPutBuf hdl ptr len
hFlush hdl
putStrLnT $ "Pushing records - done"
where
withWarning s t = (t `catchSTM` (\e -> putStrLnT (s ++ ": " ++ show (e::E.SomeException)) >> check False)) `orElse` return ()
formatRecords :: Configuration -> [Record] -> B.ByteString
formatRecords conf records = repack . runPut $ do
putByteString ourUuid
putWord8 0
putWord32le (fromIntegral $ length $ clientId conf)
putByteString (B.pack $ clientId conf)
mapM_ putRecord records
putWord32le 0
putRecord :: Record -> Put
putRecord r = do
putWord32le 1
putWord64le (toNanos64 (timestamp r))
putWord32le(fromIntegral $ B.length hostname)
putByteString hostname
putWord32le (fromIntegral $ B.length (message r))
putByteString (message r)
initiateCalibrationOnce :: GregState -> IO ()
initiateCalibrationOnce st = do
putStrLnT "Initiating calibration"
let conf = configuration st
E.bracket (connectTo (server conf) (PortNumber $ fromIntegral $ calibrationPort conf)) hClose $ \hdl -> do
hSetBuffering hdl NoBuffering
putStrLnT "Calibration - connected"
unsafeUseAsCString ourUuid $ \p -> hPutBuf hdl p 16
allocaBytes 8 $ \pTheirTimestamp -> do
let whenM mp m = mp >>= \v -> when v m
loop = whenM (hSkipBytes hdl 8 pTheirTimestamp) $ do
ts <- preciseTimeSpec
let pOurTimestamp = repack $ runPut $ putWord64le (toNanos64 ts)
unsafeUseAsCString pOurTimestamp $ \ptr -> hPutBuf hdl ptr 8
loop
loop
putStrLnT "Calibration ended - sleeping"
state :: TVar GregState
state = unsafePerformIO $ do rs <- newTChanIO
numrs <- newTVarIO 0
dropping <- newTVarIO False
pkt <- newEmptyTMVarIO
newTVarIO $ GregState defaultConfiguration rs numrs dropping pkt
logMessage :: String -> IO ()
logMessage s = do
t <- preciseTimeSpec
st <- atomically $ readTVar state
shouldDrop <- atomically $ readTVar (isDropping st)
unless shouldDrop $ atomically $ do
writeTChan (records st) (Record {timestamp = t, message = B.pack s})
atomModTVar (numRecords st) (+1)
toNanos64 :: TimeSpec -> Word64
toNanos64 (TimeSpec s ns) = fromIntegral ns + 1000000000*fromIntegral s
hSkipBytes :: Handle -> Int -> Ptr a -> IO Bool
hSkipBytes _ 0 _ = return True
hSkipBytes h n p = do
closed <- hIsEOF h
if closed
then return False
else do skipped <- hGetBuf h p n
if skipped < 0
then return False
else hSkipBytes h (nskipped) p
repack :: L.ByteString -> B.ByteString
repack = B.concat . L.toChunks
atomModTVar :: TVar a -> (a -> a) -> STM ()
atomModTVar var f = readTVar var >>= \val -> writeTVar var (f val)
putStrLnT :: (Monad m) => String -> m ()
#ifdef DEBUG
putStrLnT s = trace s $ return ()
#else
putStrLnT _ = return ()
#endif
#ifdef DEBUG
testFlood :: IO ()
testFlood = withGregDo defaultConfiguration $ forever $ logMessage "Hello"
testSequence :: IO ()
testSequence = withGregDo defaultConfiguration $ mapM_ (\x -> logMessage (show x) >> threadDelay 100000) [1..]
#endif