{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Instrument.Worker
( initWorkerCSV
, initWorkerCSV'
, initWorkerGraphite
, initWorkerGraphite'
, work
, initWorker
, AggProcess(..)
, AggProcessConfig(..)
, standardQuantiles
, noQuantiles
, quantileMap
, defAggProcessConfig
, expandDims
) where
import Control.Error
import Control.Exception (bracketOnError)
import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as B
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.CSV.Conduit
import Data.Default
import qualified Data.Map as M
import qualified Data.SafeCopy as SC
import Data.Semigroup as Semigroup
import Data.Serialize
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Vector.Unboxed as V
import Database.Redis as R hiding (decode)
import Network.Socket as N
import qualified Statistics.Quantile as Q
import Statistics.Sample
import System.IO
import System.Posix

import Instrument.Client (packetsKey, stripTimerPrefix,
                          timerMetricName)
import qualified Instrument.Measurement as TM
import Instrument.Types
import Instrument.Utils
-- | Run a worker whose aggregation results are appended to a CSV file.
--
-- The CSV 'AggProcess' is built with 'initWorkerCSV'' and then handed to
-- 'initWorker' under the worker name @"CSV Worker"@.
initWorkerCSV
  :: ConnectInfo
  -- ^ Redis connection settings, forwarded to 'initWorker'
  -> FilePath
  -- ^ Target CSV file
  -> Int
  -- ^ Interval, forwarded to 'initWorker'
  -> AggProcessConfig
  -- ^ Aggregation configuration carried by the resulting 'AggProcess'
  -> IO ()
initWorkerCSV conn fp n cfg = do
  csvProcess <- initWorkerCSV' fp cfg
  initWorker "CSV Worker" conn n csvProcess
-- | Build an 'AggProcess' that writes each aggregate as a CSV row.
--
-- The target file is opened in append mode with line buffering. When the
-- file did not exist before it was opened, a header row made of the keys of
-- @aggToCSV def@ is written first. The handle is kept open and captured by
-- the returned 'AggProcess'.
initWorkerCSV'
  :: FilePath
  -- ^ Target CSV file
  -> AggProcessConfig
  -- ^ Aggregation configuration carried by the resulting 'AggProcess'
  -> IO AggProcess
initWorkerCSV' fp cfg = do
  -- The existence check must happen before 'openFile', which creates the file.
  !alreadyThere <- fileExist fp
  !h <- openFile fp AppendMode
  hSetBuffering h LineBuffering
  unless alreadyThere (T.hPutStrLn h headerRow)
  return (putAggregateCSV h cfg)
  where
    headerRow :: T.Text
    headerRow = rowToStr defCSVSettings (M.keys (aggToCSV def))
-- | Run a worker whose aggregation results are sent to a Graphite server.
--
-- The Graphite 'AggProcess' is built with 'initWorkerGraphite'' and then
-- handed to 'initWorker' under the worker name @"Graphite Worker"@.
initWorkerGraphite
    :: ConnectInfo
    -- ^ Redis connection settings, forwarded to 'initWorker'
    -> Int
    -- ^ Interval, forwarded to 'initWorker'
    -> HostName
    -- ^ Graphite host
    -> Int
    -- ^ Graphite port
    -> AggProcessConfig
    -- ^ Aggregation configuration carried by the resulting 'AggProcess'
    -> IO ()
initWorkerGraphite conn n server port cfg = do
    graphiteProcess <- initWorkerGraphite' server port cfg
    initWorker "Graphite Worker" conn n graphiteProcess
-- | Build an 'AggProcess' that writes aggregates to a Graphite server.
--
-- Resolves the host/port to a stream address, connects a socket to it,
-- converts the socket to a line-buffered 'Handle' and captures that handle
-- in the returned 'AggProcess'.
--
-- Any failure while resolving or connecting is raised as an 'IOError'; a
-- socket that was created but could not be connected is closed before the
-- exception propagates.
initWorkerGraphite'
    :: HostName
    -- ^ Graphite host
    -> Int
    -- ^ Graphite port (converted to a 'N.PortNumber' with 'fromIntegral')
    -> AggProcessConfig
    -- ^ Aggregation configuration carried by the resulting 'AggProcess'
    -> IO AggProcess
initWorkerGraphite' server port cfg = do
    addr <- resolve server (fromIntegral port)
    sock <- open addr
    h <- N.socketToHandle sock ReadWriteMode
    hSetBuffering h LineBuffering
    return $ putAggregateGraphite h cfg
  where
    portNumberToServiceName :: N.PortNumber -> N.ServiceName
    portNumberToServiceName = show

    -- Look up the first stream-socket address for the host and port.
    resolve :: HostName -> N.PortNumber -> IO N.AddrInfo
    resolve host portNumber = do
      let hints = N.defaultHints { N.addrSocketType = N.Stream }
          service = portNumberToServiceName portNumber
      addrs <- N.getAddrInfo (Just hints) (Just host) (Just service)
      case addrs of
        (addr : _) -> return addr
        -- Explicit error instead of an opaque pattern-match failure.
        [] -> ioError . userError $
          "initWorkerGraphite': no address found for " ++ host ++ ":" ++ service

    -- Create and connect a socket; close it again if 'N.connect' throws so
    -- the file descriptor is not leaked.
    open :: N.AddrInfo -> IO N.Socket
    open addr =
      bracketOnError
        (N.socket (N.addrFamily addr) (N.addrSocketType addr) (N.addrProtocol addr))
        N.close
        (\sock -> N.connect sock (N.addrAddress addr) >> return sock)
-- | Generic worker entry point.
--
-- Creates a Redis connection with 'createInstrumentPool' and then runs
-- 'work' through 'indefinitely', passing the worker name and @seconds n@.
-- NOTE(review): 'indefinitely' is defined elsewhere; presumably it repeats
-- the action forever with that delay -- confirm in "Instrument.Utils".
initWorker :: String -> ConnectInfo -> Int -> AggProcess -> IO ()
initWorker wname conn n f = do
    pool <- createInstrumentPool conn
    indefinitely wname (seconds n) (work pool n f)
-- | Summarise a sample into a 'Stats' record.
--
-- Besides the usual moments, the requested quantiles are computed with
-- 'Q.weightedAvg' on a 100-quantile scale and stored keyed by their
-- integer value.
--
-- NOTE: 'V.maximum' and 'V.minimum' are partial, so the 'smax' and 'smin'
-- fields are only defined for a non-empty sample.
mkStats :: Set.Set Quantile -> Sample -> Stats
mkStats qs s =
  Stats
    { smean = mean s
    , ssum = V.sum s
    , scount = V.length s
    , smax = V.maximum s
    , smin = V.minimum s
    , srange = range s
    , sstdev = stdDev s
    , sskewness = skewness s
    , skurtosis = kurtosis s
    , squantiles = M.fromList [(q, quantileValue q) | q <- requested]
    }
  where
    -- Integer quantile positions, in ascending order.
    requested = map quantile (Set.toList qs)
    -- The q-th of 100 quantiles of the sample.
    quantileValue q = Q.weightedAvg q 100 s
-- | One aggregation pass over the keys currently queued in Redis.
--
-- The size of the 'packetsKey' set is read once with 'scard' (a Redis error
-- is treated as zero) and used as an upper bound on how many keys are
-- popped with 'spop' in this pass. Each popped key is handed to
-- 'processSampler'. The pass also ends early as soon as 'spop' returns
-- nothing or an error.
work :: R.Connection -> Int -> AggProcess -> IO ()
work r n f = runRedis r $ do
    dbg "entered work block"
    pending <- either (const 0) id <$> scard packetsKey
    runConduit $ CL.unfoldM popKey pending .| CL.mapM_ (processSampler n f)
  where
    -- Pop one key while the remaining budget is positive.
    popKey budget
      | budget <= 0 = return Nothing
      | otherwise = do
          reply <- spop packetsKey
          return $ case reply of
            Right (Just key) -> Just (key, budget - 1)
            _ -> Nothing
-- | Drain all packets stored under one key, aggregate them and feed every
-- resulting 'Aggregated' value to the given 'AggProcess'.
--
-- * The metric name is taken from the first packet.
-- * Quantiles are the union of those configured for the name with the timer
--   prefix stripped ('stripTimerPrefix') and for the timer form of the name
--   ('timerMetricName').
-- * Packets are grouped by their dimensions and the groups are widened with
--   'expandDims'; one 'Aggregated' is produced per resulting group.
-- * The timestamp is the current time from 'TM.getTime', rounded to an
--   integer and then truncated down to a multiple of @n@.
--
-- NOTE: @n@ must be non-zero; @`div` n@ throws on a zero divisor.
processSampler
    :: Int
    -- ^ Aggregation interval used to bucket the timestamp
    -> AggProcess
    -- ^ Configuration and sink for the aggregates
    -> B.ByteString
    -- ^ Redis key holding the queued packets
    -> Redis ()
processSampler n (AggProcess cfg f) k = do
  packets <- popLAll k
  case packets of
    [] -> return ()
    -- Pattern match instead of the partial 'head'.
    (firstPacket : _) -> do
      let nm = spName firstPacket
          -- with and without the timer prefix
          qs = quantilesFn (stripTimerPrefix nm) <> quantilesFn (timerMetricName nm)
          byDims :: M.Map Dimensions [SubmissionPacket]
          byDims = collect packets spDimensions id
      t <- (fromIntegral . (* n) . (`div` n) . round) `liftM` liftIO TM.getTime
      let aggs =
            [ Aggregated t nm (mkAgg qs p ps) dims
              -- An empty group has no representative packet and is skipped.
            | (dims, ps@(p : _)) <- M.toList (expandDims byDims)
            ]
      mapM_ f aggs
  where
    quantilesFn = metricQuantiles cfg

    -- Aggregate one group; the payload kind of the representative packet
    -- decides whether the group is treated as samples or as counters.
    mkAgg :: Set.Set Quantile -> SubmissionPacket -> [SubmissionPacket] -> AggPayload
    mkAgg qs representative ps =
      case spPayload representative of
        Samples _ ->
          AggStats . mkStats qs . V.fromList $ concatMap (unSamples . spPayload) ps
        Counter _ ->
          AggCount . sum $ map (unCounter . spPayload) ps
-- | Widen a per-dimension-set grouping with coarser groupings.
--
-- The result contains, in order of precedence:
--
-- 1. every entry of the input map unchanged;
-- 2. for each distinct @(name, value)@ pair occurring in any key, an entry
--    keyed by that single pair holding the combination of all values whose
--    key contains the pair (combined in ascending key order);
-- 3. an entry under the empty dimension map holding the combination of all
--    values.
--
-- Map union is left-biased, so an input entry that already occupies one of
-- the derived keys is kept as is.
--
-- The 'Eq' constraint is no longer used by the implementation and is kept
-- only so the signature stays compatible with existing callers.
expandDims
  :: forall packets. (Monoid packets, Eq packets)
  => M.Map Dimensions packets
  -> M.Map Dimensions packets
expandDims m = m <> additions <> fullAggregation
  where
    -- One pass over the flattened (pair, value) list instead of re-scanning
    -- it for every distinct pair. 'M.fromListWith' applies its function as
    -- @f new old@, so the arguments are swapped to keep list order.
    additions :: M.Map Dimensions packets
    additions =
      M.fromListWith
        (\new old -> old `mappend` new)
        [ (M.singleton dn dv, packets)
        | (dimensionsMap, packets) <- M.toList m
        , (dn, dv) <- M.toList dimensionsMap
        ]

    -- All values across any combination of dimensions.
    fullAggregation :: M.Map Dimensions packets
    fullAggregation = M.singleton mempty (mconcat (M.elems m))
-- | An aggregation sink: a configuration plus the action that consumes
-- each 'Aggregated' value.
data AggProcess = AggProcess
  { apConfig :: AggProcessConfig
    -- ^ Configuration used while aggregating (e.g. which quantiles to compute)
  , apProc   :: Aggregated -> Redis ()
    -- ^ Action run for every aggregate produced
  }
-- | Combining two processes merges their configurations and runs both
-- actions on every aggregate, left one first.
instance Semigroup.Semigroup AggProcess where
  AggProcess cfgA runA <> AggProcess cfgB runB =
    AggProcess (cfgA <> cfgB) runBoth
    where
      runBoth agg = do
        runA agg
        runB agg
-- | The empty process has the empty configuration and ignores every
-- aggregate.
instance Monoid AggProcess where
  mempty = AggProcess mempty (\_ -> return ())
  mappend = (<>)
-- | Configuration for an aggregation process.
data AggProcessConfig = AggProcessConfig
  { metricQuantiles :: MetricName -> Set.Set Quantile
    -- ^ Which quantiles to compute for a given metric name
  }
-- | Combining two configurations yields, for every metric, the union of
-- the quantile sets requested by either side.
instance Semigroup AggProcessConfig where
  AggProcessConfig f1 <> AggProcessConfig f2 =
    AggProcessConfig (\metric -> f1 metric <> f2 metric)
-- | The empty configuration requests no quantiles for any metric.
instance Monoid AggProcessConfig where
  mempty = AggProcessConfig (const Set.empty)
  mappend = (<>)
-- | Default configuration: every metric gets 'standardQuantiles'.
defAggProcessConfig :: AggProcessConfig
defAggProcessConfig =
  AggProcessConfig
    { metricQuantiles = standardQuantiles
    }
-- | 'def' is 'defAggProcessConfig'.
instance Default AggProcessConfig where
  def = defAggProcessConfig
-- | Quantile selector that requests no quantiles, whatever the metric.
noQuantiles :: MetricName -> Set.Set Quantile
noQuantiles _ = Set.empty
-- | Quantile selector that requests the 10th through 90th percentiles in
-- steps of ten plus the 99th, whatever the metric.
standardQuantiles :: MetricName -> Set.Set Quantile
standardQuantiles _ = Set.fromList (map Q percentiles)
  where
    percentiles :: [Int]
    percentiles = [10, 20 .. 90] ++ [99]
-- | Build a quantile selector from a per-metric table.
--
-- Metrics present in the table get their own quantile set; all other
-- metrics get the supplied default set.
quantileMap
  :: M.Map MetricName (Set.Set Quantile)
  -- ^ Per-metric overrides
  -> Set.Set Quantile
  -- ^ Fallback for metrics missing from the table
  -> (MetricName -> Set.Set Quantile)
quantileMap m qdef mn = M.findWithDefault qdef mn m
-- | 'AggProcess' that renders each aggregate with 'aggToCSV' and writes it
-- as one CSV row (default CSV settings) to the given handle.
putAggregateCSV :: Handle -> AggProcessConfig -> AggProcess
putAggregateCSV h cfg = AggProcess cfg writeRow
  where
    writeRow :: Aggregated -> Redis ()
    writeRow agg =
      liftIO (T.hPutStrLn h (rowToStr defCSVSettings (aggToCSV agg)))
-- | Path segment used in Graphite metric names for a payload kind:
-- @"samples"@ for statistics, @"counts"@ for counters.
typePrefix :: AggPayload -> T.Text
typePrefix payload =
  case payload of
    AggStats{} -> "samples"
    AggCount{} -> "counts"
-- | 'AggProcess' that writes each aggregate to the given handle as
-- plaintext lines of the form
--
-- > inst.<type>.<metric>.<field>.<dimName>.<dimVal> <value> <timestamp>
--
-- One line is written per statistic field (from 'mkStatsFields') and per
-- dimension pair of the aggregate; fields form the outer loop. An aggregate
-- with no dimensions therefore produces no lines.
putAggregateGraphite :: Handle -> AggProcessConfig -> AggProcess
putAggregateGraphite h cfg = AggProcess cfg emit
  where
    emit :: Aggregated -> Redis ()
    emit agg =
      let (fields, ts) = mkStatsFields agg
          dims = M.toList (aggDimensions agg)
          -- Part of the metric path shared by every line of this aggregate.
          prefix =
            T.concat
              [ "inst."
              , typePrefix (aggPayload agg)
              , "."
              , T.pack (metricName (aggName agg))
              , "."
              ]
          graphiteLines =
            [ T.concat [prefix, m, ".", dimName, ".", dimVal, " ", val, " ", ts]
            | (m, val) <- fields
            , (DimensionName dimName, DimensionValue dimVal) <- dims
            ]
      in liftIO (mapM_ (T.hPutStrLn h) graphiteLines)
-- | Drain the list stored at the given key by calling 'popLMany' in
-- batches of 100 and concatenating the batches in the order they were
-- popped.
--
-- Draining stops at the first batch that comes back empty.
popLAll :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Redis [a]
popLAll k = do
  batch <- popLMany k 100
  if null batch
    then return []
    else fmap (batch ++) (popLAll k)
-- | Issue @n@ @LPOP@ commands against the given key and decode whatever
-- payloads come back with 'decodeCompress'.
--
-- * Replies that carry no value (@Right Nothing@) are skipped.
-- * Payloads that fail to decode are dropped silently.
-- * Error replies (@Left@) are skipped, but payloads returned by the
--   other pops are still kept.  @LPOP@ removes the element from the
--   list, so throwing away every successful pop because one reply was
--   an error would lose that data for good.
--
-- The result can therefore be shorter than @n@, and is empty when
-- nothing could be popped and decoded.
popLMany :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Int -> Redis [a]
popLMany k n = do
  replies <- replicateM n pop
  return $ mapMaybe (hush . decodeCompress) [bs | Right (Just bs) <- replies]
  where
    -- A single left pop against the key.
    pop :: Redis (Either Reply (Maybe B.ByteString))
    pop = R.lpop k
-- | Debug hook that is currently a no-op: the message is ignored and
-- unit is returned in whatever monad the caller is running in.
dbg :: (Monad m) => String -> m ()
dbg _ = return ()
-- | Flatten an aggregate into a column-name -> value map suitable for
-- CSV output.
--
-- Three maps are merged, and on a key clash the earlier one wins
-- ('M.unions' is left-biased, like '<>' on maps):
--
-- 1. @"metric"@, @"timestamp"@ and the aggregate's own stat fields from
--    'mkStatsFields';
-- 2. the stat fields of a default ('def') 'Stats' payload -- presumably
--    so that rows for count aggregates still carry the stats columns
--    (TODO confirm intent);
-- 3. one column per dimension, keyed by dimension name.
--
-- A dimension whose name collides with a stat column is therefore
-- shadowed by that column.
aggToCSV :: Aggregated -> M.Map T.Text T.Text
aggToCSV agg = M.unions [coreFields, defaultFields, dimFields]
  where
    (statFields, ts) = mkStatsFields agg

    coreFields :: MapRow T.Text
    coreFields =
      M.fromList $
        ("metric", T.pack (metricName (aggName agg)))
          : ("timestamp", ts)
          : statFields

    -- Stat columns as they would look for a default Stats payload.
    defaultFields :: M.Map T.Text T.Text
    defaultFields =
      M.fromList . fst . mkStatsFields $ agg { aggPayload = AggStats def }

    dimFields :: M.Map T.Text T.Text
    dimFields =
      M.fromList
        [ (k, v)
        | (DimensionName k, DimensionValue v) <- M.toList (aggDimensions agg)
        ]
-- | Render an aggregate's payload as a list of (field name, value)
-- pairs together with the aggregate's timestamp as text.
--
-- * 'AggStats' yields @mean@, @count@, @max@, @min@, @srange@,
--   @stdDev@, @sum@, @skewness@ and @kurtosis@, followed by one
--   @percentile_<q>@ entry per key in 'squantiles' (in ascending key
--   order, as produced by 'M.toList').
-- * 'AggCount' yields a single @count@ entry.
--
-- Floating-point values go through @formatDecimal 6 False@ (presumably
-- six decimal places without digit grouping -- TODO confirm against
-- 'formatDecimal'); the timestamp goes through 'formatInt'.
mkStatsFields :: Aggregated -> ([(T.Text, T.Text)], T.Text)
mkStatsFields agg = (els, ts)
  where
    els :: [(T.Text, T.Text)]
    els =
      case aggPayload agg of
        AggStats st ->
          [ ("mean", decimal (smean st))
          , ("count", showT (scount st))
          , ("max", decimal (smax st))
          , ("min", decimal (smin st))
          , ("srange", decimal (srange st))
          , ("stdDev", decimal (sstdev st))
          , ("sum", decimal (ssum st))
          , ("skewness", decimal (sskewness st))
          , ("kurtosis", decimal (skurtosis st))
          ] ++ map mkQ (M.toList (squantiles st))
        AggCount i ->
          [ ("count", showT i) ]

    -- Shared formatting for every floating-point field.
    decimal :: Double -> T.Text
    decimal = formatDecimal 6 False

    -- One quantile entry, named after its integer quantile key.
    mkQ :: (Int, Double) -> (T.Text, T.Text)
    mkQ (q, v) = (T.concat ["percentile_", showT q], decimal v)

    ts :: T.Text
    ts = formatInt (aggTS agg)