From 263be8dab7a35a94356772cf193ba630f883deb1 Mon Sep 17 00:00:00 2001
From: beorn7
Date: Tue, 31 Aug 2021 20:17:19 +0200
Subject: [PATCH] Refactoring of sparse histograms

Signed-off-by: beorn7
---
 prometheus/histogram.go | 334 ++++++++++++++++++++--------------------
 1 file changed, 165 insertions(+), 169 deletions(-)

diff --git a/prometheus/histogram.go b/prometheus/histogram.go
index 1136907..cd61b83 100644
--- a/prometheus/histogram.go
+++ b/prometheus/histogram.go
@@ -563,13 +563,7 @@ func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) {
     if bucket < len(hc.buckets) {
         atomic.AddUint64(&hc.buckets[bucket], 1)
     }
-    for {
-        oldBits := atomic.LoadUint64(&hc.sumBits)
-        newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
-        if atomic.CompareAndSwapUint64(&hc.sumBits, oldBits, newBits) {
-            break
-        }
-    }
+    atomicAddFloat(&hc.sumBits, v)
     if doSparse {
         var (
             sparseKey int
@@ -685,10 +679,7 @@ func (h *histogram) Write(out *dto.Metric) error {
     hotCounts := h.counts[n>>63]
     coldCounts := h.counts[(^n)>>63]

-    // Await cooldown.
-    for count != atomic.LoadUint64(&coldCounts.count) {
-        runtime.Gosched() // Let observations get work done.
-    }
+    waitForCooldown(count, coldCounts)

     his := &dto.Histogram{
         Bucket: make([]*dto.Bucket, len(h.upperBounds)),
@@ -718,29 +709,12 @@ func (h *histogram) Write(out *dto.Metric) error {
         }
         his.Bucket = append(his.Bucket, b)
     }
-    // Add all the cold counts to the new hot counts and reset the cold counts.
-    atomic.AddUint64(&hotCounts.count, count)
-    atomic.StoreUint64(&coldCounts.count, 0)
-    for {
-        oldBits := atomic.LoadUint64(&hotCounts.sumBits)
-        newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum())
-        if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
-            atomic.StoreUint64(&coldCounts.sumBits, 0)
-            break
-        }
-    }
-    for i := range h.upperBounds {
-        atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
-        atomic.StoreUint64(&coldCounts.buckets[i], 0)
-    }
     if h.sparseSchema > math.MinInt32 {
         his.SbZeroThreshold = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sparseZeroThresholdBits)))
         his.SbSchema = proto.Int32(atomic.LoadInt32(&coldCounts.sparseSchema))
         zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket)
         defer func() {
-            atomic.AddUint64(&hotCounts.sparseZeroBucket, zeroBucket)
-            atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
             coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive, &hotCounts.sparseBucketsNumber))
             coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative, &hotCounts.sparseBucketsNumber))
         }()
@@ -749,6 +723,7 @@ func (h *histogram) Write(out *dto.Metric) error {
         his.SbNegative = makeSparseBuckets(&coldCounts.sparseBucketsNegative)
         his.SbPositive = makeSparseBuckets(&coldCounts.sparseBucketsPositive)
     }
+    addAndResetCounts(hotCounts, coldCounts)
     return nil
 }
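A note on the Write() hunks above: they lean on the histogram's hot/cold scheme, where h.countAndHotIdx packs the index of the currently hot counts struct into the most significant bit and the number of initiated observations into the lower 63 bits. The following standalone sketch (illustrative only; the demo type and its methods are not part of the patch) shows how the packing, the count, and the swap interact:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // demo packs a hot-slot index and an observation count into one
    // uint64, the same layout as histogram.countAndHotIdx: the most
    // significant bit selects the hot slot, the lower 63 bits count
    // initiated observations.
    type demo struct {
        countAndHotIdx uint64
    }

    // observe registers one observation and returns the slot it goes to.
    func (d *demo) observe() uint64 {
        n := atomic.AddUint64(&d.countAndHotIdx, 1)
        return n >> 63
    }

    // swap atomically flips the hot/cold roles and returns the number of
    // observations initiated against the formerly hot slot, plus that
    // slot's index (now the cold one).
    func (d *demo) swap() (count, coldIdx uint64) {
        n := atomic.AddUint64(&d.countAndHotIdx, 1<<63)
        return n & ((1 << 63) - 1), (^n) >> 63
    }

    func main() {
        d := &demo{}
        d.observe()
        d.observe()
        count, coldIdx := d.swap()
        fmt.Println(count, coldIdx) // 2 0: both observations went to slot 0.
    }

After such a swap, the new waitForCooldown helper only has to spin until the cold slot's completed-observation counter catches up with the count returned here.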
@@ -809,159 +784,138 @@ func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, b
     if h.sparseMaxBuckets >= atomic.LoadUint32(&hotCounts.sparseBucketsNumber) {
         return // Bucket limit not exceeded after all.
     }
+    // Try the various strategies in order.
+    if h.maybeReset(hotCounts, coldCounts, coldIdx, value, bucket) {
+        return
+    }
+    if h.maybeWidenZeroBucket(hotCounts, coldCounts) {
+        return
+    }
+    h.doubleBucketWidth(hotCounts, coldCounts)
+}

-    // (1) Ideally, we can reset the whole histogram.
-
+// maybeReset resets the whole histogram if at least h.sparseMinResetDuration
+// has passed. It returns true if the histogram has been reset. The caller
+// must have locked h.mtx.
+func (h *histogram) maybeReset(hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int) bool {
     // We are using the possibly mocked h.now() rather than
     // time.Since(h.lastResetTime) to enable testing.
-    if h.sparseMinResetDuration > 0 && h.now().Sub(h.lastResetTime) >= h.sparseMinResetDuration {
-        // Completely reset coldCounts.
-        h.resetCounts(coldCounts)
-        // Repeat the latest observation to not lose it completely.
-        coldCounts.observe(value, bucket, true)
-        // Make coldCounts the new hot counts while ressetting countAndHotIdx.
-        n := atomic.SwapUint64(&h.countAndHotIdx, (coldIdx<<63)+1)
-        count := n & ((1 << 63) - 1)
-        // Wait for the formerly hot counts to cool down.
-        for count != atomic.LoadUint64(&hotCounts.count) {
-            runtime.Gosched() // Let observations get work done.
-        }
-        // Finally, reset the formerly hot counts, too.
-        h.resetCounts(hotCounts)
-        h.lastResetTime = h.now()
-        return
+    if h.sparseMinResetDuration == 0 || h.now().Sub(h.lastResetTime) < h.sparseMinResetDuration {
+        return false
     }
+    // Completely reset coldCounts.
+    h.resetCounts(cold)
+    // Repeat the latest observation to not lose it completely.
+    cold.observe(value, bucket, true)
+    // Make coldCounts the new hot counts while resetting countAndHotIdx.
+    n := atomic.SwapUint64(&h.countAndHotIdx, (coldIdx<<63)+1)
+    count := n & ((1 << 63) - 1)
+    waitForCooldown(count, hot)
+    // Finally, reset the formerly hot counts, too.
+    h.resetCounts(hot)
+    h.lastResetTime = h.now()
+    return true
+}
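maybeReset goes through the h.now field rather than calling time.Since directly, which is what makes the reset interval testable with a fake clock. A minimal standalone sketch of that pattern (the resetter type and its fields are invented for illustration):

    package main

    import (
        "fmt"
        "time"
    )

    // resetter decides whether enough time has passed since the last
    // reset. The now field defaults to time.Now in production code but
    // can be swapped out in tests.
    type resetter struct {
        now           func() time.Time
        lastResetTime time.Time
        minResetDur   time.Duration
    }

    // due mirrors the shape of the check in maybeReset: a zero duration
    // disables resets entirely.
    func (r *resetter) due() bool {
        return r.minResetDur != 0 && r.now().Sub(r.lastResetTime) >= r.minResetDur
    }

    func main() {
        fake := time.Unix(0, 0)
        r := &resetter{
            now:           func() time.Time { return fake },
            lastResetTime: fake,
            minResetDur:   time.Hour,
        }
        fmt.Println(r.due()) // false: the fake clock has not advanced.
        fake = fake.Add(2 * time.Hour)
        fmt.Println(r.due()) // true: now past the minimum reset duration.
    }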
-    // (2) Try widening the zero bucket.
-    currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hotCounts.sparseZeroThresholdBits))
-    switch { // Use switch rather than if to be able to break out of it.
-    case h.sparseMaxZeroThreshold > currentZeroThreshold:
-        // Find the key of the bucket closest to zero.
-        smallestKey := findSmallestKey(&hotCounts.sparseBucketsPositive)
-        smallestNegativeKey := findSmallestKey(&hotCounts.sparseBucketsNegative)
-        if smallestNegativeKey < smallestKey {
-            smallestKey = smallestNegativeKey
-        }
-        if smallestKey == math.MaxInt32 {
-            break
-        }
-        newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hotCounts.sparseSchema))
-        if newZeroThreshold > h.sparseMaxZeroThreshold {
-            break // New threshold would exceed the max threshold.
-        }
-        atomic.StoreUint64(&coldCounts.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
-        // Remove applicable buckets.
-        if _, loaded := coldCounts.sparseBucketsNegative.LoadAndDelete(smallestKey); loaded {
-            atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32
-        }
-        if _, loaded := coldCounts.sparseBucketsPositive.LoadAndDelete(smallestKey); loaded {
-            atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32
-        }
-        // Make coldCounts the new hot counts.
-        n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
-        count := n & ((1 << 63) - 1)
-        // Swap the pointer names to represent the new roles and make
-        // the rest less confusing.
-        hotCounts, coldCounts = coldCounts, hotCounts
-        // Wait for the (new) cold counts to cool down.
-        for count != atomic.LoadUint64(&coldCounts.count) {
-            runtime.Gosched() // Let observations get work done.
-        }
-        // Add all the cold counts to the new hot counts, while merging
-        // the newly deleted buckets into the wider zero bucket, and
-        // reset and adjust the cold counts.
-        // TODO(beorn7): Maybe make it more DRY, cf. Write() method. Maybe
-        // it's too different, though...
-        atomic.AddUint64(&hotCounts.count, count)
-        atomic.StoreUint64(&coldCounts.count, 0)
-        for {
-            hotBits := atomic.LoadUint64(&hotCounts.sumBits)
-            coldBits := atomic.LoadUint64(&coldCounts.sumBits)
-            newBits := math.Float64bits(math.Float64frombits(hotBits) + math.Float64frombits(coldBits))
-            if atomic.CompareAndSwapUint64(&hotCounts.sumBits, hotBits, newBits) {
-                atomic.StoreUint64(&coldCounts.sumBits, 0)
-                break
-            }
-        }
-        for i := range h.upperBounds {
-            atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
-            atomic.StoreUint64(&coldCounts.buckets[i], 0)
-        }
-        atomic.AddUint64(&hotCounts.sparseZeroBucket, atomic.LoadUint64(&coldCounts.sparseZeroBucket))
-        atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
-        atomic.StoreUint64(&coldCounts.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
-
-        mergeAndDeleteOrAddAndReset := func(hotBuckets, coldBuckets *sync.Map) func(k, v interface{}) bool {
-            return func(k, v interface{}) bool {
-                key := k.(int)
-                bucket := v.(*int64)
-                if key == smallestKey {
-                    // Merge into hot zero bucket...
-                    atomic.AddUint64(&hotCounts.sparseZeroBucket, uint64(atomic.LoadInt64(bucket)))
-                    // ...and delete from cold counts.
-                    coldBuckets.Delete(key)
-                    atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32
-                } else {
-                    // Add to corresponding hot bucket...
-                    if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
-                        atomic.AddUint32(&hotCounts.sparseBucketsNumber, 1)
-                    }
-                    // ...and reset cold bucket.
-                    atomic.StoreInt64(bucket, 0)
-                }
-                return true
-            }
-        }
-
-        coldCounts.sparseBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hotCounts.sparseBucketsPositive, &coldCounts.sparseBucketsPositive))
-        coldCounts.sparseBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hotCounts.sparseBucketsNegative, &coldCounts.sparseBucketsNegative))
-        return
-    }
+// maybeWidenZeroBucket widens the zero bucket until it includes the existing
+// buckets closest to the zero bucket (which could be two buckets, if a
+// negative and a positive bucket are equidistant from zero, but usually it is
+// only one bucket that gets merged into the new wider zero bucket).
+// h.sparseMaxZeroThreshold limits how far the zero bucket can be extended, and
+// if that's not enough to include an existing bucket, the method returns
+// false. The caller must have locked h.mtx.
+func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
+    currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hot.sparseZeroThresholdBits))
+    if currentZeroThreshold >= h.sparseMaxZeroThreshold {
+        return false
+    }
+    // Find the key of the bucket closest to zero.
+    smallestKey := findSmallestKey(&hot.sparseBucketsPositive)
+    smallestNegativeKey := findSmallestKey(&hot.sparseBucketsNegative)
+    if smallestNegativeKey < smallestKey {
+        smallestKey = smallestNegativeKey
+    }
+    if smallestKey == math.MaxInt32 {
+        return false
+    }
+    newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hot.sparseSchema))
+    if newZeroThreshold > h.sparseMaxZeroThreshold {
+        return false // New threshold would exceed the max threshold.
+    }
+    atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
+    // Remove applicable buckets.
+    if _, loaded := cold.sparseBucketsNegative.LoadAndDelete(smallestKey); loaded {
+        atomicDecUint32(&cold.sparseBucketsNumber)
+    }
+    if _, loaded := cold.sparseBucketsPositive.LoadAndDelete(smallestKey); loaded {
+        atomicDecUint32(&cold.sparseBucketsNumber)
+    }
+    // Make cold counts the new hot counts.
+    n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
+    count := n & ((1 << 63) - 1)
+    // Swap the pointer names to represent the new roles and make
+    // the rest less confusing.
+    hot, cold = cold, hot
+    waitForCooldown(count, cold)
+    // Add all the now cold counts to the new hot counts...
+    addAndResetCounts(hot, cold)
+    // ...adjust the new zero threshold in the cold counts, too...
+    atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
+    // ...and then merge the newly deleted buckets into the wider zero
+    // bucket.
+    mergeAndDeleteOrAddAndReset := func(hotBuckets, coldBuckets *sync.Map) func(k, v interface{}) bool {
+        return func(k, v interface{}) bool {
+            key := k.(int)
+            bucket := v.(*int64)
+            if key == smallestKey {
+                // Merge into hot zero bucket...
+                atomic.AddUint64(&hot.sparseZeroBucket, uint64(atomic.LoadInt64(bucket)))
+                // ...and delete from cold counts.
+                coldBuckets.Delete(key)
+                atomicDecUint32(&cold.sparseBucketsNumber)
+            } else {
+                // Add to corresponding hot bucket...
+                if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
+                    atomic.AddUint32(&hot.sparseBucketsNumber, 1)
+                }
+                // ...and reset cold bucket.
+                atomic.StoreInt64(bucket, 0)
+            }
+            return true
+        }
+    }
+    cold.sparseBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsPositive, &cold.sparseBucketsPositive))
+    cold.sparseBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsNegative, &cold.sparseBucketsNegative))
+    return true
+}
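For context on the getLe call in maybeWidenZeroBucket: sparse bucket keys map to exponential boundaries, and at schema 0 bucket key k covers the interval (2^(k-1), 2^k] (getLe, whose tail is visible in the last hunk of this patch, computes Ldexp(frac, (key>>schema)+1)). The upper boundary of the populated bucket closest to zero is therefore the natural candidate for the new zero threshold. A standalone sketch of the arithmetic (upperBound is a hypothetical schema-0 stand-in for getLe):

    package main

    import (
        "fmt"
        "math"
    )

    // upperBound returns the upper boundary of the sparse bucket with the
    // given key at schema 0, where key k covers (2^(k-1), 2^k].
    func upperBound(key int) float64 {
        return math.Ldexp(1, key) // 1 * 2^key
    }

    func main() {
        // Suppose the populated bucket closest to zero has key -2,
        // i.e. it covers (0.125, 0.25]. Widening the zero bucket to that
        // bucket's upper boundary lets the bucket be merged away.
        smallestKey := -2
        newZeroThreshold := upperBound(smallestKey)
        fmt.Println(newZeroThreshold) // 0.25
        // The widening only happens if it stays within the configured
        // maximum, cf. the h.sparseMaxZeroThreshold checks above.
        maxZeroThreshold := 0.5
        fmt.Println(newZeroThreshold <= maxZeroThreshold) // true
    }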

-    // (3) Ultima ratio: Doubling of the bucket width AKA halving the resolution AKA decrementing sparseSchema.
-    coldSchema := atomic.LoadInt32(&coldCounts.sparseSchema)
+// doubleBucketWidth doubles the bucket width (by decrementing the schema
+// number). Note that very sparse buckets could lead to a low reduction of the
+// bucket count (or even no reduction at all). The method does nothing if the
+// schema is already -4.
+func (h *histogram) doubleBucketWidth(hot, cold *histogramCounts) {
+    coldSchema := atomic.LoadInt32(&cold.sparseSchema)
     if coldSchema == -4 {
         return // Already at lowest resolution.
     }
     coldSchema--
-    atomic.StoreInt32(&coldCounts.sparseSchema, coldSchema)
+    atomic.StoreInt32(&cold.sparseSchema, coldSchema)
     // Play it simple and just delete all cold buckets.
-    atomic.StoreUint32(&coldCounts.sparseBucketsNumber, 0)
-    deleteSyncMap(&coldCounts.sparseBucketsNegative)
-    deleteSyncMap(&coldCounts.sparseBucketsPositive)
+    atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
+    deleteSyncMap(&cold.sparseBucketsNegative)
+    deleteSyncMap(&cold.sparseBucketsPositive)
     // Make coldCounts the new hot counts.
-    n = atomic.AddUint64(&h.countAndHotIdx, 1<<63)
+    n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
     count := n & ((1 << 63) - 1)
     // Swap the pointer names to represent the new roles and make
     // the rest less confusing.
-    hotCounts, coldCounts = coldCounts, hotCounts
-    // Wait for the (new) cold counts to cool down.
-    for count != atomic.LoadUint64(&coldCounts.count) {
-        runtime.Gosched() // Let observations get work done.
-    }
-    // Add all the cold counts to the new hot counts, while merging the cold
-    // buckets into the wider hot buckets, and reset and adjust the cold
-    // counts.
-    // TODO(beorn7): Maybe make it more DRY, cf. Write() method and code
-    // above. Maybe it's too different, though...
-    atomic.AddUint64(&hotCounts.count, count)
-    atomic.StoreUint64(&coldCounts.count, 0)
-    for {
-        hotBits := atomic.LoadUint64(&hotCounts.sumBits)
-        coldBits := atomic.LoadUint64(&coldCounts.sumBits)
-        newBits := math.Float64bits(math.Float64frombits(hotBits) + math.Float64frombits(coldBits))
-        if atomic.CompareAndSwapUint64(&hotCounts.sumBits, hotBits, newBits) {
-            atomic.StoreUint64(&coldCounts.sumBits, 0)
-            break
-        }
-    }
-    for i := range h.upperBounds {
-        atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
-        atomic.StoreUint64(&coldCounts.buckets[i], 0)
-    }
-    atomic.AddUint64(&hotCounts.sparseZeroBucket, atomic.LoadUint64(&coldCounts.sparseZeroBucket))
-    atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
-
+    hot, cold = cold, hot
+    waitForCooldown(count, cold)
+    // Add all the now cold counts to the new hot counts...
+    addAndResetCounts(hot, cold)
+    // ...adjust the schema in the cold counts, too...
+    atomic.StoreInt32(&cold.sparseSchema, coldSchema)
+    // ...and then merge the cold buckets into the wider hot buckets.
     merge := func(hotBuckets *sync.Map) func(k, v interface{}) bool {
         return func(k, v interface{}) bool {
             key := k.(int)
@@ -973,19 +927,18 @@ func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, b
             key /= 2
             // Add to corresponding hot bucket.
             if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
-                atomic.AddUint32(&hotCounts.sparseBucketsNumber, 1)
+                atomic.AddUint32(&hot.sparseBucketsNumber, 1)
             }
             return true
         }
     }

-    coldCounts.sparseBucketsPositive.Range(merge(&hotCounts.sparseBucketsPositive))
-    coldCounts.sparseBucketsNegative.Range(merge(&hotCounts.sparseBucketsNegative))
-    atomic.StoreInt32(&coldCounts.sparseSchema, coldSchema)
+    cold.sparseBucketsPositive.Range(merge(&hot.sparseBucketsPositive))
+    cold.sparseBucketsNegative.Range(merge(&hot.sparseBucketsNegative))
     // Play it simple again and just delete all cold buckets.
-    atomic.StoreUint32(&coldCounts.sparseBucketsNumber, 0)
-    deleteSyncMap(&coldCounts.sparseBucketsNegative)
-    deleteSyncMap(&coldCounts.sparseBucketsPositive)
+    atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
+    deleteSyncMap(&cold.sparseBucketsNegative)
+    deleteSyncMap(&cold.sparseBucketsPositive)
 }

 func (h *histogram) resetCounts(counts *histogramCounts) {
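The merge closure above follows the same sync.Map pattern used throughout this file: Range over the cold buckets and fold each one into the hot map, creating hot buckets on demand. A standalone sketch of that pattern (addToBucket is an illustrative reimplementation of the contract of addToSparseBucket, which reports whether a new bucket had to be created; it is not the function from this file):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // addToBucket adds v to the *int64 stored under key, creating the
    // entry if needed, and reports whether a new bucket was created.
    func addToBucket(buckets *sync.Map, key int, v int64) bool {
        if existing, ok := buckets.Load(key); ok {
            atomic.AddInt64(existing.(*int64), v)
            return false
        }
        n := v
        actual, loaded := buckets.LoadOrStore(key, &n)
        if loaded {
            // Lost a race against a concurrent creator; add to theirs.
            atomic.AddInt64(actual.(*int64), v)
            return false
        }
        return true
    }

    func main() {
        var hot, cold sync.Map
        addToBucket(&cold, 3, 7)
        addToBucket(&hot, 3, 2)
        // Fold all cold buckets into hot and reset them, the same
        // Range-based shape as the merge closures above.
        cold.Range(func(k, v interface{}) bool {
            addToBucket(&hot, k.(int), atomic.LoadInt64(v.(*int64)))
            atomic.StoreInt64(v.(*int64), 0)
            return true
        })
        sum, _ := hot.Load(3)
        fmt.Println(atomic.LoadInt64(sum.(*int64))) // 9
    }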
@@ -1385,3 +1338,46 @@ func getLe(key int, schema int32) float64 {
     exp := (key >> schema) + 1
     return math.Ldexp(frac, exp)
 }
+
+// waitForCooldown returns after the count field in the provided histogramCounts
+// has reached the provided count value.
+func waitForCooldown(count uint64, counts *histogramCounts) {
+    for count != atomic.LoadUint64(&counts.count) {
+        runtime.Gosched() // Let observations get work done.
+    }
+}
+
+// atomicAddFloat adds the provided float atomically to another float
+// represented by the bit pattern the bits pointer is pointing to.
+func atomicAddFloat(bits *uint64, v float64) {
+    for {
+        loadedBits := atomic.LoadUint64(bits)
+        newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
+        if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
+            break
+        }
+    }
+}
+
+// atomicDecUint32 atomically decrements the uint32 p points to. See
+// https://pkg.go.dev/sync/atomic#AddUint32 to understand how this is done.
+func atomicDecUint32(p *uint32) {
+    atomic.AddUint32(p, ^uint32(0))
+}
+
+// addAndResetCounts adds certain fields (count, sum, conventional buckets,
+// sparse zero bucket) from the cold counts to the corresponding fields in the
+// hot counts. Those fields are then reset to 0 in the cold counts.
+func addAndResetCounts(hot, cold *histogramCounts) {
+    atomic.AddUint64(&hot.count, atomic.LoadUint64(&cold.count))
+    atomic.StoreUint64(&cold.count, 0)
+    coldSum := math.Float64frombits(atomic.LoadUint64(&cold.sumBits))
+    atomicAddFloat(&hot.sumBits, coldSum)
+    atomic.StoreUint64(&cold.sumBits, 0)
+    for i := range hot.buckets {
+        atomic.AddUint64(&hot.buckets[i], atomic.LoadUint64(&cold.buckets[i]))
+        atomic.StoreUint64(&cold.buckets[i], 0)
+    }
+    atomic.AddUint64(&hot.sparseZeroBucket, atomic.LoadUint64(&cold.sparseZeroBucket))
+    atomic.StoreUint64(&cold.sparseZeroBucket, 0)
+}
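To see the new atomicAddFloat helper under contention, here is a standalone program (the helper body is taken from the patch; the driver around it is illustrative) that exercises the CAS loop from many goroutines:

    package main

    import (
        "fmt"
        "math"
        "sync"
        "sync/atomic"
    )

    // atomicAddFloat adds the provided float atomically to another float
    // represented by the bit pattern the bits pointer is pointing to.
    func atomicAddFloat(bits *uint64, v float64) {
        for {
            loadedBits := atomic.LoadUint64(bits)
            newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
            if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
                break
            }
        }
    }

    func main() {
        var sumBits uint64 // Bit pattern of a float64, initially 0.0.
        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                atomicAddFloat(&sumBits, 0.5)
            }()
        }
        wg.Wait()
        fmt.Println(math.Float64frombits(atomic.LoadUint64(&sumBits))) // 50
    }

The compare-and-swap only retries when another goroutine won the race between the load and the swap, so no update is lost and no mutex is needed; the same reasoning applies to the decrement-by-adding-^uint32(0) trick in atomicDecUint32.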