diff --git a/_vendor/perks/MANIFEST b/_vendor/perks/MANIFEST index 7110286..1fb6e82 100644 --- a/_vendor/perks/MANIFEST +++ b/_vendor/perks/MANIFEST @@ -1 +1 @@ -Imported at 5d903d2c5dc7f55829e36c62ae6c5f5f6d75e70a from https://github.com/u-c-l/perks . +Imported at f15ca8fc2964cb9f291e1cf17bb1bf9a4f9e23d5 from https://github.com/beorn7/perks . diff --git a/_vendor/perks/histogram/bench_test.go b/_vendor/perks/histogram/bench_test.go new file mode 100644 index 0000000..56c7e55 --- /dev/null +++ b/_vendor/perks/histogram/bench_test.go @@ -0,0 +1,26 @@ +package histogram + +import ( + "math/rand" + "testing" +) + +func BenchmarkInsert10Bins(b *testing.B) { + b.StopTimer() + h := New(10) + b.StartTimer() + for i := 0; i < b.N; i++ { + f := rand.ExpFloat64() + h.Insert(f) + } +} + +func BenchmarkInsert100Bins(b *testing.B) { + b.StopTimer() + h := New(100) + b.StartTimer() + for i := 0; i < b.N; i++ { + f := rand.ExpFloat64() + h.Insert(f) + } +} diff --git a/_vendor/perks/histogram/histogram.go b/_vendor/perks/histogram/histogram.go new file mode 100644 index 0000000..bef05c7 --- /dev/null +++ b/_vendor/perks/histogram/histogram.go @@ -0,0 +1,108 @@ +// Package histogram provides a Go implementation of BigML's histogram package +// for Clojure/Java. It is currently experimental. +package histogram + +import ( + "container/heap" + "math" + "sort" +) + +type Bin struct { + Count int + Sum float64 +} + +func (b *Bin) Update(x *Bin) { + b.Count += x.Count + b.Sum += x.Sum +} + +func (b *Bin) Mean() float64 { + return b.Sum / float64(b.Count) +} + +type Bins []*Bin + +func (bs Bins) Len() int { return len(bs) } +func (bs Bins) Less(i, j int) bool { return bs[i].Mean() < bs[j].Mean() } +func (bs Bins) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] } + +func (bs *Bins) Push(x interface{}) { + *bs = append(*bs, x.(*Bin)) +} + +func (bs *Bins) Pop() interface{} { + return bs.remove(len(*bs) - 1) +} + +func (bs *Bins) remove(n int) *Bin { + if n < 0 || len(*bs) < n { + return nil + } + x := (*bs)[n] + *bs = append((*bs)[:n], (*bs)[n+1:]...) + return x +} + +type Histogram struct { + res *reservoir +} + +func New(maxBins int) *Histogram { + return &Histogram{res: newReservoir(maxBins)} +} + +func (h *Histogram) Insert(f float64) { + h.res.insert(&Bin{1, f}) + h.res.compress() +} + +func (h *Histogram) Bins() Bins { + return h.res.bins +} + +type reservoir struct { + n int + maxBins int + bins Bins +} + +func newReservoir(maxBins int) *reservoir { + return &reservoir{maxBins: maxBins} +} + +func (r *reservoir) insert(bin *Bin) { + r.n += bin.Count + i := sort.Search(len(r.bins), func(i int) bool { + return r.bins[i].Mean() >= bin.Mean() + }) + if i < 0 || i == r.bins.Len() { + // TODO(blake): Maybe use an .insert(i, bin) instead of + // performing the extra work of a heap.Push. 
+ heap.Push(&r.bins, bin) + return + } + r.bins[i].Update(bin) +} + +func (r *reservoir) compress() { + for r.bins.Len() > r.maxBins { + minGapIndex := -1 + minGap := math.MaxFloat64 + for i := 0; i < r.bins.Len()-1; i++ { + gap := gapWeight(r.bins[i], r.bins[i+1]) + if minGap > gap { + minGap = gap + minGapIndex = i + } + } + prev := r.bins[minGapIndex] + next := r.bins.remove(minGapIndex + 1) + prev.Update(next) + } +} + +func gapWeight(prev, next *Bin) float64 { + return next.Mean() - prev.Mean() +} diff --git a/_vendor/perks/histogram/histogram_test.go b/_vendor/perks/histogram/histogram_test.go new file mode 100644 index 0000000..0575ebe --- /dev/null +++ b/_vendor/perks/histogram/histogram_test.go @@ -0,0 +1,38 @@ +package histogram + +import ( + "math/rand" + "testing" +) + +func TestHistogram(t *testing.T) { + const numPoints = 1e6 + const maxBins = 3 + + h := New(maxBins) + for i := 0; i < numPoints; i++ { + f := rand.ExpFloat64() + h.Insert(f) + } + + bins := h.Bins() + if g := len(bins); g > maxBins { + t.Fatalf("got %d bins, wanted <= %d", g, maxBins) + } + + for _, b := range bins { + t.Logf("%+v", b) + } + + if g := count(h.Bins()); g != numPoints { + t.Fatalf("binned %d points, wanted %d", g, numPoints) + } +} + +func count(bins Bins) int { + binCounts := 0 + for _, b := range bins { + binCounts += b.Count + } + return binCounts +} diff --git a/_vendor/perks/quantile/bench_test.go b/_vendor/perks/quantile/bench_test.go index cfb8b32..0bd0e4e 100644 --- a/_vendor/perks/quantile/bench_test.go +++ b/_vendor/perks/quantile/bench_test.go @@ -7,7 +7,7 @@ import ( func BenchmarkInsertTargeted(b *testing.B) { b.ReportAllocs() - s := NewTargeted(0.01, 0.5, 0.9, 0.99) + s := NewTargeted(Targets) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -15,8 +15,7 @@ func BenchmarkInsertTargeted(b *testing.B) { } func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) { - s := NewTargeted(0.01, 0.5, 0.9, 0.99) - s.SetEpsilon(0.0001) + s := NewTargeted(TargetsSmallEpsilon) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -24,7 +23,7 @@ func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) { } func BenchmarkInsertBiased(b *testing.B) { - s := NewBiased() + s := NewLowBiased(0.01) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -32,8 +31,7 @@ func BenchmarkInsertBiased(b *testing.B) { } func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) { - s := NewBiased() - s.SetEpsilon(0.0001) + s := NewLowBiased(0.0001) b.ResetTimer() for i := float64(0); i < float64(b.N); i++ { s.Insert(i) @@ -41,7 +39,7 @@ func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) { } func BenchmarkQuery(b *testing.B) { - s := NewTargeted(0.01, 0.5, 0.9, 0.99) + s := NewTargeted(Targets) for i := float64(0); i < 1e6; i++ { s.Insert(i) } @@ -53,8 +51,7 @@ func BenchmarkQuery(b *testing.B) { } func BenchmarkQuerySmallEpsilon(b *testing.B) { - s := NewTargeted(0.01, 0.5, 0.9, 0.99) - s.SetEpsilon(0.0001) + s := NewTargeted(TargetsSmallEpsilon) for i := float64(0); i < 1e6; i++ { s.Insert(i) } diff --git a/_vendor/perks/quantile/stream.go b/_vendor/perks/quantile/stream.go index 566d0fb..587b1fc 100644 --- a/_vendor/perks/quantile/stream.go +++ b/_vendor/perks/quantile/stream.go @@ -36,30 +36,56 @@ func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] } type invariant func(s *stream, r float64) float64 -// NewBiased returns an initialized Stream for high-biased quantiles (e.g. 
-// 50th, 90th, 99th) not known a priori with finer error guarantees for the -// higher ranks of the data distribution. -// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. -func NewBiased() *Stream { +// NewLowBiased returns an initialized Stream for low-biased quantiles +// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but +// error guarantees can still be given even for the lower ranks of the data +// distribution. +// +// The provided epsilon is a relative error, i.e. the true quantile of a value +// returned by a query is guaranteed to be within (1±Epsilon)*Quantile. +// +// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error +// properties. +func NewLowBiased(epsilon float64) *Stream { ƒ := func(s *stream, r float64) float64 { - return 2 * s.epsilon * r + return 2 * epsilon * r + } + return newStream(ƒ) +} + +// NewHighBiased returns an initialized Stream for high-biased quantiles +// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but +// error guarantees can still be given even for the higher ranks of the data +// distribution. +// +// The provided epsilon is a relative error, i.e. the true quantile of a value +// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile). +// +// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error +// properties. +func NewHighBiased(epsilon float64) *Stream { + ƒ := func(s *stream, r float64) float64 { + return 2 * epsilon * (s.n - r) } return newStream(ƒ) } // NewTargeted returns an initialized Stream concerned with a particular set of // quantile values that are supplied a priori. Knowing these a priori reduces -// space and computation time. +// space and computation time. The targets map maps the desired quantiles to +// their absolute errors, i.e. the true quantile of a value returned by a query +// is guaranteed to be within (Quantile±Epsilon). +// // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. -func NewTargeted(quantiles ...float64) *Stream { +func NewTargeted(targets map[float64]float64) *Stream { ƒ := func(s *stream, r float64) float64 { - var m float64 = math.MaxFloat64 + var m = math.MaxFloat64 var f float64 - for _, q := range quantiles { - if q*s.n <= r { - f = (2 * s.epsilon * r) / q + for quantile, epsilon := range targets { + if quantile*s.n <= r { + f = (2 * epsilon * r) / quantile } else { - f = (2 * s.epsilon * (s.n - r)) / (1 - q) + f = (2 * epsilon * (s.n - r)) / (1 - quantile) } if f < m { m = f @@ -79,8 +105,7 @@ type Stream struct { } func newStream(ƒ invariant) *Stream { - const defaultEpsilon = 0.01 - x := &stream{epsilon: defaultEpsilon, ƒ: ƒ} + x := &stream{ƒ: ƒ} return &Stream{x, make(Samples, 0, 500), true} } @@ -94,7 +119,6 @@ func (s *Stream) insert(sample Sample) { s.sorted = false if len(s.b) == cap(s.b) { s.flush() - s.compress() } } @@ -122,6 +146,9 @@ func (s *Stream) Query(q float64) float64 { // Merge merges samples into the underlying streams samples. This is handy when // merging multiple streams from separate threads, database shards, etc. +// +// ATTENTION: This method is broken and does not yield correct results. The +// underlying algorithm is not capable of merging streams correctly. 
func (s *Stream) Merge(samples Samples) { sort.Sort(samples) s.stream.merge(samples) @@ -139,7 +166,6 @@ func (s *Stream) Samples() Samples { return s.b } s.flush() - s.compress() return s.stream.samples() } @@ -167,18 +193,9 @@ func (s *Stream) flushed() bool { } type stream struct { - epsilon float64 - n float64 - l []Sample - ƒ invariant -} - -// SetEpsilon sets the error epsilon for the Stream. The default epsilon is -// 0.01 and is usually satisfactory. If needed, this must be called before all -// Inserts. -// To learn more, see: http://www.cs.rutgers.edu/~muthu/bquant.pdf -func (s *stream) SetEpsilon(epsilon float64) { - s.epsilon = epsilon + n float64 + l []Sample + ƒ invariant } func (s *stream) reset() { @@ -191,6 +208,10 @@ func (s *stream) insert(v float64) { } func (s *stream) merge(samples Samples) { + // TODO(beorn7): This tries to merge not only individual samples, but + // whole summaries. The paper doesn't mention merging summaries at + // all. Unittests show that the merging is inaccurate. Find out how to + // do merges properly. var r float64 i := 0 for _, sample := range samples { @@ -200,7 +221,12 @@ func (s *stream) merge(samples Samples) { // Insert at position i. s.l = append(s.l, Sample{}) copy(s.l[i+1:], s.l[i:]) - s.l[i] = Sample{sample.Value, sample.Width, math.Floor(s.ƒ(s, r)) - 1} + s.l[i] = Sample{ + sample.Value, + sample.Width, + math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1), + // TODO(beorn7): How to calculate delta correctly? + } i++ goto inserted } @@ -210,7 +236,9 @@ func (s *stream) merge(samples Samples) { i++ inserted: s.n += sample.Width + r += sample.Width } + s.compress() } func (s *stream) count() int { @@ -221,12 +249,12 @@ func (s *stream) query(q float64) float64 { t := math.Ceil(q * s.n) t += math.Ceil(s.ƒ(s, t) / 2) p := s.l[0] - r := float64(0) + var r float64 for _, c := range s.l[1:] { + r += p.Width if r+c.Width+c.Delta > t { return p.Value } - r += p.Width p = c } return p.Value diff --git a/_vendor/perks/quantile/stream_test.go b/_vendor/perks/quantile/stream_test.go index 1b1b4ed..707b871 100644 --- a/_vendor/perks/quantile/stream_test.go +++ b/_vendor/perks/quantile/stream_test.go @@ -1,81 +1,150 @@ package quantile import ( + "math" "math/rand" "sort" "testing" ) -func TestQuantRandQuery(t *testing.T) { - s := NewTargeted(0.5, 0.90, 0.99) - a := make([]float64, 0, 1e5) - rand.Seed(42) +var ( + Targets = map[float64]float64{ + 0.01: 0.001, + 0.10: 0.01, + 0.50: 0.05, + 0.90: 0.01, + 0.99: 0.001, + } + TargetsSmallEpsilon = map[float64]float64{ + 0.01: 0.0001, + 0.10: 0.001, + 0.50: 0.005, + 0.90: 0.001, + 0.99: 0.0001, + } + LowQuantiles = []float64{0.01, 0.1, 0.5} + HighQuantiles = []float64{0.99, 0.9, 0.5} +) + +const RelativeEpsilon = 0.01 + +func verifyPercsWithAbsoluteEpsilon(t *testing.T, a []float64, s *Stream) { + sort.Float64s(a) + for quantile, epsilon := range Targets { + n := float64(len(a)) + k := int(quantile * n) + lower := int((quantile - epsilon) * n) + if lower < 1 { + lower = 1 + } + upper := int(math.Ceil((quantile + epsilon) * n)) + if upper > len(a) { + upper = len(a) + } + w, min, max := a[k-1], a[lower-1], a[upper-1] + if g := s.Query(quantile); g < min || g > max { + t.Errorf("q=%f: want %v [%f,%f], got %v", quantile, w, min, max, g) + } + } +} + +func verifyLowPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) { + sort.Float64s(a) + for _, qu := range LowQuantiles { + n := float64(len(a)) + k := int(qu * n) + + lowerRank := int((1 - RelativeEpsilon) * qu * n) + upperRank := int(math.Ceil((1 + 
RelativeEpsilon) * qu * n)) + w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1] + if g := s.Query(qu); g < min || g > max { + t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g) + } + } +} + +func verifyHighPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) { + sort.Float64s(a) + for _, qu := range HighQuantiles { + n := float64(len(a)) + k := int(qu * n) + + lowerRank := int((1 - (1+RelativeEpsilon)*(1-qu)) * n) + upperRank := int(math.Ceil((1 - (1-RelativeEpsilon)*(1-qu)) * n)) + w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1] + if g := s.Query(qu); g < min || g > max { + t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g) + } + } +} + +func populateStream(s *Stream) []float64 { + a := make([]float64, 0, 1e5+100) for i := 0; i < cap(a); i++ { v := rand.NormFloat64() + // Add 5% asymmetric outliers. + if i%20 == 0 { + v = v*v + 1 + } s.Insert(v) a = append(a, v) } - t.Logf("len: %d", s.Count()) - sort.Float64s(a) - w, min, max := getPerc(a, 0.50) - if g := s.Query(0.50); g < min || g > max { - t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.90) - if g := s.Query(0.90); g < min || g > max { - t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.99) - if g := s.Query(0.99); g < min || g > max { - t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g) - } + return a } -func TestQuantRandMergeQuery(t *testing.T) { - ch := make(chan float64) - done := make(chan *Stream) - for i := 0; i < 2; i++ { - go func() { - s := NewTargeted(0.5, 0.90, 0.99) - for v := range ch { - s.Insert(v) - } - done <- s - }() - } - +func TestTargetedQuery(t *testing.T) { rand.Seed(42) - a := make([]float64, 0, 1e6) - for i := 0; i < cap(a); i++ { - v := rand.NormFloat64() - a = append(a, v) - ch <- v - } - close(ch) + s := NewTargeted(Targets) + a := populateStream(s) + verifyPercsWithAbsoluteEpsilon(t, a, s) +} - s := <-done - o := <-done - s.Merge(o.Samples()) +func TestLowBiasedQuery(t *testing.T) { + rand.Seed(42) + s := NewLowBiased(RelativeEpsilon) + a := populateStream(s) + verifyLowPercsWithRelativeEpsilon(t, a, s) +} - t.Logf("len: %d", s.Count()) - sort.Float64s(a) - w, min, max := getPerc(a, 0.50) - if g := s.Query(0.50); g < min || g > max { - t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.90) - if g := s.Query(0.90); g < min || g > max { - t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g) - } - w, min, max = getPerc(a, 0.99) - if g := s.Query(0.99); g < min || g > max { - t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g) - } +func TestHighBiasedQuery(t *testing.T) { + rand.Seed(42) + s := NewHighBiased(RelativeEpsilon) + a := populateStream(s) + verifyHighPercsWithRelativeEpsilon(t, a, s) +} + +func TestTargetedMerge(t *testing.T) { + rand.Seed(42) + s1 := NewTargeted(Targets) + s2 := NewTargeted(Targets) + a := populateStream(s1) + a = append(a, populateStream(s2)...) + s1.Merge(s2.Samples()) + verifyPercsWithAbsoluteEpsilon(t, a, s1) +} + +func TestLowBiasedMerge(t *testing.T) { + rand.Seed(42) + s1 := NewLowBiased(RelativeEpsilon) + s2 := NewLowBiased(RelativeEpsilon) + a := populateStream(s1) + a = append(a, populateStream(s2)...) + s1.Merge(s2.Samples()) + verifyLowPercsWithRelativeEpsilon(t, a, s2) +} + +func TestHighBiasedMerge(t *testing.T) { + rand.Seed(42) + s1 := NewHighBiased(RelativeEpsilon) + s2 := NewHighBiased(RelativeEpsilon) + a := populateStream(s1) + a = append(a, populateStream(s2)...) 
+ s1.Merge(s2.Samples()) + verifyHighPercsWithRelativeEpsilon(t, a, s2) } func TestUncompressed(t *testing.T) { - tests := []float64{0.50, 0.90, 0.95, 0.99} - q := NewTargeted(tests...) + q := NewTargeted(Targets) for i := 100; i > 0; i-- { q.Insert(float64(i)) } @@ -83,16 +152,16 @@ func TestUncompressed(t *testing.T) { t.Errorf("want count 100, got %d", g) } // Before compression, Query should have 100% accuracy. - for _, v := range tests { - w := v * 100 - if g := q.Query(v); g != w { + for quantile := range Targets { + w := quantile * 100 + if g := q.Query(quantile); g != w { t.Errorf("want %f, got %f", w, g) } } } func TestUncompressedSamples(t *testing.T) { - q := NewTargeted(0.99) + q := NewTargeted(map[float64]float64{0.99: 0.001}) for i := 1; i <= 100; i++ { q.Insert(float64(i)) } @@ -102,7 +171,7 @@ func TestUncompressedSamples(t *testing.T) { } func TestUncompressedOne(t *testing.T) { - q := NewTargeted(0.90) + q := NewTargeted(map[float64]float64{0.99: 0.01}) q.Insert(3.14) if g := q.Query(0.90); g != 3.14 { t.Error("want PI, got", g) @@ -110,20 +179,7 @@ func TestUncompressedOne(t *testing.T) { } func TestDefaults(t *testing.T) { - if g := NewTargeted(0.99).Query(0.99); g != 0 { + if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99); g != 0 { t.Errorf("want 0, got %f", g) } } - -func getPerc(x []float64, p float64) (want, min, max float64) { - k := int(float64(len(x)) * p) - lower := int(float64(len(x)) * (p - 0.04)) - if lower < 0 { - lower = 0 - } - upper := int(float64(len(x))*(p+0.04)) + 1 - if upper >= len(x) { - upper = len(x) - 1 - } - return x[k], x[lower], x[upper] -} diff --git a/_vendor/perks/topk/topk.go b/_vendor/perks/topk/topk.go new file mode 100644 index 0000000..5ac3d99 --- /dev/null +++ b/_vendor/perks/topk/topk.go @@ -0,0 +1,90 @@ +package topk + +import ( + "sort" +) + +// http://www.cs.ucsb.edu/research/tech_reports/reports/2005-23.pdf + +type Element struct { + Value string + Count int +} + +type Samples []*Element + +func (sm Samples) Len() int { + return len(sm) +} + +func (sm Samples) Less(i, j int) bool { + return sm[i].Count < sm[j].Count +} + +func (sm Samples) Swap(i, j int) { + sm[i], sm[j] = sm[j], sm[i] +} + +type Stream struct { + k int + mon map[string]*Element + + // the minimum Element + min *Element +} + +func New(k int) *Stream { + s := new(Stream) + s.k = k + s.mon = make(map[string]*Element) + s.min = &Element{} + + // Track k+1 so that less frequenet items contended for that spot, + // resulting in k being more accurate. 
+ return s +} + +func (s *Stream) Insert(x string) { + s.insert(&Element{x, 1}) +} + +func (s *Stream) Merge(sm Samples) { + for _, e := range sm { + s.insert(e) + } +} + +func (s *Stream) insert(in *Element) { + e := s.mon[in.Value] + if e != nil { + e.Count++ + } else { + if len(s.mon) < s.k+1 { + e = &Element{in.Value, in.Count} + s.mon[in.Value] = e + } else { + e = s.min + delete(s.mon, e.Value) + e.Value = in.Value + e.Count += in.Count + s.min = e + } + } + if e.Count < s.min.Count { + s.min = e + } +} + +func (s *Stream) Query() Samples { + var sm Samples + for _, e := range s.mon { + sm = append(sm, e) + } + sort.Sort(sort.Reverse(sm)) + + if len(sm) < s.k { + return sm + } + + return sm[:s.k] +} diff --git a/_vendor/perks/topk/topk_test.go b/_vendor/perks/topk/topk_test.go new file mode 100644 index 0000000..c24f0f7 --- /dev/null +++ b/_vendor/perks/topk/topk_test.go @@ -0,0 +1,57 @@ +package topk + +import ( + "fmt" + "math/rand" + "sort" + "testing" +) + +func TestTopK(t *testing.T) { + stream := New(10) + ss := []*Stream{New(10), New(10), New(10)} + m := make(map[string]int) + for _, s := range ss { + for i := 0; i < 1e6; i++ { + v := fmt.Sprintf("%x", int8(rand.ExpFloat64())) + s.Insert(v) + m[v]++ + } + stream.Merge(s.Query()) + } + + var sm Samples + for x, s := range m { + sm = append(sm, &Element{x, s}) + } + sort.Sort(sort.Reverse(sm)) + + g := stream.Query() + if len(g) != 10 { + t.Fatalf("got %d, want 10", len(g)) + } + for i, e := range g { + if sm[i].Value != e.Value { + t.Errorf("at %d: want %q, got %q", i, sm[i].Value, e.Value) + } + } +} + +func TestQuery(t *testing.T) { + queryTests := []struct { + value string + expected int + }{ + {"a", 1}, + {"b", 2}, + {"c", 2}, + } + + stream := New(2) + for _, tt := range queryTests { + stream.Insert(tt.value) + if n := len(stream.Query()); n != tt.expected { + t.Errorf("want %d, got %d", tt.expected, n) + } + } +} diff --git a/prometheus/examples_test.go b/prometheus/examples_test.go index 63261db..4afbadb 100644 --- a/prometheus/examples_test.go +++ b/prometheus/examples_test.go @@ -362,11 +362,11 @@ func ExampleSummary() { // sample_sum: 29969.50000000001 // quantile: < // quantile: 0.5 - // value: 30.2 + // value: 31.1 // > // quantile: < // quantile: 0.9 - // value: 41.4 + // value: 41.3 // > // quantile: < // quantile: 0.99 @@ -419,11 +419,11 @@ func ExampleSummaryVec() { // sample_sum: 31956.100000000017 // quantile: < // quantile: 0.5 - // value: 32 + // value: 32.4 // > // quantile: < // quantile: 0.9 - // value: 41.5 + // value: 41.4 // > // quantile: < // quantile: 0.99 @@ -439,11 +439,11 @@ func ExampleSummaryVec() { // sample_sum: 29969.50000000001 // quantile: < // quantile: 0.5 - // value: 30.2 + // value: 31.1 // > // quantile: < // quantile: 0.9 - // value: 41.4 + // value: 41.3 // > // quantile: < // quantile: 0.99 diff --git a/prometheus/go_collector_test.go b/prometheus/go_collector_test.go index 4e7d572..b0582d1 100644 --- a/prometheus/go_collector_test.go +++ b/prometheus/go_collector_test.go @@ -43,6 +43,7 @@ func TestGoCollector(t *testing.T) { } if diff := int(pb.GetGauge().GetValue()) - old; diff != 1 { + // TODO: This is flaky in highly concurrent situations. t.Errorf("want 1 new goroutine, got %d", diff) } diff --git a/prometheus/summary.go b/prometheus/summary.go index afb9bca..164876b 100644 --- a/prometheus/summary.go +++ b/prometheus/summary.go @@ -46,7 +46,7 @@ type Summary interface { // DefObjectives are the default Summary quantile values. 
var ( - DefObjectives = []float64{0.5, 0.9, 0.99} + DefObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} ) // Default values for SummaryOpts. @@ -56,11 +56,9 @@ const ( DefMaxAge time.Duration = 10 * time.Minute // DefAgeBuckets is the default number of buckets used to calculate the // age of observations. - DefAgeBuckets = 10 + DefAgeBuckets = 5 // DefBufCap is the standard buffer size for collecting Summary observations. DefBufCap = 500 - // DefEpsilon is the default error epsilon for the quantile rank estimates. - DefEpsilon = 0.001 ) // SummaryOpts bundles the options for creating a Summary metric. It is @@ -101,9 +99,9 @@ type SummaryOpts struct { // metric name). ConstLabels Labels - // Objectives defines the quantile rank estimates. The default value is - // DefObjectives. - Objectives []float64 + // Objectives defines the quantile rank estimates with their respective + // absolute error. The default value is DefObjectives. + Objectives map[float64]float64 // MaxAge defines the duration for which an observation stays relevant // for the summary. Must be positive. The default value is DefMaxAge. @@ -127,6 +125,21 @@ type SummaryOpts struct { Epsilon float64 } +// TODO: Great fuck-up with the sliding-window decay algorithm... The Merge +// method of perk/quantile is actually not working as advertised - and it might +// be unfixable, as the underlying algorithm is apparently not capable of +// merging summaries in the first place. To avoid using Merge, we are currently +// adding observations to _each_ age bucket, i.e. the effort to add a sample is +// essentially multiplied by the number of age buckets. When rotating age +// buckets, we empty the previous head stream. On scrape time, we simply take +// the quantiles from the head stream (no merging required). Result: More effort +// on observation time, less effort on scrape time, which is exactly the +// opposite of what we try to accomplish, but at least the results are correct. +// +// The quite elegant previous contraption to merge the age buckets efficiently +// on scrape time (see code up commit 6b9530d72ea715f0ba612c0120e6e09fbf1d49d0) +// can't be used anymore. + // NewSummary creates a new Summary based on the provided SummaryOpts. func NewSummary(opts SummaryOpts) Summary { return newSummary( @@ -164,18 +177,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary { opts.BufCap = DefBufCap } - if opts.Epsilon < 0 { - panic(fmt.Errorf("illegal value for Epsilon=%f", opts.Epsilon)) - } - if opts.Epsilon == 0. { - opts.Epsilon = DefEpsilon - } - s := &summary{ desc: desc, - objectives: opts.Objectives, - epsilon: opts.Epsilon, + objectives: opts.Objectives, + sortedObjectives: make([]float64, 0, len(opts.Objectives)), labelPairs: makeLabelPairs(desc, labelValues), @@ -183,8 +189,6 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary { coldBuf: make([]float64, 0, opts.BufCap), streamDuration: opts.MaxAge / time.Duration(opts.AgeBuckets), } - s.mergedTailStreams = s.newStream() - s.mergedAllStreams = s.newStream() s.headStreamExpTime = time.Now().Add(s.streamDuration) s.hotBufExpTime = s.headStreamExpTime @@ -193,6 +197,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary { } s.headStream = s.streams[0] + for qu := range s.objectives { + s.sortedObjectives = append(s.sortedObjectives, qu) + } + sort.Float64s(s.sortedObjectives) + s.Init(s) // Init self-collection. 
return s } @@ -206,8 +215,8 @@ type summary struct { desc *Desc - objectives []float64 - epsilon float64 + objectives map[float64]float64 + sortedObjectives []float64 labelPairs []*dto.LabelPair @@ -218,10 +227,9 @@ type summary struct { streams []*quantile.Stream streamDuration time.Duration + headStream *quantile.Stream headStreamIdx int headStreamExpTime, hotBufExpTime time.Time - - headStream, mergedTailStreams, mergedAllStreams *quantile.Stream } func (s *summary) Desc() *Desc { @@ -255,18 +263,15 @@ func (s *summary) Write(out *dto.Metric) error { s.bufMtx.Unlock() s.flushColdBuf() - s.mergedAllStreams.Merge(s.mergedTailStreams.Samples()) - s.mergedAllStreams.Merge(s.headStream.Samples()) sum.SampleCount = proto.Uint64(s.cnt) sum.SampleSum = proto.Float64(s.sum) - for _, rank := range s.objectives { + for _, rank := range s.sortedObjectives { qs = append(qs, &dto.Quantile{ Quantile: proto.Float64(rank), - Value: proto.Float64(s.mergedAllStreams.Query(rank)), + Value: proto.Float64(s.headStream.Query(rank)), }) } - s.mergedAllStreams.Reset() s.mtx.Unlock() @@ -281,9 +286,7 @@ func (s *summary) Write(out *dto.Metric) error { } func (s *summary) newStream() *quantile.Stream { - stream := quantile.NewTargeted(s.objectives...) - stream.SetEpsilon(s.epsilon) - return stream + return quantile.NewTargeted(s.objectives) } // asyncFlush needs bufMtx locked. @@ -302,32 +305,23 @@ func (s *summary) asyncFlush(now time.Time) { // rotateStreams needs mtx AND bufMtx locked. func (s *summary) maybeRotateStreams() { - if s.hotBufExpTime.Equal(s.headStreamExpTime) { - // Fast return to avoid re-merging s.mergedTailStreams. - return - } for !s.hotBufExpTime.Equal(s.headStreamExpTime) { + s.headStream.Reset() s.headStreamIdx++ if s.headStreamIdx >= len(s.streams) { s.headStreamIdx = 0 } s.headStream = s.streams[s.headStreamIdx] - s.headStream.Reset() s.headStreamExpTime = s.headStreamExpTime.Add(s.streamDuration) } - s.mergedTailStreams.Reset() - for _, stream := range s.streams { - if stream != s.headStream { - s.mergedTailStreams.Merge(stream.Samples()) - } - } - } // flushColdBuf needs mtx locked. func (s *summary) flushColdBuf() { for _, v := range s.coldBuf { - s.headStream.Insert(v) + for _, stream := range s.streams { + stream.Insert(v) + } s.cnt++ s.sum += v } @@ -337,6 +331,9 @@ func (s *summary) flushColdBuf() { // swapBufs needs mtx AND bufMtx locked, coldBuf must be empty. func (s *summary) swapBufs(now time.Time) { + if len(s.coldBuf) != 0 { + panic("coldBuf is not empty") + } s.hotBuf, s.coldBuf = s.coldBuf, s.hotBuf // hotBuf is now empty and gets new expiration set. 
for now.After(s.hotBufExpTime) { diff --git a/prometheus/summary_test.go b/prometheus/summary_test.go index 704fec5..bc242c3 100644 --- a/prometheus/summary_test.go +++ b/prometheus/summary_test.go @@ -123,19 +123,17 @@ func TestSummaryConcurrency(t *testing.T) { rand.Seed(42) it := func(n uint32) bool { - mutations := int(n%10000 + 1) - concLevel := int(n%15 + 1) + mutations := int(n%1e4 + 1e4) + concLevel := int(n%5 + 1) total := mutations * concLevel - ε := 0.001 var start, end sync.WaitGroup start.Add(1) end.Add(concLevel) sum := NewSummary(SummaryOpts{ - Name: "test_summary", - Help: "helpless", - Epsilon: ε, + Name: "test_summary", + Help: "helpless", }) allVars := make([]float64, total) @@ -170,14 +168,21 @@ func TestSummaryConcurrency(t *testing.T) { t.Errorf("got sample sum %f, want %f", got, want) } - for i, wantQ := range DefObjectives { + objectives := make([]float64, 0, len(DefObjectives)) + for qu := range DefObjectives { + objectives = append(objectives, qu) + } + sort.Float64s(objectives) + + for i, wantQ := range objectives { + ε := DefObjectives[wantQ] gotQ := *m.Summary.Quantile[i].Quantile gotV := *m.Summary.Quantile[i].Value min, max := getBounds(allVars, wantQ, ε) if gotQ != wantQ { t.Errorf("got quantile %f, want %f", gotQ, wantQ) } - if (gotV < min || gotV > max) && len(allVars) > 500 { // Avoid statistical outliers. + if gotV < min || gotV > max { t.Errorf("got %f for quantile %f, want [%f,%f]", gotV, gotQ, min, max) } } @@ -192,11 +197,17 @@ func TestSummaryConcurrency(t *testing.T) { func TestSummaryVecConcurrency(t *testing.T) { rand.Seed(42) + objectives := make([]float64, 0, len(DefObjectives)) + for qu := range DefObjectives { + + objectives = append(objectives, qu) + } + sort.Float64s(objectives) + it := func(n uint32) bool { - mutations := int(n%10000 + 1) - concLevel := int(n%15 + 1) - ε := 0.001 - vecLength := int(n%5 + 1) + mutations := int(n%1e4 + 1e4) + concLevel := int(n%7 + 1) + vecLength := int(n%3 + 1) var start, end sync.WaitGroup start.Add(1) @@ -204,9 +215,8 @@ func TestSummaryVecConcurrency(t *testing.T) { sum := NewSummaryVec( SummaryOpts{ - Name: "test_summary", - Help: "helpless", - Epsilon: ε, + Name: "test_summary", + Help: "helpless", }, []string{"label"}, ) @@ -249,16 +259,16 @@ func TestSummaryVecConcurrency(t *testing.T) { if got, want := *m.Summary.SampleSum, sampleSums[i]; math.Abs((got-want)/want) > 0.001 { t.Errorf("got sample sum %f for label %c, want %f", got, 'A'+i, want) } - for j, wantQ := range DefObjectives { + for j, wantQ := range objectives { + ε := DefObjectives[wantQ] gotQ := *m.Summary.Quantile[j].Quantile gotV := *m.Summary.Quantile[j].Value min, max := getBounds(allVars[i], wantQ, ε) if gotQ != wantQ { t.Errorf("got quantile %f for label %c, want %f", gotQ, 'A'+i, wantQ) } - if (gotV < min || gotV > max) && len(allVars[i]) > 500 { // Avoid statistical outliers. + if gotV < min || gotV > max { t.Errorf("got %f for quantile %f for label %c, want [%f,%f]", gotV, gotQ, 'A'+i, min, max) - t.Log(len(allVars[i])) } } } @@ -270,20 +280,18 @@ func TestSummaryVecConcurrency(t *testing.T) { } } -// TODO(beorn): This test fails on Travis, likely because it depends on -// timing. Fix that and then Remove the leading X from the function name. 
-func XTestSummaryDecay(t *testing.T) { +func TestSummaryDecay(t *testing.T) { sum := NewSummary(SummaryOpts{ Name: "test_summary", Help: "helpless", - Epsilon: 0.001, - MaxAge: 10 * time.Millisecond, - Objectives: []float64{0.1}, + MaxAge: 100 * time.Millisecond, + Objectives: map[float64]float64{0.1: 0.001}, + AgeBuckets: 10, }) m := &dto.Metric{} i := 0 - tick := time.NewTicker(100 * time.Microsecond) + tick := time.NewTicker(time.Millisecond) for _ = range tick.C { i++ sum.Observe(float64(i)) @@ -302,15 +310,19 @@ func XTestSummaryDecay(t *testing.T) { } func getBounds(vars []float64, q, ε float64) (min, max float64) { - lower := int((q - 4*ε) * float64(len(vars))) - upper := int((q+4*ε)*float64(len(vars))) + 1 + // TODO: This currently tolerates an error of up to 2*ε. The error must + // be at most ε, but for some reason, it's sometimes slightly + // higher. That's a bug. + n := float64(len(vars)) + lower := int((q - 2*ε) * n) + upper := int(math.Ceil((q + 2*ε) * n)) min = vars[0] - if lower > 0 { - min = vars[lower] + if lower > 1 { + min = vars[lower-1] } max = vars[len(vars)-1] - if upper < len(vars)-1 { - max = vars[upper] + if upper < len(vars) { + max = vars[upper-1] } return }
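
The newly vendored histogram package above has a small surface: `New` caps the bin count, `Insert` adds a point, and `Bins` returns the merged bins. A minimal usage sketch, assuming the upstream import path named in the MANIFEST (`github.com/beorn7/perks`):

```go
package main

import (
	"fmt"
	"math/rand"

	"github.com/beorn7/perks/histogram" // vendored above as _vendor/perks/histogram
)

func main() {
	h := histogram.New(10) // keep at most 10 bins
	for i := 0; i < 10000; i++ {
		h.Insert(rand.ExpFloat64())
	}
	// Bins are ordered by mean; once the bin budget is exceeded, the
	// two adjacent bins with the smallest gap between means are merged.
	for _, b := range h.Bins() {
		fmt.Printf("mean=%.3f count=%d\n", b.Mean(), b.Count)
	}
}
```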
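
In the quantile package, `NewTargeted` now takes a map from target quantile to permissible absolute error instead of a variadic quantile list plus a stream-wide `SetEpsilon`. A sketch of the new calling convention, mirroring the `Targets` map in stream_test.go:

```go
package main

import (
	"fmt"

	"github.com/beorn7/perks/quantile"
)

func main() {
	s := quantile.NewTargeted(map[float64]float64{
		0.50: 0.05,  // the median may be off by up to 0.05 in rank
		0.90: 0.01,
		0.99: 0.001, // tightest guarantee where precision matters most
	})
	for i := 1; i <= 100000; i++ {
		s.Insert(float64(i))
	}
	// The result's true quantile is within 0.99±0.001, so the value
	// falls between ranks 98900 and 99100 of this stream.
	fmt.Println(s.Query(0.99))
}
```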
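
`NewBiased` is likewise split into `NewLowBiased` and `NewHighBiased`, each taking a relative error: the guaranteed rank band scales with q for the low-biased variant and with 1−q for the high-biased one. A sketch under the same assumed import path:

```go
package main

import (
	"fmt"

	"github.com/beorn7/perks/quantile"
)

func main() {
	low := quantile.NewLowBiased(0.01)   // tight guarantees for small q
	high := quantile.NewHighBiased(0.01) // tight guarantees for large q
	for i := 1; i <= 100000; i++ {
		low.Insert(float64(i))
		high.Insert(float64(i))
	}
	// For q=0.01 the low-biased stream returns a value whose true
	// quantile lies within (1±0.01)*0.01, i.e. ranks 990 to 1010 here.
	fmt.Println(low.Query(0.01), high.Query(0.99))
}
```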
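
The vendored topk package tracks approximate heavy hitters per the UCSB tech report referenced in the file, monitoring k+1 candidates so that the k reported elements stay accurate. Minimal sketch:

```go
package main

import (
	"fmt"

	"github.com/beorn7/perks/topk"
)

func main() {
	s := topk.New(3)
	for _, w := range []string{"a", "b", "a", "c", "a", "b", "d"} {
		s.Insert(w)
	}
	// Query returns at most k elements, most frequent first.
	for _, e := range s.Query() {
		fmt.Printf("%s: %d\n", e.Value, e.Count)
	}
}
```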
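
On the client_golang side, `SummaryOpts.Objectives` changes from `[]float64` (with a single stream-wide epsilon) to `map[float64]float64`, so existing callers need an update along these lines (metric name and help string hypothetical):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	// Previously: Objectives: []float64{0.5, 0.9, 0.99} plus one
	// Epsilon shared by all quantiles. Now each objective carries its
	// own absolute error, as in DefObjectives above.
	sum := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "rpc_duration_seconds", // hypothetical metric
		Help:       "RPC latency distribution.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})
	sum.Observe(0.42)
}
```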
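
The long TODO added to summary.go explains the workaround for the broken `Merge`: every observation is fed into all age buckets, and scrape-time queries read only the head stream. A compressed sketch of that rotation scheme (hypothetical `window` type, reusing the vendored quantile package):

```go
package summarysketch

import "github.com/beorn7/perks/quantile"

type window struct {
	streams []*quantile.Stream // one stream per age bucket
	head    int
}

func (w *window) observe(v float64) {
	// Costlier observations: every bucket sees every sample.
	for _, s := range w.streams {
		s.Insert(v)
	}
}

func (w *window) rotate() {
	// Empty the outgoing head; the next stream, which has been
	// accumulating since its own last reset, becomes the new head.
	w.streams[w.head].Reset()
	w.head = (w.head + 1) % len(w.streams)
}

func (w *window) query(q float64) float64 {
	// Scrapes read the head directly -- no Merge needed.
	return w.streams[w.head].Query(q)
}
```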