module Data.Metrics.Reservoir.ExponentiallyDecaying (
ExponentiallyDecayingReservoir,
standardReservoir,
reservoir,
clear,
size,
snapshot,
rescale,
update
) where
import Control.Monad.Primitive
import Control.Monad.ST
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Metrics.Internal
import qualified Data.Metrics.Reservoir as R
import qualified Data.Map.Strict as M
import Data.Metrics.Snapshot (Snapshot(..), takeSnapshot)
import Data.Primitive.MutVar
import qualified Data.Vector.Unboxed as V
import Data.Word
import System.Posix.Time
import System.Posix.Types
import System.Random.MWC
rescaleThreshold :: Word64
rescaleThreshold = 60 * 60
data ExponentiallyDecayingReservoir = ExponentiallyDecayingReservoir
{ _edrSize :: !Int
, _edrAlpha :: !Double
, _edrRescaleThreshold :: !Word64
, _edrReservoir :: !(M.Map Double Double)
, _edrCount :: !Int
, _edrStartTime :: !Word64
, _edrNextScaleTime :: !Word64
, _edrSeed :: !Seed
} deriving (Show)
standardReservoir :: NominalDiffTime -> Seed -> R.Reservoir
standardReservoir = reservoir 0.015 1028
reservoir :: Double
-> Int
-> NominalDiffTime
-> Seed -> R.Reservoir
reservoir a r t s = R.Reservoir
{ R._reservoirClear = clear
, R._reservoirSize = size
, R._reservoirSnapshot = snapshot
, R._reservoirUpdate = update
, R._reservoirState = ExponentiallyDecayingReservoir r a rescaleThreshold M.empty 0 c c' s
}
where
c = truncate t
c' = c + rescaleThreshold
clear :: NominalDiffTime -> ExponentiallyDecayingReservoir -> ExponentiallyDecayingReservoir
clear = go
where
go t c = c { _edrStartTime = t', _edrNextScaleTime = t'', _edrCount = 0, _edrReservoir = M.empty }
where
t' = truncate t
t'' = t' + _edrRescaleThreshold c
size :: ExponentiallyDecayingReservoir -> Int
size = go
where
go r = min c s
where
c = _edrCount r
s = _edrSize r
snapshot :: ExponentiallyDecayingReservoir -> Snapshot
snapshot r = runST $ do
let svals = V.fromList $ M.elems $ _edrReservoir $ r
mvals <- V.unsafeThaw svals
takeSnapshot mvals
weight :: Double -> Word64 -> Double
weight alpha t = exp (alpha * fromIntegral t)
rescale :: Word64 -> ExponentiallyDecayingReservoir -> ExponentiallyDecayingReservoir
rescale now c = c
{ _edrReservoir = adjustedReservoir
, _edrStartTime = now
, _edrCount = M.size adjustedReservoir
, _edrNextScaleTime = st
}
where
potentialScaleTime = now + rescaleThreshold
currentScaleTime = _edrNextScaleTime c
st = if potentialScaleTime > currentScaleTime then potentialScaleTime else currentScaleTime
diff = now _edrStartTime c
adjustKey x = x * exp (alpha * fromIntegral diff)
adjustedReservoir = M.mapKeys adjustKey $ _edrReservoir c
alpha = _edrAlpha c
update :: Double
-> NominalDiffTime
-> ExponentiallyDecayingReservoir
-> ExponentiallyDecayingReservoir
update v t c = rescaled
{ _edrSeed = s'
, _edrCount = newCount
, _edrReservoir = addValue r
}
where
rescaled = if seconds >= _edrNextScaleTime c
then rescale seconds c
else c
seconds = truncate t
priority = weight (_edrAlpha c) (seconds _edrStartTime c) / priorityDenom
addValue r = if newCount <= _edrSize c
then M.insert priority v r
else if firstKey < priority
then M.delete firstKey $ M.insertWith const priority v r
else r
r = _edrReservoir c
firstKey = head $ M.keys r
newCount = 1 + _edrCount c
(priorityDenom, s') = runST $ do
g <- restore $ _edrSeed c
p <- uniform g
s' <- save g
return (p :: Double, s')