From 226b83ac2ed99358992f6571f36597b6391f2058 Mon Sep 17 00:00:00 2001
From: beorn7
Date: Mon, 24 Dec 2018 11:23:13 +0100
Subject: [PATCH] Provide lock-free implementation for Summary without objectives

Signed-off-by: beorn7
---
 prometheus/histogram.go    |   4 +-
 prometheus/summary.go      | 149 +++++++++++++++++++++++++++++++++++++
 prometheus/summary_test.go |   8 ++
 3 files changed, 159 insertions(+), 2 deletions(-)

diff --git a/prometheus/histogram.go b/prometheus/histogram.go
index f88da70..20ee1af 100644
--- a/prometheus/histogram.go
+++ b/prometheus/histogram.go
@@ -204,8 +204,8 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
 			}
 		}
 	}
-	// Finally we know the final length of h.upperBounds and can make counts
-	// for both states:
+	// Finally we know the final length of h.upperBounds and can make buckets
+	// for both counts:
 	h.counts[0].buckets = make([]uint64, len(h.upperBounds))
 	h.counts[1].buckets = make([]uint64, len(h.upperBounds))
 
diff --git a/prometheus/summary.go b/prometheus/summary.go
index 2980614..577fd49 100644
--- a/prometheus/summary.go
+++ b/prometheus/summary.go
@@ -16,8 +16,10 @@ package prometheus
 import (
 	"fmt"
 	"math"
+	"runtime"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/beorn7/perks/quantile"
@@ -214,6 +216,17 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
 		opts.BufCap = DefBufCap
 	}
 
+	if len(opts.Objectives) == 0 {
+		// Use the lock-free implementation of a Summary without objectives.
+		s := &noObjectivesSummary{
+			desc:       desc,
+			labelPairs: makeLabelPairs(desc, labelValues),
+			counts:     [2]*summaryCounts{&summaryCounts{}, &summaryCounts{}},
+		}
+		s.init(s) // Init self-collection.
+		return s
+	}
+
 	s := &summary{
 		desc: desc,
 
@@ -382,6 +395,142 @@ func (s *summary) swapBufs(now time.Time) {
 	}
 }
 
+type summaryCounts struct {
+	// sumBits contains the bits of the float64 representing the sum of all
+	// observations. sumBits and count have to go first in the struct to
+	// guarantee alignment for atomic operations.
+	// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	sumBits uint64
+	count   uint64
+}
+
+type noObjectivesSummary struct {
+	// countAndHotIdx is a complicated one. For lock-free yet atomic
+	// observations, we need to save the total count of observations again,
+	// combined with the index of the currently-hot counts struct, so that
+	// we can perform the operation on both values atomically. The least
+	// significant bit defines the hot counts struct. The remaining 63 bits
+	// represent the total count of observations. This happens under the
+	// assumption that the 63bit count will never overflow. Rationale: An
+	// observation takes about 30ns. Let's assume it could happen in
+	// 10ns. Overflowing the counter will then take at least (2^63)*10ns,
+	// which is about 3000 years.
+	//
+	// This has to be first in the struct for 64bit alignment. See
+	// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	countAndHotIdx uint64
+
+	selfCollector
+	desc     *Desc
+	writeMtx sync.Mutex // Only used in the Write method.
+
+	// Two counts, one is "hot" for lock-free observations, the other is
+	// "cold" for writing out a dto.Metric. It has to be an array of
+	// pointers to guarantee 64bit alignment of the summaryCounts, see
+	// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
+	counts [2]*summaryCounts
+	hotIdx int // Index of currently-hot counts. Only used within Write.
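+	// hotIdx needs no atomic access: it is only read and written by Write,
+	// which holds writeMtx while doing so. Observe derives the currently-hot
+	// counts from the low bit of countAndHotIdx instead.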
+
+	labelPairs []*dto.LabelPair
+}
+
+func (s *noObjectivesSummary) Desc() *Desc {
+	return s.desc
+}
+
+func (s *noObjectivesSummary) Observe(v float64) {
+	// We increment s.countAndHotIdx by 2 so that the counter in the upper
+	// 63 bits gets incremented by 1. At the same time, we get the new value
+	// back, which we can use to find the currently-hot counts.
+	n := atomic.AddUint64(&s.countAndHotIdx, 2)
+	hotCounts := s.counts[n%2]
+
+	for {
+		oldBits := atomic.LoadUint64(&hotCounts.sumBits)
+		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
+		if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
+			break
+		}
+	}
+	// Increment count last as we take it as a signal that the observation
+	// is complete.
+	atomic.AddUint64(&hotCounts.count, 1)
+}
+
+func (s *noObjectivesSummary) Write(out *dto.Metric) error {
+	var (
+		sum                   = &dto.Summary{}
+		hotCounts, coldCounts *summaryCounts
+		count                 uint64
+	)
+
+	// For simplicity, we mutex the rest of this method. It is not in the
+	// hot path, i.e. Observe is called much more often than Write. The
+	// complication of making Write lock-free isn't worth it.
+	s.writeMtx.Lock()
+	defer s.writeMtx.Unlock()
+
+	// This is a bit arcane, which is why the following spells out this if
+	// clause in English:
+	//
+	// If the currently-hot counts struct is #0, we atomically increment
+	// s.countAndHotIdx by 1 so that from now on Observe will use the counts
+	// struct #1. Furthermore, the atomic increment gives us the new value,
+	// which, in its most significant 63 bits, tells us the count of
+	// observations done so far up to and including currently ongoing
+	// observations still using the counts struct just changed from hot to
+	// cold. To have a normal uint64 for the count, we bitshift by 1 and
+	// save the result in count. We also set s.hotIdx to 1 for the next
+	// Write call, and we will refer to counts #1 as hotCounts and to counts
+	// #0 as coldCounts.
+	//
+	// If the currently-hot counts struct is #1, we do the corresponding
+	// things the other way round. We have to _decrement_ s.countAndHotIdx
+	// (which is a bit arcane in itself, as we have to express -1 with an
+	// unsigned int...).
+	if s.hotIdx == 0 {
+		count = atomic.AddUint64(&s.countAndHotIdx, 1) >> 1
+		s.hotIdx = 1
+		hotCounts = s.counts[1]
+		coldCounts = s.counts[0]
+	} else {
+		count = atomic.AddUint64(&s.countAndHotIdx, ^uint64(0)) >> 1 // Decrement.
+		s.hotIdx = 0
+		hotCounts = s.counts[0]
+		coldCounts = s.counts[1]
+	}
+
+	// Now we have to wait for the now-declared-cold counts to actually cool
+	// down, i.e. wait for all observations still using it to finish. That's
+	// the case once the count in the cold counts struct is the same as the
+	// one atomically retrieved from the upper 63bits of s.countAndHotIdx.
+	for {
+		if count == atomic.LoadUint64(&coldCounts.count) {
+			break
+		}
+		runtime.Gosched() // Let observations get work done.
+	}
+
+	sum.SampleCount = proto.Uint64(count)
+	sum.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits)))
+
+	out.Summary = sum
+	out.Label = s.labelPairs
+
+	// Finally add all the cold counts to the new hot counts and reset the
+	// cold counts.
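+	// This is safe: the cold counts are quiescent at this point (we waited
+	// above for all pending observations on them to finish), and concurrent
+	// Observe calls only touch the hot counts, whose sumBits are updated
+	// with the same CAS loop used here, so no concurrent addition is lost.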
+	atomic.AddUint64(&hotCounts.count, count)
+	atomic.StoreUint64(&coldCounts.count, 0)
+	for {
+		oldBits := atomic.LoadUint64(&hotCounts.sumBits)
+		newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum())
+		if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
+			atomic.StoreUint64(&coldCounts.sumBits, 0)
+			break
+		}
+	}
+	return nil
+}
+
 type quantSort []*dto.Quantile
 
 func (s quantSort) Len() int {
diff --git a/prometheus/summary_test.go b/prometheus/summary_test.go
index 8b1a62e..26f693d 100644
--- a/prometheus/summary_test.go
+++ b/prometheus/summary_test.go
@@ -54,11 +54,19 @@ func TestSummaryWithoutObjectives(t *testing.T) {
 	if err := reg.Register(summaryWithEmptyObjectives); err != nil {
 		t.Error(err)
 	}
+	summaryWithEmptyObjectives.Observe(3)
+	summaryWithEmptyObjectives.Observe(0.14)
 
 	m := &dto.Metric{}
 	if err := summaryWithEmptyObjectives.Write(m); err != nil {
 		t.Error(err)
 	}
+	if got, want := m.GetSummary().GetSampleSum(), 3.14; got != want {
+		t.Errorf("got sample sum %f, want %f", got, want)
+	}
+	if got, want := m.GetSummary().GetSampleCount(), uint64(2); got != want {
+		t.Errorf("got sample count %d, want %d", got, want)
+	}
 	if len(m.GetSummary().Quantile) != 0 {
 		t.Error("expected no objectives in summary")
 	}
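
For illustration, a minimal usage sketch of the new code path, mirroring what the updated test exercises (the metric name and the standalone main package are made up for the example): a Summary constructed with an empty Objectives map now uses the lock-free implementation, and Write reports only the plain count and sum, with no quantiles.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// An empty Objectives map selects the lock-free noObjectivesSummary.
	s := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "example_request_duration_seconds", // hypothetical name
		Help:       "An example summary without objectives.",
		Objectives: map[float64]float64{},
	})
	s.Observe(3)
	s.Observe(0.14)

	m := &dto.Metric{}
	if err := s.Write(m); err != nil {
		panic(err)
	}
	fmt.Println(m.GetSummary().GetSampleCount())   // 2
	fmt.Println(m.GetSummary().GetSampleSum())     // 3.14
	fmt.Println(len(m.GetSummary().GetQuantile())) // 0
}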