{-# LANGUAGE BangPatterns      #-}
{-# LANGUAGE CPP               #-}
{-# LANGUAGE OverloadedStrings #-}
module Instrument.Client
    ( Instrument
    , initInstrument
    , sampleI
    , timeI
    , timeI'
    , timeExI
    , TM.time
    , TM.timeEx
    , submitTime
    , incrementI
    , countI
    , timerMetricName
    , stripTimerPrefix
    , timerMetricNamePrefix
    , packetsKey
    ) where

-------------------------------------------------------------------------------
import           Control.Concurrent     (forkIO)
import           Control.Exception      (throw)
import           Control.Exception.Safe (MonadCatch, SomeException, tryAny)
import           Control.Monad
import           Control.Monad.IO.Class
import qualified Data.ByteString.Char8  as B
import           Data.IORef             (IORef, atomicModifyIORef, newIORef,
                                         readIORef)
import           Data.List              (isPrefixOf, stripPrefix)
import qualified Data.Map               as M
import           Data.Monoid            as Monoid
import qualified Data.SafeCopy          as SC
import qualified Data.Text              as T
#if MIN_VERSION_hedis(0,12,0)
import           Database.Redis         as R
#else
import           Database.Redis         as R hiding (HostName, time)
#endif
import           Network.HostName
-------------------------------------------------------------------------------
import qualified Instrument.Counter     as C
import qualified Instrument.Measurement as TM
import qualified Instrument.Sampler     as S
import           Instrument.Types
import           Instrument.Utils
-------------------------------------------------------------------------------



-- | Initialize an instrument for measurement and feeding data into the system.
--
-- The resulting opaque 'Instrument' is meant to be threaded around in
-- your application to be later used in conjunction with 'sample' and
-- 'time'.
initInstrument :: ConnectInfo
               -- ^ Redis connection info
               -> InstrumentConfig
               -- ^ Instrument configuration. Use "def" if you don't have specific needs
               -> IO Instrument
initInstrument :: ConnectInfo -> InstrumentConfig -> IO Instrument
initInstrument ConnectInfo
conn InstrumentConfig
cfg = do
    Connection
p <- ConnectInfo -> IO Connection
createInstrumentPool ConnectInfo
conn
    HostName
h        <- IO HostName
getHostName
    IORef (Map (MetricName, Dimensions) Sampler)
smplrs <- Map (MetricName, Dimensions) Sampler
-> IO (IORef (Map (MetricName, Dimensions) Sampler))
forall a. a -> IO (IORef a)
newIORef Map (MetricName, Dimensions) Sampler
forall k a. Map k a
M.empty
    IORef (Map (MetricName, Dimensions) Counter)
ctrs <- Map (MetricName, Dimensions) Counter
-> IO (IORef (Map (MetricName, Dimensions) Counter))
forall a. a -> IO (IORef a)
newIORef Map (MetricName, Dimensions) Counter
forall k a. Map k a
M.empty
    IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
indefinitely' (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef (Map (MetricName, Dimensions) Sampler)
-> Connection -> InstrumentConfig -> IO ()
submitSamplers IORef (Map (MetricName, Dimensions) Sampler)
smplrs Connection
p InstrumentConfig
cfg
    IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
indefinitely' (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef (Map (MetricName, Dimensions) Counter)
-> Connection -> InstrumentConfig -> IO ()
submitCounters IORef (Map (MetricName, Dimensions) Counter)
ctrs Connection
p InstrumentConfig
cfg
    Instrument -> IO Instrument
forall (m :: * -> *) a. Monad m => a -> m a
return (Instrument -> IO Instrument) -> Instrument -> IO Instrument
forall a b. (a -> b) -> a -> b
$ HostName
-> IORef (Map (MetricName, Dimensions) Sampler)
-> IORef (Map (MetricName, Dimensions) Counter)
-> Connection
-> Instrument
I HostName
h IORef (Map (MetricName, Dimensions) Sampler)
smplrs IORef (Map (MetricName, Dimensions) Counter)
ctrs Connection
p
  where
    indefinitely' :: IO () -> IO ()
indefinitely' = HostName -> Int -> IO () -> IO ()
indefinitely HostName
"Client" (Int -> Int
seconds Int
1)

-------------------------------------------------------------------------------
mkSampledSubmission :: MetricName
                    -> Dimensions
                    -> [Double]
                    -> IO SubmissionPacket
mkSampledSubmission :: MetricName -> Dimensions -> [Double] -> IO SubmissionPacket
mkSampledSubmission MetricName
nm Dimensions
dims [Double]
vals = do
  Double
ts <- IO Double
TM.getTime
  SubmissionPacket -> IO SubmissionPacket
forall (m :: * -> *) a. Monad m => a -> m a
return (SubmissionPacket -> IO SubmissionPacket)
-> SubmissionPacket -> IO SubmissionPacket
forall a b. (a -> b) -> a -> b
$ Double -> MetricName -> Payload -> Dimensions -> SubmissionPacket
SP Double
ts MetricName
nm ([Double] -> Payload
Samples [Double]
vals) Dimensions
dims


-------------------------------------------------------------------------------
addHostDimension :: HostName -> Dimensions -> Dimensions
addHostDimension :: HostName -> Dimensions -> Dimensions
addHostDimension HostName
host = DimensionName -> DimensionValue -> Dimensions -> Dimensions
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert DimensionName
hostDimension (Text -> DimensionValue
DimensionValue (HostName -> Text
T.pack HostName
host))


-------------------------------------------------------------------------------
mkCounterSubmission :: MetricName
                    -> Dimensions
                    -> Int
                    -> IO SubmissionPacket
mkCounterSubmission :: MetricName -> Dimensions -> Int -> IO SubmissionPacket
mkCounterSubmission MetricName
m Dimensions
dims Int
i = do
    Double
ts <- IO Double
TM.getTime
    SubmissionPacket -> IO SubmissionPacket
forall (m :: * -> *) a. Monad m => a -> m a
return (SubmissionPacket -> IO SubmissionPacket)
-> SubmissionPacket -> IO SubmissionPacket
forall a b. (a -> b) -> a -> b
$ Double -> MetricName -> Payload -> Dimensions -> SubmissionPacket
SP Double
ts MetricName
m (Int -> Payload
Counter Int
i) Dimensions
dims


-- | Flush all samplers in Instrument
submitSamplers
  :: IORef Samplers
  -> Connection
  -> InstrumentConfig
  -> IO ()
submitSamplers :: IORef (Map (MetricName, Dimensions) Sampler)
-> Connection -> InstrumentConfig -> IO ()
submitSamplers IORef (Map (MetricName, Dimensions) Sampler)
smplrs Connection
rds InstrumentConfig
cfg = do
  [((MetricName, Dimensions), Sampler)]
ss <- IORef (Map (MetricName, Dimensions) Sampler)
-> IO [((MetricName, Dimensions), Sampler)]
getSamplers IORef (Map (MetricName, Dimensions) Sampler)
smplrs
  (((MetricName, Dimensions), Sampler) -> IO ())
-> [((MetricName, Dimensions), Sampler)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Sampler) -> IO ()
flushSampler Connection
rds InstrumentConfig
cfg) [((MetricName, Dimensions), Sampler)]
ss


-- | Flush all samplers in Instrument
submitCounters
  :: IORef Counters
  -> Connection
  -> InstrumentConfig
  -> IO ()
submitCounters :: IORef (Map (MetricName, Dimensions) Counter)
-> Connection -> InstrumentConfig -> IO ()
submitCounters IORef (Map (MetricName, Dimensions) Counter)
cs Connection
r InstrumentConfig
cfg = do
    [((MetricName, Dimensions), Counter)]
ss <- Map (MetricName, Dimensions) Counter
-> [((MetricName, Dimensions), Counter)]
forall k a. Map k a -> [(k, a)]
M.toList (Map (MetricName, Dimensions) Counter
 -> [((MetricName, Dimensions), Counter)])
-> IO (Map (MetricName, Dimensions) Counter)
-> IO [((MetricName, Dimensions), Counter)]
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
`liftM` IORef (Map (MetricName, Dimensions) Counter)
-> IO (Map (MetricName, Dimensions) Counter)
forall a. IORef a -> IO a
readIORef IORef (Map (MetricName, Dimensions) Counter)
cs
    (((MetricName, Dimensions), Counter) -> IO ())
-> [((MetricName, Dimensions), Counter)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Counter) -> IO ()
flushCounter Connection
r InstrumentConfig
cfg) [((MetricName, Dimensions), Counter)]
ss


-------------------------------------------------------------------------------
submitPacket :: (SC.SafeCopy a) => R.Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket :: Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket Connection
r MetricName
m Maybe Integer
mbound a
sp = IO (TxResult Integer) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (TxResult Integer) -> IO ()) -> IO (TxResult Integer) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Redis (TxResult Integer) -> IO (TxResult Integer)
forall a. Connection -> Redis a -> IO a
R.runRedis Connection
r (Redis (TxResult Integer) -> IO (TxResult Integer))
-> Redis (TxResult Integer) -> IO (TxResult Integer)
forall a b. (a -> b) -> a -> b
$ RedisTx (Queued Integer) -> Redis (TxResult Integer)
forall a. RedisTx (Queued a) -> Redis (TxResult a)
R.multiExec (RedisTx (Queued Integer) -> Redis (TxResult Integer))
-> RedisTx (Queued Integer) -> Redis (TxResult Integer)
forall a b. (a -> b) -> a -> b
$ do
  -- Write key with the stat contents
  Queued ()
_ <- RedisTx (Queued ())
push
  -- Remember the key we wrote to so we can retrieve it later without key scanning
  RedisTx (Queued Integer)
rememberKey
  where rk :: ByteString
rk = [ByteString] -> ByteString
B.concat [HostName -> ByteString
B.pack HostName
"_sq_", HostName -> ByteString
B.pack (MetricName -> HostName
metricName MetricName
m)]
        push :: RedisTx (Queued ())
push = case Maybe Integer
mbound of
          Just Integer
n  -> ByteString -> [ByteString] -> Integer -> RedisTx (Queued ())
lpushBoundedTxn ByteString
rk [a -> ByteString
forall a. SafeCopy a => a -> ByteString
encodeCompress a
sp] Integer
n
          Maybe Integer
Nothing -> (() () -> Queued Integer -> Queued ()
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$) (Queued Integer -> Queued ())
-> RedisTx (Queued Integer) -> RedisTx (Queued ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ByteString -> [ByteString] -> RedisTx (Queued Integer)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> [ByteString] -> m (f Integer)
R.lpush ByteString
rk [a -> ByteString
forall a. SafeCopy a => a -> ByteString
encodeCompress a
sp]
        rememberKey :: RedisTx (Queued Integer)
rememberKey = ByteString -> [ByteString] -> RedisTx (Queued Integer)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> [ByteString] -> m (f Integer)
sadd ByteString
packetsKey [ByteString
rk]


-------------------------------------------------------------------------------
-- | A key pointing to a SET of keys with _sq_ prefix, which contain
-- data packets. These are processed by worker.
packetsKey :: B.ByteString
packetsKey :: ByteString
packetsKey = ByteString
"_sqkeys"


-------------------------------------------------------------------------------
-- | Flush given counter to remote service and reset in-memory counter
-- back to 0.
flushCounter
  :: Connection
  -> InstrumentConfig
  -> ((MetricName, Dimensions), C.Counter)
  -> IO ()
flushCounter :: Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Counter) -> IO ()
flushCounter Connection
r InstrumentConfig
cfg ((MetricName
m, Dimensions
dims), Counter
c) =
    Counter -> IO Int
C.resetCounter Counter
c IO Int -> (Int -> IO SubmissionPacket) -> IO SubmissionPacket
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
    MetricName -> Dimensions -> Int -> IO SubmissionPacket
mkCounterSubmission MetricName
m Dimensions
dims IO SubmissionPacket -> (SubmissionPacket -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
    Connection
-> MetricName -> Maybe Integer -> SubmissionPacket -> IO ()
forall a.
SafeCopy a =>
Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket Connection
r MetricName
m (InstrumentConfig -> Maybe Integer
redisQueueBound InstrumentConfig
cfg)


-------------------------------------------------------------------------------
-- | Flush given sampler to remote service and flush in-memory queue
flushSampler
  :: Connection
  -> InstrumentConfig
  -> ((MetricName, Dimensions), S.Sampler)
  -> IO ()
flushSampler :: Connection
-> InstrumentConfig -> ((MetricName, Dimensions), Sampler) -> IO ()
flushSampler Connection
r InstrumentConfig
cfg ((MetricName
name, Dimensions
dims), Sampler
sampler) = do
  [Double]
vals <- Sampler -> IO [Double]
S.get Sampler
sampler
  case [Double]
vals of
    [] -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    [Double]
_ -> do
      Sampler -> IO ()
S.reset Sampler
sampler
      Connection
-> MetricName -> Maybe Integer -> SubmissionPacket -> IO ()
forall a.
SafeCopy a =>
Connection -> MetricName -> Maybe Integer -> a -> IO ()
submitPacket Connection
r MetricName
name (InstrumentConfig -> Maybe Integer
redisQueueBound InstrumentConfig
cfg) (SubmissionPacket -> IO ()) -> IO SubmissionPacket -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> [Double] -> IO SubmissionPacket
mkSampledSubmission MetricName
name Dimensions
dims [Double]
vals


-------------------------------------------------------------------------------
-- | Increment a counter by one. Same as calling 'countI' with 1.
--
-- >>> incrementI \"uploadedFiles\" instr
incrementI
  :: (MonadIO m)
  => MetricName
  -> HostDimensionPolicy
  -> Dimensions
  -> Instrument
  -> m ()
incrementI :: MetricName
-> HostDimensionPolicy -> Dimensions -> Instrument -> m ()
incrementI MetricName
m HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Instrument
i =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Counter -> IO ()
C.increment (Counter -> IO ()) -> IO Counter -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> Instrument -> IO Counter
getCounter MetricName
m Dimensions
dims Instrument
i
  where
    dims :: Dimensions
dims = case HostDimensionPolicy
hostDimPolicy of
      HostDimensionPolicy
AddHostDimension      -> HostName -> Dimensions -> Dimensions
addHostDimension (Instrument -> HostName
hostName Instrument
i) Dimensions
rawDims
      HostDimensionPolicy
DoNotAddHostDimension -> Dimensions
rawDims


-------------------------------------------------------------------------------
-- | Increment a counter by n.
--
-- >>> countI \"uploadedFiles\" 1 instr
countI
  :: MonadIO m
  => MetricName
  -> HostDimensionPolicy
  -> Dimensions
  -> Int
  -> Instrument
  -> m ()
countI :: MetricName
-> HostDimensionPolicy -> Dimensions -> Int -> Instrument -> m ()
countI MetricName
m HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Int
n Instrument
i =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> Counter -> IO ()
C.add Int
n (Counter -> IO ()) -> IO Counter -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> Instrument -> IO Counter
getCounter MetricName
m Dimensions
dims Instrument
i
  where
    dims :: Dimensions
dims = case HostDimensionPolicy
hostDimPolicy of
      HostDimensionPolicy
AddHostDimension      -> HostName -> Dimensions -> Dimensions
addHostDimension (Instrument -> HostName
hostName Instrument
i) Dimensions
rawDims
      HostDimensionPolicy
DoNotAddHostDimension -> Dimensions
rawDims


-- | Run a monadic action while measuring its runtime. Push the
-- measurement into the instrument system.
--
-- >>> timeI \"fileUploadTime\" policy dims instr $ uploadFile file
timeI
  :: (MonadIO m)
  => MetricName
  -> HostDimensionPolicy
  -> Dimensions
  -> Instrument
  -> m a
  -> m a
timeI :: MetricName
-> HostDimensionPolicy -> Dimensions -> Instrument -> m a -> m a
timeI MetricName
nm HostDimensionPolicy
hostDimPolicy Dimensions
rawDims = do
  (a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument -> m a -> m a
forall (m :: * -> *) a.
MonadIO m =>
(a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument -> m a -> m a
timeI' (m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
-> a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
forall a b. a -> b -> a
const (Maybe (MetricName, HostDimensionPolicy, Dimensions)
-> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((MetricName, HostDimensionPolicy, Dimensions)
-> Maybe (MetricName, HostDimensionPolicy, Dimensions)
forall a. a -> Maybe a
Just (MetricName
nm, HostDimensionPolicy
hostDimPolicy, Dimensions
rawDims))))

-- | like timeI but with maximum flexibility: it uses the result and
-- can use the monad to determine the metric name, host dimension
-- policy, and dimensions or even not emit a timing at all. Some use cases include:
--
-- * Emit different metrics or suppress metrics on error
-- * Fetch some dimension info from the environment
timeI'
  :: (MonadIO m)
  => (a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
  -> Instrument
  -> m a
  -> m a
timeI' :: (a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument -> m a -> m a
timeI' a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
toMetric Instrument
i m a
act = do
  (!Double
secs, !a
res) <- m a -> m (Double, a)
forall (m :: * -> *) a. MonadIO m => m a -> m (Double, a)
TM.time m a
act
  Maybe (MetricName, HostDimensionPolicy, Dimensions)
metricMay <- a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
toMetric a
res
  case Maybe (MetricName, HostDimensionPolicy, Dimensions)
metricMay of
    Just (MetricName
nm, HostDimensionPolicy
hostDimPolicy, Dimensions
rawDims) -> do
      MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
submitTime MetricName
nm HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
secs Instrument
i
    Maybe (MetricName, HostDimensionPolicy, Dimensions)
Nothing -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
res

-- | Run a monadic action while measuring its runtime. Push the measurement into
-- the instrument system. rethrows exceptions and sends a different Metric on
-- failure
--
-- >>> timeExI \"fileUploadTimeError\" \"fileUploadTime\" policy dims instr $ uploadFile file
timeExI
  :: (MonadIO m, MonadCatch m)
  => (Either SomeException a -> (MetricName, HostDimensionPolicy, Dimensions))
  -> Instrument
  -> m a
  -> m a
timeExI :: (Either SomeException a
 -> (MetricName, HostDimensionPolicy, Dimensions))
-> Instrument -> m a -> m a
timeExI Either SomeException a
-> (MetricName, HostDimensionPolicy, Dimensions)
toMetric Instrument
i m a
act = do
  Either SomeException a
resE <- (Either SomeException a
 -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument
-> m (Either SomeException a)
-> m (Either SomeException a)
forall (m :: * -> *) a.
MonadIO m =>
(a -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> Instrument -> m a -> m a
timeI' (Maybe (MetricName, HostDimensionPolicy, Dimensions)
-> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (MetricName, HostDimensionPolicy, Dimensions)
 -> m (Maybe (MetricName, HostDimensionPolicy, Dimensions)))
-> (Either SomeException a
    -> Maybe (MetricName, HostDimensionPolicy, Dimensions))
-> Either SomeException a
-> m (Maybe (MetricName, HostDimensionPolicy, Dimensions))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MetricName, HostDimensionPolicy, Dimensions)
-> Maybe (MetricName, HostDimensionPolicy, Dimensions)
forall a. a -> Maybe a
Just ((MetricName, HostDimensionPolicy, Dimensions)
 -> Maybe (MetricName, HostDimensionPolicy, Dimensions))
-> (Either SomeException a
    -> (MetricName, HostDimensionPolicy, Dimensions))
-> Either SomeException a
-> Maybe (MetricName, HostDimensionPolicy, Dimensions)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Either SomeException a
-> (MetricName, HostDimensionPolicy, Dimensions)
toMetric) Instrument
i (m a -> m (Either SomeException a)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny m a
act)
  (SomeException -> m a)
-> (a -> m a) -> Either SomeException a -> m a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> m a
forall a e. Exception e => e -> a
throw a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either SomeException a
resE

-------------------------------------------------------------------------------
timerMetricName :: MetricName -> MetricName
timerMetricName :: MetricName -> MetricName
timerMetricName name :: MetricName
name@(MetricName HostName
nameS) =
  if HostName
timerMetricNamePrefix HostName -> HostName -> Bool
forall a. Eq a => [a] -> [a] -> Bool
`isPrefixOf` HostName
nameS
     then MetricName
name
     else HostName -> MetricName
MetricName (HostName
timerMetricNamePrefix HostName -> HostName -> HostName
forall a. Semigroup a => a -> a -> a
Monoid.<> HostName
nameS)


-------------------------------------------------------------------------------
stripTimerPrefix :: MetricName -> MetricName
stripTimerPrefix :: MetricName -> MetricName
stripTimerPrefix (MetricName HostName
n) = case HostName -> HostName -> Maybe HostName
forall a. Eq a => [a] -> [a] -> Maybe [a]
stripPrefix HostName
timerMetricNamePrefix HostName
n of
  Just HostName
unprefixed -> HostName -> MetricName
MetricName HostName
unprefixed
  Maybe HostName
Nothing         -> HostName -> MetricName
MetricName HostName
n


-------------------------------------------------------------------------------
timerMetricNamePrefix :: String
timerMetricNamePrefix :: HostName
timerMetricNamePrefix = HostName
"time."


-------------------------------------------------------------------------------
-- | Sometimes dimensions are determined within a code block that
-- you're measuring. In that case, you can use 'time' to measure it
-- and when you're ready to submit, use 'submitTime'.
--
-- Also, you may be pulling time details from some external source
-- that you can't measure with 'timeI' yourself.
--
-- Note: for legacy purposes, metric name will have "time." prepended
-- to it.
submitTime
  :: (MonadIO m)
  => MetricName
  -> HostDimensionPolicy
  -> Dimensions
  -> Double
  -- ^ Time in seconds
  -> Instrument
  -> m ()
submitTime :: MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
submitTime MetricName
nameRaw HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
secs Instrument
i =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> IO ()
forall (m :: * -> *).
MonadIO m =>
MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
sampleI MetricName
nm HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
secs Instrument
i
  where
    nm :: MetricName
nm = MetricName -> MetricName
timerMetricName MetricName
nameRaw


-------------------------------------------------------------------------------
-- | Record given measurement under the given label.
--
-- Instrument will automatically capture useful stats like min, max,
-- count, avg, stdev and percentiles within a single flush interval.
--
-- Say we check our upload queue size every minute and record
-- something like:
--
-- >>> sampleI \"uploadQueue\" 27 inst
sampleI
  :: MonadIO m
  => MetricName
  -> HostDimensionPolicy
  -> Dimensions
  -> Double
  -> Instrument
  -> m ()
sampleI :: MetricName
-> HostDimensionPolicy
-> Dimensions
-> Double
-> Instrument
-> m ()
sampleI MetricName
name HostDimensionPolicy
hostDimPolicy Dimensions
rawDims Double
v Instrument
i =
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Double -> Sampler -> IO ()
S.sample Double
v (Sampler -> IO ()) -> IO Sampler -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MetricName -> Dimensions -> Instrument -> IO Sampler
getSampler MetricName
name Dimensions
dims Instrument
i
  where
    dims :: Dimensions
dims = case HostDimensionPolicy
hostDimPolicy of
      HostDimensionPolicy
AddHostDimension      -> HostName -> Dimensions -> Dimensions
addHostDimension (Instrument -> HostName
hostName Instrument
i) Dimensions
rawDims
      HostDimensionPolicy
DoNotAddHostDimension -> Dimensions
rawDims


-------------------------------------------------------------------------------
getCounter :: MetricName -> Dimensions -> Instrument -> IO C.Counter
getCounter :: MetricName -> Dimensions -> Instrument -> IO Counter
getCounter MetricName
nm Dimensions
dims Instrument
i = IO Counter
-> (MetricName, Dimensions)
-> IORef (Map (MetricName, Dimensions) Counter)
-> IO Counter
forall k b. Ord k => IO b -> k -> IORef (Map k b) -> IO b
getRef IO Counter
C.newCounter (MetricName
nm, Dimensions
dims) (Instrument -> IORef (Map (MetricName, Dimensions) Counter)
counters Instrument
i)


-- | Get or create a sampler under given name
getSampler :: MetricName -> Dimensions -> Instrument -> IO S.Sampler
getSampler :: MetricName -> Dimensions -> Instrument -> IO Sampler
getSampler MetricName
name Dimensions
dims Instrument
i = IO Sampler
-> (MetricName, Dimensions)
-> IORef (Map (MetricName, Dimensions) Sampler)
-> IO Sampler
forall k b. Ord k => IO b -> k -> IORef (Map k b) -> IO b
getRef (Int -> IO Sampler
S.new Int
1000) (MetricName
name, Dimensions
dims) (Instrument -> IORef (Map (MetricName, Dimensions) Sampler)
samplers Instrument
i)


-- | Get a list of current samplers present
getSamplers :: IORef Samplers -> IO [((MetricName, Dimensions), S.Sampler)]
getSamplers :: IORef (Map (MetricName, Dimensions) Sampler)
-> IO [((MetricName, Dimensions), Sampler)]
getSamplers IORef (Map (MetricName, Dimensions) Sampler)
ss = Map (MetricName, Dimensions) Sampler
-> [((MetricName, Dimensions), Sampler)]
forall k a. Map k a -> [(k, a)]
M.toList (Map (MetricName, Dimensions) Sampler
 -> [((MetricName, Dimensions), Sampler)])
-> IO (Map (MetricName, Dimensions) Sampler)
-> IO [((MetricName, Dimensions), Sampler)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` IORef (Map (MetricName, Dimensions) Sampler)
-> IO (Map (MetricName, Dimensions) Sampler)
forall a. IORef a -> IO a
readIORef IORef (Map (MetricName, Dimensions) Sampler)
ss


-- | Lookup a 'Ref' by name in the given map.  If no 'Ref' exists
-- under the given name, create a new one, insert it into the map and
-- return it.
getRef :: Ord k => IO b -> k -> IORef (M.Map k b) -> IO b
getRef :: IO b -> k -> IORef (Map k b) -> IO b
getRef IO b
f k
name IORef (Map k b)
mapRef = do
    b
empty <- IO b
f
    IORef (Map k b) -> (Map k b -> (Map k b, b)) -> IO b
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Map k b)
mapRef ((Map k b -> (Map k b, b)) -> IO b)
-> (Map k b -> (Map k b, b)) -> IO b
forall a b. (a -> b) -> a -> b
$ \ Map k b
m ->
        case k -> Map k b -> Maybe b
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup k
name Map k b
m of
            Maybe b
Nothing  -> let m' :: Map k b
m' = k -> b -> Map k b -> Map k b
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert k
name b
empty Map k b
m
                        in (Map k b
m', b
empty)
            Just b
ref -> (Map k b
m, b
ref)
{-# INLINABLE getRef #-}

-- | Bounded version of lpush which truncates *new* data first. This
-- effectively stops accepting data until the queue shrinks below the
-- bound. Occurs in a transaction for composibility with larger transactions.
lpushBoundedTxn :: B.ByteString -> [B.ByteString] -> Integer -> RedisTx (Queued ())
lpushBoundedTxn :: ByteString -> [ByteString] -> Integer -> RedisTx (Queued ())
lpushBoundedTxn ByteString
k [ByteString]
vs Integer
mx = do
  Queued Integer
_ <- ByteString -> [ByteString] -> RedisTx (Queued Integer)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> [ByteString] -> m (f Integer)
lpush ByteString
k [ByteString]
vs
  (Queued Status -> Queued ())
-> RedisTx (Queued Status) -> RedisTx (Queued ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (() () -> Queued Status -> Queued ()
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$) (ByteString -> Integer -> Integer -> RedisTx (Queued Status)
forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> Integer -> Integer -> m (f Status)
ltrim ByteString
k (-Integer
mx) (-Integer
1))