{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Instrument.Worker
    ( initWorkerCSV
    , initWorkerCSV'
    , initWorkerGraphite
    , initWorkerGraphite'
    , work
    , initWorker
    , AggProcess(..)
    -- * Configuring agg processes
    , AggProcessConfig(..)
    , standardQuantiles
    , noQuantiles
    , quantileMap
    , defAggProcessConfig
    -- * Exported for testing
    , expandDims
    ) where

-------------------------------------------------------------------------------
import           Control.Error
import           Control.Exception      (bracketOnError)
import           Control.Monad
import           Control.Monad.IO.Class
import qualified Data.ByteString.Char8  as B
import           Data.Conduit           (runConduit, (.|))
import qualified Data.Conduit.List      as CL
import           Data.CSV.Conduit
import           Data.Default
import qualified Data.Map               as M
import qualified Data.SafeCopy          as SC
import           Data.Semigroup         as Semigroup
import           Data.Serialize
import qualified Data.Set               as Set
import qualified Data.Text              as T
import qualified Data.Text.IO           as T
import qualified Data.Vector.Unboxed    as V
import           Database.Redis         as R hiding (decode)
import           Network.Socket         as N
import qualified Statistics.Quantile    as Q
import           Statistics.Sample
import           System.IO
import           System.Posix
-------------------------------------------------------------------------------
import           Instrument.Client      (packetsKey, stripTimerPrefix,
                                         timerMetricName)
import qualified Instrument.Measurement as TM
import           Instrument.Types
import           Instrument.Utils
-------------------------------------------------------------------------------



-------------------------------------------------------------------------------
-- | A CSV backend that stores aggregation results in a CSV file.
--
-- Builds the CSV 'AggProcess' with 'initWorkerCSV'' and runs it through
-- 'initWorker' under the worker name @"CSV Worker"@.
initWorkerCSV
  :: ConnectInfo
  -- ^ Connection info handed to 'initWorker'
  -> FilePath
  -- ^ Target file name
  -> Int
  -- ^ Aggregation period / flush interval in seconds
  -> AggProcessConfig
  -- ^ Configuration forwarded to 'initWorkerCSV''
  -> IO ()
initWorkerCSV conn fp n cfg = do
  aggProcess <- initWorkerCSV' fp cfg
  initWorker "CSV Worker" conn n aggProcess


-------------------------------------------------------------------------------
-- | Create an 'AggProcess' that dumps to CSV. Use this to compose with
-- other 'AggProcess'es.
--
-- The target file is opened in 'AppendMode' with line buffering. If the
-- file did not exist before this call, a header row built from the keys
-- of @aggToCSV def@ is written first. The handle is captured by the
-- returned 'AggProcess' and is not closed here.
initWorkerCSV'
  :: FilePath
  -- ^ Target file name
  -> AggProcessConfig
  -- ^ Configuration carried by the resulting 'AggProcess'
  -> IO AggProcess
initWorkerCSV' fp cfg = do
  -- The existence check has to run before 'openFile', because opening in
  -- 'AppendMode' creates a missing file.
  !alreadyThere <- fileExist fp
  !h <- openFile fp AppendMode
  hSetBuffering h LineBuffering
  unless alreadyThere $
    T.hPutStrLn h (rowToStr defCSVSettings (M.keys (aggToCSV def)))
  return (putAggregateCSV h cfg)


-------------------------------------------------------------------------------
-- | Initialize a Graphite backend.
--
-- Builds the Graphite 'AggProcess' with 'initWorkerGraphite'' and runs it
-- through 'initWorker' under the worker name @"Graphite Worker"@.
initWorkerGraphite
    :: ConnectInfo
    -- ^ Redis connection
    -> Int
    -- ^ Aggregation period / flush interval in seconds
    -> HostName
    -- ^ Graphite host
    -> Int
    -- ^ Graphite port
    -> AggProcessConfig
    -- ^ Configuration forwarded to 'initWorkerGraphite''
    -> IO ()
initWorkerGraphite conn n server port cfg = do
    aggProcess <- initWorkerGraphite' server port cfg
    initWorker "Graphite Worker" conn n aggProcess


-------------------------------------------------------------------------------
-- | Create an 'AggProcess' that dumps to graphite. Use this to compose
-- with other 'AggProcess'es.
--
-- Resolves the host, opens a stream socket to the first address returned,
-- and wraps the socket in a line-buffered 'Handle' that the resulting
-- 'AggProcess' captures. The handle is not closed here.
initWorkerGraphite'
    :: HostName
    -- ^ Graphite host
    -> Int
    -- ^ Graphite port. Converted with 'fromIntegral'; NOTE(review): values
    -- outside the 'N.PortNumber' range are presumably wrapped silently —
    -- confirm callers only pass valid ports.
    -> AggProcessConfig
    -- ^ Configuration carried by the resulting 'AggProcess'
    -> IO AggProcess
initWorkerGraphite' server port cfg = do
  addr <- resolve server (fromIntegral port)
  sock <- connectTo addr
  h <- N.socketToHandle sock ReadWriteMode
  hSetBuffering h LineBuffering
  return (putAggregateGraphite h cfg)
  where
    portNumberToServiceName :: N.PortNumber -> N.ServiceName
    portNumberToServiceName = show

    -- Look up the first stream-socket address for the host/port pair.
    resolve :: N.HostName -> N.PortNumber -> IO N.AddrInfo
    resolve host portNumber = do
      let hints = N.defaultHints { N.addrSocketType = N.Stream }
          service = portNumberToServiceName portNumber
      addrs <- N.getAddrInfo (Just hints) (Just host) (Just service)
      case addrs of
        firstAddr : _ -> return firstAddr
        -- Explicit failure instead of an opaque pattern-match error.
        [] -> ioError . userError $
          "initWorkerGraphite': no address found for " ++ host ++ ":" ++ service

    -- Create the socket and connect it; if 'N.connect' throws, the socket
    -- is closed so the file descriptor is not leaked.
    connectTo :: N.AddrInfo -> IO N.Socket
    connectTo addr =
      bracketOnError
        (N.socket (N.addrFamily addr) (N.addrSocketType addr) (N.addrProtocol addr))
        N.close
        (\sock -> N.connect sock (N.addrAddress addr) >> return sock)


-------------------------------------------------------------------------------
-- | Generic utility for making worker backends. Will retry
-- indefinitely with exponential backoff.
--
-- Creates a connection pool with 'createInstrumentPool' and then repeats
-- 'work' through 'indefinitely', passing the worker name and the flush
-- interval converted with 'seconds'.
initWorker
  :: String
  -- ^ Worker name handed to 'indefinitely'
  -> ConnectInfo
  -- ^ Connection info used to create the pool
  -> Int
  -- ^ Aggregation period / flush interval in seconds
  -> AggProcess
  -- ^ What to do with aggregation results
  -> IO ()
initWorker wname conn n f = do
    pool <- createInstrumentPool conn
    indefinitely wname (seconds n) (work pool n f)



-------------------------------------------------------------------------------
-- | Extract statistics out of the given sample for this flush period.
--
-- Every requested quantile @q@ is stored under key @q@ and computed as
-- @Q.weightedAvg q 100@ over the sample, i.e. as a percentile.
--
-- NOTE(review): 'V.maximum' and 'V.minimum' are partial on an empty
-- vector; callers presumably only pass non-empty samples — confirm.
mkStats :: Set.Set Quantile -> Sample -> Stats
mkStats qs s =
  Stats
    { smean = mean s
    , ssum = V.sum s
    , scount = V.length s
    , smax = V.maximum s
    , smin = V.minimum s
    , srange = range s
    , sstdev = stdDev s
    , sskewness = skewness s
    , skurtosis = kurtosis s
    , squantiles = quantiles
    }
  where
    -- Denominator for 'Q.weightedAvg': quantiles are expressed out of 100.
    buckets :: Int
    buckets = 100

    quantiles = M.fromList (map (mkQ . quantile) (Set.toList qs))

    mkQ i = (i, Q.weightedAvg i buckets s)


-------------------------------------------------------------------------------
-- | Go over all pending stats buffers in redis.
--
-- Reads the cardinality of the 'packetsKey' set once as an estimate of
-- how many keys to handle, then pops at most that many keys and feeds each
-- one to 'processSampler'. An error reply from @scard@ is treated as an
-- estimate of zero, so nothing is processed in that pass.
work :: R.Connection -> Int -> AggProcess -> IO ()
work r n f = runRedis r $ do
    dbg "entered work block"
    estimate <- either (const 0) id <$> scard packetsKey
    runConduit $
      CL.unfoldM nextKey estimate
        .| CL.mapM_ (processSampler n f)
  where
    -- Pop the next key while the budget lasts. Stops early on an error
    -- reply or when the set turns out to be empty.
    nextKey :: Integer -> Redis (Maybe (B.ByteString, Integer))
    nextKey estRemaining
      | estRemaining > 0 = do
          popped <- spop packetsKey
          return $ case popped of
            Right (Just k) -> Just (k, estRemaining - 1)
            _              -> Nothing
      | otherwise = return Nothing


-------------------------------------------------------------------------------
-- | Drain one metric buffer, aggregate its packets per dimension set
-- (expanded with 'expandDims') and hand every resulting 'Aggregated'
-- value to the agg process.
--
-- The timestamp is the current 'TM.getTime' value rounded to an integer
-- and then truncated down to a multiple of the flush interval.
-- NOTE(review): a flush interval of 0 would divide by zero here —
-- confirm callers never pass it.
processSampler
    :: Int
    -- ^ Flush interval - determines resolution
    -> AggProcess
    -- ^ What to do with aggregation results
    -> B.ByteString
    -- ^ Redis buffer for this metric
    -> Redis ()
processSampler n (AggProcess cfg f) k = do
  packets <- popLAll k
  case packets of
    [] -> return ()
    firstPacket : _ -> do
      let -- The metric name is taken from the first packet of the buffer.
          nm = spName firstPacket
          -- with and without timer prefix
          qs = quantilesFn (stripTimerPrefix nm) <> quantilesFn (timerMetricName nm)
          byDims :: M.Map Dimensions [SubmissionPacket]
          byDims = collect packets spDimensions id
          -- The first packet of a group decides how the whole group is
          -- aggregated.
          mkAgg [] =
            error "Instrument.Worker.processSampler: impossible, empty packet group"
          mkAgg xs@(x : _) =
            case spPayload x of
              Samples _ ->
                AggStats (mkStats qs (V.fromList (concatMap (unSamples . spPayload) xs)))
              Counter _ ->
                AggCount (sum (map (unCounter . spPayload) xs))
      t <- (fromIntegral . (* n) . (`div` n) . round) <$> liftIO TM.getTime
      let mkDimsAgg (dims, ps) = Aggregated t nm (mkAgg ps) dims
      mapM_ (f . mkDimsAgg) (M.toList (expandDims byDims))
  where
    quantilesFn = metricQuantiles cfg


-------------------------------------------------------------------------------
-- | Take a map of packets by dimensions and *add* aggregations of the
-- existing dims that isolate each distinct dimension/dimension-value
-- pair, plus one more entry with an empty dimension set that aggregates
-- the whole thing.
--
-- Worked example:
--
-- Given:
-- { {d1=>d1v1,d2=>d2v1} => p1
-- , {d1=>d1v1,d2=>d2v2} => p2
-- }
-- Produces:
-- { {d1=>d1v1,d2=>d2v1} => p1
-- , {d1=>d1v1,d2=>d2v2} => p2
-- , {d1=>d1v1} => p1 + p2
-- , {d2=>d2v1} => p1
-- , {d2=>d2v2} => p2
-- , {} => p1 + p2
-- }
expandDims
  :: forall packets. (Monoid packets, Eq packets)
  => M.Map Dimensions packets
  -> M.Map Dimensions packets
expandDims m =
  -- left-biased so technically if we have anything occupying the aggregated spots, leave them be
  m <> additions <> fullAggregation
  where
    -- One entry per distinct (name, value) pair, combining the packets of
    -- every input entry that contains the pair. @flip mappend@ keeps the
    -- packets in the key order of the input map. A single pass replaces
    -- the previous per-pair filter over the flattened list.
    additions :: M.Map Dimensions packets
    additions =
      M.fromListWith
        (flip mappend)
        [ (M.singleton dn dv, packets)
        | (dimensionsMap, packets) <- M.toList m
        , (dn, dv) <- M.toList dimensionsMap
        ]

    -- All packets across any combination of dimensions
    fullAggregation :: M.Map Dimensions packets
    fullAggregation = M.singleton mempty (mconcat (M.elems m))


-- | A function that does something with the aggregation results. Can
-- implement multiple backends simply using this. Note that 'Semigroup'
-- and 'Monoid' instances are provided for defaulting and combining agg
-- processes.
data AggProcess = AggProcess
  { apConfig :: AggProcessConfig
  -- ^ Configuration consulted while aggregating for this process
  , apProc   :: Aggregated -> Redis ()
  -- ^ Action run on every aggregation result
  }


-- | Combines the configurations with '<>' and runs both processing
-- actions on every aggregation result, left one first.
instance Semigroup.Semigroup AggProcess where
  AggProcess cfg1 prc1 <> AggProcess cfg2 prc2 =
    AggProcess (cfg1 <> cfg2) runBoth
    where
      runBoth agg = prc1 agg >> prc2 agg


-- | The empty process carries the empty configuration and ignores every
-- aggregation result.
instance Monoid AggProcess where
  mempty = AggProcess mempty (const (pure ()))
  mappend = (<>)


-------------------------------------------------------------------------------
-- | General configuration for agg processes. Defaulted with 'def',
-- 'defAggProcessConfig', and 'mempty'. Configurations can be combined
-- with ('<>') from 'Monoid' or 'Semigroup'.
data AggProcessConfig = AggProcessConfig
  { metricQuantiles :: MetricName -> Set.Set Quantile
  -- ^ What quantiles should we calculate for any given metric, if
  -- any? We offer some common patterns for this in 'quantileMap',
  -- 'standardQuantiles', and 'noQuantiles'.
  }


-- | Combines the quantile functions pointwise: for every metric the
-- resulting set is the union of both configurations' sets.
instance Semigroup AggProcessConfig where
  AggProcessConfig f1 <> AggProcessConfig f2 = AggProcessConfig (f1 <> f2)


-- | The empty configuration requests no quantiles for any metric.
instance Monoid AggProcessConfig where
  mempty = AggProcessConfig mempty
  mappend = (<>)


-- | Default configuration: every metric gets 'standardQuantiles'.
defAggProcessConfig :: AggProcessConfig
defAggProcessConfig = AggProcessConfig { metricQuantiles = standardQuantiles }


-- | 'def' is 'defAggProcessConfig'.
instance Default AggProcessConfig where
  def = defAggProcessConfig


-- | Regardless of metric, produce no quantiles.
noQuantiles :: MetricName -> Set.Set Quantile
noQuantiles _ = Set.empty


-- | This is usually a good, comprehensive default. Produces quantiles
-- 10,20,30,40,50,60,70,80,90,99 for every metric (the metric name is
-- ignored). *Note:* for some backends like cloudwatch, each quantile
-- produces an additional metric, so you should probably consider
-- using something more limited than this.
standardQuantiles :: MetricName -> Set.Set Quantile
standardQuantiles _ =
  Set.fromList (map Q [10, 20, 30, 40, 50, 60, 70, 80, 90, 99])


-- | If you have a fixed set of metric names, this is often a
-- convenient way to express quantiles-per-metric. The metric is looked
-- up in the map; metrics that are not present get the fallback set.
quantileMap
  :: M.Map MetricName (Set.Set Quantile)
  -- ^ Quantiles to compute for each explicitly listed metric
  -> Set.Set Quantile
  -- ^ What to return on miss
  -> (MetricName -> Set.Set Quantile)
quantileMap m qdef mn = M.findWithDefault qdef mn m


-------------------------------------------------------------------------------
-- | Store aggregation results in a CSV file. Builds an 'AggProcess'
-- that, for every aggregate, renders it with 'aggToCSV' using
-- 'defCSVSettings' and writes the resulting row as one line to the
-- given handle.
putAggregateCSV :: Handle -> AggProcessConfig -> AggProcess
putAggregateCSV h cfg = AggProcess cfg writeRow
  where
    writeRow :: Aggregated -> Redis ()
    writeRow agg =
      liftIO (T.hPutStrLn h (rowToStr defCSVSettings (aggToCSV agg)))


-- | Name segment used to tell payload kinds apart in emitted metric
-- paths: @"samples"@ for 'AggStats', @"counts"@ for 'AggCount'.
typePrefix :: AggPayload -> T.Text
typePrefix AggStats{} = "samples"
typePrefix AggCount{} = "counts"


-------------------------------------------------------------------------------
-- | Push data into a Graphite database using the plaintext protocol.
--
-- For every stat field produced by 'mkStatsFields' and every
-- dimension name/value pair of the aggregate, one line of the form
--
-- > inst.<samples|counts>.<metric>.<field>.<dimName>.<dimVal> <value> <timestamp>
--
-- is written to the handle. Lines are grouped by field, and within a
-- field follow the order of the dimension map. Because lines are only
-- generated per dimension pair, an aggregate whose dimension map is
-- empty writes nothing.
putAggregateGraphite :: Handle -> AggProcessConfig -> AggProcess
putAggregateGraphite h cfg = AggProcess cfg emit
  where
    emit :: Aggregated -> Redis ()
    emit agg = liftIO (mapM_ (T.hPutStrLn h) graphiteLines)
      where
        (fields, ts) = mkStatsFields agg

        -- Part of the path shared by every line of this aggregate.
        prefix = T.concat
          [ "inst."
          , typePrefix (aggPayload agg), "."
          , T.pack (metricName (aggName agg)), "."
          ]

        -- Expand dimensions into one datum per dimension pair as the group
        graphiteLines =
          [ T.concat
              [ prefix
              , field, "."
              , dimName, "."
              , dimVal, " "
              , val, " "
              , ts
              ]
          | (field, val) <- fields
          , (DimensionName dimName, DimensionValue dimVal)
              <- M.toList (aggDimensions agg)
          ]


-------------------------------------------------------------------------------
-- | Pop all keys in a redis List. Items are fetched with 'popLMany' in
-- batches of 100 and concatenated in pop order; popping stops at the
-- first batch that comes back empty.
popLAll :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Redis [a]
popLAll k = do
  batch <- popLMany k 100
  if null batch
    then return []
    else (batch ++) <$> popLAll k


-------------------------------------------------------------------------------
-- | Pop up to N items from a queue. It will pop from left and preserve order.
--
-- Exactly @n@ @LPOP@ commands are issued. From the replies:
--
-- * error replies and pops that found the list empty are skipped;
--
-- * payloads that fail 'decodeCompress' are dropped.
--
-- A single error reply does not discard the other replies: anything
-- that was successfully popped has already been removed from Redis, so
-- it is returned rather than thrown away.
popLMany :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Int -> Redis [a]
popLMany k n = do
    replies <- replicateM n (R.lpop k)
    -- Keep every successfully popped payload, in pop order.
    let payloads = [raw | Right (Just raw) <- replies]
    return (mapMaybe (hush . decodeCompress) payloads)


-------------------------------------------------------------------------------
-- | Debug logging placeholder: ignores its message and does nothing.
-- Need to pull in a debugging library here.
dbg :: (Monad m) => String -> m ()
dbg _ = return ()


-- ------------------------------------------------------------------------------
-- dbg :: (MonadIO m) => String -> m ()
-- dbg s = debug $ "Instrument.Worker: " ++ s


-------------------------------------------------------------------------------
-- | Expand count aggregation to have the full columns.
--
-- The row is the left-biased union of three maps, in this order of
-- precedence:
--
-- 1. @metric@, @timestamp@ and the aggregate's own fields from
--    'mkStatsFields';
--
-- 2. the fields of a default ('def') stats payload, so every stats
--    column is present even when the aggregate does not supply it;
--
-- 3. one column per dimension, keyed by dimension name.
aggToCSV :: Aggregated -> M.Map T.Text T.Text
aggToCSV agg@Aggregated{ aggName = name, aggDimensions = dims } =
    M.unions [ownFields, defFields, dimFields]
  where
    (fields, ts) = mkStatsFields agg

    ownFields =
      M.fromList
        ( ("metric", T.pack (metricName name))
        : ("timestamp", ts)
        : fields
        )

    -- Same aggregate with its payload swapped for default stats; only
    -- the field list is used, to supply the full set of columns.
    defFields =
      M.fromList (fst (mkStatsFields (agg { aggPayload = AggStats def })))

    dimFields =
      M.fromList
        [ (k, v)
        | (DimensionName k, DimensionValue v) <- M.toList dims
        ]


-------------------------------------------------------------------------------
-- | Get agg results into a form ready to be output.
--
-- Returns the payload rendered as an ordered list of
-- @(field name, value)@ pairs, together with the aggregate's timestamp
-- rendered with 'formatInt'.
--
-- * 'AggStats' yields @mean@, @count@, @max@, @min@, @srange@,
--   @stdDev@, @sum@, @skewness@ and @kurtosis@, followed by one
--   @percentile_<q>@ entry per stored quantile in ascending key order.
--
-- * 'AggCount' yields a single @count@ entry.
--
-- Floating point values are rendered with @formatDecimal 6 False@,
-- counts with 'showT'.
mkStatsFields :: Aggregated -> ([(T.Text, T.Text)], T.Text)
mkStatsFields Aggregated{ aggPayload = payload, aggTS = stamp } =
    (payloadFields payload, formatInt stamp)
  where
    decimal :: Double -> T.Text
    decimal = formatDecimal 6 False

    payloadFields :: AggPayload -> [(T.Text, T.Text)]
    payloadFields (AggCount i) = [("count", showT i)]
    payloadFields (AggStats st@Stats{}) =
      [ ("mean", decimal (smean st))
      , ("count", showT (scount st))
      , ("max", decimal (smax st))
      , ("min", decimal (smin st))
      , ("srange", decimal (srange st))
      , ("stdDev", decimal (sstdev st))
      , ("sum", decimal (ssum st))
      , ("skewness", decimal (sskewness st))
      , ("kurtosis", decimal (skurtosis st))
      ]
      ++ [ (T.concat ["percentile_", showT q], decimal v)
         | (q, v) <- M.toList (squantiles st)
         ]