Compare commits

...

6 Commits

Author SHA1 Message Date
Lukas Vogel 121546b417
Merge e7ef5df93d into 13851e9287 2024-11-12 13:07:06 -08:00
PrometheusBot 13851e9287
Update common Prometheus files (#1683)
Signed-off-by: prombot <prometheus-team@googlegroups.com>
2024-11-12 20:02:43 +00:00
Ivan Goncharov a934c35951
Add: exponential backoff for CAS operations on floats (#1661)
* add: exponential backoff for CAS operations of floats

Signed-off-by: Ivan Goncharov <i.morph@gmail.com>

* add: some more benchmark use cases (higher contention)

Signed-off-by: Ivan Goncharov <i.morph@gmail.com>

* fmt: fumpted some files

Signed-off-by: Ivan Goncharov <i.morph@gmail.com>

* add: license header

Signed-off-by: Ivan Goncharov <i.morph@gmail.com>

* add: comment explaining origin of backoff constants

Signed-off-by: Ivan Goncharov <i.morph@gmail.com>

---------

Signed-off-by: Ivan Goncharov <i.morph@gmail.com>
2024-11-11 16:26:17 +00:00
Matthieu MOREL bab92a7743
chore: enable usestdlibvars linter (#1680)
Signed-off-by: Matthieu MOREL <matthieu.morel35@gmail.com>
2024-11-11 15:19:33 +01:00
PrometheusBot 400ee29a10
Update common Prometheus files (#1679)
Signed-off-by: prombot <prometheus-team@googlegroups.com>
2024-11-11 15:09:29 +01:00
Lukas Vogel e7ef5df93d registry: drop duplicate checks
In processMetrics drop duplicate checks. If the metricFamily exists we
already have a type check with the call to checkMetricConsistency. The
help string is already checked in the checkDescConsistency albeit only
if pedantic mode is enabled. However this is probably what is desired
anyway.
2024-09-05 15:01:23 +02:00
14 changed files with 252 additions and 98 deletions

View File

@ -36,4 +36,4 @@ jobs:
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1 uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1
with: with:
args: --verbose args: --verbose
version: v1.60.2 version: v1.61.0

View File

@ -31,6 +31,7 @@ linters:
- staticcheck - staticcheck
- unconvert - unconvert
- unused - unused
- usestdlibvars
- wastedassign - wastedassign
issues: issues:

View File

@ -61,7 +61,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_
SKIP_GOLANGCI_LINT := SKIP_GOLANGCI_LINT :=
GOLANGCI_LINT := GOLANGCI_LINT :=
GOLANGCI_LINT_OPTS ?= GOLANGCI_LINT_OPTS ?=
GOLANGCI_LINT_VERSION ?= v1.60.2 GOLANGCI_LINT_VERSION ?= v1.61.0
# golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64. # golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64.
# windows isn't included here because of the path separator being different. # windows isn't included here because of the path separator being different.
ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin)) ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin))

View File

@ -0,0 +1,50 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"math"
"sync/atomic"
"time"
)
// atomicUpdateFloat atomically updates the float64 value pointed to by bits
// using the provided updateFunc, with an exponential backoff on contention.
func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) {
const (
// both numbers are derived from empirical observations
// documented in this PR: https://github.com/prometheus/client_golang/pull/1661
maxBackoff = 320 * time.Millisecond
initialBackoff = 10 * time.Millisecond
)
backoff := initialBackoff
for {
loadedBits := atomic.LoadUint64(bits)
oldFloat := math.Float64frombits(loadedBits)
newFloat := updateFunc(oldFloat)
newBits := math.Float64bits(newFloat)
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
} else {
// Exponential backoff with sleep and cap to avoid infinite wait
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}

View File

@ -0,0 +1,167 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"math"
"sync"
"sync/atomic"
"testing"
"unsafe"
)
var output float64
func TestAtomicUpdateFloat(t *testing.T) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
var wg sync.WaitGroup
numGoroutines := 100000
increment := 1.0
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomicUpdateFloat(bits, func(f float64) float64 {
return f + increment
})
}()
}
wg.Wait()
expected := float64(numGoroutines) * increment
if val != expected {
t.Errorf("Expected %f, got %f", expected, val)
}
}
// Benchmark for atomicUpdateFloat with single goroutine (no contention).
func BenchmarkAtomicUpdateFloat_SingleGoroutine(b *testing.B) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
for i := 0; i < b.N; i++ {
atomicUpdateFloat(bits, func(f float64) float64 {
return f + 1.0
})
}
output = val
}
// Benchmark for old implementation with single goroutine (no contention) -> to check overhead of backoff
func BenchmarkAtomicNoBackoff_SingleGoroutine(b *testing.B) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
for i := 0; i < b.N; i++ {
for {
loadedBits := atomic.LoadUint64(bits)
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
}
}
}
output = val
}
// Benchmark varying the number of goroutines.
func benchmarkAtomicUpdateFloatConcurrency(b *testing.B, numGoroutines int) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
b.SetParallelism(numGoroutines)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomicUpdateFloat(bits, func(f float64) float64 {
return f + 1.0
})
}
})
output = val
}
func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) {
var val float64 = 0.0
bits := (*uint64)(unsafe.Pointer(&val))
b.SetParallelism(numGoroutines)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for {
loadedBits := atomic.LoadUint64(bits)
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
}
}
}
})
output = val
}
func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 1)
}
func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 1)
}
func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 2)
}
func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 2)
}
func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 4)
}
func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 4)
}
func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 8)
}
func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 8)
}
func BenchmarkAtomicUpdateFloat_16Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 16)
}
func BenchmarkAtomicNoBackoff_16Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 16)
}
func BenchmarkAtomicUpdateFloat_32Goroutines(b *testing.B) {
benchmarkAtomicUpdateFloatConcurrency(b, 32)
}
func BenchmarkAtomicNoBackoff_32Goroutines(b *testing.B) {
benchmarkAtomicNoBackoffFloatConcurrency(b, 32)
}

View File

@ -134,13 +134,9 @@ func (c *counter) Add(v float64) {
return return
} }
for { atomicUpdateFloat(&c.valBits, func(oldVal float64) float64 {
oldBits := atomic.LoadUint64(&c.valBits) return oldVal + v
newBits := math.Float64bits(math.Float64frombits(oldBits) + v) })
if atomic.CompareAndSwapUint64(&c.valBits, oldBits, newBits) {
return
}
}
} }
func (c *counter) AddWithExemplar(v float64, e Labels) { func (c *counter) AddWithExemplar(v float64, e Labels) {

View File

@ -120,13 +120,9 @@ func (g *gauge) Dec() {
} }
func (g *gauge) Add(val float64) { func (g *gauge) Add(val float64) {
for { atomicUpdateFloat(&g.valBits, func(oldVal float64) float64 {
oldBits := atomic.LoadUint64(&g.valBits) return oldVal + val
newBits := math.Float64bits(math.Float64frombits(oldBits) + val) })
if atomic.CompareAndSwapUint64(&g.valBits, oldBits, newBits) {
return
}
}
} }
func (g *gauge) Sub(val float64) { func (g *gauge) Sub(val float64) {

View File

@ -1641,13 +1641,9 @@ func waitForCooldown(count uint64, counts *histogramCounts) {
// atomicAddFloat adds the provided float atomically to another float // atomicAddFloat adds the provided float atomically to another float
// represented by the bit pattern the bits pointer is pointing to. // represented by the bit pattern the bits pointer is pointing to.
func atomicAddFloat(bits *uint64, v float64) { func atomicAddFloat(bits *uint64, v float64) {
for { atomicUpdateFloat(bits, func(oldVal float64) float64 {
loadedBits := atomic.LoadUint64(bits) return oldVal + v
newBits := math.Float64bits(math.Float64frombits(loadedBits) + v) })
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
break
}
}
} }
// atomicDecUint32 atomically decrements the uint32 p points to. See // atomicDecUint32 atomically decrements the uint32 p points to. See

View File

@ -131,7 +131,7 @@ func TestHandlerErrorHandling(t *testing.T) {
logger := log.New(logBuf, "", 0) logger := log.New(logBuf, "", 0)
writer := httptest.NewRecorder() writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
request.Header.Add("Accept", "test/plain") request.Header.Add("Accept", "test/plain")
mReg := &mockTransactionGatherer{g: reg} mReg := &mockTransactionGatherer{g: reg}
@ -252,7 +252,7 @@ func TestInstrumentMetricHandler(t *testing.T) {
// Do it again to test idempotency. // Do it again to test idempotency.
InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{})) InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{}))
writer := httptest.NewRecorder() writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
request.Header.Add(acceptHeader, acceptTextPlain) request.Header.Add(acceptHeader, acceptTextPlain)
handler.ServeHTTP(writer, request) handler.ServeHTTP(writer, request)
@ -311,7 +311,7 @@ func TestHandlerMaxRequestsInFlight(t *testing.T) {
w1 := httptest.NewRecorder() w1 := httptest.NewRecorder()
w2 := httptest.NewRecorder() w2 := httptest.NewRecorder()
w3 := httptest.NewRecorder() w3 := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
request.Header.Add(acceptHeader, acceptTextPlain) request.Header.Add(acceptHeader, acceptTextPlain)
c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)} c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)}
@ -348,7 +348,7 @@ func TestHandlerTimeout(t *testing.T) {
handler := HandlerFor(reg, HandlerOpts{Timeout: time.Millisecond}) handler := HandlerFor(reg, HandlerOpts{Timeout: time.Millisecond})
w := httptest.NewRecorder() w := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
request.Header.Add("Accept", "test/plain") request.Header.Add("Accept", "test/plain")
c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)} c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)}
@ -372,7 +372,7 @@ func TestInstrumentMetricHandlerWithCompression(t *testing.T) {
handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{DisableCompression: false})) handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{DisableCompression: false}))
compression := Zstd compression := Zstd
writer := httptest.NewRecorder() writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
request.Header.Add(acceptHeader, acceptTextPlain) request.Header.Add(acceptHeader, acceptTextPlain)
request.Header.Add(acceptEncodingHeader, string(compression)) request.Header.Add(acceptEncodingHeader, string(compression))
@ -533,7 +533,7 @@ func TestNegotiateEncodingWriter(t *testing.T) {
} }
for _, test := range testCases { for _, test := range testCases {
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
request.Header.Add(acceptEncodingHeader, test.acceptEncoding) request.Header.Add(acceptEncodingHeader, test.acceptEncoding)
rr := httptest.NewRecorder() rr := httptest.NewRecorder()
_, encodingHeader, _, err := negotiateEncodingWriter(request, rr, test.offeredCompressions) _, encodingHeader, _, err := negotiateEncodingWriter(request, rr, test.offeredCompressions)
@ -631,7 +631,7 @@ func BenchmarkCompression(b *testing.B) {
b.Run(benchmark.name+"_"+size.name, func(b *testing.B) { b.Run(benchmark.name+"_"+size.name, func(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
writer := httptest.NewRecorder() writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
request.Header.Add(acceptEncodingHeader, benchmark.compressionType) request.Header.Add(acceptEncodingHeader, benchmark.compressionType)
handler.ServeHTTP(writer, request) handler.ServeHTTP(writer, request)
} }

View File

@ -223,7 +223,7 @@ func TestClientMiddlewareAPI_WithRequestContext(t *testing.T) {
})) }))
defer backend.Close() defer backend.Close()
req, err := http.NewRequest("GET", backend.URL, nil) req, err := http.NewRequest(http.MethodGet, backend.URL, nil)
if err != nil { if err != nil {
t.Fatalf("%v", err) t.Fatalf("%v", err)
} }
@ -276,7 +276,7 @@ func TestClientMiddlewareAPIWithRequestContextTimeout(t *testing.T) {
})) }))
defer backend.Close() defer backend.Close()
req, err := http.NewRequest("GET", backend.URL, nil) req, err := http.NewRequest(http.MethodGet, backend.URL, nil)
if err != nil { if err != nil {
t.Fatalf("%v", err) t.Fatalf("%v", err)
} }

View File

@ -418,7 +418,7 @@ func TestMiddlewareAPI(t *testing.T) {
_, _ = w.Write([]byte("OK")) _, _ = w.Write([]byte("OK"))
}) })
r, _ := http.NewRequest("GET", "www.example.com", nil) r, _ := http.NewRequest(http.MethodGet, "www.example.com", nil)
w := httptest.NewRecorder() w := httptest.NewRecorder()
chain.ServeHTTP(w, r) chain.ServeHTTP(w, r)
@ -432,7 +432,7 @@ func TestMiddlewareAPI_WithExemplars(t *testing.T) {
_, _ = w.Write([]byte("OK")) _, _ = w.Write([]byte("OK"))
}, WithExemplarFromContext(func(_ context.Context) prometheus.Labels { return exemplar })) }, WithExemplarFromContext(func(_ context.Context) prometheus.Labels { return exemplar }))
r, _ := http.NewRequest("GET", "www.example.com", nil) r, _ := http.NewRequest(http.MethodGet, "www.example.com", nil)
w := httptest.NewRecorder() w := httptest.NewRecorder()
chain.ServeHTTP(w, r) chain.ServeHTTP(w, r)

View File

@ -634,54 +634,7 @@ func processMetric(
return fmt.Errorf("error collecting metric %v: %w", desc, err) return fmt.Errorf("error collecting metric %v: %w", desc, err)
} }
metricFamily, ok := metricFamiliesByName[desc.fqName] metricFamily, ok := metricFamiliesByName[desc.fqName]
if ok { // Existing name. if !ok { // New name.
if metricFamily.GetHelp() != desc.help {
return fmt.Errorf(
"collected metric %s %s has help %q but should have %q",
desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(),
)
}
// TODO(beorn7): Simplify switch once Desc has type.
switch metricFamily.GetType() {
case dto.MetricType_COUNTER:
if dtoMetric.Counter == nil {
return fmt.Errorf(
"collected metric %s %s should be a Counter",
desc.fqName, dtoMetric,
)
}
case dto.MetricType_GAUGE:
if dtoMetric.Gauge == nil {
return fmt.Errorf(
"collected metric %s %s should be a Gauge",
desc.fqName, dtoMetric,
)
}
case dto.MetricType_SUMMARY:
if dtoMetric.Summary == nil {
return fmt.Errorf(
"collected metric %s %s should be a Summary",
desc.fqName, dtoMetric,
)
}
case dto.MetricType_UNTYPED:
if dtoMetric.Untyped == nil {
return fmt.Errorf(
"collected metric %s %s should be Untyped",
desc.fqName, dtoMetric,
)
}
case dto.MetricType_HISTOGRAM:
if dtoMetric.Histogram == nil {
return fmt.Errorf(
"collected metric %s %s should be a Histogram",
desc.fqName, dtoMetric,
)
}
default:
panic("encountered MetricFamily with invalid type")
}
} else { // New name.
metricFamily = &dto.MetricFamily{} metricFamily = &dto.MetricFamily{}
metricFamily.Name = proto.String(desc.fqName) metricFamily.Name = proto.String(desc.fqName)
metricFamily.Help = proto.String(desc.help) metricFamily.Help = proto.String(desc.help)

View File

@ -714,7 +714,7 @@ collected metric "broken_metric" { label:<name:"foo" value:"bar" > label:<name:"
} }
writer := httptest.NewRecorder() writer := httptest.NewRecorder()
handler := promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}) handler := promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest(http.MethodGet, "/", nil)
for key, value := range scenario.headers { for key, value := range scenario.headers {
request.Header.Add(key, value) request.Header.Add(key, value)
} }

View File

@ -471,13 +471,9 @@ func (s *noObjectivesSummary) Observe(v float64) {
n := atomic.AddUint64(&s.countAndHotIdx, 1) n := atomic.AddUint64(&s.countAndHotIdx, 1)
hotCounts := s.counts[n>>63] hotCounts := s.counts[n>>63]
for { atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
oldBits := atomic.LoadUint64(&hotCounts.sumBits) return oldVal + v
newBits := math.Float64bits(math.Float64frombits(oldBits) + v) })
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
break
}
}
// Increment count last as we take it as a signal that the observation // Increment count last as we take it as a signal that the observation
// is complete. // is complete.
atomic.AddUint64(&hotCounts.count, 1) atomic.AddUint64(&hotCounts.count, 1)
@ -519,14 +515,13 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error {
// Finally add all the cold counts to the new hot counts and reset the cold counts. // Finally add all the cold counts to the new hot counts and reset the cold counts.
atomic.AddUint64(&hotCounts.count, count) atomic.AddUint64(&hotCounts.count, count)
atomic.StoreUint64(&coldCounts.count, 0) atomic.StoreUint64(&coldCounts.count, 0)
for {
oldBits := atomic.LoadUint64(&hotCounts.sumBits) // Use atomicUpdateFloat to update hotCounts.sumBits atomically.
newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum()) atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { return oldVal + sum.GetSampleSum()
atomic.StoreUint64(&coldCounts.sumBits, 0) })
break atomic.StoreUint64(&coldCounts.sumBits, 0)
}
}
return nil return nil
} }