Encode sparse histograms in protobuf

Signed-off-by: beorn7 <beorn@grafana.com>
beorn7 2020-04-07 23:18:40 +02:00
parent c98db4eccd
commit abe540f8c0
3 changed files with 36 additions and 29 deletions

go.mod

@@ -7,7 +7,7 @@ require (
 	github.com/google/go-cmp v0.4.0 // indirect
 	github.com/json-iterator/go v1.1.9
 	github.com/kr/pretty v0.1.0 // indirect
-	github.com/prometheus/client_model v0.2.0
+	github.com/prometheus/client_model v0.2.1-0.20200406191659-4b803f3550a4
 	github.com/prometheus/common v0.9.1
 	github.com/prometheus/procfs v0.0.8
 	github.com/stretchr/testify v1.4.0 // indirect

go.sum

@@ -73,6 +73,8 @@ github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
 github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.1-0.20200406191659-4b803f3550a4 h1:7Ws+6l4/5eJPHAxe0Axwo4XJwSAA4i0ipEjuoLXWFyo=
+github.com/prometheus/client_model v0.2.1-0.20200406191659-4b803f3550a4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.9.1 h1:KOMtN28tlbam3/7ZKEYKHhKoJZYYj3gMH4uc62x7X7U=
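
(Not part of the diff: the pinned pseudo-version encodes client_model commit 4b803f3550a4, the form a command like `go get github.com/prometheus/client_model@4b803f3550a4` would have recorded at the time; presumably that commit carries the in-progress SparseBuckets message and the Sb* histogram fields that the histogram.go changes below rely on.)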

prometheus/histogram.go

@@ -14,9 +14,7 @@
 package prometheus
 
 import (
-	"bytes"
 	"fmt"
-	"io"
 	"math"
 	"runtime"
 	"sort"
@@ -215,7 +213,7 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogram {
 	h := &histogram{
 		desc: desc,
 		upperBounds: opts.Buckets,
-		sparseResolution: opts.SparseBucketsResolution,
+		sparseResolution: uint32(opts.SparseBucketsResolution),
 		sparseThreshold: opts.SparseBucketsZeroThreshold,
 		labelPairs: makeLabelPairs(desc, labelValues),
 		counts: [2]*histogramCounts{{}, {}},
@@ -355,7 +353,7 @@ type histogram struct {
 	upperBounds []float64
 	labelPairs []*dto.LabelPair
 	exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
-	sparseResolution uint8
+	sparseResolution uint32 // Instead of uint8 to be ready for protobuf encoding.
 	sparseThreshold float64
 
 	now func() time.Time // To mock out time.Now() for testing.
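
Not part of the commit: a minimal construction sketch showing how the two sparse options surfaced above would be set by a caller. It assumes the SparseBucketsResolution and SparseBucketsZeroThreshold fields on HistogramOpts from the parent commit; the metric name and the option values are made up.

package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name: "example_request_duration_seconds", // hypothetical metric name
		Help: "Example latency distribution with sparse buckets.",
		// Fields from the parent commit; the values here are illustrative only.
		SparseBucketsResolution:    20,
		SparseBucketsZeroThreshold: 1e-10,
	})
	h.Observe(0.042)
	h.Observe(-0.007) // negative observations land in the sparse negative buckets
}
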
@@ -403,6 +401,8 @@ func (h *histogram) Write(out *dto.Metric) error {
 		Bucket: make([]*dto.Bucket, len(h.upperBounds)),
 		SampleCount: proto.Uint64(count),
 		SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))),
+		SbResolution: &h.sparseResolution,
+		SbZeroThreshold: &h.sparseThreshold,
 	}
 	out.Histogram = his
 	out.Label = h.labelPairs
@@ -452,38 +452,43 @@ func (h *histogram) Write(out *dto.Metric) error {
 			coldCounts.sparseBucketsNegative.Range(addAndReset(&hotCounts.sparseBucketsNegative))
 		}()
-		var buf bytes.Buffer
-		// TODO(beorn7): encode zero bucket threshold and count.
-		fmt.Println("Zero bucket:", zeroBucket) // DEBUG
-		fmt.Println("Positive buckets:") // DEBUG
-		if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsPositive, zeroBucket); err != nil {
-			return err
-		}
-		fmt.Println("Negative buckets:") // DEBUG
-		if _, err := encodeSparseBuckets(&buf, &coldCounts.sparseBucketsNegative, zeroBucket); err != nil {
-			return err
-		}
+		his.SbZeroCount = proto.Uint64(zeroBucket)
+		his.SbNegative = makeSparseBuckets(&coldCounts.sparseBucketsNegative)
+		his.SbPositive = makeSparseBuckets(&coldCounts.sparseBucketsPositive)
 	}
 	return nil
 }
 
-func encodeSparseBuckets(w io.Writer, buckets *sync.Map, zeroBucket uint64) (n int, err error) {
-	// TODO(beorn7): Add actual encoding of spare buckets.
+func makeSparseBuckets(buckets *sync.Map) *dto.SparseBuckets {
 	var ii []int
 	buckets.Range(func(k, v interface{}) bool {
 		ii = append(ii, k.(int))
 		return true
 	})
 	sort.Ints(ii)
-	fmt.Println(len(ii), "buckets")
-	var prev uint64
-	for _, i := range ii {
-		v, _ := buckets.Load(i)
-		current := atomic.LoadUint64(v.(*uint64))
-		fmt.Printf("- %d: %d Δ=%d\n", i, current, int(current)-int(prev))
-		prev = current
+	if len(ii) == 0 {
+		return nil
 	}
-	return 0, nil
+
+	sbs := dto.SparseBuckets{}
+	var prevCount uint64
+	var prevI int
+	for n, i := range ii {
+		v, _ := buckets.Load(i)
+		count := atomic.LoadUint64(v.(*uint64))
+		if n == 0 || i-prevI != 1 {
+			sbs.Span = append(sbs.Span, &dto.SparseBuckets_Span{
+				Offset: proto.Int(i - prevI),
+				Length: proto.Uint32(1),
+			})
+		} else {
+			*sbs.Span[len(sbs.Span)-1].Length++
+		}
+		sbs.Delta = append(sbs.Delta, int64(count)-int64(prevCount)) // TODO(beorn7): Do proper overflow handling.
+		prevI, prevCount = i, count
+	}
+	return &sbs
 }
 
 // addAndReset returns a function to be used with sync.Map.Range of spare
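
Not part of the commit: a self-contained sketch of the span/delta layout that makeSparseBuckets produces, run on hypothetical bucket indices and counts. Plain stand-in types replace the dto.SparseBuckets messages and a map replaces the sync.Map, but the span/delta bookkeeping mirrors the loop above.

package main

import (
	"fmt"
	"sort"
)

// span is a stand-in for dto.SparseBuckets_Span: offset is relative to the
// previous populated bucket index (or to index 0 for the first span), and
// length counts consecutive populated buckets.
type span struct{ offset, length int }

func main() {
	// Hypothetical populated sparse buckets: bucket index -> observation count.
	// Indices 3 and 4 are consecutive; index 7 starts a new span.
	buckets := map[int]uint64{3: 2, 4: 5, 7: 1}

	var ii []int
	for i := range buckets {
		ii = append(ii, i)
	}
	sort.Ints(ii)

	var (
		spans     []span
		deltas    []int64 // per-bucket count deltas, as in sbs.Delta above
		prevI     int
		prevCount uint64
	)
	for n, i := range ii {
		count := buckets[i]
		if n == 0 || i-prevI != 1 {
			spans = append(spans, span{offset: i - prevI, length: 1})
		} else {
			spans[len(spans)-1].length++
		}
		deltas = append(deltas, int64(count)-int64(prevCount))
		prevI, prevCount = i, count
	}

	fmt.Println(spans)  // [{3 2} {3 1}]  -> indices 3,4, then a gap, then index 7
	fmt.Println(deltas) // [2 3 -4]       -> absolute counts 2, 5, 1
}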