client_golang/prometheus/histogram.go

404 lines
11 KiB
Go

// Copyright (c) 2013, Prometheus Team
// All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package prometheus
import (
"bytes"
"encoding/json"
"fmt"
"math"
"strconv"
"sync"
"time"
dto "github.com/prometheus/client_model/go"
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/client_golang/model"
)
// EquallySizedBucketsFor returns count bucket starts spaced evenly across the
// half-open interval [lower, upper): the first start is lower and upper itself
// is never emitted. For instance, {lower=0, upper=10, count=5} yields
// [0, 2, 4, 6, 8].
func EquallySizedBucketsFor(lower, upper float64, count int) []float64 {
	width := (upper - lower) / float64(count)
	starts := make([]float64, count)
	for idx := range starts {
		starts[idx] = lower + (float64(idx) * width)
	}
	return starts
}
// LogarithmicSizedBucketsFor returns ceil(log2(upper)) bucket starts: the
// first start is 0 and every later start at position k is 2^k, so
// upper=4096 yields [0, 2, 4, 8, ..., 2048]. Values at or beyond the last
// start fall into the final bucket.
//
// N.B. lower is accepted for signature compatibility but is not consulted by
// the current implementation.
func LogarithmicSizedBucketsFor(lower, upper float64) []float64 {
	n := int(math.Ceil(math.Log2(upper)))
	starts := make([]float64, n)
	// starts[0] keeps its zero value; fill the remaining slots with powers of two.
	for k := 1; k < n; k++ {
		starts[k] = math.Pow(2, float64(k))
	}
	return starts
}
// A HistogramSpecification defines how a Histogram is to be built. It is the
// sole argument of NewHistogram, which copies each field into the histogram
// it returns.
type HistogramSpecification struct {
// BucketBuilder creates a single Bucket. The histogram calls it once per
// entry in Starts the first time a given label set is observed.
BucketBuilder BucketBuilder
// ReportablePercentiles lists the percentiles that are computed for every
// label set when the histogram is marshalled to JSON or dumped.
ReportablePercentiles []float64
// Starts holds the lower bound of each bucket; the histogram's own field
// notes say these should be sorted in ascending order.
Starts []float64
// PurgeInterval is how long collected data is kept before Purge discards
// all of it. A zero value disables purging.
PurgeInterval time.Duration
}
// Histogram is a Metric that additionally accepts observations through Add.
type Histogram interface {
Metric
// Add records value as one observation for the given label set. The
// implementation in this file treats nil labels as the blank label set.
Add(labels map[string]string, value float64)
}
// The histogram is an accumulator for samples. It merely routes into which
// bucket to capture an event and provides a percentile calculation mechanism.
type histogram struct {
// bucketMaker creates one fresh Bucket. Add calls it once per bucket start
// when a label set is seen for the first time.
bucketMaker BucketBuilder
// This represents the open interval's start at which values shall be added to
// the bucket. The interval continues until the beginning of the next bucket
// exclusive or positive infinity.
//
// N.B.
// - bucketStarts should be sorted in ascending order;
// - len(bucketStarts) must be equivalent to len(buckets);
// - The index of a given bucketStarts' element is presumed to
// correspond to the appropriate element in buckets.
bucketStarts []float64
// mutex guards values, and lastPurge which Purge updates. Add, String,
// MarshalJSON, Purge, Reset, ResetAll and dumpChildren acquire it; the
// percentile helpers do no locking of their own.
mutex sync.RWMutex
// These are the buckets that capture samples as they are emitted to the
// histogram, grouped per label set and keyed by the label signature. Please
// consult the reference interface and its implementations for further
// details about behavior expectations.
values map[uint64]*histogramVector
// These are the percentile values that will be reported on marshalling.
reportablePercentiles []float64
// purgeInterval is the age at which Purge discards all collected data; zero
// disables purging.
purgeInterval time.Duration
// lastPurge is the time of the most recent purge, initialised to the
// construction time by NewHistogram.
lastPurge time.Time
}
// histogramVector holds everything the histogram tracks for one label set.
type histogramVector struct {
// buckets has one Bucket per entry in the owning histogram's bucketStarts,
// in the same order.
buckets []Bucket
// labels is the label set this vector was created for.
labels map[string]string
// sum is the running total of every value passed to Add for this label set.
sum float64
// count is the number of values passed to Add for this label set.
count uint64
}
// Add records value as one observation for the child identified by labels.
//
// A nil labels map is replaced by the shared blank label set. The first
// observation for a label set creates its child with one bucket per entry in
// bucketStarts. For caller-supplied labels the child keeps a private copy of
// the map, so that a caller who later mutates or reuses its map cannot change
// the labels reported for an existing child.
//
// The value is placed in the last bucket whose start does not exceed it,
// scanning bucketStarts in order; a value below every start lands in the first
// bucket. The child's running sum and count are updated as well.
//
// Add holds the write lock for the whole update.
func (h *histogram) Add(labels map[string]string, value float64) {
	callerOwned := labels != nil
	if !callerOwned {
		labels = blankLabelsSingleton
	}
	signature := model.LabelValuesToSignature(labels)

	h.mutex.Lock()
	defer h.mutex.Unlock()

	vector, ok := h.values[signature]
	if !ok {
		stored := labels
		if callerOwned {
			// Detach from the caller's map; the singleton is shared on purpose.
			stored = make(map[string]string, len(labels))
			for name, labelValue := range labels {
				stored[name] = labelValue
			}
		}
		vector = &histogramVector{
			buckets: make([]Bucket, len(h.bucketStarts)),
			labels:  stored,
		}
		for i := range vector.buckets {
			vector.buckets[i] = h.bucketMaker()
		}
		h.values[signature] = vector
	}

	// Linear scan: remember the last start that is <= value.
	lastIndex := 0
	for i, bucketStart := range h.bucketStarts {
		if value < bucketStart {
			break
		}
		lastIndex = i
	}

	vector.buckets[lastIndex].Add(value)
	vector.sum += value
	vector.count++
}
// String renders every label set and its buckets as a single human-readable
// line. Each bucket is printed next to its start as "[start, inf) = bucket".
// Label sets appear in map iteration order, which is not stable. The read
// lock is held while the text is built.
func (h *histogram) String() string {
	h.mutex.RLock()
	defer h.mutex.RUnlock()

	var out bytes.Buffer
	out.WriteString("[Histogram { ")
	for _, child := range h.values {
		fmt.Fprintf(&out, "Labels: %s ", child.labels)
		for idx := range h.bucketStarts {
			fmt.Fprintf(&out, "[%f, inf) = %s, ", h.bucketStarts[idx], child.buckets[idx])
		}
	}
	out.WriteString("}]")

	return out.String()
}
// previousCumulativeObservations returns the running observation total of the
// bucket just before bucketIndex, i.e. how many observations precede the
// bucket at bucketIndex. The first bucket has no predecessor, so index 0
// yields 0.
func previousCumulativeObservations(cumulativeObservations []int, bucketIndex int) int {
	if bucketIndex != 0 {
		return cumulativeObservations[bucketIndex-1]
	}
	return 0
}
// prospectiveIndexForPercentile maps a percentile onto a zero-based position
// in a sequence of totalObservations elements: the percentile is scaled by
// the index of the last element and the result is truncated toward zero.
func prospectiveIndexForPercentile(percentile float64, totalObservations int) int {
	lastPosition := float64(totalObservations - 1)
	return int(percentile * lastPosition)
}
// nextNonEmptyBucketElement returns a pointer to the first bucket at or after
// currentIndex that holds at least one observation, paired with element index
// 0, i.e. the first element of that bucket. It is used when interim bucket
// intervals may be empty.
//
// signature selects the child in h.values, observationsByBucket[i] is the
// observation count of that child's bucket i, and bucketCount bounds the scan.
// The function panics when no bucket in [currentIndex, bucketCount) holds an
// observation.
//
// The receiver is a pointer so that the sync.RWMutex inside histogram is not
// copied on every call. This method does no locking of its own.
func (h *histogram) nextNonEmptyBucketElement(signature uint64, currentIndex, bucketCount int, observationsByBucket []int) (*Bucket, int) {
	for i := currentIndex; i < bucketCount; i++ {
		if observationsByBucket[i] == 0 {
			continue
		}
		vector := h.values[signature]
		return &vector.buckets[i], 0
	}
	panic("Illegal Condition: There were no remaining buckets to provide a value.")
}
// bucketForPercentile finds which bucket of the child identified by signature
// contains the given percentile, and the element index within that bucket.
//
// If a percentile is requested that results in a corresponding index that is
// no longer contained by the bucket, the index of the last item is returned.
// This may occur if the underlying bucket catalogs values and employs an
// eviction strategy.
//
// When the child has no observations at all, the first bucket and index 0 are
// returned.
//
// The receiver is a pointer so that the sync.RWMutex inside histogram is not
// copied on every call. This method does no locking of its own.
func (h *histogram) bucketForPercentile(signature uint64, percentile float64) (*Bucket, int) {
	bucketCount := len(h.bucketStarts)
	// This captures the quantity of samples in a given bucket's range.
	observationsByBucket := make([]int, bucketCount)
	// This captures the cumulative quantity of observations from all preceding
	// buckets up and to the end of this bucket.
	cumulativeObservationsByBucket := make([]int, bucketCount)
	totalObservations := 0

	vector := h.values[signature]
	for i, bucket := range vector.buckets {
		// Read the count once so the per-bucket and cumulative figures are
		// always derived from the same number.
		observations := bucket.Observations()
		observationsByBucket[i] = observations
		totalObservations += observations
		cumulativeObservationsByBucket[i] = totalObservations
	}

	// This captures the index offset where the given percentile value would be
	// were all submitted samples stored and never down-/re-sampled nor deleted
	// and housed in a singular array.
	prospectiveIndex := prospectiveIndexForPercentile(percentile, totalObservations)

	for i, cumulativeObservation := range cumulativeObservationsByBucket {
		// Nothing has been observed up to and including this bucket.
		if cumulativeObservation == 0 {
			continue
		}
		// Find the bucket that contains the given index.
		if cumulativeObservation >= prospectiveIndex {
			// This calculates the index within the current bucket where the given
			// percentile may be found.
			subIndex := prospectiveIndex - previousCumulativeObservations(cumulativeObservationsByBucket, i)
			// Sometimes the index is one past this bucket's last item, in which
			// case the value is the first element of the next non-empty bucket.
			if observationsByBucket[i] == subIndex {
				return h.nextNonEmptyBucketElement(signature, i+1, bucketCount, observationsByBucket)
			}
			return &vector.buckets[i], subIndex
		}
	}

	return &vector.buckets[0], 0
}
// percentile returns the histogram's estimate of the value at the given
// percentile of the samples collected for the child identified by signature.
// The requested percentile is expected to be a real value within (0, 1.0].
//
// The receiver is a pointer so that the sync.RWMutex inside histogram is not
// copied on every call. This method does no locking of its own; the callers
// in this file hold the read lock while invoking it.
func (h *histogram) percentile(signature uint64, percentile float64) float64 {
	bucket, index := h.bucketForPercentile(signature, percentile)
	return (*bucket).ValueForIndex(index)
}
// formatFloat renders value as text via strconv.FormatFloat, using the
// floatFormat, floatPrecision and floatBitCount settings defined elsewhere in
// the package.
func formatFloat(value float64) string {
	text := strconv.FormatFloat(value, floatFormat, floatPrecision, floatBitCount)
	return text
}
// MarshalJSON encodes the histogram as a JSON object holding the histogram
// type marker and one entry per label set. Each entry carries the label set
// and a map from formatted percentile to the estimated value at that
// percentile, for every reportable percentile.
//
// Purge runs first, so data older than the purge interval is dropped before
// encoding. The read lock is held while the document is assembled and encoded.
func (h *histogram) MarshalJSON() ([]byte, error) {
	h.Purge()

	h.mutex.RLock()
	defer h.mutex.RUnlock()

	children := make([]map[string]interface{}, 0, len(h.values))
	for signature, child := range h.values {
		estimates := make(map[string]float64, len(h.reportablePercentiles))
		for _, p := range h.reportablePercentiles {
			key := formatFloat(p)
			estimates[key] = h.percentile(signature, p)
		}

		entry := map[string]interface{}{
			labelsKey: child.labels,
			valueKey:  estimates,
		}
		children = append(children, entry)
	}

	document := map[string]interface{}{
		typeKey:  histogramTypeValue,
		valueKey: children,
	}
	return json.Marshal(document)
}
// Purge discards all recorded data once at least purgeInterval has elapsed
// since the previous purge (or since construction), and then restarts the
// clock. A zero purgeInterval disables purging entirely.
//
// purgeInterval is checked before the lock is taken; the elapsed-time check
// and the reset itself happen under the write lock.
func (h *histogram) Purge() {
	if h.purgeInterval == 0 {
		return
	}

	h.mutex.Lock()
	defer h.mutex.Unlock()

	if time.Since(h.lastPurge) >= h.purgeInterval {
		h.resetAll()
		h.lastPurge = time.Now()
	}
}
// Reset drops everything recorded for the given label set: each of its
// buckets is reset and the child is removed from the histogram. Unknown label
// sets are ignored. The write lock is held for the duration.
func (h *histogram) Reset(labels map[string]string) {
	signature := model.LabelValuesToSignature(labels)

	h.mutex.Lock()
	defer h.mutex.Unlock()

	if child, found := h.values[signature]; found {
		for _, b := range child.buckets {
			b.Reset()
		}
		delete(h.values, signature)
	}
}
// ResetAll drops the recorded data of every label set. It is the locking
// wrapper around resetAll: the write lock is taken here and released when the
// call returns.
func (h *histogram) ResetAll() {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	h.resetAll()
}
// resetAll resets every bucket of every label set and removes each child from
// h.values. It does no locking; both callers in this file (Purge and ResetAll)
// hold the write lock when they invoke it.
func (h *histogram) resetAll() {
	for sig, child := range h.values {
		for _, b := range child.buckets {
			b.Reset()
		}
		delete(h.values, sig)
	}
}
// NewHistogram produces a Histogram from the given specification. The bucket
// builder, bucket starts, reportable percentiles and purge interval are taken
// from the specification; the histogram starts with no recorded label sets and
// its purge clock starts at the moment of construction.
func NewHistogram(specification *HistogramSpecification) Histogram {
	return &histogram{
		bucketMaker:           specification.BucketBuilder,
		bucketStarts:          specification.Starts,
		reportablePercentiles: specification.ReportablePercentiles,
		values:                make(map[uint64]*histogramVector),
		lastPurge:             time.Now(),
		purgeInterval:         specification.PurgeInterval,
	}
}
// NewDefaultHistogram furnishes a Histogram with arbitrary default values and
// behaviors that is strictly useful for prototyping purposes: bucket starts
// from LogarithmicSizedBucketsFor(0, 4096), buckets built by
// AccumulatingBucketBuilder(EvictAndReplaceWith(10, AverageReducer), 50), the
// percentiles 0.01, 0.05, 0.5, 0.90 and 0.99, and a purge interval of fifteen
// minutes.
func NewDefaultHistogram() Histogram {
	defaults := HistogramSpecification{
		Starts:                LogarithmicSizedBucketsFor(0, 4096),
		BucketBuilder:         AccumulatingBucketBuilder(EvictAndReplaceWith(10, AverageReducer), 50),
		ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
		PurgeInterval:         15 * time.Minute,
	}
	return NewHistogram(&defaults)
}
// dumpChildren writes the histogram into the metric family f. The family is
// typed as a summary, and one metric is appended per label set carrying the
// label pairs, the sample sum and count, and one quantile per reportable
// percentile.
//
// Purge runs first, so data older than the purge interval is dropped before
// dumping. The read lock is held while f is populated. Label sets and label
// pairs are emitted in map iteration order.
func (h *histogram) dumpChildren(f *dto.MetricFamily) {
	h.Purge()

	h.mutex.RLock()
	defer h.mutex.RUnlock()

	f.Type = dto.MetricType_SUMMARY.Enum()
	for signature, child := range h.values {
		summary := &dto.Summary{
			SampleSum:   proto.Float64(child.sum),
			SampleCount: proto.Uint64(child.count),
		}
		entry := &dto.Metric{Summary: summary}

		for labelName, labelValue := range child.labels {
			entry.Label = append(entry.Label, &dto.LabelPair{
				Name:  proto.String(labelName),
				Value: proto.String(labelValue),
			})
		}

		for _, p := range h.reportablePercentiles {
			summary.Quantile = append(summary.Quantile, &dto.Quantile{
				Quantile: proto.Float64(p),
				Value:    proto.Float64(h.percentile(signature, p)),
			})
		}

		f.Metric = append(f.Metric, entry)
	}
}