Vendorize perks/quantile.

Change-Id: I2b24bddf5a975a46ceb598db328c317982154466
Bjoern Rabenstein 2014-06-23 19:48:50 +02:00
parent f9401ffab9
commit 010dc1af88
10 changed files with 3052 additions and 1 deletion

_vendor/perks/MANIFEST (new file)

@@ -0,0 +1 @@
Imported at da3e0acc8525a74a0ac8651ac5e7a68891291fdf from https://github.com/u-c-l/perks/tree/opt/pool-for-sample .

_vendor/perks/README.md (new file)

@@ -0,0 +1,31 @@
# Perks for Go (golang.org)
Perks contains the Go package quantile that computes approximate quantiles over
an unbounded data stream within low memory and CPU bounds.
For more information and examples, see:
http://godoc.org/github.com/bmizerany/perks
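A minimal usage sketch (the inserted values and chosen quantiles are illustrative only; see the package examples for realistic scenarios):

```go
package main

import (
	"fmt"

	"github.com/bmizerany/perks/quantile"
)

func main() {
	// Track the 50th, 90th, and 99th percentiles of a stream of values.
	q := quantile.NewTargeted(0.50, 0.90, 0.99)
	for i := 0; i < 1000; i++ {
		q.Insert(float64(i))
	}
	fmt.Println("perc50:", q.Query(0.50))
	fmt.Println("perc90:", q.Query(0.90))
	fmt.Println("perc99:", q.Query(0.99))
}
```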
A very special thank you and shout out to Graham Cormode (Rutgers University),
Flip Korn (AT&T Labs Research), S. Muthukrishnan (Rutgers University), and
Divesh Srivastava (AT&T Labs Research) for their research and publication of
[Effective Computation of Biased Quantiles over Data Streams](http://www.cs.rutgers.edu/~muthu/bquant.pdf).
Thank you, also:
* Armon Dadgar (@armon)
* Andrew Gerrand (@nf)
* Brad Fitzpatrick (@bradfitz)
* Keith Rarick (@kr)
FAQ:
Q: Why not move the quantile package into the project root?
A: I want to add more packages to perks later.
Copyright (C) 2013 Blake Mizerany
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@@ -0,0 +1,64 @@
package quantile
import (
"testing"
)
func BenchmarkInsertTargeted(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
}
}
func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
s.SetEpsilon(0.0001)
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
}
}
func BenchmarkInsertBiased(b *testing.B) {
s := NewBiased()
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
}
}
func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) {
s := NewBiased()
s.SetEpsilon(0.0001)
b.ResetTimer()
for i := float64(0); i < float64(b.N); i++ {
s.Insert(i)
}
}
func BenchmarkQuery(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
for i := float64(0); i < 1e6; i++ {
s.Insert(i)
}
b.ResetTimer()
n := float64(b.N)
for i := float64(0); i < n; i++ {
s.Query(i / n)
}
}
func BenchmarkQuerySmallEpsilon(b *testing.B) {
s := NewTargeted(0.01, 0.5, 0.9, 0.99)
s.SetEpsilon(0.0001)
for i := float64(0); i < 1e6; i++ {
s.Insert(i)
}
b.ResetTimer()
n := float64(b.N)
for i := float64(0); i < n; i++ {
s.Query(i / n)
}
}

@@ -0,0 +1,112 @@
// +build go1.1

package quantile_test
import (
"bufio"
"fmt"
"github.com/bmizerany/perks/quantile"
"log"
"os"
"strconv"
"time"
)
func Example_simple() {
ch := make(chan float64)
go sendFloats(ch)
// Compute the 50th, 90th, and 99th percentile.
q := quantile.NewTargeted(0.50, 0.90, 0.99)
for v := range ch {
q.Insert(v)
}
fmt.Println("perc50:", q.Query(0.50))
fmt.Println("perc90:", q.Query(0.90))
fmt.Println("perc99:", q.Query(0.99))
fmt.Println("count:", q.Count())
// Output:
// perc50: 5
// perc90: 14
// perc99: 40
// count: 2388
}
func Example_mergeMultipleStreams() {
// Scenario:
// We have multiple database shards. On each shard, there is a process
// collecting query response times from the database logs and inserting
// them into a Stream (created via NewTargeted(0.90)), much like the
// Simple example. These processes expose a network interface for us to
// ask them to serialize and send us the results of their
// Stream.Samples so we may Merge and Query them.
//
// NOTES:
// * These sample sets are small, allowing us to get them
// across the network much faster than sending the entire list of data
// points.
//
// * For this to work correctly, we must supply the same quantiles
// a priori that the process collecting the samples supplied to
// NewTargeted, even if we do not plan to query them all here.
ch := make(chan quantile.Samples)
getDBQuerySamples(ch)
q := quantile.NewTargeted(0.90)
for samples := range ch {
q.Merge(samples)
}
fmt.Println("perc90:", q.Query(0.90))
}
func Example_window() {
// Scenario: We want the 90th, 95th, and 99th percentiles for each
// minute.
ch := make(chan float64)
go sendStreamValues(ch)
tick := time.NewTicker(1 * time.Minute)
q := quantile.NewTargeted(0.90, 0.95, 0.99)
for {
select {
case t := <-tick.C:
flushToDB(t, q.Samples())
q.Reset()
case v := <-ch:
q.Insert(v)
}
}
}
func sendStreamValues(ch chan float64) {
// Use your imagination
}
func flushToDB(t time.Time, samples quantile.Samples) {
// Use your imagination
}
// This is a stub for the above example. In reality this would hit the remote
// servers via http or something like it.
func getDBQuerySamples(ch chan quantile.Samples) {}
func sendFloats(ch chan<- float64) {
f, err := os.Open("exampledata.txt")
if err != nil {
log.Fatal(err)
}
sc := bufio.NewScanner(f)
for sc.Scan() {
b := sc.Bytes()
v, err := strconv.ParseFloat(string(b), 64)
if err != nil {
log.Fatal(err)
}
ch <- v
}
if sc.Err() != nil {
log.Fatal(sc.Err())
}
close(ch)
}
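The merge example above leaves the transport between the shards and the aggregator to the imagination. Because Sample carries `json:",string"` tags (see stream.go below), encoding/json is one plausible wire format. The following is only a sketch, not part of the vendored code; the `/samples` path, the function names, and the HTTP wiring are assumptions:

```go
// Package shardsamples sketches one way to ship quantile.Samples between
// processes, as described in Example_mergeMultipleStreams above.
package shardsamples

import (
	"encoding/json"
	"net/http"

	"github.com/bmizerany/perks/quantile"
)

// ServeSamples registers a handler on each shard that serializes its current
// samples. NOTE: Stream is not goroutine-safe, so real code must serialize
// access to q (e.g. with a mutex) while it is also receiving Inserts.
func ServeSamples(q *quantile.Stream) {
	http.HandleFunc("/samples", func(w http.ResponseWriter, r *http.Request) {
		_ = json.NewEncoder(w).Encode(q.Samples())
	})
}

// MergeFrom fetches samples from one shard and folds them into a combined
// stream created with the same targeted quantiles.
func MergeFrom(url string, combined *quantile.Stream) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	var samples quantile.Samples
	if err := json.NewDecoder(resp.Body).Decode(&samples); err != nil {
		return err
	}
	combined.Merge(samples)
	return nil
}
```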

File diff suppressed because it is too large.

@@ -0,0 +1,28 @@
// +build !go1.3

package quantile
type samplePool struct {
pool chan *Sample
}
func newSamplePool(capacity int) *samplePool {
return &samplePool{pool: make(chan *Sample, capacity)}
}
func (sp *samplePool) Get(value, width, delta float64) *Sample {
select {
case sample := <-sp.pool:
sample.Value, sample.Width, sample.Delta = value, width, delta
return sample
default:
return &Sample{value, width, delta}
}
}
func (sp *samplePool) Put(sample *Sample) {
select {
case sp.pool <- sample:
default:
}
}

@@ -0,0 +1,26 @@
// +build go1.3

package quantile
import "sync"
// With the Go1.3 sync Pool, there is no max capacity, and a globally shared
// pool is more efficient.
var globalSamplePool = sync.Pool{New: func() interface{} { return &Sample{} }}
type samplePool struct{}
func newSamplePool(capacity int) *samplePool {
// capacity ignored for Go1.3 sync.Pool.
return &samplePool{}
}
func (_ samplePool) Get(value, width, delta float64) *Sample {
sample := globalSamplePool.Get().(*Sample)
sample.Value, sample.Width, sample.Delta = value, width, delta
return sample
}
func (_ samplePool) Put(sample *Sample) {
globalSamplePool.Put(sample)
}

@@ -0,0 +1,272 @@
// Package quantile computes approximate quantiles over an unbounded data
// stream within low memory and CPU bounds.
//
// A small amount of accuracy is traded to achieve the above properties.
//
// Multiple streams can be merged before calling Query to generate a single set
// of results. This is meaningful when the streams represent the same type of
// data. See Merge and Samples.
//
// For more detailed information about the algorithm used, see:
//
// Effective Computation of Biased Quantiles over Data Streams
//
// http://www.cs.rutgers.edu/~muthu/bquant.pdf
package quantile
import (
"math"
"sort"
)
// Sample holds an observed value and meta information for compression. JSON
// tags have been added for convenience.
type Sample struct {
Value float64 `json:",string"`
Width float64 `json:",string"`
Delta float64 `json:",string"`
}
// Samples represents a slice of samples. It implements sort.Interface.
type Samples []Sample
func (a Samples) Len() int { return len(a) }
func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type invariant func(s *stream, r float64) float64
// NewBiased returns an initialized Stream for high-biased quantiles (e.g.
// 50th, 90th, 99th) not known a priori with finer error guarantees for the
// higher ranks of the data distribution.
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
func NewBiased() *Stream {
ƒ := func(s *stream, r float64) float64 {
return 2 * s.epsilon * r
}
return newStream(ƒ)
}
// NewTargeted returns an initialized Stream concerned with a particular set of
// quantile values that are supplied a priori. Knowing these a priori reduces
// space and computation time.
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
func NewTargeted(quantiles ...float64) *Stream {
ƒ := func(s *stream, r float64) float64 {
var m float64 = math.MaxFloat64
var f float64
for _, q := range quantiles {
if q*s.n <= r {
f = (2 * s.epsilon * r) / q
} else {
f = (2 * s.epsilon * (s.n - r)) / (1 - q)
}
if f < m {
m = f
}
}
return m
}
return newStream(ƒ)
}
// Stream computes quantiles for a stream of float64s. It is not thread-safe by
// design. Take care when using across multiple goroutines.
type Stream struct {
*stream
b Samples
sorted bool
}
func newStream(ƒ invariant) *Stream {
const defaultEpsilon = 0.01
x := &stream{
epsilon: defaultEpsilon,
ƒ: ƒ,
pool: newSamplePool(1024),
}
return &Stream{x, make(Samples, 0, 500), true}
}
// Insert inserts v into the stream.
func (s *Stream) Insert(v float64) {
s.insert(Sample{Value: v, Width: 1})
}
func (s *Stream) insert(sample Sample) {
s.b = append(s.b, sample)
s.sorted = false
if len(s.b) == cap(s.b) {
s.flush()
s.compress()
}
}
// Query returns the computed qth percentile value. If s was created with
// NewTargeted, and q is not in the set of quantiles provided a priori, Query
// will return an unspecified result.
func (s *Stream) Query(q float64) float64 {
if !s.flushed() {
// Fast path when there hasn't been enough data for a flush;
// this also yields better accuracy for small sets of data.
l := len(s.b)
if l == 0 {
return 0
}
i := int(float64(l) * q)
if i > 0 {
i -= 1
}
s.maybeSort()
return s.b[i].Value
}
s.flush()
return s.stream.query(q)
}
// Merge merges samples into the underlying stream's samples. This is handy when
// merging multiple streams from separate threads, database shards, etc.
func (s *Stream) Merge(samples Samples) {
sort.Sort(samples)
s.stream.merge(samples)
}
// Reset reinitializes and clears the stream, reusing the sample buffer memory.
func (s *Stream) Reset() {
s.stream.reset()
s.b = s.b[:0]
}
// Samples returns stream samples held by s.
func (s *Stream) Samples() Samples {
if !s.flushed() {
return s.b
}
s.flush()
s.compress()
return s.stream.samples()
}
// Count returns the total number of samples observed in the stream
// since initialization.
func (s *Stream) Count() int {
return len(s.b) + s.stream.count()
}
func (s *Stream) flush() {
s.maybeSort()
s.stream.merge(s.b)
s.b = s.b[:0]
}
func (s *Stream) maybeSort() {
if !s.sorted {
s.sorted = true
sort.Sort(s.b)
}
}
func (s *Stream) flushed() bool {
return len(s.stream.l) > 0
}
type stream struct {
epsilon float64
n float64
l []*Sample
ƒ invariant
pool *samplePool
}
// SetEpsilon sets the error epsilon for the Stream. The default epsilon is
// 0.01 and is usually satisfactory. If a different epsilon is needed, it must
// be set before any Inserts.
// To learn more, see: http://www.cs.rutgers.edu/~muthu/bquant.pdf
func (s *stream) SetEpsilon(epsilon float64) {
s.epsilon = epsilon
}
func (s *stream) reset() {
for _, sample := range s.l {
s.pool.Put(sample)
}
s.l = s.l[:0]
s.n = 0
}
func (s *stream) insert(v float64) {
s.merge(Samples{{v, 1, 0}})
}
func (s *stream) merge(samples Samples) {
var r float64
i := 0
for _, sample := range samples {
for ; i < len(s.l); i++ {
c := s.l[i]
if c.Value > sample.Value {
// Insert at position i.
s.l = append(s.l, nil)
copy(s.l[i+1:], s.l[i:])
s.l[i] = s.pool.Get(sample.Value, sample.Width, math.Floor(s.ƒ(s, r))-1)
i++
goto inserted
}
r += c.Width
}
s.l = append(s.l, s.pool.Get(sample.Value, sample.Width, 0))
i++
inserted:
s.n += sample.Width
}
}
func (s *stream) count() int {
return int(s.n)
}
func (s *stream) query(q float64) float64 {
t := math.Ceil(q * s.n)
t += math.Ceil(s.ƒ(s, t) / 2)
p := s.l[0]
r := float64(0)
for _, c := range s.l[1:] {
if r+c.Width+c.Delta > t {
return p.Value
}
r += p.Width
p = c
}
return p.Value
}
func (s *stream) compress() {
if len(s.l) < 2 {
return
}
x := s.l[len(s.l)-1]
r := s.n - 1 - x.Width
for i := len(s.l) - 2; i >= 0; i-- {
c := s.l[i]
if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
x.Width += c.Width
// Remove element at i.
copy(s.l[i:], s.l[i+1:])
s.l[len(s.l)-1] = nil
s.l = s.l[:len(s.l)-1]
s.pool.Put(c)
} else {
x = c
}
r -= c.Width
}
}
func (s *stream) samples() Samples {
samples := make(Samples, len(s.l))
for i, c := range s.l {
samples[i] = *c
}
return samples
}
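As the SetEpsilon documentation above notes, the error bound must be set before the first Insert. A short sketch of that calling order, using the upstream import path as in the examples (the 0.0001 value simply mirrors the benchmarks and is illustrative):

```go
package main

import (
	"fmt"

	"github.com/bmizerany/perks/quantile"
)

func main() {
	s := quantile.NewTargeted(0.90, 0.99)
	// Tighten the default epsilon of 0.01; per the documentation, this
	// must happen before the first Insert.
	s.SetEpsilon(0.0001)
	for i := 0; i < 10000; i++ {
		s.Insert(float64(i))
	}
	fmt.Println("perc99:", s.Query(0.99))
}
```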

@@ -0,0 +1,128 @@
package quantile
import (
"math"
"math/rand"
"sort"
"testing"
)
func TestQuantRandQuery(t *testing.T) {
s := NewTargeted(0.5, 0.90, 0.99)
a := make([]float64, 0, 1e5)
rand.Seed(42)
for i := 0; i < cap(a); i++ {
v := float64(rand.Int63())
s.Insert(v)
a = append(a, v)
}
t.Logf("len: %d", s.Count())
sort.Float64s(a)
w := getPerc(a, 0.50)
if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
t.Errorf("perc50: want %v, got %v", w, g)
t.Logf("e: %f", math.Abs(w-g)/w)
}
w = getPerc(a, 0.90)
if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
t.Errorf("perc90: want %v, got %v", w, g)
t.Logf("e: %f", math.Abs(w-g)/w)
}
w = getPerc(a, 0.99)
if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
t.Errorf("perc99: want %v, got %v", w, g)
t.Logf("e: %f", math.Abs(w-g)/w)
}
}
func TestQuantRandMergeQuery(t *testing.T) {
ch := make(chan float64)
done := make(chan *Stream)
for i := 0; i < 2; i++ {
go func() {
s := NewTargeted(0.5, 0.90, 0.99)
for v := range ch {
s.Insert(v)
}
done <- s
}()
}
rand.Seed(42)
a := make([]float64, 0, 1e6)
for i := 0; i < cap(a); i++ {
v := float64(rand.Int63())
a = append(a, v)
ch <- v
}
close(ch)
s := <-done
o := <-done
s.Merge(o.Samples())
t.Logf("len: %d", s.Count())
sort.Float64s(a)
w := getPerc(a, 0.50)
if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
t.Errorf("perc50: want %v, got %v", w, g)
t.Logf("e: %f", math.Abs(w-g)/w)
}
w = getPerc(a, 0.90)
if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
t.Errorf("perc90: want %v, got %v", w, g)
t.Logf("e: %f", math.Abs(w-g)/w)
}
w = getPerc(a, 0.99)
if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
t.Errorf("perc99: want %v, got %v", w, g)
t.Logf("e: %f", math.Abs(w-g)/w)
}
}
func TestUncompressed(t *testing.T) {
tests := []float64{0.50, 0.90, 0.95, 0.99}
q := NewTargeted(tests...)
for i := 100; i > 0; i-- {
q.Insert(float64(i))
}
if g := q.Count(); g != 100 {
t.Errorf("want count 100, got %d", g)
}
// Before compression, Query should have 100% accuracy.
for _, v := range tests {
w := v * 100
if g := q.Query(v); g != w {
t.Errorf("want %f, got %f", w, g)
}
}
}
func TestUncompressedSamples(t *testing.T) {
q := NewTargeted(0.99)
for i := 1; i <= 100; i++ {
q.Insert(float64(i))
}
if g := q.Samples().Len(); g != 100 {
t.Errorf("want count 100, got %d", g)
}
}
func TestUncompressedOne(t *testing.T) {
q := NewTargeted(0.90)
q.Insert(3.14)
if g := q.Query(0.90); g != 3.14 {
t.Error("want PI, got", g)
}
}
func TestDefaults(t *testing.T) {
if g := NewTargeted(0.99).Query(0.99); g != 0 {
t.Errorf("want 0, got %f", g)
}
}
func getPerc(x []float64, p float64) float64 {
k := int(float64(len(x)) * p)
return x[k]
}

@@ -21,9 +21,10 @@ import (
 	"time"
 	"code.google.com/p/goprotobuf/proto"
-	"github.com/bmizerany/perks/quantile" // TODO: Vendorize?
 	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/client_golang/_vendor/perks/quantile"
 )
 // A Summary captures individual observations from an event or sample stream and