Demo sparse histograms

Printf the structure of it instead of actually encoding it.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2020-03-05 20:07:45 +01:00
parent 346356f42e
commit c98db4eccd
2 changed files with 188 additions and 41 deletions

View File

@ -54,9 +54,10 @@ var (
// normal distribution, with 20 buckets centered on the mean, each
// half-sigma wide.
rpcDurationsHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "rpc_durations_histogram_seconds",
Help: "RPC latency distributions.",
Buckets: prometheus.LinearBuckets(*normMean-5**normDomain, .5**normDomain, 20),
Name: "rpc_durations_histogram_seconds",
Help: "RPC latency distributions.",
Buckets: prometheus.LinearBuckets(*normMean-5**normDomain, .5**normDomain, 20),
SparseBucketsResolution: 20,
})
)

View File

@ -14,7 +14,9 @@
package prometheus
import (
"bytes"
"fmt"
"io"
"math"
"runtime"
"sort"
@ -58,12 +60,14 @@ const bucketLabel = "le"
// tailored to broadly measure the response time (in seconds) of a network
// service. Most likely, however, you will be required to define buckets
// customized to your use case.
var (
DefBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
var DefBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
errBucketLabelNotAllowed = fmt.Errorf(
"%q is not allowed as label name in histograms", bucketLabel,
)
// DefSparseBucketsZeroThreshold is the default value for
// SparseBucketsZeroThreshold in the HistogramOpts.
var DefSparseBucketsZeroThreshold = 1e-128
var errBucketLabelNotAllowed = fmt.Errorf(
"%q is not allowed as label name in histograms", bucketLabel,
)
// LinearBuckets creates 'count' buckets, each 'width' wide, where the lowest
@ -146,8 +150,32 @@ type HistogramOpts struct {
// element in the slice is the upper inclusive bound of a bucket. The
// values must be sorted in strictly increasing order. There is no need
// to add a highest bucket with +Inf bound, it will be added
// implicitly. The default value is DefBuckets.
// implicitly. If Buckets is left as nil or set to a slice of length
// zero, it is replaced by default buckets. The default buckets are
// DefBuckets if no sparse buckets (see below) are used, otherwise the
// default is no buckets. (In other words, if you want to use both
// reguler buckets and sparse buckets, you have to define the regular
// buckets here explicitly.)
Buckets []float64
// If SparseBucketsResolution is not zero, sparse buckets are used (in
// addition to the regular buckets, if defined above). Every power of
// ten is divided into the given number of exponential buckets. For
// example, if set to 3, the bucket boundaries are approximately […,
// 0.1, 0.215, 0.464, 1, 2.15, 4,64, 10, 21.5, 46.4, 100, …] Histograms
// can only be properly aggregated if they use the same
// resolution. Therefore, it is recommended to use 20 as a resolution,
// which is generally expected to be a good tradeoff between resource
// usage and accuracy (resulting in a maximum error of quantile values
// of about 6%).
SparseBucketsResolution uint8
// All observations with an absolute value of less or equal
// SparseBucketsZeroThreshold are accumulated into a “zero” bucket. For
// best results, this should be close to a bucket boundary. This is
// moste easily accomplished by picking a power of ten. If
// SparseBucketsZeroThreshold is left at zero (or set to a negative
// value), DefSparseBucketsZeroThreshold is used as the threshold.
SparseBucketsZeroThreshold float64
}
// NewHistogram creates a new Histogram based on the provided HistogramOpts. It
@ -184,16 +212,20 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
}
}
if len(opts.Buckets) == 0 {
opts.Buckets = DefBuckets
}
h := &histogram{
desc: desc,
upperBounds: opts.Buckets,
labelPairs: makeLabelPairs(desc, labelValues),
counts: [2]*histogramCounts{{}, {}},
now: time.Now,
desc: desc,
upperBounds: opts.Buckets,
sparseResolution: opts.SparseBucketsResolution,
sparseThreshold: opts.SparseBucketsZeroThreshold,
labelPairs: makeLabelPairs(desc, labelValues),
counts: [2]*histogramCounts{{}, {}},
now: time.Now,
}
if len(h.upperBounds) == 0 && opts.SparseBucketsResolution == 0 {
h.upperBounds = DefBuckets
}
if h.sparseThreshold <= 0 {
h.sparseThreshold = DefSparseBucketsZeroThreshold
}
for i, upperBound := range h.upperBounds {
if i < len(h.upperBounds)-1 {
@ -228,6 +260,67 @@ type histogramCounts struct {
sumBits uint64
count uint64
buckets []uint64
// sparse buckets are implemented with a sync.Map for this PoC. A
// dedicated data structure will likely be more efficient.
// There are separate maps for negative and positive observations.
// The map's value is a *uint64, counting observations in that bucket.
// The map's key is the logarithmic index of the bucket. Index 0 is for an
// upper bound of 1. Each increment/decrement by SparseBucketsResolution
// multiplies/divides the upper bound by 10. Indices in between are
// spaced exponentially as defined in spareBounds.
sparseBucketsPositive, sparseBucketsNegative sync.Map
// sparseZeroBucket counts all (positive and negative) observations in
// the zero bucket (with an absolute value less or equal
// SparseBucketsZeroThreshold).
sparseZeroBucket uint64
}
// observe manages the parts of observe that only affects
// histogramCounts. doSparse is true if spare buckets should be done,
// too. whichSparse is 0 for the sparseZeroBucket and +1 or -1 for
// sparseBucketsPositive or sparseBucketsNegative, respectively. sparseKey is
// the key of the sparse bucket to use.
func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool, whichSparse int, sparseKey int) {
if bucket < len(hc.buckets) {
atomic.AddUint64(&hc.buckets[bucket], 1)
}
for {
oldBits := atomic.LoadUint64(&hc.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
if atomic.CompareAndSwapUint64(&hc.sumBits, oldBits, newBits) {
break
}
}
if doSparse {
switch whichSparse {
case 0:
atomic.AddUint64(&hc.sparseZeroBucket, 1)
case +1:
addToSparseBucket(&hc.sparseBucketsPositive, sparseKey, 1)
case -1:
addToSparseBucket(&hc.sparseBucketsNegative, sparseKey, 1)
default:
panic(fmt.Errorf("invalid value for whichSparse: %d", whichSparse))
}
}
// Increment count last as we take it as a signal that the observation
// is complete.
atomic.AddUint64(&hc.count, 1)
}
func addToSparseBucket(buckets *sync.Map, key int, increment uint64) {
if existingBucket, ok := buckets.Load(key); ok {
// Fast path without allocation.
atomic.AddUint64(existingBucket.(*uint64), increment)
return
}
// Bucket doesn't exist yet. Slow path allocating new counter.
newBucket := increment // TODO(beorn7): Check if this is sufficient to not let increment escape.
if actualBucket, loaded := buckets.LoadOrStore(key, &newBucket); loaded {
// The bucket was created concurrently in another goroutine.
// Have to increment after all.
atomic.AddUint64(actualBucket.(*uint64), increment)
}
}
type histogram struct {
@ -259,9 +352,11 @@ type histogram struct {
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
counts [2]*histogramCounts
upperBounds []float64
labelPairs []*dto.LabelPair
exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
upperBounds []float64
labelPairs []*dto.LabelPair
exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
sparseResolution uint8
sparseThreshold float64
now func() time.Time // To mock out time.Now() for testing.
}
@ -309,6 +404,9 @@ func (h *histogram) Write(out *dto.Metric) error {
SampleCount: proto.Uint64(count),
SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))),
}
out.Histogram = his
out.Label = h.labelPairs
var cumCount uint64
for i, upperBound := range h.upperBounds {
cumCount += atomic.LoadUint64(&coldCounts.buckets[i])
@ -329,11 +427,7 @@ func (h *histogram) Write(out *dto.Metric) error {
}
his.Bucket = append(his.Bucket, b)
}
out.Histogram = his
out.Label = h.labelPairs
// Finally add all the cold counts to the new hot counts and reset the cold counts.
// Add all the cold counts to the new hot counts and reset the cold counts.
atomic.AddUint64(&hotCounts.count, count)
atomic.StoreUint64(&coldCounts.count, 0)
for {
@ -348,9 +442,64 @@ func (h *histogram) Write(out *dto.Metric) error {
atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
atomic.StoreUint64(&coldCounts.buckets[i], 0)
}
if h.sparseResolution != 0 {
zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket)
defer func() {
atomic.AddUint64(&hotCounts.sparseZeroBucket, zeroBucket)
atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive))
coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative))
}()
var buf bytes.Buffer
// TODO(beorn7): encode zero bucket threshold and count.
fmt.Println("Zero bucket:", zeroBucket) // DEBUG
fmt.Println("Positive buckets:") // DEBUG
if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsPositive, zeroBucket); err != nil {
return err
}
fmt.Println("Negative buckets:") // DEBUG
if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsNegative, zeroBucket); err != nil {
return err
}
}
return nil
}
func encodeSparseBuckets(w io.Writer, buckets *sync.Map, zeroBucket uint64) (n int, err error) {
// TODO(beorn7): Add actual encoding of spare buckets.
var ii []int
buckets.Range(func(k, v interface{}) bool {
ii = append(ii, k.(int))
return true
})
sort.Ints(ii)
fmt.Println(len(ii), "buckets")
var prev uint64
for _, i := range ii {
v, _ := buckets.Load(i)
current := atomic.LoadUint64(v.(*uint64))
fmt.Printf("- %d: %d Δ=%d\n", i, current, int(current)-int(prev))
prev = current
}
return 0, nil
}
// addAndReset returns a function to be used with sync.Map.Range of spare
// buckets in coldCounts. It increments the buckets in the provided hotBuckets
// according to the buckets ranged through. It then resets all buckets ranged
// through to 0 (but leaves them in place so that they don't need to get
// recreated on the next scrape).
func addAndReset(hotBuckets *sync.Map) func(k, v interface{}) bool {
return func(k, v interface{}) bool {
bucket := v.(*uint64)
addToSparseBucket(hotBuckets, k.(int), atomic.LoadUint64(bucket))
atomic.StoreUint64(bucket, 0)
return true
}
}
// findBucket returns the index of the bucket for the provided value, or
// len(h.upperBounds) for the +Inf bucket.
func (h *histogram) findBucket(v float64) int {
@ -368,25 +517,22 @@ func (h *histogram) findBucket(v float64) int {
// observe is the implementation for Observe without the findBucket part.
func (h *histogram) observe(v float64, bucket int) {
doSparse := h.sparseResolution != 0
var whichSparse, sparseKey int
if doSparse {
switch {
case v > h.sparseThreshold:
whichSparse = +1
case v < -h.sparseThreshold:
whichSparse = -1
}
sparseKey = int(math.Ceil(math.Log10(math.Abs(v)) * float64(h.sparseResolution)))
}
// We increment h.countAndHotIdx so that the counter in the lower
// 63 bits gets incremented. At the same time, we get the new value
// back, which we can use to find the currently-hot counts.
n := atomic.AddUint64(&h.countAndHotIdx, 1)
hotCounts := h.counts[n>>63]
if bucket < len(h.upperBounds) {
atomic.AddUint64(&hotCounts.buckets[bucket], 1)
}
for {
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
break
}
}
// Increment count last as we take it as a signal that the observation
// is complete.
atomic.AddUint64(&hotCounts.count, 1)
h.counts[n>>63].observe(v, bucket, doSparse, whichSparse, sparseKey)
}
// updateExemplar replaces the exemplar for the provided bucket. With empty