module System.Remote.Monitoring.CloudWatch
( CloudWatchId
, cloudWatchThreadId
, forkCloudWatch
, CloudWatchEnv(..)
, defaultCloudWatchEnv
) where
import Control.Concurrent (ThreadId, forkFinally,
myThreadId, threadDelay)
import Control.Exception (Exception, throwTo,
toException)
import Control.Lens ((&), (.~), (?~))
import Control.Monad (forM_, guard, unless,
void)
import qualified Data.ByteString as BS
import qualified Data.HashMap.Strict as Map
import Data.Int (Int64)
import Data.List (foldl')
import Data.List.NonEmpty (NonEmpty (..))
import Data.Maybe (mapMaybe)
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.IO as Text
import Data.Time (NominalDiffTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Traversable (for)
import Network.AWS as AWS
import Network.AWS.CloudWatch as AWS
import Network.AWS.Data.ByteString (toBS)
import Network.AWS.Data.Query (toQueryList)
import System.IO (stderr)
import qualified System.Metrics as Metrics
import qualified System.Metrics.Distribution.Internal as Distribution
newtype CloudWatchId = CloudWatchId
{ cloudWatchThreadId :: ThreadId
}
data CloudWatchEnv = CloudWatchEnv
{ cweFlushInterval :: !Int
, cweAwsEnv :: !AWS.Env
, cweDimensions :: ![AWS.Dimension]
, cweNamespace :: !Text
, cweOnError :: !(forall e. Exception e => e -> IO ())
}
defaultCloudWatchEnv :: Text -> AWS.Env -> CloudWatchEnv
defaultCloudWatchEnv namespace x =
CloudWatchEnv
{ cweFlushInterval = 1000
, cweAwsEnv = x
, cweNamespace = namespace
, cweDimensions = []
, cweOnError = defaultOnError
}
defaultOnError :: Exception e => e -> IO ()
defaultOnError =
Text.hPutStrLn stderr . Text.pack . show . toException
swallowOnError :: Exception e => e -> IO ()
swallowOnError _ = pure ()
forkCloudWatch :: CloudWatchEnv -> Metrics.Store -> IO CloudWatchId
forkCloudWatch env store = do
me <- myThreadId
fmap CloudWatchId . forkFinally (loop env store mempty) $ \case
Left e -> throwTo me e
Right _ -> pure ()
loop :: CloudWatchEnv -> Metrics.Store -> Metrics.Sample -> IO ()
loop env store lastSample = do
start <- time
sample <- Metrics.sampleAll store
flushSample env (diffSamples lastSample sample)
end <- time
threadDelay (cweFlushInterval env * 1000 fromIntegral (end start))
loop env store sample
time :: IO Int64
time = round . (* 1000000.0) . toDouble <$> getPOSIXTime
where
toDouble = realToFrac :: NominalDiffTime -> Double
diffSamples :: Metrics.Sample -> Metrics.Sample -> Metrics.Sample
diffSamples prev !curr = Map.foldlWithKey' combine Map.empty curr
where
combine m name new =
case Map.lookup name prev of
Just old ->
case diffMetric old new of
Just val -> Map.insert name val m
Nothing -> m
_ -> Map.insert name new m
diffMetric :: Metrics.Value -> Metrics.Value -> Maybe Metrics.Value
diffMetric (Metrics.Counter n1) (Metrics.Counter n2)
| n1 == n2 = Nothing
| otherwise = Just $! Metrics.Counter $ n2 n1
diffMetric (Metrics.Gauge n1) (Metrics.Gauge n2)
| n1 == n2 = Nothing
| otherwise = Just $ Metrics.Gauge n2
diffMetric (Metrics.Label n1) (Metrics.Label n2)
| n1 == n2 = Nothing
| otherwise = Just $ Metrics.Label n2
diffMetric (Metrics.Distribution d1) (Metrics.Distribution d2)
| Distribution.count d1 == Distribution.count d2 = Nothing
| otherwise =
Just . Metrics.Distribution $
d2 {Distribution.count = Distribution.count d2 Distribution.count d1}
diffMetric _ _ = Nothing
metricToDatum :: [Dimension] -> Text -> Metrics.Value -> Maybe MetricDatum
metricToDatum dim name val = case val of
Metrics.Counter n ->
Just $ mkDatum (mdValue ?~ fromIntegral n)
Metrics.Gauge n ->
Just $ mkDatum (mdValue ?~ fromIntegral n)
Metrics.Distribution d ->
fmap (\dist -> mkDatum (mdStatisticValues ?~ dist)) (conv d)
Metrics.Label l ->
Nothing
where
mkDatum k =
metricDatum name & mdDimensions .~ dim & k
conv :: Distribution.Stats -> Maybe StatisticSet
conv Distribution.Stats {..} = do
guard (count > 0)
pure (statisticSet (fromIntegral count) sum min max)
weighDatum :: MetricDatum -> Int
weighDatum = BS.length . toBS . toQueryList "member" . (:[])
data SplitAcc = SplitAcc
{ splitAccData :: !(NonEmpty [MetricDatum])
, splitAccSize :: !Int
}
splitAt40KB :: [MetricDatum] -> NonEmpty [MetricDatum]
splitAt40KB = splitAccData . foldl' go (SplitAcc ([] :| []) 0)
where
limit = 40000
fudge = 2000
safety = limit fudge
go (SplitAcc (acc :| accs) size) x
| size + weight >= safety =
SplitAcc ((x : acc) :| accs) (size + weight)
| otherwise =
SplitAcc ([x] :| (acc : accs)) 0
where
weight = weighDatum x
flushSample :: CloudWatchEnv -> Metrics.Sample -> IO ()
flushSample CloudWatchEnv{..} = void
. sendMetric
. mapMaybe (uncurry (metricToDatum cweDimensions))
. Map.toList
where
sendMetric :: [MetricDatum] -> IO ()
sendMetric metrics = do
e <- trying _Error . void . runResourceT . runAWS cweAwsEnv .
forM_ (splitAt40KB metrics) $ \metrics ->
unless (null metrics) $
void (send (putMetricData cweNamespace & pmdMetricData .~ metrics))
case e of
Left err -> cweOnError err
Right _ -> pure ()