Merge pull request #49 from prometheus/beorn7/fix-summaries

Beorn7/fix summaries
This commit is contained in:
Björn Rabenstein 2015-01-21 16:42:22 +01:00
commit d5fe8ed656
13 changed files with 608 additions and 198 deletions

View File

@ -1 +1 @@
Imported at 5d903d2c5dc7f55829e36c62ae6c5f5f6d75e70a from https://github.com/u-c-l/perks .
Imported at f15ca8fc2964cb9f291e1cf17bb1bf9a4f9e23d5 from https://github.com/beorn7/perks .

View File

@ -0,0 +1,26 @@
package histogram
import (
"math/rand"
"testing"
)
func BenchmarkInsert10Bins(b *testing.B) {
b.StopTimer()
h := New(10)
b.StartTimer()
for i := 0; i < b.N; i++ {
f := rand.ExpFloat64()
h.Insert(f)
}
}
func BenchmarkInsert100Bins(b *testing.B) {
b.StopTimer()
h := New(100)
b.StartTimer()
for i := 0; i < b.N; i++ {
f := rand.ExpFloat64()
h.Insert(f)
}
}

View File

@ -0,0 +1,108 @@
// Package histogram provides a Go implementation of BigML's histogram package
// for Clojure/Java. It is currently experimental.
package histogram
import (
"container/heap"
"math"
"sort"
)
type Bin struct {
Count int
Sum float64
}
func (b *Bin) Update(x *Bin) {
b.Count += x.Count
b.Sum += x.Sum
}
func (b *Bin) Mean() float64 {
return b.Sum / float64(b.Count)
}
type Bins []*Bin
func (bs Bins) Len() int { return len(bs) }
func (bs Bins) Less(i, j int) bool { return bs[i].Mean() < bs[j].Mean() }
func (bs Bins) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] }
func (bs *Bins) Push(x interface{}) {
*bs = append(*bs, x.(*Bin))
}
func (bs *Bins) Pop() interface{} {
return bs.remove(len(*bs) - 1)
}
func (bs *Bins) remove(n int) *Bin {
if n < 0 || len(*bs) < n {
return nil
}
x := (*bs)[n]
*bs = append((*bs)[:n], (*bs)[n+1:]...)
return x
}
type Histogram struct {
res *reservoir
}
func New(maxBins int) *Histogram {
return &Histogram{res: newReservoir(maxBins)}
}
func (h *Histogram) Insert(f float64) {
h.res.insert(&Bin{1, f})
h.res.compress()
}
func (h *Histogram) Bins() Bins {
return h.res.bins
}
type reservoir struct {
n int
maxBins int
bins Bins
}
func newReservoir(maxBins int) *reservoir {
return &reservoir{maxBins: maxBins}
}
func (r *reservoir) insert(bin *Bin) {
r.n += bin.Count
i := sort.Search(len(r.bins), func(i int) bool {
return r.bins[i].Mean() >= bin.Mean()
})
if i < 0 || i == r.bins.Len() {
// TODO(blake): Maybe use an .insert(i, bin) instead of
// performing the extra work of a heap.Push.
heap.Push(&r.bins, bin)
return
}
r.bins[i].Update(bin)
}
func (r *reservoir) compress() {
for r.bins.Len() > r.maxBins {
minGapIndex := -1
minGap := math.MaxFloat64
for i := 0; i < r.bins.Len()-1; i++ {
gap := gapWeight(r.bins[i], r.bins[i+1])
if minGap > gap {
minGap = gap
minGapIndex = i
}
}
prev := r.bins[minGapIndex]
next := r.bins.remove(minGapIndex + 1)
prev.Update(next)
}
}
func gapWeight(prev, next *Bin) float64 {
return next.Mean() - prev.Mean()
}

View File

@ -0,0 +1,38 @@
package histogram
import (
"math/rand"
"testing"
)
func TestHistogram(t *testing.T) {
const numPoints = 1e6
const maxBins = 3
h := New(maxBins)
for i := 0; i < numPoints; i++ {
f := rand.ExpFloat64()
h.Insert(f)
}
bins := h.Bins()
if g := len(bins); g > maxBins {
t.Fatalf("got %d bins, wanted <= %d", g, maxBins)
}
for _, b := range bins {
t.Logf("%+v", b)
}
if g := count(h.Bins()); g != numPoints {
t.Fatalf("binned %d points, wanted %d", g, numPoints)
}
}
func count(bins Bins) int {
binCounts := 0
for _, b := range bins {
binCounts += b.Count
}
return binCounts
}

View File

@ -7,7 +7,7 @@ import (
func BenchmarkInsertTargeted(b *testing.B) {
b.ReportAllocs()
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
s := NewTargeted(Targets)
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
@ -15,8 +15,7 @@ func BenchmarkInsertTargeted(b *testing.B) {
}
func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
s.SetEpsilon(0.0001)
s := NewTargeted(TargetsSmallEpsilon)
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
@ -24,7 +23,7 @@ func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) {
}
func BenchmarkInsertBiased(b *testing.B) {
s := NewBiased()
s := NewLowBiased(0.01)
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
@ -32,8 +31,7 @@ func BenchmarkInsertBiased(b *testing.B) {
}
func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) {
s := NewBiased()
s.SetEpsilon(0.0001)
s := NewLowBiased(0.0001)
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
@ -41,7 +39,7 @@ func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) {
}
func BenchmarkQuery(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
s := NewTargeted(Targets)
for i := float64(0); i < 1e6; i++ {
s.Insert(i)
}
@ -53,8 +51,7 @@ func BenchmarkQuery(b *testing.B) {
}
func BenchmarkQuerySmallEpsilon(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
s.SetEpsilon(0.0001)
s := NewTargeted(TargetsSmallEpsilon)
for i := float64(0); i < 1e6; i++ {
s.Insert(i)
}

View File

@ -36,30 +36,56 @@ func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type invariant func(s *stream, r float64) float64
// NewBiased returns an initialized Stream for high-biased quantiles (e.g.
// 50th, 90th, 99th) not known a priori with finer error guarantees for the
// higher ranks of the data distribution.
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
func NewBiased() *Stream {
// NewLowBiased returns an initialized Stream for low-biased quantiles
// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
// error guarantees can still be given even for the lower ranks of the data
// distribution.
//
// The provided epsilon is a relative error, i.e. the true quantile of a value
// returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
// properties.
func NewLowBiased(epsilon float64) *Stream {
ƒ := func(s *stream, r float64) float64 {
return 2 * s.epsilon * r
return 2 * epsilon * r
}
return newStream(ƒ)
}
// NewHighBiased returns an initialized Stream for high-biased quantiles
// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
// error guarantees can still be given even for the higher ranks of the data
// distribution.
//
// The provided epsilon is a relative error, i.e. the true quantile of a value
// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
// properties.
func NewHighBiased(epsilon float64) *Stream {
ƒ := func(s *stream, r float64) float64 {
return 2 * epsilon * (s.n - r)
}
return newStream(ƒ)
}
// NewTargeted returns an initialized Stream concerned with a particular set of
// quantile values that are supplied a priori. Knowing these a priori reduces
// space and computation time.
// space and computation time. The targets map maps the desired quantiles to
// their absolute errors, i.e. the true quantile of a value returned by a query
// is guaranteed to be within (Quantile±Epsilon).
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
func NewTargeted(quantiles ...float64) *Stream {
func NewTargeted(targets map[float64]float64) *Stream {
ƒ := func(s *stream, r float64) float64 {
var m float64 = math.MaxFloat64
var m = math.MaxFloat64
var f float64
for _, q := range quantiles {
if q*s.n <= r {
f = (2 * s.epsilon * r) / q
for quantile, epsilon := range targets {
if quantile*s.n <= r {
f = (2 * epsilon * r) / quantile
} else {
f = (2 * s.epsilon * (s.n - r)) / (1 - q)
f = (2 * epsilon * (s.n - r)) / (1 - quantile)
}
if f < m {
m = f
@ -79,8 +105,7 @@ type Stream struct {
}
func newStream(ƒ invariant) *Stream {
const defaultEpsilon = 0.01
x := &stream{epsilon: defaultEpsilon, ƒ: ƒ}
x := &stream{ƒ: ƒ}
return &Stream{x, make(Samples, 0, 500), true}
}
@ -94,7 +119,6 @@ func (s *Stream) insert(sample Sample) {
s.sorted = false
if len(s.b) == cap(s.b) {
s.flush()
s.compress()
}
}
@ -122,6 +146,9 @@ func (s *Stream) Query(q float64) float64 {
// Merge merges samples into the underlying streams samples. This is handy when
// merging multiple streams from separate threads, database shards, etc.
//
// ATTENTION: This method is broken and does not yield correct results. The
// underlying algorithm is not capable of merging streams correctly.
func (s *Stream) Merge(samples Samples) {
sort.Sort(samples)
s.stream.merge(samples)
@ -139,7 +166,6 @@ func (s *Stream) Samples() Samples {
return s.b
}
s.flush()
s.compress()
return s.stream.samples()
}
@ -167,18 +193,9 @@ func (s *Stream) flushed() bool {
}
type stream struct {
epsilon float64
n float64
l []Sample
ƒ invariant
}
// SetEpsilon sets the error epsilon for the Stream. The default epsilon is
// 0.01 and is usually satisfactory. If needed, this must be called before all
// Inserts.
// To learn more, see: http://www.cs.rutgers.edu/~muthu/bquant.pdf
func (s *stream) SetEpsilon(epsilon float64) {
s.epsilon = epsilon
n float64
l []Sample
ƒ invariant
}
func (s *stream) reset() {
@ -191,6 +208,10 @@ func (s *stream) insert(v float64) {
}
func (s *stream) merge(samples Samples) {
// TODO(beorn7): This tries to merge not only individual samples, but
// whole summaries. The paper doesn't mention merging summaries at
// all. Unittests show that the merging is inaccurate. Find out how to
// do merges properly.
var r float64
i := 0
for _, sample := range samples {
@ -200,7 +221,12 @@ func (s *stream) merge(samples Samples) {
// Insert at position i.
s.l = append(s.l, Sample{})
copy(s.l[i+1:], s.l[i:])
s.l[i] = Sample{sample.Value, sample.Width, math.Floor(s.ƒ(s, r)) - 1}
s.l[i] = Sample{
sample.Value,
sample.Width,
math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
// TODO(beorn7): How to calculate delta correctly?
}
i++
goto inserted
}
@ -210,7 +236,9 @@ func (s *stream) merge(samples Samples) {
i++
inserted:
s.n += sample.Width
r += sample.Width
}
s.compress()
}
func (s *stream) count() int {
@ -221,12 +249,12 @@ func (s *stream) query(q float64) float64 {
t := math.Ceil(q * s.n)
t += math.Ceil(s.ƒ(s, t) / 2)
p := s.l[0]
r := float64(0)
var r float64
for _, c := range s.l[1:] {
r += p.Width
if r+c.Width+c.Delta > t {
return p.Value
}
r += p.Width
p = c
}
return p.Value

View File

@ -1,81 +1,150 @@
package quantile
import (
"math"
"math/rand"
"sort"
"testing"
)
func TestQuantRandQuery(t *testing.T) {
s := NewTargeted(0.5, 0.90, 0.99)
a := make([]float64, 0, 1e5)
rand.Seed(42)
var (
Targets = map[float64]float64{
0.01: 0.001,
0.10: 0.01,
0.50: 0.05,
0.90: 0.01,
0.99: 0.001,
}
TargetsSmallEpsilon = map[float64]float64{
0.01: 0.0001,
0.10: 0.001,
0.50: 0.005,
0.90: 0.001,
0.99: 0.0001,
}
LowQuantiles = []float64{0.01, 0.1, 0.5}
HighQuantiles = []float64{0.99, 0.9, 0.5}
)
const RelativeEpsilon = 0.01
func verifyPercsWithAbsoluteEpsilon(t *testing.T, a []float64, s *Stream) {
sort.Float64s(a)
for quantile, epsilon := range Targets {
n := float64(len(a))
k := int(quantile * n)
lower := int((quantile - epsilon) * n)
if lower < 1 {
lower = 1
}
upper := int(math.Ceil((quantile + epsilon) * n))
if upper > len(a) {
upper = len(a)
}
w, min, max := a[k-1], a[lower-1], a[upper-1]
if g := s.Query(quantile); g < min || g > max {
t.Errorf("q=%f: want %v [%f,%f], got %v", quantile, w, min, max, g)
}
}
}
func verifyLowPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
sort.Float64s(a)
for _, qu := range LowQuantiles {
n := float64(len(a))
k := int(qu * n)
lowerRank := int((1 - RelativeEpsilon) * qu * n)
upperRank := int(math.Ceil((1 + RelativeEpsilon) * qu * n))
w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
if g := s.Query(qu); g < min || g > max {
t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
}
}
}
func verifyHighPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
sort.Float64s(a)
for _, qu := range HighQuantiles {
n := float64(len(a))
k := int(qu * n)
lowerRank := int((1 - (1+RelativeEpsilon)*(1-qu)) * n)
upperRank := int(math.Ceil((1 - (1-RelativeEpsilon)*(1-qu)) * n))
w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
if g := s.Query(qu); g < min || g > max {
t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
}
}
}
func populateStream(s *Stream) []float64 {
a := make([]float64, 0, 1e5+100)
for i := 0; i < cap(a); i++ {
v := rand.NormFloat64()
// Add 5% asymmetric outliers.
if i%20 == 0 {
v = v*v + 1
}
s.Insert(v)
a = append(a, v)
}
t.Logf("len: %d", s.Count())
sort.Float64s(a)
w, min, max := getPerc(a, 0.50)
if g := s.Query(0.50); g < min || g > max {
t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g)
}
w, min, max = getPerc(a, 0.90)
if g := s.Query(0.90); g < min || g > max {
t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g)
}
w, min, max = getPerc(a, 0.99)
if g := s.Query(0.99); g < min || g > max {
t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g)
}
return a
}
func TestQuantRandMergeQuery(t *testing.T) {
ch := make(chan float64)
done := make(chan *Stream)
for i := 0; i < 2; i++ {
go func() {
s := NewTargeted(0.5, 0.90, 0.99)
for v := range ch {
s.Insert(v)
}
done <- s
}()
}
func TestTargetedQuery(t *testing.T) {
rand.Seed(42)
a := make([]float64, 0, 1e6)
for i := 0; i < cap(a); i++ {
v := rand.NormFloat64()
a = append(a, v)
ch <- v
}
close(ch)
s := NewTargeted(Targets)
a := populateStream(s)
verifyPercsWithAbsoluteEpsilon(t, a, s)
}
s := <-done
o := <-done
s.Merge(o.Samples())
func TestLowBiasedQuery(t *testing.T) {
rand.Seed(42)
s := NewLowBiased(RelativeEpsilon)
a := populateStream(s)
verifyLowPercsWithRelativeEpsilon(t, a, s)
}
t.Logf("len: %d", s.Count())
sort.Float64s(a)
w, min, max := getPerc(a, 0.50)
if g := s.Query(0.50); g < min || g > max {
t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g)
}
w, min, max = getPerc(a, 0.90)
if g := s.Query(0.90); g < min || g > max {
t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g)
}
w, min, max = getPerc(a, 0.99)
if g := s.Query(0.99); g < min || g > max {
t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g)
}
func TestHighBiasedQuery(t *testing.T) {
rand.Seed(42)
s := NewHighBiased(RelativeEpsilon)
a := populateStream(s)
verifyHighPercsWithRelativeEpsilon(t, a, s)
}
func TestTargetedMerge(t *testing.T) {
rand.Seed(42)
s1 := NewTargeted(Targets)
s2 := NewTargeted(Targets)
a := populateStream(s1)
a = append(a, populateStream(s2)...)
s1.Merge(s2.Samples())
verifyPercsWithAbsoluteEpsilon(t, a, s1)
}
func TestLowBiasedMerge(t *testing.T) {
rand.Seed(42)
s1 := NewLowBiased(RelativeEpsilon)
s2 := NewLowBiased(RelativeEpsilon)
a := populateStream(s1)
a = append(a, populateStream(s2)...)
s1.Merge(s2.Samples())
verifyLowPercsWithRelativeEpsilon(t, a, s2)
}
func TestHighBiasedMerge(t *testing.T) {
rand.Seed(42)
s1 := NewHighBiased(RelativeEpsilon)
s2 := NewHighBiased(RelativeEpsilon)
a := populateStream(s1)
a = append(a, populateStream(s2)...)
s1.Merge(s2.Samples())
verifyHighPercsWithRelativeEpsilon(t, a, s2)
}
func TestUncompressed(t *testing.T) {
tests := []float64{0.50, 0.90, 0.95, 0.99}
q := NewTargeted(tests...)
q := NewTargeted(Targets)
for i := 100; i > 0; i-- {
q.Insert(float64(i))
}
@ -83,16 +152,16 @@ func TestUncompressed(t *testing.T) {
t.Errorf("want count 100, got %d", g)
}
// Before compression, Query should have 100% accuracy.
for _, v := range tests {
w := v * 100
if g := q.Query(v); g != w {
for quantile := range Targets {
w := quantile * 100
if g := q.Query(quantile); g != w {
t.Errorf("want %f, got %f", w, g)
}
}
}
func TestUncompressedSamples(t *testing.T) {
q := NewTargeted(0.99)
q := NewTargeted(map[float64]float64{0.99: 0.001})
for i := 1; i <= 100; i++ {
q.Insert(float64(i))
}
@ -102,7 +171,7 @@ func TestUncompressedSamples(t *testing.T) {
}
func TestUncompressedOne(t *testing.T) {
q := NewTargeted(0.90)
q := NewTargeted(map[float64]float64{0.99: 0.01})
q.Insert(3.14)
if g := q.Query(0.90); g != 3.14 {
t.Error("want PI, got", g)
@ -110,20 +179,7 @@ func TestUncompressedOne(t *testing.T) {
}
func TestDefaults(t *testing.T) {
if g := NewTargeted(0.99).Query(0.99); g != 0 {
if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99); g != 0 {
t.Errorf("want 0, got %f", g)
}
}
func getPerc(x []float64, p float64) (want, min, max float64) {
k := int(float64(len(x)) * p)
lower := int(float64(len(x)) * (p - 0.04))
if lower < 0 {
lower = 0
}
upper := int(float64(len(x))*(p+0.04)) + 1
if upper >= len(x) {
upper = len(x) - 1
}
return x[k], x[lower], x[upper]
}

View File

@ -0,0 +1,90 @@
package topk
import (
"sort"
)
// http://www.cs.ucsb.edu/research/tech_reports/reports/2005-23.pdf
type Element struct {
Value string
Count int
}
type Samples []*Element
func (sm Samples) Len() int {
return len(sm)
}
func (sm Samples) Less(i, j int) bool {
return sm[i].Count < sm[j].Count
}
func (sm Samples) Swap(i, j int) {
sm[i], sm[j] = sm[j], sm[i]
}
type Stream struct {
k int
mon map[string]*Element
// the minimum Element
min *Element
}
func New(k int) *Stream {
s := new(Stream)
s.k = k
s.mon = make(map[string]*Element)
s.min = &Element{}
// Track k+1 so that less frequenet items contended for that spot,
// resulting in k being more accurate.
return s
}
func (s *Stream) Insert(x string) {
s.insert(&Element{x, 1})
}
func (s *Stream) Merge(sm Samples) {
for _, e := range sm {
s.insert(e)
}
}
func (s *Stream) insert(in *Element) {
e := s.mon[in.Value]
if e != nil {
e.Count++
} else {
if len(s.mon) < s.k+1 {
e = &Element{in.Value, in.Count}
s.mon[in.Value] = e
} else {
e = s.min
delete(s.mon, e.Value)
e.Value = in.Value
e.Count += in.Count
s.min = e
}
}
if e.Count < s.min.Count {
s.min = e
}
}
func (s *Stream) Query() Samples {
var sm Samples
for _, e := range s.mon {
sm = append(sm, e)
}
sort.Sort(sort.Reverse(sm))
if len(sm) < s.k {
return sm
}
return sm[:s.k]
}

View File

@ -0,0 +1,57 @@
package topk
import (
"fmt"
"math/rand"
"sort"
"testing"
)
func TestTopK(t *testing.T) {
stream := New(10)
ss := []*Stream{New(10), New(10), New(10)}
m := make(map[string]int)
for _, s := range ss {
for i := 0; i < 1e6; i++ {
v := fmt.Sprintf("%x", int8(rand.ExpFloat64()))
s.Insert(v)
m[v]++
}
stream.Merge(s.Query())
}
var sm Samples
for x, s := range m {
sm = append(sm, &Element{x, s})
}
sort.Sort(sort.Reverse(sm))
g := stream.Query()
if len(g) != 10 {
t.Fatalf("got %d, want 10", len(g))
}
for i, e := range g {
if sm[i].Value != e.Value {
t.Errorf("at %d: want %q, got %q", i, sm[i].Value, e.Value)
}
}
}
func TestQuery(t *testing.T) {
queryTests := []struct {
value string
expected int
}{
{"a", 1},
{"b", 2},
{"c", 2},
}
stream := New(2)
for _, tt := range queryTests {
stream.Insert(tt.value)
if n := len(stream.Query()); n != tt.expected {
t.Errorf("want %d, got %d", tt.expected, n)
}
}
}

View File

@ -362,11 +362,11 @@ func ExampleSummary() {
// sample_sum: 29969.50000000001
// quantile: <
// quantile: 0.5
// value: 30.2
// value: 31.1
// >
// quantile: <
// quantile: 0.9
// value: 41.4
// value: 41.3
// >
// quantile: <
// quantile: 0.99
@ -419,11 +419,11 @@ func ExampleSummaryVec() {
// sample_sum: 31956.100000000017
// quantile: <
// quantile: 0.5
// value: 32
// value: 32.4
// >
// quantile: <
// quantile: 0.9
// value: 41.5
// value: 41.4
// >
// quantile: <
// quantile: 0.99
@ -439,11 +439,11 @@ func ExampleSummaryVec() {
// sample_sum: 29969.50000000001
// quantile: <
// quantile: 0.5
// value: 30.2
// value: 31.1
// >
// quantile: <
// quantile: 0.9
// value: 41.4
// value: 41.3
// >
// quantile: <
// quantile: 0.99

View File

@ -43,6 +43,7 @@ func TestGoCollector(t *testing.T) {
}
if diff := int(pb.GetGauge().GetValue()) - old; diff != 1 {
// TODO: This is flaky in highly concurrent situations.
t.Errorf("want 1 new goroutine, got %d", diff)
}

View File

@ -46,7 +46,7 @@ type Summary interface {
// DefObjectives are the default Summary quantile values.
var (
DefObjectives = []float64{0.5, 0.9, 0.99}
DefObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
)
// Default values for SummaryOpts.
@ -56,11 +56,9 @@ const (
DefMaxAge time.Duration = 10 * time.Minute
// DefAgeBuckets is the default number of buckets used to calculate the
// age of observations.
DefAgeBuckets = 10
DefAgeBuckets = 5
// DefBufCap is the standard buffer size for collecting Summary observations.
DefBufCap = 500
// DefEpsilon is the default error epsilon for the quantile rank estimates.
DefEpsilon = 0.001
)
// SummaryOpts bundles the options for creating a Summary metric. It is
@ -101,9 +99,9 @@ type SummaryOpts struct {
// metric name).
ConstLabels Labels
// Objectives defines the quantile rank estimates. The default value is
// DefObjectives.
Objectives []float64
// Objectives defines the quantile rank estimates with their respective
// absolute error. The default value is DefObjectives.
Objectives map[float64]float64
// MaxAge defines the duration for which an observation stays relevant
// for the summary. Must be positive. The default value is DefMaxAge.
@ -127,6 +125,21 @@ type SummaryOpts struct {
Epsilon float64
}
// TODO: Great fuck-up with the sliding-window decay algorithm... The Merge
// method of perk/quantile is actually not working as advertised - and it might
// be unfixable, as the underlying algorithm is apparently not capable of
// merging summaries in the first place. To avoid using Merge, we are currently
// adding observations to _each_ age bucket, i.e. the effort to add a sample is
// essentially multiplied by the number of age buckets. When rotating age
// buckets, we empty the previous head stream. On scrape time, we simply take
// the quantiles from the head stream (no merging required). Result: More effort
// on observation time, less effort on scrape time, which is exactly the
// opposite of what we try to accomplish, but at least the results are correct.
//
// The quite elegant previous contraption to merge the age buckets efficiently
// on scrape time (see code up commit 6b9530d72ea715f0ba612c0120e6e09fbf1d49d0)
// can't be used anymore.
// NewSummary creates a new Summary based on the provided SummaryOpts.
func NewSummary(opts SummaryOpts) Summary {
return newSummary(
@ -164,18 +177,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
opts.BufCap = DefBufCap
}
if opts.Epsilon < 0 {
panic(fmt.Errorf("illegal value for Epsilon=%f", opts.Epsilon))
}
if opts.Epsilon == 0. {
opts.Epsilon = DefEpsilon
}
s := &summary{
desc: desc,
objectives: opts.Objectives,
epsilon: opts.Epsilon,
objectives: opts.Objectives,
sortedObjectives: make([]float64, 0, len(opts.Objectives)),
labelPairs: makeLabelPairs(desc, labelValues),
@ -183,8 +189,6 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
coldBuf: make([]float64, 0, opts.BufCap),
streamDuration: opts.MaxAge / time.Duration(opts.AgeBuckets),
}
s.mergedTailStreams = s.newStream()
s.mergedAllStreams = s.newStream()
s.headStreamExpTime = time.Now().Add(s.streamDuration)
s.hotBufExpTime = s.headStreamExpTime
@ -193,6 +197,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
}
s.headStream = s.streams[0]
for qu := range s.objectives {
s.sortedObjectives = append(s.sortedObjectives, qu)
}
sort.Float64s(s.sortedObjectives)
s.Init(s) // Init self-collection.
return s
}
@ -206,8 +215,8 @@ type summary struct {
desc *Desc
objectives []float64
epsilon float64
objectives map[float64]float64
sortedObjectives []float64
labelPairs []*dto.LabelPair
@ -218,10 +227,9 @@ type summary struct {
streams []*quantile.Stream
streamDuration time.Duration
headStream *quantile.Stream
headStreamIdx int
headStreamExpTime, hotBufExpTime time.Time
headStream, mergedTailStreams, mergedAllStreams *quantile.Stream
}
func (s *summary) Desc() *Desc {
@ -255,18 +263,15 @@ func (s *summary) Write(out *dto.Metric) error {
s.bufMtx.Unlock()
s.flushColdBuf()
s.mergedAllStreams.Merge(s.mergedTailStreams.Samples())
s.mergedAllStreams.Merge(s.headStream.Samples())
sum.SampleCount = proto.Uint64(s.cnt)
sum.SampleSum = proto.Float64(s.sum)
for _, rank := range s.objectives {
for _, rank := range s.sortedObjectives {
qs = append(qs, &dto.Quantile{
Quantile: proto.Float64(rank),
Value: proto.Float64(s.mergedAllStreams.Query(rank)),
Value: proto.Float64(s.headStream.Query(rank)),
})
}
s.mergedAllStreams.Reset()
s.mtx.Unlock()
@ -281,9 +286,7 @@ func (s *summary) Write(out *dto.Metric) error {
}
func (s *summary) newStream() *quantile.Stream {
stream := quantile.NewTargeted(s.objectives...)
stream.SetEpsilon(s.epsilon)
return stream
return quantile.NewTargeted(s.objectives)
}
// asyncFlush needs bufMtx locked.
@ -302,32 +305,23 @@ func (s *summary) asyncFlush(now time.Time) {
// rotateStreams needs mtx AND bufMtx locked.
func (s *summary) maybeRotateStreams() {
if s.hotBufExpTime.Equal(s.headStreamExpTime) {
// Fast return to avoid re-merging s.mergedTailStreams.
return
}
for !s.hotBufExpTime.Equal(s.headStreamExpTime) {
s.headStream.Reset()
s.headStreamIdx++
if s.headStreamIdx >= len(s.streams) {
s.headStreamIdx = 0
}
s.headStream = s.streams[s.headStreamIdx]
s.headStream.Reset()
s.headStreamExpTime = s.headStreamExpTime.Add(s.streamDuration)
}
s.mergedTailStreams.Reset()
for _, stream := range s.streams {
if stream != s.headStream {
s.mergedTailStreams.Merge(stream.Samples())
}
}
}
// flushColdBuf needs mtx locked.
func (s *summary) flushColdBuf() {
for _, v := range s.coldBuf {
s.headStream.Insert(v)
for _, stream := range s.streams {
stream.Insert(v)
}
s.cnt++
s.sum += v
}
@ -337,6 +331,9 @@ func (s *summary) flushColdBuf() {
// swapBufs needs mtx AND bufMtx locked, coldBuf must be empty.
func (s *summary) swapBufs(now time.Time) {
if len(s.coldBuf) != 0 {
panic("coldBuf is not empty")
}
s.hotBuf, s.coldBuf = s.coldBuf, s.hotBuf
// hotBuf is now empty and gets new expiration set.
for now.After(s.hotBufExpTime) {

View File

@ -123,19 +123,17 @@ func TestSummaryConcurrency(t *testing.T) {
rand.Seed(42)
it := func(n uint32) bool {
mutations := int(n%10000 + 1)
concLevel := int(n%15 + 1)
mutations := int(n%1e4 + 1e4)
concLevel := int(n%5 + 1)
total := mutations * concLevel
ε := 0.001
var start, end sync.WaitGroup
start.Add(1)
end.Add(concLevel)
sum := NewSummary(SummaryOpts{
Name: "test_summary",
Help: "helpless",
Epsilon: ε,
Name: "test_summary",
Help: "helpless",
})
allVars := make([]float64, total)
@ -170,14 +168,21 @@ func TestSummaryConcurrency(t *testing.T) {
t.Errorf("got sample sum %f, want %f", got, want)
}
for i, wantQ := range DefObjectives {
objectives := make([]float64, 0, len(DefObjectives))
for qu := range DefObjectives {
objectives = append(objectives, qu)
}
sort.Float64s(objectives)
for i, wantQ := range objectives {
ε := DefObjectives[wantQ]
gotQ := *m.Summary.Quantile[i].Quantile
gotV := *m.Summary.Quantile[i].Value
min, max := getBounds(allVars, wantQ, ε)
if gotQ != wantQ {
t.Errorf("got quantile %f, want %f", gotQ, wantQ)
}
if (gotV < min || gotV > max) && len(allVars) > 500 { // Avoid statistical outliers.
if gotV < min || gotV > max {
t.Errorf("got %f for quantile %f, want [%f,%f]", gotV, gotQ, min, max)
}
}
@ -192,11 +197,17 @@ func TestSummaryConcurrency(t *testing.T) {
func TestSummaryVecConcurrency(t *testing.T) {
rand.Seed(42)
objectives := make([]float64, 0, len(DefObjectives))
for qu := range DefObjectives {
objectives = append(objectives, qu)
}
sort.Float64s(objectives)
it := func(n uint32) bool {
mutations := int(n%10000 + 1)
concLevel := int(n%15 + 1)
ε := 0.001
vecLength := int(n%5 + 1)
mutations := int(n%1e4 + 1e4)
concLevel := int(n%7 + 1)
vecLength := int(n%3 + 1)
var start, end sync.WaitGroup
start.Add(1)
@ -204,9 +215,8 @@ func TestSummaryVecConcurrency(t *testing.T) {
sum := NewSummaryVec(
SummaryOpts{
Name: "test_summary",
Help: "helpless",
Epsilon: ε,
Name: "test_summary",
Help: "helpless",
},
[]string{"label"},
)
@ -249,16 +259,16 @@ func TestSummaryVecConcurrency(t *testing.T) {
if got, want := *m.Summary.SampleSum, sampleSums[i]; math.Abs((got-want)/want) > 0.001 {
t.Errorf("got sample sum %f for label %c, want %f", got, 'A'+i, want)
}
for j, wantQ := range DefObjectives {
for j, wantQ := range objectives {
ε := DefObjectives[wantQ]
gotQ := *m.Summary.Quantile[j].Quantile
gotV := *m.Summary.Quantile[j].Value
min, max := getBounds(allVars[i], wantQ, ε)
if gotQ != wantQ {
t.Errorf("got quantile %f for label %c, want %f", gotQ, 'A'+i, wantQ)
}
if (gotV < min || gotV > max) && len(allVars[i]) > 500 { // Avoid statistical outliers.
if gotV < min || gotV > max {
t.Errorf("got %f for quantile %f for label %c, want [%f,%f]", gotV, gotQ, 'A'+i, min, max)
t.Log(len(allVars[i]))
}
}
}
@ -270,20 +280,18 @@ func TestSummaryVecConcurrency(t *testing.T) {
}
}
// TODO(beorn): This test fails on Travis, likely because it depends on
// timing. Fix that and then Remove the leading X from the function name.
func XTestSummaryDecay(t *testing.T) {
func TestSummaryDecay(t *testing.T) {
sum := NewSummary(SummaryOpts{
Name: "test_summary",
Help: "helpless",
Epsilon: 0.001,
MaxAge: 10 * time.Millisecond,
Objectives: []float64{0.1},
MaxAge: 100 * time.Millisecond,
Objectives: map[float64]float64{0.1: 0.001},
AgeBuckets: 10,
})
m := &dto.Metric{}
i := 0
tick := time.NewTicker(100 * time.Microsecond)
tick := time.NewTicker(time.Millisecond)
for _ = range tick.C {
i++
sum.Observe(float64(i))
@ -302,15 +310,19 @@ func XTestSummaryDecay(t *testing.T) {
}
func getBounds(vars []float64, q, ε float64) (min, max float64) {
lower := int((q - 4*ε) * float64(len(vars)))
upper := int((q+4*ε)*float64(len(vars))) + 1
// TODO: This currently tolerates an error of up to 2*ε. The error must
// be at most ε, but for some reason, it's sometimes slightly
// higher. That's a bug.
n := float64(len(vars))
lower := int((q - 2*ε) * n)
upper := int(math.Ceil((q + 2*ε) * n))
min = vars[0]
if lower > 0 {
min = vars[lower]
if lower > 1 {
min = vars[lower-1]
}
max = vars[len(vars)-1]
if upper < len(vars)-1 {
max = vars[upper]
if upper < len(vars) {
max = vars[upper-1]
}
return
}