Provide lock-free implementation for Summary without objectives

Signed-off-by: beorn7 <beorn@soundcloud.com>
This commit is contained in:
beorn7 2018-12-24 11:23:13 +01:00
parent a006a7550a
commit 226b83ac2e
3 changed files with 159 additions and 2 deletions

View File

@ -204,8 +204,8 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
} }
} }
} }
// Finally we know the final length of h.upperBounds and can make counts // Finally we know the final length of h.upperBounds and can make buckets
// for both states: // for both counts:
h.counts[0].buckets = make([]uint64, len(h.upperBounds)) h.counts[0].buckets = make([]uint64, len(h.upperBounds))
h.counts[1].buckets = make([]uint64, len(h.upperBounds)) h.counts[1].buckets = make([]uint64, len(h.upperBounds))

View File

@ -16,8 +16,10 @@ package prometheus
import ( import (
"fmt" "fmt"
"math" "math"
"runtime"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/beorn7/perks/quantile" "github.com/beorn7/perks/quantile"
@ -214,6 +216,17 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
opts.BufCap = DefBufCap opts.BufCap = DefBufCap
} }
if len(opts.Objectives) == 0 {
// Use the lock-free implementation of a Summary without objectives.
s := &noObjectivesSummary{
desc: desc,
labelPairs: makeLabelPairs(desc, labelValues),
counts: [2]*summaryCounts{&summaryCounts{}, &summaryCounts{}},
}
s.init(s) // Init self-collection.
return s
}
s := &summary{ s := &summary{
desc: desc, desc: desc,
@ -382,6 +395,142 @@ func (s *summary) swapBufs(now time.Time) {
} }
} }
type summaryCounts struct {
// sumBits contains the bits of the float64 representing the sum of all
// observations. sumBits and count have to go first in the struct to
// guarantee alignment for atomic operations.
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
sumBits uint64
count uint64
}
type noObjectivesSummary struct {
// countAndHotIdx is a complicated one. For lock-free yet atomic
// observations, we need to save the total count of observations again,
// combined with the index of the currently-hot counts struct, so that
// we can perform the operation on both values atomically. The least
// significant bit defines the hot counts struct. The remaining 63 bits
// represent the total count of observations. This happens under the
// assumption that the 63bit count will never overflow. Rationale: An
// observations takes about 30ns. Let's assume it could happen in
// 10ns. Overflowing the counter will then take at least (2^63)*10ns,
// which is about 3000 years.
//
// This has to be first in the struct for 64bit alignment. See
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
countAndHotIdx uint64
selfCollector
desc *Desc
writeMtx sync.Mutex // Only used in the Write method.
// Two counts, one is "hot" for lock-free observations, the other is
// "cold" for writing out a dto.Metric. It has to be an array of
// pointers to guarantee 64bit alignment of the histogramCounts, see
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
counts [2]*summaryCounts
hotIdx int // Index of currently-hot counts. Only used within Write.
labelPairs []*dto.LabelPair
}
func (s *noObjectivesSummary) Desc() *Desc {
return s.desc
}
func (s *noObjectivesSummary) Observe(v float64) {
// We increment s.countAndHotIdx by 2 so that the counter in the upper
// 63 bits gets incremented by 1. At the same time, we get the new value
// back, which we can use to find the currently-hot counts.
n := atomic.AddUint64(&s.countAndHotIdx, 2)
hotCounts := s.counts[n%2]
for {
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
break
}
}
// Increment count last as we take it as a signal that the observation
// is complete.
atomic.AddUint64(&hotCounts.count, 1)
}
func (s *noObjectivesSummary) Write(out *dto.Metric) error {
var (
sum = &dto.Summary{}
hotCounts, coldCounts *summaryCounts
count uint64
)
// For simplicity, we mutex the rest of this method. It is not in the
// hot path, i.e. Observe is called much more often than Write. The
// complication of making Write lock-free isn't worth it.
s.writeMtx.Lock()
defer s.writeMtx.Unlock()
// This is a bit arcane, which is why the following spells out this if
// clause in English:
//
// If the currently-hot counts struct is #0, we atomically increment
// s.countAndHotIdx by 1 so that from now on Observe will use the counts
// struct #1. Furthermore, the atomic increment gives us the new value,
// which, in its most significant 63 bits, tells us the count of
// observations done so far up to and including currently ongoing
// observations still using the counts struct just changed from hot to
// cold. To have a normal uint64 for the count, we bitshift by 1 and
// save the result in count. We also set s.hotIdx to 1 for the next
// Write call, and we will refer to counts #1 as hotCounts and to counts
// #0 as coldCounts.
//
// If the currently-hot counts struct is #1, we do the corresponding
// things the other way round. We have to _decrement_ s.countAndHotIdx
// (which is a bit arcane in itself, as we have to express -1 with an
// unsigned int...).
if s.hotIdx == 0 {
count = atomic.AddUint64(&s.countAndHotIdx, 1) >> 1
s.hotIdx = 1
hotCounts = s.counts[1]
coldCounts = s.counts[0]
} else {
count = atomic.AddUint64(&s.countAndHotIdx, ^uint64(0)) >> 1 // Decrement.
s.hotIdx = 0
hotCounts = s.counts[0]
coldCounts = s.counts[1]
}
// Now we have to wait for the now-declared-cold counts to actually cool
// down, i.e. wait for all observations still using it to finish. That's
// the case once the count in the cold counts struct is the same as the
// one atomically retrieved from the upper 63bits of s.countAndHotIdx.
for {
if count == atomic.LoadUint64(&coldCounts.count) {
break
}
runtime.Gosched() // Let observations get work done.
}
sum.SampleCount = proto.Uint64(count)
sum.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits)))
out.Summary = sum
out.Label = s.labelPairs
// Finally add all the cold counts to the new hot counts and reset the cold counts.
atomic.AddUint64(&hotCounts.count, count)
atomic.StoreUint64(&coldCounts.count, 0)
for {
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum())
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
atomic.StoreUint64(&coldCounts.sumBits, 0)
break
}
}
return nil
}
type quantSort []*dto.Quantile type quantSort []*dto.Quantile
func (s quantSort) Len() int { func (s quantSort) Len() int {

View File

@ -54,11 +54,19 @@ func TestSummaryWithoutObjectives(t *testing.T) {
if err := reg.Register(summaryWithEmptyObjectives); err != nil { if err := reg.Register(summaryWithEmptyObjectives); err != nil {
t.Error(err) t.Error(err)
} }
summaryWithEmptyObjectives.Observe(3)
summaryWithEmptyObjectives.Observe(0.14)
m := &dto.Metric{} m := &dto.Metric{}
if err := summaryWithEmptyObjectives.Write(m); err != nil { if err := summaryWithEmptyObjectives.Write(m); err != nil {
t.Error(err) t.Error(err)
} }
if got, want := m.GetSummary().GetSampleSum(), 3.14; got != want {
t.Errorf("got sample sum %f, want %f", got, want)
}
if got, want := m.GetSummary().GetSampleCount(), uint64(2); got != want {
t.Errorf("got sample sum %d, want %d", got, want)
}
if len(m.GetSummary().Quantile) != 0 { if len(m.GetSummary().Quantile) != 0 {
t.Error("expected no objectives in summary") t.Error("expected no objectives in summary")
} }