From cd8cba2ceb736e61a6a3e4e989cea6cfc37c2297 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Rabenstein?= Date: Thu, 19 Oct 2023 15:22:02 +0200 Subject: [PATCH] histograms: Add timer to reset ASAP after bucket limiting has happened (#1367) Fixes #1248. See issue description for all the details. Signed-off-by: beorn7 --- prometheus/histogram.go | 56 ++++++++++++++++++++++++++++++++---- prometheus/histogram_test.go | 36 ++++++++++++++++------- 2 files changed, 76 insertions(+), 16 deletions(-) diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 1feba62..b5c8bcb 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -475,6 +475,9 @@ type HistogramOpts struct { // now is for testing purposes, by default it's time.Now. now func() time.Time + + // afterFunc is for testing purposes, by default it's time.AfterFunc. + afterFunc func(time.Duration, func()) *time.Timer } // HistogramVecOpts bundles the options to create a HistogramVec metric. @@ -526,7 +529,9 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr if opts.now == nil { opts.now = time.Now } - + if opts.afterFunc == nil { + opts.afterFunc = time.AfterFunc + } h := &histogram{ desc: desc, upperBounds: opts.Buckets, @@ -536,6 +541,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr nativeHistogramMinResetDuration: opts.NativeHistogramMinResetDuration, lastResetTime: opts.now(), now: opts.now, + afterFunc: opts.afterFunc, } if len(h.upperBounds) == 0 && opts.NativeHistogramBucketFactor <= 1 { h.upperBounds = DefBuckets @@ -716,9 +722,16 @@ type histogram struct { nativeHistogramMinResetDuration time.Duration // lastResetTime is protected by mtx. It is also used as created timestamp. lastResetTime time.Time + // resetScheduled is protected by mtx. It is true if a reset is + // scheduled for a later time (when nativeHistogramMinResetDuration has + // passed). + resetScheduled bool // now is for testing purposes, by default it's time.Now. now func() time.Time + + // afterFunc is for testing purposes, by default it's time.AfterFunc. + afterFunc func(time.Duration, func()) *time.Timer } func (h *histogram) Desc() *Desc { @@ -874,21 +887,31 @@ func (h *histogram) limitBuckets(counts *histogramCounts, value float64, bucket if h.maybeReset(hotCounts, coldCounts, coldIdx, value, bucket) { return } + // One of the other strategies will happen. To undo what they will do as + // soon as enough time has passed to satisfy + // h.nativeHistogramMinResetDuration, schedule a reset at the right time + // if we haven't done so already. + if h.nativeHistogramMinResetDuration > 0 && !h.resetScheduled { + h.resetScheduled = true + h.afterFunc(h.nativeHistogramMinResetDuration-h.now().Sub(h.lastResetTime), h.reset) + } + if h.maybeWidenZeroBucket(hotCounts, coldCounts) { return } h.doubleBucketWidth(hotCounts, coldCounts) } -// maybeReset resets the whole histogram if at least h.nativeHistogramMinResetDuration -// has been passed. It returns true if the histogram has been reset. The caller -// must have locked h.mtx. +// maybeReset resets the whole histogram if at least +// h.nativeHistogramMinResetDuration has been passed. It returns true if the +// histogram has been reset. The caller must have locked h.mtx. func (h *histogram) maybeReset( hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int, ) bool { // We are using the possibly mocked h.now() rather than // time.Since(h.lastResetTime) to enable testing. - if h.nativeHistogramMinResetDuration == 0 || + if h.nativeHistogramMinResetDuration == 0 || // No reset configured. + h.resetScheduled || // Do not interefere if a reset is already scheduled. h.now().Sub(h.lastResetTime) < h.nativeHistogramMinResetDuration { return false } @@ -906,6 +929,29 @@ func (h *histogram) maybeReset( return true } +// reset resets the whole histogram. It locks h.mtx itself, i.e. it has to be +// called without having locked h.mtx. +func (h *histogram) reset() { + h.mtx.Lock() + defer h.mtx.Unlock() + + n := atomic.LoadUint64(&h.countAndHotIdx) + hotIdx := n >> 63 + coldIdx := (^n) >> 63 + hot := h.counts[hotIdx] + cold := h.counts[coldIdx] + // Completely reset coldCounts. + h.resetCounts(cold) + // Make coldCounts the new hot counts while resetting countAndHotIdx. + n = atomic.SwapUint64(&h.countAndHotIdx, coldIdx<<63) + count := n & ((1 << 63) - 1) + waitForCooldown(count, hot) + // Finally, reset the formerly hot counts, too. + h.resetCounts(hot) + h.lastResetTime = h.now() + h.resetScheduled = false +} + // maybeWidenZeroBucket widens the zero bucket until it includes the existing // buckets closest to the zero bucket (which could be two, if an equidistant // negative and a positive bucket exists, but usually it's only one bucket to be diff --git a/prometheus/histogram_test.go b/prometheus/histogram_test.go index 60b43fd..413b3f8 100644 --- a/prometheus/histogram_test.go +++ b/prometheus/histogram_test.go @@ -925,16 +925,16 @@ func TestNativeHistogram(t *testing.T) { maxBuckets: 4, minResetDuration: 9 * time.Minute, want: &dto.Histogram{ - SampleCount: proto.Uint64(2), - SampleSum: proto.Float64(7), + SampleCount: proto.Uint64(3), + SampleSum: proto.Float64(12.1), Schema: proto.Int32(2), ZeroThreshold: proto.Float64(2.938735877055719e-39), ZeroCount: proto.Uint64(0), PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(7), Length: proto.Uint32(2)}, + {Offset: proto.Int32(7), Length: proto.Uint32(4)}, }, - PositiveDelta: []int64{1, 0}, - CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes. + PositiveDelta: []int64{1, 0, -1, 1}, + CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes. }, }, { @@ -945,23 +945,27 @@ func TestNativeHistogram(t *testing.T) { maxZeroThreshold: 1.2, minResetDuration: 9 * time.Minute, want: &dto.Histogram{ - SampleCount: proto.Uint64(2), - SampleSum: proto.Float64(7), + SampleCount: proto.Uint64(3), + SampleSum: proto.Float64(12.1), Schema: proto.Int32(2), ZeroThreshold: proto.Float64(2.938735877055719e-39), ZeroCount: proto.Uint64(0), PositiveSpan: []*dto.BucketSpan{ - {Offset: proto.Int32(7), Length: proto.Uint32(2)}, + {Offset: proto.Int32(7), Length: proto.Uint32(4)}, }, - PositiveDelta: []int64{1, 0}, - CreatedTimestamp: timestamppb.New(now.Add(10 * time.Minute)), // We expect reset to happen after 9 minutes. + PositiveDelta: []int64{1, 0, -1, 1}, + CreatedTimestamp: timestamppb.New(now.Add(9 * time.Minute)), // We expect reset to happen after 8 minutes. }, }, } for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { - ts := now + var ( + ts = now + funcToCall func() + whenToCall time.Duration + ) his := NewHistogram(HistogramOpts{ Name: "name", @@ -972,12 +976,22 @@ func TestNativeHistogram(t *testing.T) { NativeHistogramMinResetDuration: s.minResetDuration, NativeHistogramMaxZeroThreshold: s.maxZeroThreshold, now: func() time.Time { return ts }, + afterFunc: func(d time.Duration, f func()) *time.Timer { + funcToCall = f + whenToCall = d + return nil + }, }) ts = ts.Add(time.Minute) for _, o := range s.observations { his.Observe(o) ts = ts.Add(time.Minute) + whenToCall -= time.Minute + if funcToCall != nil && whenToCall <= 0 { + funcToCall() + funcToCall = nil + } } m := &dto.Metric{} if err := his.Write(m); err != nil {