From 1e08f788cf45ad385cc0df60bc309ef903a95652 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 7 Sep 2018 18:58:14 +0200 Subject: [PATCH 1/2] Add test to expose #275 Signed-off-by: beorn7 --- prometheus/histogram_test.go | 39 ++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/prometheus/histogram_test.go b/prometheus/histogram_test.go index 5a20f4b..9e5e3a1 100644 --- a/prometheus/histogram_test.go +++ b/prometheus/histogram_test.go @@ -17,6 +17,7 @@ import ( "math" "math/rand" "reflect" + "runtime" "sort" "sync" "testing" @@ -346,3 +347,41 @@ func TestBuckets(t *testing.T) { t.Errorf("linear buckets: got %v, want %v", got, want) } } + +func TestHistogramAtomicObserve(t *testing.T) { + var ( + quit = make(chan struct{}) + his = NewHistogram(HistogramOpts{ + Buckets: []float64{0.5, 10, 20}, + }) + ) + + defer func() { close(quit) }() + + go func() { + for { + select { + case <-quit: + return + default: + his.Observe(1) + } + } + }() + + for i := 0; i < 100; i++ { + m := &dto.Metric{} + if err := his.Write(m); err != nil { + t.Fatal("unexpected error writing histogram:", err) + } + h := m.GetHistogram() + if h.GetSampleCount() != uint64(h.GetSampleSum()) || + h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() { + t.Fatalf( + "inconsistent counts in histogram: count=%d sum=%f bucket=%d", + h.GetSampleCount(), h.GetSampleSum(), h.GetBucket()[1].GetCumulativeCount(), + ) + } + runtime.Gosched() + } +} From 1b2bd1d665c299622b573c9088e8018d0028102b Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 7 Sep 2018 16:20:30 +0200 Subject: [PATCH 2/2] Make Histogram observations atomic while keeping them lock-free Fixes #275 This is rather tricky and required some studying of the Go memory model. I have added copious code comments to explain what's going on. Benchmarks haven't changed significantly, despite the additional atomic operations now required during Observe. Write performance is noticable, but it is also much more involved now and has a mutex. (But note that Write is supposed to be a relatively rare operation and thus not in the hot path compared to Observe.) Allocs haven't changed at all. OLD: BenchmarkHistogramWithLabelValues-4 10000000 151 ns/op 0 B/op 0 allocs/op BenchmarkHistogramNoLabels-4 50000000 36.0 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve1-4 50000000 28.1 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve2-4 10000000 160 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve4-4 5000000 378 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve8-4 2000000 768 ns/op 0 B/op 0 allocs/op BenchmarkHistogramWrite1-4 1000000 1589 ns/op 896 B/op 37 allocs/op BenchmarkHistogramWrite2-4 500000 2973 ns/op 1792 B/op 74 allocs/op BenchmarkHistogramWrite4-4 300000 6979 ns/op 3584 B/op 148 allocs/op BenchmarkHistogramWrite8-4 100000 10701 ns/op 7168 B/op 296 allocs/op NEW: BenchmarkHistogramWithLabelValues-4 10000000 191 ns/op 0 B/op 0 allocs/op BenchmarkHistogramNoLabels-4 30000000 50.1 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve1-4 30000000 40.0 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve2-4 20000000 91.5 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve4-4 5000000 317 ns/op 0 B/op 0 allocs/op BenchmarkHistogramObserve8-4 2000000 636 ns/op 0 B/op 0 allocs/op BenchmarkHistogramWrite1-4 1000000 2072 ns/op 896 B/op 37 allocs/op BenchmarkHistogramWrite2-4 300000 3729 ns/op 1792 B/op 74 allocs/op BenchmarkHistogramWrite4-4 200000 7847 ns/op 3584 B/op 148 allocs/op BenchmarkHistogramWrite8-4 100000 16975 ns/op 7168 B/op 296 allocs/op Signed-off-by: beorn7 --- prometheus/histogram.go | 138 ++++++++++++++++++++++++++++++----- prometheus/histogram_test.go | 16 ++-- 2 files changed, 130 insertions(+), 24 deletions(-) diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 331783a..a1868d2 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -16,7 +16,9 @@ package prometheus import ( "fmt" "math" + "runtime" "sort" + "sync" "sync/atomic" "github.com/golang/protobuf/proto" @@ -200,28 +202,49 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr } } } - // Finally we know the final length of h.upperBounds and can make counts. - h.counts = make([]uint64, len(h.upperBounds)) + // Finally we know the final length of h.upperBounds and can make counts + // for both states: + h.counts[0].buckets = make([]uint64, len(h.upperBounds)) + h.counts[1].buckets = make([]uint64, len(h.upperBounds)) h.init(h) // Init self-collection. return h } -type histogram struct { +type histogramCounts struct { // sumBits contains the bits of the float64 representing the sum of all // observations. sumBits and count have to go first in the struct to // guarantee alignment for atomic operations. // http://golang.org/pkg/sync/atomic/#pkg-note-BUG sumBits uint64 count uint64 + buckets []uint64 +} +type histogram struct { selfCollector - // Note that there is no mutex required. - - desc *Desc + desc *Desc + writeMtx sync.Mutex // Only used in the Write method. upperBounds []float64 - counts []uint64 + + // Two counts, one is "hot" for lock-free observations, the other is + // "cold" for writing out a dto.Metric. + counts [2]histogramCounts + + hotIdx int // Index of currently-hot counts. Only used within Write. + + // This is a complicated one. For lock-free yet atomic observations, we + // need to save the total count of observations again, combined with the + // index of the currently-hot counts struct, so that we can perform the + // operation on both values atomically. The least significant bit + // defines the hot counts struct. The remaining 63 bits represent the + // total count of observations. This happens under the assumption that + // the 63bit count will never overflow. Rationale: An observations takes + // about 30ns. Let's assume it could happen in 10ns. Overflowing the + // counter will then take at least (2^63)*10ns, which is about 3000 + // years. + countAndHotIdx uint64 labelPairs []*dto.LabelPair } @@ -241,36 +264,113 @@ func (h *histogram) Observe(v float64) { // 100 buckets: 78.1 ns/op linear - binary 54.9 ns/op // 300 buckets: 154 ns/op linear - binary 61.6 ns/op i := sort.SearchFloat64s(h.upperBounds, v) - if i < len(h.counts) { - atomic.AddUint64(&h.counts[i], 1) + + // We increment h.countAndHotIdx by 2 so that the counter in the upper + // 63 bits gets incremented by 1. At the same time, we get the new value + // back, which we can use to find the currently-hot counts. + n := atomic.AddUint64(&h.countAndHotIdx, 2) + hotCounts := &h.counts[n%2] + + if i < len(h.upperBounds) { + atomic.AddUint64(&hotCounts.buckets[i], 1) } - atomic.AddUint64(&h.count, 1) for { - oldBits := atomic.LoadUint64(&h.sumBits) + oldBits := atomic.LoadUint64(&hotCounts.sumBits) newBits := math.Float64bits(math.Float64frombits(oldBits) + v) - if atomic.CompareAndSwapUint64(&h.sumBits, oldBits, newBits) { + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { break } } + // Increment count last as we take it as a signal that the observation + // is complete. + atomic.AddUint64(&hotCounts.count, 1) } func (h *histogram) Write(out *dto.Metric) error { - his := &dto.Histogram{} - buckets := make([]*dto.Bucket, len(h.upperBounds)) + var ( + his = &dto.Histogram{} + buckets = make([]*dto.Bucket, len(h.upperBounds)) + hotCounts, coldCounts *histogramCounts + count uint64 + ) - his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&h.sumBits))) - his.SampleCount = proto.Uint64(atomic.LoadUint64(&h.count)) - var count uint64 + // For simplicity, we mutex the rest of this method. It is not in the + // hot path, i.e. Observe is called much more often than Write. The + // complication of making Write lock-free isn't worth it. + h.writeMtx.Lock() + defer h.writeMtx.Unlock() + + // This is a bit arcane, which is why the following spells out this if + // clause in English: + // + // If the currently-hot counts struct is #0, we atomically increment + // h.countAndHotIdx by 1 so that from now on Observe will use the counts + // struct #1. Furthermore, the atomic increment gives us the new value, + // which, in its most significant 63 bits, tells us the count of + // observations done so far up to and including currently ongoing + // observations still using the counts struct just changed from hot to + // cold. To have a normal uint64 for the count, we bitshift by 1 and + // save the result in count. We also set h.hotIdx to 1 for the next + // Write call, and we will refer to counts #1 as hotCounts and to counts + // #0 as coldCounts. + // + // If the currently-hot counts struct is #1, we do the corresponding + // things the other way round. We have to _decrement_ h.countAndHotIdx + // (which is a bit arcane in itself, as we have to express -1 with an + // unsigned int...). + if h.hotIdx == 0 { + count = atomic.AddUint64(&h.countAndHotIdx, 1) >> 1 + h.hotIdx = 1 + hotCounts = &h.counts[1] + coldCounts = &h.counts[0] + } else { + count = atomic.AddUint64(&h.countAndHotIdx, ^uint64(0)) >> 1 // Decrement. + h.hotIdx = 0 + hotCounts = &h.counts[0] + coldCounts = &h.counts[1] + } + + // Now we have to wait for the now-declared-cold counts to actually cool + // down, i.e. wait for all observations still using it to finish. That's + // the case once the count in the cold counts struct is the same as the + // one atomically retrieved from the upper 63bits of h.countAndHotIdx. + for { + if count == atomic.LoadUint64(&coldCounts.count) { + break + } + runtime.Gosched() // Let observations get work done. + } + + his.SampleCount = proto.Uint64(count) + his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))) + var cumCount uint64 for i, upperBound := range h.upperBounds { - count += atomic.LoadUint64(&h.counts[i]) + cumCount += atomic.LoadUint64(&coldCounts.buckets[i]) buckets[i] = &dto.Bucket{ - CumulativeCount: proto.Uint64(count), + CumulativeCount: proto.Uint64(cumCount), UpperBound: proto.Float64(upperBound), } } + his.Bucket = buckets out.Histogram = his out.Label = h.labelPairs + + // Finally add all the cold counts to the new hot counts and reset the cold counts. + atomic.AddUint64(&hotCounts.count, count) + atomic.StoreUint64(&coldCounts.count, 0) + for { + oldBits := atomic.LoadUint64(&hotCounts.sumBits) + newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum()) + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { + atomic.StoreUint64(&coldCounts.sumBits, 0) + break + } + } + for i := range h.upperBounds { + atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i])) + atomic.StoreUint64(&coldCounts.buckets[i], 0) + } return nil } diff --git a/prometheus/histogram_test.go b/prometheus/histogram_test.go index 9e5e3a1..8427b5b 100644 --- a/prometheus/histogram_test.go +++ b/prometheus/histogram_test.go @@ -358,7 +358,7 @@ func TestHistogramAtomicObserve(t *testing.T) { defer func() { close(quit) }() - go func() { + observe := func() { for { select { case <-quit: @@ -367,7 +367,11 @@ func TestHistogramAtomicObserve(t *testing.T) { his.Observe(1) } } - }() + } + + go observe() + go observe() + go observe() for i := 0; i < 100; i++ { m := &dto.Metric{} @@ -376,10 +380,12 @@ func TestHistogramAtomicObserve(t *testing.T) { } h := m.GetHistogram() if h.GetSampleCount() != uint64(h.GetSampleSum()) || - h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() { + h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() || + h.GetSampleCount() != h.GetBucket()[2].GetCumulativeCount() { t.Fatalf( - "inconsistent counts in histogram: count=%d sum=%f bucket=%d", - h.GetSampleCount(), h.GetSampleSum(), h.GetBucket()[1].GetCumulativeCount(), + "inconsistent counts in histogram: count=%d sum=%f buckets=[%d, %d]", + h.GetSampleCount(), h.GetSampleSum(), + h.GetBucket()[1].GetCumulativeCount(), h.GetBucket()[2].GetCumulativeCount(), ) } runtime.Gosched()