refactor the implementation

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
This commit is contained in:
Ziqi Zhao 2024-04-02 13:20:13 +08:00
parent 494ccce4f1
commit d8c7074b1c
2 changed files with 306 additions and 122 deletions

View File

@ -440,7 +440,7 @@ type HistogramOpts struct {
// constant (or any negative float value). // constant (or any negative float value).
NativeHistogramZeroThreshold float64 NativeHistogramZeroThreshold float64
// The remaining fields define a strategy to limit the number of // The next three fields define a strategy to limit the number of
// populated sparse buckets. If NativeHistogramMaxBucketNumber is left // populated sparse buckets. If NativeHistogramMaxBucketNumber is left
// at zero, the number of buckets is not limited. (Note that this might // at zero, the number of buckets is not limited. (Note that this might
// lead to unbounded memory consumption if the values observed by the // lead to unbounded memory consumption if the values observed by the
@ -472,8 +472,22 @@ type HistogramOpts struct {
NativeHistogramMaxBucketNumber uint32 NativeHistogramMaxBucketNumber uint32
NativeHistogramMinResetDuration time.Duration NativeHistogramMinResetDuration time.Duration
NativeHistogramMaxZeroThreshold float64 NativeHistogramMaxZeroThreshold float64
NativeHistogramMaxExemplarCount uint32
NativeHistogramExemplarTTL time.Duration // NativeHistogramMaxExemplars limits the number of exemplars
// that are kept in memory for each native histogram. If you leave it at
// zero, a default value of 10 is used. If no exemplars should be kept specifically
// for native histograms, set it to a negative value. (Scrapers can
// still use the exemplars exposed for classic buckets, which are managed
// independently.)
NativeHistogramMaxExemplars int
// NativeHistogramExemplarTTL is only checked once
// NativeHistogramMaxExemplars is exceeded. In that case, the
// oldest exemplar is removed if it is older than NativeHistogramExemplarTTL.
// Otherwise, the older exemplar in the pair of exemplars that are closest
// together (on an exponential scale) is removed.
// If NativeHistogramExemplarTTL is left at its zero value, a default value of
// 5m is used. To always delete the oldest exemplar, set it to a negative value.
NativeHistogramExemplarTTL time.Duration
// now is for testing purposes, by default it's time.Now. // now is for testing purposes, by default it's time.Now.
now func() time.Time now func() time.Time
@ -534,6 +548,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
if opts.afterFunc == nil { if opts.afterFunc == nil {
opts.afterFunc = time.AfterFunc opts.afterFunc = time.AfterFunc
} }
h := &histogram{ h := &histogram{
desc: desc, desc: desc,
upperBounds: opts.Buckets, upperBounds: opts.Buckets,
@ -558,7 +573,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold
} // Leave h.nativeHistogramZeroThreshold at 0 otherwise. } // Leave h.nativeHistogramZeroThreshold at 0 otherwise.
h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor) h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor)
h.nativeExemplars = newNativeExemplars(opts.NativeHistogramExemplarTTL, opts.NativeHistogramMaxExemplarCount) h.nativeExemplars = makeNativeExemplars(opts.NativeHistogramExemplarTTL, opts.NativeHistogramMaxExemplars)
} }
for i, upperBound := range h.upperBounds { for i, upperBound := range h.upperBounds {
if i < len(h.upperBounds)-1 { if i < len(h.upperBounds)-1 {
@ -728,15 +743,14 @@ type histogram struct {
// resetScheduled is protected by mtx. It is true if a reset is // resetScheduled is protected by mtx. It is true if a reset is
// scheduled for a later time (when nativeHistogramMinResetDuration has // scheduled for a later time (when nativeHistogramMinResetDuration has
// passed). // passed).
resetScheduled bool resetScheduled bool
nativeExemplars nativeExemplars
// now is for testing purposes, by default it's time.Now. // now is for testing purposes, by default it's time.Now.
now func() time.Time now func() time.Time
// afterFunc is for testing purposes, by default it's time.AfterFunc. // afterFunc is for testing purposes, by default it's time.AfterFunc.
afterFunc func(time.Duration, func()) *time.Timer afterFunc func(time.Duration, func()) *time.Timer
nativeExemplars nativeExemplars
} }
func (h *histogram) Desc() *Desc { func (h *histogram) Desc() *Desc {
@ -747,6 +761,8 @@ func (h *histogram) Observe(v float64) {
h.observe(v, h.findBucket(v)) h.observe(v, h.findBucket(v))
} }
// ObserveWithExemplar should not be called in high-frequency settings,
// since it isn't lock-free for native histograms with configured exemplars.
func (h *histogram) ObserveWithExemplar(v float64, e Labels) { func (h *histogram) ObserveWithExemplar(v float64, e Labels) {
i := h.findBucket(v) i := h.findBucket(v)
h.observe(v, i) h.observe(v, i)
@ -827,7 +843,12 @@ func (h *histogram) Write(out *dto.Metric) error {
}} }}
} }
his.Exemplars = append(his.Exemplars, h.nativeExemplars.exemplars...) if cap(h.nativeExemplars.exemplars) > 0 {
h.nativeExemplars.Lock()
his.Exemplars = append(his.Exemplars, h.nativeExemplars.exemplars...)
h.nativeExemplars.Unlock()
}
} }
addAndResetCounts(hotCounts, coldCounts) addAndResetCounts(hotCounts, coldCounts)
return nil return nil
@ -1098,8 +1119,10 @@ func (h *histogram) resetCounts(counts *histogramCounts) {
deleteSyncMap(&counts.nativeHistogramBucketsPositive) deleteSyncMap(&counts.nativeHistogramBucketsPositive)
} }
// updateExemplar replaces the exemplar for the provided bucket. With empty // updateExemplar replaces the exemplar for the provided classic bucket.
// labels, it's a no-op. It panics if any of the labels is invalid. // With empty labels, it's a no-op. It panics if any of the labels is invalid.
// If histogram is native, the exemplar will be cached into nativeExemplars,
// which has a limit, and will remove one exemplar when limit is reached.
func (h *histogram) updateExemplar(v float64, bucket int, l Labels) { func (h *histogram) updateExemplar(v float64, bucket int, l Labels) {
if l == nil { if l == nil {
return return
@ -1588,56 +1611,140 @@ func addAndResetCounts(hot, cold *histogramCounts) {
} }
type nativeExemplars struct { type nativeExemplars struct {
nativeHistogramExemplarTTL time.Duration sync.Mutex
nativeHistogramMaxExemplarCount uint32
ttl time.Duration
exemplars []*dto.Exemplar exemplars []*dto.Exemplar
lock sync.Mutex
} }
func newNativeExemplars(ttl time.Duration, count uint32) nativeExemplars { func makeNativeExemplars(ttl time.Duration, maxCount int) nativeExemplars {
if ttl == 0 {
ttl = 5 * time.Minute
}
if maxCount == 0 {
maxCount = 10
}
if maxCount < 0 {
maxCount = 0
}
return nativeExemplars{ return nativeExemplars{
nativeHistogramExemplarTTL: ttl, ttl: ttl,
nativeHistogramMaxExemplarCount: count, exemplars: make([]*dto.Exemplar, 0, maxCount),
exemplars: make([]*dto.Exemplar, 0),
lock: sync.Mutex{},
} }
} }
func (n *nativeExemplars) addExemplar(e *dto.Exemplar) { func (n *nativeExemplars) addExemplar(e *dto.Exemplar) {
n.lock.Lock() if cap(n.exemplars) == 0 {
defer n.lock.Unlock()
elogarithm := math.Log(e.GetValue())
if len(n.exemplars) == int(n.nativeHistogramMaxExemplarCount) {
// check if oldestIndex is beyond TTL,
// if so, find the oldest exemplar, and nearest exemplar
oldestTimestamp := time.Now()
oldestIndex := -1
nearestValue := -1.0
nearestIndex := -1
for i, exemplar := range n.exemplars {
if exemplar.Timestamp.AsTime().Before(oldestTimestamp) {
oldestTimestamp = exemplar.Timestamp.AsTime()
oldestIndex = i
}
logarithm := math.Log(exemplar.GetValue())
if nearestValue == -1 || math.Abs(elogarithm-logarithm) < nearestValue {
fmt.Printf("gap: %f", math.Abs(elogarithm-logarithm))
nearestValue = math.Abs(elogarithm - logarithm)
nearestIndex = i
}
}
if oldestIndex != -1 && time.Since(oldestTimestamp) > n.nativeHistogramExemplarTTL {
n.exemplars[oldestIndex] = e
} else {
n.exemplars[nearestIndex] = e
}
return return
} }
n.exemplars = append(n.exemplars, e) n.Lock()
defer n.Unlock()
// The index where to insert the new exemplar.
var nIdx int = -1
// When the number of exemplars has not yet reached
// cap(n.exemplars), there is still room, so
// insert the new exemplar directly.
if len(n.exemplars) < cap(n.exemplars) {
for nIdx = 0; nIdx < len(n.exemplars); nIdx++ {
if *e.Value < *n.exemplars[nIdx].Value {
break
}
}
n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)
return
}
// When the number of exemplars exceeds the limit, remove one exemplar.
var (
rIdx int // The index where to remove the old exemplar.
ot = time.Now() // Oldest timestamp seen.
otIdx = -1 // Index of the exemplar with the oldest timestamp.
md = -1.0 // Logarithm of the delta of the closest pair of exemplars.
mdIdx = -1 // Index of the older exemplar within the closest pair.
cLog float64 // Logarithm of the current exemplar.
pLog float64 // Logarithm of the previous exemplar.
)
for i, exemplar := range n.exemplars {
// Find the exemplar with the oldest timestamp.
if otIdx == -1 || exemplar.Timestamp.AsTime().Before(ot) {
ot = exemplar.Timestamp.AsTime()
otIdx = i
}
// Find the index at which to insert the new exemplar.
if *e.Value <= *exemplar.Value && nIdx == -1 {
nIdx = i
}
// Find the two closest exemplars and pick the one with the older timestamp.
pLog = cLog
cLog = math.Log(exemplar.GetValue())
if i == 0 {
continue
}
diff := math.Abs(cLog - pLog)
if md == -1 || diff < md {
md = diff
if n.exemplars[i].Timestamp.AsTime().Before(n.exemplars[i-1].Timestamp.AsTime()) {
mdIdx = i
} else {
mdIdx = i - 1
}
}
}
// If all existing exemplars are smaller than the new exemplar,
// then the new exemplar should be inserted at the end.
if nIdx == -1 {
nIdx = len(n.exemplars)
}
if otIdx != -1 && time.Since(ot) > n.ttl {
rIdx = otIdx
} else {
// In the previous for loop, when calculating the closest pair of exemplars,
// we did not take into account the newly inserted exemplar.
// So we need to calculate with the newly inserted exemplar again.
elog := math.Log(e.GetValue())
if nIdx > 0 {
diff := math.Abs(elog - math.Log(n.exemplars[nIdx-1].GetValue()))
if diff < md {
md = diff
mdIdx = nIdx
if n.exemplars[nIdx-1].Timestamp.AsTime().Before(e.Timestamp.AsTime()) {
mdIdx = nIdx - 1
}
}
}
if nIdx < len(n.exemplars) {
diff := math.Abs(math.Log(n.exemplars[nIdx].GetValue()) - elog)
if diff < md {
mdIdx = nIdx
if n.exemplars[nIdx].Timestamp.AsTime().Before(e.Timestamp.AsTime()) {
mdIdx = nIdx
}
}
}
rIdx = mdIdx
}
// Adjust the slice according to rIdx and nIdx.
switch {
case rIdx == nIdx:
n.exemplars[nIdx] = e
case rIdx < nIdx:
n.exemplars = append(n.exemplars[:rIdx], append(n.exemplars[rIdx+1:nIdx], append([]*dto.Exemplar{e}, n.exemplars[nIdx:]...)...)...)
case rIdx > nIdx:
n.exemplars = append(n.exemplars[:nIdx], append([]*dto.Exemplar{e}, append(n.exemplars[nIdx:rIdx], n.exemplars[rIdx+1:]...)...)...)
}
} }

View File

@ -1273,89 +1273,166 @@ func TestHistogramVecCreatedTimestampWithDeletes(t *testing.T) {
} }
func TestNativeHistogramExemplar(t *testing.T) { func TestNativeHistogramExemplar(t *testing.T) {
histogram := NewHistogram(HistogramOpts{ // Test the histogram with positive NativeHistogramExemplarTTL and NativeHistogramMaxExemplars
Name: "test", h := NewHistogram(HistogramOpts{
Help: "test help", Name: "test",
Buckets: []float64{1, 2, 3, 4}, Help: "test help",
NativeHistogramBucketFactor: 1.1, Buckets: []float64{1, 2, 3, 4},
NativeHistogramMaxExemplarCount: 3, NativeHistogramBucketFactor: 1.1,
NativeHistogramExemplarTTL: 10 * time.Second, NativeHistogramMaxExemplars: 3,
NativeHistogramExemplarTTL: 10 * time.Second,
}).(*histogram) }).(*histogram)
// expectedExemplars := []*dto.Exemplar{ tcs := []struct {
// { name string
// Label: []*dto.LabelPair{ addFunc func(*histogram)
// {Name: proto.String("id"), Value: proto.String("1")}, expectedValues []float64
// }, }{
// Value: proto.Float64(1), {
// }, name: "add exemplars to the limit",
// { addFunc: func(h *histogram) {
// Label: []*dto.LabelPair{ h.ObserveWithExemplar(1, Labels{"id": "1"})
// {Name: proto.String("id"), Value: proto.String("2")}, h.ObserveWithExemplar(3, Labels{"id": "1"})
// }, h.ObserveWithExemplar(5, Labels{"id": "1"})
// Value: proto.Float64(3), },
// }, expectedValues: []float64{1, 3, 5},
// { },
// Label: []*dto.LabelPair{ {
// {Name: proto.String("id"), Value: proto.String("3")}, name: "remove exemplar in closest pair, the removed index equals to inserted index",
// }, addFunc: func(h *histogram) {
// Value: proto.Float64(5), h.ObserveWithExemplar(4, Labels{"id": "1"})
// }, },
// } expectedValues: []float64{1, 3, 4},
},
histogram.ObserveWithExemplar(1, Labels{"id": "1"}) {
histogram.ObserveWithExemplar(3, Labels{"id": "1"}) name: "remove exemplar in closest pair, the removed index is bigger than inserted index",
histogram.ObserveWithExemplar(5, Labels{"id": "1"}) addFunc: func(h *histogram) {
h.ObserveWithExemplar(0, Labels{"id": "1"})
if len(histogram.nativeExemplars.exemplars) != 3 { },
t.Errorf("the count of exemplars is not 3") expectedValues: []float64{0, 1, 4},
},
{
name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index",
addFunc: func(h *histogram) {
time.Sleep(10 * time.Second)
h.ObserveWithExemplar(6, Labels{"id": "1"})
},
expectedValues: []float64{0, 4, 6},
},
} }
expectedValues := map[float64]struct{}{ for _, tc := range tcs {
1: {}, t.Run(tc.name, func(t *testing.T) {
3: {}, tc.addFunc(h)
5: {}, if len(h.nativeExemplars.exemplars) != len(tc.expectedValues) {
t.Errorf("the count of exemplars is not %d", len(tc.expectedValues))
}
for i, e := range h.nativeExemplars.exemplars {
if e.GetValue() != tc.expectedValues[i] {
t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), tc.expectedValues[i])
}
}
})
} }
for _, e := range histogram.nativeExemplars.exemplars { // Test the histogram with negative NativeHistogramExemplarTTL
if _, ok := expectedValues[e.GetValue()]; !ok { h = NewHistogram(HistogramOpts{
t.Errorf("the value is not in expected value") Name: "test",
} Help: "test help",
Buckets: []float64{1, 2, 3, 4},
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxExemplars: 3,
NativeHistogramExemplarTTL: -1 * time.Second,
}).(*histogram)
tcs = []struct {
name string
addFunc func(*histogram)
expectedValues []float64
}{
{
name: "add exemplars to the limit",
addFunc: func(h *histogram) {
h.ObserveWithExemplar(1, Labels{"id": "1"})
h.ObserveWithExemplar(3, Labels{"id": "1"})
h.ObserveWithExemplar(5, Labels{"id": "1"})
},
expectedValues: []float64{1, 3, 5},
},
{
name: "remove exemplar with oldest timestamp, the removed index is smaller than inserted index",
addFunc: func(h *histogram) {
h.ObserveWithExemplar(4, Labels{"id": "1"})
},
expectedValues: []float64{3, 4, 5},
},
{
name: "remove exemplar with oldest timestamp, the removed index equals to inserted index",
addFunc: func(h *histogram) {
h.ObserveWithExemplar(0, Labels{"id": "1"})
},
expectedValues: []float64{0, 4, 5},
},
{
name: "remove exemplar with oldest timestamp, the removed index is bigger than inserted index",
addFunc: func(h *histogram) {
h.ObserveWithExemplar(3, Labels{"id": "1"})
},
expectedValues: []float64{0, 3, 4},
},
} }
histogram.ObserveWithExemplar(4, Labels{"id": "1"}) for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
if len(histogram.nativeExemplars.exemplars) != 3 { tc.addFunc(h)
t.Errorf("the count of exemplars is not 3") if len(h.nativeExemplars.exemplars) != len(tc.expectedValues) {
t.Errorf("the count of exemplars is not %d", len(tc.expectedValues))
}
for i, e := range h.nativeExemplars.exemplars {
if e.GetValue() != tc.expectedValues[i] {
t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), tc.expectedValues[i])
}
}
})
} }
expectedValues = map[float64]struct{}{ // Test the histogram with negative NativeHistogramMaxExemplars
1: {}, h = NewHistogram(HistogramOpts{
3: {}, Name: "test",
4: {}, Help: "test help",
Buckets: []float64{1, 2, 3, 4},
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxExemplars: -1,
NativeHistogramExemplarTTL: -1 * time.Second,
}).(*histogram)
tcs = []struct {
name string
addFunc func(*histogram)
expectedValues []float64
}{
{
name: "add exemplars to the limit, but no effect",
addFunc: func(h *histogram) {
h.ObserveWithExemplar(1, Labels{"id": "1"})
h.ObserveWithExemplar(3, Labels{"id": "1"})
h.ObserveWithExemplar(5, Labels{"id": "1"})
},
expectedValues: []float64{},
},
} }
for _, e := range histogram.nativeExemplars.exemplars { for _, tc := range tcs {
if _, ok := expectedValues[e.GetValue()]; !ok { t.Run(tc.name, func(t *testing.T) {
t.Errorf("the value is not in expected value") tc.addFunc(h)
} if len(h.nativeExemplars.exemplars) != len(tc.expectedValues) {
} t.Errorf("the count of exemplars is not %d", len(tc.expectedValues))
}
time.Sleep(10 * time.Second) for i, e := range h.nativeExemplars.exemplars {
histogram.ObserveWithExemplar(6, Labels{"id": "1"}) if e.GetValue() != tc.expectedValues[i] {
t.Errorf("the %dth exemplar value %v is not as expected: %v", i, e.GetValue(), tc.expectedValues[i])
if len(histogram.nativeExemplars.exemplars) != 3 { }
t.Errorf("the count of exemplars is not 3") }
} })
expectedValues = map[float64]struct{}{
6: {},
3: {},
4: {},
}
for _, e := range histogram.nativeExemplars.exemplars {
if _, ok := expectedValues[e.GetValue()]; !ok {
t.Errorf("the value is not in expected value")
}
} }
} }