Avoid the term 'sparse' where possible
This intends to avoid confusing users by the subtle difference between a native histogram and a sparse bucket. Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
parent
d31f13b599
commit
e92a8c7f48
|
@ -65,10 +65,10 @@ func main() {
|
||||||
// one bucket to the text of (at most) 1.1. (The precise factor
|
// one bucket to the text of (at most) 1.1. (The precise factor
|
||||||
// is 2^2^-3 = 1.0905077...)
|
// is 2^2^-3 = 1.0905077...)
|
||||||
rpcDurationsHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
|
rpcDurationsHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
Name: "rpc_durations_histogram_seconds",
|
Name: "rpc_durations_histogram_seconds",
|
||||||
Help: "RPC latency distributions.",
|
Help: "RPC latency distributions.",
|
||||||
Buckets: prometheus.LinearBuckets(*normMean-5**normDomain, .5**normDomain, 20),
|
Buckets: prometheus.LinearBuckets(*normMean-5**normDomain, .5**normDomain, 20),
|
||||||
SparseBucketsFactor: 1.1,
|
NativeHistogramBucketFactor: 1.1,
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -28,16 +28,16 @@ import (
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
// sparseBounds for the frac of observed values. Only relevant for schema > 0.
|
// nativeHistogramBounds for the frac of observed values. Only relevant for
|
||||||
// Position in the slice is the schema. (0 is never used, just here for
|
// schema > 0. The position in the slice is the schema. (0 is never used, just
|
||||||
// convenience of using the schema directly as the index.)
|
// here for convenience of using the schema directly as the index.)
|
||||||
//
|
//
|
||||||
// TODO(beorn7): Currently, we do a binary search into these slices. There are
|
// TODO(beorn7): Currently, we do a binary search into these slices. There are
|
||||||
// ways to turn it into a small number of simple array lookups. It probably only
|
// ways to turn it into a small number of simple array lookups. It probably only
|
||||||
// matters for schema 5 and beyond, but should be investigated. See this comment
|
// matters for schema 5 and beyond, but should be investigated. See this comment
|
||||||
// as a starting point:
|
// as a starting point:
|
||||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/1776#issuecomment-870164310
|
// https://github.com/open-telemetry/opentelemetry-specification/issues/1776#issuecomment-870164310
|
||||||
var sparseBounds = [][]float64{
|
var nativeHistogramBounds = [][]float64{
|
||||||
// Schema "0":
|
// Schema "0":
|
||||||
{0.5},
|
{0.5},
|
||||||
// Schema 1:
|
// Schema 1:
|
||||||
|
@ -190,35 +190,40 @@ var sparseBounds = [][]float64{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// The sparseBounds above can be generated with the code below.
|
// The nativeHistogramBounds above can be generated with the code below.
|
||||||
// TODO(beorn7): Actually do it via go generate.
|
|
||||||
//
|
//
|
||||||
// var sparseBounds [][]float64 = make([][]float64, 9)
|
// TODO(beorn7): It's tempting to actually use `go generate` to generate the
|
||||||
|
// code above. However, this could lead to slightly different numbers on
|
||||||
|
// different architectures. We still need to come to terms if we are fine with
|
||||||
|
// that, or if we might prefer to specify precise numbers in the standard.
|
||||||
|
//
|
||||||
|
// var nativeHistogramBounds [][]float64 = make([][]float64, 9)
|
||||||
//
|
//
|
||||||
// func init() {
|
// func init() {
|
||||||
// // Populate sparseBounds.
|
// // Populate nativeHistogramBounds.
|
||||||
// numBuckets := 1
|
// numBuckets := 1
|
||||||
// for i := range sparseBounds {
|
// for i := range nativeHistogramBounds {
|
||||||
// bounds := []float64{0.5}
|
// bounds := []float64{0.5}
|
||||||
// factor := math.Exp2(math.Exp2(float64(-i)))
|
// factor := math.Exp2(math.Exp2(float64(-i)))
|
||||||
// for j := 0; j < numBuckets-1; j++ {
|
// for j := 0; j < numBuckets-1; j++ {
|
||||||
// var bound float64
|
// var bound float64
|
||||||
// if (j+1)%2 == 0 {
|
// if (j+1)%2 == 0 {
|
||||||
// // Use previously calculated value for increased precision.
|
// // Use previously calculated value for increased precision.
|
||||||
// bound = sparseBounds[i-1][j/2+1]
|
// bound = nativeHistogramBounds[i-1][j/2+1]
|
||||||
// } else {
|
// } else {
|
||||||
// bound = bounds[j] * factor
|
// bound = bounds[j] * factor
|
||||||
// }
|
// }
|
||||||
// bounds = append(bounds, bound)
|
// bounds = append(bounds, bound)
|
||||||
// }
|
// }
|
||||||
// numBuckets *= 2
|
// numBuckets *= 2
|
||||||
// sparseBounds[i] = bounds
|
// nativeHistogramBounds[i] = bounds
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// A Histogram counts individual observations from an event or sample stream in
|
// A Histogram counts individual observations from an event or sample stream in
|
||||||
// configurable buckets. Similar to a Summary, it also provides a sum of
|
// configurable static buckets (or in dynamic sparse buckets as part of the
|
||||||
// observations and an observation count.
|
// experimental Native Histograms, see below for more details). Similar to a
|
||||||
|
// Summary, it also provides a sum of observations and an observation count.
|
||||||
//
|
//
|
||||||
// On the Prometheus server, quantiles can be calculated from a Histogram using
|
// On the Prometheus server, quantiles can be calculated from a Histogram using
|
||||||
// the histogram_quantile PromQL function.
|
// the histogram_quantile PromQL function.
|
||||||
|
@ -227,7 +232,7 @@ var sparseBounds = [][]float64{
|
||||||
// (see the documentation for detailed procedures). However, Histograms require
|
// (see the documentation for detailed procedures). However, Histograms require
|
||||||
// the user to pre-define suitable buckets, and they are in general less
|
// the user to pre-define suitable buckets, and they are in general less
|
||||||
// accurate. (Both problems are addressed by the experimental Native
|
// accurate. (Both problems are addressed by the experimental Native
|
||||||
// Histograms. To use them, configure so-called sparse buckets in the
|
// Histograms. To use them, configure a NativeHistogramBucketFactor in the
|
||||||
// HistogramOpts. They also require a Prometheus server v2.40+ with the
|
// HistogramOpts. They also require a Prometheus server v2.40+ with the
|
||||||
// corresponding feature flag enabled.)
|
// corresponding feature flag enabled.)
|
||||||
//
|
//
|
||||||
|
@ -259,17 +264,17 @@ const bucketLabel = "le"
|
||||||
// customized to your use case.
|
// customized to your use case.
|
||||||
var DefBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
|
var DefBuckets = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
|
||||||
|
|
||||||
// DefSparseBucketsZeroThreshold is the default value for
|
// DefNativeHistogramZeroThreshold is the default value for
|
||||||
// SparseBucketsZeroThreshold in the HistogramOpts.
|
// NativeHistogramZeroThreshold in the HistogramOpts.
|
||||||
//
|
//
|
||||||
// The value is 2^-128 (or 0.5*2^-127 in the actual IEEE 754 representation),
|
// The value is 2^-128 (or 0.5*2^-127 in the actual IEEE 754 representation),
|
||||||
// which is a bucket boundary at all possible resolutions.
|
// which is a bucket boundary at all possible resolutions.
|
||||||
const DefSparseBucketsZeroThreshold = 2.938735877055719e-39
|
const DefNativeHistogramZeroThreshold = 2.938735877055719e-39
|
||||||
|
|
||||||
// SparseBucketsZeroThresholdZero can be used as SparseBucketsZeroThreshold in
|
// NativeHistogramZeroThresholdZero can be used as NativeHistogramZeroThreshold
|
||||||
// the HistogramOpts to create a zero bucket of width zero, i.e. a zero bucket
|
// in the HistogramOpts to create a zero bucket of width zero, i.e. a zero
|
||||||
// that only receives observations of precisely zero.
|
// bucket that only receives observations of precisely zero.
|
||||||
const SparseBucketsZeroThresholdZero = -1
|
const NativeHistogramZeroThresholdZero = -1
|
||||||
|
|
||||||
var errBucketLabelNotAllowed = fmt.Errorf(
|
var errBucketLabelNotAllowed = fmt.Errorf(
|
||||||
"%q is not allowed as label name in histograms", bucketLabel,
|
"%q is not allowed as label name in histograms", bucketLabel,
|
||||||
|
@ -385,81 +390,83 @@ type HistogramOpts struct {
|
||||||
// to add a highest bucket with +Inf bound, it will be added
|
// to add a highest bucket with +Inf bound, it will be added
|
||||||
// implicitly. If Buckets is left as nil or set to a slice of length
|
// implicitly. If Buckets is left as nil or set to a slice of length
|
||||||
// zero, it is replaced by default buckets. The default buckets are
|
// zero, it is replaced by default buckets. The default buckets are
|
||||||
// DefBuckets if no sparse buckets (see below) are used, otherwise the
|
// DefBuckets if no buckets for a native histogram (see below) are used,
|
||||||
// default is no buckets. (In other words, if you want to use both
|
// otherwise the default is no buckets. (In other words, if you want to
|
||||||
// reguler buckets and sparse buckets, you have to define the regular
|
// use both reguler buckets and buckets for a native histogram, you have
|
||||||
// buckets here explicitly.)
|
// to define the regular buckets here explicitly.)
|
||||||
Buckets []float64
|
Buckets []float64
|
||||||
|
|
||||||
// If SparseBucketsFactor is greater than one, sparse buckets are used
|
// If NativeHistogramBucketFactor is greater than one, so-called sparse
|
||||||
// (in addition to the regular buckets, if defined above). A Histogram
|
// buckets are used (in addition to the regular buckets, if defined
|
||||||
// with sparse buckets will be ingested as a Native Histogram by a
|
// above). A Histogram with sparse buckets will be ingested as a Native
|
||||||
// Prometheus server with that feature enabled (requires Prometheus
|
// Histogram by a Prometheus server with that feature enabled (requires
|
||||||
// v2.40+). Sparse buckets are exponential buckets covering the whole
|
// Prometheus v2.40+). Sparse buckets are exponential buckets covering
|
||||||
// float64 range (with the exception of the “zero” bucket, see
|
// the whole float64 range (with the exception of the “zero” bucket, see
|
||||||
// SparseBucketsZeroThreshold below). From any one bucket to the next,
|
// SparseBucketsZeroThreshold below). From any one bucket to the next,
|
||||||
// the width of the bucket grows by a constant
|
// the width of the bucket grows by a constant
|
||||||
// factor. SparseBucketsFactor provides an upper bound for this factor
|
// factor. NativeHistogramBucketFactor provides an upper bound for this
|
||||||
// (exception see below). The smaller SparseBucketsFactor, the more
|
// factor (exception see below). The smaller
|
||||||
// buckets will be used and thus the more costly the histogram will
|
// NativeHistogramBucketFactor, the more buckets will be used and thus
|
||||||
// become. A generally good trade-off between cost and accuracy is a
|
// the more costly the histogram will become. A generally good trade-off
|
||||||
// value of 1.1 (each bucket is at most 10% wider than the previous
|
// between cost and accuracy is a value of 1.1 (each bucket is at most
|
||||||
// one), which will result in each power of two divided into 8 buckets
|
// 10% wider than the previous one), which will result in each power of
|
||||||
// (e.g. there will be 8 buckets between 1 and 2, same as between 2 and
|
// two divided into 8 buckets (e.g. there will be 8 buckets between 1
|
||||||
// 4, and 4 and 8, etc.).
|
// and 2, same as between 2 and 4, and 4 and 8, etc.).
|
||||||
//
|
//
|
||||||
// Details about the actually used factor: The factor is calculated as
|
// Details about the actually used factor: The factor is calculated as
|
||||||
// 2^(2^n), where n is an integer number between (and including) -8 and
|
// 2^(2^n), where n is an integer number between (and including) -8 and
|
||||||
// 4. n is chosen so that the resulting factor is the largest that is
|
// 4. n is chosen so that the resulting factor is the largest that is
|
||||||
// still smaller or equal to SparseBucketsFactor. Note that the smallest
|
// still smaller or equal to NativeHistogramBucketFactor. Note that the
|
||||||
// possible factor is therefore approx. 1.00271 (i.e. 2^(2^-8) ). If
|
// smallest possible factor is therefore approx. 1.00271 (i.e. 2^(2^-8)
|
||||||
// SparseBucketsFactor is greater than 1 but smaller than 2^(2^-8), then
|
// ). If NativeHistogramBucketFactor is greater than 1 but smaller than
|
||||||
// the actually used factor is still 2^(2^-8) even though it is larger
|
// 2^(2^-8), then the actually used factor is still 2^(2^-8) even though
|
||||||
// than the provided SparseBucketsFactor.
|
// it is larger than the provided NativeHistogramBucketFactor.
|
||||||
//
|
//
|
||||||
// NOTE: Native Histograms are still an experimental feature. Their
|
// NOTE: Native Histograms are still an experimental feature. Their
|
||||||
// behavior might still change without a major version
|
// behavior might still change without a major version
|
||||||
// bump. Subsequently, all SparseBucket... options here might still
|
// bump. Subsequently, all NativeHistogram... options here might still
|
||||||
// change their behavior or name (or might completely disappear) without
|
// change their behavior or name (or might completely disappear) without
|
||||||
// a major version bump.
|
// a major version bump.
|
||||||
SparseBucketsFactor float64
|
NativeHistogramBucketFactor float64
|
||||||
// All observations with an absolute value of less or equal
|
// All observations with an absolute value of less or equal
|
||||||
// SparseBucketsZeroThreshold are accumulated into a “zero” bucket. For
|
// NativeHistogramZeroThreshold are accumulated into a “zero”
|
||||||
// best results, this should be close to a bucket boundary. This is
|
// bucket. For best results, this should be close to a bucket
|
||||||
// usually the case if picking a power of two. If
|
// boundary. This is usually the case if picking a power of two. If
|
||||||
// SparseBucketsZeroThreshold is left at zero,
|
// NativeHistogramZeroThreshold is left at zero,
|
||||||
// DefSparseBucketsZeroThreshold is used as the threshold. To configure
|
// DefSparseBucketsZeroThreshold is used as the threshold. To configure
|
||||||
// a zero bucket with an actual threshold of zero (i.e. only
|
// a zero bucket with an actual threshold of zero (i.e. only
|
||||||
// observations of precisely zero will go into the zero bucket), set
|
// observations of precisely zero will go into the zero bucket), set
|
||||||
// SparseBucketsZeroThreshold to the SparseBucketsZeroThresholdZero
|
// NativeHistogramZeroThreshold to the NativeHistogramZeroThresholdZero
|
||||||
// constant (or any negative float value).
|
// constant (or any negative float value).
|
||||||
SparseBucketsZeroThreshold float64
|
NativeHistogramZeroThreshold float64
|
||||||
|
|
||||||
// The remaining fields define a strategy to limit the number of
|
// The remaining fields define a strategy to limit the number of
|
||||||
// populated sparse buckets. If SparseBucketsMaxNumber is left at zero,
|
// populated sparse buckets. If NativeHistogramMaxBucketNumber is left
|
||||||
// the number of buckets is not limited. (Note that this might lead to
|
// at zero, the number of buckets is not limited. (Note that this might
|
||||||
// unbounded memory consumption if the values observed by the Histogram
|
// lead to unbounded memory consumption if the values observed by the
|
||||||
// are sufficiently wide-spread. In particular, this could be used as a
|
// Histogram are sufficiently wide-spread. In particular, this could be
|
||||||
// DoS attack vector. Where the observed values depend on external
|
// used as a DoS attack vector. Where the observed values depend on
|
||||||
// inputs, it is highly recommended to set a SparseBucketsMaxNumber.)
|
// external inputs, it is highly recommended to set a
|
||||||
// Once the set SparseBucketsMaxNumber is exceeded, the following
|
// NativeHistogramMaxBucketNumber.) Once the set
|
||||||
// strategy is enacted: First, if the last reset (or the creation) of
|
// NativeHistogramMaxBucketNumber is exceeded, the following strategy is
|
||||||
// the histogram is at least SparseBucketsMinResetDuration ago, then the
|
// enacted: First, if the last reset (or the creation) of the histogram
|
||||||
// whole histogram is reset to its initial state (including regular
|
// is at least NativeHistogramMinResetDuration ago, then the whole
|
||||||
|
// histogram is reset to its initial state (including regular
|
||||||
// buckets). If less time has passed, or if
|
// buckets). If less time has passed, or if
|
||||||
// SparseBucketsMinResetDuration is zero, no reset is
|
// NativeHistogramMinResetDuration is zero, no reset is
|
||||||
// performed. Instead, the zero threshold is increased sufficiently to
|
// performed. Instead, the zero threshold is increased sufficiently to
|
||||||
// reduce the number of buckets to or below SparseBucketsMaxNumber, but
|
// reduce the number of buckets to or below
|
||||||
// not to more than SparseBucketsMaxZeroThreshold. Thus, if
|
// NativeHistogramMaxBucketNumber, but not to more than
|
||||||
// SparseBucketsMaxZeroThreshold is already at or below the current zero
|
// NativeHistogramMaxZeroThreshold. Thus, if
|
||||||
// threshold, nothing happens at this step. After that, if the number of
|
// NativeHistogramMaxZeroThreshold is already at or below the current
|
||||||
// buckets still exceeds SparseBucketsMaxNumber, the resolution of the
|
// zero threshold, nothing happens at this step. After that, if the
|
||||||
// histogram is reduced by doubling the width of the sparse buckets (up
|
// number of buckets still exceeds NativeHistogramMaxBucketNumber, the
|
||||||
// to a growth factor between one bucket to the next of 2^(2^4) = 65536,
|
// resolution of the histogram is reduced by doubling the width of the
|
||||||
// see above).
|
// sparse buckets (up to a growth factor between one bucket to the next
|
||||||
SparseBucketsMaxNumber uint32
|
// of 2^(2^4) = 65536, see above).
|
||||||
SparseBucketsMinResetDuration time.Duration
|
NativeHistogramMaxBucketNumber uint32
|
||||||
SparseBucketsMaxZeroThreshold float64
|
NativeHistogramMinResetDuration time.Duration
|
||||||
|
NativeHistogramMaxZeroThreshold float64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHistogram creates a new Histogram based on the provided HistogramOpts. It
|
// NewHistogram creates a new Histogram based on the provided HistogramOpts. It
|
||||||
|
@ -497,28 +504,28 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &histogram{
|
h := &histogram{
|
||||||
desc: desc,
|
desc: desc,
|
||||||
upperBounds: opts.Buckets,
|
upperBounds: opts.Buckets,
|
||||||
labelPairs: MakeLabelPairs(desc, labelValues),
|
labelPairs: MakeLabelPairs(desc, labelValues),
|
||||||
sparseMaxBuckets: opts.SparseBucketsMaxNumber,
|
nativeHistogramMaxBuckets: opts.NativeHistogramMaxBucketNumber,
|
||||||
sparseMaxZeroThreshold: opts.SparseBucketsMaxZeroThreshold,
|
nativeHistogramMaxZeroThreshold: opts.NativeHistogramMaxZeroThreshold,
|
||||||
sparseMinResetDuration: opts.SparseBucketsMinResetDuration,
|
nativeHistogramMinResetDuration: opts.NativeHistogramMinResetDuration,
|
||||||
lastResetTime: time.Now(),
|
lastResetTime: time.Now(),
|
||||||
now: time.Now,
|
now: time.Now,
|
||||||
}
|
}
|
||||||
if len(h.upperBounds) == 0 && opts.SparseBucketsFactor <= 1 {
|
if len(h.upperBounds) == 0 && opts.NativeHistogramBucketFactor <= 1 {
|
||||||
h.upperBounds = DefBuckets
|
h.upperBounds = DefBuckets
|
||||||
}
|
}
|
||||||
if opts.SparseBucketsFactor <= 1 {
|
if opts.NativeHistogramBucketFactor <= 1 {
|
||||||
h.sparseSchema = math.MinInt32 // To mark that there are no sparse buckets.
|
h.nativeHistogramSchema = math.MinInt32 // To mark that there are no sparse buckets.
|
||||||
} else {
|
} else {
|
||||||
switch {
|
switch {
|
||||||
case opts.SparseBucketsZeroThreshold > 0:
|
case opts.NativeHistogramZeroThreshold > 0:
|
||||||
h.sparseZeroThreshold = opts.SparseBucketsZeroThreshold
|
h.nativeHistogramZeroThreshold = opts.NativeHistogramZeroThreshold
|
||||||
case opts.SparseBucketsZeroThreshold == 0:
|
case opts.NativeHistogramZeroThreshold == 0:
|
||||||
h.sparseZeroThreshold = DefSparseBucketsZeroThreshold
|
h.nativeHistogramZeroThreshold = DefNativeHistogramZeroThreshold
|
||||||
} // Leave h.sparseThreshold at 0 otherwise.
|
} // Leave h.nativeHistogramZeroThreshold at 0 otherwise.
|
||||||
h.sparseSchema = pickSparseSchema(opts.SparseBucketsFactor)
|
h.nativeHistogramSchema = pickSchema(opts.NativeHistogramBucketFactor)
|
||||||
}
|
}
|
||||||
for i, upperBound := range h.upperBounds {
|
for i, upperBound := range h.upperBounds {
|
||||||
if i < len(h.upperBounds)-1 {
|
if i < len(h.upperBounds)-1 {
|
||||||
|
@ -538,14 +545,14 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
// Finally we know the final length of h.upperBounds and can make buckets
|
// Finally we know the final length of h.upperBounds and can make buckets
|
||||||
// for both counts as well as exemplars:
|
// for both counts as well as exemplars:
|
||||||
h.counts[0] = &histogramCounts{
|
h.counts[0] = &histogramCounts{
|
||||||
buckets: make([]uint64, len(h.upperBounds)),
|
buckets: make([]uint64, len(h.upperBounds)),
|
||||||
sparseZeroThresholdBits: math.Float64bits(h.sparseZeroThreshold),
|
nativeHistogramZeroThresholdBits: math.Float64bits(h.nativeHistogramZeroThreshold),
|
||||||
sparseSchema: h.sparseSchema,
|
nativeHistogramSchema: h.nativeHistogramSchema,
|
||||||
}
|
}
|
||||||
h.counts[1] = &histogramCounts{
|
h.counts[1] = &histogramCounts{
|
||||||
buckets: make([]uint64, len(h.upperBounds)),
|
buckets: make([]uint64, len(h.upperBounds)),
|
||||||
sparseZeroThresholdBits: math.Float64bits(h.sparseZeroThreshold),
|
nativeHistogramZeroThresholdBits: math.Float64bits(h.nativeHistogramZeroThreshold),
|
||||||
sparseSchema: h.sparseSchema,
|
nativeHistogramSchema: h.nativeHistogramSchema,
|
||||||
}
|
}
|
||||||
h.exemplars = make([]atomic.Value, len(h.upperBounds)+1)
|
h.exemplars = make([]atomic.Value, len(h.upperBounds)+1)
|
||||||
|
|
||||||
|
@ -562,36 +569,38 @@ type histogramCounts struct {
|
||||||
sumBits uint64
|
sumBits uint64
|
||||||
count uint64
|
count uint64
|
||||||
|
|
||||||
// sparseZeroBucket counts all (positive and negative) observations in
|
// nativeHistogramZeroBucket counts all (positive and negative)
|
||||||
// the zero bucket (with an absolute value less or equal the current
|
// observations in the zero bucket (with an absolute value less or equal
|
||||||
// threshold, see next field.
|
// the current threshold, see next field.
|
||||||
sparseZeroBucket uint64
|
nativeHistogramZeroBucket uint64
|
||||||
// sparseZeroThresholdBits is the bit pattern of the current threshold
|
// nativeHistogramZeroThresholdBits is the bit pattern of the current
|
||||||
// for the zero bucket. It's initially equal to sparseZeroThreshold but
|
// threshold for the zero bucket. It's initially equal to
|
||||||
// may change according to the bucket count limitation strategy.
|
// nativeHistogramZeroThreshold but may change according to the bucket
|
||||||
sparseZeroThresholdBits uint64
|
// count limitation strategy.
|
||||||
// sparseSchema may change over time according to the bucket count
|
nativeHistogramZeroThresholdBits uint64
|
||||||
// limitation strategy and therefore has to be saved here.
|
// nativeHistogramSchema may change over time according to the bucket
|
||||||
sparseSchema int32
|
// count limitation strategy and therefore has to be saved here.
|
||||||
|
nativeHistogramSchema int32
|
||||||
// Number of (positive and negative) sparse buckets.
|
// Number of (positive and negative) sparse buckets.
|
||||||
sparseBucketsNumber uint32
|
nativeHistogramBucketsNumber uint32
|
||||||
|
|
||||||
// Regular buckets.
|
// Regular buckets.
|
||||||
buckets []uint64
|
buckets []uint64
|
||||||
|
|
||||||
// Sparse buckets are implemented with a sync.Map for now. A dedicated
|
// The sparse buckets for native histograms are implemented with a
|
||||||
// data structure will likely be more efficient. There are separate maps
|
// sync.Map for now. A dedicated data structure will likely be more
|
||||||
// for negative and positive observations. The map's value is an *int64,
|
// efficient. There are separate maps for negative and positive
|
||||||
// counting observations in that bucket. (Note that we don't use uint64
|
// observations. The map's value is an *int64, counting observations in
|
||||||
// as an int64 won't overflow in practice, and working with signed
|
// that bucket. (Note that we don't use uint64 as an int64 won't
|
||||||
// numbers from the beginning simplifies the handling of deltas.) The
|
// overflow in practice, and working with signed numbers from the
|
||||||
// map's key is the index of the bucket according to the used
|
// beginning simplifies the handling of deltas.) The map's key is the
|
||||||
// sparseSchema. Index 0 is for an upper bound of 1.
|
// index of the bucket according to the used
|
||||||
sparseBucketsPositive, sparseBucketsNegative sync.Map
|
// nativeHistogramSchema. Index 0 is for an upper bound of 1.
|
||||||
|
nativeHistogramBucketsPositive, nativeHistogramBucketsNegative sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
// observe manages the parts of observe that only affects
|
// observe manages the parts of observe that only affects
|
||||||
// histogramCounts. doSparse is true if spare buckets should be done,
|
// histogramCounts. doSparse is true if sparse buckets should be done,
|
||||||
// too.
|
// too.
|
||||||
func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) {
|
func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) {
|
||||||
if bucket < len(hc.buckets) {
|
if bucket < len(hc.buckets) {
|
||||||
|
@ -600,13 +609,13 @@ func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) {
|
||||||
atomicAddFloat(&hc.sumBits, v)
|
atomicAddFloat(&hc.sumBits, v)
|
||||||
if doSparse && !math.IsNaN(v) {
|
if doSparse && !math.IsNaN(v) {
|
||||||
var (
|
var (
|
||||||
sparseKey int
|
key int
|
||||||
sparseSchema = atomic.LoadInt32(&hc.sparseSchema)
|
schema = atomic.LoadInt32(&hc.nativeHistogramSchema)
|
||||||
sparseZeroThreshold = math.Float64frombits(atomic.LoadUint64(&hc.sparseZeroThresholdBits))
|
zeroThreshold = math.Float64frombits(atomic.LoadUint64(&hc.nativeHistogramZeroThresholdBits))
|
||||||
bucketCreated, isInf bool
|
bucketCreated, isInf bool
|
||||||
)
|
)
|
||||||
if math.IsInf(v, 0) {
|
if math.IsInf(v, 0) {
|
||||||
// Pretend v is MaxFloat64 but later increment sparseKey by one.
|
// Pretend v is MaxFloat64 but later increment key by one.
|
||||||
if math.IsInf(v, +1) {
|
if math.IsInf(v, +1) {
|
||||||
v = math.MaxFloat64
|
v = math.MaxFloat64
|
||||||
} else {
|
} else {
|
||||||
|
@ -615,30 +624,30 @@ func (hc *histogramCounts) observe(v float64, bucket int, doSparse bool) {
|
||||||
isInf = true
|
isInf = true
|
||||||
}
|
}
|
||||||
frac, exp := math.Frexp(math.Abs(v))
|
frac, exp := math.Frexp(math.Abs(v))
|
||||||
if sparseSchema > 0 {
|
if schema > 0 {
|
||||||
bounds := sparseBounds[sparseSchema]
|
bounds := nativeHistogramBounds[schema]
|
||||||
sparseKey = sort.SearchFloat64s(bounds, frac) + (exp-1)*len(bounds)
|
key = sort.SearchFloat64s(bounds, frac) + (exp-1)*len(bounds)
|
||||||
} else {
|
} else {
|
||||||
sparseKey = exp
|
key = exp
|
||||||
if frac == 0.5 {
|
if frac == 0.5 {
|
||||||
sparseKey--
|
key--
|
||||||
}
|
}
|
||||||
div := 1 << -sparseSchema
|
div := 1 << -schema
|
||||||
sparseKey = (sparseKey + div - 1) / div
|
key = (key + div - 1) / div
|
||||||
}
|
}
|
||||||
if isInf {
|
if isInf {
|
||||||
sparseKey++
|
key++
|
||||||
}
|
}
|
||||||
switch {
|
switch {
|
||||||
case v > sparseZeroThreshold:
|
case v > zeroThreshold:
|
||||||
bucketCreated = addToSparseBucket(&hc.sparseBucketsPositive, sparseKey, 1)
|
bucketCreated = addToBucket(&hc.nativeHistogramBucketsPositive, key, 1)
|
||||||
case v < -sparseZeroThreshold:
|
case v < -zeroThreshold:
|
||||||
bucketCreated = addToSparseBucket(&hc.sparseBucketsNegative, sparseKey, 1)
|
bucketCreated = addToBucket(&hc.nativeHistogramBucketsNegative, key, 1)
|
||||||
default:
|
default:
|
||||||
atomic.AddUint64(&hc.sparseZeroBucket, 1)
|
atomic.AddUint64(&hc.nativeHistogramZeroBucket, 1)
|
||||||
}
|
}
|
||||||
if bucketCreated {
|
if bucketCreated {
|
||||||
atomic.AddUint32(&hc.sparseBucketsNumber, 1)
|
atomic.AddUint32(&hc.nativeHistogramBucketsNumber, 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Increment count last as we take it as a signal that the observation
|
// Increment count last as we take it as a signal that the observation
|
||||||
|
@ -677,15 +686,15 @@ type histogram struct {
|
||||||
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
|
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
|
||||||
counts [2]*histogramCounts
|
counts [2]*histogramCounts
|
||||||
|
|
||||||
upperBounds []float64
|
upperBounds []float64
|
||||||
labelPairs []*dto.LabelPair
|
labelPairs []*dto.LabelPair
|
||||||
exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
|
exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
|
||||||
sparseSchema int32 // The initial schema. Set to math.MinInt32 if no sparse buckets are used.
|
nativeHistogramSchema int32 // The initial schema. Set to math.MinInt32 if no sparse buckets are used.
|
||||||
sparseZeroThreshold float64 // The initial zero threshold.
|
nativeHistogramZeroThreshold float64 // The initial zero threshold.
|
||||||
sparseMaxZeroThreshold float64
|
nativeHistogramMaxZeroThreshold float64
|
||||||
sparseMaxBuckets uint32
|
nativeHistogramMaxBuckets uint32
|
||||||
sparseMinResetDuration time.Duration
|
nativeHistogramMinResetDuration time.Duration
|
||||||
lastResetTime time.Time // Protected by mtx.
|
lastResetTime time.Time // Protected by mtx.
|
||||||
|
|
||||||
now func() time.Time // To mock out time.Now() for testing.
|
now func() time.Time // To mock out time.Now() for testing.
|
||||||
}
|
}
|
||||||
|
@ -753,19 +762,19 @@ func (h *histogram) Write(out *dto.Metric) error {
|
||||||
}
|
}
|
||||||
his.Bucket = append(his.Bucket, b)
|
his.Bucket = append(his.Bucket, b)
|
||||||
}
|
}
|
||||||
if h.sparseSchema > math.MinInt32 {
|
if h.nativeHistogramSchema > math.MinInt32 {
|
||||||
his.ZeroThreshold = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sparseZeroThresholdBits)))
|
his.ZeroThreshold = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.nativeHistogramZeroThresholdBits)))
|
||||||
his.Schema = proto.Int32(atomic.LoadInt32(&coldCounts.sparseSchema))
|
his.Schema = proto.Int32(atomic.LoadInt32(&coldCounts.nativeHistogramSchema))
|
||||||
zeroBucket := atomic.LoadUint64(&coldCounts.sparseZeroBucket)
|
zeroBucket := atomic.LoadUint64(&coldCounts.nativeHistogramZeroBucket)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
coldCounts.sparseBucketsPositive.Range(addAndReset(&hotCounts.sparseBucketsPositive, &hotCounts.sparseBucketsNumber))
|
coldCounts.nativeHistogramBucketsPositive.Range(addAndReset(&hotCounts.nativeHistogramBucketsPositive, &hotCounts.nativeHistogramBucketsNumber))
|
||||||
coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative, &hotCounts.sparseBucketsNumber))
|
coldCounts.nativeHistogramBucketsNegative.Range(addAndReset(&hotCounts.nativeHistogramBucketsNegative, &hotCounts.nativeHistogramBucketsNumber))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
his.ZeroCount = proto.Uint64(zeroBucket)
|
his.ZeroCount = proto.Uint64(zeroBucket)
|
||||||
his.NegativeSpan, his.NegativeDelta = makeSparseBuckets(&coldCounts.sparseBucketsNegative)
|
his.NegativeSpan, his.NegativeDelta = makeBuckets(&coldCounts.nativeHistogramBucketsNegative)
|
||||||
his.PositiveSpan, his.PositiveDelta = makeSparseBuckets(&coldCounts.sparseBucketsPositive)
|
his.PositiveSpan, his.PositiveDelta = makeBuckets(&coldCounts.nativeHistogramBucketsPositive)
|
||||||
}
|
}
|
||||||
addAndResetCounts(hotCounts, coldCounts)
|
addAndResetCounts(hotCounts, coldCounts)
|
||||||
return nil
|
return nil
|
||||||
|
@ -789,7 +798,7 @@ func (h *histogram) findBucket(v float64) int {
|
||||||
// observe is the implementation for Observe without the findBucket part.
|
// observe is the implementation for Observe without the findBucket part.
|
||||||
func (h *histogram) observe(v float64, bucket int) {
|
func (h *histogram) observe(v float64, bucket int) {
|
||||||
// Do not add to sparse buckets for NaN observations.
|
// Do not add to sparse buckets for NaN observations.
|
||||||
doSparse := h.sparseSchema > math.MinInt32 && !math.IsNaN(v)
|
doSparse := h.nativeHistogramSchema > math.MinInt32 && !math.IsNaN(v)
|
||||||
// We increment h.countAndHotIdx so that the counter in the lower
|
// We increment h.countAndHotIdx so that the counter in the lower
|
||||||
// 63 bits gets incremented. At the same time, we get the new value
|
// 63 bits gets incremented. At the same time, we get the new value
|
||||||
// back, which we can use to find the currently-hot counts.
|
// back, which we can use to find the currently-hot counts.
|
||||||
|
@ -797,7 +806,7 @@ func (h *histogram) observe(v float64, bucket int) {
|
||||||
hotCounts := h.counts[n>>63]
|
hotCounts := h.counts[n>>63]
|
||||||
hotCounts.observe(v, bucket, doSparse)
|
hotCounts.observe(v, bucket, doSparse)
|
||||||
if doSparse {
|
if doSparse {
|
||||||
h.limitSparseBuckets(hotCounts, v, bucket)
|
h.limitBuckets(hotCounts, v, bucket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -806,11 +815,11 @@ func (h *histogram) observe(v float64, bucket int) {
|
||||||
// number can go higher (if even the lowest resolution isn't enough to reduce
|
// number can go higher (if even the lowest resolution isn't enough to reduce
|
||||||
// the number sufficiently, or if the provided counts aren't fully updated yet
|
// the number sufficiently, or if the provided counts aren't fully updated yet
|
||||||
// by a concurrently happening Write call).
|
// by a concurrently happening Write call).
|
||||||
func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, bucket int) {
|
func (h *histogram) limitBuckets(counts *histogramCounts, value float64, bucket int) {
|
||||||
if h.sparseMaxBuckets == 0 {
|
if h.nativeHistogramMaxBuckets == 0 {
|
||||||
return // No limit configured.
|
return // No limit configured.
|
||||||
}
|
}
|
||||||
if h.sparseMaxBuckets >= atomic.LoadUint32(&counts.sparseBucketsNumber) {
|
if h.nativeHistogramMaxBuckets >= atomic.LoadUint32(&counts.nativeHistogramBucketsNumber) {
|
||||||
return // Bucket limit not exceeded yet.
|
return // Bucket limit not exceeded yet.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -825,7 +834,7 @@ func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, b
|
||||||
hotCounts := h.counts[hotIdx]
|
hotCounts := h.counts[hotIdx]
|
||||||
coldCounts := h.counts[coldIdx]
|
coldCounts := h.counts[coldIdx]
|
||||||
// ...and then check again if we really have to reduce the bucket count.
|
// ...and then check again if we really have to reduce the bucket count.
|
||||||
if h.sparseMaxBuckets >= atomic.LoadUint32(&hotCounts.sparseBucketsNumber) {
|
if h.nativeHistogramMaxBuckets >= atomic.LoadUint32(&hotCounts.nativeHistogramBucketsNumber) {
|
||||||
return // Bucket limit not exceeded after all.
|
return // Bucket limit not exceeded after all.
|
||||||
}
|
}
|
||||||
// Try the various strategies in order.
|
// Try the various strategies in order.
|
||||||
|
@ -838,13 +847,13 @@ func (h *histogram) limitSparseBuckets(counts *histogramCounts, value float64, b
|
||||||
h.doubleBucketWidth(hotCounts, coldCounts)
|
h.doubleBucketWidth(hotCounts, coldCounts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeReset resests the whole histogram if at least h.sparseMinResetDuration
|
// maybeReset resests the whole histogram if at least h.nativeHistogramMinResetDuration
|
||||||
// has been passed. It returns true if the histogram has been reset. The caller
|
// has been passed. It returns true if the histogram has been reset. The caller
|
||||||
// must have locked h.mtx.
|
// must have locked h.mtx.
|
||||||
func (h *histogram) maybeReset(hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int) bool {
|
func (h *histogram) maybeReset(hot, cold *histogramCounts, coldIdx uint64, value float64, bucket int) bool {
|
||||||
// We are using the possibly mocked h.now() rather than
|
// We are using the possibly mocked h.now() rather than
|
||||||
// time.Since(h.lastResetTime) to enable testing.
|
// time.Since(h.lastResetTime) to enable testing.
|
||||||
if h.sparseMinResetDuration == 0 || h.now().Sub(h.lastResetTime) < h.sparseMinResetDuration {
|
if h.nativeHistogramMinResetDuration == 0 || h.now().Sub(h.lastResetTime) < h.nativeHistogramMinResetDuration {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
// Completely reset coldCounts.
|
// Completely reset coldCounts.
|
||||||
|
@ -864,34 +873,35 @@ func (h *histogram) maybeReset(hot, cold *histogramCounts, coldIdx uint64, value
|
||||||
// maybeWidenZeroBucket widens the zero bucket until it includes the existing
|
// maybeWidenZeroBucket widens the zero bucket until it includes the existing
|
||||||
// buckets closest to the zero bucket (which could be two, if an equidistant
|
// buckets closest to the zero bucket (which could be two, if an equidistant
|
||||||
// negative and a positive bucket exists, but usually it's only one bucket to be
|
// negative and a positive bucket exists, but usually it's only one bucket to be
|
||||||
// merged into the new wider zero bucket). h.sparseMaxZeroThreshold limits how
|
// merged into the new wider zero bucket). h.nativeHistogramMaxZeroThreshold
|
||||||
// far the zero bucket can be extended, and if that's not enough to include an
|
// limits how far the zero bucket can be extended, and if that's not enough to
|
||||||
// existing bucket, the method returns false. The caller must have locked h.mtx.
|
// include an existing bucket, the method returns false. The caller must have
|
||||||
|
// locked h.mtx.
|
||||||
func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
|
func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
|
||||||
currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hot.sparseZeroThresholdBits))
|
currentZeroThreshold := math.Float64frombits(atomic.LoadUint64(&hot.nativeHistogramZeroThresholdBits))
|
||||||
if currentZeroThreshold >= h.sparseMaxZeroThreshold {
|
if currentZeroThreshold >= h.nativeHistogramMaxZeroThreshold {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
// Find the key of the bucket closest to zero.
|
// Find the key of the bucket closest to zero.
|
||||||
smallestKey := findSmallestKey(&hot.sparseBucketsPositive)
|
smallestKey := findSmallestKey(&hot.nativeHistogramBucketsPositive)
|
||||||
smallestNegativeKey := findSmallestKey(&hot.sparseBucketsNegative)
|
smallestNegativeKey := findSmallestKey(&hot.nativeHistogramBucketsNegative)
|
||||||
if smallestNegativeKey < smallestKey {
|
if smallestNegativeKey < smallestKey {
|
||||||
smallestKey = smallestNegativeKey
|
smallestKey = smallestNegativeKey
|
||||||
}
|
}
|
||||||
if smallestKey == math.MaxInt32 {
|
if smallestKey == math.MaxInt32 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hot.sparseSchema))
|
newZeroThreshold := getLe(smallestKey, atomic.LoadInt32(&hot.nativeHistogramSchema))
|
||||||
if newZeroThreshold > h.sparseMaxZeroThreshold {
|
if newZeroThreshold > h.nativeHistogramMaxZeroThreshold {
|
||||||
return false // New threshold would exceed the max threshold.
|
return false // New threshold would exceed the max threshold.
|
||||||
}
|
}
|
||||||
atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
|
atomic.StoreUint64(&cold.nativeHistogramZeroThresholdBits, math.Float64bits(newZeroThreshold))
|
||||||
// Remove applicable buckets.
|
// Remove applicable buckets.
|
||||||
if _, loaded := cold.sparseBucketsNegative.LoadAndDelete(smallestKey); loaded {
|
if _, loaded := cold.nativeHistogramBucketsNegative.LoadAndDelete(smallestKey); loaded {
|
||||||
atomicDecUint32(&cold.sparseBucketsNumber)
|
atomicDecUint32(&cold.nativeHistogramBucketsNumber)
|
||||||
}
|
}
|
||||||
if _, loaded := cold.sparseBucketsPositive.LoadAndDelete(smallestKey); loaded {
|
if _, loaded := cold.nativeHistogramBucketsPositive.LoadAndDelete(smallestKey); loaded {
|
||||||
atomicDecUint32(&cold.sparseBucketsNumber)
|
atomicDecUint32(&cold.nativeHistogramBucketsNumber)
|
||||||
}
|
}
|
||||||
// Make cold counts the new hot counts.
|
// Make cold counts the new hot counts.
|
||||||
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
|
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
|
||||||
|
@ -903,7 +913,7 @@ func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
|
||||||
// Add all the now cold counts to the new hot counts...
|
// Add all the now cold counts to the new hot counts...
|
||||||
addAndResetCounts(hot, cold)
|
addAndResetCounts(hot, cold)
|
||||||
// ...adjust the new zero threshold in the cold counts, too...
|
// ...adjust the new zero threshold in the cold counts, too...
|
||||||
atomic.StoreUint64(&cold.sparseZeroThresholdBits, math.Float64bits(newZeroThreshold))
|
atomic.StoreUint64(&cold.nativeHistogramZeroThresholdBits, math.Float64bits(newZeroThreshold))
|
||||||
// ...and then merge the newly deleted buckets into the wider zero
|
// ...and then merge the newly deleted buckets into the wider zero
|
||||||
// bucket.
|
// bucket.
|
||||||
mergeAndDeleteOrAddAndReset := func(hotBuckets, coldBuckets *sync.Map) func(k, v interface{}) bool {
|
mergeAndDeleteOrAddAndReset := func(hotBuckets, coldBuckets *sync.Map) func(k, v interface{}) bool {
|
||||||
|
@ -912,14 +922,14 @@ func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
|
||||||
bucket := v.(*int64)
|
bucket := v.(*int64)
|
||||||
if key == smallestKey {
|
if key == smallestKey {
|
||||||
// Merge into hot zero bucket...
|
// Merge into hot zero bucket...
|
||||||
atomic.AddUint64(&hot.sparseZeroBucket, uint64(atomic.LoadInt64(bucket)))
|
atomic.AddUint64(&hot.nativeHistogramZeroBucket, uint64(atomic.LoadInt64(bucket)))
|
||||||
// ...and delete from cold counts.
|
// ...and delete from cold counts.
|
||||||
coldBuckets.Delete(key)
|
coldBuckets.Delete(key)
|
||||||
atomicDecUint32(&cold.sparseBucketsNumber)
|
atomicDecUint32(&cold.nativeHistogramBucketsNumber)
|
||||||
} else {
|
} else {
|
||||||
// Add to corresponding hot bucket...
|
// Add to corresponding hot bucket...
|
||||||
if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
|
if addToBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
|
||||||
atomic.AddUint32(&hot.sparseBucketsNumber, 1)
|
atomic.AddUint32(&hot.nativeHistogramBucketsNumber, 1)
|
||||||
}
|
}
|
||||||
// ...and reset cold bucket.
|
// ...and reset cold bucket.
|
||||||
atomic.StoreInt64(bucket, 0)
|
atomic.StoreInt64(bucket, 0)
|
||||||
|
@ -928,8 +938,8 @@ func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cold.sparseBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsPositive, &cold.sparseBucketsPositive))
|
cold.nativeHistogramBucketsPositive.Range(mergeAndDeleteOrAddAndReset(&hot.nativeHistogramBucketsPositive, &cold.nativeHistogramBucketsPositive))
|
||||||
cold.sparseBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hot.sparseBucketsNegative, &cold.sparseBucketsNegative))
|
cold.nativeHistogramBucketsNegative.Range(mergeAndDeleteOrAddAndReset(&hot.nativeHistogramBucketsNegative, &cold.nativeHistogramBucketsNegative))
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -938,16 +948,16 @@ func (h *histogram) maybeWidenZeroBucket(hot, cold *histogramCounts) bool {
|
||||||
// bucket count (or even no reduction at all). The method does nothing if the
|
// bucket count (or even no reduction at all). The method does nothing if the
|
||||||
// schema is already -4.
|
// schema is already -4.
|
||||||
func (h *histogram) doubleBucketWidth(hot, cold *histogramCounts) {
|
func (h *histogram) doubleBucketWidth(hot, cold *histogramCounts) {
|
||||||
coldSchema := atomic.LoadInt32(&cold.sparseSchema)
|
coldSchema := atomic.LoadInt32(&cold.nativeHistogramSchema)
|
||||||
if coldSchema == -4 {
|
if coldSchema == -4 {
|
||||||
return // Already at lowest resolution.
|
return // Already at lowest resolution.
|
||||||
}
|
}
|
||||||
coldSchema--
|
coldSchema--
|
||||||
atomic.StoreInt32(&cold.sparseSchema, coldSchema)
|
atomic.StoreInt32(&cold.nativeHistogramSchema, coldSchema)
|
||||||
// Play it simple and just delete all cold buckets.
|
// Play it simple and just delete all cold buckets.
|
||||||
atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
|
atomic.StoreUint32(&cold.nativeHistogramBucketsNumber, 0)
|
||||||
deleteSyncMap(&cold.sparseBucketsNegative)
|
deleteSyncMap(&cold.nativeHistogramBucketsNegative)
|
||||||
deleteSyncMap(&cold.sparseBucketsPositive)
|
deleteSyncMap(&cold.nativeHistogramBucketsPositive)
|
||||||
// Make coldCounts the new hot counts.
|
// Make coldCounts the new hot counts.
|
||||||
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
|
n := atomic.AddUint64(&h.countAndHotIdx, 1<<63)
|
||||||
count := n & ((1 << 63) - 1)
|
count := n & ((1 << 63) - 1)
|
||||||
|
@ -958,7 +968,7 @@ func (h *histogram) doubleBucketWidth(hot, cold *histogramCounts) {
|
||||||
// Add all the now cold counts to the new hot counts...
|
// Add all the now cold counts to the new hot counts...
|
||||||
addAndResetCounts(hot, cold)
|
addAndResetCounts(hot, cold)
|
||||||
// ...adjust the schema in the cold counts, too...
|
// ...adjust the schema in the cold counts, too...
|
||||||
atomic.StoreInt32(&cold.sparseSchema, coldSchema)
|
atomic.StoreInt32(&cold.nativeHistogramSchema, coldSchema)
|
||||||
// ...and then merge the cold buckets into the wider hot buckets.
|
// ...and then merge the cold buckets into the wider hot buckets.
|
||||||
merge := func(hotBuckets *sync.Map) func(k, v interface{}) bool {
|
merge := func(hotBuckets *sync.Map) func(k, v interface{}) bool {
|
||||||
return func(k, v interface{}) bool {
|
return func(k, v interface{}) bool {
|
||||||
|
@ -970,33 +980,33 @@ func (h *histogram) doubleBucketWidth(hot, cold *histogramCounts) {
|
||||||
}
|
}
|
||||||
key /= 2
|
key /= 2
|
||||||
// Add to corresponding hot bucket.
|
// Add to corresponding hot bucket.
|
||||||
if addToSparseBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
|
if addToBucket(hotBuckets, key, atomic.LoadInt64(bucket)) {
|
||||||
atomic.AddUint32(&hot.sparseBucketsNumber, 1)
|
atomic.AddUint32(&hot.nativeHistogramBucketsNumber, 1)
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cold.sparseBucketsPositive.Range(merge(&hot.sparseBucketsPositive))
|
cold.nativeHistogramBucketsPositive.Range(merge(&hot.nativeHistogramBucketsPositive))
|
||||||
cold.sparseBucketsNegative.Range(merge(&hot.sparseBucketsNegative))
|
cold.nativeHistogramBucketsNegative.Range(merge(&hot.nativeHistogramBucketsNegative))
|
||||||
// Play it simple again and just delete all cold buckets.
|
// Play it simple again and just delete all cold buckets.
|
||||||
atomic.StoreUint32(&cold.sparseBucketsNumber, 0)
|
atomic.StoreUint32(&cold.nativeHistogramBucketsNumber, 0)
|
||||||
deleteSyncMap(&cold.sparseBucketsNegative)
|
deleteSyncMap(&cold.nativeHistogramBucketsNegative)
|
||||||
deleteSyncMap(&cold.sparseBucketsPositive)
|
deleteSyncMap(&cold.nativeHistogramBucketsPositive)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *histogram) resetCounts(counts *histogramCounts) {
|
func (h *histogram) resetCounts(counts *histogramCounts) {
|
||||||
atomic.StoreUint64(&counts.sumBits, 0)
|
atomic.StoreUint64(&counts.sumBits, 0)
|
||||||
atomic.StoreUint64(&counts.count, 0)
|
atomic.StoreUint64(&counts.count, 0)
|
||||||
atomic.StoreUint64(&counts.sparseZeroBucket, 0)
|
atomic.StoreUint64(&counts.nativeHistogramZeroBucket, 0)
|
||||||
atomic.StoreUint64(&counts.sparseZeroThresholdBits, math.Float64bits(h.sparseZeroThreshold))
|
atomic.StoreUint64(&counts.nativeHistogramZeroThresholdBits, math.Float64bits(h.nativeHistogramZeroThreshold))
|
||||||
atomic.StoreInt32(&counts.sparseSchema, h.sparseSchema)
|
atomic.StoreInt32(&counts.nativeHistogramSchema, h.nativeHistogramSchema)
|
||||||
atomic.StoreUint32(&counts.sparseBucketsNumber, 0)
|
atomic.StoreUint32(&counts.nativeHistogramBucketsNumber, 0)
|
||||||
for i := range h.upperBounds {
|
for i := range h.upperBounds {
|
||||||
atomic.StoreUint64(&counts.buckets[i], 0)
|
atomic.StoreUint64(&counts.buckets[i], 0)
|
||||||
}
|
}
|
||||||
deleteSyncMap(&counts.sparseBucketsNegative)
|
deleteSyncMap(&counts.nativeHistogramBucketsNegative)
|
||||||
deleteSyncMap(&counts.sparseBucketsPositive)
|
deleteSyncMap(&counts.nativeHistogramBucketsPositive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateExemplar replaces the exemplar for the provided bucket. With empty
|
// updateExemplar replaces the exemplar for the provided bucket. With empty
|
||||||
|
@ -1247,13 +1257,13 @@ func (s buckSort) Less(i, j int) bool {
|
||||||
return s[i].GetUpperBound() < s[j].GetUpperBound()
|
return s[i].GetUpperBound() < s[j].GetUpperBound()
|
||||||
}
|
}
|
||||||
|
|
||||||
// pickSparseschema returns the largest number n between -4 and 8 such that
|
// pickSchema returns the largest number n between -4 and 8 such that
|
||||||
// 2^(2^-n) is less or equal the provided bucketFactor.
|
// 2^(2^-n) is less or equal the provided bucketFactor.
|
||||||
//
|
//
|
||||||
// Special cases:
|
// Special cases:
|
||||||
// - bucketFactor <= 1: panics.
|
// - bucketFactor <= 1: panics.
|
||||||
// - bucketFactor < 2^(2^-8) (but > 1): still returns 8.
|
// - bucketFactor < 2^(2^-8) (but > 1): still returns 8.
|
||||||
func pickSparseSchema(bucketFactor float64) int32 {
|
func pickSchema(bucketFactor float64) int32 {
|
||||||
if bucketFactor <= 1 {
|
if bucketFactor <= 1 {
|
||||||
panic(fmt.Errorf("bucketFactor %f is <=1", bucketFactor))
|
panic(fmt.Errorf("bucketFactor %f is <=1", bucketFactor))
|
||||||
}
|
}
|
||||||
|
@ -1268,7 +1278,7 @@ func pickSparseSchema(bucketFactor float64) int32 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeSparseBuckets(buckets *sync.Map) ([]*dto.BucketSpan, []int64) {
|
func makeBuckets(buckets *sync.Map) ([]*dto.BucketSpan, []int64) {
|
||||||
var ii []int
|
var ii []int
|
||||||
buckets.Range(func(k, v interface{}) bool {
|
buckets.Range(func(k, v interface{}) bool {
|
||||||
ii = append(ii, k.(int))
|
ii = append(ii, k.(int))
|
||||||
|
@ -1323,9 +1333,9 @@ func makeSparseBuckets(buckets *sync.Map) ([]*dto.BucketSpan, []int64) {
|
||||||
return spans, deltas
|
return spans, deltas
|
||||||
}
|
}
|
||||||
|
|
||||||
// addToSparseBucket increments the sparse bucket at key by the provided
|
// addToBucket increments the sparse bucket at key by the provided amount. It
|
||||||
// amount. It returns true if a new sparse bucket had to be created for that.
|
// returns true if a new sparse bucket had to be created for that.
|
||||||
func addToSparseBucket(buckets *sync.Map, key int, increment int64) bool {
|
func addToBucket(buckets *sync.Map, key int, increment int64) bool {
|
||||||
if existingBucket, ok := buckets.Load(key); ok {
|
if existingBucket, ok := buckets.Load(key); ok {
|
||||||
// Fast path without allocation.
|
// Fast path without allocation.
|
||||||
atomic.AddInt64(existingBucket.(*int64), increment)
|
atomic.AddInt64(existingBucket.(*int64), increment)
|
||||||
|
@ -1350,7 +1360,7 @@ func addToSparseBucket(buckets *sync.Map, key int, increment int64) bool {
|
||||||
func addAndReset(hotBuckets *sync.Map, bucketNumber *uint32) func(k, v interface{}) bool {
|
func addAndReset(hotBuckets *sync.Map, bucketNumber *uint32) func(k, v interface{}) bool {
|
||||||
return func(k, v interface{}) bool {
|
return func(k, v interface{}) bool {
|
||||||
bucket := v.(*int64)
|
bucket := v.(*int64)
|
||||||
if addToSparseBucket(hotBuckets, k.(int), atomic.LoadInt64(bucket)) {
|
if addToBucket(hotBuckets, k.(int), atomic.LoadInt64(bucket)) {
|
||||||
atomic.AddUint32(bucketNumber, 1)
|
atomic.AddUint32(bucketNumber, 1)
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(bucket, 0)
|
atomic.StoreInt64(bucket, 0)
|
||||||
|
@ -1420,7 +1430,7 @@ func getLe(key int, schema int32) float64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
fracIdx := key & ((1 << schema) - 1)
|
fracIdx := key & ((1 << schema) - 1)
|
||||||
frac := sparseBounds[schema][fracIdx]
|
frac := nativeHistogramBounds[schema][fracIdx]
|
||||||
exp := (key >> schema) + 1
|
exp := (key >> schema) + 1
|
||||||
if frac == 0.5 && exp == 1025 {
|
if frac == 0.5 && exp == 1025 {
|
||||||
// This is the last bucket before the overflow bucket (for ±Inf
|
// This is the last bucket before the overflow bucket (for ±Inf
|
||||||
|
@ -1456,9 +1466,9 @@ func atomicDecUint32(p *uint32) {
|
||||||
atomic.AddUint32(p, ^uint32(0))
|
atomic.AddUint32(p, ^uint32(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
// addAndResetCounts adds certain fields (count, sum, conventional buckets,
|
// addAndResetCounts adds certain fields (count, sum, conventional buckets, zero
|
||||||
// sparse zero bucket) from the cold counts to the corresponding fields in the
|
// bucket) from the cold counts to the corresponding fields in the hot
|
||||||
// hot counts. Those fields are then reset to 0 in the cold counts.
|
// counts. Those fields are then reset to 0 in the cold counts.
|
||||||
func addAndResetCounts(hot, cold *histogramCounts) {
|
func addAndResetCounts(hot, cold *histogramCounts) {
|
||||||
atomic.AddUint64(&hot.count, atomic.LoadUint64(&cold.count))
|
atomic.AddUint64(&hot.count, atomic.LoadUint64(&cold.count))
|
||||||
atomic.StoreUint64(&cold.count, 0)
|
atomic.StoreUint64(&cold.count, 0)
|
||||||
|
@ -1469,6 +1479,6 @@ func addAndResetCounts(hot, cold *histogramCounts) {
|
||||||
atomic.AddUint64(&hot.buckets[i], atomic.LoadUint64(&cold.buckets[i]))
|
atomic.AddUint64(&hot.buckets[i], atomic.LoadUint64(&cold.buckets[i]))
|
||||||
atomic.StoreUint64(&cold.buckets[i], 0)
|
atomic.StoreUint64(&cold.buckets[i], 0)
|
||||||
}
|
}
|
||||||
atomic.AddUint64(&hot.sparseZeroBucket, atomic.LoadUint64(&cold.sparseZeroBucket))
|
atomic.AddUint64(&hot.nativeHistogramZeroBucket, atomic.LoadUint64(&cold.nativeHistogramZeroBucket))
|
||||||
atomic.StoreUint64(&cold.sparseZeroBucket, 0)
|
atomic.StoreUint64(&cold.nativeHistogramZeroBucket, 0)
|
||||||
}
|
}
|
||||||
|
|
|
@ -656,13 +656,13 @@ func TestSparseHistogram(t *testing.T) {
|
||||||
for _, s := range scenarios {
|
for _, s := range scenarios {
|
||||||
t.Run(s.name, func(t *testing.T) {
|
t.Run(s.name, func(t *testing.T) {
|
||||||
his := NewHistogram(HistogramOpts{
|
his := NewHistogram(HistogramOpts{
|
||||||
Name: "name",
|
Name: "name",
|
||||||
Help: "help",
|
Help: "help",
|
||||||
SparseBucketsFactor: s.factor,
|
NativeHistogramBucketFactor: s.factor,
|
||||||
SparseBucketsZeroThreshold: s.zeroThreshold,
|
NativeHistogramZeroThreshold: s.zeroThreshold,
|
||||||
SparseBucketsMaxNumber: s.maxBuckets,
|
NativeHistogramMaxBucketNumber: s.maxBuckets,
|
||||||
SparseBucketsMinResetDuration: s.minResetDuration,
|
NativeHistogramMinResetDuration: s.minResetDuration,
|
||||||
SparseBucketsMaxZeroThreshold: s.maxZeroThreshold,
|
NativeHistogramMaxZeroThreshold: s.maxZeroThreshold,
|
||||||
})
|
})
|
||||||
ts := time.Now().Add(30 * time.Second)
|
ts := time.Now().Add(30 * time.Second)
|
||||||
now := func() time.Time {
|
now := func() time.Time {
|
||||||
|
@ -702,13 +702,13 @@ func TestSparseHistogramConcurrency(t *testing.T) {
|
||||||
end.Add(concLevel)
|
end.Add(concLevel)
|
||||||
|
|
||||||
his := NewHistogram(HistogramOpts{
|
his := NewHistogram(HistogramOpts{
|
||||||
Name: "test_sparse_histogram",
|
Name: "test_sparse_histogram",
|
||||||
Help: "This help is sparse.",
|
Help: "This help is sparse.",
|
||||||
SparseBucketsFactor: 1.05,
|
NativeHistogramBucketFactor: 1.05,
|
||||||
SparseBucketsZeroThreshold: 0.0000001,
|
NativeHistogramZeroThreshold: 0.0000001,
|
||||||
SparseBucketsMaxNumber: 50,
|
NativeHistogramMaxBucketNumber: 50,
|
||||||
SparseBucketsMinResetDuration: time.Hour, // Comment out to test for totals below.
|
NativeHistogramMinResetDuration: time.Hour, // Comment out to test for totals below.
|
||||||
SparseBucketsMaxZeroThreshold: 0.001,
|
NativeHistogramMaxZeroThreshold: 0.001,
|
||||||
})
|
})
|
||||||
|
|
||||||
ts := time.Now().Add(30 * time.Second).Unix()
|
ts := time.Now().Add(30 * time.Second).Unix()
|
||||||
|
|
Loading…
Reference in New Issue