Update vendoring of perks to newest (fixed) version.

Adjust the API and usage accordingly.
Make tests stricter.

Since the merging is still faulty, test are broken now.
The next commit will fix it by avoiding merging.
This commit is contained in:
Bjoern Rabenstein 2015-01-20 18:27:10 +01:00
parent 26e2417d3e
commit 6b9530d72e
11 changed files with 563 additions and 159 deletions

View File

@ -1 +1 @@
Imported at 5d903d2c5dc7f55829e36c62ae6c5f5f6d75e70a from https://github.com/u-c-l/perks . Imported at f15ca8fc2964cb9f291e1cf17bb1bf9a4f9e23d5 from https://github.com/beorn7/perks .

View File

@ -0,0 +1,26 @@
package histogram
import (
"math/rand"
"testing"
)
func BenchmarkInsert10Bins(b *testing.B) {
b.StopTimer()
h := New(10)
b.StartTimer()
for i := 0; i < b.N; i++ {
f := rand.ExpFloat64()
h.Insert(f)
}
}
func BenchmarkInsert100Bins(b *testing.B) {
b.StopTimer()
h := New(100)
b.StartTimer()
for i := 0; i < b.N; i++ {
f := rand.ExpFloat64()
h.Insert(f)
}
}

View File

@ -0,0 +1,108 @@
// Package histogram provides a Go implementation of BigML's histogram package
// for Clojure/Java. It is currently experimental.
package histogram
import (
"container/heap"
"math"
"sort"
)
type Bin struct {
Count int
Sum float64
}
func (b *Bin) Update(x *Bin) {
b.Count += x.Count
b.Sum += x.Sum
}
func (b *Bin) Mean() float64 {
return b.Sum / float64(b.Count)
}
type Bins []*Bin
func (bs Bins) Len() int { return len(bs) }
func (bs Bins) Less(i, j int) bool { return bs[i].Mean() < bs[j].Mean() }
func (bs Bins) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] }
func (bs *Bins) Push(x interface{}) {
*bs = append(*bs, x.(*Bin))
}
func (bs *Bins) Pop() interface{} {
return bs.remove(len(*bs) - 1)
}
func (bs *Bins) remove(n int) *Bin {
if n < 0 || len(*bs) < n {
return nil
}
x := (*bs)[n]
*bs = append((*bs)[:n], (*bs)[n+1:]...)
return x
}
type Histogram struct {
res *reservoir
}
func New(maxBins int) *Histogram {
return &Histogram{res: newReservoir(maxBins)}
}
func (h *Histogram) Insert(f float64) {
h.res.insert(&Bin{1, f})
h.res.compress()
}
func (h *Histogram) Bins() Bins {
return h.res.bins
}
type reservoir struct {
n int
maxBins int
bins Bins
}
func newReservoir(maxBins int) *reservoir {
return &reservoir{maxBins: maxBins}
}
func (r *reservoir) insert(bin *Bin) {
r.n += bin.Count
i := sort.Search(len(r.bins), func(i int) bool {
return r.bins[i].Mean() >= bin.Mean()
})
if i < 0 || i == r.bins.Len() {
// TODO(blake): Maybe use an .insert(i, bin) instead of
// performing the extra work of a heap.Push.
heap.Push(&r.bins, bin)
return
}
r.bins[i].Update(bin)
}
func (r *reservoir) compress() {
for r.bins.Len() > r.maxBins {
minGapIndex := -1
minGap := math.MaxFloat64
for i := 0; i < r.bins.Len()-1; i++ {
gap := gapWeight(r.bins[i], r.bins[i+1])
if minGap > gap {
minGap = gap
minGapIndex = i
}
}
prev := r.bins[minGapIndex]
next := r.bins.remove(minGapIndex + 1)
prev.Update(next)
}
}
func gapWeight(prev, next *Bin) float64 {
return next.Mean() - prev.Mean()
}

View File

@ -0,0 +1,38 @@
package histogram
import (
"math/rand"
"testing"
)
func TestHistogram(t *testing.T) {
const numPoints = 1e6
const maxBins = 3
h := New(maxBins)
for i := 0; i < numPoints; i++ {
f := rand.ExpFloat64()
h.Insert(f)
}
bins := h.Bins()
if g := len(bins); g > maxBins {
t.Fatalf("got %d bins, wanted <= %d", g, maxBins)
}
for _, b := range bins {
t.Logf("%+v", b)
}
if g := count(h.Bins()); g != numPoints {
t.Fatalf("binned %d points, wanted %d", g, numPoints)
}
}
func count(bins Bins) int {
binCounts := 0
for _, b := range bins {
binCounts += b.Count
}
return binCounts
}

View File

@ -7,7 +7,7 @@ import (
func BenchmarkInsertTargeted(b *testing.B) { func BenchmarkInsertTargeted(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
s := NewTargeted(0.01, 0.5, 0.9, 0.99) s := NewTargeted(Targets)
b.ResetTimer() b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ { for i := float64(0); i < float64(b.N); i++ {
s.Insert(i) s.Insert(i)
@ -15,8 +15,7 @@ func BenchmarkInsertTargeted(b *testing.B) {
} }
func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) { func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99) s := NewTargeted(TargetsSmallEpsilon)
s.SetEpsilon(0.0001)
b.ResetTimer() b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ { for i := float64(0); i < float64(b.N); i++ {
s.Insert(i) s.Insert(i)
@ -24,7 +23,7 @@ func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) {
} }
func BenchmarkInsertBiased(b *testing.B) { func BenchmarkInsertBiased(b *testing.B) {
s := NewBiased() s := NewLowBiased(0.01)
b.ResetTimer() b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ { for i := float64(0); i < float64(b.N); i++ {
s.Insert(i) s.Insert(i)
@ -32,8 +31,7 @@ func BenchmarkInsertBiased(b *testing.B) {
} }
func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) { func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) {
s := NewBiased() s := NewLowBiased(0.0001)
s.SetEpsilon(0.0001)
b.ResetTimer() b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ { for i := float64(0); i < float64(b.N); i++ {
s.Insert(i) s.Insert(i)
@ -41,7 +39,7 @@ func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) {
} }
func BenchmarkQuery(b *testing.B) { func BenchmarkQuery(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99) s := NewTargeted(Targets)
for i := float64(0); i < 1e6; i++ { for i := float64(0); i < 1e6; i++ {
s.Insert(i) s.Insert(i)
} }
@ -53,8 +51,7 @@ func BenchmarkQuery(b *testing.B) {
} }
func BenchmarkQuerySmallEpsilon(b *testing.B) { func BenchmarkQuerySmallEpsilon(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99) s := NewTargeted(TargetsSmallEpsilon)
s.SetEpsilon(0.0001)
for i := float64(0); i < 1e6; i++ { for i := float64(0); i < 1e6; i++ {
s.Insert(i) s.Insert(i)
} }

View File

@ -36,30 +36,56 @@ func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type invariant func(s *stream, r float64) float64 type invariant func(s *stream, r float64) float64
// NewBiased returns an initialized Stream for high-biased quantiles (e.g. // NewLowBiased returns an initialized Stream for low-biased quantiles
// 50th, 90th, 99th) not known a priori with finer error guarantees for the // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
// higher ranks of the data distribution. // error guarantees can still be given even for the lower ranks of the data
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. // distribution.
func NewBiased() *Stream { //
// The provided epsilon is a relative error, i.e. the true quantile of a value
// returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
// properties.
func NewLowBiased(epsilon float64) *Stream {
ƒ := func(s *stream, r float64) float64 { ƒ := func(s *stream, r float64) float64 {
return 2 * s.epsilon * r return 2 * epsilon * r
}
return newStream(ƒ)
}
// NewHighBiased returns an initialized Stream for high-biased quantiles
// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
// error guarantees can still be given even for the higher ranks of the data
// distribution.
//
// The provided epsilon is a relative error, i.e. the true quantile of a value
// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
// properties.
func NewHighBiased(epsilon float64) *Stream {
ƒ := func(s *stream, r float64) float64 {
return 2 * epsilon * (s.n - r)
} }
return newStream(ƒ) return newStream(ƒ)
} }
// NewTargeted returns an initialized Stream concerned with a particular set of // NewTargeted returns an initialized Stream concerned with a particular set of
// quantile values that are supplied a priori. Knowing these a priori reduces // quantile values that are supplied a priori. Knowing these a priori reduces
// space and computation time. // space and computation time. The targets map maps the desired quantiles to
// their absolute errors, i.e. the true quantile of a value returned by a query
// is guaranteed to be within (Quantile±Epsilon).
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
func NewTargeted(quantiles ...float64) *Stream { func NewTargeted(targets map[float64]float64) *Stream {
ƒ := func(s *stream, r float64) float64 { ƒ := func(s *stream, r float64) float64 {
var m float64 = math.MaxFloat64 var m = math.MaxFloat64
var f float64 var f float64
for _, q := range quantiles { for quantile, epsilon := range targets {
if q*s.n <= r { if quantile*s.n <= r {
f = (2 * s.epsilon * r) / q f = (2 * epsilon * r) / quantile
} else { } else {
f = (2 * s.epsilon * (s.n - r)) / (1 - q) f = (2 * epsilon * (s.n - r)) / (1 - quantile)
} }
if f < m { if f < m {
m = f m = f
@ -79,8 +105,7 @@ type Stream struct {
} }
func newStream(ƒ invariant) *Stream { func newStream(ƒ invariant) *Stream {
const defaultEpsilon = 0.01 x := &stream{ƒ: ƒ}
x := &stream{epsilon: defaultEpsilon, ƒ: ƒ}
return &Stream{x, make(Samples, 0, 500), true} return &Stream{x, make(Samples, 0, 500), true}
} }
@ -94,7 +119,6 @@ func (s *Stream) insert(sample Sample) {
s.sorted = false s.sorted = false
if len(s.b) == cap(s.b) { if len(s.b) == cap(s.b) {
s.flush() s.flush()
s.compress()
} }
} }
@ -122,6 +146,9 @@ func (s *Stream) Query(q float64) float64 {
// Merge merges samples into the underlying streams samples. This is handy when // Merge merges samples into the underlying streams samples. This is handy when
// merging multiple streams from separate threads, database shards, etc. // merging multiple streams from separate threads, database shards, etc.
//
// ATTENTION: This method is broken and does not yield correct results. The
// underlying algorithm is not capable of merging streams correctly.
func (s *Stream) Merge(samples Samples) { func (s *Stream) Merge(samples Samples) {
sort.Sort(samples) sort.Sort(samples)
s.stream.merge(samples) s.stream.merge(samples)
@ -139,7 +166,6 @@ func (s *Stream) Samples() Samples {
return s.b return s.b
} }
s.flush() s.flush()
s.compress()
return s.stream.samples() return s.stream.samples()
} }
@ -167,18 +193,9 @@ func (s *Stream) flushed() bool {
} }
type stream struct { type stream struct {
epsilon float64 n float64
n float64 l []Sample
l []Sample ƒ invariant
ƒ invariant
}
// SetEpsilon sets the error epsilon for the Stream. The default epsilon is
// 0.01 and is usually satisfactory. If needed, this must be called before all
// Inserts.
// To learn more, see: http://www.cs.rutgers.edu/~muthu/bquant.pdf
func (s *stream) SetEpsilon(epsilon float64) {
s.epsilon = epsilon
} }
func (s *stream) reset() { func (s *stream) reset() {
@ -191,6 +208,10 @@ func (s *stream) insert(v float64) {
} }
func (s *stream) merge(samples Samples) { func (s *stream) merge(samples Samples) {
// TODO(beorn7): This tries to merge not only individual samples, but
// whole summaries. The paper doesn't mention merging summaries at
// all. Unittests show that the merging is inaccurate. Find out how to
// do merges properly.
var r float64 var r float64
i := 0 i := 0
for _, sample := range samples { for _, sample := range samples {
@ -200,7 +221,12 @@ func (s *stream) merge(samples Samples) {
// Insert at position i. // Insert at position i.
s.l = append(s.l, Sample{}) s.l = append(s.l, Sample{})
copy(s.l[i+1:], s.l[i:]) copy(s.l[i+1:], s.l[i:])
s.l[i] = Sample{sample.Value, sample.Width, math.Floor(s.ƒ(s, r)) - 1} s.l[i] = Sample{
sample.Value,
sample.Width,
math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
// TODO(beorn7): How to calculate delta correctly?
}
i++ i++
goto inserted goto inserted
} }
@ -210,7 +236,9 @@ func (s *stream) merge(samples Samples) {
i++ i++
inserted: inserted:
s.n += sample.Width s.n += sample.Width
r += sample.Width
} }
s.compress()
} }
func (s *stream) count() int { func (s *stream) count() int {
@ -221,12 +249,12 @@ func (s *stream) query(q float64) float64 {
t := math.Ceil(q * s.n) t := math.Ceil(q * s.n)
t += math.Ceil(s.ƒ(s, t) / 2) t += math.Ceil(s.ƒ(s, t) / 2)
p := s.l[0] p := s.l[0]
r := float64(0) var r float64
for _, c := range s.l[1:] { for _, c := range s.l[1:] {
r += p.Width
if r+c.Width+c.Delta > t { if r+c.Width+c.Delta > t {
return p.Value return p.Value
} }
r += p.Width
p = c p = c
} }
return p.Value return p.Value

View File

@ -1,81 +1,150 @@
package quantile package quantile
import ( import (
"math"
"math/rand" "math/rand"
"sort" "sort"
"testing" "testing"
) )
func TestQuantRandQuery(t *testing.T) { var (
s := NewTargeted(0.5, 0.90, 0.99) Targets = map[float64]float64{
a := make([]float64, 0, 1e5) 0.01: 0.001,
rand.Seed(42) 0.10: 0.01,
0.50: 0.05,
0.90: 0.01,
0.99: 0.001,
}
TargetsSmallEpsilon = map[float64]float64{
0.01: 0.0001,
0.10: 0.001,
0.50: 0.005,
0.90: 0.001,
0.99: 0.0001,
}
LowQuantiles = []float64{0.01, 0.1, 0.5}
HighQuantiles = []float64{0.99, 0.9, 0.5}
)
const RelativeEpsilon = 0.01
func verifyPercsWithAbsoluteEpsilon(t *testing.T, a []float64, s *Stream) {
sort.Float64s(a)
for quantile, epsilon := range Targets {
n := float64(len(a))
k := int(quantile * n)
lower := int((quantile - epsilon) * n)
if lower < 1 {
lower = 1
}
upper := int(math.Ceil((quantile + epsilon) * n))
if upper > len(a) {
upper = len(a)
}
w, min, max := a[k-1], a[lower-1], a[upper-1]
if g := s.Query(quantile); g < min || g > max {
t.Errorf("q=%f: want %v [%f,%f], got %v", quantile, w, min, max, g)
}
}
}
func verifyLowPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
sort.Float64s(a)
for _, qu := range LowQuantiles {
n := float64(len(a))
k := int(qu * n)
lowerRank := int((1 - RelativeEpsilon) * qu * n)
upperRank := int(math.Ceil((1 + RelativeEpsilon) * qu * n))
w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
if g := s.Query(qu); g < min || g > max {
t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
}
}
}
func verifyHighPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
sort.Float64s(a)
for _, qu := range HighQuantiles {
n := float64(len(a))
k := int(qu * n)
lowerRank := int((1 - (1+RelativeEpsilon)*(1-qu)) * n)
upperRank := int(math.Ceil((1 - (1-RelativeEpsilon)*(1-qu)) * n))
w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
if g := s.Query(qu); g < min || g > max {
t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
}
}
}
func populateStream(s *Stream) []float64 {
a := make([]float64, 0, 1e5+100)
for i := 0; i < cap(a); i++ { for i := 0; i < cap(a); i++ {
v := rand.NormFloat64() v := rand.NormFloat64()
// Add 5% asymmetric outliers.
if i%20 == 0 {
v = v*v + 1
}
s.Insert(v) s.Insert(v)
a = append(a, v) a = append(a, v)
} }
t.Logf("len: %d", s.Count()) return a
sort.Float64s(a)
w, min, max := getPerc(a, 0.50)
if g := s.Query(0.50); g < min || g > max {
t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g)
}
w, min, max = getPerc(a, 0.90)
if g := s.Query(0.90); g < min || g > max {
t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g)
}
w, min, max = getPerc(a, 0.99)
if g := s.Query(0.99); g < min || g > max {
t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g)
}
} }
func TestQuantRandMergeQuery(t *testing.T) { func TestTargetedQuery(t *testing.T) {
ch := make(chan float64)
done := make(chan *Stream)
for i := 0; i < 2; i++ {
go func() {
s := NewTargeted(0.5, 0.90, 0.99)
for v := range ch {
s.Insert(v)
}
done <- s
}()
}
rand.Seed(42) rand.Seed(42)
a := make([]float64, 0, 1e6) s := NewTargeted(Targets)
for i := 0; i < cap(a); i++ { a := populateStream(s)
v := rand.NormFloat64() verifyPercsWithAbsoluteEpsilon(t, a, s)
a = append(a, v) }
ch <- v
}
close(ch)
s := <-done func TestLowBiasedQuery(t *testing.T) {
o := <-done rand.Seed(42)
s.Merge(o.Samples()) s := NewLowBiased(RelativeEpsilon)
a := populateStream(s)
verifyLowPercsWithRelativeEpsilon(t, a, s)
}
t.Logf("len: %d", s.Count()) func TestHighBiasedQuery(t *testing.T) {
sort.Float64s(a) rand.Seed(42)
w, min, max := getPerc(a, 0.50) s := NewHighBiased(RelativeEpsilon)
if g := s.Query(0.50); g < min || g > max { a := populateStream(s)
t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g) verifyHighPercsWithRelativeEpsilon(t, a, s)
} }
w, min, max = getPerc(a, 0.90)
if g := s.Query(0.90); g < min || g > max { func TestTargetedMerge(t *testing.T) {
t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g) rand.Seed(42)
} s1 := NewTargeted(Targets)
w, min, max = getPerc(a, 0.99) s2 := NewTargeted(Targets)
if g := s.Query(0.99); g < min || g > max { a := populateStream(s1)
t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g) a = append(a, populateStream(s2)...)
} s1.Merge(s2.Samples())
verifyPercsWithAbsoluteEpsilon(t, a, s1)
}
func TestLowBiasedMerge(t *testing.T) {
rand.Seed(42)
s1 := NewLowBiased(RelativeEpsilon)
s2 := NewLowBiased(RelativeEpsilon)
a := populateStream(s1)
a = append(a, populateStream(s2)...)
s1.Merge(s2.Samples())
verifyLowPercsWithRelativeEpsilon(t, a, s2)
}
func TestHighBiasedMerge(t *testing.T) {
rand.Seed(42)
s1 := NewHighBiased(RelativeEpsilon)
s2 := NewHighBiased(RelativeEpsilon)
a := populateStream(s1)
a = append(a, populateStream(s2)...)
s1.Merge(s2.Samples())
verifyHighPercsWithRelativeEpsilon(t, a, s2)
} }
func TestUncompressed(t *testing.T) { func TestUncompressed(t *testing.T) {
tests := []float64{0.50, 0.90, 0.95, 0.99} q := NewTargeted(Targets)
q := NewTargeted(tests...)
for i := 100; i > 0; i-- { for i := 100; i > 0; i-- {
q.Insert(float64(i)) q.Insert(float64(i))
} }
@ -83,16 +152,16 @@ func TestUncompressed(t *testing.T) {
t.Errorf("want count 100, got %d", g) t.Errorf("want count 100, got %d", g)
} }
// Before compression, Query should have 100% accuracy. // Before compression, Query should have 100% accuracy.
for _, v := range tests { for quantile := range Targets {
w := v * 100 w := quantile * 100
if g := q.Query(v); g != w { if g := q.Query(quantile); g != w {
t.Errorf("want %f, got %f", w, g) t.Errorf("want %f, got %f", w, g)
} }
} }
} }
func TestUncompressedSamples(t *testing.T) { func TestUncompressedSamples(t *testing.T) {
q := NewTargeted(0.99) q := NewTargeted(map[float64]float64{0.99: 0.001})
for i := 1; i <= 100; i++ { for i := 1; i <= 100; i++ {
q.Insert(float64(i)) q.Insert(float64(i))
} }
@ -102,7 +171,7 @@ func TestUncompressedSamples(t *testing.T) {
} }
func TestUncompressedOne(t *testing.T) { func TestUncompressedOne(t *testing.T) {
q := NewTargeted(0.90) q := NewTargeted(map[float64]float64{0.99: 0.01})
q.Insert(3.14) q.Insert(3.14)
if g := q.Query(0.90); g != 3.14 { if g := q.Query(0.90); g != 3.14 {
t.Error("want PI, got", g) t.Error("want PI, got", g)
@ -110,20 +179,7 @@ func TestUncompressedOne(t *testing.T) {
} }
func TestDefaults(t *testing.T) { func TestDefaults(t *testing.T) {
if g := NewTargeted(0.99).Query(0.99); g != 0 { if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99); g != 0 {
t.Errorf("want 0, got %f", g) t.Errorf("want 0, got %f", g)
} }
} }
func getPerc(x []float64, p float64) (want, min, max float64) {
k := int(float64(len(x)) * p)
lower := int(float64(len(x)) * (p - 0.04))
if lower < 0 {
lower = 0
}
upper := int(float64(len(x))*(p+0.04)) + 1
if upper >= len(x) {
upper = len(x) - 1
}
return x[k], x[lower], x[upper]
}

View File

@ -0,0 +1,90 @@
package topk
import (
"sort"
)
// http://www.cs.ucsb.edu/research/tech_reports/reports/2005-23.pdf
type Element struct {
Value string
Count int
}
type Samples []*Element
func (sm Samples) Len() int {
return len(sm)
}
func (sm Samples) Less(i, j int) bool {
return sm[i].Count < sm[j].Count
}
func (sm Samples) Swap(i, j int) {
sm[i], sm[j] = sm[j], sm[i]
}
type Stream struct {
k int
mon map[string]*Element
// the minimum Element
min *Element
}
func New(k int) *Stream {
s := new(Stream)
s.k = k
s.mon = make(map[string]*Element)
s.min = &Element{}
// Track k+1 so that less frequenet items contended for that spot,
// resulting in k being more accurate.
return s
}
func (s *Stream) Insert(x string) {
s.insert(&Element{x, 1})
}
func (s *Stream) Merge(sm Samples) {
for _, e := range sm {
s.insert(e)
}
}
func (s *Stream) insert(in *Element) {
e := s.mon[in.Value]
if e != nil {
e.Count++
} else {
if len(s.mon) < s.k+1 {
e = &Element{in.Value, in.Count}
s.mon[in.Value] = e
} else {
e = s.min
delete(s.mon, e.Value)
e.Value = in.Value
e.Count += in.Count
s.min = e
}
}
if e.Count < s.min.Count {
s.min = e
}
}
func (s *Stream) Query() Samples {
var sm Samples
for _, e := range s.mon {
sm = append(sm, e)
}
sort.Sort(sort.Reverse(sm))
if len(sm) < s.k {
return sm
}
return sm[:s.k]
}

View File

@ -0,0 +1,57 @@
package topk
import (
"fmt"
"math/rand"
"sort"
"testing"
)
func TestTopK(t *testing.T) {
stream := New(10)
ss := []*Stream{New(10), New(10), New(10)}
m := make(map[string]int)
for _, s := range ss {
for i := 0; i < 1e6; i++ {
v := fmt.Sprintf("%x", int8(rand.ExpFloat64()))
s.Insert(v)
m[v]++
}
stream.Merge(s.Query())
}
var sm Samples
for x, s := range m {
sm = append(sm, &Element{x, s})
}
sort.Sort(sort.Reverse(sm))
g := stream.Query()
if len(g) != 10 {
t.Fatalf("got %d, want 10", len(g))
}
for i, e := range g {
if sm[i].Value != e.Value {
t.Errorf("at %d: want %q, got %q", i, sm[i].Value, e.Value)
}
}
}
func TestQuery(t *testing.T) {
queryTests := []struct {
value string
expected int
}{
{"a", 1},
{"b", 2},
{"c", 2},
}
stream := New(2)
for _, tt := range queryTests {
stream.Insert(tt.value)
if n := len(stream.Query()); n != tt.expected {
t.Errorf("want %d, got %d", tt.expected, n)
}
}
}

View File

@ -46,7 +46,7 @@ type Summary interface {
// DefObjectives are the default Summary quantile values. // DefObjectives are the default Summary quantile values.
var ( var (
DefObjectives = []float64{0.5, 0.9, 0.99} DefObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
) )
// Default values for SummaryOpts. // Default values for SummaryOpts.
@ -59,8 +59,6 @@ const (
DefAgeBuckets = 10 DefAgeBuckets = 10
// DefBufCap is the standard buffer size for collecting Summary observations. // DefBufCap is the standard buffer size for collecting Summary observations.
DefBufCap = 500 DefBufCap = 500
// DefEpsilon is the default error epsilon for the quantile rank estimates.
DefEpsilon = 0.001
) )
// SummaryOpts bundles the options for creating a Summary metric. It is // SummaryOpts bundles the options for creating a Summary metric. It is
@ -101,9 +99,9 @@ type SummaryOpts struct {
// metric name). // metric name).
ConstLabels Labels ConstLabels Labels
// Objectives defines the quantile rank estimates. The default value is // Objectives defines the quantile rank estimates with their respective
// DefObjectives. // absolute error. The default value is DefObjectives.
Objectives []float64 Objectives map[float64]float64
// MaxAge defines the duration for which an observation stays relevant // MaxAge defines the duration for which an observation stays relevant
// for the summary. Must be positive. The default value is DefMaxAge. // for the summary. Must be positive. The default value is DefMaxAge.
@ -164,18 +162,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
opts.BufCap = DefBufCap opts.BufCap = DefBufCap
} }
if opts.Epsilon < 0 {
panic(fmt.Errorf("illegal value for Epsilon=%f", opts.Epsilon))
}
if opts.Epsilon == 0. {
opts.Epsilon = DefEpsilon
}
s := &summary{ s := &summary{
desc: desc, desc: desc,
objectives: opts.Objectives, objectives: opts.Objectives,
epsilon: opts.Epsilon, sortedObjectives: make([]float64, 0, len(opts.Objectives)),
labelPairs: makeLabelPairs(desc, labelValues), labelPairs: makeLabelPairs(desc, labelValues),
@ -193,6 +184,11 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
} }
s.headStream = s.streams[0] s.headStream = s.streams[0]
for qu := range DefObjectives {
s.sortedObjectives = append(s.sortedObjectives, qu)
}
sort.Float64s(s.sortedObjectives)
s.Init(s) // Init self-collection. s.Init(s) // Init self-collection.
return s return s
} }
@ -206,8 +202,8 @@ type summary struct {
desc *Desc desc *Desc
objectives []float64 objectives map[float64]float64
epsilon float64 sortedObjectives []float64
labelPairs []*dto.LabelPair labelPairs []*dto.LabelPair
@ -260,7 +256,7 @@ func (s *summary) Write(out *dto.Metric) error {
sum.SampleCount = proto.Uint64(s.cnt) sum.SampleCount = proto.Uint64(s.cnt)
sum.SampleSum = proto.Float64(s.sum) sum.SampleSum = proto.Float64(s.sum)
for _, rank := range s.objectives { for _, rank := range s.sortedObjectives {
qs = append(qs, &dto.Quantile{ qs = append(qs, &dto.Quantile{
Quantile: proto.Float64(rank), Quantile: proto.Float64(rank),
Value: proto.Float64(s.mergedAllStreams.Query(rank)), Value: proto.Float64(s.mergedAllStreams.Query(rank)),
@ -281,9 +277,7 @@ func (s *summary) Write(out *dto.Metric) error {
} }
func (s *summary) newStream() *quantile.Stream { func (s *summary) newStream() *quantile.Stream {
stream := quantile.NewTargeted(s.objectives...) return quantile.NewTargeted(s.objectives)
stream.SetEpsilon(s.epsilon)
return stream
} }
// asyncFlush needs bufMtx locked. // asyncFlush needs bufMtx locked.

View File

@ -126,16 +126,14 @@ func TestSummaryConcurrency(t *testing.T) {
mutations := int(n%10000 + 1) mutations := int(n%10000 + 1)
concLevel := int(n%15 + 1) concLevel := int(n%15 + 1)
total := mutations * concLevel total := mutations * concLevel
ε := 0.001
var start, end sync.WaitGroup var start, end sync.WaitGroup
start.Add(1) start.Add(1)
end.Add(concLevel) end.Add(concLevel)
sum := NewSummary(SummaryOpts{ sum := NewSummary(SummaryOpts{
Name: "test_summary", Name: "test_summary",
Help: "helpless", Help: "helpless",
Epsilon: ε,
}) })
allVars := make([]float64, total) allVars := make([]float64, total)
@ -170,14 +168,21 @@ func TestSummaryConcurrency(t *testing.T) {
t.Errorf("got sample sum %f, want %f", got, want) t.Errorf("got sample sum %f, want %f", got, want)
} }
for i, wantQ := range DefObjectives { objectives := make([]float64, 0, len(DefObjectives))
for qu := range DefObjectives {
objectives = append(objectives, qu)
}
sort.Float64s(objectives)
for i, wantQ := range objectives {
ε := DefObjectives[wantQ]
gotQ := *m.Summary.Quantile[i].Quantile gotQ := *m.Summary.Quantile[i].Quantile
gotV := *m.Summary.Quantile[i].Value gotV := *m.Summary.Quantile[i].Value
min, max := getBounds(allVars, wantQ, ε) min, max := getBounds(allVars, wantQ, ε)
if gotQ != wantQ { if gotQ != wantQ {
t.Errorf("got quantile %f, want %f", gotQ, wantQ) t.Errorf("got quantile %f, want %f", gotQ, wantQ)
} }
if (gotV < min || gotV > max) && len(allVars) > 500 { // Avoid statistical outliers. if gotV < min || gotV > max {
t.Errorf("got %f for quantile %f, want [%f,%f]", gotV, gotQ, min, max) t.Errorf("got %f for quantile %f, want [%f,%f]", gotV, gotQ, min, max)
} }
} }
@ -192,10 +197,15 @@ func TestSummaryConcurrency(t *testing.T) {
func TestSummaryVecConcurrency(t *testing.T) { func TestSummaryVecConcurrency(t *testing.T) {
rand.Seed(42) rand.Seed(42)
objectives := make([]float64, 0, len(DefObjectives))
for qu := range DefObjectives {
objectives = append(objectives, qu)
}
sort.Float64s(objectives)
it := func(n uint32) bool { it := func(n uint32) bool {
mutations := int(n%10000 + 1) mutations := int(n%10000 + 1)
concLevel := int(n%15 + 1) concLevel := int(n%15 + 1)
ε := 0.001
vecLength := int(n%5 + 1) vecLength := int(n%5 + 1)
var start, end sync.WaitGroup var start, end sync.WaitGroup
@ -204,9 +214,8 @@ func TestSummaryVecConcurrency(t *testing.T) {
sum := NewSummaryVec( sum := NewSummaryVec(
SummaryOpts{ SummaryOpts{
Name: "test_summary", Name: "test_summary",
Help: "helpless", Help: "helpless",
Epsilon: ε,
}, },
[]string{"label"}, []string{"label"},
) )
@ -249,14 +258,15 @@ func TestSummaryVecConcurrency(t *testing.T) {
if got, want := *m.Summary.SampleSum, sampleSums[i]; math.Abs((got-want)/want) > 0.001 { if got, want := *m.Summary.SampleSum, sampleSums[i]; math.Abs((got-want)/want) > 0.001 {
t.Errorf("got sample sum %f for label %c, want %f", got, 'A'+i, want) t.Errorf("got sample sum %f for label %c, want %f", got, 'A'+i, want)
} }
for j, wantQ := range DefObjectives { for j, wantQ := range objectives {
ε := DefObjectives[wantQ]
gotQ := *m.Summary.Quantile[j].Quantile gotQ := *m.Summary.Quantile[j].Quantile
gotV := *m.Summary.Quantile[j].Value gotV := *m.Summary.Quantile[j].Value
min, max := getBounds(allVars[i], wantQ, ε) min, max := getBounds(allVars[i], wantQ, ε)
if gotQ != wantQ { if gotQ != wantQ {
t.Errorf("got quantile %f for label %c, want %f", gotQ, 'A'+i, wantQ) t.Errorf("got quantile %f for label %c, want %f", gotQ, 'A'+i, wantQ)
} }
if (gotV < min || gotV > max) && len(allVars[i]) > 500 { // Avoid statistical outliers. if gotV < min || gotV > max {
t.Errorf("got %f for quantile %f for label %c, want [%f,%f]", gotV, gotQ, 'A'+i, min, max) t.Errorf("got %f for quantile %f for label %c, want [%f,%f]", gotV, gotQ, 'A'+i, min, max)
t.Log(len(allVars[i])) t.Log(len(allVars[i]))
} }
@ -276,9 +286,8 @@ func XTestSummaryDecay(t *testing.T) {
sum := NewSummary(SummaryOpts{ sum := NewSummary(SummaryOpts{
Name: "test_summary", Name: "test_summary",
Help: "helpless", Help: "helpless",
Epsilon: 0.001,
MaxAge: 10 * time.Millisecond, MaxAge: 10 * time.Millisecond,
Objectives: []float64{0.1}, Objectives: map[float64]float64{0.1: 0.001},
}) })
m := &dto.Metric{} m := &dto.Metric{}
@ -302,15 +311,16 @@ func XTestSummaryDecay(t *testing.T) {
} }
func getBounds(vars []float64, q, ε float64) (min, max float64) { func getBounds(vars []float64, q, ε float64) (min, max float64) {
lower := int((q - 4*ε) * float64(len(vars))) n := float64(len(vars))
upper := int((q+4*ε)*float64(len(vars))) + 1 lower := int((q - ε) * n)
upper := int(math.Ceil((q + ε) * n))
min = vars[0] min = vars[0]
if lower > 0 { if lower > 1 {
min = vars[lower] min = vars[lower-1]
} }
max = vars[len(vars)-1] max = vars[len(vars)-1]
if upper < len(vars)-1 { if upper < len(vars) {
max = vars[upper] max = vars[upper-1]
} }
return return
} }