forked from mirror/client_golang
Merge pull request #457 from prometheus/beorn7/histogram
Lock-free atomic observations in Histograms!
This commit is contained in:
commit
b5bfa0eb2c
|
@ -16,7 +16,9 @@ package prometheus
|
|||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -200,28 +202,49 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
|
|||
}
|
||||
}
|
||||
}
|
||||
// Finally we know the final length of h.upperBounds and can make counts.
|
||||
h.counts = make([]uint64, len(h.upperBounds))
|
||||
// Finally we know the final length of h.upperBounds and can make counts
|
||||
// for both states:
|
||||
h.counts[0].buckets = make([]uint64, len(h.upperBounds))
|
||||
h.counts[1].buckets = make([]uint64, len(h.upperBounds))
|
||||
|
||||
h.init(h) // Init self-collection.
|
||||
return h
|
||||
}
|
||||
|
||||
type histogram struct {
|
||||
type histogramCounts struct {
|
||||
// sumBits contains the bits of the float64 representing the sum of all
|
||||
// observations. sumBits and count have to go first in the struct to
|
||||
// guarantee alignment for atomic operations.
|
||||
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
||||
sumBits uint64
|
||||
count uint64
|
||||
buckets []uint64
|
||||
}
|
||||
|
||||
type histogram struct {
|
||||
selfCollector
|
||||
// Note that there is no mutex required.
|
||||
|
||||
desc *Desc
|
||||
desc *Desc
|
||||
writeMtx sync.Mutex // Only used in the Write method.
|
||||
|
||||
upperBounds []float64
|
||||
counts []uint64
|
||||
|
||||
// Two counts, one is "hot" for lock-free observations, the other is
|
||||
// "cold" for writing out a dto.Metric.
|
||||
counts [2]histogramCounts
|
||||
|
||||
hotIdx int // Index of currently-hot counts. Only used within Write.
|
||||
|
||||
// This is a complicated one. For lock-free yet atomic observations, we
|
||||
// need to save the total count of observations again, combined with the
|
||||
// index of the currently-hot counts struct, so that we can perform the
|
||||
// operation on both values atomically. The least significant bit
|
||||
// defines the hot counts struct. The remaining 63 bits represent the
|
||||
// total count of observations. This happens under the assumption that
|
||||
// the 63bit count will never overflow. Rationale: An observations takes
|
||||
// about 30ns. Let's assume it could happen in 10ns. Overflowing the
|
||||
// counter will then take at least (2^63)*10ns, which is about 3000
|
||||
// years.
|
||||
countAndHotIdx uint64
|
||||
|
||||
labelPairs []*dto.LabelPair
|
||||
}
|
||||
|
@ -241,36 +264,113 @@ func (h *histogram) Observe(v float64) {
|
|||
// 100 buckets: 78.1 ns/op linear - binary 54.9 ns/op
|
||||
// 300 buckets: 154 ns/op linear - binary 61.6 ns/op
|
||||
i := sort.SearchFloat64s(h.upperBounds, v)
|
||||
if i < len(h.counts) {
|
||||
atomic.AddUint64(&h.counts[i], 1)
|
||||
|
||||
// We increment h.countAndHotIdx by 2 so that the counter in the upper
|
||||
// 63 bits gets incremented by 1. At the same time, we get the new value
|
||||
// back, which we can use to find the currently-hot counts.
|
||||
n := atomic.AddUint64(&h.countAndHotIdx, 2)
|
||||
hotCounts := &h.counts[n%2]
|
||||
|
||||
if i < len(h.upperBounds) {
|
||||
atomic.AddUint64(&hotCounts.buckets[i], 1)
|
||||
}
|
||||
atomic.AddUint64(&h.count, 1)
|
||||
for {
|
||||
oldBits := atomic.LoadUint64(&h.sumBits)
|
||||
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
|
||||
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
|
||||
if atomic.CompareAndSwapUint64(&h.sumBits, oldBits, newBits) {
|
||||
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Increment count last as we take it as a signal that the observation
|
||||
// is complete.
|
||||
atomic.AddUint64(&hotCounts.count, 1)
|
||||
}
|
||||
|
||||
func (h *histogram) Write(out *dto.Metric) error {
|
||||
his := &dto.Histogram{}
|
||||
buckets := make([]*dto.Bucket, len(h.upperBounds))
|
||||
var (
|
||||
his = &dto.Histogram{}
|
||||
buckets = make([]*dto.Bucket, len(h.upperBounds))
|
||||
hotCounts, coldCounts *histogramCounts
|
||||
count uint64
|
||||
)
|
||||
|
||||
his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&h.sumBits)))
|
||||
his.SampleCount = proto.Uint64(atomic.LoadUint64(&h.count))
|
||||
var count uint64
|
||||
// For simplicity, we mutex the rest of this method. It is not in the
|
||||
// hot path, i.e. Observe is called much more often than Write. The
|
||||
// complication of making Write lock-free isn't worth it.
|
||||
h.writeMtx.Lock()
|
||||
defer h.writeMtx.Unlock()
|
||||
|
||||
// This is a bit arcane, which is why the following spells out this if
|
||||
// clause in English:
|
||||
//
|
||||
// If the currently-hot counts struct is #0, we atomically increment
|
||||
// h.countAndHotIdx by 1 so that from now on Observe will use the counts
|
||||
// struct #1. Furthermore, the atomic increment gives us the new value,
|
||||
// which, in its most significant 63 bits, tells us the count of
|
||||
// observations done so far up to and including currently ongoing
|
||||
// observations still using the counts struct just changed from hot to
|
||||
// cold. To have a normal uint64 for the count, we bitshift by 1 and
|
||||
// save the result in count. We also set h.hotIdx to 1 for the next
|
||||
// Write call, and we will refer to counts #1 as hotCounts and to counts
|
||||
// #0 as coldCounts.
|
||||
//
|
||||
// If the currently-hot counts struct is #1, we do the corresponding
|
||||
// things the other way round. We have to _decrement_ h.countAndHotIdx
|
||||
// (which is a bit arcane in itself, as we have to express -1 with an
|
||||
// unsigned int...).
|
||||
if h.hotIdx == 0 {
|
||||
count = atomic.AddUint64(&h.countAndHotIdx, 1) >> 1
|
||||
h.hotIdx = 1
|
||||
hotCounts = &h.counts[1]
|
||||
coldCounts = &h.counts[0]
|
||||
} else {
|
||||
count = atomic.AddUint64(&h.countAndHotIdx, ^uint64(0)) >> 1 // Decrement.
|
||||
h.hotIdx = 0
|
||||
hotCounts = &h.counts[0]
|
||||
coldCounts = &h.counts[1]
|
||||
}
|
||||
|
||||
// Now we have to wait for the now-declared-cold counts to actually cool
|
||||
// down, i.e. wait for all observations still using it to finish. That's
|
||||
// the case once the count in the cold counts struct is the same as the
|
||||
// one atomically retrieved from the upper 63bits of h.countAndHotIdx.
|
||||
for {
|
||||
if count == atomic.LoadUint64(&coldCounts.count) {
|
||||
break
|
||||
}
|
||||
runtime.Gosched() // Let observations get work done.
|
||||
}
|
||||
|
||||
his.SampleCount = proto.Uint64(count)
|
||||
his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits)))
|
||||
var cumCount uint64
|
||||
for i, upperBound := range h.upperBounds {
|
||||
count += atomic.LoadUint64(&h.counts[i])
|
||||
cumCount += atomic.LoadUint64(&coldCounts.buckets[i])
|
||||
buckets[i] = &dto.Bucket{
|
||||
CumulativeCount: proto.Uint64(count),
|
||||
CumulativeCount: proto.Uint64(cumCount),
|
||||
UpperBound: proto.Float64(upperBound),
|
||||
}
|
||||
}
|
||||
|
||||
his.Bucket = buckets
|
||||
out.Histogram = his
|
||||
out.Label = h.labelPairs
|
||||
|
||||
// Finally add all the cold counts to the new hot counts and reset the cold counts.
|
||||
atomic.AddUint64(&hotCounts.count, count)
|
||||
atomic.StoreUint64(&coldCounts.count, 0)
|
||||
for {
|
||||
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
|
||||
newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum())
|
||||
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
|
||||
atomic.StoreUint64(&coldCounts.sumBits, 0)
|
||||
break
|
||||
}
|
||||
}
|
||||
for i := range h.upperBounds {
|
||||
atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
|
||||
atomic.StoreUint64(&coldCounts.buckets[i], 0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"math"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -346,3 +347,47 @@ func TestBuckets(t *testing.T) {
|
|||
t.Errorf("linear buckets: got %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistogramAtomicObserve(t *testing.T) {
|
||||
var (
|
||||
quit = make(chan struct{})
|
||||
his = NewHistogram(HistogramOpts{
|
||||
Buckets: []float64{0.5, 10, 20},
|
||||
})
|
||||
)
|
||||
|
||||
defer func() { close(quit) }()
|
||||
|
||||
observe := func() {
|
||||
for {
|
||||
select {
|
||||
case <-quit:
|
||||
return
|
||||
default:
|
||||
his.Observe(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go observe()
|
||||
go observe()
|
||||
go observe()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
m := &dto.Metric{}
|
||||
if err := his.Write(m); err != nil {
|
||||
t.Fatal("unexpected error writing histogram:", err)
|
||||
}
|
||||
h := m.GetHistogram()
|
||||
if h.GetSampleCount() != uint64(h.GetSampleSum()) ||
|
||||
h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() ||
|
||||
h.GetSampleCount() != h.GetBucket()[2].GetCumulativeCount() {
|
||||
t.Fatalf(
|
||||
"inconsistent counts in histogram: count=%d sum=%f buckets=[%d, %d]",
|
||||
h.GetSampleCount(), h.GetSampleSum(),
|
||||
h.GetBucket()[1].GetCumulativeCount(), h.GetBucket()[2].GetCumulativeCount(),
|
||||
)
|
||||
}
|
||||
runtime.Gosched()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue