From 6b9530d72ea715f0ba612c0120e6e09fbf1d49d0 Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Tue, 20 Jan 2015 18:27:10 +0100 Subject: [PATCH] Update vendoring of perks to newest (fixed) version. Adjust the API and usage accordingly. Make tests stricter. Since the merging is still faulty, test are broken now. The next commit will fix it by avoiding merging. --- _vendor/perks/MANIFEST | 2 +- _vendor/perks/histogram/bench_test.go | 26 +++ _vendor/perks/histogram/histogram.go | 108 +++++++++++ _vendor/perks/histogram/histogram_test.go | 38 ++++ _vendor/perks/quantile/bench_test.go | 15 +- _vendor/perks/quantile/stream.go | 92 ++++++---- _vendor/perks/quantile/stream_test.go | 208 ++++++++++++++-------- _vendor/perks/topk/topk.go | 90 ++++++++++ _vendor/perks/topk/topk_test.go | 57 ++++++ prometheus/summary.go | 36 ++-- prometheus/summary_test.go | 50 +++--- 11 files changed, 563 insertions(+), 159 deletions(-) create mode 100644 _vendor/perks/histogram/bench_test.go create mode 100644 _vendor/perks/histogram/histogram.go create mode 100644 _vendor/perks/histogram/histogram_test.go create mode 100644 _vendor/perks/topk/topk.go create mode 100644 _vendor/perks/topk/topk_test.go diff --git a/_vendor/perks/MANIFEST b/_vendor/perks/MANIFEST index 7110286..1fb6e82 100644 --- a/_vendor/perks/MANIFEST +++ b/_vendor/perks/MANIFEST @@ -1 +1 @@ -Imported at 5d903d2c5dc7f55829e36c62ae6c5f5f6d75e70a from https://github.com/u-c-l/perks . +Imported at f15ca8fc2964cb9f291e1cf17bb1bf9a4f9e23d5 from https://github.com/beorn7/perks . diff --git a/_vendor/perks/histogram/bench_test.go b/_vendor/perks/histogram/bench_test.go new file mode 100644 index 0000000..56c7e55 --- /dev/null +++ b/_vendor/perks/histogram/bench_test.go @@ -0,0 +1,26 @@ +package histogram + +import ( + "math/rand" + "testing" +) + +func BenchmarkInsert10Bins(b *testing.B) { + b.StopTimer() + h := New(10) + b.StartTimer() + for i := 0; i < b.N; i++ { + f := rand.ExpFloat64() + h.Insert(f) + } +} + +func BenchmarkInsert100Bins(b *testing.B) { + b.StopTimer() + h := New(100) + b.StartTimer() + for i := 0; i < b.N; i++ { + f := rand.ExpFloat64() + h.Insert(f) + } +} diff --git a/_vendor/perks/histogram/histogram.go b/_vendor/perks/histogram/histogram.go new file mode 100644 index 0000000..bef05c7 --- /dev/null +++ b/_vendor/perks/histogram/histogram.go @@ -0,0 +1,108 @@ +// Package histogram provides a Go implementation of BigML's histogram package +// for Clojure/Java. It is currently experimental. +package histogram + +import ( + "container/heap" + "math" + "sort" +) + +type Bin struct { + Count int + Sum float64 +} + +func (b *Bin) Update(x *Bin) { + b.Count += x.Count + b.Sum += x.Sum +} + +func (b *Bin) Mean() float64 { + return b.Sum / float64(b.Count) +} + +type Bins []*Bin + +func (bs Bins) Len() int { return len(bs) } +func (bs Bins) Less(i, j int) bool { return bs[i].Mean() < bs[j].Mean() } +func (bs Bins) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] } + +func (bs *Bins) Push(x interface{}) { + *bs = append(*bs, x.(*Bin)) +} + +func (bs *Bins) Pop() interface{} { + return bs.remove(len(*bs) - 1) +} + +func (bs *Bins) remove(n int) *Bin { + if n < 0 || len(*bs) < n { + return nil + } + x := (*bs)[n] + *bs = append((*bs)[:n], (*bs)[n+1:]...) + return x +} + +type Histogram struct { + res *reservoir +} + +func New(maxBins int) *Histogram { + return &Histogram{res: newReservoir(maxBins)} +} + +func (h *Histogram) Insert(f float64) { + h.res.insert(&Bin{1, f}) + h.res.compress() +} + +func (h *Histogram) Bins() Bins { + return h.res.bins +} + +type reservoir struct { + n int + maxBins int + bins Bins +} + +func newReservoir(maxBins int) *reservoir { + return &reservoir{maxBins: maxBins} +} + +func (r *reservoir) insert(bin *Bin) { + r.n += bin.Count + i := sort.Search(len(r.bins), func(i int) bool { + return r.bins[i].Mean() >= bin.Mean() + }) + if i < 0 || i == r.bins.Len() { + // TODO(blake): Maybe use an .insert(i, bin) instead of + // performing the extra work of a heap.Push. + heap.Push(&r.bins, bin) + return + } + r.bins[i].Update(bin) +} + +func (r *reservoir) compress() { + for r.bins.Len() > r.maxBins { + minGapIndex := -1 + minGap := math.MaxFloat64 + for i := 0; i < r.bins.Len()-1; i++ { + gap := gapWeight(r.bins[i], r.bins[i+1]) + if minGap > gap { + minGap = gap + minGapIndex = i + } + } + prev := r.bins[minGapIndex] + next := r.bins.remove(minGapIndex + 1) + prev.Update(next) + } +} + +func gapWeight(prev, next *Bin) float64 { + return next.Mean() - prev.Mean() +} diff --git a/_vendor/perks/histogram/histogram_test.go b/_vendor/perks/histogram/histogram_test.go new file mode 100644 index 0000000..0575ebe --- /dev/null +++ b/_vendor/perks/histogram/histogram_test.go @@ -0,0 +1,38 @@ +package histogram + +import ( + "math/rand" + "testing" +) + +func TestHistogram(t *testing.T) { + const numPoints = 1e6 + const maxBins = 3 + + h := New(maxBins) + for i := 0; i < numPoints; i++ { + f := rand.ExpFloat64() + h.Insert(f) + } + + bins := h.Bins() + if g := len(bins); g > maxBins { + t.Fatalf("got %d bins, wanted <= %d", g, maxBins) + } + + for _, b := range bins { + t.Logf("%+v", b) + } + + if g := count(h.Bins()); g != numPoints { + t.Fatalf("binned %d points, wanted %d", g, numPoints) + } +} + +func count(bins Bins) int { + binCounts := 0 + for _, b := range bins { + binCounts += b.Count + } + return binCounts +} diff --git a/_vendor/perks/quantile/bench_test.go b/_vendor/perks/quantile/bench_test.go index cfb8b32..0bd0e4e 100644 --- a/_vendor/perks/quantile/bench_test.go +++ b/_vendor/perks/quantile/bench_test.go @@ -7,7 +7,7 @@ import ( func BenchmarkInsertTargeted(b *testing.B) { b.ReportAllocs() - s := NewTargeted(0.01, 0.5, 0.9, 0.99) + s := NewTargeted(Targets) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -15,8 +15,7 @@ func BenchmarkInsertTargeted(b *testing.B) { } func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) { - s := NewTargeted(0.01, 0.5, 0.9, 0.99) - s.SetEpsilon(0.0001) + s := NewTargeted(TargetsSmallEpsilon) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -24,7 +23,7 @@ func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) { } func BenchmarkInsertBiased(b *testing.B) { - s := NewBiased() + s := NewLowBiased(0.01) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -32,8 +31,7 @@ func BenchmarkInsertBiased(b *testing.B) { } func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) { - s := NewBiased() - s.SetEpsilon(0.0001) + s := NewLowBiased(0.0001) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -41,7 +39,7 @@ func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) { } func BenchmarkQuery(b *testing.B) { - s := NewTargeted(0.01, 0.5, 0.9, 0.99) + s := NewTargeted(Targets) for i := float64(0); i < 1e6; i++ { s.Insert(i) } @@ -53,8 +51,7 @@ func BenchmarkQuery(b *testing.B) { } func BenchmarkQuerySmallEpsilon(b *testing.B) { - s := NewTargeted(0.01, 0.5, 0.9, 0.99) - s.SetEpsilon(0.0001) + s := NewTargeted(TargetsSmallEpsilon) for i := float64(0); i < 1e6; i++ { s.Insert(i) } diff --git a/_vendor/perks/quantile/stream.go b/_vendor/perks/quantile/stream.go index 566d0fb..587b1fc 100644 --- a/_vendor/perks/quantile/stream.go +++ b/_vendor/perks/quantile/stream.go @@ -36,30 +36,56 @@ func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] } type invariant func(s *stream, r float64) float64 -// NewBiased returns an initialized Stream for high-biased quantiles (e.g. -// 50th, 90th, 99th) not known a priori with finer error guarantees for the -// higher ranks of the data distribution. -// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. -func NewBiased() *Stream { +// NewLowBiased returns an initialized Stream for low-biased quantiles +// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but +// error guarantees can still be given even for the lower ranks of the data +// distribution. +// +// The provided epsilon is a relative error, i.e. the true quantile of a value +// returned by a query is guaranteed to be within (1±Epsilon)*Quantile. +// +// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error +// properties. +func NewLowBiased(epsilon float64) *Stream { ƒ := func(s *stream, r float64) float64 { - return 2 * s.epsilon * r + return 2 * epsilon * r + } + return newStream(ƒ) +} + +// NewHighBiased returns an initialized Stream for high-biased quantiles +// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but +// error guarantees can still be given even for the higher ranks of the data +// distribution. +// +// The provided epsilon is a relative error, i.e. the true quantile of a value +// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile). +// +// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error +// properties. +func NewHighBiased(epsilon float64) *Stream { + ƒ := func(s *stream, r float64) float64 { + return 2 * epsilon * (s.n - r) } return newStream(ƒ) } // NewTargeted returns an initialized Stream concerned with a particular set of // quantile values that are supplied a priori. Knowing these a priori reduces -// space and computation time. +// space and computation time. The targets map maps the desired quantiles to +// their absolute errors, i.e. the true quantile of a value returned by a query +// is guaranteed to be within (Quantile±Epsilon). +// // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. -func NewTargeted(quantiles ...float64) *Stream { +func NewTargeted(targets map[float64]float64) *Stream { ƒ := func(s *stream, r float64) float64 { - var m float64 = math.MaxFloat64 + var m = math.MaxFloat64 var f float64 - for _, q := range quantiles { - if q*s.n <= r { - f = (2 * s.epsilon * r) / q + for quantile, epsilon := range targets { + if quantile*s.n <= r { + f = (2 * epsilon * r) / quantile } else { - f = (2 * s.epsilon * (s.n - r)) / (1 - q) + f = (2 * epsilon * (s.n - r)) / (1 - quantile) } if f < m { m = f @@ -79,8 +105,7 @@ type Stream struct { } func newStream(ƒ invariant) *Stream { - const defaultEpsilon = 0.01 - x := &stream{epsilon: defaultEpsilon, ƒ: ƒ} + x := &stream{ƒ: ƒ} return &Stream{x, make(Samples, 0, 500), true} } @@ -94,7 +119,6 @@ func (s *Stream) insert(sample Sample) { s.sorted = false if len(s.b) == cap(s.b) { s.flush() - s.compress() } } @@ -122,6 +146,9 @@ func (s *Stream) Query(q float64) float64 { // Merge merges samples into the underlying streams samples. This is handy when // merging multiple streams from separate threads, database shards, etc. +// +// ATTENTION: This method is broken and does not yield correct results. The +// underlying algorithm is not capable of merging streams correctly. func (s *Stream) Merge(samples Samples) { sort.Sort(samples) s.stream.merge(samples) @@ -139,7 +166,6 @@ func (s *Stream) Samples() Samples { return s.b } s.flush() - s.compress() return s.stream.samples() } @@ -167,18 +193,9 @@ func (s *Stream) flushed() bool { } type stream struct { - epsilon float64 - n float64 - l []Sample - ƒ invariant -} - -// SetEpsilon sets the error epsilon for the Stream. The default epsilon is -// 0.01 and is usually satisfactory. If needed, this must be called before all -// Inserts. -// To learn more, see: http://www.cs.rutgers.edu/~muthu/bquant.pdf -func (s *stream) SetEpsilon(epsilon float64) { - s.epsilon = epsilon + n float64 + l []Sample + ƒ invariant } func (s *stream) reset() { @@ -191,6 +208,10 @@ func (s *stream) insert(v float64) { } func (s *stream) merge(samples Samples) { + // TODO(beorn7): This tries to merge not only individual samples, but + // whole summaries. The paper doesn't mention merging summaries at + // all. Unittests show that the merging is inaccurate. Find out how to + // do merges properly. var r float64 i := 0 for _, sample := range samples { @@ -200,7 +221,12 @@ func (s *stream) merge(samples Samples) { // Insert at position i. s.l = append(s.l, Sample{}) copy(s.l[i+1:], s.l[i:]) - s.l[i] = Sample{sample.Value, sample.Width, math.Floor(s.ƒ(s, r)) - 1} + s.l[i] = Sample{ + sample.Value, + sample.Width, + math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1), + // TODO(beorn7): How to calculate delta correctly? + } i++ goto inserted } @@ -210,7 +236,9 @@ func (s *stream) merge(samples Samples) { i++ inserted: s.n += sample.Width + r += sample.Width } + s.compress() } func (s *stream) count() int { @@ -221,12 +249,12 @@ func (s *stream) query(q float64) float64 { t := math.Ceil(q * s.n) t += math.Ceil(s.ƒ(s, t) / 2) p := s.l[0] - r := float64(0) + var r float64 for _, c := range s.l[1:] { + r += p.Width if r+c.Width+c.Delta > t { return p.Value } - r += p.Width p = c } return p.Value diff --git a/_vendor/perks/quantile/stream_test.go b/_vendor/perks/quantile/stream_test.go index 1b1b4ed..707b871 100644 --- a/_vendor/perks/quantile/stream_test.go +++ b/_vendor/perks/quantile/stream_test.go @@ -1,81 +1,150 @@ package quantile import ( + "math" "math/rand" "sort" "testing" ) -func TestQuantRandQuery(t *testing.T) { - s := NewTargeted(0.5, 0.90, 0.99) - a := make([]float64, 0, 1e5) - rand.Seed(42) +var ( + Targets = map[float64]float64{ + 0.01: 0.001, + 0.10: 0.01, + 0.50: 0.05, + 0.90: 0.01, + 0.99: 0.001, + } + TargetsSmallEpsilon = map[float64]float64{ + 0.01: 0.0001, + 0.10: 0.001, + 0.50: 0.005, + 0.90: 0.001, + 0.99: 0.0001, + } + LowQuantiles = []float64{0.01, 0.1, 0.5} + HighQuantiles = []float64{0.99, 0.9, 0.5} +) + +const RelativeEpsilon = 0.01 + +func verifyPercsWithAbsoluteEpsilon(t *testing.T, a []float64, s *Stream) { + sort.Float64s(a) + for quantile, epsilon := range Targets { + n := float64(len(a)) + k := int(quantile * n) + lower := int((quantile - epsilon) * n) + if lower < 1 { + lower = 1 + } + upper := int(math.Ceil((quantile + epsilon) * n)) + if upper > len(a) { + upper = len(a) + } + w, min, max := a[k-1], a[lower-1], a[upper-1] + if g := s.Query(quantile); g < min || g > max { + t.Errorf("q=%f: want %v [%f,%f], got %v", quantile, w, min, max, g) + } + } +} + +func verifyLowPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) { + sort.Float64s(a) + for _, qu := range LowQuantiles { + n := float64(len(a)) + k := int(qu * n) + + lowerRank := int((1 - RelativeEpsilon) * qu * n) + upperRank := int(math.Ceil((1 + RelativeEpsilon) * qu * n)) + w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1] + if g := s.Query(qu); g < min || g > max { + t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g) + } + } +} + +func verifyHighPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) { + sort.Float64s(a) + for _, qu := range HighQuantiles { + n := float64(len(a)) + k := int(qu * n) + + lowerRank := int((1 - (1+RelativeEpsilon)*(1-qu)) * n) + upperRank := int(math.Ceil((1 - (1-RelativeEpsilon)*(1-qu)) * n)) + w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1] + if g := s.Query(qu); g < min || g > max { + t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g) + } + } +} + +func populateStream(s *Stream) []float64 { + a := make([]float64, 0, 1e5+100) for i := 0; i < cap(a); i++ { v := rand.NormFloat64() + // Add 5% asymmetric outliers. + if i%20 == 0 { + v = v*v + 1 + } s.Insert(v) a = append(a, v) } - t.Logf("len: %d", s.Count()) - sort.Float64s(a) - w, min, max := getPerc(a, 0.50) - if g := s.Query(0.50); g < min || g > max { - t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.90) - if g := s.Query(0.90); g < min || g > max { - t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.99) - if g := s.Query(0.99); g < min || g > max { - t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g) - } + return a } -func TestQuantRandMergeQuery(t *testing.T) { - ch := make(chan float64) - done := make(chan *Stream) - for i := 0; i < 2; i++ { - go func() { - s := NewTargeted(0.5, 0.90, 0.99) - for v := range ch { - s.Insert(v) - } - done <- s - }() - } - +func TestTargetedQuery(t *testing.T) { rand.Seed(42) - a := make([]float64, 0, 1e6) - for i := 0; i < cap(a); i++ { - v := rand.NormFloat64() - a = append(a, v) - ch <- v - } - close(ch) + s := NewTargeted(Targets) + a := populateStream(s) + verifyPercsWithAbsoluteEpsilon(t, a, s) +} - s := <-done - o := <-done - s.Merge(o.Samples()) +func TestLowBiasedQuery(t *testing.T) { + rand.Seed(42) + s := NewLowBiased(RelativeEpsilon) + a := populateStream(s) + verifyLowPercsWithRelativeEpsilon(t, a, s) +} - t.Logf("len: %d", s.Count()) - sort.Float64s(a) - w, min, max := getPerc(a, 0.50) - if g := s.Query(0.50); g < min || g > max { - t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.90) - if g := s.Query(0.90); g < min || g > max { - t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.99) - if g := s.Query(0.99); g < min || g > max { - t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g) - } +func TestHighBiasedQuery(t *testing.T) { + rand.Seed(42) + s := NewHighBiased(RelativeEpsilon) + a := populateStream(s) + verifyHighPercsWithRelativeEpsilon(t, a, s) +} + +func TestTargetedMerge(t *testing.T) { + rand.Seed(42) + s1 := NewTargeted(Targets) + s2 := NewTargeted(Targets) + a := populateStream(s1) + a = append(a, populateStream(s2)...) + s1.Merge(s2.Samples()) + verifyPercsWithAbsoluteEpsilon(t, a, s1) +} + +func TestLowBiasedMerge(t *testing.T) { + rand.Seed(42) + s1 := NewLowBiased(RelativeEpsilon) + s2 := NewLowBiased(RelativeEpsilon) + a := populateStream(s1) + a = append(a, populateStream(s2)...) + s1.Merge(s2.Samples()) + verifyLowPercsWithRelativeEpsilon(t, a, s2) +} + +func TestHighBiasedMerge(t *testing.T) { + rand.Seed(42) + s1 := NewHighBiased(RelativeEpsilon) + s2 := NewHighBiased(RelativeEpsilon) + a := populateStream(s1) + a = append(a, populateStream(s2)...) + s1.Merge(s2.Samples()) + verifyHighPercsWithRelativeEpsilon(t, a, s2) } func TestUncompressed(t *testing.T) { - tests := []float64{0.50, 0.90, 0.95, 0.99} - q := NewTargeted(tests...) + q := NewTargeted(Targets) for i := 100; i > 0; i-- { q.Insert(float64(i)) } @@ -83,16 +152,16 @@ func TestUncompressed(t *testing.T) { t.Errorf("want count 100, got %d", g) } // Before compression, Query should have 100% accuracy. - for _, v := range tests { - w := v * 100 - if g := q.Query(v); g != w { + for quantile := range Targets { + w := quantile * 100 + if g := q.Query(quantile); g != w { t.Errorf("want %f, got %f", w, g) } } } func TestUncompressedSamples(t *testing.T) { - q := NewTargeted(0.99) + q := NewTargeted(map[float64]float64{0.99: 0.001}) for i := 1; i <= 100; i++ { q.Insert(float64(i)) } @@ -102,7 +171,7 @@ func TestUncompressedSamples(t *testing.T) { } func TestUncompressedOne(t *testing.T) { - q := NewTargeted(0.90) + q := NewTargeted(map[float64]float64{0.99: 0.01}) q.Insert(3.14) if g := q.Query(0.90); g != 3.14 { t.Error("want PI, got", g) @@ -110,20 +179,7 @@ func TestUncompressedOne(t *testing.T) { } func TestDefaults(t *testing.T) { - if g := NewTargeted(0.99).Query(0.99); g != 0 { + if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99); g != 0 { t.Errorf("want 0, got %f", g) } } - -func getPerc(x []float64, p float64) (want, min, max float64) { - k := int(float64(len(x)) * p) - lower := int(float64(len(x)) * (p - 0.04)) - if lower < 0 { - lower = 0 - } - upper := int(float64(len(x))*(p+0.04)) + 1 - if upper >= len(x) { - upper = len(x) - 1 - } - return x[k], x[lower], x[upper] -} diff --git a/_vendor/perks/topk/topk.go b/_vendor/perks/topk/topk.go new file mode 100644 index 0000000..5ac3d99 --- /dev/null +++ b/_vendor/perks/topk/topk.go @@ -0,0 +1,90 @@ +package topk + +import ( + "sort" +) + +// http://www.cs.ucsb.edu/research/tech_reports/reports/2005-23.pdf + +type Element struct { + Value string + Count int +} + +type Samples []*Element + +func (sm Samples) Len() int { + return len(sm) +} + +func (sm Samples) Less(i, j int) bool { + return sm[i].Count < sm[j].Count +} + +func (sm Samples) Swap(i, j int) { + sm[i], sm[j] = sm[j], sm[i] +} + +type Stream struct { + k int + mon map[string]*Element + + // the minimum Element + min *Element +} + +func New(k int) *Stream { + s := new(Stream) + s.k = k + s.mon = make(map[string]*Element) + s.min = &Element{} + + // Track k+1 so that less frequenet items contended for that spot, + // resulting in k being more accurate. + return s +} + +func (s *Stream) Insert(x string) { + s.insert(&Element{x, 1}) +} + +func (s *Stream) Merge(sm Samples) { + for _, e := range sm { + s.insert(e) + } +} + +func (s *Stream) insert(in *Element) { + e := s.mon[in.Value] + if e != nil { + e.Count++ + } else { + if len(s.mon) < s.k+1 { + e = &Element{in.Value, in.Count} + s.mon[in.Value] = e + } else { + e = s.min + delete(s.mon, e.Value) + e.Value = in.Value + e.Count += in.Count + s.min = e + } + } + if e.Count < s.min.Count { + s.min = e + } +} + +func (s *Stream) Query() Samples { + var sm Samples + for _, e := range s.mon { + sm = append(sm, e) + } + sort.Sort(sort.Reverse(sm)) + + if len(sm) < s.k { + return sm + } + + return sm[:s.k] +} diff --git a/_vendor/perks/topk/topk_test.go b/_vendor/perks/topk/topk_test.go new file mode 100644 index 0000000..c24f0f7 --- /dev/null +++ b/_vendor/perks/topk/topk_test.go @@ -0,0 +1,57 @@ +package topk + +import ( + "fmt" + "math/rand" + "sort" + "testing" +) + +func TestTopK(t *testing.T) { + stream := New(10) + ss := []*Stream{New(10), New(10), New(10)} + m := make(map[string]int) + for _, s := range ss { + for i := 0; i < 1e6; i++ { + v := fmt.Sprintf("%x", int8(rand.ExpFloat64())) + s.Insert(v) + m[v]++ + } + stream.Merge(s.Query()) + } + + var sm Samples + for x, s := range m { + sm = append(sm, &Element{x, s}) + } + sort.Sort(sort.Reverse(sm)) + + g := stream.Query() + if len(g) != 10 { + t.Fatalf("got %d, want 10", len(g)) + } + for i, e := range g { + if sm[i].Value != e.Value { + t.Errorf("at %d: want %q, got %q", i, sm[i].Value, e.Value) + } + } +} + +func TestQuery(t *testing.T) { + queryTests := []struct { + value string + expected int + }{ + {"a", 1}, + {"b", 2}, + {"c", 2}, + } + + stream := New(2) + for _, tt := range queryTests { + stream.Insert(tt.value) + if n := len(stream.Query()); n != tt.expected { + t.Errorf("want %d, got %d", tt.expected, n) + } + } +} diff --git a/prometheus/summary.go b/prometheus/summary.go index afb9bca..1e7a6ac 100644 --- a/prometheus/summary.go +++ b/prometheus/summary.go @@ -46,7 +46,7 @@ type Summary interface { // DefObjectives are the default Summary quantile values. var ( - DefObjectives = []float64{0.5, 0.9, 0.99} + DefObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} ) // Default values for SummaryOpts. @@ -59,8 +59,6 @@ const ( DefAgeBuckets = 10 // DefBufCap is the standard buffer size for collecting Summary observations. DefBufCap = 500 - // DefEpsilon is the default error epsilon for the quantile rank estimates. - DefEpsilon = 0.001 ) // SummaryOpts bundles the options for creating a Summary metric. It is @@ -101,9 +99,9 @@ type SummaryOpts struct { // metric name). ConstLabels Labels - // Objectives defines the quantile rank estimates. The default value is - // DefObjectives. - Objectives []float64 + // Objectives defines the quantile rank estimates with their respective + // absolute error. The default value is DefObjectives. + Objectives map[float64]float64 // MaxAge defines the duration for which an observation stays relevant // for the summary. Must be positive. The default value is DefMaxAge. @@ -164,18 +162,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary { opts.BufCap = DefBufCap } - if opts.Epsilon < 0 { - panic(fmt.Errorf("illegal value for Epsilon=%f", opts.Epsilon)) - } - if opts.Epsilon == 0. { - opts.Epsilon = DefEpsilon - } - s := &summary{ desc: desc, - objectives: opts.Objectives, - epsilon: opts.Epsilon, + objectives: opts.Objectives, + sortedObjectives: make([]float64, 0, len(opts.Objectives)), labelPairs: makeLabelPairs(desc, labelValues), @@ -193,6 +184,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary { } s.headStream = s.streams[0] + for qu := range DefObjectives { + s.sortedObjectives = append(s.sortedObjectives, qu) + } + sort.Float64s(s.sortedObjectives) + s.Init(s) // Init self-collection. return s } @@ -206,8 +202,8 @@ type summary struct { desc *Desc - objectives []float64 - epsilon float64 + objectives map[float64]float64 + sortedObjectives []float64 labelPairs []*dto.LabelPair @@ -260,7 +256,7 @@ func (s *summary) Write(out *dto.Metric) error { sum.SampleCount = proto.Uint64(s.cnt) sum.SampleSum = proto.Float64(s.sum) - for _, rank := range s.objectives { + for _, rank := range s.sortedObjectives { qs = append(qs, &dto.Quantile{ Quantile: proto.Float64(rank), Value: proto.Float64(s.mergedAllStreams.Query(rank)), @@ -281,9 +277,7 @@ func (s *summary) Write(out *dto.Metric) error { } func (s *summary) newStream() *quantile.Stream { - stream := quantile.NewTargeted(s.objectives...) - stream.SetEpsilon(s.epsilon) - return stream + return quantile.NewTargeted(s.objectives) } // asyncFlush needs bufMtx locked. diff --git a/prometheus/summary_test.go b/prometheus/summary_test.go index 704fec5..844b3ac 100644 --- a/prometheus/summary_test.go +++ b/prometheus/summary_test.go @@ -126,16 +126,14 @@ func TestSummaryConcurrency(t *testing.T) { mutations := int(n%10000 + 1) concLevel := int(n%15 + 1) total := mutations * concLevel - ε := 0.001 var start, end sync.WaitGroup start.Add(1) end.Add(concLevel) sum := NewSummary(SummaryOpts{ - Name: "test_summary", - Help: "helpless", - Epsilon: ε, + Name: "test_summary", + Help: "helpless", }) allVars := make([]float64, total) @@ -170,14 +168,21 @@ func TestSummaryConcurrency(t *testing.T) { t.Errorf("got sample sum %f, want %f", got, want) } - for i, wantQ := range DefObjectives { + objectives := make([]float64, 0, len(DefObjectives)) + for qu := range DefObjectives { + objectives = append(objectives, qu) + } + sort.Float64s(objectives) + + for i, wantQ := range objectives { + ε := DefObjectives[wantQ] gotQ := *m.Summary.Quantile[i].Quantile gotV := *m.Summary.Quantile[i].Value min, max := getBounds(allVars, wantQ, ε) if gotQ != wantQ { t.Errorf("got quantile %f, want %f", gotQ, wantQ) } - if (gotV < min || gotV > max) && len(allVars) > 500 { // Avoid statistical outliers. + if gotV < min || gotV > max { t.Errorf("got %f for quantile %f, want [%f,%f]", gotV, gotQ, min, max) } } @@ -192,10 +197,15 @@ func TestSummaryConcurrency(t *testing.T) { func TestSummaryVecConcurrency(t *testing.T) { rand.Seed(42) + objectives := make([]float64, 0, len(DefObjectives)) + for qu := range DefObjectives { + objectives = append(objectives, qu) + } + sort.Float64s(objectives) + it := func(n uint32) bool { mutations := int(n%10000 + 1) concLevel := int(n%15 + 1) - ε := 0.001 vecLength := int(n%5 + 1) var start, end sync.WaitGroup @@ -204,9 +214,8 @@ func TestSummaryVecConcurrency(t *testing.T) { sum := NewSummaryVec( SummaryOpts{ - Name: "test_summary", - Help: "helpless", - Epsilon: ε, + Name: "test_summary", + Help: "helpless", }, []string{"label"}, ) @@ -249,14 +258,15 @@ func TestSummaryVecConcurrency(t *testing.T) { if got, want := *m.Summary.SampleSum, sampleSums[i]; math.Abs((got-want)/want) > 0.001 { t.Errorf("got sample sum %f for label %c, want %f", got, 'A'+i, want) } - for j, wantQ := range DefObjectives { + for j, wantQ := range objectives { + ε := DefObjectives[wantQ] gotQ := *m.Summary.Quantile[j].Quantile gotV := *m.Summary.Quantile[j].Value min, max := getBounds(allVars[i], wantQ, ε) if gotQ != wantQ { t.Errorf("got quantile %f for label %c, want %f", gotQ, 'A'+i, wantQ) } - if (gotV < min || gotV > max) && len(allVars[i]) > 500 { // Avoid statistical outliers. + if gotV < min || gotV > max { t.Errorf("got %f for quantile %f for label %c, want [%f,%f]", gotV, gotQ, 'A'+i, min, max) t.Log(len(allVars[i])) } @@ -276,9 +286,8 @@ func XTestSummaryDecay(t *testing.T) { sum := NewSummary(SummaryOpts{ Name: "test_summary", Help: "helpless", - Epsilon: 0.001, MaxAge: 10 * time.Millisecond, - Objectives: []float64{0.1}, + Objectives: map[float64]float64{0.1: 0.001}, }) m := &dto.Metric{} @@ -302,15 +311,16 @@ func XTestSummaryDecay(t *testing.T) { } func getBounds(vars []float64, q, ε float64) (min, max float64) { - lower := int((q - 4*ε) * float64(len(vars))) - upper := int((q+4*ε)*float64(len(vars))) + 1 + n := float64(len(vars)) + lower := int((q - ε) * n) + upper := int(math.Ceil((q + ε) * n)) min = vars[0] - if lower > 0 { - min = vars[lower] + if lower > 1 { + min = vars[lower-1] } max = vars[len(vars)-1] - if upper < len(vars)-1 { - max = vars[upper] + if upper < len(vars) { + max = vars[upper-1] } return }