Refactoring of sparse histograms

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2021-08-31 20:17:19 +02:00
parent 24099603bc
commit 263be8dab7
1 changed files with 165 additions and 169 deletions

View File

@ -563,13 +563,7 @@ func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) {
if bucket < len(hc.buckets) {
atomic.AddUint64(&hc.buckets[bucket], 1)
}
for {
oldBits := atomic.LoadUint64(&hc.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
if atomic.CompareAndSwapUint64(&hc.sumBits, oldBits, newBits) {
break
}
}
atomicAddFloat(&hc.sumBits, v)
if doSparse {
var (
sparseKey int
@ -685,10 +679,7 @@ func (h *histogram) Write(out *dto.Metric) error {
hotCounts := h.counts[n>>63]
coldCounts := h.counts[(^n)>>63]
// Await cooldown.
for count != atomic.LoadUint64(&coldCounts.count) {
runtime.Gosched() // Let observations get work done.
}
waitForCooldown(count, coldCounts)
his := &dto.Histogram{
Bucket: make([]*dto.Bucket, len(h.upperBounds)),
@ -718,29 +709,12 @@ func (h *histogram) Write(out *dto.Metric) error {
}
his.Bucket = append(his.Bucket, b)
}
// Add all the cold counts to the new hot counts and reset the cold counts.
atomic.AddUint64(&hotCounts.count, count)
atomic.StoreUint64(&coldCounts.count, 0)
for {
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum())
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
atomic.StoreUint64(&coldCounts.sumBits, 0)
break
}
}
for i := range h.upperBounds {
atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
atomic.StoreUint64(&coldCounts.buckets[i], 0)
}
if h.sparseSchema > math.MinInt32 {
his.SbZeroThreshold = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sparseZeroThresholdBits)))
his.SbSchema = proto.Int32(atomic.LoadInt32(&coldCounts.sparseSchema))
zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket)
defer func() {
atomic.AddUint64(&hotCounts.sparseZeroBucket, zeroBucket)
atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive, &hotCounts.sparseBucketsNumber))
coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative, &hotCounts.sparseBucketsNumber))
}()
@ -749,6 +723,7 @@ func (h *histogram) Write(out *dto.Metric) error {
his.SbNegative = makeSparseBuckets(&coldCounts.sparseBucketsNegative)
his.SbPositive = makeSparseBuckets(&coldCounts.sparseBucketsPositive)
}
addAndResetCounts(hotCounts, coldCounts)
return nil
}
@ -809,102 +784,98 @@ func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, b
if h.sparseMaxBuckets >= atomic.LoadUint32(&hotCounts.sparseBucketsNumber) {
return // Bucket limit not exceeded after all.
}
// Try the various strategies in order.
if h.maybeReset(hotCounts, coldCounts, coldIdx, value, bucket) {
return
}
if h.maybeWidenZeroBucket(hotCounts, coldCounts) {
return
}
h.doubleBucketWidth(hotCounts, coldCounts)
}
// (1) Ideally, we can reset the whole histogram.
// maybyReset resests the whole histogram if at least h.sparseMinResetDuration
// has been passed. It returns true if the histogram has been reset. The caller
// must have locked h.mtx.
func (h *histogram) maybeReset(hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int) bool {
// We are using the possibly mocked h.now() rather than
// time.Since(h.lastResetTime) to enable testing.
if h.sparseMinResetDuration > 0 && h.now().Sub(h.lastResetTime) >= h.sparseMinResetDuration {
if h.sparseMinResetDuration == 0 || h.now().Sub(h.lastResetTime) < h.sparseMinResetDuration {
return false
}
// Completely reset coldCounts.
h.resetCounts(coldCounts)
h.resetCounts(cold)
// Repeat the latest observation to not lose it completely.
coldCounts.observe(value, bucket, true)
cold.observe(value, bucket, true)
// Make coldCounts the new hot counts while ressetting countAndHotIdx.
n := atomic.SwapUint64(&h.countAndHotIdx, (coldIdx<<63)+1)
count := n & ((1 << 63) - 1)
// Wait for the formerly hot counts to cool down.
for count != atomic.LoadUint64(&hotCounts.count) {
runtime.Gosched() // Let observations get work done.
}
waitForCooldown(count, hot)
// Finally, reset the formerly hot counts, too.
h.resetCounts(hotCounts)
h.resetCounts(hot)
h.lastResetTime = h.now()
return
return true
}
// (2) Try widening the zero bucket.
currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hotCounts.sparseZeroThresholdBits))
switch { // Use switch rather than if to be able to break out of it.
case h.sparseMaxZeroThreshold > currentZeroThreshold:
// maybeWidenZeroBucket widens the zero bucket until it includes the existing
// buckets closest to the zero bucket (which could be two, if an equidistant
// negative and a positive bucket exists, but usually it's only one bucket to be
// merged into the new wider zero bucket). h.sparseMaxZeroThreshold limits how
// far the zero bucket can be extended, and if that's not enough to include an
// existing bucket, the method returns false. The caller must have locked h.mtx.
func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hot.sparseZeroThresholdBits))
if currentZeroThreshold >= h.sparseMaxZeroThreshold {
return false
}
// Find the key of the bucket closest to zero.
smallestKey := findSmallestKey(&hotCounts.sparseBucketsPositive)
smallestNegativeKey := findSmallestKey(&hotCounts.sparseBucketsNegative)
smallestKey := findSmallestKey(&hot.sparseBucketsPositive)
smallestNegativeKey := findSmallestKey(&hot.sparseBucketsNegative)
if smallestNegativeKey < smallestKey {
smallestKey = smallestNegativeKey
}
if smallestKey == math.MaxInt32 {
break
return false
}
newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hotCounts.sparseSchema))
newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hot.sparseSchema))
if newZeroThreshold > h.sparseMaxZeroThreshold {
break // New threshold would exceed the max threshold.
return false // New threshold would exceed the max threshold.
}
atomic.StoreUint64(&coldCounts.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
// Remove applicable buckets.
if _, loaded := coldCounts.sparseBucketsNegative.LoadAndDelete(smallestKey); loaded {
atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32
if _, loaded := cold.sparseBucketsNegative.LoadAndDelete(smallestKey); loaded {
atomicDecUint32(&cold.sparseBucketsNumber)
}
if _, loaded := coldCounts.sparseBucketsPositive.LoadAndDelete(smallestKey); loaded {
atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32
if _, loaded := cold.sparseBucketsPositive.LoadAndDelete(smallestKey); loaded {
atomicDecUint32(&cold.sparseBucketsNumber)
}
// Make coldCounts the new hot counts.
// Make cold counts the new hot counts.
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
count := n & ((1 << 63) - 1)
// Swap the pointer names to represent the new roles and make
// the rest less confusing.
hotCounts, coldCounts = coldCounts, hotCounts
// Wait for the (new) cold counts to cool down.
for count != atomic.LoadUint64(&coldCounts.count) {
runtime.Gosched() // Let observations get work done.
}
// Add all the cold counts to the new hot counts, while merging
// the newly deleted buckets into the wider zero bucket, and
// reset and adjust the cold counts.
// TODO(beorn7): Maybe make it more DRY, cf. Write() method. Maybe
// it's too different, though...
atomic.AddUint64(&hotCounts.count, count)
atomic.StoreUint64(&coldCounts.count, 0)
for {
hotBits := atomic.LoadUint64(&hotCounts.sumBits)
coldBits := atomic.LoadUint64(&coldCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(hotBits) + math.Float64frombits(coldBits))
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, hotBits, newBits) {
atomic.StoreUint64(&coldCounts.sumBits, 0)
break
}
}
for i := range h.upperBounds {
atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
atomic.StoreUint64(&coldCounts.buckets[i], 0)
}
atomic.AddUint64(&hotCounts.sparseZeroBucket, atomic.LoadUint64(&coldCounts.sparseZeroBucket))
atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
atomic.StoreUint64(&coldCounts.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
hot, cold = cold, hot
waitForCooldown(count, cold)
// Add all the now cold counts to the new hot counts...
addAndResetCounts(hot, cold)
// ...adjust the new zero threshold in the cold counts, too...
atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
// ...and then merge the newly deleted buckets into the wider zero
// bucket.
mergeAndDeleteOrAddAndReset := func(hotBuckets, coldBuckets *sync.Map) func(k, v interface{}) bool {
return func(k, v interface{}) bool {
key := k.(int)
bucket := v.(*int64)
if key == smallestKey {
// Merge into hot zero bucket...
atomic.AddUint64(&hotCounts.sparseZeroBucket, uint64(atomic.LoadInt64(bucket)))
atomic.AddUint64(&hot.sparseZeroBucket, uint64(atomic.LoadInt64(bucket)))
// ...and delete from cold counts.
coldBuckets.Delete(key)
atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32
atomicDecUint32(&cold.sparseBucketsNumber)
} else {
// Add to corresponding hot bucket...
if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
atomic.AddUint32(&hotCounts.sparseBucketsNumber, 1)
atomic.AddUint32(&hot.sparseBucketsNumber, 1)
}
// ...and reset cold bucket.
atomic.StoreInt64(bucket, 0)
@ -913,55 +884,38 @@ func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, b
}
}
coldCounts.sparseBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hotCounts.sparseBucketsPositive, &coldCounts.sparseBucketsPositive))
coldCounts.sparseBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hotCounts.sparseBucketsNegative, &coldCounts.sparseBucketsNegative))
return
cold.sparseBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsPositive, &cold.sparseBucketsPositive))
cold.sparseBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsNegative, &cold.sparseBucketsNegative))
return true
}
// (3) Ultima ratio: Doubling of the bucket width AKA halving the resolution AKA decrementing sparseSchema.
coldSchema := atomic.LoadInt32(&coldCounts.sparseSchema)
// doubleBucketWidth doubles the bucket width (by decrementing the schema
// number). Note that very sparse buckets could lead to a low reduction of the
// bucket count (or even no reduction at all). The method does nothing if the
// schema is already -4.
func (h *histogram) doubleBucketWidth(hot, cold *histogramCounts) {
coldSchema := atomic.LoadInt32(&cold.sparseSchema)
if coldSchema == -4 {
return // Already at lowest resolution.
}
coldSchema--
atomic.StoreInt32(&coldCounts.sparseSchema, coldSchema)
atomic.StoreInt32(&cold.sparseSchema, coldSchema)
// Play it simple and just delete all cold buckets.
atomic.StoreUint32(&coldCounts.sparseBucketsNumber, 0)
deleteSyncMap(&coldCounts.sparseBucketsNegative)
deleteSyncMap(&coldCounts.sparseBucketsPositive)
atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
deleteSyncMap(&cold.sparseBucketsNegative)
deleteSyncMap(&cold.sparseBucketsPositive)
// Make coldCounts the new hot counts.
n = atomic.AddUint64(&h.countAndHotIdx, 1<<63)
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
count := n & ((1 << 63) - 1)
// Swap the pointer names to represent the new roles and make
// the rest less confusing.
hotCounts, coldCounts = coldCounts, hotCounts
// Wait for the (new) cold counts to cool down.
for count != atomic.LoadUint64(&coldCounts.count) {
runtime.Gosched() // Let observations get work done.
}
// Add all the cold counts to the new hot counts, while merging the cold
// buckets into the wider hot buckets, and reset and adjust the cold
// counts.
// TODO(beorn7): Maybe make it more DRY, cf. Write() method and code
// above. Maybe it's too different, though...
atomic.AddUint64(&hotCounts.count, count)
atomic.StoreUint64(&coldCounts.count, 0)
for {
hotBits := atomic.LoadUint64(&hotCounts.sumBits)
coldBits := atomic.LoadUint64(&coldCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(hotBits) + math.Float64frombits(coldBits))
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, hotBits, newBits) {
atomic.StoreUint64(&coldCounts.sumBits, 0)
break
}
}
for i := range h.upperBounds {
atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
atomic.StoreUint64(&coldCounts.buckets[i], 0)
}
atomic.AddUint64(&hotCounts.sparseZeroBucket, atomic.LoadUint64(&coldCounts.sparseZeroBucket))
atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
hot, cold = cold, hot
waitForCooldown(count, cold)
// Add all the now cold counts to the new hot counts...
addAndResetCounts(hot, cold)
// ...adjust the schema in the cold counts, too...
atomic.StoreInt32(&cold.sparseSchema, coldSchema)
// ...and then merge the cold buckets into the wider hot buckets.
merge := func(hotBuckets *sync.Map) func(k, v interface{}) bool {
return func(k, v interface{}) bool {
key := k.(int)
@ -973,19 +927,18 @@ func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, b
key /= 2
// Add to corresponding hot bucket.
if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
atomic.AddUint32(&hotCounts.sparseBucketsNumber, 1)
atomic.AddUint32(&hot.sparseBucketsNumber, 1)
}
return true
}
}
coldCounts.sparseBucketsPositive.Range(merge(&hotCounts.sparseBucketsPositive))
coldCounts.sparseBucketsNegative.Range(merge(&hotCounts.sparseBucketsNegative))
atomic.StoreInt32(&coldCounts.sparseSchema, coldSchema)
cold.sparseBucketsPositive.Range(merge(&hot.sparseBucketsPositive))
cold.sparseBucketsNegative.Range(merge(&hot.sparseBucketsNegative))
// Play it simple again and just delete all cold buckets.
atomic.StoreUint32(&coldCounts.sparseBucketsNumber, 0)
deleteSyncMap(&coldCounts.sparseBucketsNegative)
deleteSyncMap(&coldCounts.sparseBucketsPositive)
atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
deleteSyncMap(&cold.sparseBucketsNegative)
deleteSyncMap(&cold.sparseBucketsPositive)
}
func (h *histogram) resetCounts(counts *histogramCounts) {
@ -1385,3 +1338,46 @@ func getLe(key int, schema int32) float64 {
exp := (key >> schema) + 1
return math.Ldexp(frac, exp)
}
// waitForCooldown returns after the count field in the provided histogramCounts
// has reached the provided count value.
func waitForCooldown(count uint64, counts *histogramCounts) {
for count != atomic.LoadUint64(&counts.count) {
runtime.Gosched() // Let observations get work done.
}
}
// atomicAddFloat adds the provided float atomically to another float
// represented by the bit pattern the bits pointer is pointing to.
func atomicAddFloat(bits *uint64, v float64) {
for {
loadedBits := atomic.LoadUint64(bits)
newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
}
}
}
// atomicDecUint32 atomically decrements the uint32 p points to. See
// https://pkg.go.dev/sync/atomic#AddUint32 to understand how this is done.
func atomicDecUint32(p *uint32) {
atomic.AddUint32(p, ^uint32(0))
}
// addAndResetCounts adds certain fields (count, sum, conventional buckets,
// sparse zero bucket) from the cold counts to the corresponding fields in the
// hot counts. Those fields are then reset to 0 in the cold counts.
func addAndResetCounts(hot, cold *histogramCounts) {
atomic.AddUint64(&hot.count, atomic.LoadUint64(&cold.count))
atomic.StoreUint64(&cold.count, 0)
coldSum := math.Float64frombits(atomic.LoadUint64(&cold.sumBits))
atomicAddFloat(&hot.sumBits, coldSum)
atomic.StoreUint64(&cold.sumBits, 0)
for i := range hot.buckets {
atomic.AddUint64(&hot.buckets[i], atomic.LoadUint64(&cold.buckets[i]))
atomic.StoreUint64(&cold.buckets[i], 0)
}
atomic.AddUint64(&hot.sparseZeroBucket, atomic.LoadUint64(&cold.sparseZeroBucket))
atomic.StoreUint64(&cold.sparseZeroBucket, 0)
}