Merge pull request #521 from prometheus/beorn7/summary
Lock-free summaries without objectives
This commit is contained in:
commit
cf7da0f496
|
@ -204,8 +204,8 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Finally we know the final length of h.upperBounds and can make counts
|
// Finally we know the final length of h.upperBounds and can make buckets
|
||||||
// for both states:
|
// for both counts:
|
||||||
h.counts[0].buckets = make([]uint64, len(h.upperBounds))
|
h.counts[0].buckets = make([]uint64, len(h.upperBounds))
|
||||||
h.counts[1].buckets = make([]uint64, len(h.upperBounds))
|
h.counts[1].buckets = make([]uint64, len(h.upperBounds))
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,6 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
contentTypeHeader = "Content-Type"
|
contentTypeHeader = "Content-Type"
|
||||||
contentLengthHeader = "Content-Length"
|
|
||||||
contentEncodingHeader = "Content-Encoding"
|
contentEncodingHeader = "Content-Encoding"
|
||||||
acceptEncodingHeader = "Accept-Encoding"
|
acceptEncodingHeader = "Accept-Encoding"
|
||||||
)
|
)
|
||||||
|
|
|
@ -38,7 +38,6 @@ type delegator interface {
|
||||||
type responseWriterDelegator struct {
|
type responseWriterDelegator struct {
|
||||||
http.ResponseWriter
|
http.ResponseWriter
|
||||||
|
|
||||||
handler, method string
|
|
||||||
status int
|
status int
|
||||||
written int64
|
written int64
|
||||||
wroteHeader bool
|
wroteHeader bool
|
||||||
|
|
|
@ -47,7 +47,6 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
contentTypeHeader = "Content-Type"
|
contentTypeHeader = "Content-Type"
|
||||||
contentLengthHeader = "Content-Length"
|
|
||||||
contentEncodingHeader = "Content-Encoding"
|
contentEncodingHeader = "Content-Encoding"
|
||||||
acceptEncodingHeader = "Accept-Encoding"
|
acceptEncodingHeader = "Accept-Encoding"
|
||||||
)
|
)
|
||||||
|
|
|
@ -16,8 +16,10 @@ package prometheus
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/beorn7/perks/quantile"
|
"github.com/beorn7/perks/quantile"
|
||||||
|
@ -214,6 +216,17 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
|
||||||
opts.BufCap = DefBufCap
|
opts.BufCap = DefBufCap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(opts.Objectives) == 0 {
|
||||||
|
// Use the lock-free implementation of a Summary without objectives.
|
||||||
|
s := &noObjectivesSummary{
|
||||||
|
desc: desc,
|
||||||
|
labelPairs: makeLabelPairs(desc, labelValues),
|
||||||
|
counts: [2]*summaryCounts{&summaryCounts{}, &summaryCounts{}},
|
||||||
|
}
|
||||||
|
s.init(s) // Init self-collection.
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
s := &summary{
|
s := &summary{
|
||||||
desc: desc,
|
desc: desc,
|
||||||
|
|
||||||
|
@ -382,6 +395,142 @@ func (s *summary) swapBufs(now time.Time) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type summaryCounts struct {
|
||||||
|
// sumBits contains the bits of the float64 representing the sum of all
|
||||||
|
// observations. sumBits and count have to go first in the struct to
|
||||||
|
// guarantee alignment for atomic operations.
|
||||||
|
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
||||||
|
sumBits uint64
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type noObjectivesSummary struct {
|
||||||
|
// countAndHotIdx is a complicated one. For lock-free yet atomic
|
||||||
|
// observations, we need to save the total count of observations again,
|
||||||
|
// combined with the index of the currently-hot counts struct, so that
|
||||||
|
// we can perform the operation on both values atomically. The least
|
||||||
|
// significant bit defines the hot counts struct. The remaining 63 bits
|
||||||
|
// represent the total count of observations. This happens under the
|
||||||
|
// assumption that the 63bit count will never overflow. Rationale: An
|
||||||
|
// observations takes about 30ns. Let's assume it could happen in
|
||||||
|
// 10ns. Overflowing the counter will then take at least (2^63)*10ns,
|
||||||
|
// which is about 3000 years.
|
||||||
|
//
|
||||||
|
// This has to be first in the struct for 64bit alignment. See
|
||||||
|
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
||||||
|
countAndHotIdx uint64
|
||||||
|
|
||||||
|
selfCollector
|
||||||
|
desc *Desc
|
||||||
|
writeMtx sync.Mutex // Only used in the Write method.
|
||||||
|
|
||||||
|
// Two counts, one is "hot" for lock-free observations, the other is
|
||||||
|
// "cold" for writing out a dto.Metric. It has to be an array of
|
||||||
|
// pointers to guarantee 64bit alignment of the histogramCounts, see
|
||||||
|
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
|
||||||
|
counts [2]*summaryCounts
|
||||||
|
hotIdx int // Index of currently-hot counts. Only used within Write.
|
||||||
|
|
||||||
|
labelPairs []*dto.LabelPair
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *noObjectivesSummary) Desc() *Desc {
|
||||||
|
return s.desc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *noObjectivesSummary) Observe(v float64) {
|
||||||
|
// We increment s.countAndHotIdx by 2 so that the counter in the upper
|
||||||
|
// 63 bits gets incremented by 1. At the same time, we get the new value
|
||||||
|
// back, which we can use to find the currently-hot counts.
|
||||||
|
n := atomic.AddUint64(&s.countAndHotIdx, 2)
|
||||||
|
hotCounts := s.counts[n%2]
|
||||||
|
|
||||||
|
for {
|
||||||
|
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
|
||||||
|
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
|
||||||
|
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Increment count last as we take it as a signal that the observation
|
||||||
|
// is complete.
|
||||||
|
atomic.AddUint64(&hotCounts.count, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *noObjectivesSummary) Write(out *dto.Metric) error {
|
||||||
|
var (
|
||||||
|
sum = &dto.Summary{}
|
||||||
|
hotCounts, coldCounts *summaryCounts
|
||||||
|
count uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
// For simplicity, we mutex the rest of this method. It is not in the
|
||||||
|
// hot path, i.e. Observe is called much more often than Write. The
|
||||||
|
// complication of making Write lock-free isn't worth it.
|
||||||
|
s.writeMtx.Lock()
|
||||||
|
defer s.writeMtx.Unlock()
|
||||||
|
|
||||||
|
// This is a bit arcane, which is why the following spells out this if
|
||||||
|
// clause in English:
|
||||||
|
//
|
||||||
|
// If the currently-hot counts struct is #0, we atomically increment
|
||||||
|
// s.countAndHotIdx by 1 so that from now on Observe will use the counts
|
||||||
|
// struct #1. Furthermore, the atomic increment gives us the new value,
|
||||||
|
// which, in its most significant 63 bits, tells us the count of
|
||||||
|
// observations done so far up to and including currently ongoing
|
||||||
|
// observations still using the counts struct just changed from hot to
|
||||||
|
// cold. To have a normal uint64 for the count, we bitshift by 1 and
|
||||||
|
// save the result in count. We also set s.hotIdx to 1 for the next
|
||||||
|
// Write call, and we will refer to counts #1 as hotCounts and to counts
|
||||||
|
// #0 as coldCounts.
|
||||||
|
//
|
||||||
|
// If the currently-hot counts struct is #1, we do the corresponding
|
||||||
|
// things the other way round. We have to _decrement_ s.countAndHotIdx
|
||||||
|
// (which is a bit arcane in itself, as we have to express -1 with an
|
||||||
|
// unsigned int...).
|
||||||
|
if s.hotIdx == 0 {
|
||||||
|
count = atomic.AddUint64(&s.countAndHotIdx, 1) >> 1
|
||||||
|
s.hotIdx = 1
|
||||||
|
hotCounts = s.counts[1]
|
||||||
|
coldCounts = s.counts[0]
|
||||||
|
} else {
|
||||||
|
count = atomic.AddUint64(&s.countAndHotIdx, ^uint64(0)) >> 1 // Decrement.
|
||||||
|
s.hotIdx = 0
|
||||||
|
hotCounts = s.counts[0]
|
||||||
|
coldCounts = s.counts[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we have to wait for the now-declared-cold counts to actually cool
|
||||||
|
// down, i.e. wait for all observations still using it to finish. That's
|
||||||
|
// the case once the count in the cold counts struct is the same as the
|
||||||
|
// one atomically retrieved from the upper 63bits of s.countAndHotIdx.
|
||||||
|
for {
|
||||||
|
if count == atomic.LoadUint64(&coldCounts.count) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
runtime.Gosched() // Let observations get work done.
|
||||||
|
}
|
||||||
|
|
||||||
|
sum.SampleCount = proto.Uint64(count)
|
||||||
|
sum.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits)))
|
||||||
|
|
||||||
|
out.Summary = sum
|
||||||
|
out.Label = s.labelPairs
|
||||||
|
|
||||||
|
// Finally add all the cold counts to the new hot counts and reset the cold counts.
|
||||||
|
atomic.AddUint64(&hotCounts.count, count)
|
||||||
|
atomic.StoreUint64(&coldCounts.count, 0)
|
||||||
|
for {
|
||||||
|
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
|
||||||
|
newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum())
|
||||||
|
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
|
||||||
|
atomic.StoreUint64(&coldCounts.sumBits, 0)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type quantSort []*dto.Quantile
|
type quantSort []*dto.Quantile
|
||||||
|
|
||||||
func (s quantSort) Len() int {
|
func (s quantSort) Len() int {
|
||||||
|
|
|
@ -54,11 +54,19 @@ func TestSummaryWithoutObjectives(t *testing.T) {
|
||||||
if err := reg.Register(summaryWithEmptyObjectives); err != nil {
|
if err := reg.Register(summaryWithEmptyObjectives); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
summaryWithEmptyObjectives.Observe(3)
|
||||||
|
summaryWithEmptyObjectives.Observe(0.14)
|
||||||
|
|
||||||
m := &dto.Metric{}
|
m := &dto.Metric{}
|
||||||
if err := summaryWithEmptyObjectives.Write(m); err != nil {
|
if err := summaryWithEmptyObjectives.Write(m); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
if got, want := m.GetSummary().GetSampleSum(), 3.14; got != want {
|
||||||
|
t.Errorf("got sample sum %f, want %f", got, want)
|
||||||
|
}
|
||||||
|
if got, want := m.GetSummary().GetSampleCount(), uint64(2); got != want {
|
||||||
|
t.Errorf("got sample sum %d, want %d", got, want)
|
||||||
|
}
|
||||||
if len(m.GetSummary().Quantile) != 0 {
|
if len(m.GetSummary().Quantile) != 0 {
|
||||||
t.Error("expected no objectives in summary")
|
t.Error("expected no objectives in summary")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue