// File: client_golang/prometheus/summary.go
//
// 428 lines, 12 KiB, Go.
// (Web-viewer controls captured with the export: Raw, Normal View, History.)

// Copyright 2014 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"fmt"
"hash/fnv"
"sort"
"sync"
"time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/client_golang/_vendor/perks/quantile"
)
// A Summary captures individual observations from an event or sample stream and
// summarizes them in a manner similar to traditional summary statistics: 1. sum
// of observations, 2. observation count, 3. rank estimations.
//
// A typical use-case is the observation of request latencies. By default, a
// Summary provides the median, the 90th and the 99th percentile of the latency
// as rank estimations (see DefObjectives).
//
// To create Summary instances, use NewSummary. For Summaries partitioned by
// variable labels, use NewSummaryVec.
type Summary interface {
// Metric and Collector are embedded. The implementation in this file (the
// unexported summary type) provides Desc and Write itself and embeds a
// SelfCollector, which it initializes with itself.
Metric
Collector
// Observe adds a single observation to the summary. In the implementation
// in this file the value is first appended to a buffer and only later
// folded into the sum, the count and the quantile streams.
Observe(float64)
}
// DefObjectives are the default Summary quantile values: the median, the 90th
// and the 99th percentile. They are used when SummaryOpts.Objectives is empty.
//
// Note that newSummary assigns this slice to the options as is (no copy is
// made), so modifying its elements also affects Summaries that were created
// with the default objectives.
var (
DefObjectives = []float64{0.5, 0.9, 0.99}
)
// Default values for SummaryOpts. Each of them is substituted by newSummary
// when the corresponding SummaryOpts field is left at its zero value.
const (
// DefMaxAge is the default duration for which observations stay
// relevant.
DefMaxAge time.Duration = 10 * time.Minute
// DefAgeBuckets is the default number of buckets used to calculate the
// age of observations. MaxAge is divided by this number to obtain the
// life span of a single bucket.
DefAgeBuckets = 10
// DefBufCap is the standard buffer size for collecting Summary observations.
// It is used as the capacity of each of the two observation buffers.
DefBufCap = 500
// DefEpsilon is the default error epsilon for the quantile rank estimates.
DefEpsilon = 0.001
)
// SummaryOpts bundles the options for creating a Summary metric. It is
// mandatory to set Name and Help to a non-empty string. All other fields are
// optional and can safely be left at their zero value, in which case the
// corresponding Def... default is used.
type SummaryOpts struct {
// Namespace, Subsystem, and Name are components of the fully-qualified
// name of the Summary (created by joining these components with
// "_"). Only Name is mandatory, the others merely help structuring the
// name. Note that the fully-qualified name of the Summary must be a
// valid Prometheus metric name.
Namespace string
Subsystem string
Name string
// Help provides information about this Summary. Mandatory!
//
// Metrics with the same fully-qualified name must have the same Help
// string.
Help string
// ConstLabels are used to attach fixed labels to this
// Summary. Summaries with the same fully-qualified name must have the
// same label names in their ConstLabels.
//
// Note that in most cases, labels have a value that varies during the
// lifetime of a process. Those labels are usually managed with a
// SummaryVec. ConstLabels serve only special purposes. One is for the
// special case where the value of a label does not change during the
// lifetime of a process, e.g. if the revision of the running binary is
// put into a label. Another, more advanced purpose is if more than one
// Collector needs to collect Summaries with the same fully-qualified
// name. In that case, those Summaries must differ in the values of
// their ConstLabels. See the Collector examples.
//
// If the value of a label never changes (not even between binaries),
// that label most likely should not be a label at all (but part of the
// metric name).
ConstLabels Labels
// Objectives defines the quantile rank estimates. If the slice is empty,
// DefObjectives is used.
Objectives []float64
// MaxAge defines the duration for which an observation stays relevant
// for the summary. A negative value makes the constructor panic; the
// zero value selects DefMaxAge. MaxAge should be large enough that
// MaxAge divided by AgeBuckets is still a non-zero duration.
MaxAge time.Duration
// AgeBuckets is the number of buckets used to exclude observations that
// are older than MaxAge from the summary. A higher number has a
// resource penalty, so only increase it if the higher resolution is
// really required. The zero value selects DefAgeBuckets.
AgeBuckets uint32
// BufCap defines the capacity of each of the two buffers in which
// observations are collected before they are inserted into the quantile
// streams. The default value of DefBufCap (used when BufCap is zero)
// should suffice for most uses. If there is a need to increase the
// value, a multiple of 500 is recommended (because that is the internal
// buffer size of the underlying package
// "github.com/bmizerany/perks/quantile", of which this file imports a
// vendored copy).
BufCap uint32
// Epsilon is the error epsilon for the quantile rank estimate. A
// negative value makes the constructor panic; the zero value selects
// DefEpsilon.
Epsilon float64
}
// NewSummary builds a Summary without variable labels from the provided
// SummaryOpts. The fully-qualified metric name is assembled from
// opts.Namespace, opts.Subsystem and opts.Name; opts.Help and
// opts.ConstLabels are handed to the descriptor unchanged. Defaulting and
// validation of the remaining options happen in newSummary, which panics on
// illegal values.
func NewSummary(opts SummaryOpts) Summary {
	fqName := BuildFQName(opts.Namespace, opts.Subsystem, opts.Name)
	// A plain Summary has no variable labels, hence the nil label names.
	desc := NewDesc(fqName, opts.Help, nil, opts.ConstLabels)
	return newSummary(desc, opts)
}
// newSummary creates the concrete summary for the given descriptor, options
// and variable label values.
//
// Zero-valued options are replaced by their defaults (DefObjectives,
// DefMaxAge, DefAgeBuckets, DefBufCap, DefEpsilon). The function panics if
//   - the number of labelValues differs from the number of variable labels
//     of desc,
//   - opts.MaxAge or opts.Epsilon is negative, or
//   - opts.MaxAge is so small that MaxAge / AgeBuckets truncates to a zero
//     duration.
func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
	if len(desc.variableLabels) != len(labelValues) {
		panic(errInconsistentCardinality)
	}

	if len(opts.Objectives) == 0 {
		opts.Objectives = DefObjectives
	}

	if opts.MaxAge < 0 {
		panic(fmt.Errorf("illegal max age MaxAge=%v", opts.MaxAge))
	}
	if opts.MaxAge == 0 {
		opts.MaxAge = DefMaxAge
	}

	if opts.AgeBuckets == 0 {
		opts.AgeBuckets = DefAgeBuckets
	}

	if opts.BufCap == 0 {
		opts.BufCap = DefBufCap
	}

	if opts.Epsilon < 0 {
		panic(fmt.Errorf("illegal value for Epsilon=%f", opts.Epsilon))
	}
	if opts.Epsilon == 0. {
		opts.Epsilon = DefEpsilon
	}

	// Integer division: a MaxAge shorter than AgeBuckets nanoseconds yields
	// a zero stream duration. The expiration loops in swapBufs and
	// maybeRotateStreams advance their timestamps by this duration and would
	// never terminate with a zero step, so reject it up front.
	streamDuration := opts.MaxAge / time.Duration(opts.AgeBuckets)
	if streamDuration <= 0 {
		panic(fmt.Errorf(
			"illegal combination of MaxAge=%v and AgeBuckets=%d: duration per age bucket must be positive",
			opts.MaxAge, opts.AgeBuckets,
		))
	}

	s := &summary{
		desc: desc,

		objectives: opts.Objectives,
		epsilon:    opts.Epsilon,

		labelPairs: makeLabelPairs(desc, labelValues),

		hotBuf:         make([]float64, 0, opts.BufCap),
		coldBuf:        make([]float64, 0, opts.BufCap),
		streamDuration: streamDuration,
	}
	s.mergedTailStreams = s.newStream()
	s.mergedAllStreams = s.newStream()
	// The head stream and the hot buffer start out with the same expiration
	// time; maybeRotateStreams relies on the two staying a whole number of
	// stream durations apart.
	s.headStreamExpTime = time.Now().Add(s.streamDuration)
	s.hotBufExpTime = s.headStreamExpTime

	for i := uint32(0); i < opts.AgeBuckets; i++ {
		s.streams = append(s.streams, s.newStream())
	}
	s.headStream = s.streams[0]

	s.Init(s) // Init self-collection.
	return s
}
// summary is the implementation of Summary in this file. Observations are
// appended to hotBuf and later drained, via coldBuf, into a set of quantile
// streams that are rotated over time so that old observations drop out.
type summary struct {
// SelfCollector is initialized with the summary itself in newSummary.
SelfCollector
bufMtx sync.Mutex // Protects hotBuf and hotBufExpTime.
mtx sync.Mutex // Protects every other moving part.
// Lock bufMtx before mtx if both are needed.
// desc, objectives, epsilon and labelPairs are set once in newSummary.
desc *Desc
objectives []float64
epsilon float64
labelPairs []*dto.LabelPair
// sum and cnt are the running sum and count of all observations that
// have been flushed from coldBuf so far.
sum float64
cnt uint64
// hotBuf receives new observations in Observe; coldBuf holds the
// observations that flushColdBuf inserts into headStream. swapBufs
// exchanges the two.
hotBuf, coldBuf []float64
// streams holds one quantile stream per age bucket.
streams []*quantile.Stream
// streamDuration is MaxAge divided by the number of age buckets.
streamDuration time.Duration
// headStreamIdx is the index of headStream within streams.
headStreamIdx int
// hotBufExpTime is advanced by swapBufs in steps of streamDuration until
// it is no longer before the current time; maybeRotateStreams rotates
// the head stream until headStreamExpTime has caught up with it.
headStreamExpTime, hotBufExpTime time.Time
// headStream is the stream currently receiving observations.
// mergedTailStreams is rebuilt from all non-head streams on rotation.
// mergedAllStreams is a scratch stream that Write fills, queries and
// resets again.
headStream, mergedTailStreams, mergedAllStreams *quantile.Stream
}
// Desc returns the descriptor the summary was created with.
func (s *summary) Desc() *Desc {
return s.desc
}
// Observe records the observation v. The value is only appended to the hot
// buffer here; it reaches the sum, the count and the quantile streams when
// the buffer is flushed. A flush is triggered before the append if the hot
// buffer has expired, and after the append if the buffer has become full.
func (s *summary) Observe(v float64) {
	s.bufMtx.Lock()
	defer s.bufMtx.Unlock()

	ts := time.Now()
	if s.hotBufExpTime.Before(ts) {
		s.asyncFlush(ts)
	}

	s.hotBuf = append(s.hotBuf, v)
	if full := len(s.hotBuf) == cap(s.hotBuf); full {
		s.asyncFlush(ts)
	}
}
// Commit note (2015-01-12 21:16:09 +03:00):
//
// Allow error reporting during metrics collection and simplify Register().
//
// Both are interface changes I want to get in before public announcement.
// They only break rare usage cases, and are always easy to fix, but still we
// want to avoid breaking changes after a wider announcement of the project.
//
// The change of Register() simply removes the return of the Collector, which
// nobody was using in practice. It was just bloating the call syntax. Note
// that this is different from RegisterOrGet(), which is used on various
// occasions where you want to register something that might or might not be
// registered already, but if it is, you want the previously registered
// Collector back (because that's the relevant one).
//
// WRT error reporting: I first tried the obvious way of letting the Collector
// methods Describe() and Collect() return an error. However, I had to
// conclude that this bloated _many_ calls and their handling in very
// obnoxious ways. On the other hand, the case where you actually want to
// report errors during registration or collection is very rare. Hence, that
// approach has the wrong trade-off. The approach taken here might at first
// appear clunky but is in practice quite handy, mostly because there is
// almost no change for the "normal" case of "no special error handling", but
// also because it plays well with the way descriptors and metrics are handled
// (via channels).
//
// Explaining the approach in more detail:
//
//   - During registration / describe: Error handling was actually already in
//     place (for invalid descriptors, which carry an error anyway). I only
//     added a convenience function to create an invalid descriptor with a
//     given error on purpose.
//
//   - Metrics are now treated in a similar way. The Write method returns an
//     error now (the only change in interface). An "invalid metric" is
//     provided that can be sent via the channel to signal that that metric
//     could not be collected. It also transports an error.
//
// NON-GOALS OF THIS COMMIT:
//
// This is NOT yet the major improvement of the whole registry part, where we
// want a public Registry interface and plenty of modular configurations (for
// error handling, various auto-metrics, http instrumentation, testing, ...).
// However, we can do that whole thing without breaking existing interfaces.
// For now (which is a significant issue) any error during collection will
// either cause a 500 HTTP response or a panic (depending on registry config).
// Later, we definitely want to have a possibility to skip (and only report
// somehow) non-collectible metrics instead of aborting the whole scrape.

func (s *summary) Write(out *dto.Metric) error {
sum := &dto.Summary{}
qs := make([]*dto.Quantile, 0, len(s.objectives))
s.bufMtx.Lock()
s.mtx.Lock()
if len(s.hotBuf) != 0 {
s.swapBufs(time.Now())
}
s.bufMtx.Unlock()
s.flushColdBuf()
s.mergedAllStreams.Merge(s.mergedTailStreams.Samples())
s.mergedAllStreams.Merge(s.headStream.Samples())
sum.SampleCount = proto.Uint64(s.cnt)
sum.SampleSum = proto.Float64(s.sum)
for _, rank := range s.objectives {
qs = append(qs, &dto.Quantile{
Quantile: proto.Float64(rank),
Value: proto.Float64(s.mergedAllStreams.Query(rank)),
})
}
s.mergedAllStreams.Reset()
s.mtx.Unlock()
if len(qs) > 0 {
sort.Sort(quantSort(qs))
}
sum.Quantile = qs
out.Summary = sum
out.Label = s.labelPairs
Allow error reporting during metrics collection and simplify Register(). Both are interface changes I want to get in before public announcement. They only break rare usage cases, and are always easy to fix, but still we want to avoid breaking changes after a wider announcement of the project. The change of Register() simply removes the return of the Collector, which nobody was using in practice. It was just bloating the call syntax. Note that this is different from RegisterOrGet(), which is used at various occasions where you want to register something that might or might not be registered already, but if it is, you want the previously registered Collector back (because that's the relevant one). WRT error reporting: I first tried the obvious way of letting the Collector methods Describe() and Collect() return error. However, I had to conclude that that bloated _many_ calls and their handling in very obnoxious ways. On the other hand, the case where you actually want to report errors during registration or collection is very rare. Hence, this approach has the wrong trade-off. The approach taken here might at first appear clunky but is in practice quite handy, mostly because there is almost no change for the "normal" case of "no special error handling", but also because it plays well with the way descriptors and metrics are handled (via channels). Explaining the approach in more detail: - During registration / describe: Error handling was actually already in place (for invalid descriptors, which carry an error anyway). I only added a convenience function to create an invalid descriptor with a given error on purpose. - Metrics are now treated in a similar way. The Write method returns an error now (the only change in interface). An "invalid metric" is provided that can be sent via the channel to signal that that metric could not be collected. It alse transports an error. 
NON-GOALS OF THIS COMMIT: This is NOT yet the major improvement of the whole registry part, where we want a public Registry interface and plenty of modular configurations (for error handling, various auto-metrics, http instrumentation, testing, ...). However, we can do that whole thing without breaking existing interfaces. For now (which is a significant issue) any error during collection will either cause a 500 HTTP response or a panic (depending on registry config). Later, we definitely want to have a possibility to skip (and only report somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
return nil
}
// newStream returns a new quantile stream targeted at the summary's
// objectives and configured with the summary's epsilon.
func (s *summary) newStream() *quantile.Stream {
	qs := quantile.NewTargeted(s.objectives...)
	qs.SetEpsilon(s.epsilon)
	return qs
}
// asyncFlush swaps the observation buffers and drains the previously hot
// buffer into the quantile streams in a separate goroutine.
//
// asyncFlush needs bufMtx locked. It acquires mtx itself; mtx is not released
// when asyncFlush returns but by the spawned goroutine once flushColdBuf has
// finished. A subsequent asyncFlush or Write therefore blocks on mtx until
// the previous flush is complete.
//
// NOTE(review): the goroutine may still be running flushColdBuf (and thereby
// maybeRotateStreams, which reads hotBufExpTime) after the caller has
// released bufMtx -- confirm that this unsynchronized read is acceptable.
func (s *summary) asyncFlush(now time.Time) {
s.mtx.Lock()
s.swapBufs(now)
// Unblock the original goroutine that was responsible for the mutation
// that triggered the compaction. But hold onto the global non-buffer
// state mutex until the operation finishes.
go func() {
s.flushColdBuf()
s.mtx.Unlock()
}()
}
// maybeRotateStreams advances the head stream until its expiration time has
// caught up with the expiration time of the hot buffer. Each step moves on to
// the next stream in the ring (wrapping around at the end), resets it and
// pushes headStreamExpTime forward by one streamDuration. If at least one
// rotation took place, mergedTailStreams is rebuilt from all streams except
// the new head stream; otherwise it is left untouched.
//
// maybeRotateStreams needs mtx AND bufMtx locked.
func (s *summary) maybeRotateStreams() {
	rotated := false
	for !s.headStreamExpTime.Equal(s.hotBufExpTime) {
		rotated = true

		next := s.headStreamIdx + 1
		if next >= len(s.streams) {
			next = 0
		}
		s.headStreamIdx = next
		s.headStream = s.streams[next]
		s.headStream.Reset()
		s.headStreamExpTime = s.headStreamExpTime.Add(s.streamDuration)
	}
	if !rotated {
		// Nothing expired, so the merged tail is still up to date.
		return
	}

	s.mergedTailStreams.Reset()
	for i := range s.streams {
		if tail := s.streams[i]; tail != s.headStream {
			s.mergedTailStreams.Merge(tail.Samples())
		}
	}
}
// flushColdBuf inserts every buffered observation of coldBuf into the head
// stream, accounts for it in cnt and sum, empties coldBuf (keeping its
// capacity) and finally gives the streams a chance to rotate.
//
// flushColdBuf needs mtx locked.
func (s *summary) flushColdBuf() {
	for i := 0; i < len(s.coldBuf); i++ {
		obs := s.coldBuf[i]
		s.headStream.Insert(obs)
		s.cnt++
		s.sum += obs
	}
	s.coldBuf = s.coldBuf[:0]
	s.maybeRotateStreams()
}
// swapBufs exchanges hotBuf and coldBuf and then moves hotBufExpTime forward
// in steps of streamDuration until it is no longer before now.
//
// swapBufs needs mtx AND bufMtx locked, and coldBuf must be empty, so that
// the new hotBuf starts out empty.
func (s *summary) swapBufs(now time.Time) {
	filled := s.hotBuf
	s.hotBuf = s.coldBuf
	s.coldBuf = filled

	// The (now empty) hot buffer gets a fresh expiration time.
	for s.hotBufExpTime.Before(now) {
		s.hotBufExpTime = s.hotBufExpTime.Add(s.streamDuration)
	}
}
// quantSort implements sort.Interface for a slice of quantiles, ordering them
// by ascending quantile rank.
type quantSort []*dto.Quantile

// Len returns the number of quantiles.
func (q quantSort) Len() int { return len(q) }

// Swap exchanges the quantiles at positions i and j.
func (q quantSort) Swap(i, j int) { q[j], q[i] = q[i], q[j] }

// Less reports whether the quantile at i has a lower rank than the one at j.
func (q quantSort) Less(i, j int) bool {
	return q[j].GetQuantile() > q[i].GetQuantile()
}
// SummaryVec is a Collector that bundles a set of Summaries that all share the
// same Desc, but have different values for their variable labels. This is used
// if you want to observe the same thing partitioned by various dimensions
// (e.g. http request latencies, partitioned by status code and method). Create
// instances with NewSummaryVec.
type SummaryVec struct {
// MetricVec is embedded; SummaryVec replaces its accessor methods with
// variants that return a Summary instead of a Metric.
MetricVec
}
// NewSummaryVec creates a new SummaryVec based on the provided SummaryOpts and
// partitioned by the given label names. At least one label name must be
// provided.
//
// The options are captured by the factory function of the vector, and
// newSummary is only called from that factory. Illegal option values
// therefore do not panic here, but when a child Summary is created.
func NewSummaryVec(opts SummaryOpts, labelNames []string) *SummaryVec {
	fqName := BuildFQName(opts.Namespace, opts.Subsystem, opts.Name)
	desc := NewDesc(fqName, opts.Help, labelNames, opts.ConstLabels)

	vec := new(SummaryVec)
	mv := &vec.MetricVec
	mv.children = map[uint64]Metric{}
	mv.desc = desc
	mv.hash = fnv.New64a()
	// Every child shares desc and opts and differs only in its label values.
	mv.newMetric = func(lvs ...string) Metric {
		return newSummary(desc, opts, lvs...)
	}
	return vec
}
// GetMetricWithLabelValues shadows the method of the same name in MetricVec.
// It delegates to that method and differs only in the result type: the metric
// comes back as a Summary, so callers need no type conversion. The error of
// the embedded method is passed through unchanged; if that method yields no
// metric, the returned Summary is nil.
func (m *SummaryVec) GetMetricWithLabelValues(lvs ...string) (Summary, error) {
	found, err := m.MetricVec.GetMetricWithLabelValues(lvs...)
	if found == nil {
		return nil, err
	}
	return found.(Summary), err
}
// GetMetricWith shadows the method of the same name in MetricVec. It
// delegates to that method and differs only in the result type: the metric
// comes back as a Summary, so callers need no type conversion. The error of
// the embedded method is passed through unchanged; if that method yields no
// metric, the returned Summary is nil.
func (m *SummaryVec) GetMetricWith(labels Labels) (Summary, error) {
	found, err := m.MetricVec.GetMetricWith(labels)
	if found == nil {
		return nil, err
	}
	return found.(Summary), err
}
// WithLabelValues works as GetMetricWithLabelValues, but panics where
// GetMetricWithLabelValues would have returned an error. By not returning an
// error, WithLabelValues allows shortcuts like
//
//	myVec.WithLabelValues("404", "GET").Observe(42.21)
func (m *SummaryVec) WithLabelValues(lvs ...string) Summary {
	metric := m.MetricVec.WithLabelValues(lvs...)
	return metric.(Summary)
}
// With works as GetMetricWith, but panics where GetMetricWith would have
// returned an error. By not returning an error, With allows shortcuts like
//
//	myVec.With(Labels{"code": "404", "method": "GET"}).Observe(42.21)
func (m *SummaryVec) With(labels Labels) Summary {
	metric := m.MetricVec.With(labels)
	return metric.(Summary)
}