Merge pull request #901 from prometheus/beorn7/histogram
Implement strategy to limit the sparse bucket count
This commit is contained in:
commit
dfbcc28fff
|
@ -39,21 +39,21 @@ import (
|
||||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/1776#issuecomment-870164310
|
// https://github.com/open-telemetry/opentelemetry-specification/issues/1776#issuecomment-870164310
|
||||||
var sparseBounds = [][]float64{
|
var sparseBounds = [][]float64{
|
||||||
// Schema "0":
|
// Schema "0":
|
||||||
[]float64{0.5},
|
{0.5},
|
||||||
// Schema 1:
|
// Schema 1:
|
||||||
[]float64{0.5, 0.7071067811865475},
|
{0.5, 0.7071067811865475},
|
||||||
// Schema 2:
|
// Schema 2:
|
||||||
[]float64{0.5, 0.5946035575013605, 0.7071067811865475, 0.8408964152537144},
|
{0.5, 0.5946035575013605, 0.7071067811865475, 0.8408964152537144},
|
||||||
// Schema 3:
|
// Schema 3:
|
||||||
[]float64{0.5, 0.5452538663326288, 0.5946035575013605, 0.6484197773255048,
|
{0.5, 0.5452538663326288, 0.5946035575013605, 0.6484197773255048,
|
||||||
0.7071067811865475, 0.7711054127039704, 0.8408964152537144, 0.9170040432046711},
|
0.7071067811865475, 0.7711054127039704, 0.8408964152537144, 0.9170040432046711},
|
||||||
// Schema 4:
|
// Schema 4:
|
||||||
[]float64{0.5, 0.5221368912137069, 0.5452538663326288, 0.5693943173783458,
|
{0.5, 0.5221368912137069, 0.5452538663326288, 0.5693943173783458,
|
||||||
0.5946035575013605, 0.620928906036742, 0.6484197773255048, 0.6771277734684463,
|
0.5946035575013605, 0.620928906036742, 0.6484197773255048, 0.6771277734684463,
|
||||||
0.7071067811865475, 0.7384130729697496, 0.7711054127039704, 0.805245165974627,
|
0.7071067811865475, 0.7384130729697496, 0.7711054127039704, 0.805245165974627,
|
||||||
0.8408964152537144, 0.8781260801866495, 0.9170040432046711, 0.9576032806985735},
|
0.8408964152537144, 0.8781260801866495, 0.9170040432046711, 0.9576032806985735},
|
||||||
// Schema 5:
|
// Schema 5:
|
||||||
[]float64{0.5, 0.5109485743270583, 0.5221368912137069, 0.5335702003384117,
|
{0.5, 0.5109485743270583, 0.5221368912137069, 0.5335702003384117,
|
||||||
0.5452538663326288, 0.5571933712979462, 0.5693943173783458, 0.5818624293887887,
|
0.5452538663326288, 0.5571933712979462, 0.5693943173783458, 0.5818624293887887,
|
||||||
0.5946035575013605, 0.6076236799902344, 0.620928906036742, 0.6345254785958666,
|
0.5946035575013605, 0.6076236799902344, 0.620928906036742, 0.6345254785958666,
|
||||||
0.6484197773255048, 0.6626183215798706, 0.6771277734684463, 0.6919549409819159,
|
0.6484197773255048, 0.6626183215798706, 0.6771277734684463, 0.6919549409819159,
|
||||||
|
@ -62,7 +62,7 @@ var sparseBounds = [][]float64{
|
||||||
0.8408964152537144, 0.8593096490612387, 0.8781260801866495, 0.8973545375015533,
|
0.8408964152537144, 0.8593096490612387, 0.8781260801866495, 0.8973545375015533,
|
||||||
0.9170040432046711, 0.9370838170551498, 0.9576032806985735, 0.9785720620876999},
|
0.9170040432046711, 0.9370838170551498, 0.9576032806985735, 0.9785720620876999},
|
||||||
// Schema 6:
|
// Schema 6:
|
||||||
[]float64{0.5, 0.5054446430258502, 0.5109485743270583, 0.5165124395106142,
|
{0.5, 0.5054446430258502, 0.5109485743270583, 0.5165124395106142,
|
||||||
0.5221368912137069, 0.5278225891802786, 0.5335702003384117, 0.5393803988785598,
|
0.5221368912137069, 0.5278225891802786, 0.5335702003384117, 0.5393803988785598,
|
||||||
0.5452538663326288, 0.5511912916539204, 0.5571933712979462, 0.5632608093041209,
|
0.5452538663326288, 0.5511912916539204, 0.5571933712979462, 0.5632608093041209,
|
||||||
0.5693943173783458, 0.5755946149764913, 0.5818624293887887, 0.5881984958251406,
|
0.5693943173783458, 0.5755946149764913, 0.5818624293887887, 0.5881984958251406,
|
||||||
|
@ -79,7 +79,7 @@ var sparseBounds = [][]float64{
|
||||||
0.9170040432046711, 0.9269895625416926, 0.9370838170551498, 0.9472879907934827,
|
0.9170040432046711, 0.9269895625416926, 0.9370838170551498, 0.9472879907934827,
|
||||||
0.9576032806985735, 0.9680308967461471, 0.9785720620876999, 0.9892280131939752},
|
0.9576032806985735, 0.9680308967461471, 0.9785720620876999, 0.9892280131939752},
|
||||||
// Schema 7:
|
// Schema 7:
|
||||||
[]float64{0.5, 0.5027149505564014, 0.5054446430258502, 0.5081891574554764,
|
{0.5, 0.5027149505564014, 0.5054446430258502, 0.5081891574554764,
|
||||||
0.5109485743270583, 0.5137229745593818, 0.5165124395106142, 0.5193170509806894,
|
0.5109485743270583, 0.5137229745593818, 0.5165124395106142, 0.5193170509806894,
|
||||||
0.5221368912137069, 0.5249720429003435, 0.5278225891802786, 0.5306886136446309,
|
0.5221368912137069, 0.5249720429003435, 0.5278225891802786, 0.5306886136446309,
|
||||||
0.5335702003384117, 0.5364674337629877, 0.5393803988785598, 0.5423091811066545,
|
0.5335702003384117, 0.5364674337629877, 0.5393803988785598, 0.5423091811066545,
|
||||||
|
@ -112,7 +112,7 @@ var sparseBounds = [][]float64{
|
||||||
0.9576032806985735, 0.9628029718180622, 0.9680308967461471, 0.9732872087896164,
|
0.9576032806985735, 0.9628029718180622, 0.9680308967461471, 0.9732872087896164,
|
||||||
0.9785720620876999, 0.9838856116165875, 0.9892280131939752, 0.9945994234836328},
|
0.9785720620876999, 0.9838856116165875, 0.9892280131939752, 0.9945994234836328},
|
||||||
// Schema 8:
|
// Schema 8:
|
||||||
[]float64{0.5, 0.5013556375251013, 0.5027149505564014, 0.5040779490592088,
|
{0.5, 0.5013556375251013, 0.5027149505564014, 0.5040779490592088,
|
||||||
0.5054446430258502, 0.5068150424757447, 0.5081891574554764, 0.509566998038869,
|
0.5054446430258502, 0.5068150424757447, 0.5081891574554764, 0.509566998038869,
|
||||||
0.5109485743270583, 0.5123338964485679, 0.5137229745593818, 0.5151158188430205,
|
0.5109485743270583, 0.5123338964485679, 0.5137229745593818, 0.5151158188430205,
|
||||||
0.5165124395106142, 0.5179128468009786, 0.5193170509806894, 0.520725062344158,
|
0.5165124395106142, 0.5179128468009786, 0.5193170509806894, 0.520725062344158,
|
||||||
|
@ -405,10 +405,27 @@ type HistogramOpts struct {
|
||||||
// of making the zero value of HistogramOpts meaningful. Has to be
|
// of making the zero value of HistogramOpts meaningful. Has to be
|
||||||
// solved more elegantly in the final version.)
|
// solved more elegantly in the final version.)
|
||||||
SparseBucketsZeroThreshold float64
|
SparseBucketsZeroThreshold float64
|
||||||
// TODO(beorn7): Need a setting to limit total bucket count and to
|
|
||||||
// configure a strategy to enforce the limit, e.g. if minimum duration
|
// The remaining fields define a strategy to limit the number of
|
||||||
// after last reset, reset. If not, half the resolution and/or expand
|
// populated sparse buckets. If SparseBucketsMaxNumber is left at zero,
|
||||||
// the zero bucket.
|
// the number of buckets is not limited. Otherwise, once the provided
|
||||||
|
// number is exceeded, the following strategy is enacted: First, if the
|
||||||
|
// last reset (or the creation) of the histogram is at least
|
||||||
|
// SparseBucketsMinResetDuration ago, then the whole histogram is reset
|
||||||
|
// to its initial state (including regular buckets). If less time has
|
||||||
|
// passed, or if SparseBucketsMinResetDuration is zero, no reset is
|
||||||
|
// performed. Instead, the zero threshold is increased sufficiently to
|
||||||
|
// reduce the number of buckets to or below SparseBucketsMaxNumber, but
|
||||||
|
// not to more than SparseBucketsMaxZeroThreshold. Thus, if
|
||||||
|
// SparseBucketsMaxZeroThreshold is already at or below the current zero
|
||||||
|
// threshold, nothing happens at this step. After that, if the number of
|
||||||
|
// buckets still exceeds SparseBucketsMaxNumber, the resolution of the
|
||||||
|
// histogram is reduced by doubling the width of the sparse buckets (up
|
||||||
|
// to a growth factor between one bucket to the next of 2^(2^4) = 65536,
|
||||||
|
// see above).
|
||||||
|
SparseBucketsMaxNumber uint32
|
||||||
|
SparseBucketsMinResetDuration time.Duration
|
||||||
|
SparseBucketsMaxZeroThreshold float64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHistogram creates a new Histogram based on the provided HistogramOpts. It
|
// NewHistogram creates a new Histogram based on the provided HistogramOpts. It
|
||||||
|
@ -446,11 +463,14 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &histogram{
|
h := &histogram{
|
||||||
desc: desc,
|
desc: desc,
|
||||||
upperBounds: opts.Buckets,
|
upperBounds: opts.Buckets,
|
||||||
labelPairs: MakeLabelPairs(desc, labelValues),
|
labelPairs: MakeLabelPairs(desc, labelValues),
|
||||||
counts: [2]*histogramCounts{{}, {}},
|
sparseMaxBuckets: opts.SparseBucketsMaxNumber,
|
||||||
now: time.Now,
|
sparseMaxZeroThreshold: opts.SparseBucketsMaxZeroThreshold,
|
||||||
|
sparseMinResetDuration: opts.SparseBucketsMinResetDuration,
|
||||||
|
lastResetTime: time.Now(),
|
||||||
|
now: time.Now,
|
||||||
}
|
}
|
||||||
if len(h.upperBounds) == 0 && opts.SparseBucketsFactor <= 1 {
|
if len(h.upperBounds) == 0 && opts.SparseBucketsFactor <= 1 {
|
||||||
h.upperBounds = DefBuckets
|
h.upperBounds = DefBuckets
|
||||||
|
@ -460,9 +480,9 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
} else {
|
} else {
|
||||||
switch {
|
switch {
|
||||||
case opts.SparseBucketsZeroThreshold > 0:
|
case opts.SparseBucketsZeroThreshold > 0:
|
||||||
h.sparseThreshold = opts.SparseBucketsZeroThreshold
|
h.sparseZeroThreshold = opts.SparseBucketsZeroThreshold
|
||||||
case opts.SparseBucketsZeroThreshold == 0:
|
case opts.SparseBucketsZeroThreshold == 0:
|
||||||
h.sparseThreshold = DefSparseBucketsZeroThreshold
|
h.sparseZeroThreshold = DefSparseBucketsZeroThreshold
|
||||||
} // Leave h.sparseThreshold at 0 otherwise.
|
} // Leave h.sparseThreshold at 0 otherwise.
|
||||||
h.sparseSchema = pickSparseSchema(opts.SparseBucketsFactor)
|
h.sparseSchema = pickSparseSchema(opts.SparseBucketsFactor)
|
||||||
}
|
}
|
||||||
|
@ -483,8 +503,16 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
}
|
}
|
||||||
// Finally we know the final length of h.upperBounds and can make buckets
|
// Finally we know the final length of h.upperBounds and can make buckets
|
||||||
// for both counts as well as exemplars:
|
// for both counts as well as exemplars:
|
||||||
h.counts[0].buckets = make([]uint64, len(h.upperBounds))
|
h.counts[0] = &histogramCounts{
|
||||||
h.counts[1].buckets = make([]uint64, len(h.upperBounds))
|
buckets: make([]uint64, len(h.upperBounds)),
|
||||||
|
sparseZeroThresholdBits: math.Float64bits(h.sparseZeroThreshold),
|
||||||
|
sparseSchema: h.sparseSchema,
|
||||||
|
}
|
||||||
|
h.counts[1] = &histogramCounts{
|
||||||
|
buckets: make([]uint64, len(h.upperBounds)),
|
||||||
|
sparseZeroThresholdBits: math.Float64bits(h.sparseZeroThreshold),
|
||||||
|
sparseSchema: h.sparseSchema,
|
||||||
|
}
|
||||||
h.exemplars = make([]atomic.Value, len(h.upperBounds)+1)
|
h.exemplars = make([]atomic.Value, len(h.upperBounds)+1)
|
||||||
|
|
||||||
h.init(h) // Init self-collection.
|
h.init(h) // Init self-collection.
|
||||||
|
@ -492,14 +520,32 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
}
|
}
|
||||||
|
|
||||||
type histogramCounts struct {
|
type histogramCounts struct {
|
||||||
|
// Order in this struct matters for the alignment required by atomic
|
||||||
|
// operations, see http://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
||||||
|
|
||||||
// sumBits contains the bits of the float64 representing the sum of all
|
// sumBits contains the bits of the float64 representing the sum of all
|
||||||
// observations. sumBits and count have to go first in the struct to
|
// observations.
|
||||||
// guarantee alignment for atomic operations.
|
|
||||||
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
|
||||||
sumBits uint64
|
sumBits uint64
|
||||||
count uint64
|
count uint64
|
||||||
|
|
||||||
|
// sparseZeroBucket counts all (positive and negative) observations in
|
||||||
|
// the zero bucket (with an absolute value less or equal the current
|
||||||
|
// threshold, see next field.
|
||||||
|
sparseZeroBucket uint64
|
||||||
|
// sparseZeroThresholdBits is the bit pattern of the current threshold
|
||||||
|
// for the zero bucket. It's initially equal to sparseZeroThreshold but
|
||||||
|
// may change according to the bucket count limitation strategy.
|
||||||
|
sparseZeroThresholdBits uint64
|
||||||
|
// sparseSchema may change over time according to the bucket count
|
||||||
|
// limitation strategy and therefore has to be saved here.
|
||||||
|
sparseSchema int32
|
||||||
|
// Number of (positive and negative) sparse buckets.
|
||||||
|
sparseBucketsNumber uint32
|
||||||
|
|
||||||
|
// Regular buckets.
|
||||||
buckets []uint64
|
buckets []uint64
|
||||||
// sparse buckets are implemented with a sync.Map for now. A dedicated
|
|
||||||
|
// Sparse buckets are implemented with a sync.Map for now. A dedicated
|
||||||
// data structure will likely be more efficient. There are separate maps
|
// data structure will likely be more efficient. There are separate maps
|
||||||
// for negative and positive observations. The map's value is an *int64,
|
// for negative and positive observations. The map's value is an *int64,
|
||||||
// counting observations in that bucket. (Note that we don't use uint64
|
// counting observations in that bucket. (Note that we don't use uint64
|
||||||
|
@ -508,38 +554,47 @@ type histogramCounts struct {
|
||||||
// map's key is the index of the bucket according to the used
|
// map's key is the index of the bucket according to the used
|
||||||
// sparseSchema. Index 0 is for an upper bound of 1.
|
// sparseSchema. Index 0 is for an upper bound of 1.
|
||||||
sparseBucketsPositive, sparseBucketsNegative sync.Map
|
sparseBucketsPositive, sparseBucketsNegative sync.Map
|
||||||
// sparseZeroBucket counts all (positive and negative) observations in
|
|
||||||
// the zero bucket (with an absolute value less or equal
|
|
||||||
// SparseBucketsZeroThreshold).
|
|
||||||
sparseZeroBucket uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// observe manages the parts of observe that only affects
|
// observe manages the parts of observe that only affects
|
||||||
// histogramCounts. doSparse is true if spare buckets should be done,
|
// histogramCounts. doSparse is true if spare buckets should be done,
|
||||||
// too. whichSparse is 0 for the sparseZeroBucket and +1 or -1 for
|
// too.
|
||||||
// sparseBucketsPositive or sparseBucketsNegative, respectively. sparseKey is
|
func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) {
|
||||||
// the key of the sparse bucket to use.
|
|
||||||
func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool, whichSparse int, sparseKey int) {
|
|
||||||
if bucket < len(hc.buckets) {
|
if bucket < len(hc.buckets) {
|
||||||
atomic.AddUint64(&hc.buckets[bucket], 1)
|
atomic.AddUint64(&hc.buckets[bucket], 1)
|
||||||
}
|
}
|
||||||
for {
|
atomicAddFloat(&hc.sumBits, v)
|
||||||
oldBits := atomic.LoadUint64(&hc.sumBits)
|
|
||||||
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
|
|
||||||
if atomic.CompareAndSwapUint64(&hc.sumBits, oldBits, newBits) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if doSparse {
|
if doSparse {
|
||||||
switch whichSparse {
|
var (
|
||||||
case 0:
|
sparseKey int
|
||||||
atomic.AddUint64(&hc.sparseZeroBucket, 1)
|
sparseSchema = atomic.LoadInt32(&hc.sparseSchema)
|
||||||
case +1:
|
sparseZeroThreshold = math.Float64frombits(atomic.LoadUint64(&hc.sparseZeroThresholdBits))
|
||||||
addToSparseBucket(&hc.sparseBucketsPositive, sparseKey, 1)
|
frac, exp = math.Frexp(math.Abs(v))
|
||||||
case -1:
|
bucketCreated bool
|
||||||
addToSparseBucket(&hc.sparseBucketsNegative, sparseKey, 1)
|
)
|
||||||
|
switch {
|
||||||
|
case math.IsInf(v, 0):
|
||||||
|
sparseKey = math.MaxInt32 // Largest possible sparseKey.
|
||||||
|
case sparseSchema > 0:
|
||||||
|
bounds := sparseBounds[sparseSchema]
|
||||||
|
sparseKey = sort.SearchFloat64s(bounds, frac) + (exp-1)*len(bounds)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("invalid value for whichSparse: %d", whichSparse))
|
sparseKey = exp
|
||||||
|
if frac == 0.5 {
|
||||||
|
sparseKey--
|
||||||
|
}
|
||||||
|
sparseKey /= 1 << -sparseSchema
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case v > sparseZeroThreshold:
|
||||||
|
bucketCreated = addToSparseBucket(&hc.sparseBucketsPositive, sparseKey, 1)
|
||||||
|
case v < -sparseZeroThreshold:
|
||||||
|
bucketCreated = addToSparseBucket(&hc.sparseBucketsNegative, sparseKey, 1)
|
||||||
|
default:
|
||||||
|
atomic.AddUint64(&hc.sparseZeroBucket, 1)
|
||||||
|
}
|
||||||
|
if bucketCreated {
|
||||||
|
atomic.AddUint32(&hc.sparseBucketsNumber, 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Increment count last as we take it as a signal that the observation
|
// Increment count last as we take it as a signal that the observation
|
||||||
|
@ -547,21 +602,6 @@ func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool, whichSp
|
||||||
atomic.AddUint64(&hc.count, 1)
|
atomic.AddUint64(&hc.count, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func addToSparseBucket(buckets *sync.Map, key int, increment int64) {
|
|
||||||
if existingBucket, ok := buckets.Load(key); ok {
|
|
||||||
// Fast path without allocation.
|
|
||||||
atomic.AddInt64(existingBucket.(*int64), increment)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Bucket doesn't exist yet. Slow path allocating new counter.
|
|
||||||
newBucket := increment // TODO(beorn7): Check if this is sufficient to not let increment escape.
|
|
||||||
if actualBucket, loaded := buckets.LoadOrStore(key, &newBucket); loaded {
|
|
||||||
// The bucket was created concurrently in another goroutine.
|
|
||||||
// Have to increment after all.
|
|
||||||
atomic.AddInt64(actualBucket.(*int64), increment)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type histogram struct {
|
type histogram struct {
|
||||||
// countAndHotIdx enables lock-free writes with use of atomic updates.
|
// countAndHotIdx enables lock-free writes with use of atomic updates.
|
||||||
// The most significant bit is the hot index [0 or 1] of the count field
|
// The most significant bit is the hot index [0 or 1] of the count field
|
||||||
|
@ -582,8 +622,10 @@ type histogram struct {
|
||||||
countAndHotIdx uint64
|
countAndHotIdx uint64
|
||||||
|
|
||||||
selfCollector
|
selfCollector
|
||||||
desc *Desc
|
desc *Desc
|
||||||
writeMtx sync.Mutex // Only used in the Write method.
|
|
||||||
|
// Only used in the Write method and for sparse bucket management.
|
||||||
|
mtx sync.Mutex
|
||||||
|
|
||||||
// Two counts, one is "hot" for lock-free observations, the other is
|
// Two counts, one is "hot" for lock-free observations, the other is
|
||||||
// "cold" for writing out a dto.Metric. It has to be an array of
|
// "cold" for writing out a dto.Metric. It has to be an array of
|
||||||
|
@ -591,11 +633,15 @@ type histogram struct {
|
||||||
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
|
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
|
||||||
counts [2]*histogramCounts
|
counts [2]*histogramCounts
|
||||||
|
|
||||||
upperBounds []float64
|
upperBounds []float64
|
||||||
labelPairs []*dto.LabelPair
|
labelPairs []*dto.LabelPair
|
||||||
exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
|
exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
|
||||||
sparseSchema int32 // Set to math.MinInt32 if no sparse buckets are used.
|
sparseSchema int32 // The initial schema. Set to math.MinInt32 if no sparse buckets are used.
|
||||||
sparseThreshold float64
|
sparseZeroThreshold float64 // The initial zero threshold.
|
||||||
|
sparseMaxZeroThreshold float64
|
||||||
|
sparseMaxBuckets uint32
|
||||||
|
sparseMinResetDuration time.Duration
|
||||||
|
lastResetTime time.Time // Protected by mtx.
|
||||||
|
|
||||||
now func() time.Time // To mock out time.Now() for testing.
|
now func() time.Time // To mock out time.Now() for testing.
|
||||||
}
|
}
|
||||||
|
@ -619,8 +665,8 @@ func (h *histogram) Write(out *dto.Metric) error {
|
||||||
// the hot path, i.e. Observe is called much more often than Write. The
|
// the hot path, i.e. Observe is called much more often than Write. The
|
||||||
// complication of making Write lock-free isn't worth it, if possible at
|
// complication of making Write lock-free isn't worth it, if possible at
|
||||||
// all.
|
// all.
|
||||||
h.writeMtx.Lock()
|
h.mtx.Lock()
|
||||||
defer h.writeMtx.Unlock()
|
defer h.mtx.Unlock()
|
||||||
|
|
||||||
// Adding 1<<63 switches the hot index (from 0 to 1 or from 1 to 0)
|
// Adding 1<<63 switches the hot index (from 0 to 1 or from 1 to 0)
|
||||||
// without touching the count bits. See the struct comments for a full
|
// without touching the count bits. See the struct comments for a full
|
||||||
|
@ -633,10 +679,7 @@ func (h *histogram) Write(out *dto.Metric) error {
|
||||||
hotCounts := h.counts[n>>63]
|
hotCounts := h.counts[n>>63]
|
||||||
coldCounts := h.counts[(^n)>>63]
|
coldCounts := h.counts[(^n)>>63]
|
||||||
|
|
||||||
// Await cooldown.
|
waitForCooldown(count, coldCounts)
|
||||||
for count != atomic.LoadUint64(&coldCounts.count) {
|
|
||||||
runtime.Gosched() // Let observations get work done.
|
|
||||||
}
|
|
||||||
|
|
||||||
his := &dto.Histogram{
|
his := &dto.Histogram{
|
||||||
Bucket: make([]*dto.Bucket, len(h.upperBounds)),
|
Bucket: make([]*dto.Bucket, len(h.upperBounds)),
|
||||||
|
@ -666,106 +709,24 @@ func (h *histogram) Write(out *dto.Metric) error {
|
||||||
}
|
}
|
||||||
his.Bucket = append(his.Bucket, b)
|
his.Bucket = append(his.Bucket, b)
|
||||||
}
|
}
|
||||||
// Add all the cold counts to the new hot counts and reset the cold counts.
|
|
||||||
atomic.AddUint64(&hotCounts.count, count)
|
|
||||||
atomic.StoreUint64(&coldCounts.count, 0)
|
|
||||||
for {
|
|
||||||
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
|
|
||||||
newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum())
|
|
||||||
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
|
|
||||||
atomic.StoreUint64(&coldCounts.sumBits, 0)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for i := range h.upperBounds {
|
|
||||||
atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
|
|
||||||
atomic.StoreUint64(&coldCounts.buckets[i], 0)
|
|
||||||
}
|
|
||||||
if h.sparseSchema > math.MinInt32 {
|
if h.sparseSchema > math.MinInt32 {
|
||||||
his.SbZeroThreshold = &h.sparseThreshold
|
his.SbZeroThreshold = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sparseZeroThresholdBits)))
|
||||||
his.SbSchema = &h.sparseSchema
|
his.SbSchema = proto.Int32(atomic.LoadInt32(&coldCounts.sparseSchema))
|
||||||
zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket)
|
zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
atomic.AddUint64(&hotCounts.sparseZeroBucket, zeroBucket)
|
coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive, &hotCounts.sparseBucketsNumber))
|
||||||
atomic.StoreUint64(&coldCounts.sparseZeroBucket, 0)
|
coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative, &hotCounts.sparseBucketsNumber))
|
||||||
coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive))
|
|
||||||
coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative))
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
his.SbZeroCount = proto.Uint64(zeroBucket)
|
his.SbZeroCount = proto.Uint64(zeroBucket)
|
||||||
his.SbNegative = makeSparseBuckets(&coldCounts.sparseBucketsNegative)
|
his.SbNegative = makeSparseBuckets(&coldCounts.sparseBucketsNegative)
|
||||||
his.SbPositive = makeSparseBuckets(&coldCounts.sparseBucketsPositive)
|
his.SbPositive = makeSparseBuckets(&coldCounts.sparseBucketsPositive)
|
||||||
}
|
}
|
||||||
|
addAndResetCounts(hotCounts, coldCounts)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeSparseBuckets(buckets *sync.Map) *dto.SparseBuckets {
|
|
||||||
var ii []int
|
|
||||||
buckets.Range(func(k, v interface{}) bool {
|
|
||||||
ii = append(ii, k.(int))
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
sort.Ints(ii)
|
|
||||||
|
|
||||||
if len(ii) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
sbs := dto.SparseBuckets{}
|
|
||||||
var prevCount int64
|
|
||||||
var nextI int
|
|
||||||
|
|
||||||
appendDelta := func(count int64) {
|
|
||||||
*sbs.Span[len(sbs.Span)-1].Length++
|
|
||||||
sbs.Delta = append(sbs.Delta, count-prevCount)
|
|
||||||
prevCount = count
|
|
||||||
}
|
|
||||||
|
|
||||||
for n, i := range ii {
|
|
||||||
v, _ := buckets.Load(i)
|
|
||||||
count := atomic.LoadInt64(v.(*int64))
|
|
||||||
// Multiple spans with only small gaps in between are probably
|
|
||||||
// encoded more efficiently as one larger span with a few empty
|
|
||||||
// buckets. Needs some research to find the sweet spot. For now,
|
|
||||||
// we assume that gaps of one ore two buckets should not create
|
|
||||||
// a new span.
|
|
||||||
iDelta := int32(i - nextI)
|
|
||||||
if n == 0 || iDelta > 2 {
|
|
||||||
// We have to create a new span, either because we are
|
|
||||||
// at the very beginning, or because we have found a gap
|
|
||||||
// of more than two buckets.
|
|
||||||
sbs.Span = append(sbs.Span, &dto.SparseBuckets_Span{
|
|
||||||
Offset: proto.Int32(iDelta),
|
|
||||||
Length: proto.Uint32(0),
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
// We have found a small gap (or no gap at all).
|
|
||||||
// Insert empty buckets as needed.
|
|
||||||
for j := int32(0); j < iDelta; j++ {
|
|
||||||
appendDelta(0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
appendDelta(count)
|
|
||||||
nextI = i + 1
|
|
||||||
}
|
|
||||||
return &sbs
|
|
||||||
}
|
|
||||||
|
|
||||||
// addAndReset returns a function to be used with sync.Map.Range of spare
|
|
||||||
// buckets in coldCounts. It increments the buckets in the provided hotBuckets
|
|
||||||
// according to the buckets ranged through. It then resets all buckets ranged
|
|
||||||
// through to 0 (but leaves them in place so that they don't need to get
|
|
||||||
// recreated on the next scrape).
|
|
||||||
func addAndReset(hotBuckets *sync.Map) func(k, v interface{}) bool {
|
|
||||||
return func(k, v interface{}) bool {
|
|
||||||
bucket := v.(*int64)
|
|
||||||
addToSparseBucket(hotBuckets, k.(int), atomic.LoadInt64(bucket))
|
|
||||||
atomic.StoreInt64(bucket, 0)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// findBucket returns the index of the bucket for the provided value, or
|
// findBucket returns the index of the bucket for the provided value, or
|
||||||
// len(h.upperBounds) for the +Inf bucket.
|
// len(h.upperBounds) for the +Inf bucket.
|
||||||
func (h *histogram) findBucket(v float64) int {
|
func (h *histogram) findBucket(v float64) int {
|
||||||
|
@ -785,34 +746,213 @@ func (h *histogram) findBucket(v float64) int {
|
||||||
func (h *histogram) observe(v float64, bucket int) {
|
func (h *histogram) observe(v float64, bucket int) {
|
||||||
// Do not add to sparse buckets for NaN observations.
|
// Do not add to sparse buckets for NaN observations.
|
||||||
doSparse := h.sparseSchema > math.MinInt32 && !math.IsNaN(v)
|
doSparse := h.sparseSchema > math.MinInt32 && !math.IsNaN(v)
|
||||||
var whichSparse, sparseKey int
|
|
||||||
if doSparse {
|
|
||||||
switch {
|
|
||||||
case v > h.sparseThreshold:
|
|
||||||
whichSparse = +1
|
|
||||||
case v < -h.sparseThreshold:
|
|
||||||
whichSparse = -1
|
|
||||||
}
|
|
||||||
frac, exp := math.Frexp(math.Abs(v))
|
|
||||||
switch {
|
|
||||||
case math.IsInf(v, 0):
|
|
||||||
sparseKey = math.MaxInt32 // Largest possible sparseKey.
|
|
||||||
case h.sparseSchema > 0:
|
|
||||||
bounds := sparseBounds[h.sparseSchema]
|
|
||||||
sparseKey = sort.SearchFloat64s(bounds, frac) + (exp-1)*len(bounds)
|
|
||||||
default:
|
|
||||||
sparseKey = exp
|
|
||||||
if frac == 0.5 {
|
|
||||||
sparseKey--
|
|
||||||
}
|
|
||||||
sparseKey /= 1 << -h.sparseSchema
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// We increment h.countAndHotIdx so that the counter in the lower
|
// We increment h.countAndHotIdx so that the counter in the lower
|
||||||
// 63 bits gets incremented. At the same time, we get the new value
|
// 63 bits gets incremented. At the same time, we get the new value
|
||||||
// back, which we can use to find the currently-hot counts.
|
// back, which we can use to find the currently-hot counts.
|
||||||
n := atomic.AddUint64(&h.countAndHotIdx, 1)
|
n := atomic.AddUint64(&h.countAndHotIdx, 1)
|
||||||
h.counts[n>>63].observe(v, bucket, doSparse, whichSparse, sparseKey)
|
hotCounts := h.counts[n>>63]
|
||||||
|
hotCounts.observe(v, bucket, doSparse)
|
||||||
|
if doSparse {
|
||||||
|
h.limitSparseBuckets(hotCounts, v, bucket)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// limitSparsebuckets applies a strategy to limit the number of populated sparse
|
||||||
|
// buckets. It's generally best effort, and there are situations where the
|
||||||
|
// number can go higher (if even the lowest resolution isn't enough to reduce
|
||||||
|
// the number sufficiently, or if the provided counts aren't fully updated yet
|
||||||
|
// by a concurrently happening Write call).
|
||||||
|
func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, bucket int) {
|
||||||
|
if h.sparseMaxBuckets == 0 {
|
||||||
|
return // No limit configured.
|
||||||
|
}
|
||||||
|
if h.sparseMaxBuckets >= atomic.LoadUint32(&counts.sparseBucketsNumber) {
|
||||||
|
return // Bucket limit not exceeded yet.
|
||||||
|
}
|
||||||
|
|
||||||
|
h.mtx.Lock()
|
||||||
|
defer h.mtx.Unlock()
|
||||||
|
|
||||||
|
// The hot counts might have been swapped just before we acquired the
|
||||||
|
// lock. Re-fetch the hot counts first...
|
||||||
|
n := atomic.LoadUint64(&h.countAndHotIdx)
|
||||||
|
hotIdx := n >> 63
|
||||||
|
coldIdx := (^n) >> 63
|
||||||
|
hotCounts := h.counts[hotIdx]
|
||||||
|
coldCounts := h.counts[coldIdx]
|
||||||
|
// ...and then check again if we really have to reduce the bucket count.
|
||||||
|
if h.sparseMaxBuckets >= atomic.LoadUint32(&hotCounts.sparseBucketsNumber) {
|
||||||
|
return // Bucket limit not exceeded after all.
|
||||||
|
}
|
||||||
|
// Try the various strategies in order.
|
||||||
|
if h.maybeReset(hotCounts, coldCounts, coldIdx, value, bucket) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if h.maybeWidenZeroBucket(hotCounts, coldCounts) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h.doubleBucketWidth(hotCounts, coldCounts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// maybyReset resests the whole histogram if at least h.sparseMinResetDuration
|
||||||
|
// has been passed. It returns true if the histogram has been reset. The caller
|
||||||
|
// must have locked h.mtx.
|
||||||
|
func (h *histogram) maybeReset(hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int) bool {
|
||||||
|
// We are using the possibly mocked h.now() rather than
|
||||||
|
// time.Since(h.lastResetTime) to enable testing.
|
||||||
|
if h.sparseMinResetDuration == 0 || h.now().Sub(h.lastResetTime) < h.sparseMinResetDuration {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Completely reset coldCounts.
|
||||||
|
h.resetCounts(cold)
|
||||||
|
// Repeat the latest observation to not lose it completely.
|
||||||
|
cold.observe(value, bucket, true)
|
||||||
|
// Make coldCounts the new hot counts while ressetting countAndHotIdx.
|
||||||
|
n := atomic.SwapUint64(&h.countAndHotIdx, (coldIdx<<63)+1)
|
||||||
|
count := n & ((1 << 63) - 1)
|
||||||
|
waitForCooldown(count, hot)
|
||||||
|
// Finally, reset the formerly hot counts, too.
|
||||||
|
h.resetCounts(hot)
|
||||||
|
h.lastResetTime = h.now()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// maybeWidenZeroBucket widens the zero bucket until it includes the existing
|
||||||
|
// buckets closest to the zero bucket (which could be two, if an equidistant
|
||||||
|
// negative and a positive bucket exists, but usually it's only one bucket to be
|
||||||
|
// merged into the new wider zero bucket). h.sparseMaxZeroThreshold limits how
|
||||||
|
// far the zero bucket can be extended, and if that's not enough to include an
|
||||||
|
// existing bucket, the method returns false. The caller must have locked h.mtx.
|
||||||
|
func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
|
||||||
|
currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hot.sparseZeroThresholdBits))
|
||||||
|
if currentZeroThreshold >= h.sparseMaxZeroThreshold {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Find the key of the bucket closest to zero.
|
||||||
|
smallestKey := findSmallestKey(&hot.sparseBucketsPositive)
|
||||||
|
smallestNegativeKey := findSmallestKey(&hot.sparseBucketsNegative)
|
||||||
|
if smallestNegativeKey < smallestKey {
|
||||||
|
smallestKey = smallestNegativeKey
|
||||||
|
}
|
||||||
|
if smallestKey == math.MaxInt32 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hot.sparseSchema))
|
||||||
|
if newZeroThreshold > h.sparseMaxZeroThreshold {
|
||||||
|
return false // New threshold would exceed the max threshold.
|
||||||
|
}
|
||||||
|
atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
|
||||||
|
// Remove applicable buckets.
|
||||||
|
if _, loaded := cold.sparseBucketsNegative.LoadAndDelete(smallestKey); loaded {
|
||||||
|
atomicDecUint32(&cold.sparseBucketsNumber)
|
||||||
|
}
|
||||||
|
if _, loaded := cold.sparseBucketsPositive.LoadAndDelete(smallestKey); loaded {
|
||||||
|
atomicDecUint32(&cold.sparseBucketsNumber)
|
||||||
|
}
|
||||||
|
// Make cold counts the new hot counts.
|
||||||
|
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
|
||||||
|
count := n & ((1 << 63) - 1)
|
||||||
|
// Swap the pointer names to represent the new roles and make
|
||||||
|
// the rest less confusing.
|
||||||
|
hot, cold = cold, hot
|
||||||
|
waitForCooldown(count, cold)
|
||||||
|
// Add all the now cold counts to the new hot counts...
|
||||||
|
addAndResetCounts(hot, cold)
|
||||||
|
// ...adjust the new zero threshold in the cold counts, too...
|
||||||
|
atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
|
||||||
|
// ...and then merge the newly deleted buckets into the wider zero
|
||||||
|
// bucket.
|
||||||
|
mergeAndDeleteOrAddAndReset := func(hotBuckets, coldBuckets *sync.Map) func(k, v interface{}) bool {
|
||||||
|
return func(k, v interface{}) bool {
|
||||||
|
key := k.(int)
|
||||||
|
bucket := v.(*int64)
|
||||||
|
if key == smallestKey {
|
||||||
|
// Merge into hot zero bucket...
|
||||||
|
atomic.AddUint64(&hot.sparseZeroBucket, uint64(atomic.LoadInt64(bucket)))
|
||||||
|
// ...and delete from cold counts.
|
||||||
|
coldBuckets.Delete(key)
|
||||||
|
atomicDecUint32(&cold.sparseBucketsNumber)
|
||||||
|
} else {
|
||||||
|
// Add to corresponding hot bucket...
|
||||||
|
if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
|
||||||
|
atomic.AddUint32(&hot.sparseBucketsNumber, 1)
|
||||||
|
}
|
||||||
|
// ...and reset cold bucket.
|
||||||
|
atomic.StoreInt64(bucket, 0)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cold.sparseBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsPositive, &cold.sparseBucketsPositive))
|
||||||
|
cold.sparseBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsNegative, &cold.sparseBucketsNegative))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// doubleBucketWidth doubles the bucket width (by decrementing the schema
|
||||||
|
// number). Note that very sparse buckets could lead to a low reduction of the
|
||||||
|
// bucket count (or even no reduction at all). The method does nothing if the
|
||||||
|
// schema is already -4.
|
||||||
|
func (h *histogram) doubleBucketWidth(hot, cold *histogramCounts) {
|
||||||
|
coldSchema := atomic.LoadInt32(&cold.sparseSchema)
|
||||||
|
if coldSchema == -4 {
|
||||||
|
return // Already at lowest resolution.
|
||||||
|
}
|
||||||
|
coldSchema--
|
||||||
|
atomic.StoreInt32(&cold.sparseSchema, coldSchema)
|
||||||
|
// Play it simple and just delete all cold buckets.
|
||||||
|
atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
|
||||||
|
deleteSyncMap(&cold.sparseBucketsNegative)
|
||||||
|
deleteSyncMap(&cold.sparseBucketsPositive)
|
||||||
|
// Make coldCounts the new hot counts.
|
||||||
|
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
|
||||||
|
count := n & ((1 << 63) - 1)
|
||||||
|
// Swap the pointer names to represent the new roles and make
|
||||||
|
// the rest less confusing.
|
||||||
|
hot, cold = cold, hot
|
||||||
|
waitForCooldown(count, cold)
|
||||||
|
// Add all the now cold counts to the new hot counts...
|
||||||
|
addAndResetCounts(hot, cold)
|
||||||
|
// ...adjust the schema in the cold counts, too...
|
||||||
|
atomic.StoreInt32(&cold.sparseSchema, coldSchema)
|
||||||
|
// ...and then merge the cold buckets into the wider hot buckets.
|
||||||
|
merge := func(hotBuckets *sync.Map) func(k, v interface{}) bool {
|
||||||
|
return func(k, v interface{}) bool {
|
||||||
|
key := k.(int)
|
||||||
|
bucket := v.(*int64)
|
||||||
|
// Adjust key to match the bucket to merge into.
|
||||||
|
if key > 0 {
|
||||||
|
key++
|
||||||
|
}
|
||||||
|
key /= 2
|
||||||
|
// Add to corresponding hot bucket.
|
||||||
|
if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
|
||||||
|
atomic.AddUint32(&hot.sparseBucketsNumber, 1)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cold.sparseBucketsPositive.Range(merge(&hot.sparseBucketsPositive))
|
||||||
|
cold.sparseBucketsNegative.Range(merge(&hot.sparseBucketsNegative))
|
||||||
|
// Play it simple again and just delete all cold buckets.
|
||||||
|
atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
|
||||||
|
deleteSyncMap(&cold.sparseBucketsNegative)
|
||||||
|
deleteSyncMap(&cold.sparseBucketsPositive)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *histogram) resetCounts(counts *histogramCounts) {
|
||||||
|
atomic.StoreUint64(&counts.sumBits, 0)
|
||||||
|
atomic.StoreUint64(&counts.count, 0)
|
||||||
|
atomic.StoreUint64(&counts.sparseZeroBucket, 0)
|
||||||
|
atomic.StoreUint64(&counts.sparseZeroThresholdBits, math.Float64bits(h.sparseZeroThreshold))
|
||||||
|
atomic.StoreInt32(&counts.sparseSchema, h.sparseSchema)
|
||||||
|
atomic.StoreUint32(&counts.sparseBucketsNumber, 0)
|
||||||
|
for i := range h.upperBounds {
|
||||||
|
atomic.StoreUint64(&counts.buckets[i], 0)
|
||||||
|
}
|
||||||
|
deleteSyncMap(&counts.sparseBucketsNegative)
|
||||||
|
deleteSyncMap(&counts.sparseBucketsPositive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateExemplar replaces the exemplar for the provided bucket. With empty
|
// updateExemplar replaces the exemplar for the provided bucket. With empty
|
||||||
|
@ -1081,3 +1221,163 @@ func pickSparseSchema(bucketFactor float64) int32 {
|
||||||
return -int32(floor)
|
return -int32(floor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeSparseBuckets(buckets *sync.Map) *dto.SparseBuckets {
|
||||||
|
var ii []int
|
||||||
|
buckets.Range(func(k, v interface{}) bool {
|
||||||
|
ii = append(ii, k.(int))
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
sort.Ints(ii)
|
||||||
|
|
||||||
|
if len(ii) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sbs := dto.SparseBuckets{}
|
||||||
|
var prevCount int64
|
||||||
|
var nextI int
|
||||||
|
|
||||||
|
appendDelta := func(count int64) {
|
||||||
|
*sbs.Span[len(sbs.Span)-1].Length++
|
||||||
|
sbs.Delta = append(sbs.Delta, count-prevCount)
|
||||||
|
prevCount = count
|
||||||
|
}
|
||||||
|
|
||||||
|
for n, i := range ii {
|
||||||
|
v, _ := buckets.Load(i)
|
||||||
|
count := atomic.LoadInt64(v.(*int64))
|
||||||
|
// Multiple spans with only small gaps in between are probably
|
||||||
|
// encoded more efficiently as one larger span with a few empty
|
||||||
|
// buckets. Needs some research to find the sweet spot. For now,
|
||||||
|
// we assume that gaps of one ore two buckets should not create
|
||||||
|
// a new span.
|
||||||
|
iDelta := int32(i - nextI)
|
||||||
|
if n == 0 || iDelta > 2 {
|
||||||
|
// We have to create a new span, either because we are
|
||||||
|
// at the very beginning, or because we have found a gap
|
||||||
|
// of more than two buckets.
|
||||||
|
sbs.Span = append(sbs.Span, &dto.SparseBuckets_Span{
|
||||||
|
Offset: proto.Int32(iDelta),
|
||||||
|
Length: proto.Uint32(0),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// We have found a small gap (or no gap at all).
|
||||||
|
// Insert empty buckets as needed.
|
||||||
|
for j := int32(0); j < iDelta; j++ {
|
||||||
|
appendDelta(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
appendDelta(count)
|
||||||
|
nextI = i + 1
|
||||||
|
}
|
||||||
|
return &sbs
|
||||||
|
}
|
||||||
|
|
||||||
|
// addToSparseBucket increments the sparse bucket at key by the provided
|
||||||
|
// amount. It returns true if a new sparse bucket had to be created for that.
|
||||||
|
func addToSparseBucket(buckets *sync.Map, key int, increment int64) bool {
|
||||||
|
if existingBucket, ok := buckets.Load(key); ok {
|
||||||
|
// Fast path without allocation.
|
||||||
|
atomic.AddInt64(existingBucket.(*int64), increment)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Bucket doesn't exist yet. Slow path allocating new counter.
|
||||||
|
newBucket := increment // TODO(beorn7): Check if this is sufficient to not let increment escape.
|
||||||
|
if actualBucket, loaded := buckets.LoadOrStore(key, &newBucket); loaded {
|
||||||
|
// The bucket was created concurrently in another goroutine.
|
||||||
|
// Have to increment after all.
|
||||||
|
atomic.AddInt64(actualBucket.(*int64), increment)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// addAndReset returns a function to be used with sync.Map.Range of spare
|
||||||
|
// buckets in coldCounts. It increments the buckets in the provided hotBuckets
|
||||||
|
// according to the buckets ranged through. It then resets all buckets ranged
|
||||||
|
// through to 0 (but leaves them in place so that they don't need to get
|
||||||
|
// recreated on the next scrape).
|
||||||
|
func addAndReset(hotBuckets *sync.Map, bucketNumber *uint32) func(k, v interface{}) bool {
|
||||||
|
return func(k, v interface{}) bool {
|
||||||
|
bucket := v.(*int64)
|
||||||
|
if addToSparseBucket(hotBuckets, k.(int), atomic.LoadInt64(bucket)) {
|
||||||
|
atomic.AddUint32(bucketNumber, 1)
|
||||||
|
}
|
||||||
|
atomic.StoreInt64(bucket, 0)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteSyncMap(m *sync.Map) {
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
m.Delete(k)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func findSmallestKey(m *sync.Map) int {
|
||||||
|
result := math.MaxInt32
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
key := k.(int)
|
||||||
|
if key < result {
|
||||||
|
result = key
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLe(key int, schema int32) float64 {
|
||||||
|
if schema < 0 {
|
||||||
|
return math.Ldexp(1, key<<(-schema))
|
||||||
|
}
|
||||||
|
|
||||||
|
fracIdx := key & ((1 << schema) - 1)
|
||||||
|
frac := sparseBounds[schema][fracIdx]
|
||||||
|
exp := (key >> schema) + 1
|
||||||
|
return math.Ldexp(frac, exp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForCooldown returns after the count field in the provided histogramCounts
|
||||||
|
// has reached the provided count value.
|
||||||
|
func waitForCooldown(count uint64, counts *histogramCounts) {
|
||||||
|
for count != atomic.LoadUint64(&counts.count) {
|
||||||
|
runtime.Gosched() // Let observations get work done.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// atomicAddFloat adds the provided float atomically to another float
|
||||||
|
// represented by the bit pattern the bits pointer is pointing to.
|
||||||
|
func atomicAddFloat(bits *uint64, v float64) {
|
||||||
|
for {
|
||||||
|
loadedBits := atomic.LoadUint64(bits)
|
||||||
|
newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
|
||||||
|
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// atomicDecUint32 atomically decrements the uint32 p points to. See
|
||||||
|
// https://pkg.go.dev/sync/atomic#AddUint32 to understand how this is done.
|
||||||
|
func atomicDecUint32(p *uint32) {
|
||||||
|
atomic.AddUint32(p, ^uint32(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
// addAndResetCounts adds certain fields (count, sum, conventional buckets,
|
||||||
|
// sparse zero bucket) from the cold counts to the corresponding fields in the
|
||||||
|
// hot counts. Those fields are then reset to 0 in the cold counts.
|
||||||
|
func addAndResetCounts(hot, cold *histogramCounts) {
|
||||||
|
atomic.AddUint64(&hot.count, atomic.LoadUint64(&cold.count))
|
||||||
|
atomic.StoreUint64(&cold.count, 0)
|
||||||
|
coldSum := math.Float64frombits(atomic.LoadUint64(&cold.sumBits))
|
||||||
|
atomicAddFloat(&hot.sumBits, coldSum)
|
||||||
|
atomic.StoreUint64(&cold.sumBits, 0)
|
||||||
|
for i := range hot.buckets {
|
||||||
|
atomic.AddUint64(&hot.buckets[i], atomic.LoadUint64(&cold.buckets[i]))
|
||||||
|
atomic.StoreUint64(&cold.buckets[i], 0)
|
||||||
|
}
|
||||||
|
atomic.AddUint64(&hot.sparseZeroBucket, atomic.LoadUint64(&cold.sparseZeroBucket))
|
||||||
|
atomic.StoreUint64(&cold.sparseZeroBucket, 0)
|
||||||
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"testing/quick"
|
"testing/quick"
|
||||||
"time"
|
"time"
|
||||||
|
@ -167,7 +168,7 @@ func TestHistogramConcurrency(t *testing.T) {
|
||||||
start.Add(1)
|
start.Add(1)
|
||||||
end.Add(concLevel)
|
end.Add(concLevel)
|
||||||
|
|
||||||
sum := NewHistogram(HistogramOpts{
|
his := NewHistogram(HistogramOpts{
|
||||||
Name: "test_histogram",
|
Name: "test_histogram",
|
||||||
Help: "helpless",
|
Help: "helpless",
|
||||||
Buckets: testBuckets,
|
Buckets: testBuckets,
|
||||||
|
@ -188,9 +189,9 @@ func TestHistogramConcurrency(t *testing.T) {
|
||||||
start.Wait()
|
start.Wait()
|
||||||
for _, v := range vals {
|
for _, v := range vals {
|
||||||
if n%2 == 0 {
|
if n%2 == 0 {
|
||||||
sum.Observe(v)
|
his.Observe(v)
|
||||||
} else {
|
} else {
|
||||||
sum.(ExemplarObserver).ObserveWithExemplar(v, Labels{"foo": "bar"})
|
his.(ExemplarObserver).ObserveWithExemplar(v, Labels{"foo": "bar"})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
end.Done()
|
end.Done()
|
||||||
|
@ -201,7 +202,7 @@ func TestHistogramConcurrency(t *testing.T) {
|
||||||
end.Wait()
|
end.Wait()
|
||||||
|
|
||||||
m := &dto.Metric{}
|
m := &dto.Metric{}
|
||||||
sum.Write(m)
|
his.Write(m)
|
||||||
if got, want := int(*m.Histogram.SampleCount), total; got != want {
|
if got, want := int(*m.Histogram.SampleCount), total; got != want {
|
||||||
t.Errorf("got sample count %d, want %d", got, want)
|
t.Errorf("got sample count %d, want %d", got, want)
|
||||||
}
|
}
|
||||||
|
@ -424,24 +425,24 @@ func TestHistogramExemplar(t *testing.T) {
|
||||||
}
|
}
|
||||||
expectedExemplars := []*dto.Exemplar{
|
expectedExemplars := []*dto.Exemplar{
|
||||||
nil,
|
nil,
|
||||||
&dto.Exemplar{
|
{
|
||||||
Label: []*dto.LabelPair{
|
Label: []*dto.LabelPair{
|
||||||
&dto.LabelPair{Name: proto.String("id"), Value: proto.String("2")},
|
{Name: proto.String("id"), Value: proto.String("2")},
|
||||||
},
|
},
|
||||||
Value: proto.Float64(1.6),
|
Value: proto.Float64(1.6),
|
||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
},
|
},
|
||||||
nil,
|
nil,
|
||||||
&dto.Exemplar{
|
{
|
||||||
Label: []*dto.LabelPair{
|
Label: []*dto.LabelPair{
|
||||||
&dto.LabelPair{Name: proto.String("id"), Value: proto.String("3")},
|
{Name: proto.String("id"), Value: proto.String("3")},
|
||||||
},
|
},
|
||||||
Value: proto.Float64(4),
|
Value: proto.Float64(4),
|
||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
},
|
},
|
||||||
&dto.Exemplar{
|
{
|
||||||
Label: []*dto.LabelPair{
|
Label: []*dto.LabelPair{
|
||||||
&dto.LabelPair{Name: proto.String("id"), Value: proto.String("4")},
|
{Name: proto.String("id"), Value: proto.String("4")},
|
||||||
},
|
},
|
||||||
Value: proto.Float64(4.5),
|
Value: proto.Float64(4.5),
|
||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
|
@ -470,11 +471,14 @@ func TestHistogramExemplar(t *testing.T) {
|
||||||
func TestSparseHistogram(t *testing.T) {
|
func TestSparseHistogram(t *testing.T) {
|
||||||
|
|
||||||
scenarios := []struct {
|
scenarios := []struct {
|
||||||
name string
|
name string
|
||||||
observations []float64
|
observations []float64 // With simulated interval of 1m.
|
||||||
factor float64
|
factor float64
|
||||||
zeroThreshold float64
|
zeroThreshold float64
|
||||||
want string // String representation of protobuf.
|
maxBuckets uint32
|
||||||
|
minResetDuration time.Duration
|
||||||
|
maxZeroThreshold float64
|
||||||
|
want string // String representation of protobuf.
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no sparse buckets",
|
name: "no sparse buckets",
|
||||||
|
@ -531,18 +535,122 @@ func TestSparseHistogram(t *testing.T) {
|
||||||
factor: 1.2,
|
factor: 1.2,
|
||||||
want: `sample_count:7 sample_sum:-inf sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_negative:<span:<offset:2147483647 length:1 > delta:1 > sb_positive:<span:<offset:0 length:5 > delta:1 delta:-1 delta:2 delta:-2 delta:2 > `,
|
want: `sample_count:7 sample_sum:-inf sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_negative:<span:<offset:2147483647 length:1 > delta:1 > sb_positive:<span:<offset:0 length:5 > delta:1 delta:-1 delta:2 delta:-2 delta:2 > `,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "limited buckets but nothing triggered",
|
||||||
|
observations: []float64{0, 1, 1.2, 1.4, 1.8, 2},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
want: `sample_count:6 sample_sum:7.4 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_positive:<span:<offset:0 length:5 > delta:1 delta:-1 delta:2 delta:-2 delta:2 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by halving resolution",
|
||||||
|
observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
want: `sample_count:8 sample_sum:11.5 sb_schema:1 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_positive:<span:<offset:0 length:5 > delta:1 delta:2 delta:-1 delta:-2 delta:1 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by widening the zero bucket",
|
||||||
|
observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
maxZeroThreshold: 1.2,
|
||||||
|
want: `sample_count:8 sample_sum:11.5 sb_schema:2 sb_zero_threshold:1 sb_zero_count:2 sb_positive:<span:<offset:1 length:7 > delta:1 delta:1 delta:-2 delta:2 delta:-2 delta:0 delta:1 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by widening the zero bucket twice",
|
||||||
|
observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3, 4},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
maxZeroThreshold: 1.2,
|
||||||
|
want: `sample_count:9 sample_sum:15.5 sb_schema:2 sb_zero_threshold:1.189207115002721 sb_zero_count:3 sb_positive:<span:<offset:2 length:7 > delta:2 delta:-2 delta:2 delta:-2 delta:0 delta:1 delta:0 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by reset",
|
||||||
|
observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 3, 4},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
maxZeroThreshold: 1.2,
|
||||||
|
minResetDuration: 5 * time.Minute,
|
||||||
|
want: `sample_count:2 sample_sum:7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_positive:<span:<offset:7 length:2 > delta:1 delta:0 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "limited buckets but nothing triggered, negative observations",
|
||||||
|
observations: []float64{0, -1, -1.2, -1.4, -1.8, -2},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
want: `sample_count:6 sample_sum:-7.4 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_negative:<span:<offset:0 length:5 > delta:1 delta:-1 delta:2 delta:-2 delta:2 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by halving resolution, negative observations",
|
||||||
|
observations: []float64{0, -1, -1.1, -1.2, -1.4, -1.8, -2, -3},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
want: `sample_count:8 sample_sum:-11.5 sb_schema:1 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:1 sb_negative:<span:<offset:0 length:5 > delta:1 delta:2 delta:-1 delta:-2 delta:1 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by widening the zero bucket, negative observations",
|
||||||
|
observations: []float64{0, -1, -1.1, -1.2, -1.4, -1.8, -2, -3},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
maxZeroThreshold: 1.2,
|
||||||
|
want: `sample_count:8 sample_sum:-11.5 sb_schema:2 sb_zero_threshold:1 sb_zero_count:2 sb_negative:<span:<offset:1 length:7 > delta:1 delta:1 delta:-2 delta:2 delta:-2 delta:0 delta:1 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by widening the zero bucket twice, negative observations",
|
||||||
|
observations: []float64{0, -1, -1.1, -1.2, -1.4, -1.8, -2, -3, -4},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
maxZeroThreshold: 1.2,
|
||||||
|
want: `sample_count:9 sample_sum:-15.5 sb_schema:2 sb_zero_threshold:1.189207115002721 sb_zero_count:3 sb_negative:<span:<offset:2 length:7 > delta:2 delta:-2 delta:2 delta:-2 delta:0 delta:1 delta:0 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by reset, negative observations",
|
||||||
|
observations: []float64{0, -1, -1.1, -1.2, -1.4, -1.8, -2, -3, -4},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
maxZeroThreshold: 1.2,
|
||||||
|
minResetDuration: 5 * time.Minute,
|
||||||
|
want: `sample_count:2 sample_sum:-7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_negative:<span:<offset:7 length:2 > delta:1 delta:0 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by halving resolution, then reset",
|
||||||
|
observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 5, 5.1, 3, 4},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
minResetDuration: 9 * time.Minute,
|
||||||
|
want: `sample_count:2 sample_sum:7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_positive:<span:<offset:7 length:2 > delta:1 delta:0 > `,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "buckets limited by widening the zero bucket, then reset",
|
||||||
|
observations: []float64{0, 1, 1.1, 1.2, 1.4, 1.8, 2, 5, 5.1, 3, 4},
|
||||||
|
factor: 1.2,
|
||||||
|
maxBuckets: 4,
|
||||||
|
maxZeroThreshold: 1.2,
|
||||||
|
minResetDuration: 9 * time.Minute,
|
||||||
|
want: `sample_count:2 sample_sum:7 sb_schema:2 sb_zero_threshold:2.938735877055719e-39 sb_zero_count:0 sb_positive:<span:<offset:7 length:2 > delta:1 delta:0 > `,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, s := range scenarios {
|
for _, s := range scenarios {
|
||||||
t.Run(s.name, func(t *testing.T) {
|
t.Run(s.name, func(t *testing.T) {
|
||||||
his := NewHistogram(HistogramOpts{
|
his := NewHistogram(HistogramOpts{
|
||||||
Name: "name",
|
Name: "name",
|
||||||
Help: "help",
|
Help: "help",
|
||||||
SparseBucketsFactor: s.factor,
|
SparseBucketsFactor: s.factor,
|
||||||
SparseBucketsZeroThreshold: s.zeroThreshold,
|
SparseBucketsZeroThreshold: s.zeroThreshold,
|
||||||
|
SparseBucketsMaxNumber: s.maxBuckets,
|
||||||
|
SparseBucketsMinResetDuration: s.minResetDuration,
|
||||||
|
SparseBucketsMaxZeroThreshold: s.maxZeroThreshold,
|
||||||
})
|
})
|
||||||
|
ts := time.Now().Add(30 * time.Second)
|
||||||
|
now := func() time.Time {
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
his.(*histogram).now = now
|
||||||
for _, o := range s.observations {
|
for _, o := range s.observations {
|
||||||
his.Observe(o)
|
his.Observe(o)
|
||||||
|
ts = ts.Add(time.Minute)
|
||||||
}
|
}
|
||||||
m := &dto.Metric{}
|
m := &dto.Metric{}
|
||||||
if err := his.Write(m); err != nil {
|
if err := his.Write(m); err != nil {
|
||||||
|
@ -556,3 +664,101 @@ func TestSparseHistogram(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSparseHistogramConcurrency(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping test in short mode.")
|
||||||
|
}
|
||||||
|
|
||||||
|
rand.Seed(42)
|
||||||
|
|
||||||
|
it := func(n uint32) bool {
|
||||||
|
mutations := int(n%1e4 + 1e4)
|
||||||
|
concLevel := int(n%5 + 1)
|
||||||
|
total := mutations * concLevel
|
||||||
|
|
||||||
|
var start, end sync.WaitGroup
|
||||||
|
start.Add(1)
|
||||||
|
end.Add(concLevel)
|
||||||
|
|
||||||
|
his := NewHistogram(HistogramOpts{
|
||||||
|
Name: "test_sparse_histogram",
|
||||||
|
Help: "This help is sparse.",
|
||||||
|
SparseBucketsFactor: 1.05,
|
||||||
|
SparseBucketsZeroThreshold: 0.0000001,
|
||||||
|
SparseBucketsMaxNumber: 50,
|
||||||
|
SparseBucketsMinResetDuration: time.Hour, // Comment out to test for totals below.
|
||||||
|
SparseBucketsMaxZeroThreshold: 0.001,
|
||||||
|
})
|
||||||
|
|
||||||
|
ts := time.Now().Add(30 * time.Second).Unix()
|
||||||
|
now := func() time.Time {
|
||||||
|
return time.Unix(atomic.LoadInt64(&ts), 0)
|
||||||
|
}
|
||||||
|
his.(*histogram).now = now
|
||||||
|
|
||||||
|
allVars := make([]float64, total)
|
||||||
|
var sampleSum float64
|
||||||
|
for i := 0; i < concLevel; i++ {
|
||||||
|
vals := make([]float64, mutations)
|
||||||
|
for j := 0; j < mutations; j++ {
|
||||||
|
v := rand.NormFloat64()
|
||||||
|
vals[j] = v
|
||||||
|
allVars[i*mutations+j] = v
|
||||||
|
sampleSum += v
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(vals []float64) {
|
||||||
|
start.Wait()
|
||||||
|
for _, v := range vals {
|
||||||
|
// An observation every 1 to 10 seconds.
|
||||||
|
atomic.AddInt64(&ts, rand.Int63n(10)+1)
|
||||||
|
his.Observe(v)
|
||||||
|
}
|
||||||
|
end.Done()
|
||||||
|
}(vals)
|
||||||
|
}
|
||||||
|
sort.Float64s(allVars)
|
||||||
|
start.Done()
|
||||||
|
end.Wait()
|
||||||
|
|
||||||
|
m := &dto.Metric{}
|
||||||
|
his.Write(m)
|
||||||
|
|
||||||
|
// Uncomment these tests for totals only if you have disabled histogram resets above.
|
||||||
|
//
|
||||||
|
// if got, want := int(*m.Histogram.SampleCount), total; got != want {
|
||||||
|
// t.Errorf("got sample count %d, want %d", got, want)
|
||||||
|
// }
|
||||||
|
// if got, want := *m.Histogram.SampleSum, sampleSum; math.Abs((got-want)/want) > 0.001 {
|
||||||
|
// t.Errorf("got sample sum %f, want %f", got, want)
|
||||||
|
// }
|
||||||
|
|
||||||
|
sumBuckets := int(m.Histogram.GetSbZeroCount())
|
||||||
|
current := 0
|
||||||
|
for _, delta := range m.Histogram.GetSbNegative().GetDelta() {
|
||||||
|
current += int(delta)
|
||||||
|
if current < 0 {
|
||||||
|
t.Fatalf("negative bucket population negative: %d", current)
|
||||||
|
}
|
||||||
|
sumBuckets += current
|
||||||
|
}
|
||||||
|
current = 0
|
||||||
|
for _, delta := range m.Histogram.GetSbPositive().GetDelta() {
|
||||||
|
current += int(delta)
|
||||||
|
if current < 0 {
|
||||||
|
t.Fatalf("positive bucket population negative: %d", current)
|
||||||
|
}
|
||||||
|
sumBuckets += current
|
||||||
|
}
|
||||||
|
if got, want := sumBuckets, int(*m.Histogram.SampleCount); got != want {
|
||||||
|
t.Errorf("got bucket population sum %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := quick.Check(it, nil); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue