{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
module Instrument.Client
( Instrument,
initInstrument,
sampleI,
timeI,
timeI',
timeExI,
TM.time,
TM.timeEx,
submitTime,
incrementI,
countI,
timerMetricName,
stripTimerPrefix,
timerMetricNamePrefix,
packetsKey,
)
where
import Control.Concurrent (forkIO)
import Control.Exception (throw)
import Control.Exception.Safe (MonadCatch, SomeException, tryAny)
import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as B
import Data.IORef
( IORef,
atomicModifyIORef,
newIORef,
readIORef,
)
import Data.List (isPrefixOf, stripPrefix)
import qualified Data.Map as M
import Data.Monoid as Monoid
import qualified Data.SafeCopy as SC
import qualified Data.Text as T
#if MIN_VERSION_hedis(0,12,0)
import Database.Redis as R
#else
import Database.Redis as R hiding (HostName, time)
#endif
import qualified Instrument.Counter as C
import qualified Instrument.Measurement as TM
import qualified Instrument.Sampler as S
import Instrument.Types
import Instrument.Utils
import Network.HostName
initInstrument ::
ConnectInfo ->
InstrumentConfig ->
IO Instrument
initInstrument :: ConnectInfo -> InstrumentConfig -> IO Instrument
initInstrument ConnectInfo
conn InstrumentConfig
cfg = do
Connection
p <- ConnectInfo -> IO Connection
createInstrumentPool ConnectInfo
conn
String
h <- IO String
getHostName
IORef (Map (MetricName, Dimensions) Sampler)
smplrs <- forall a. a -> IO (IORef a)
newIORef forall k a. Map k a
M.empty
IORef (Map (MetricName, Dimensions) Counter)
ctrs <- forall a. a -> IO (IORef a)
newIORef forall k a. Map k a
M.empty
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
indefinitely' forall a b. (a -> b) -> a -> b
$ IORef (Map (MetricName, Dimensions) Sampler)
-> Connection -> InstrumentConfig -> IO ()
submitSamplers IORef (Map (MetricName, Dimensions) Sampler)
smplrs Connection
p InstrumentConfig
cfg
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
indefinitely' forall a b. (a -> b) -> a -> b
$ IORef (Map (MetricName, Dimensions) Counter)
-> Connection -> InstrumentConfig -> IO ()
submitCounters IORef (Map (MetricName, Dimensions) Counter)
ctrs Connection
p InstrumentConfig
cfg
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ String
-> IORef (Map (MetricName, Dimensions) Sampler)
-> IORef (Map (MetricName, Dimensions) Counter)
-> Connection
-> Instrument
I String
h IORef (Map (MetricName, Dimensions) Sampler)
smplrs IORef (Map (MetricName, Dimensions) Counter)
ctrs Connection
p
where
indefinitely' :: IO () -> IO ()
indefinitely' = String -> Int -> IO () -> IO ()
indefinitely String
"Client" (Int -> Int
seconds Int
1)
mkSampledSubmission ::
MetricName ->
Dimensions ->
[Double] ->
IO SubmissionPacket
mkSampledSubmission :: MetricName -> Dimensions -> [Double] -> IO SubmissionPacket
mkSampledSubmission MetricName
nm Dimensions
dims [Double]
vals = do
Double
ts <- IO Double
TM.getTime
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Double -> MetricName -> Payload -> Dimensions -> SubmissionPacket
SP Double
ts MetricName
nm ([Double] -> Payload
Samples [Double]
vals) Dimensions
dims
addHostDimension :: HostName -> Dimensions -> Dimensions
addHostDimension :: String -> Dimensions -> Dimensions
addHostDimension String
host = forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert DimensionName
hostDimension (Text -> DimensionValue
DimensionValue (String -> Text
T.pack String
host))
mkCounterSubmission ::
MetricName ->
Dimensions ->
Integer ->
IO SubmissionPacket
mkCounterSubmission :: MetricName -> Dimensions -> Integer -> IO SubmissionPacket
mkCounterSubmission MetricName
m Dimensions
dims Integer
i = do
Double
ts <- IO Double
TM.getTime
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Double -> MetricName -> Payload -> Dimensions -> SubmissionPacket
SP Double
ts MetricName
m (Integer -> Payload
Counter Integer
i) Dimensions
dims
submitSamplers ::
IORef Samplers ->
Connection ->
InstrumentConfig ->
IO ()
submitSamplers :: IORef (Map (MetricName, Dimensions) Sampler)
-> Connection -> InstrumentConfig -> IO ()
submitSamplers IORef (Map (MetricName, Dimensions) Sampler)
smplrs Connection
rds InstrumentConfig
cfg = do
[((MetricName, Dimensions), Sampler)]
ss <- IORef (Map (MetricName, Dimensions) Sampler)
-> IO [((MetricName, Dimensions), Sampler)]
getSamplers IORef (Map (MetricName, Dimensions) Sampler)
smplrs
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Sampler) -> IO ()
flushSampler Connection
rds InstrumentConfig
cfg) [((MetricName, Dimensions), Sampler)]
ss
submitCounters ::
IORef Counters ->
Connection ->
InstrumentConfig ->
IO ()
submitCounters :: IORef (Map (MetricName, Dimensions) Counter)
-> Connection -> InstrumentConfig -> IO ()
submitCounters IORef (Map (MetricName, Dimensions) Counter)
cs Connection
r InstrumentConfig
cfg = do
[((MetricName, Dimensions), Counter)]
ss <- forall k a. Map k a -> [(k, a)]
M.toList forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
`liftM` forall a. IORef a -> IO a
readIORef IORef (Map (MetricName, Dimensions) Counter)
cs
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Counter) -> IO ()
flushCounter Connection
r InstrumentConfig
cfg) [((MetricName, Dimensions), Counter)]
ss
submitPacket :: (SC.SafeCopy a) => R.Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket :: forall a.
SafeCopy a =>
Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket Connection
r MetricName
m Maybe Integer
mbound a
sp = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$
forall a. Connection -> Redis a -> IO a
R.runRedis Connection
r forall a b. (a -> b) -> a -> b
$
forall a. RedisTx (Queued a) -> Redis (TxResult a)
R.multiExec forall a b. (a -> b) -> a -> b
$ do
Queued ()
_ <- RedisTx (Queued ())
push
RedisTx (Queued Integer)
rememberKey
where
rk :: ByteString
rk = [ByteString] -> ByteString
B.concat [String -> ByteString
B.pack String
"_sq_", String -> ByteString
B.pack (MetricName -> String
metricName MetricName
m)]
push :: RedisTx (Queued ())
push = case Maybe Integer
mbound of
Just Integer
n -> ByteString -> [ByteString] -> Integer -> RedisTx (Queued ())
lpushBoundedTxn ByteString
rk [forall a. SafeCopy a => a -> ByteString
encodeCompress a
sp] Integer
n
Maybe Integer
Nothing -> (() forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> [ByteString] -> m (f Integer)
R.lpush ByteString
rk [forall a. SafeCopy a => a -> ByteString
encodeCompress a
sp]
rememberKey :: RedisTx (Queued Integer)
rememberKey = forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> [ByteString] -> m (f Integer)
sadd ByteString
packetsKey [ByteString
rk]
packetsKey :: B.ByteString
packetsKey :: ByteString
packetsKey = ByteString
"_sqkeys"
flushCounter ::
Connection ->
InstrumentConfig ->
((MetricName, Dimensions), C.Counter) ->
IO ()
flushCounter :: Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Counter) -> IO ()
flushCounter Connection
r InstrumentConfig
cfg ((MetricName
m, Dimensions
dims), Counter
c) =
Counter -> IO Integer
C.resetCounter Counter
c
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MetricName -> Dimensions -> Integer -> IO SubmissionPacket
mkCounterSubmission MetricName
m Dimensions
dims
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a.
SafeCopy a =>
Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket Connection
r MetricName
m (InstrumentConfig -> Maybe Integer
redisQueueBound InstrumentConfig
cfg)
flushSampler ::
Connection ->
InstrumentConfig ->
((MetricName, Dimensions), S.Sampler) ->
IO ()
flushSampler :: Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Sampler) -> IO ()
flushSampler Connection
r InstrumentConfig
cfg ((MetricName
name, Dimensions
dims), Sampler
sampler) = do
[Double]
vals <- Sampler -> IO [Double]
S.get Sampler
sampler
case [Double]
vals of
[] -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
[Double]
_ -> do
Sampler -> IO ()
S.reset Sampler
sampler
forall a.
SafeCopy a =>
Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket Connection
r MetricName
name (InstrumentConfig -> Maybe Integer
redisQueueBound InstrumentConfig
cfg) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> [Double] -> IO SubmissionPacket
mkSampledSubmission MetricName
name Dimensions
dims [Double]
vals
incrementI ::
(MonadIO m) =>
MetricName ->
HostDimensionPolicy ->
Dimensions ->
Instrument ->
m ()
incrementI :: forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy -> Dimensions -> Instrument -> m ()
incrementI MetricName
m HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Instrument
i =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Counter -> IO ()
C.increment forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> Instrument -> IO Counter
getCounter MetricName
m Dimensions
dims Instrument
i
where
dims :: Dimensions
dims = case HostDimensionPolicy
hostDimPolicy of
HostDimensionPolicy
AddHostDimension -> String -> Dimensions -> Dimensions
addHostDimension (Instrument -> String
hostName Instrument
i) Dimensions
rawDims
HostDimensionPolicy
DoNotAddHostDimension -> Dimensions
rawDims
countI ::
MonadIO m =>
MetricName ->
HostDimensionPolicy ->
Dimensions ->
Int ->
Instrument ->
m ()
countI :: forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy -> Dimensions -> Int -> Instrument -> m ()
countI MetricName
m HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Int
n Instrument
i =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> Counter -> IO ()
C.add Int
n forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> Instrument -> IO Counter
getCounter MetricName
m Dimensions
dims Instrument
i
where
dims :: Dimensions
dims = case HostDimensionPolicy
hostDimPolicy of
HostDimensionPolicy
AddHostDimension -> String -> Dimensions -> Dimensions
addHostDimension (Instrument -> String
hostName Instrument
i) Dimensions
rawDims
HostDimensionPolicy
DoNotAddHostDimension -> Dimensions
rawDims
timeI ::
(MonadIO m) =>
MetricName ->
HostDimensionPolicy ->
Dimensions ->
Instrument ->
m a ->
m a
timeI :: forall (m :: * -> *) a.
MonadIO m =>
MetricName
-> HostDimensionPolicy -> Dimensions -> Instrument -> m a -> m a
timeI MetricName
nm HostDimensionPolicy
hostDimPolicy Dimensions
rawDims = do
forall (m :: * -> *) a.
MonadIO m =>
(a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument -> m a -> m a
timeI' (forall a b. a -> b -> a
const (forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just (MetricName
nm, HostDimensionPolicy
hostDimPolicy, Dimensions
rawDims))))
timeI' ::
(MonadIO m) =>
(a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))) ->
Instrument ->
m a ->
m a
timeI' :: forall (m :: * -> *) a.
MonadIO m =>
(a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument -> m a -> m a
timeI' a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
toMetric Instrument
i m a
act = do
(!Double
secs, !a
res) <- forall (m :: * -> *) a. MonadIO m => m a -> m (Double, a)
TM.time m a
act
Maybe (MetricName, HostDimensionPolicy, Dimensions)
metricMay <- a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
toMetric a
res
case Maybe (MetricName, HostDimensionPolicy, Dimensions)
metricMay of
Just (MetricName
nm, HostDimensionPolicy
hostDimPolicy, Dimensions
rawDims) -> do
forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
submitTime MetricName
nm HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
secs Instrument
i
Maybe (MetricName, HostDimensionPolicy, Dimensions)
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
forall (m :: * -> *) a. Monad m => a -> m a
return a
res
timeExI ::
(MonadIO m, MonadCatch m) =>
(Either SomeException a -> (MetricName, HostDimensionPolicy, Dimensions)) ->
Instrument ->
m a ->
m a
timeExI :: forall (m :: * -> *) a.
(MonadIO m, MonadCatch m) =>
(Either SomeException a
-> (MetricName, HostDimensionPolicy, Dimensions))
-> Instrument -> m a -> m a
timeExI Either SomeException a
-> (MetricName, HostDimensionPolicy, Dimensions)
toMetric Instrument
i m a
act = do
Either SomeException a
resE <- forall (m :: * -> *) a.
MonadIO m =>
(a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument -> m a -> m a
timeI' (forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. Either SomeException a
-> (MetricName, HostDimensionPolicy, Dimensions)
toMetric) Instrument
i (forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny m a
act)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a e. Exception e => e -> a
throw forall (f :: * -> *) a. Applicative f => a -> f a
pure Either SomeException a
resE
timerMetricName :: MetricName -> MetricName
timerMetricName :: MetricName -> MetricName
timerMetricName name :: MetricName
name@(MetricName String
nameS) =
if String
timerMetricNamePrefix forall a. Eq a => [a] -> [a] -> Bool
`isPrefixOf` String
nameS
then MetricName
name
else String -> MetricName
MetricName (String
timerMetricNamePrefix forall a. Semigroup a => a -> a -> a
Monoid.<> String
nameS)
stripTimerPrefix :: MetricName -> MetricName
stripTimerPrefix :: MetricName -> MetricName
stripTimerPrefix (MetricName String
n) = case forall a. Eq a => [a] -> [a] -> Maybe [a]
stripPrefix String
timerMetricNamePrefix String
n of
Just String
unprefixed -> String -> MetricName
MetricName String
unprefixed
Maybe String
Nothing -> String -> MetricName
MetricName String
n
timerMetricNamePrefix :: String
timerMetricNamePrefix :: String
timerMetricNamePrefix = String
"time."
submitTime ::
(MonadIO m) =>
MetricName ->
HostDimensionPolicy ->
Dimensions ->
Double ->
Instrument ->
m ()
submitTime :: forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
submitTime MetricName
nameRaw HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
secs Instrument
i =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
sampleI MetricName
nm HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
secs Instrument
i
where
nm :: MetricName
nm = MetricName -> MetricName
timerMetricName MetricName
nameRaw
sampleI ::
MonadIO m =>
MetricName ->
HostDimensionPolicy ->
Dimensions ->
Double ->
Instrument ->
m ()
sampleI :: forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
sampleI MetricName
name HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
v Instrument
i =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Double -> Sampler -> IO ()
S.sample Double
v forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> Instrument -> IO Sampler
getSampler MetricName
name Dimensions
dims Instrument
i
where
dims :: Dimensions
dims = case HostDimensionPolicy
hostDimPolicy of
HostDimensionPolicy
AddHostDimension -> String -> Dimensions -> Dimensions
addHostDimension (Instrument -> String
hostName Instrument
i) Dimensions
rawDims
HostDimensionPolicy
DoNotAddHostDimension -> Dimensions
rawDims
getCounter :: MetricName -> Dimensions -> Instrument -> IO C.Counter
getCounter :: MetricName -> Dimensions -> Instrument -> IO Counter
getCounter MetricName
nm Dimensions
dims Instrument
i = forall k b. Ord k => IO b -> k -> IORef (Map k b) -> IO b
getRef IO Counter
C.newCounter (MetricName
nm, Dimensions
dims) (Instrument -> IORef (Map (MetricName, Dimensions) Counter)
counters Instrument
i)
getSampler :: MetricName -> Dimensions -> Instrument -> IO S.Sampler
getSampler :: MetricName -> Dimensions -> Instrument -> IO Sampler
getSampler MetricName
name Dimensions
dims Instrument
i = forall k b. Ord k => IO b -> k -> IORef (Map k b) -> IO b
getRef (Int -> IO Sampler
S.new Int
1000) (MetricName
name, Dimensions
dims) (Instrument -> IORef (Map (MetricName, Dimensions) Sampler)
samplers Instrument
i)
getSamplers :: IORef Samplers -> IO [((MetricName, Dimensions), S.Sampler)]
getSamplers :: IORef (Map (MetricName, Dimensions) Sampler)
-> IO [((MetricName, Dimensions), Sampler)]
getSamplers IORef (Map (MetricName, Dimensions) Sampler)
ss = forall k a. Map k a -> [(k, a)]
M.toList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` forall a. IORef a -> IO a
readIORef IORef (Map (MetricName, Dimensions) Sampler)
ss
getRef :: Ord k => IO b -> k -> IORef (M.Map k b) -> IO b
getRef :: forall k b. Ord k => IO b -> k -> IORef (Map k b) -> IO b
getRef IO b
f k
name IORef (Map k b)
mapRef = do
Map k b
mapRef' <- forall a. IORef a -> IO a
readIORef IORef (Map k b)
mapRef
case forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup k
name Map k b
mapRef' of
Just b
ref -> forall (f :: * -> *) a. Applicative f => a -> f a
pure b
ref
Maybe b
Nothing -> do
b
empty <- IO b
f
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Map k b)
mapRef forall a b. (a -> b) -> a -> b
$ \Map k b
m ->
case forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup k
name Map k b
m of
Maybe b
Nothing ->
let m' :: Map k b
m' = forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert k
name b
empty Map k b
m
in (Map k b
m', b
empty)
Just b
ref -> (Map k b
m, b
ref)
{-# INLINEABLE getRef #-}
lpushBoundedTxn :: B.ByteString -> [B.ByteString] -> Integer -> RedisTx (Queued ())
lpushBoundedTxn :: ByteString -> [ByteString] -> Integer -> RedisTx (Queued ())
lpushBoundedTxn ByteString
k [ByteString]
vs Integer
mx = do
Queued Integer
_ <- forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> [ByteString] -> m (f Integer)
lpush ByteString
k [ByteString]
vs
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (() forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$) (forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> Integer -> Integer -> m (f Status)
ltrim ByteString
k (- Integer
mx) (-Integer
1))