Make Histogram observations atomic while keeping them lock-free

Fixes #275

This is rather tricky and required some studying of the Go memory
model. I have added copious code comments to explain what's going on.

Benchmarks haven't changed significantly, despite the additional
atomic operations now required during Observe. Write has become
noticeably slower, but it is also much more involved now and takes a
mutex. (Note, however, that Write is supposed to be a relatively rare
operation and thus not in the hot path compared to Observe.) Allocs
haven't changed at all.
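
The numbers below come from the package's own benchmarks; an
invocation along the following lines should reproduce them (the exact
flags and package path are my assumption, not part of this commit):

go test -bench 'BenchmarkHistogram' -benchmem github.com/prometheus/client_golang/prometheus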

OLD:

BenchmarkHistogramWithLabelValues-4     10000000               151 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramNoLabels-4            50000000                36.0 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve1-4            50000000                28.1 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve2-4            10000000               160 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramObserve4-4             5000000               378 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramObserve8-4             2000000               768 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramWrite1-4               1000000              1589 ns/op             896 B/op         37 allocs/op
BenchmarkHistogramWrite2-4                500000              2973 ns/op            1792 B/op         74 allocs/op
BenchmarkHistogramWrite4-4                300000              6979 ns/op            3584 B/op        148 allocs/op
BenchmarkHistogramWrite8-4                100000             10701 ns/op            7168 B/op        296 allocs/op

NEW:

BenchmarkHistogramWithLabelValues-4     10000000               191 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramNoLabels-4            30000000                50.1 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve1-4            30000000                40.0 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve2-4            20000000                91.5 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve4-4             5000000               317 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramObserve8-4             2000000               636 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramWrite1-4               1000000              2072 ns/op             896 B/op         37 allocs/op
BenchmarkHistogramWrite2-4                300000              3729 ns/op            1792 B/op         74 allocs/op
BenchmarkHistogramWrite4-4                200000              7847 ns/op            3584 B/op        148 allocs/op
BenchmarkHistogramWrite8-4                100000             16975 ns/op            7168 B/op        296 allocs/op

Signed-off-by: beorn7 <beorn@soundcloud.com>
beorn7 2018-09-07 16:20:30 +02:00
parent 1e08f788cf
commit 1b2bd1d665
2 changed files with 130 additions and 24 deletions

prometheus/histogram.go

@@ -16,7 +16,9 @@ package prometheus
 import (
 	"fmt"
 	"math"
+	"runtime"
 	"sort"
+	"sync"
 	"sync/atomic"
 
 	"github.com/golang/protobuf/proto"
@@ -200,28 +202,49 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogram
 		}
 	}
-	// Finally we know the final length of h.upperBounds and can make counts.
-	h.counts = make([]uint64, len(h.upperBounds))
+	// Finally we know the final length of h.upperBounds and can make counts
+	// for both states:
+	h.counts[0].buckets = make([]uint64, len(h.upperBounds))
+	h.counts[1].buckets = make([]uint64, len(h.upperBounds))
 
 	h.init(h) // Init self-collection.
 	return h
 }
 
-type histogram struct {
+type histogramCounts struct {
 	// sumBits contains the bits of the float64 representing the sum of all
 	// observations. sumBits and count have to go first in the struct to
 	// guarantee alignment for atomic operations.
 	// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
 	sumBits uint64
 	count   uint64
+	buckets []uint64
+}
 
+type histogram struct {
 	selfCollector
-	// Note that there is no mutex required.
 
-	desc *Desc
+	desc     *Desc
+	writeMtx sync.Mutex // Only used in the Write method.
 
 	upperBounds []float64
-	counts      []uint64
+
+	// Two counts, one is "hot" for lock-free observations, the other is
+	// "cold" for writing out a dto.Metric.
+	counts [2]histogramCounts
+	hotIdx int // Index of currently-hot counts. Only used within Write.
+
+	// This is a complicated one. For lock-free yet atomic observations, we
+	// need to save the total count of observations again, combined with the
+	// index of the currently-hot counts struct, so that we can perform the
+	// operation on both values atomically. The least significant bit
+	// defines the hot counts struct. The remaining 63 bits represent the
+	// total count of observations. This happens under the assumption that
+	// the 63-bit count will never overflow. Rationale: An observation takes
+	// about 30ns. Let's assume it could happen in 10ns. Overflowing the
+	// counter will then take at least (2^63)*10ns, which is about 3000
+	// years.
+	countAndHotIdx uint64
 
 	labelPairs []*dto.LabelPair
 }
@@ -241,36 +264,113 @@ func (h *histogram) Observe(v float64) {
 	// 100 buckets: 78.1 ns/op linear - binary 54.9 ns/op
 	// 300 buckets: 154 ns/op linear - binary 61.6 ns/op
 	i := sort.SearchFloat64s(h.upperBounds, v)
-	if i < len(h.counts) {
-		atomic.AddUint64(&h.counts[i], 1)
+
+	// We increment h.countAndHotIdx by 2 so that the counter in the upper
+	// 63 bits gets incremented by 1. At the same time, we get the new value
+	// back, which we can use to find the currently-hot counts.
+	n := atomic.AddUint64(&h.countAndHotIdx, 2)
+	hotCounts := &h.counts[n%2]
+
+	if i < len(h.upperBounds) {
+		atomic.AddUint64(&hotCounts.buckets[i], 1)
 	}
-	atomic.AddUint64(&h.count, 1)
 	for {
-		oldBits := atomic.LoadUint64(&h.sumBits)
+		oldBits := atomic.LoadUint64(&hotCounts.sumBits)
 		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
-		if atomic.CompareAndSwapUint64(&h.sumBits, oldBits, newBits) {
+		if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
 			break
 		}
 	}
+	// Increment count last as we take it as a signal that the observation
+	// is complete.
+	atomic.AddUint64(&hotCounts.count, 1)
 }
 
 func (h *histogram) Write(out *dto.Metric) error {
-	his := &dto.Histogram{}
-	buckets := make([]*dto.Bucket, len(h.upperBounds))
+	var (
+		his                   = &dto.Histogram{}
+		buckets               = make([]*dto.Bucket, len(h.upperBounds))
+		hotCounts, coldCounts *histogramCounts
+		count                 uint64
+	)
 
-	his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&h.sumBits)))
-	his.SampleCount = proto.Uint64(atomic.LoadUint64(&h.count))
-	var count uint64
+	// For simplicity, we mutex the rest of this method. It is not in the
+	// hot path, i.e. Observe is called much more often than Write. The
+	// complication of making Write lock-free isn't worth it.
+	h.writeMtx.Lock()
+	defer h.writeMtx.Unlock()
+
+	// This is a bit arcane, which is why the following spells out this if
+	// clause in English:
+	//
+	// If the currently-hot counts struct is #0, we atomically increment
+	// h.countAndHotIdx by 1 so that from now on Observe will use the counts
+	// struct #1. Furthermore, the atomic increment gives us the new value,
+	// which, in its most significant 63 bits, tells us the count of
+	// observations done so far up to and including currently ongoing
+	// observations still using the counts struct just changed from hot to
+	// cold. To have a normal uint64 for the count, we bitshift by 1 and
+	// save the result in count. We also set h.hotIdx to 1 for the next
+	// Write call, and we will refer to counts #1 as hotCounts and to counts
+	// #0 as coldCounts.
+	//
+	// If the currently-hot counts struct is #1, we do the corresponding
+	// things the other way round. We have to _decrement_ h.countAndHotIdx
+	// (which is a bit arcane in itself, as we have to express -1 with an
+	// unsigned int...).
+	if h.hotIdx == 0 {
+		count = atomic.AddUint64(&h.countAndHotIdx, 1) >> 1
+		h.hotIdx = 1
+		hotCounts = &h.counts[1]
+		coldCounts = &h.counts[0]
+	} else {
+		count = atomic.AddUint64(&h.countAndHotIdx, ^uint64(0)) >> 1 // Decrement.
+		h.hotIdx = 0
+		hotCounts = &h.counts[0]
+		coldCounts = &h.counts[1]
+	}
+
+	// Now we have to wait for the now-declared-cold counts to actually cool
+	// down, i.e. wait for all observations still using it to finish. That's
+	// the case once the count in the cold counts struct is the same as the
+	// one atomically retrieved from the upper 63 bits of h.countAndHotIdx.
+	for {
+		if count == atomic.LoadUint64(&coldCounts.count) {
+			break
+		}
+		runtime.Gosched() // Let observations get work done.
+	}
+
+	his.SampleCount = proto.Uint64(count)
+	his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits)))
+	var cumCount uint64
 	for i, upperBound := range h.upperBounds {
-		count += atomic.LoadUint64(&h.counts[i])
+		cumCount += atomic.LoadUint64(&coldCounts.buckets[i])
 		buckets[i] = &dto.Bucket{
-			CumulativeCount: proto.Uint64(count),
+			CumulativeCount: proto.Uint64(cumCount),
 			UpperBound:      proto.Float64(upperBound),
 		}
 	}
+
 	his.Bucket = buckets
 	out.Histogram = his
 	out.Label = h.labelPairs
+
+	// Finally add all the cold counts to the new hot counts and reset the cold counts.
+	atomic.AddUint64(&hotCounts.count, count)
+	atomic.StoreUint64(&coldCounts.count, 0)
+	for {
+		oldBits := atomic.LoadUint64(&hotCounts.sumBits)
+		newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum())
+		if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
+			atomic.StoreUint64(&coldCounts.sumBits, 0)
+			break
+		}
+	}
+	for i := range h.upperBounds {
+		atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
+		atomic.StoreUint64(&coldCounts.buckets[i], 0)
+	}
 	return nil
 }
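
To make the hot/cold mechanics easier to follow outside the diff, here
is a minimal, self-contained sketch of the same idea, stripped down to
just the observation count (no buckets, no sum). All names here (hist,
observe, snapshot) are made up for illustration and are not the library
code; only the bit-packing of countAndHotIdx, the cool-down loop, and
the fold-back into the hot slot mirror the commit above. The point of
the +2 / ±1 trick is that a single atomic op both counts a started
observation and selects the buffer it must use.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type hist struct {
	// Upper 63 bits: number of *started* observations. LSB: index of the
	// hot slot. Must come first for 64-bit alignment of atomic ops.
	countAndHotIdx uint64
	counts         [2]uint64 // Per-slot number of *finished* observations.
	hotIdx         int       // Hot slot index; only accessed under writeMtx.
	writeMtx       sync.Mutex
}

func (h *hist) observe() {
	// Adding 2 increments the 63-bit counter by 1 and leaves the LSB
	// untouched; the returned value tells us which slot is hot right now.
	n := atomic.AddUint64(&h.countAndHotIdx, 2)
	hot := &h.counts[n%2]
	// (The real Observe updates the bucket and the sum here.)
	atomic.AddUint64(hot, 1) // Signal completion last.
}

// snapshot flips the hot slot and returns a consistent total count.
func (h *hist) snapshot() uint64 {
	h.writeMtx.Lock()
	defer h.writeMtx.Unlock()

	var started uint64
	var hot, cold *uint64
	if h.hotIdx == 0 {
		started = atomic.AddUint64(&h.countAndHotIdx, 1) >> 1 // Flip LSB 0->1.
		h.hotIdx, hot, cold = 1, &h.counts[1], &h.counts[0]
	} else {
		started = atomic.AddUint64(&h.countAndHotIdx, ^uint64(0)) >> 1 // Flip LSB 1->0.
		h.hotIdx, hot, cold = 0, &h.counts[0], &h.counts[1]
	}

	// Cool down: wait until every observation started on the cold slot has
	// finished. The cold slot carries the cumulative total, so it catches
	// up with started.
	for atomic.LoadUint64(cold) != started {
		runtime.Gosched()
	}

	// Fold the total into the hot slot and reset the cold one, so the next
	// snapshot sees a cumulative count again.
	atomic.AddUint64(hot, started)
	atomic.StoreUint64(cold, 0)
	return started
}

func main() {
	h := &hist{}
	var wg sync.WaitGroup
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				h.observe()
			}
		}()
	}
	wg.Wait()
	fmt.Println(h.snapshot()) // 4000
}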

prometheus/histogram_test.go

@@ -358,7 +358,7 @@ func TestHistogramAtomicObserve(t *testing.T) {
 	defer func() { close(quit) }()
 
-	go func() {
+	observe := func() {
 		for {
 			select {
 			case <-quit:
@@ -367,7 +367,11 @@ func TestHistogramAtomicObserve(t *testing.T) {
 				his.Observe(1)
 			}
 		}
-	}()
+	}
+
+	go observe()
+	go observe()
+	go observe()
 
 	for i := 0; i < 100; i++ {
 		m := &dto.Metric{}
@@ -376,10 +380,12 @@ func TestHistogramAtomicObserve(t *testing.T) {
 		}
 		h := m.GetHistogram()
 		if h.GetSampleCount() != uint64(h.GetSampleSum()) ||
-			h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() {
+			h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() ||
+			h.GetSampleCount() != h.GetBucket()[2].GetCumulativeCount() {
 			t.Fatalf(
-				"inconsistent counts in histogram: count=%d sum=%f bucket=%d",
-				h.GetSampleCount(), h.GetSampleSum(), h.GetBucket()[1].GetCumulativeCount(),
+				"inconsistent counts in histogram: count=%d sum=%f buckets=[%d, %d]",
+				h.GetSampleCount(), h.GetSampleSum(),
+				h.GetBucket()[1].GetCumulativeCount(), h.GetBucket()[2].GetCumulativeCount(),
			)
 		}
 		runtime.Gosched()
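
The test now fans out to three observer goroutines and checks a second
bucket for consistency. The torn reads this commit fixes are caught by
the count/sum comparison itself, but running the test under the race
detector is a natural extra check; an invocation along these lines
should work (flags and package path assumed, not part of the commit):

go test -race -run TestHistogramAtomicObserve github.com/prometheus/client_golang/prometheus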