diff --git a/prometheus/histogram.go b/prometheus/histogram.go index b5c8bcb..bf5f68a 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -440,7 +440,7 @@ type HistogramOpts struct { // constant (or any negative float value). NativeHistogramZeroThreshold float64 - // The remaining fields define a strategy to limit the number of + // The next three fields define a strategy to limit the number of // populated sparse buckets. If NativeHistogramMaxBucketNumber is left // at zero, the number of buckets is not limited. (Note that this might // lead to unbounded memory consumption if the values observed by the @@ -473,6 +473,22 @@ type HistogramOpts struct { NativeHistogramMinResetDuration time.Duration NativeHistogramMaxZeroThreshold float64 + // NativeHistogramMaxExemplars limits the number of exemplars + // that are kept in memory for each native histogram. If you leave it at + // zero, a default value of 10 is used. If no exemplars should be kept specifically + // for native histograms, set it to a negative value. (Scrapers can + // still use the exemplars exposed for classic buckets, which are managed + // independently.) + NativeHistogramMaxExemplars int + // NativeHistogramExemplarTTL is only checked once + // NativeHistogramMaxExemplars is exceeded. In that case, the + // oldest exemplar is removed if it is older than NativeHistogramExemplarTTL. + // Otherwise, the older exemplar in the pair of exemplars that are closest + // together (on an exponential scale) is removed. + // If NativeHistogramExemplarTTL is left at its zero value, a default value of + // 5m is used. To always delete the oldest exemplar, set it to a negative value. + NativeHistogramExemplarTTL time.Duration + // now is for testing purposes, by default it's time.Now. now func() time.Time @@ -532,6 +548,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr if opts.afterFunc == nil { opts.afterFunc = time.AfterFunc } + h := &histogram{ desc: desc, upperBounds: opts.Buckets, @@ -556,6 +573,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold } // Leave h.nativeHistogramZeroThreshold at 0 otherwise. h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor) + h.nativeExemplars = makeNativeExemplars(opts.NativeHistogramExemplarTTL, opts.NativeHistogramMaxExemplars) } for i, upperBound := range h.upperBounds { if i < len(h.upperBounds)-1 { @@ -725,7 +743,8 @@ type histogram struct { // resetScheduled is protected by mtx. It is true if a reset is // scheduled for a later time (when nativeHistogramMinResetDuration has // passed). - resetScheduled bool + resetScheduled bool + nativeExemplars nativeExemplars // now is for testing purposes, by default it's time.Now. now func() time.Time @@ -742,6 +761,9 @@ func (h *histogram) Observe(v float64) { h.observe(v, h.findBucket(v)) } +// ObserveWithExemplar should not be called in a high-frequency setting +// for a native histogram with configured exemplars. For this case, +// the implementation isn't lock-free and might suffer from lock contention. func (h *histogram) ObserveWithExemplar(v float64, e Labels) { i := h.findBucket(v) h.observe(v, i) @@ -821,6 +843,15 @@ func (h *histogram) Write(out *dto.Metric) error { Length: proto.Uint32(0), }} } + + // If exemplars are not configured, the cap will be 0. + // So append is not needed in this case. + if cap(h.nativeExemplars.exemplars) > 0 { + h.nativeExemplars.Lock() + his.Exemplars = append(his.Exemplars, h.nativeExemplars.exemplars...) + h.nativeExemplars.Unlock() + } + } addAndResetCounts(hotCounts, coldCounts) return nil @@ -1091,8 +1122,10 @@ func (h *histogram) resetCounts(counts *histogramCounts) { deleteSyncMap(&counts.nativeHistogramBucketsPositive) } -// updateExemplar replaces the exemplar for the provided bucket. With empty -// labels, it's a no-op. It panics if any of the labels is invalid. +// updateExemplar replaces the exemplar for the provided classic bucket. +// With empty labels, it's a no-op. It panics if any of the labels is invalid. +// If histogram is native, the exemplar will be cached into nativeExemplars, +// which has a limit, and will remove one exemplar when limit is reached. func (h *histogram) updateExemplar(v float64, bucket int, l Labels) { if l == nil { return @@ -1102,6 +1135,10 @@ func (h *histogram) updateExemplar(v float64, bucket int, l Labels) { panic(err) } h.exemplars[bucket].Store(e) + doSparse := h.nativeHistogramSchema > math.MinInt32 && !math.IsNaN(v) + if doSparse { + h.nativeExemplars.addExemplar(e) + } } // HistogramVec is a Collector that bundles a set of Histograms that all share the @@ -1575,3 +1612,142 @@ func addAndResetCounts(hot, cold *histogramCounts) { atomic.AddUint64(&hot.nativeHistogramZeroBucket, atomic.LoadUint64(&cold.nativeHistogramZeroBucket)) atomic.StoreUint64(&cold.nativeHistogramZeroBucket, 0) } + +type nativeExemplars struct { + sync.Mutex + + ttl time.Duration + exemplars []*dto.Exemplar +} + +func makeNativeExemplars(ttl time.Duration, maxCount int) nativeExemplars { + if ttl == 0 { + ttl = 5 * time.Minute + } + + if maxCount == 0 { + maxCount = 10 + } + + if maxCount < 0 { + maxCount = 0 + } + + return nativeExemplars{ + ttl: ttl, + exemplars: make([]*dto.Exemplar, 0, maxCount), + } +} + +func (n *nativeExemplars) addExemplar(e *dto.Exemplar) { + if cap(n.exemplars) == 0 { + return + } + + n.Lock() + defer n.Unlock() + + // The index where to insert the new exemplar. + var nIdx int = -1 + + // When the number of exemplars has not yet exceeded or + // is equal to cap(n.exemplars), then + // insert the new exemplar directly. + if len(n.exemplars) < cap(n.exemplars) { + for nIdx = 0; nIdx < len(n.exemplars); nIdx++ { + if *e.Value < *n.exemplars[nIdx].Value { + break + } + } + n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...) + return + } + + // When the number of exemplars exceeds the limit, remove one exemplar. + var ( + rIdx int // The index where to remove the old exemplar. + + ot = time.Now() // Oldest timestamp seen. + otIdx = -1 // Index of the exemplar with the oldest timestamp. + + md = -1.0 // Logarithm of the delta of the closest pair of exemplars. + mdIdx = -1 // Index of the older exemplar within the closest pair. + cLog float64 // Logarithm of the current exemplar. + pLog float64 // Logarithm of the previous exemplar. + ) + + for i, exemplar := range n.exemplars { + // Find the exemplar with the oldest timestamp. + if otIdx == -1 || exemplar.Timestamp.AsTime().Before(ot) { + ot = exemplar.Timestamp.AsTime() + otIdx = i + } + + // Find the index at which to insert new the exemplar. + if *e.Value <= *exemplar.Value && nIdx == -1 { + nIdx = i + } + + // Find the two closest exemplars and pick the one the with older timestamp. + pLog = cLog + cLog = math.Log(exemplar.GetValue()) + if i == 0 { + continue + } + diff := math.Abs(cLog - pLog) + if md == -1 || diff < md { + md = diff + if n.exemplars[i].Timestamp.AsTime().Before(n.exemplars[i-1].Timestamp.AsTime()) { + mdIdx = i + } else { + mdIdx = i - 1 + } + } + + } + + // If all existing exemplar are smaller than new exemplar, + // then the exemplar should be inserted at the end. + if nIdx == -1 { + nIdx = len(n.exemplars) + } + + if otIdx != -1 && e.Timestamp.AsTime().Sub(ot) > n.ttl { + rIdx = otIdx + } else { + // In the previous for loop, when calculating the closest pair of exemplars, + // we did not take into account the newly inserted exemplar. + // So we need to calculate with the newly inserted exemplar again. + elog := math.Log(e.GetValue()) + if nIdx > 0 { + diff := math.Abs(elog - math.Log(n.exemplars[nIdx-1].GetValue())) + if diff < md { + md = diff + mdIdx = nIdx + if n.exemplars[nIdx-1].Timestamp.AsTime().Before(e.Timestamp.AsTime()) { + mdIdx = nIdx - 1 + } + } + } + if nIdx < len(n.exemplars) { + diff := math.Abs(math.Log(n.exemplars[nIdx].GetValue()) - elog) + if diff < md { + mdIdx = nIdx + if n.exemplars[nIdx].Timestamp.AsTime().Before(e.Timestamp.AsTime()) { + mdIdx = nIdx + } + } + } + rIdx = mdIdx + } + + // Adjust the slice according to rIdx and nIdx. + switch { + case rIdx == nIdx: + n.exemplars[nIdx] = e + case rIdx < nIdx: + n.exemplars = append(n.exemplars[:rIdx], append(n.exemplars[rIdx+1:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)...) + case rIdx > nIdx: + n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, append(n.exemplars[nIdx:rIdx], n.exemplars[rIdx+1:]...)...)...) + } +} diff --git a/prometheus/histogram_test.go b/prometheus/histogram_test.go index 39bb0dc..57fafd3 100644 --- a/prometheus/histogram_test.go +++ b/prometheus/histogram_test.go @@ -1271,3 +1271,158 @@ func TestHistogramVecCreatedTimestampWithDeletes(t *testing.T) { now = now.Add(1 * time.Hour) expectCTsForMetricVecValues(t, histogramVec.MetricVec, dto.MetricType_HISTOGRAM, expected) } + +func TestNativeHistogramExemplar(t *testing.T) { + // Test the histogram with positive NativeHistogramExemplarTTL and NativeHistogramMaxExemplars + h := NewHistogram(HistogramOpts{ + Name: "test", + Help: "test help", + Buckets: []float64{1, 2, 3, 4}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxExemplars: 3, + NativeHistogramExemplarTTL: 10 * time.Second, + }).(*histogram) + + tcs := []struct { + name string + addFunc func(*histogram) + expectedValues []float64 + }{ + { + name: "add exemplars to the limit", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(1, Labels{"id": "1"}) + h.ObserveWithExemplar(3, Labels{"id": "1"}) + h.ObserveWithExemplar(5, Labels{"id": "1"}) + }, + expectedValues: []float64{1, 3, 5}, + }, + { + name: "remove exemplar in closest pair, the removed index equals to inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(4, Labels{"id": "1"}) + }, + expectedValues: []float64{1, 3, 4}, + }, + { + name: "remove exemplar in closest pair, the removed index is bigger than inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(0, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 1, 4}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index", + addFunc: func(h *histogram) { + h.now = func() time.Time { return time.Now().Add(time.Second * 11) } + h.ObserveWithExemplar(6, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 4, 6}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.addFunc(h) + compareNativeExemplarValues(t, h.nativeExemplars.exemplars, tc.expectedValues) + }) + } + + // Test the histogram with negative NativeHistogramExemplarTTL + h = NewHistogram(HistogramOpts{ + Name: "test", + Help: "test help", + Buckets: []float64{1, 2, 3, 4}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxExemplars: 3, + NativeHistogramExemplarTTL: -1 * time.Second, + }).(*histogram) + + tcs = []struct { + name string + addFunc func(*histogram) + expectedValues []float64 + }{ + { + name: "add exemplars to the limit", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(1, Labels{"id": "1"}) + h.ObserveWithExemplar(3, Labels{"id": "1"}) + h.ObserveWithExemplar(5, Labels{"id": "1"}) + }, + expectedValues: []float64{1, 3, 5}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(4, Labels{"id": "1"}) + }, + expectedValues: []float64{3, 4, 5}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index equals to inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(0, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 4, 5}, + }, + { + name: "remove exemplar with oldest timestamp, the removed index is bigger than inserted index", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(3, Labels{"id": "1"}) + }, + expectedValues: []float64{0, 3, 4}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.addFunc(h) + compareNativeExemplarValues(t, h.nativeExemplars.exemplars, tc.expectedValues) + }) + } + + // Test the histogram with negative NativeHistogramMaxExemplars + h = NewHistogram(HistogramOpts{ + Name: "test", + Help: "test help", + Buckets: []float64{1, 2, 3, 4}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxExemplars: -1, + NativeHistogramExemplarTTL: -1 * time.Second, + }).(*histogram) + + tcs = []struct { + name string + addFunc func(*histogram) + expectedValues []float64 + }{ + { + name: "add exemplars to the limit, but no effect", + addFunc: func(h *histogram) { + h.ObserveWithExemplar(1, Labels{"id": "1"}) + h.ObserveWithExemplar(3, Labels{"id": "1"}) + h.ObserveWithExemplar(5, Labels{"id": "1"}) + }, + expectedValues: []float64{}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.addFunc(h) + compareNativeExemplarValues(t, h.nativeExemplars.exemplars, tc.expectedValues) + }) + } +} + +func compareNativeExemplarValues(t *testing.T, exps []*dto.Exemplar, values []float64) { + if len(exps) != len(values) { + t.Errorf("the count of exemplars is not %d", len(values)) + } + for i, e := range exps { + if e.GetValue() != values[i] { + t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), values[i]) + } + } +}