Merge pull request #1471 from fatsheep9146/native-histogram-exemplar
add native histogram exemplar support
This commit is contained in:
commit
542f7e6c6e
|
@ -440,7 +440,7 @@ type HistogramOpts struct {
|
||||||
// constant (or any negative float value).
|
// constant (or any negative float value).
|
||||||
NativeHistogramZeroThreshold float64
|
NativeHistogramZeroThreshold float64
|
||||||
|
|
||||||
// The remaining fields define a strategy to limit the number of
|
// The next three fields define a strategy to limit the number of
|
||||||
// populated sparse buckets. If NativeHistogramMaxBucketNumber is left
|
// populated sparse buckets. If NativeHistogramMaxBucketNumber is left
|
||||||
// at zero, the number of buckets is not limited. (Note that this might
|
// at zero, the number of buckets is not limited. (Note that this might
|
||||||
// lead to unbounded memory consumption if the values observed by the
|
// lead to unbounded memory consumption if the values observed by the
|
||||||
|
@ -473,6 +473,22 @@ type HistogramOpts struct {
|
||||||
NativeHistogramMinResetDuration time.Duration
|
NativeHistogramMinResetDuration time.Duration
|
||||||
NativeHistogramMaxZeroThreshold float64
|
NativeHistogramMaxZeroThreshold float64
|
||||||
|
|
||||||
|
// NativeHistogramMaxExemplars limits the number of exemplars
|
||||||
|
// that are kept in memory for each native histogram. If you leave it at
|
||||||
|
// zero, a default value of 10 is used. If no exemplars should be kept specifically
|
||||||
|
// for native histograms, set it to a negative value. (Scrapers can
|
||||||
|
// still use the exemplars exposed for classic buckets, which are managed
|
||||||
|
// independently.)
|
||||||
|
NativeHistogramMaxExemplars int
|
||||||
|
// NativeHistogramExemplarTTL is only checked once
|
||||||
|
// NativeHistogramMaxExemplars is exceeded. In that case, the
|
||||||
|
// oldest exemplar is removed if it is older than NativeHistogramExemplarTTL.
|
||||||
|
// Otherwise, the older exemplar in the pair of exemplars that are closest
|
||||||
|
// together (on an exponential scale) is removed.
|
||||||
|
// If NativeHistogramExemplarTTL is left at its zero value, a default value of
|
||||||
|
// 5m is used. To always delete the oldest exemplar, set it to a negative value.
|
||||||
|
NativeHistogramExemplarTTL time.Duration
|
||||||
|
|
||||||
// now is for testing purposes, by default it's time.Now.
|
// now is for testing purposes, by default it's time.Now.
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
|
|
||||||
|
@ -532,6 +548,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
if opts.afterFunc == nil {
|
if opts.afterFunc == nil {
|
||||||
opts.afterFunc = time.AfterFunc
|
opts.afterFunc = time.AfterFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &histogram{
|
h := &histogram{
|
||||||
desc: desc,
|
desc: desc,
|
||||||
upperBounds: opts.Buckets,
|
upperBounds: opts.Buckets,
|
||||||
|
@ -556,6 +573,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold
|
h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold
|
||||||
} // Leave h.nativeHistogramZeroThreshold at 0 otherwise.
|
} // Leave h.nativeHistogramZeroThreshold at 0 otherwise.
|
||||||
h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor)
|
h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor)
|
||||||
|
h.nativeExemplars = makeNativeExemplars(opts.NativeHistogramExemplarTTL, opts.NativeHistogramMaxExemplars)
|
||||||
}
|
}
|
||||||
for i, upperBound := range h.upperBounds {
|
for i, upperBound := range h.upperBounds {
|
||||||
if i < len(h.upperBounds)-1 {
|
if i < len(h.upperBounds)-1 {
|
||||||
|
@ -726,6 +744,7 @@ type histogram struct {
|
||||||
// scheduled for a later time (when nativeHistogramMinResetDuration has
|
// scheduled for a later time (when nativeHistogramMinResetDuration has
|
||||||
// passed).
|
// passed).
|
||||||
resetScheduled bool
|
resetScheduled bool
|
||||||
|
nativeExemplars nativeExemplars
|
||||||
|
|
||||||
// now is for testing purposes, by default it's time.Now.
|
// now is for testing purposes, by default it's time.Now.
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
|
@ -742,6 +761,9 @@ func (h *histogram) Observe(v float64) {
|
||||||
h.observe(v, h.findBucket(v))
|
h.observe(v, h.findBucket(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ObserveWithExemplar should not be called in a high-frequency setting
|
||||||
|
// for a native histogram with configured exemplars. For this case,
|
||||||
|
// the implementation isn't lock-free and might suffer from lock contention.
|
||||||
func (h *histogram) ObserveWithExemplar(v float64, e Labels) {
|
func (h *histogram) ObserveWithExemplar(v float64, e Labels) {
|
||||||
i := h.findBucket(v)
|
i := h.findBucket(v)
|
||||||
h.observe(v, i)
|
h.observe(v, i)
|
||||||
|
@ -821,6 +843,15 @@ func (h *histogram) Write(out *dto.Metric) error {
|
||||||
Length: proto.Uint32(0),
|
Length: proto.Uint32(0),
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If exemplars are not configured, the cap will be 0.
|
||||||
|
// So append is not needed in this case.
|
||||||
|
if cap(h.nativeExemplars.exemplars) > 0 {
|
||||||
|
h.nativeExemplars.Lock()
|
||||||
|
his.Exemplars = append(his.Exemplars, h.nativeExemplars.exemplars...)
|
||||||
|
h.nativeExemplars.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
addAndResetCounts(hotCounts, coldCounts)
|
addAndResetCounts(hotCounts, coldCounts)
|
||||||
return nil
|
return nil
|
||||||
|
@ -1091,8 +1122,10 @@ func (h *histogram) resetCounts(counts *histogramCounts) {
|
||||||
deleteSyncMap(&counts.nativeHistogramBucketsPositive)
|
deleteSyncMap(&counts.nativeHistogramBucketsPositive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateExemplar replaces the exemplar for the provided bucket. With empty
|
// updateExemplar replaces the exemplar for the provided classic bucket.
|
||||||
// labels, it's a no-op. It panics if any of the labels is invalid.
|
// With empty labels, it's a no-op. It panics if any of the labels is invalid.
|
||||||
|
// If histogram is native, the exemplar will be cached into nativeExemplars,
|
||||||
|
// which has a limit, and will remove one exemplar when limit is reached.
|
||||||
func (h *histogram) updateExemplar(v float64, bucket int, l Labels) {
|
func (h *histogram) updateExemplar(v float64, bucket int, l Labels) {
|
||||||
if l == nil {
|
if l == nil {
|
||||||
return
|
return
|
||||||
|
@ -1102,6 +1135,10 @@ func (h *histogram) updateExemplar(v float64, bucket int, l Labels) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
h.exemplars[bucket].Store(e)
|
h.exemplars[bucket].Store(e)
|
||||||
|
doSparse := h.nativeHistogramSchema > math.MinInt32 && !math.IsNaN(v)
|
||||||
|
if doSparse {
|
||||||
|
h.nativeExemplars.addExemplar(e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// HistogramVec is a Collector that bundles a set of Histograms that all share the
|
// HistogramVec is a Collector that bundles a set of Histograms that all share the
|
||||||
|
@ -1575,3 +1612,142 @@ func addAndResetCounts(hot, cold *histogramCounts) {
|
||||||
atomic.AddUint64(&hot.nativeHistogramZeroBucket, atomic.LoadUint64(&cold.nativeHistogramZeroBucket))
|
atomic.AddUint64(&hot.nativeHistogramZeroBucket, atomic.LoadUint64(&cold.nativeHistogramZeroBucket))
|
||||||
atomic.StoreUint64(&cold.nativeHistogramZeroBucket, 0)
|
atomic.StoreUint64(&cold.nativeHistogramZeroBucket, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nativeExemplars holds the bounded set of exemplars kept for a native
// histogram. The embedded mutex must be held while reading or modifying the
// exemplars slice.
type nativeExemplars struct {
	sync.Mutex

	// ttl is the age beyond which the oldest exemplar is evicted first once
	// the slice is full.
	ttl time.Duration
	// exemplars is kept sorted by ascending exemplar value. Its capacity is
	// the maximum number of exemplars retained; a capacity of zero means
	// that no native exemplars are kept.
	exemplars []*dto.Exemplar
}
|
||||||
|
|
||||||
|
func makeNativeExemplars(ttl time.Duration, maxCount int) nativeExemplars {
|
||||||
|
if ttl == 0 {
|
||||||
|
ttl = 5 * time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxCount == 0 {
|
||||||
|
maxCount = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxCount < 0 {
|
||||||
|
maxCount = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return nativeExemplars{
|
||||||
|
ttl: ttl,
|
||||||
|
exemplars: make([]*dto.Exemplar, 0, maxCount),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *nativeExemplars) addExemplar(e *dto.Exemplar) {
|
||||||
|
if cap(n.exemplars) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n.Lock()
|
||||||
|
defer n.Unlock()
|
||||||
|
|
||||||
|
// The index where to insert the new exemplar.
|
||||||
|
var nIdx int = -1
|
||||||
|
|
||||||
|
// When the number of exemplars has not yet exceeded or
|
||||||
|
// is equal to cap(n.exemplars), then
|
||||||
|
// insert the new exemplar directly.
|
||||||
|
if len(n.exemplars) < cap(n.exemplars) {
|
||||||
|
for nIdx = 0; nIdx < len(n.exemplars); nIdx++ {
|
||||||
|
if *e.Value < *n.exemplars[nIdx].Value {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// When the number of exemplars exceeds the limit, remove one exemplar.
|
||||||
|
var (
|
||||||
|
rIdx int // The index where to remove the old exemplar.
|
||||||
|
|
||||||
|
ot = time.Now() // Oldest timestamp seen.
|
||||||
|
otIdx = -1 // Index of the exemplar with the oldest timestamp.
|
||||||
|
|
||||||
|
md = -1.0 // Logarithm of the delta of the closest pair of exemplars.
|
||||||
|
mdIdx = -1 // Index of the older exemplar within the closest pair.
|
||||||
|
cLog float64 // Logarithm of the current exemplar.
|
||||||
|
pLog float64 // Logarithm of the previous exemplar.
|
||||||
|
)
|
||||||
|
|
||||||
|
for i, exemplar := range n.exemplars {
|
||||||
|
// Find the exemplar with the oldest timestamp.
|
||||||
|
if otIdx == -1 || exemplar.Timestamp.AsTime().Before(ot) {
|
||||||
|
ot = exemplar.Timestamp.AsTime()
|
||||||
|
otIdx = i
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the index at which to insert new the exemplar.
|
||||||
|
if *e.Value <= *exemplar.Value && nIdx == -1 {
|
||||||
|
nIdx = i
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the two closest exemplars and pick the one the with older timestamp.
|
||||||
|
pLog = cLog
|
||||||
|
cLog = math.Log(exemplar.GetValue())
|
||||||
|
if i == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
diff := math.Abs(cLog - pLog)
|
||||||
|
if md == -1 || diff < md {
|
||||||
|
md = diff
|
||||||
|
if n.exemplars[i].Timestamp.AsTime().Before(n.exemplars[i-1].Timestamp.AsTime()) {
|
||||||
|
mdIdx = i
|
||||||
|
} else {
|
||||||
|
mdIdx = i - 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// If all existing exemplar are smaller than new exemplar,
|
||||||
|
// then the exemplar should be inserted at the end.
|
||||||
|
if nIdx == -1 {
|
||||||
|
nIdx = len(n.exemplars)
|
||||||
|
}
|
||||||
|
|
||||||
|
if otIdx != -1 && e.Timestamp.AsTime().Sub(ot) > n.ttl {
|
||||||
|
rIdx = otIdx
|
||||||
|
} else {
|
||||||
|
// In the previous for loop, when calculating the closest pair of exemplars,
|
||||||
|
// we did not take into account the newly inserted exemplar.
|
||||||
|
// So we need to calculate with the newly inserted exemplar again.
|
||||||
|
elog := math.Log(e.GetValue())
|
||||||
|
if nIdx > 0 {
|
||||||
|
diff := math.Abs(elog - math.Log(n.exemplars[nIdx-1].GetValue()))
|
||||||
|
if diff < md {
|
||||||
|
md = diff
|
||||||
|
mdIdx = nIdx
|
||||||
|
if n.exemplars[nIdx-1].Timestamp.AsTime().Before(e.Timestamp.AsTime()) {
|
||||||
|
mdIdx = nIdx - 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if nIdx < len(n.exemplars) {
|
||||||
|
diff := math.Abs(math.Log(n.exemplars[nIdx].GetValue()) - elog)
|
||||||
|
if diff < md {
|
||||||
|
mdIdx = nIdx
|
||||||
|
if n.exemplars[nIdx].Timestamp.AsTime().Before(e.Timestamp.AsTime()) {
|
||||||
|
mdIdx = nIdx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rIdx = mdIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adjust the slice according to rIdx and nIdx.
|
||||||
|
switch {
|
||||||
|
case rIdx == nIdx:
|
||||||
|
n.exemplars[nIdx] = e
|
||||||
|
case rIdx < nIdx:
|
||||||
|
n.exemplars = append(n.exemplars[:rIdx], append(n.exemplars[rIdx+1:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)...)
|
||||||
|
case rIdx > nIdx:
|
||||||
|
n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, append(n.exemplars[nIdx:rIdx], n.exemplars[rIdx+1:]...)...)...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1271,3 +1271,158 @@ func TestHistogramVecCreatedTimestampWithDeletes(t *testing.T) {
|
||||||
now = now.Add(1 * time.Hour)
|
now = now.Add(1 * time.Hour)
|
||||||
expectCTsForMetricVecValues(t, histogramVec.MetricVec, dto.MetricType_HISTOGRAM, expected)
|
expectCTsForMetricVecValues(t, histogramVec.MetricVec, dto.MetricType_HISTOGRAM, expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNativeHistogramExemplar(t *testing.T) {
|
||||||
|
// Test the histogram with positive NativeHistogramExemplarTTL and NativeHistogramMaxExemplars
|
||||||
|
h := NewHistogram(HistogramOpts{
|
||||||
|
Name: "test",
|
||||||
|
Help: "test help",
|
||||||
|
Buckets: []float64{1, 2, 3, 4},
|
||||||
|
NativeHistogramBucketFactor: 1.1,
|
||||||
|
NativeHistogramMaxExemplars: 3,
|
||||||
|
NativeHistogramExemplarTTL: 10 * time.Second,
|
||||||
|
}).(*histogram)
|
||||||
|
|
||||||
|
tcs := []struct {
|
||||||
|
name string
|
||||||
|
addFunc func(*histogram)
|
||||||
|
expectedValues []float64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "add exemplars to the limit",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(1, Labels{"id": "1"})
|
||||||
|
h.ObserveWithExemplar(3, Labels{"id": "1"})
|
||||||
|
h.ObserveWithExemplar(5, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{1, 3, 5},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remove exemplar in closest pair, the removed index equals to inserted index",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(4, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{1, 3, 4},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remove exemplar in closest pair, the removed index is bigger than inserted index",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(0, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{0, 1, 4},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.now = func() time.Time { return time.Now().Add(time.Second * 11) }
|
||||||
|
h.ObserveWithExemplar(6, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{0, 4, 6},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
tc.addFunc(h)
|
||||||
|
compareNativeExemplarValues(t, h.nativeExemplars.exemplars, tc.expectedValues)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the histogram with negative NativeHistogramExemplarTTL
|
||||||
|
h = NewHistogram(HistogramOpts{
|
||||||
|
Name: "test",
|
||||||
|
Help: "test help",
|
||||||
|
Buckets: []float64{1, 2, 3, 4},
|
||||||
|
NativeHistogramBucketFactor: 1.1,
|
||||||
|
NativeHistogramMaxExemplars: 3,
|
||||||
|
NativeHistogramExemplarTTL: -1 * time.Second,
|
||||||
|
}).(*histogram)
|
||||||
|
|
||||||
|
tcs = []struct {
|
||||||
|
name string
|
||||||
|
addFunc func(*histogram)
|
||||||
|
expectedValues []float64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "add exemplars to the limit",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(1, Labels{"id": "1"})
|
||||||
|
h.ObserveWithExemplar(3, Labels{"id": "1"})
|
||||||
|
h.ObserveWithExemplar(5, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{1, 3, 5},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(4, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{3, 4, 5},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remove exemplar with oldest timestamp, the removed index equals to inserted index",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(0, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{0, 4, 5},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remove exemplar with oldest timestamp, the removed index is bigger than inserted index",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(3, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{0, 3, 4},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
tc.addFunc(h)
|
||||||
|
compareNativeExemplarValues(t, h.nativeExemplars.exemplars, tc.expectedValues)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the histogram with negative NativeHistogramMaxExemplars
|
||||||
|
h = NewHistogram(HistogramOpts{
|
||||||
|
Name: "test",
|
||||||
|
Help: "test help",
|
||||||
|
Buckets: []float64{1, 2, 3, 4},
|
||||||
|
NativeHistogramBucketFactor: 1.1,
|
||||||
|
NativeHistogramMaxExemplars: -1,
|
||||||
|
NativeHistogramExemplarTTL: -1 * time.Second,
|
||||||
|
}).(*histogram)
|
||||||
|
|
||||||
|
tcs = []struct {
|
||||||
|
name string
|
||||||
|
addFunc func(*histogram)
|
||||||
|
expectedValues []float64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "add exemplars to the limit, but no effect",
|
||||||
|
addFunc: func(h *histogram) {
|
||||||
|
h.ObserveWithExemplar(1, Labels{"id": "1"})
|
||||||
|
h.ObserveWithExemplar(3, Labels{"id": "1"})
|
||||||
|
h.ObserveWithExemplar(5, Labels{"id": "1"})
|
||||||
|
},
|
||||||
|
expectedValues: []float64{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
tc.addFunc(h)
|
||||||
|
compareNativeExemplarValues(t, h.nativeExemplars.exemplars, tc.expectedValues)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func compareNativeExemplarValues(t *testing.T, exps []*dto.Exemplar, values []float64) {
|
||||||
|
if len(exps) != len(values) {
|
||||||
|
t.Errorf("the count of exemplars is not %d", len(values))
|
||||||
|
}
|
||||||
|
for i, e := range exps {
|
||||||
|
if e.GetValue() != values[i] {
|
||||||
|
t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), values[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue