From 24099603bc44b75a6946744e1d468ff59d1c0ac6 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 18 Aug 2021 19:04:29 +0200 Subject: [PATCH] Implement strategy to limit the sparse bucket count Signed-off-by: beorn7 --- prometheus/histogram.go | 624 ++++++++++++++++++++++++++--------- prometheus/histogram_test.go | 244 ++++++++++++-- 2 files changed, 689 insertions(+), 179 deletions(-) diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 219be31..1136907 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -39,21 +39,21 @@ import ( // https://github.com/open-telemetry/opentelemetry-specification/issues/1776#issuecomment-870164310 var sparseBounds = [][]float64{ // Schema "0": - []float64{0.5}, + {0.5}, // Schema 1: - []float64{0.5, 0.7071067811865475}, + {0.5, 0.7071067811865475}, // Schema 2: - []float64{0.5, 0.5946035575013605, 0.7071067811865475, 0.8408964152537144}, + {0.5, 0.5946035575013605, 0.7071067811865475, 0.8408964152537144}, // Schema 3: - []float64{0.5, 0.5452538663326288, 0.5946035575013605, 0.6484197773255048, + {0.5, 0.5452538663326288, 0.5946035575013605, 0.6484197773255048, 0.7071067811865475, 0.7711054127039704, 0.8408964152537144, 0.9170040432046711}, // Schema 4: - []float64{0.5, 0.5221368912137069, 0.5452538663326288, 0.5693943173783458, + {0.5, 0.5221368912137069, 0.5452538663326288, 0.5693943173783458, 0.5946035575013605, 0.620928906036742, 0.6484197773255048, 0.6771277734684463, 0.7071067811865475, 0.7384130729697496, 0.7711054127039704, 0.805245165974627, 0.8408964152537144, 0.8781260801866495, 0.9170040432046711, 0.9576032806985735}, // Schema 5: - []float64{0.5, 0.5109485743270583, 0.5221368912137069, 0.5335702003384117, + {0.5, 0.5109485743270583, 0.5221368912137069, 0.5335702003384117, 0.5452538663326288, 0.5571933712979462, 0.5693943173783458, 0.5818624293887887, 0.5946035575013605, 0.6076236799902344, 0.620928906036742, 0.6345254785958666, 0.6484197773255048, 0.6626183215798706, 0.6771277734684463, 0.6919549409819159, @@ -62,7 +62,7 @@ var sparseBounds = [][]float64{ 0.8408964152537144, 0.8593096490612387, 0.8781260801866495, 0.8973545375015533, 0.9170040432046711, 0.9370838170551498, 0.9576032806985735, 0.9785720620876999}, // Schema 6: - []float64{0.5, 0.5054446430258502, 0.5109485743270583, 0.5165124395106142, + {0.5, 0.5054446430258502, 0.5109485743270583, 0.5165124395106142, 0.5221368912137069, 0.5278225891802786, 0.5335702003384117, 0.5393803988785598, 0.5452538663326288, 0.5511912916539204, 0.5571933712979462, 0.5632608093041209, 0.5693943173783458, 0.5755946149764913, 0.5818624293887887, 0.5881984958251406, @@ -79,7 +79,7 @@ var sparseBounds = [][]float64{ 0.9170040432046711, 0.9269895625416926, 0.9370838170551498, 0.9472879907934827, 0.9576032806985735, 0.9680308967461471, 0.9785720620876999, 0.9892280131939752}, // Schema 7: - []float64{0.5, 0.5027149505564014, 0.5054446430258502, 0.5081891574554764, + {0.5, 0.5027149505564014, 0.5054446430258502, 0.5081891574554764, 0.5109485743270583, 0.5137229745593818, 0.5165124395106142, 0.5193170509806894, 0.5221368912137069, 0.5249720429003435, 0.5278225891802786, 0.5306886136446309, 0.5335702003384117, 0.5364674337629877, 0.5393803988785598, 0.5423091811066545, @@ -112,7 +112,7 @@ var sparseBounds = [][]float64{ 0.9576032806985735, 0.9628029718180622, 0.9680308967461471, 0.9732872087896164, 0.9785720620876999, 0.9838856116165875, 0.9892280131939752, 0.9945994234836328}, // Schema 8: - []float64{0.5, 0.5013556375251013, 0.5027149505564014, 0.5040779490592088, + {0.5, 
0.5013556375251013, 0.5027149505564014, 0.5040779490592088,
		0.5054446430258502, 0.5068150424757447, 0.5081891574554764, 0.509566998038869,
		0.5109485743270583, 0.5123338964485679, 0.5137229745593818, 0.5151158188430205,
		0.5165124395106142, 0.5179128468009786, 0.5193170509806894, 0.520725062344158,
@@ -405,10 +405,27 @@ type HistogramOpts struct {
 	// of making the zero value of HistogramOpts meaningful. Has to be
 	// solved more elegantly in the final version.)
 	SparseBucketsZeroThreshold float64
-	// TODO(beorn7): Need a setting to limit total bucket count and to
-	// configure a strategy to enforce the limit, e.g. if minimum duration
-	// after last reset, reset. If not, half the resolution and/or expand
-	// the zero bucket.
+
+	// The remaining fields define a strategy to limit the number of
+	// populated sparse buckets. If SparseBucketsMaxNumber is left at zero,
+	// the number of buckets is not limited. Otherwise, once the provided
+	// number is exceeded, the following strategy is enacted: First, if the
+	// last reset (or the creation) of the histogram is at least
+	// SparseBucketsMinResetDuration ago, then the whole histogram is reset
+	// to its initial state (including regular buckets). If less time has
+	// passed, or if SparseBucketsMinResetDuration is zero, no reset is
+	// performed. Instead, the zero threshold is increased sufficiently to
+	// reduce the number of buckets to or below SparseBucketsMaxNumber, but
+	// not to more than SparseBucketsMaxZeroThreshold. Thus, if
+	// SparseBucketsMaxZeroThreshold is already at or below the current zero
+	// threshold, nothing happens at this step. After that, if the number of
+	// buckets still exceeds SparseBucketsMaxNumber, the resolution of the
+	// histogram is reduced by doubling the width of the sparse buckets (up
+	// to a growth factor from one bucket to the next of 2^(2^4) = 65536,
+	// see above).
+	SparseBucketsMaxNumber        uint32
+	SparseBucketsMinResetDuration time.Duration
+	SparseBucketsMaxZeroThreshold float64
 }
 
 // NewHistogram creates a new Histogram based on the provided HistogramOpts. It
@@ -446,11 +463,14 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
 	}
 
 	h := &histogram{
-		desc:        desc,
-		upperBounds: opts.Buckets,
-		labelPairs:  MakeLabelPairs(desc, labelValues),
-		counts:      [2]*histogramCounts{{}, {}},
-		now:         time.Now,
+		desc:                   desc,
+		upperBounds:            opts.Buckets,
+		labelPairs:             MakeLabelPairs(desc, labelValues),
+		sparseMaxBuckets:       opts.SparseBucketsMaxNumber,
+		sparseMaxZeroThreshold: opts.SparseBucketsMaxZeroThreshold,
+		sparseMinResetDuration: opts.SparseBucketsMinResetDuration,
+		lastResetTime:          time.Now(),
+		now:                    time.Now,
 	}
 	if len(h.upperBounds) == 0 && opts.SparseBucketsFactor <= 1 {
 		h.upperBounds = DefBuckets
@@ -460,9 +480,9 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
 	} else {
 		switch {
 		case opts.SparseBucketsZeroThreshold > 0:
-			h.sparseThreshold = opts.SparseBucketsZeroThreshold
+			h.sparseZeroThreshold = opts.SparseBucketsZeroThreshold
 		case opts.SparseBucketsZeroThreshold == 0:
-			h.sparseThreshold = DefSparseBucketsZeroThreshold
+			h.sparseZeroThreshold = DefSparseBucketsZeroThreshold
 		}
-		// Leave h.sparseThreshold at 0 otherwise.
+		// Leave h.sparseZeroThreshold at 0 otherwise.
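+		// A schema n corresponds to a growth factor of 2^(2^-n) from one
+		// bucket to the next (e.g. schema 2 means a factor of 2^(1/4) ≈ 1.19,
+		// schema -4 a factor of 2^16 = 65536, cf. sparseBounds above), so
+		// pickSparseSchema picks the coarsest schema whose growth factor is
+		// still within opts.SparseBucketsFactor.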
		h.sparseSchema = pickSparseSchema(opts.SparseBucketsFactor)
	}
@@ -483,8 +503,16 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
 	}
 	// Finally we know the final length of h.upperBounds and can make buckets
 	// for both counts as well as exemplars:
-	h.counts[0].buckets = make([]uint64, len(h.upperBounds))
-	h.counts[1].buckets = make([]uint64, len(h.upperBounds))
+	h.counts[0] = &histogramCounts{
+		buckets:                 make([]uint64, len(h.upperBounds)),
+		sparseZeroThresholdBits: math.Float64bits(h.sparseZeroThreshold),
+		sparseSchema:            h.sparseSchema,
+	}
+	h.counts[1] = &histogramCounts{
+		buckets:                 make([]uint64, len(h.upperBounds)),
+		sparseZeroThresholdBits: math.Float64bits(h.sparseZeroThreshold),
+		sparseSchema:            h.sparseSchema,
+	}
 	h.exemplars = make([]atomic.Value, len(h.upperBounds)+1)
 
 	h.init(h) // Init self-collection.
@@ -492,14 +520,32 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
 }
 
 type histogramCounts struct {
+	// Order in this struct matters for the alignment required by atomic
+	// operations, see http://golang.org/pkg/sync/atomic/#pkg-note-BUG
+
 	// sumBits contains the bits of the float64 representing the sum of all
-	// observations. sumBits and count have to go first in the struct to
-	// guarantee alignment for atomic operations.
-	// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	// observations.
 	sumBits uint64
 	count   uint64
+
+	// sparseZeroBucket counts all (positive and negative) observations in
+	// the zero bucket (with an absolute value less than or equal to the
+	// current threshold, see the next field).
+	sparseZeroBucket uint64
+	// sparseZeroThresholdBits is the bit pattern of the current threshold
+	// for the zero bucket. It's initially equal to h.sparseZeroThreshold
+	// but may change according to the bucket count limitation strategy.
+	sparseZeroThresholdBits uint64
+	// sparseSchema may change over time according to the bucket count
+	// limitation strategy and therefore has to be saved here.
+	sparseSchema int32
+	// Number of (positive and negative) sparse buckets.
+	sparseBucketsNumber uint32
+
+	// Regular buckets.
 	buckets []uint64
-	// sparse buckets are implemented with a sync.Map for now. A dedicated
+
+	// Sparse buckets are implemented with a sync.Map for now. A dedicated
 	// data structure will likely be more efficient. There are separate maps
 	// for negative and positive observations. The map's value is an *int64,
 	// counting observations in that bucket. (Note that we don't use uint64
@@ -508,18 +554,12 @@ type histogramCounts struct {
 	// map's key is the index of the bucket according to the used
 	// sparseSchema. Index 0 is for an upper bound of 1.
 	sparseBucketsPositive, sparseBucketsNegative sync.Map
-	// sparseZeroBucket counts all (positive and negative) observations in
-	// the zero bucket (with an absolute value less or equal
-	// SparseBucketsZeroThreshold).
-	sparseZeroBucket uint64
 }
 
 // observe manages the parts of observe that only affect
 // histogramCounts. doSparse is true if sparse buckets should be done,
-// too. whichSparse is 0 for the sparseZeroBucket and +1 or -1 for
-// sparseBucketsPositive or sparseBucketsNegative, respectively. sparseKey is
-// the key of the sparse bucket to use.
-func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool, whichSparse int, sparseKey int) {
+// too.
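+// The sparse bucket to use is determined within this method, from the schema
+// and zero threshold currently stored in hc. (Both may be changed concurrently
+// by the bucket count limitation strategy, which is why they are read
+// atomically from histogramCounts rather than from the enclosing histogram.)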
+func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) { if bucket < len(hc.buckets) { atomic.AddUint64(&hc.buckets[bucket], 1) } @@ -531,15 +571,36 @@ func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool, whichSp } } if doSparse { - switch whichSparse { - case 0: - atomic.AddUint64(&hc.sparseZeroBucket, 1) - case +1: - addToSparseBucket(&hc.sparseBucketsPositive, sparseKey, 1) - case -1: - addToSparseBucket(&hc.sparseBucketsNegative, sparseKey, 1) + var ( + sparseKey int + sparseSchema = atomic.LoadInt32(&hc.sparseSchema) + sparseZeroThreshold = math.Float64frombits(atomic.LoadUint64(&hc.sparseZeroThresholdBits)) + frac, exp = math.Frexp(math.Abs(v)) + bucketCreated bool + ) + switch { + case math.IsInf(v, 0): + sparseKey = math.MaxInt32 // Largest possible sparseKey. + case sparseSchema > 0: + bounds := sparseBounds[sparseSchema] + sparseKey = sort.SearchFloat64s(bounds, frac) + (exp-1)*len(bounds) default: - panic(fmt.Errorf("invalid value for whichSparse: %d", whichSparse)) + sparseKey = exp + if frac == 0.5 { + sparseKey-- + } + sparseKey /= 1 << -sparseSchema + } + switch { + case v > sparseZeroThreshold: + bucketCreated = addToSparseBucket(&hc.sparseBucketsPositive, sparseKey, 1) + case v < -sparseZeroThreshold: + bucketCreated = addToSparseBucket(&hc.sparseBucketsNegative, sparseKey, 1) + default: + atomic.AddUint64(&hc.sparseZeroBucket, 1) + } + if bucketCreated { + atomic.AddUint32(&hc.sparseBucketsNumber, 1) } } // Increment count last as we take it as a signal that the observation @@ -547,21 +608,6 @@ func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool, whichSp atomic.AddUint64(&hc.count, 1) } -func addToSparseBucket(buckets *sync.Map, key int, increment int64) { - if existingBucket, ok := buckets.Load(key); ok { - // Fast path without allocation. - atomic.AddInt64(existingBucket.(*int64), increment) - return - } - // Bucket doesn't exist yet. Slow path allocating new counter. - newBucket := increment // TODO(beorn7): Check if this is sufficient to not let increment escape. - if actualBucket, loaded := buckets.LoadOrStore(key, &newBucket); loaded { - // The bucket was created concurrently in another goroutine. - // Have to increment after all. - atomic.AddInt64(actualBucket.(*int64), increment) - } -} - type histogram struct { // countAndHotIdx enables lock-free writes with use of atomic updates. // The most significant bit is the hot index [0 or 1] of the count field @@ -582,8 +628,10 @@ type histogram struct { countAndHotIdx uint64 selfCollector - desc *Desc - writeMtx sync.Mutex // Only used in the Write method. + desc *Desc + + // Only used in the Write method and for sparse bucket management. + mtx sync.Mutex // Two counts, one is "hot" for lock-free observations, the other is // "cold" for writing out a dto.Metric. It has to be an array of @@ -591,11 +639,15 @@ type histogram struct { // http://golang.org/pkg/sync/atomic/#pkg-note-BUG. counts [2]*histogramCounts - upperBounds []float64 - labelPairs []*dto.LabelPair - exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar. - sparseSchema int32 // Set to math.MinInt32 if no sparse buckets are used. - sparseThreshold float64 + upperBounds []float64 + labelPairs []*dto.LabelPair + exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar. + sparseSchema int32 // The initial schema. Set to math.MinInt32 if no sparse buckets are used. + sparseZeroThreshold float64 // The initial zero threshold. 
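+	// The following three fields configure the strategy to limit the number
+	// of populated sparse buckets; they mirror the corresponding
+	// HistogramOpts settings.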
+ sparseMaxZeroThreshold float64 + sparseMaxBuckets uint32 + sparseMinResetDuration time.Duration + lastResetTime time.Time // Protected by mtx. now func() time.Time // To mock out time.Now() for testing. } @@ -619,8 +671,8 @@ func (h *histogram) Write(out *dto.Metric) error { // the hot path, i.e. Observe is called much more often than Write. The // complication of making Write lock-free isn't worth it, if possible at // all. - h.writeMtx.Lock() - defer h.writeMtx.Unlock() + h.mtx.Lock() + defer h.mtx.Unlock() // Adding 1<<63 switches the hot index (from 0 to 1 or from 1 to 0) // without touching the count bits. See the struct comments for a full @@ -682,15 +734,15 @@ func (h *histogram) Write(out *dto.Metric) error { atomic.StoreUint64(&coldCounts.buckets[i], 0) } if h.sparseSchema > math.MinInt32 { - his.SbZeroThreshold = &h.sparseThreshold - his.SbSchema = &h.sparseSchema + his.SbZeroThreshold = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sparseZeroThresholdBits))) + his.SbSchema = proto.Int32(atomic.LoadInt32(&coldCounts.sparseSchema)) zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket) defer func() { atomic.AddUint64(&hotCounts.sparseZeroBucket, zeroBucket) atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0) - coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive)) - coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative)) + coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive, &hotCounts.sparseBucketsNumber)) + coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative, &hotCounts.sparseBucketsNumber)) }() his.SbZeroCount = proto.Uint64(zeroBucket) @@ -700,72 +752,6 @@ func (h *histogram) Write(out *dto.Metric) error { return nil } -func makeSparseBuckets(buckets *sync.Map) *dto.SparseBuckets { - var ii []int - buckets.Range(func(k, v interface{}) bool { - ii = append(ii, k.(int)) - return true - }) - sort.Ints(ii) - - if len(ii) == 0 { - return nil - } - - sbs := dto.SparseBuckets{} - var prevCount int64 - var nextI int - - appendDelta := func(count int64) { - *sbs.Span[len(sbs.Span)-1].Length++ - sbs.Delta = append(sbs.Delta, count-prevCount) - prevCount = count - } - - for n, i := range ii { - v, _ := buckets.Load(i) - count := atomic.LoadInt64(v.(*int64)) - // Multiple spans with only small gaps in between are probably - // encoded more efficiently as one larger span with a few empty - // buckets. Needs some research to find the sweet spot. For now, - // we assume that gaps of one ore two buckets should not create - // a new span. - iDelta := int32(i - nextI) - if n == 0 || iDelta > 2 { - // We have to create a new span, either because we are - // at the very beginning, or because we have found a gap - // of more than two buckets. - sbs.Span = append(sbs.Span, &dto.SparseBuckets_Span{ - Offset: proto.Int32(iDelta), - Length: proto.Uint32(0), - }) - } else { - // We have found a small gap (or no gap at all). - // Insert empty buckets as needed. - for j := int32(0); j < iDelta; j++ { - appendDelta(0) - } - } - appendDelta(count) - nextI = i + 1 - } - return &sbs -} - -// addAndReset returns a function to be used with sync.Map.Range of spare -// buckets in coldCounts. It increments the buckets in the provided hotBuckets -// according to the buckets ranged through. It then resets all buckets ranged -// through to 0 (but leaves them in place so that they don't need to get -// recreated on the next scrape). 
-func addAndReset(hotBuckets *sync.Map) func(k, v interface{}) bool {
-	return func(k, v interface{}) bool {
-		bucket := v.(*int64)
-		addToSparseBucket(hotBuckets, k.(int), atomic.LoadInt64(bucket))
-		atomic.StoreInt64(bucket, 0)
-		return true
-	}
-}
-
 // findBucket returns the index of the bucket for the provided value, or
 // len(h.upperBounds) for the +Inf bucket.
 func (h *histogram) findBucket(v float64) int {
@@ -785,34 +771,235 @@ func (h *histogram) findBucket(v float64) int {
 func (h *histogram) observe(v float64, bucket int) {
 	// Do not add to sparse buckets for NaN observations.
 	doSparse := h.sparseSchema > math.MinInt32 && !math.IsNaN(v)
-	var whichSparse, sparseKey int
-	if doSparse {
-		switch {
-		case v > h.sparseThreshold:
-			whichSparse = +1
-		case v < -h.sparseThreshold:
-			whichSparse = -1
-		}
-		frac, exp := math.Frexp(math.Abs(v))
-		switch {
-		case math.IsInf(v, 0):
-			sparseKey = math.MaxInt32 // Largest possible sparseKey.
-		case h.sparseSchema > 0:
-			bounds := sparseBounds[h.sparseSchema]
-			sparseKey = sort.SearchFloat64s(bounds, frac) + (exp-1)*len(bounds)
-		default:
-			sparseKey = exp
-			if frac == 0.5 {
-				sparseKey--
-			}
-			sparseKey /= 1 << -h.sparseSchema
-		}
-	}
 	// We increment h.countAndHotIdx so that the counter in the lower
 	// 63 bits gets incremented. At the same time, we get the new value
 	// back, which we can use to find the currently-hot counts.
 	n := atomic.AddUint64(&h.countAndHotIdx, 1)
-	h.counts[n>>63].observe(v, bucket, doSparse, whichSparse, sparseKey)
+	hotCounts := h.counts[n>>63]
+	hotCounts.observe(v, bucket, doSparse)
+	if doSparse {
+		h.limitSparseBuckets(hotCounts, v, bucket)
+	}
+}
+
+// limitSparseBuckets applies a strategy to limit the number of populated sparse
+// buckets. It's generally best effort, and there are situations where the
+// number can go higher (if even the lowest resolution isn't enough to reduce
+// the number sufficiently, or if the provided counts aren't fully updated yet
+// by a concurrently happening Write call).
+func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, bucket int) {
+	if h.sparseMaxBuckets == 0 {
+		return // No limit configured.
+	}
+	if h.sparseMaxBuckets >= atomic.LoadUint32(&counts.sparseBucketsNumber) {
+		return // Bucket limit not exceeded yet.
+	}
+
+	h.mtx.Lock()
+	defer h.mtx.Unlock()
+
+	// The hot counts might have been swapped just before we acquired the
+	// lock. Re-fetch the hot counts first...
+	n := atomic.LoadUint64(&h.countAndHotIdx)
+	hotIdx := n >> 63
+	coldIdx := (^n) >> 63
+	hotCounts := h.counts[hotIdx]
+	coldCounts := h.counts[coldIdx]
+	// ...and then check again if we really have to reduce the bucket count.
+	if h.sparseMaxBuckets >= atomic.LoadUint32(&hotCounts.sparseBucketsNumber) {
+		return // Bucket limit not exceeded after all.
+	}
+
+	// (1) Ideally, we can reset the whole histogram.
+
+	// We are using the possibly mocked h.now() rather than
+	// time.Since(h.lastResetTime) to enable testing.
+	if h.sparseMinResetDuration > 0 && h.now().Sub(h.lastResetTime) >= h.sparseMinResetDuration {
+		// Completely reset coldCounts.
+		h.resetCounts(coldCounts)
+		// Repeat the latest observation to not lose it completely.
+		coldCounts.observe(value, bucket, true)
+		// Make coldCounts the new hot counts while resetting countAndHotIdx.
+		n := atomic.SwapUint64(&h.countAndHotIdx, (coldIdx<<63)+1)
+		count := n & ((1 << 63) - 1)
+		// Wait for the formerly hot counts to cool down.
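+		// (Every observation first increments the lower 63 bits of
+		// countAndHotIdx, and increments the count field of its counts
+		// struct only once it is complete. Equality therefore means that no
+		// observation on the formerly hot counts is still in flight.)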
+ for count != atomic.LoadUint64(&hotCounts.count) { + runtime.Gosched() // Let observations get work done. + } + // Finally, reset the formerly hot counts, too. + h.resetCounts(hotCounts) + h.lastResetTime = h.now() + return + } + + // (2) Try widening the zero bucket. + currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hotCounts.sparseZeroThresholdBits)) + switch { // Use switch rather than if to be able to break out of it. + case h.sparseMaxZeroThreshold > currentZeroThreshold: + // Find the key of the bucket closest to zero. + smallestKey := findSmallestKey(&hotCounts.sparseBucketsPositive) + smallestNegativeKey := findSmallestKey(&hotCounts.sparseBucketsNegative) + if smallestNegativeKey < smallestKey { + smallestKey = smallestNegativeKey + } + if smallestKey == math.MaxInt32 { + break + } + newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hotCounts.sparseSchema)) + if newZeroThreshold > h.sparseMaxZeroThreshold { + break // New threshold would exceed the max threshold. + } + atomic.StoreUint64(&coldCounts.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold)) + // Remove applicable buckets. + if _, loaded := coldCounts.sparseBucketsNegative.LoadAndDelete(smallestKey); loaded { + atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32 + } + if _, loaded := coldCounts.sparseBucketsPositive.LoadAndDelete(smallestKey); loaded { + atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32 + } + // Make coldCounts the new hot counts. + n := atomic.AddUint64(&h.countAndHotIdx, 1<<63) + count := n & ((1 << 63) - 1) + // Swap the pointer names to represent the new roles and make + // the rest less confusing. + hotCounts, coldCounts = coldCounts, hotCounts + // Wait for the (new) cold counts to cool down. + for count != atomic.LoadUint64(&coldCounts.count) { + runtime.Gosched() // Let observations get work done. + } + // Add all the cold counts to the new hot counts, while merging + // the newly deleted buckets into the wider zero bucket, and + // reset and adjust the cold counts. + // TODO(beorn7): Maybe make it more DRY, cf. Write() method. Maybe + // it's too different, though... + atomic.AddUint64(&hotCounts.count, count) + atomic.StoreUint64(&coldCounts.count, 0) + for { + hotBits := atomic.LoadUint64(&hotCounts.sumBits) + coldBits := atomic.LoadUint64(&coldCounts.sumBits) + newBits := math.Float64bits(math.Float64frombits(hotBits) + math.Float64frombits(coldBits)) + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, hotBits, newBits) { + atomic.StoreUint64(&coldCounts.sumBits, 0) + break + } + } + for i := range h.upperBounds { + atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i])) + atomic.StoreUint64(&coldCounts.buckets[i], 0) + } + atomic.AddUint64(&hotCounts.sparseZeroBucket, atomic.LoadUint64(&coldCounts.sparseZeroBucket)) + atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0) + atomic.StoreUint64(&coldCounts.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold)) + + mergeAndDeleteOrAddAndReset := func(hotBuckets, coldBuckets *sync.Map) func(k, v interface{}) bool { + return func(k, v interface{}) bool { + key := k.(int) + bucket := v.(*int64) + if key == smallestKey { + // Merge into hot zero bucket... + atomic.AddUint64(&hotCounts.sparseZeroBucket, uint64(atomic.LoadInt64(bucket))) + // ...and delete from cold counts. 
+ coldBuckets.Delete(key) + atomic.AddUint32(&coldCounts.sparseBucketsNumber, ^uint32(0)) // Decrement, see https://pkg.go.dev/sync/atomic#AddUint32 + } else { + // Add to corresponding hot bucket... + if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) { + atomic.AddUint32(&hotCounts.sparseBucketsNumber, 1) + } + // ...and reset cold bucket. + atomic.StoreInt64(bucket, 0) + } + return true + } + } + + coldCounts.sparseBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hotCounts.sparseBucketsPositive, &coldCounts.sparseBucketsPositive)) + coldCounts.sparseBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hotCounts.sparseBucketsNegative, &coldCounts.sparseBucketsNegative)) + return + } + + // (3) Ultima ratio: Doubling of the bucket width AKA halving the resolution AKA decrementing sparseSchema. + coldSchema := atomic.LoadInt32(&coldCounts.sparseSchema) + if coldSchema == -4 { + return // Already at lowest resolution. + } + coldSchema-- + atomic.StoreInt32(&coldCounts.sparseSchema, coldSchema) + // Play it simple and just delete all cold buckets. + atomic.StoreUint32(&coldCounts.sparseBucketsNumber, 0) + deleteSyncMap(&coldCounts.sparseBucketsNegative) + deleteSyncMap(&coldCounts.sparseBucketsPositive) + // Make coldCounts the new hot counts. + n = atomic.AddUint64(&h.countAndHotIdx, 1<<63) + count := n & ((1 << 63) - 1) + // Swap the pointer names to represent the new roles and make + // the rest less confusing. + hotCounts, coldCounts = coldCounts, hotCounts + // Wait for the (new) cold counts to cool down. + for count != atomic.LoadUint64(&coldCounts.count) { + runtime.Gosched() // Let observations get work done. + } + // Add all the cold counts to the new hot counts, while merging the cold + // buckets into the wider hot buckets, and reset and adjust the cold + // counts. + // TODO(beorn7): Maybe make it more DRY, cf. Write() method and code + // above. Maybe it's too different, though... + atomic.AddUint64(&hotCounts.count, count) + atomic.StoreUint64(&coldCounts.count, 0) + for { + hotBits := atomic.LoadUint64(&hotCounts.sumBits) + coldBits := atomic.LoadUint64(&coldCounts.sumBits) + newBits := math.Float64bits(math.Float64frombits(hotBits) + math.Float64frombits(coldBits)) + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, hotBits, newBits) { + atomic.StoreUint64(&coldCounts.sumBits, 0) + break + } + } + for i := range h.upperBounds { + atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i])) + atomic.StoreUint64(&coldCounts.buckets[i], 0) + } + atomic.AddUint64(&hotCounts.sparseZeroBucket, atomic.LoadUint64(&coldCounts.sparseZeroBucket)) + atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0) + + merge := func(hotBuckets *sync.Map) func(k, v interface{}) bool { + return func(k, v interface{}) bool { + key := k.(int) + bucket := v.(*int64) + // Adjust key to match the bucket to merge into. + if key > 0 { + key++ + } + key /= 2 + // Add to corresponding hot bucket. + if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) { + atomic.AddUint32(&hotCounts.sparseBucketsNumber, 1) + } + return true + } + } + + coldCounts.sparseBucketsPositive.Range(merge(&hotCounts.sparseBucketsPositive)) + coldCounts.sparseBucketsNegative.Range(merge(&hotCounts.sparseBucketsNegative)) + atomic.StoreInt32(&coldCounts.sparseSchema, coldSchema) + // Play it simple again and just delete all cold buckets. 
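+	// (Their contents were already merged into the hot buckets above, and
+	// their keys belong to the old schema, so they must not survive the
+	// schema change.)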
+	atomic.StoreUint32(&coldCounts.sparseBucketsNumber, 0)
+	deleteSyncMap(&coldCounts.sparseBucketsNegative)
+	deleteSyncMap(&coldCounts.sparseBucketsPositive)
+}
+
+func (h *histogram) resetCounts(counts *histogramCounts) {
+	atomic.StoreUint64(&counts.sumBits, 0)
+	atomic.StoreUint64(&counts.count, 0)
+	atomic.StoreUint64(&counts.sparseZeroBucket, 0)
+	atomic.StoreUint64(&counts.sparseZeroThresholdBits, math.Float64bits(h.sparseZeroThreshold))
+	atomic.StoreInt32(&counts.sparseSchema, h.sparseSchema)
+	atomic.StoreUint32(&counts.sparseBucketsNumber, 0)
+	for i := range h.upperBounds {
+		atomic.StoreUint64(&counts.buckets[i], 0)
+	}
+	deleteSyncMap(&counts.sparseBucketsNegative)
+	deleteSyncMap(&counts.sparseBucketsPositive)
+}
 
 // updateExemplar replaces the exemplar for the provided bucket. With empty
@@ -1081,3 +1268,120 @@ func pickSparseSchema(bucketFactor float64) int32 {
 		return -int32(floor)
 	}
 }
+
+func makeSparseBuckets(buckets *sync.Map) *dto.SparseBuckets {
+	var ii []int
+	buckets.Range(func(k, v interface{}) bool {
+		ii = append(ii, k.(int))
+		return true
+	})
+	sort.Ints(ii)
+
+	if len(ii) == 0 {
+		return nil
+	}
+
+	sbs := dto.SparseBuckets{}
+	var prevCount int64
+	var nextI int
+
+	appendDelta := func(count int64) {
+		*sbs.Span[len(sbs.Span)-1].Length++
+		sbs.Delta = append(sbs.Delta, count-prevCount)
+		prevCount = count
+	}
+
+	for n, i := range ii {
+		v, _ := buckets.Load(i)
+		count := atomic.LoadInt64(v.(*int64))
+		// Multiple spans with only small gaps in between are probably
+		// encoded more efficiently as one larger span with a few empty
+		// buckets. Needs some research to find the sweet spot. For now,
+		// we assume that gaps of one or two buckets should not create
+		// a new span.
+		iDelta := int32(i - nextI)
+		if n == 0 || iDelta > 2 {
+			// We have to create a new span, either because we are
+			// at the very beginning, or because we have found a gap
+			// of more than two buckets.
+			sbs.Span = append(sbs.Span, &dto.SparseBuckets_Span{
+				Offset: proto.Int32(iDelta),
+				Length: proto.Uint32(0),
+			})
+		} else {
+			// We have found a small gap (or no gap at all).
+			// Insert empty buckets as needed.
+			for j := int32(0); j < iDelta; j++ {
+				appendDelta(0)
+			}
+		}
+		appendDelta(count)
+		nextI = i + 1
+	}
+	return &sbs
+}
+
+// addToSparseBucket increments the sparse bucket at key by the provided
+// amount. It returns true if a new sparse bucket had to be created for that.
+func addToSparseBucket(buckets *sync.Map, key int, increment int64) bool {
+	if existingBucket, ok := buckets.Load(key); ok {
+		// Fast path without allocation.
+		atomic.AddInt64(existingBucket.(*int64), increment)
+		return false
+	}
+	// Bucket doesn't exist yet. Slow path allocating new counter.
+	newBucket := increment // TODO(beorn7): Check if this is sufficient to not let increment escape.
+	if actualBucket, loaded := buckets.LoadOrStore(key, &newBucket); loaded {
+		// The bucket was created concurrently in another goroutine.
+		// Have to increment after all.
+		atomic.AddInt64(actualBucket.(*int64), increment)
+		return false
+	}
+	return true
+}
+
+// addAndReset returns a function to be used with sync.Map.Range of sparse
+// buckets in coldCounts. It increments the buckets in the provided hotBuckets
+// according to the buckets ranged through. It then resets all buckets ranged
+// through to 0 (but leaves them in place so that they don't need to get
+// recreated on the next scrape).
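+// It also increments the provided bucketNumber whenever addToSparseBucket
+// reports that it had to create a new bucket in hotBuckets.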
+func addAndReset(hotBuckets *sync.Map, bucketNumber *uint32) func(k, v interface{}) bool { + return func(k, v interface{}) bool { + bucket := v.(*int64) + if addToSparseBucket(hotBuckets, k.(int), atomic.LoadInt64(bucket)) { + atomic.AddUint32(bucketNumber, 1) + } + atomic.StoreInt64(bucket, 0) + return true + } +} + +func deleteSyncMap(m *sync.Map) { + m.Range(func(k, v interface{}) bool { + m.Delete(k) + return true + }) +} + +func findSmallestKey(m *sync.Map) int { + result := math.MaxInt32 + m.Range(func(k, v interface{}) bool { + key := k.(int) + if key < result { + result = key + } + return true + }) + return result +} + +func getLe(key int, schema int32) float64 { + if schema < 0 { + return math.Ldexp(1, key<<(-schema)) + } + + fracIdx := key & ((1 << schema) - 1) + frac := sparseBounds[schema][fracIdx] + exp := (key >> schema) + 1 + return math.Ldexp(frac, exp) +} diff --git a/prometheus/histogram_test.go b/prometheus/histogram_test.go index 9971d62..46e48c1 100644 --- a/prometheus/histogram_test.go +++ b/prometheus/histogram_test.go @@ -20,6 +20,7 @@ import ( "runtime" "sort" "sync" + "sync/atomic" "testing" "testing/quick" "time" @@ -167,7 +168,7 @@ func TestHistogramConcurrency(t *testing.T) { start.Add(1) end.Add(concLevel) - sum := NewHistogram(HistogramOpts{ + his := NewHistogram(HistogramOpts{ Name: "test_histogram", Help: "helpless", Buckets: testBuckets, @@ -188,9 +189,9 @@ func TestHistogramConcurrency(t *testing.T) { start.Wait() for _, v := range vals { if n%2 == 0 { - sum.Observe(v) + his.Observe(v) } else { - sum.(ExemplarObserver).ObserveWithExemplar(v, Labels{"foo": "bar"}) + his.(ExemplarObserver).ObserveWithExemplar(v, Labels{"foo": "bar"}) } } end.Done() @@ -201,7 +202,7 @@ func TestHistogramConcurrency(t *testing.T) { end.Wait() m := &dto.Metric{} - sum.Write(m) + his.Write(m) if got, want := int(*m.Histogram.SampleCount), total; got != want { t.Errorf("got sample count %d, want %d", got, want) } @@ -424,24 +425,24 @@ func TestHistogramExemplar(t *testing.T) { } expectedExemplars := []*dto.Exemplar{ nil, - &dto.Exemplar{ + { Label: []*dto.LabelPair{ - &dto.LabelPair{Name: proto.String("id"), Value: proto.String("2")}, + {Name: proto.String("id"), Value: proto.String("2")}, }, Value: proto.Float64(1.6), Timestamp: ts, }, nil, - &dto.Exemplar{ + { Label: []*dto.LabelPair{ - &dto.LabelPair{Name: proto.String("id"), Value: proto.String("3")}, + {Name: proto.String("id"), Value: proto.String("3")}, }, Value: proto.Float64(4), Timestamp: ts, }, - &dto.Exemplar{ + { Label: []*dto.LabelPair{ - &dto.LabelPair{Name: proto.String("id"), Value: proto.String("4")}, + {Name: proto.String("id"), Value: proto.String("4")}, }, Value: proto.Float64(4.5), Timestamp: ts, @@ -470,11 +471,14 @@ func TestHistogramExemplar(t *testing.T) { func TestSparseHistogram(t *testing.T) { scenarios := []struct { - name string - observations []float64 - factor float64 - zeroThreshold float64 - want string // String representation of protobuf. + name string + observations []float64 // With simulated interval of 1m. + factor float64 + zeroThreshold float64 + maxBuckets uint32 + minResetDuration time.Duration + maxZeroThreshold float64 + want string // String representation of protobuf. 
}{ { name: "no sparse buckets", @@ -531,18 +535,122 @@ func TestSparseHistogram(t *testing.T) { factor: 1.2, want: `sample_count:7 sample_sum:-inf sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_negative: delta:1 > sb_positive: delta:1 delta:-1 delta:2 delta:-2 delta:2 > `, }, + { + name: "limited buckets but nothing triggered", + observations: []float64{0, 1, 1.2, 1.4, 1.8, 2}, + factor: 1.2, + maxBuckets: 4, + want: `sample_count:6 sample_sum:7.4 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_positive: delta:1 delta:-1 delta:2 delta:-2 delta:2 > `, + }, + { + name: "buckets limited by halving resolution", + observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3}, + factor: 1.2, + maxBuckets: 4, + want: `sample_count:8 sample_sum:11.5 sb_schema:1 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_positive: delta:1 delta:2 delta:-1 delta:-2 delta:1 > `, + }, + { + name: "buckets limited by widening the zero bucket", + observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3}, + factor: 1.2, + maxBuckets: 4, + maxZeroThreshold: 1.2, + want: `sample_count:8 sample_sum:11.5 sb_schema:2 sb_zero_threshold:1 sb_zero_count:2 sb_positive: delta:1 delta:1 delta:-2 delta:2 delta:-2 delta:0 delta:1 > `, + }, + { + name: "buckets limited by widening the zero bucket twice", + observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3, 4}, + factor: 1.2, + maxBuckets: 4, + maxZeroThreshold: 1.2, + want: `sample_count:9 sample_sum:15.5 sb_schema:2 sb_zero_threshold:1.189207115002721 sb_zero_count:3 sb_positive: delta:2 delta:-2 delta:2 delta:-2 delta:0 delta:1 delta:0 > `, + }, + { + name: "buckets limited by reset", + observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3, 4}, + factor: 1.2, + maxBuckets: 4, + maxZeroThreshold: 1.2, + minResetDuration: 5 * time.Minute, + want: `sample_count:2 sample_sum:7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_positive: delta:1 delta:0 > `, + }, + { + name: "limited buckets but nothing triggered, negative observations", + observations: []float64{0, -1, -1.2, -1.4, -1.8, -2}, + factor: 1.2, + maxBuckets: 4, + want: `sample_count:6 sample_sum:-7.4 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_negative: delta:1 delta:-1 delta:2 delta:-2 delta:2 > `, + }, + { + name: "buckets limited by halving resolution, negative observations", + observations: []float64{0, -1, -1.1, -1.2, -1.4, -1.8, -2, -3}, + factor: 1.2, + maxBuckets: 4, + want: `sample_count:8 sample_sum:-11.5 sb_schema:1 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_negative: delta:1 delta:2 delta:-1 delta:-2 delta:1 > `, + }, + { + name: "buckets limited by widening the zero bucket, negative observations", + observations: []float64{0, -1, -1.1, -1.2, -1.4, -1.8, -2, -3}, + factor: 1.2, + maxBuckets: 4, + maxZeroThreshold: 1.2, + want: `sample_count:8 sample_sum:-11.5 sb_schema:2 sb_zero_threshold:1 sb_zero_count:2 sb_negative: delta:1 delta:1 delta:-2 delta:2 delta:-2 delta:0 delta:1 > `, + }, + { + name: "buckets limited by widening the zero bucket twice, negative observations", + observations: []float64{0, -1, -1.1, -1.2, -1.4, -1.8, -2, -3, -4}, + factor: 1.2, + maxBuckets: 4, + maxZeroThreshold: 1.2, + want: `sample_count:9 sample_sum:-15.5 sb_schema:2 sb_zero_threshold:1.189207115002721 sb_zero_count:3 sb_negative: delta:2 delta:-2 delta:2 delta:-2 delta:0 delta:1 delta:0 > `, + }, + { + name: "buckets limited by reset, negative observations", + observations: []float64{0, -1, -1.1, -1.2, -1.4, 
-1.8, -2, -3, -4}, + factor: 1.2, + maxBuckets: 4, + maxZeroThreshold: 1.2, + minResetDuration: 5 * time.Minute, + want: `sample_count:2 sample_sum:-7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_negative: delta:1 delta:0 > `, + }, + { + name: "buckets limited by halving resolution, then reset", + observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 5, 5.1, 3, 4}, + factor: 1.2, + maxBuckets: 4, + minResetDuration: 9 * time.Minute, + want: `sample_count:2 sample_sum:7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_positive: delta:1 delta:0 > `, + }, + { + name: "buckets limited by widening the zero bucket, then reset", + observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 5, 5.1, 3, 4}, + factor: 1.2, + maxBuckets: 4, + maxZeroThreshold: 1.2, + minResetDuration: 9 * time.Minute, + want: `sample_count:2 sample_sum:7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_positive: delta:1 delta:0 > `, + }, } for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { his := NewHistogram(HistogramOpts{ - Name: "name", - Help: "help", - SparseBucketsFactor: s.factor, - SparseBucketsZeroThreshold: s.zeroThreshold, + Name: "name", + Help: "help", + SparseBucketsFactor: s.factor, + SparseBucketsZeroThreshold: s.zeroThreshold, + SparseBucketsMaxNumber: s.maxBuckets, + SparseBucketsMinResetDuration: s.minResetDuration, + SparseBucketsMaxZeroThreshold: s.maxZeroThreshold, }) + ts := time.Now().Add(30 * time.Second) + now := func() time.Time { + return ts + } + his.(*histogram).now = now for _, o := range s.observations { his.Observe(o) + ts = ts.Add(time.Minute) } m := &dto.Metric{} if err := his.Write(m); err != nil { @@ -556,3 +664,101 @@ func TestSparseHistogram(t *testing.T) { } } + +func TestSparseHistogramConcurrency(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test in short mode.") + } + + rand.Seed(42) + + it := func(n uint32) bool { + mutations := int(n%1e4 + 1e4) + concLevel := int(n%5 + 1) + total := mutations * concLevel + + var start, end sync.WaitGroup + start.Add(1) + end.Add(concLevel) + + his := NewHistogram(HistogramOpts{ + Name: "test_sparse_histogram", + Help: "This help is sparse.", + SparseBucketsFactor: 1.05, + SparseBucketsZeroThreshold: 0.0000001, + SparseBucketsMaxNumber: 50, + SparseBucketsMinResetDuration: time.Hour, // Comment out to test for totals below. + SparseBucketsMaxZeroThreshold: 0.001, + }) + + ts := time.Now().Add(30 * time.Second).Unix() + now := func() time.Time { + return time.Unix(atomic.LoadInt64(&ts), 0) + } + his.(*histogram).now = now + + allVars := make([]float64, total) + var sampleSum float64 + for i := 0; i < concLevel; i++ { + vals := make([]float64, mutations) + for j := 0; j < mutations; j++ { + v := rand.NormFloat64() + vals[j] = v + allVars[i*mutations+j] = v + sampleSum += v + } + + go func(vals []float64) { + start.Wait() + for _, v := range vals { + // An observation every 1 to 10 seconds. + atomic.AddInt64(&ts, rand.Int63n(10)+1) + his.Observe(v) + } + end.Done() + }(vals) + } + sort.Float64s(allVars) + start.Done() + end.Wait() + + m := &dto.Metric{} + his.Write(m) + + // Uncomment these tests for totals only if you have disabled histogram resets above. 
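+		// (With resets enabled, observations made before the most recent
+		// reset no longer show up in the histogram, so the total sample
+		// count and sum would not add up.)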
+ // + // if got, want := int(*m.Histogram.SampleCount), total; got != want { + // t.Errorf("got sample count %d, want %d", got, want) + // } + // if got, want := *m.Histogram.SampleSum, sampleSum; math.Abs((got-want)/want) > 0.001 { + // t.Errorf("got sample sum %f, want %f", got, want) + // } + + sumBuckets := int(m.Histogram.GetSbZeroCount()) + current := 0 + for _, delta := range m.Histogram.GetSbNegative().GetDelta() { + current += int(delta) + if current < 0 { + t.Fatalf("negative bucket population negative: %d", current) + } + sumBuckets += current + } + current = 0 + for _, delta := range m.Histogram.GetSbPositive().GetDelta() { + current += int(delta) + if current < 0 { + t.Fatalf("positive bucket population negative: %d", current) + } + sumBuckets += current + } + if got, want := sumBuckets, int(*m.Histogram.SampleCount); got != want { + t.Errorf("got bucket population sum %d, want %d", got, want) + } + + return true + } + + if err := quick.Check(it, nil); err != nil { + t.Error(err) + } +}
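
Usage sketch (not part of the patch): assuming a client_golang build with this
change applied, the limitation strategy would be configured via the three new
HistogramOpts fields. All concrete values below are made up for illustration;
only the field names are defined by this patch.

	package main

	import (
		"time"

		"github.com/prometheus/client_golang/prometheus"
	)

	func main() {
		his := prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:                "example_duration_seconds",
			Help:                "Example sparse histogram.",
			SparseBucketsFactor: 1.1, // At most ~10% growth from one bucket to the next.
			// Once more than 100 sparse buckets are populated: reset the
			// histogram if the last reset was at least an hour ago;
			// otherwise widen the zero bucket up to a threshold of 0.001;
			// otherwise halve the resolution.
			SparseBucketsMaxNumber:        100,
			SparseBucketsMinResetDuration: time.Hour,
			SparseBucketsMaxZeroThreshold: 0.001,
		})
		his.Observe(0.042)
	}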