From c98db4eccd6e212585505fc1079d4a0472e04ff5 Mon Sep 17 00:00:00 2001
From: beorn7
Date: Thu, 5 Mar 2020 20:07:45 +0100
Subject: [PATCH] Demo sparse histograms

Printf the structure of the sparse buckets instead of actually encoding
them.

Signed-off-by: beorn7
---
 examples/random/main.go |   7 +-
 prometheus/histogram.go | 222 +++++++++++++++++++++++++++++++++-------
 2 files changed, 188 insertions(+), 41 deletions(-)

diff --git a/examples/random/main.go b/examples/random/main.go
index 20a9db5..9b910fa 100644
--- a/examples/random/main.go
+++ b/examples/random/main.go
@@ -54,9 +54,10 @@ var (
 	// normal distribution, with 20 buckets centered on the mean, each
 	// half-sigma wide.
 	rpcDurationsHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "rpc_durations_histogram_seconds",
-		Help:    "RPC latency distributions.",
-		Buckets: prometheus.LinearBuckets(*normMean-5**normDomain, .5**normDomain, 20),
+		Name:                    "rpc_durations_histogram_seconds",
+		Help:                    "RPC latency distributions.",
+		Buckets:                 prometheus.LinearBuckets(*normMean-5**normDomain, .5**normDomain, 20),
+		SparseBucketsResolution: 20,
 	})
 )
 
diff --git a/prometheus/histogram.go b/prometheus/histogram.go
index 4271f43..e7115e4 100644
--- a/prometheus/histogram.go
+++ b/prometheus/histogram.go
@@ -14,7 +14,9 @@
 package prometheus
 
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"math"
 	"runtime"
 	"sort"
@@ -58,12 +60,14 @@ const bucketLabel = "le"
 // tailored to broadly measure the response time (in seconds) of a network
 // service. Most likely, however, you will be required to define buckets
 // customized to your use case.
-var (
-	DefBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
+var DefBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
 
-	errBucketLabelNotAllowed = fmt.Errorf(
-		"%q is not allowed as label name in histograms", bucketLabel,
-	)
+// DefSparseBucketsZeroThreshold is the default value for
+// SparseBucketsZeroThreshold in the HistogramOpts.
+var DefSparseBucketsZeroThreshold = 1e-128
+
+var errBucketLabelNotAllowed = fmt.Errorf(
+	"%q is not allowed as label name in histograms", bucketLabel,
 )
 
 // LinearBuckets creates 'count' buckets, each 'width' wide, where the lowest
@@ -146,8 +150,32 @@ type HistogramOpts struct {
 	// element in the slice is the upper inclusive bound of a bucket. The
 	// values must be sorted in strictly increasing order. There is no need
 	// to add a highest bucket with +Inf bound, it will be added
-	// implicitly. The default value is DefBuckets.
+	// implicitly. If Buckets is left as nil or set to a slice of length
+	// zero, it is replaced by default buckets. The default buckets are
+	// DefBuckets if no sparse buckets (see below) are used, otherwise the
+	// default is no buckets. (In other words, if you want to use both
+	// regular buckets and sparse buckets, you have to define the regular
+	// buckets here explicitly.)
 	Buckets []float64
+
+	// If SparseBucketsResolution is not zero, sparse buckets are used (in
+	// addition to the regular buckets, if defined above). Every power of
+	// ten is divided into the given number of exponential buckets. For
+	// example, if set to 3, the bucket boundaries are approximately […,
+	// 0.1, 0.215, 0.464, 1, 2.15, 4.64, 10, 21.5, 46.4, 100, …]. Histograms
+	// can only be properly aggregated if they use the same
+	// resolution. Therefore, it is recommended to use 20 as a resolution,
+	// which is generally expected to be a good tradeoff between resource
+	// usage and accuracy (resulting in a maximum error of quantile values
+	// of about 6%).
+	SparseBucketsResolution uint8
+	// All observations with an absolute value less than or equal to
+	// SparseBucketsZeroThreshold are accumulated into a “zero” bucket. For
+	// best results, this should be close to a bucket boundary. This is
+	// most easily accomplished by picking a power of ten. If
+	// SparseBucketsZeroThreshold is left at zero (or set to a negative
+	// value), DefSparseBucketsZeroThreshold is used as the threshold.
+	SparseBucketsZeroThreshold float64
 }
 
 // NewHistogram creates a new Histogram based on the provided HistogramOpts. It
@@ -184,16 +212,20 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
 		}
 	}
 
-	if len(opts.Buckets) == 0 {
-		opts.Buckets = DefBuckets
-	}
-
 	h := &histogram{
-		desc:        desc,
-		upperBounds: opts.Buckets,
-		labelPairs:  makeLabelPairs(desc, labelValues),
-		counts:      [2]*histogramCounts{{}, {}},
-		now:         time.Now,
+		desc:             desc,
+		upperBounds:      opts.Buckets,
+		sparseResolution: opts.SparseBucketsResolution,
+		sparseThreshold:  opts.SparseBucketsZeroThreshold,
+		labelPairs:       makeLabelPairs(desc, labelValues),
+		counts:           [2]*histogramCounts{{}, {}},
+		now:              time.Now,
+	}
+	if len(h.upperBounds) == 0 && opts.SparseBucketsResolution == 0 {
+		h.upperBounds = DefBuckets
+	}
+	if h.sparseThreshold <= 0 {
+		h.sparseThreshold = DefSparseBucketsZeroThreshold
 	}
 	for i, upperBound := range h.upperBounds {
 		if i < len(h.upperBounds)-1 {
@@ -228,6 +260,67 @@ type histogramCounts struct {
 	sumBits uint64
 	count   uint64
 	buckets []uint64
+	// sparse buckets are implemented with a sync.Map for this PoC. A
+	// dedicated data structure will likely be more efficient.
+	// There are separate maps for negative and positive observations.
+	// The map's value is a *uint64, counting observations in that bucket.
+	// The map's key is the logarithmic index of the bucket. Index 0 is for an
+	// upper bound of 1. Each increment/decrement by SparseBucketsResolution
+	// multiplies/divides the upper bound by 10. Indices in between are
+	// spaced exponentially as described for SparseBucketsResolution.
+	sparseBucketsPositive, sparseBucketsNegative sync.Map
+	// sparseZeroBucket counts all (positive and negative) observations in
+	// the zero bucket (with an absolute value less than or equal to
+	// SparseBucketsZeroThreshold).
+	sparseZeroBucket uint64
+}
+
+// observe manages the parts of observe that only affect
+// histogramCounts. doSparse is true if sparse buckets should be done,
+// too. whichSparse is 0 for the sparseZeroBucket and +1 or -1 for
+// sparseBucketsPositive or sparseBucketsNegative, respectively. sparseKey is
+// the key of the sparse bucket to use.
+func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool, whichSparse int, sparseKey int) {
+	if bucket < len(hc.buckets) {
+		atomic.AddUint64(&hc.buckets[bucket], 1)
+	}
+	for {
+		oldBits := atomic.LoadUint64(&hc.sumBits)
+		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
+		if atomic.CompareAndSwapUint64(&hc.sumBits, oldBits, newBits) {
+			break
+		}
+	}
+	if doSparse {
+		switch whichSparse {
+		case 0:
+			atomic.AddUint64(&hc.sparseZeroBucket, 1)
+		case +1:
+			addToSparseBucket(&hc.sparseBucketsPositive, sparseKey, 1)
+		case -1:
+			addToSparseBucket(&hc.sparseBucketsNegative, sparseKey, 1)
+		default:
+			panic(fmt.Errorf("invalid value for whichSparse: %d", whichSparse))
+		}
+	}
+	// Increment count last as we take it as a signal that the observation
+	// is complete.
+	atomic.AddUint64(&hc.count, 1)
+}
+
+func addToSparseBucket(buckets *sync.Map, key int, increment uint64) {
+	if existingBucket, ok := buckets.Load(key); ok {
+		// Fast path without allocation.
+		atomic.AddUint64(existingBucket.(*uint64), increment)
+		return
+	}
+	// Bucket doesn't exist yet. Slow path allocating new counter.
+	newBucket := increment // TODO(beorn7): Check if this is sufficient to not let increment escape.
+	if actualBucket, loaded := buckets.LoadOrStore(key, &newBucket); loaded {
+		// The bucket was created concurrently in another goroutine.
+		// Have to increment after all.
+		atomic.AddUint64(actualBucket.(*uint64), increment)
+	}
 }
 
 type histogram struct {
@@ -259,9 +352,11 @@ type histogram struct {
 	// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
 	counts [2]*histogramCounts
 
-	upperBounds []float64
-	labelPairs  []*dto.LabelPair
-	exemplars   []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
+	upperBounds      []float64
+	labelPairs       []*dto.LabelPair
+	exemplars        []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
+	sparseResolution uint8
+	sparseThreshold  float64
 
 	now func() time.Time // To mock out time.Now() for testing.
 }
@@ -309,6 +404,9 @@ func (h *histogram) Write(out *dto.Metric) error {
 		SampleCount: proto.Uint64(count),
 		SampleSum:   proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))),
 	}
+	out.Histogram = his
+	out.Label = h.labelPairs
+
 	var cumCount uint64
 	for i, upperBound := range h.upperBounds {
 		cumCount += atomic.LoadUint64(&coldCounts.buckets[i])
@@ -329,11 +427,7 @@ func (h *histogram) Write(out *dto.Metric) error {
 		}
 		his.Bucket = append(his.Bucket, b)
 	}
-
-	out.Histogram = his
-	out.Label = h.labelPairs
-
-	// Finally add all the cold counts to the new hot counts and reset the cold counts.
+	// Add all the cold counts to the new hot counts and reset the cold counts.
 	atomic.AddUint64(&hotCounts.count, count)
 	atomic.StoreUint64(&coldCounts.count, 0)
 	for {
@@ -348,9 +442,64 @@ func (h *histogram) Write(out *dto.Metric) error {
 		atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
 		atomic.StoreUint64(&coldCounts.buckets[i], 0)
 	}
+	if h.sparseResolution != 0 {
+		zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket)
+
+		defer func() {
+			atomic.AddUint64(&hotCounts.sparseZeroBucket, zeroBucket)
+			atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
+			coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive))
+			coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative))
+		}()
+
+		var buf bytes.Buffer
+		// TODO(beorn7): encode zero bucket threshold and count.
+		fmt.Println("Zero bucket:", zeroBucket) // DEBUG
+		fmt.Println("Positive buckets:")        // DEBUG
+		if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsPositive, zeroBucket); err != nil {
+			return err
+		}
+		fmt.Println("Negative buckets:") // DEBUG
+		if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsNegative, zeroBucket); err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
+func encodeSparseBuckets(w io.Writer, buckets *sync.Map, zeroBucket uint64) (n int, err error) {
+	// TODO(beorn7): Add actual encoding of sparse buckets.
+	var ii []int
+	buckets.Range(func(k, v interface{}) bool {
+		ii = append(ii, k.(int))
+		return true
+	})
+	sort.Ints(ii)
+	fmt.Println(len(ii), "buckets")
+	var prev uint64
+	for _, i := range ii {
+		v, _ := buckets.Load(i)
+		current := atomic.LoadUint64(v.(*uint64))
+		fmt.Printf("- %d: %d Δ=%d\n", i, current, int(current)-int(prev))
+		prev = current
+	}
+	return 0, nil
+}
+
+// addAndReset returns a function to be used with sync.Map.Range of sparse
+// buckets in coldCounts. It increments the buckets in the provided hotBuckets
+// according to the buckets ranged through. It then resets all buckets ranged
+// through to 0 (but leaves them in place so that they don't need to get
+// recreated on the next scrape).
+func addAndReset(hotBuckets *sync.Map) func(k, v interface{}) bool {
+	return func(k, v interface{}) bool {
+		bucket := v.(*uint64)
+		addToSparseBucket(hotBuckets, k.(int), atomic.LoadUint64(bucket))
+		atomic.StoreUint64(bucket, 0)
+		return true
+	}
+}
+
 // findBucket returns the index of the bucket for the provided value, or
 // len(h.upperBounds) for the +Inf bucket.
 func (h *histogram) findBucket(v float64) int {
@@ -368,25 +517,22 @@ func (h *histogram) findBucket(v float64) int {
 
 // observe is the implementation for Observe without the findBucket part.
 func (h *histogram) observe(v float64, bucket int) {
+	doSparse := h.sparseResolution != 0
+	var whichSparse, sparseKey int
+	if doSparse {
+		switch {
+		case v > h.sparseThreshold:
+			whichSparse = +1
+		case v < -h.sparseThreshold:
+			whichSparse = -1
+		}
+		sparseKey = int(math.Ceil(math.Log10(math.Abs(v)) * float64(h.sparseResolution)))
+	}
 	// We increment h.countAndHotIdx so that the counter in the lower
 	// 63 bits gets incremented. At the same time, we get the new value
 	// back, which we can use to find the currently-hot counts.
 	n := atomic.AddUint64(&h.countAndHotIdx, 1)
-	hotCounts := h.counts[n>>63]
-
-	if bucket < len(h.upperBounds) {
-		atomic.AddUint64(&hotCounts.buckets[bucket], 1)
-	}
-	for {
-		oldBits := atomic.LoadUint64(&hotCounts.sumBits)
-		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
-		if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
-			break
-		}
-	}
-	// Increment count last as we take it as a signal that the observation
-	// is complete.
-	atomic.AddUint64(&hotCounts.count, 1)
+	h.counts[n>>63].observe(v, bucket, doSparse, whichSparse, sparseKey)
 }
 
 // updateExemplar replaces the exemplar for the provided bucket. With empty
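
Reviewer note (not part of the patch): a minimal usage sketch, assuming the new
HistogramOpts fields behave as documented above. The metric name and observed
values are made up for illustration; with SparseBucketsResolution set and
Buckets left nil, only sparse buckets are kept, and calling Write triggers the
debug Printf output of this PoC.

package main

import (
	"math/rand"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Hypothetical histogram exercising the new sparse-bucket options.
	// Buckets is left nil, so no regular buckets are created.
	hist := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:                       "demo_rpc_durations_seconds",
		Help:                       "Made-up RPC latencies to exercise sparse buckets.",
		SparseBucketsResolution:    20,   // Recommended resolution per the doc comment above.
		SparseBucketsZeroThreshold: 1e-9, // Otherwise DefSparseBucketsZeroThreshold is used.
	})

	for i := 0; i < 10000; i++ {
		hist.Observe(rand.NormFloat64()*0.1 + 0.2)
	}

	// Write prints the sparse bucket structure of this PoC, roughly:
	//   Zero bucket: <count>
	//   Positive buckets:
	//   <n> buckets
	//   - <key>: <count> Δ=<count minus previous bucket's count>
	//   ...
	//   Negative buckets:
	//   ...
	var m dto.Metric
	if err := hist.Write(&m); err != nil {
		panic(err)
	}
}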
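
Also not part of the patch: a standalone sketch of the bucket-key arithmetic
implied by observe above. The helpers sparseKey and upperBound are illustrative
names only; they mirror key = ceil(log10(|v|) * resolution), so the bucket with
key k covers (10^((k-1)/resolution), 10^(k/resolution)], and key 0 has an upper
bound of 1.

package main

import (
	"fmt"
	"math"
)

// sparseKey mirrors the key computation in (*histogram).observe from the
// patch: every power of ten is split into 'resolution' exponential buckets.
func sparseKey(v float64, resolution uint8) int {
	return int(math.Ceil(math.Log10(math.Abs(v)) * float64(resolution)))
}

// upperBound is the corresponding bucket boundary: key 0 maps to 1, and
// adding 'resolution' to the key multiplies the bound by 10.
func upperBound(key int, resolution uint8) float64 {
	return math.Pow(10, float64(key)/float64(resolution))
}

func main() {
	const resolution = 20
	for _, v := range []float64{0.00123, 0.5, 1, 2.5, 42} {
		k := sparseKey(v, resolution)
		fmt.Printf("v=%g -> key=%d, bucket (%g, %g]\n",
			v, k, upperBound(k-1, resolution), upperBound(k, resolution))
	}
}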