From d8c7074b1c7d54cf3ad65beff45de79d42901165 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 2 Apr 2024 13:20:13 +0800 Subject: [PATCH] refract the implementation Signed-off-by: Ziqi Zhao --- prometheus/histogram.go | 207 ++++++++++++++++++++++++-------- prometheus/histogram_test.go | 221 +++++++++++++++++++++++------------ 2 files changed, 306 insertions(+), 122 deletions(-) diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 6ee6bdb..d247d77 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -440,7 +440,7 @@ type HistogramOpts struct { // constant (or any negative float value). NativeHistogramZeroThreshold float64 - // The remaining fields define a strategy to limit the number of + // The next three fields define a strategy to limit the number of // populated sparse buckets. If NativeHistogramMaxBucketNumber is left // at zero, the number of buckets is not limited. (Note that this might // lead to unbounded memory consumption if the values observed by the @@ -472,8 +472,22 @@ type HistogramOpts struct { NativeHistogramMaxBucketNumber uint32 NativeHistogramMinResetDuration time.Duration NativeHistogramMaxZeroThreshold float64 - NativeHistogramMaxExemplarCount uint32 - NativeHistogramExemplarTTL time.Duration + + // NativeHistogramMaxExemplars limits the number of exemplars + // that are kept in memory for each native histogram. If you leave it at + // zero, a default value of 10 is used. If no exemplars should be kept specifically + // for native histograms, set it to a negative value. (Scrapers can + // still use the exemplars exposed for classic buckets, which are managed + // independently.) + NativeHistogramMaxExemplars int + // NativeHistogramExemplarTTL is only checked once + // NativeHistogramMaxExemplars is exceeded. In that case, the + // oldest exemplar is removed if it is older than NativeHistogramExemplarTTL. 
+ // Otherwise, the older exemplar in the pair of exemplars that are closest + // together (on an exponential scale) is removed. + // If NativeHistogramExemplarTTL is left at its zero value, a default value of + // 5m is used. To always delete the oldest exemplar, set it to a negative value. + NativeHistogramExemplarTTL time.Duration // now is for testing purposes, by default it's time.Now. now func() time.Time @@ -534,6 +548,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr if opts.afterFunc == nil { opts.afterFunc = time.AfterFunc } + h := &histogram{ desc: desc, upperBounds: opts.Buckets, @@ -558,7 +573,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold } // Leave h.nativeHistogramZeroThreshold at 0 otherwise. h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor) - h.nativeExemplars = newNativeExemplars(opts.NativeHistogramExemplarTTL, opts.NativeHistogramMaxExemplarCount) + h.nativeExemplars = makeNativeExemplars(opts.NativeHistogramExemplarTTL, opts.NativeHistogramMaxExemplars) } for i, upperBound := range h.upperBounds { if i < len(h.upperBounds)-1 { @@ -728,15 +743,14 @@ type histogram struct { // resetScheduled is protected by mtx. It is true if a reset is // scheduled for a later time (when nativeHistogramMinResetDuration has // passed). - resetScheduled bool + resetScheduled bool + nativeExemplars nativeExemplars // now is for testing purposes, by default it's time.Now. now func() time.Time // afterFunc is for testing purposes, by default it's time.AfterFunc. 
afterFunc func(time.Duration, func()) *time.Timer - - nativeExemplars nativeExemplars } func (h *histogram) Desc() *Desc { @@ -747,6 +761,8 @@ func (h *histogram) Observe(v float64) { h.observe(v, h.findBucket(v)) } +// ObserveWithExemplar should not be called in high-frequency settings, +// since it isn't lock-free for native histograms with configured exemplars. func (h *histogram) ObserveWithExemplar(v float64, e Labels) { i := h.findBucket(v) h.observe(v, i) @@ -827,7 +843,12 @@ func (h *histogram) Write(out *dto.Metric) error { }} } - his.Exemplars = append(his.Exemplars, h.nativeExemplars.exemplars...) + if cap(h.nativeExemplars.exemplars) > 0 { + h.nativeExemplars.Lock() + his.Exemplars = append(his.Exemplars, h.nativeExemplars.exemplars...) + h.nativeExemplars.Unlock() + } + } addAndResetCounts(hotCounts, coldCounts) return nil @@ -1098,8 +1119,10 @@ func (h *histogram) resetCounts(counts *histogramCounts) { deleteSyncMap(&counts.nativeHistogramBucketsPositive) } -// updateExemplar replaces the exemplar for the provided bucket. With empty -// labels, it's a no-op. It panics if any of the labels is invalid. +// updateExemplar replaces the exemplar for the provided classic bucket. +// With empty labels, it's a no-op. It panics if any of the labels is invalid. +// If histogram is native, the exemplar will be cached into nativeExemplars, +// which has a limit, and will remove one exemplar when limit is reached. 
func (h *histogram) updateExemplar(v float64, bucket int, l Labels) { if l == nil { return @@ -1588,56 +1611,140 @@ func addAndResetCounts(hot, cold *histogramCounts) { } type nativeExemplars struct { - nativeHistogramExemplarTTL time.Duration - nativeHistogramMaxExemplarCount uint32 + sync.Mutex + ttl time.Duration exemplars []*dto.Exemplar - - lock sync.Mutex } -func newNativeExemplars(ttl time.Duration, count uint32) nativeExemplars { +func makeNativeExemplars(ttl time.Duration, maxCount int) nativeExemplars { + if ttl == 0 { + ttl = 5 * time.Minute + } + + if maxCount == 0 { + maxCount = 10 + } + + if maxCount < 0 { + maxCount = 0 + } + return nativeExemplars{ - nativeHistogramExemplarTTL: ttl, - nativeHistogramMaxExemplarCount: count, - exemplars: make([]*dto.Exemplar, 0), - lock: sync.Mutex{}, + ttl: ttl, + exemplars: make([]*dto.Exemplar, 0, maxCount), } } func (n *nativeExemplars) addExemplar(e *dto.Exemplar) { - n.lock.Lock() - defer n.lock.Unlock() - - elogarithm := math.Log(e.GetValue()) - if len(n.exemplars) == int(n.nativeHistogramMaxExemplarCount) { - // check if oldestIndex is beyond TTL, - // if so, find the oldest exemplar, and nearest exemplar - oldestTimestamp := time.Now() - oldestIndex := -1 - nearestValue := -1.0 - nearestIndex := -1 - - for i, exemplar := range n.exemplars { - if exemplar.Timestamp.AsTime().Before(oldestTimestamp) { - oldestTimestamp = exemplar.Timestamp.AsTime() - oldestIndex = i - } - logarithm := math.Log(exemplar.GetValue()) - if nearestValue == -1 || math.Abs(elogarithm-logarithm) < nearestValue { - fmt.Printf("gap: %f", math.Abs(elogarithm-logarithm)) - nearestValue = math.Abs(elogarithm - logarithm) - nearestIndex = i - } - } - - if oldestIndex != -1 && time.Since(oldestTimestamp) > n.nativeHistogramExemplarTTL { - n.exemplars[oldestIndex] = e - } else { - n.exemplars[nearestIndex] = e - } + if cap(n.exemplars) == 0 { return } - n.exemplars = append(n.exemplars, e) + n.Lock() + defer n.Unlock() + + // The index where 
to insert the new exemplar. + var nIdx int = -1 + + // When the number of exemplars has not yet reached + // cap(n.exemplars), there is still room, so + // insert the new exemplar directly. + if len(n.exemplars) < cap(n.exemplars) { + for nIdx = 0; nIdx < len(n.exemplars); nIdx++ { + if *e.Value < *n.exemplars[nIdx].Value { + break + } + } + n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...) + return + } + + // When the number of exemplars exceeds the limit, remove one exemplar. + var ( + rIdx int // The index where to remove the old exemplar. + + ot = time.Now() // Oldest timestamp seen. + otIdx = -1 // Index of the exemplar with the oldest timestamp. + + md = -1.0 // Delta of the logarithms of the closest pair of exemplars. + mdIdx = -1 // Index of the older exemplar within the closest pair. + cLog float64 // Logarithm of the current exemplar. + pLog float64 // Logarithm of the previous exemplar. + ) + + for i, exemplar := range n.exemplars { + // Find the exemplar with the oldest timestamp. + if otIdx == -1 || exemplar.Timestamp.AsTime().Before(ot) { + ot = exemplar.Timestamp.AsTime() + otIdx = i + } + + // Find the index at which to insert the new exemplar. + if *e.Value <= *exemplar.Value && nIdx == -1 { + nIdx = i + } + + // Find the two closest exemplars and pick the one with the older timestamp. + pLog = cLog + cLog = math.Log(exemplar.GetValue()) + if i == 0 { + continue + } + diff := math.Abs(cLog - pLog) + if md == -1 || diff < md { + md = diff + if n.exemplars[i].Timestamp.AsTime().Before(n.exemplars[i-1].Timestamp.AsTime()) { + mdIdx = i + } else { + mdIdx = i - 1 + } + } + + } + + // If all existing exemplars are smaller than the new exemplar, + // then the exemplar should be inserted at the end. 
+ if nIdx == -1 { + nIdx = len(n.exemplars) + } + + if otIdx != -1 && time.Since(ot) > n.ttl { + rIdx = otIdx + } else { + // In the previous for loop, when calculating the closest pair of exemplars, + // we did not take into account the newly inserted exemplar. + // So we need to calculate with the newly inserted exemplar again. + elog := math.Log(e.GetValue()) + if nIdx > 0 { + diff := math.Abs(elog - math.Log(n.exemplars[nIdx-1].GetValue())) + if diff < md { + md = diff + mdIdx = nIdx + if n.exemplars[nIdx-1].Timestamp.AsTime().Before(e.Timestamp.AsTime()) { + mdIdx = nIdx - 1 + } + } + } + if nIdx < len(n.exemplars) { + diff := math.Abs(math.Log(n.exemplars[nIdx].GetValue()) - elog) + if diff < md { + mdIdx = nIdx + if n.exemplars[nIdx].Timestamp.AsTime().Before(e.Timestamp.AsTime()) { + mdIdx = nIdx + } + } + } + rIdx = mdIdx + } + + // Adjust the slice according to rIdx and nIdx. + switch { + case rIdx == nIdx: + n.exemplars[nIdx] = e + case rIdx < nIdx: + n.exemplars = append(n.exemplars[:rIdx], append(n.exemplars[rIdx+1:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)...) + case rIdx > nIdx: + n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, append(n.exemplars[nIdx:rIdx], n.exemplars[rIdx+1:]...)...)...) 
+ } } diff --git a/prometheus/histogram_test.go b/prometheus/histogram_test.go index 5578094..40ced59 100644 --- a/prometheus/histogram_test.go +++ b/prometheus/histogram_test.go @@ -1273,89 +1273,166 @@ func TestHistogramVecCreatedTimestampWithDeletes(t *testing.T) { } func TestNativeHistogramExemplar(t *testing.T) { - histogram := NewHistogram(HistogramOpts{ - Name: "test", - Help: "test help", - Buckets: []float64{1, 2, 3, 4}, - NativeHistogramBucketFactor: 1.1, - NativeHistogramMaxExemplarCount: 3, - NativeHistogramExemplarTTL: 10 * time.Second, + // Test the histogram with positive NativeHistogramExemplarTTL and NativeHistogramMaxExemplars + h := NewHistogram(HistogramOpts{ + Name: "test", + Help: "test help", + Buckets: []float64{1, 2, 3, 4}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxExemplars: 3, + NativeHistogramExemplarTTL: 10 * time.Second, }).(*histogram) - // expectedExemplars := []*dto.Exemplar{ - // { - // Label: []*dto.LabelPair{ - // {Name: proto.String("id"), Value: proto.String("1")}, - // }, - // Value: proto.Float64(1), - // }, - // { - // Label: []*dto.LabelPair{ - // {Name: proto.String("id"), Value: proto.String("2")}, - // }, - // Value: proto.Float64(3), - // }, - // { - // Label: []*dto.LabelPair{ - // {Name: proto.String("id"), Value: proto.String("3")}, - // }, - // Value: proto.Float64(5), - // }, - // } - - histogram.ObserveWithExemplar(1, Labels{"id": "1"}) - histogram.ObserveWithExemplar(3, Labels{"id": "1"}) - histogram.ObserveWithExemplar(5, Labels{"id": "1"}) - - if len(histogram.nativeExemplars.exemplars) != 3 { - t.Errorf("the count of exemplars is not 3") + tcs := []struct { + name string + addFunc func(*histogram) + expectedValues []float64 + }{ + { + name: "add exemplars to the limit", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(1, Labels{"id": "1"}) + h.ObserveWithExemplar(3, Labels{"id": "1"}) + h.ObserveWithExemplar(5, Labels{"id": "1"}) + }, + expectedValues: []float64{1, 3, 5}, + }, + { + name: 
"remove exemplar in closest pair, the removed index equals to inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(4, Labels{"id": "1"}) + }, + expectedValues: []float64{1, 3, 4}, + }, + { + name: "remove exemplar in closest pair, the removed index is bigger than inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(0, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 1, 4}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index", + addFunc: func(h *histogram) { + time.Sleep(10 * time.Second) + h.ObserveWithExemplar(6, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 4, 6}, + }, } - expectedValues := map[float64]struct{}{ - 1: {}, - 3: {}, - 5: {}, + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.addFunc(h) + if len(h.nativeExemplars.exemplars) != len(tc.expectedValues) { + t.Errorf("the count of exemplars is not %d", len(tc.expectedValues)) + } + for i, e := range h.nativeExemplars.exemplars { + if e.GetValue() != tc.expectedValues[i] { + t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), tc.expectedValues[i]) + } + } + }) } - for _, e := range histogram.nativeExemplars.exemplars { - if _, ok := expectedValues[e.GetValue()]; !ok { - t.Errorf("the value is not in expected value") - } + // Test the histogram with negative NativeHistogramExemplarTTL + h = NewHistogram(HistogramOpts{ + Name: "test", + Help: "test help", + Buckets: []float64{1, 2, 3, 4}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxExemplars: 3, + NativeHistogramExemplarTTL: -1 * time.Second, + }).(*histogram) + + tcs = []struct { + name string + addFunc func(*histogram) + expectedValues []float64 + }{ + { + name: "add exemplars to the limit", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(1, Labels{"id": "1"}) + h.ObserveWithExemplar(3, Labels{"id": "1"}) + h.ObserveWithExemplar(5, Labels{"id": "1"}) + }, + expectedValues: 
[]float64{1, 3, 5}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(4, Labels{"id": "1"}) + }, + expectedValues: []float64{3, 4, 5}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index equals to inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(0, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 4, 5}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index is bigger than inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(3, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 3, 4}, + }, } - histogram.ObserveWithExemplar(4, Labels{"id": "1"}) - - if len(histogram.nativeExemplars.exemplars) != 3 { - t.Errorf("the count of exemplars is not 3") + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.addFunc(h) + if len(h.nativeExemplars.exemplars) != len(tc.expectedValues) { + t.Errorf("the count of exemplars is not %d", len(tc.expectedValues)) + } + for i, e := range h.nativeExemplars.exemplars { + if e.GetValue() != tc.expectedValues[i] { + t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), tc.expectedValues[i]) + } + } + }) } - expectedValues = map[float64]struct{}{ - 1: {}, - 3: {}, - 4: {}, + // Test the histogram with negative NativeHistogramMaxExemplars + h = NewHistogram(HistogramOpts{ + Name: "test", + Help: "test help", + Buckets: []float64{1, 2, 3, 4}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxExemplars: -1, + NativeHistogramExemplarTTL: -1 * time.Second, + }).(*histogram) + + tcs = []struct { + name string + addFunc func(*histogram) + expectedValues []float64 + }{ + { + name: "add exemplars to the limit, but no effect", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(1, Labels{"id": "1"}) + h.ObserveWithExemplar(3, Labels{"id": "1"}) + h.ObserveWithExemplar(5, Labels{"id": "1"}) + }, + 
expectedValues: []float64{}, + }, } - for _, e := range histogram.nativeExemplars.exemplars { - if _, ok := expectedValues[e.GetValue()]; !ok { - t.Errorf("the value is not in expected value") - } - } - - time.Sleep(10 * time.Second) - histogram.ObserveWithExemplar(6, Labels{"id": "1"}) - - if len(histogram.nativeExemplars.exemplars) != 3 { - t.Errorf("the count of exemplars is not 3") - } - - expectedValues = map[float64]struct{}{ - 6: {}, - 3: {}, - 4: {}, - } - for _, e := range histogram.nativeExemplars.exemplars { - if _, ok := expectedValues[e.GetValue()]; !ok { - t.Errorf("the value is not in expected value") - } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.addFunc(h) + if len(h.nativeExemplars.exemplars) != len(tc.expectedValues) { + t.Errorf("the count of exemplars is not %d", len(tc.expectedValues)) + } + for i, e := range h.nativeExemplars.exemplars { + if e.GetValue() != tc.expectedValues[i] { + t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), tc.expectedValues[i]) + } + } + }) } }