- Improved comments throughout the package.

- Added normal and exponential distributions to one of the examples.
- Improved the naming of a couple of local variables.
- Handled an error in the AccumulatingBucket ValueForIndex function whereby
  the vestigial old behavior was accidentally preserved and not updated.
  This could have been caught had the tests been updated first.
- Simplify Histogram prospectiveIndexForPercentile such that various
  small tasks it performs are extracted into separate functions for easier
  testing and code comprehension.
- Remedy a regression in Histogram prospectiveIndexForPercentile whereby
  the prospective index may have included the terminating element of a
  bucket.
- Provide help for Histogram prospectiveIndexForPercentile such that requesting
  the terminating element of a bucket will fast-forward to the first element of
  the next non-empty bucket.
- Fix TallyingBucket's boundary constants, because they were originally keyed toward
  percentages [0, 100], not decimal-based ones.  The antique tests had been
  temporarily commented out, which prevented this regression from being exposed.
This commit is contained in:
Matt T. Proud 2012-05-22 09:20:09 +02:00
parent fe4f71b333
commit 5ea9b1a0b5
12 changed files with 153 additions and 109 deletions

72
examples/random/main.go Normal file
View File

@ -0,0 +1,72 @@
// Copyright (c) 2012, Matt T. Proud
// All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// main.go provides a simple example of how to use this instrumentation
// framework in the context of having something that emits values into
// its collectors.
//
// The emitted values correspond to uniform, normal, and exponential
// distributions.
package main
import (
"github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/maths"
"github.com/matttproud/golang_instrumentation/metrics"
"math/rand"
"net/http"
"time"
)
// main wires up three example RPC latency histograms with their call-count
// gauges, registers them, feeds them synthetic samples from a background
// goroutine, and serves the registry's exporter at /metrics.json on :8080.
//
// The synthetic latencies come from rand.Float64 scaled by 200 (foo),
// rand.NormFloat64 scaled by 10 and shifted by 100 (bar), and
// rand.ExpFloat64 (zed).
func main() {
	// All three histograms use the same specification. A fresh specification
	// (and therefore a fresh bucket builder) is created per histogram, exactly
	// as if each had been spelled out individually.
	newLatencySpecification := func() *metrics.HistogramSpecification {
		return &metrics.HistogramSpecification{
			Starts:                metrics.EquallySizedBucketsFor(0, 200, 4),
			BucketMaker:           metrics.AccumulatingBucketBuilder(metrics.EvictAndReplaceWith(10, maths.Average), 50),
			ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
		}
	}

	fooRPCLatency := metrics.CreateHistogram(newLatencySpecification())
	fooRPCCalls := &metrics.GaugeMetric{}
	barRPCLatency := metrics.CreateHistogram(newLatencySpecification())
	barRPCCalls := &metrics.GaugeMetric{}
	zedRPCLatency := metrics.CreateHistogram(newLatencySpecification())
	zedRPCCalls := &metrics.GaugeMetric{}

	// The registry is deliberately not named "metrics" so that it does not
	// shadow the imported metrics package for the rest of the function.
	metricRegistry := registry.NewRegistry()
	metricRegistry.Register("foo_rpc_latency_ms_histogram", fooRPCLatency)
	metricRegistry.Register("foo_rpc_call_count", fooRPCCalls)
	metricRegistry.Register("bar_rpc_latency_ms_histogram", barRPCLatency)
	metricRegistry.Register("bar_rpc_call_count", barRPCCalls)
	metricRegistry.Register("zed_rpc_latency_ms_histogram", zedRPCLatency)
	metricRegistry.Register("zed_rpc_call_count", zedRPCCalls)

	// Emit one sample per metric every 100 milliseconds for as long as the
	// process lives; the goroutine ends only when the program exits.
	go func() {
		for {
			fooRPCLatency.Add(rand.Float64() * 200)
			fooRPCCalls.Increment()

			barRPCLatency.Add((rand.NormFloat64() * 10.0) + 100.0)
			barRPCCalls.Increment()

			zedRPCLatency.Add(rand.ExpFloat64())
			zedRPCCalls.Increment()

			time.Sleep(100 * time.Millisecond)
		}
	}()

	http.Handle("/metrics.json", metricRegistry.YieldExporter())

	// ListenAndServe only returns on failure (for example, the port is already
	// in use); surface that instead of exiting silently with status 0.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		panic(err)
	}
}

View File

@ -1,46 +0,0 @@
// Copyright (c) 2012, Matt T. Proud
// All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// main.go provides a simple example of how to use this instrumentation
// framework in the context of having something that emits values into
// its collectors.
package main
import (
"github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/maths"
"github.com/matttproud/golang_instrumentation/metrics"
"math/rand"
"net/http"
"time"
)
// main registers one example latency histogram and one call-count gauge,
// feeds them a synthetic sample (rand.Float64 scaled by 200) every 500
// milliseconds from a background goroutine, and serves the registry's
// exporter at /metrics.json on :8080.
func main() {
	latencySpecification := &metrics.HistogramSpecification{
		Starts:                metrics.EquallySizedBucketsFor(0, 200, 4),
		BucketMaker:           metrics.AccumulatingBucketBuilder(metrics.EvictAndReplaceWith(10, maths.Average), 50),
		ReportablePercentiles: []float64{0.01, 0.5, 0.90, 0.99},
	}
	fooRPCLatency := metrics.CreateHistogram(latencySpecification)
	fooRPCCalls := &metrics.GaugeMetric{}

	metricRegistry := registry.NewRegistry()
	metricRegistry.Register("foo_rpc_latency_ms_histogram", fooRPCLatency)
	metricRegistry.Register("foo_rpc_call_count", fooRPCCalls)

	// The emitter loops forever; it stops only when the process exits.
	emitSamples := func() {
		for {
			fooRPCLatency.Add(rand.Float64() * 200)
			fooRPCCalls.Increment()
			time.Sleep(500 * time.Millisecond)
		}
	}
	go emitSamples()

	exporter := metricRegistry.YieldExporter()
	http.Handle("/metrics.json", exporter)
	http.ListenAndServe(":8080", nil)
}

View File

@ -29,6 +29,9 @@ type AccumulatingBucket struct {
evictionPolicy EvictionPolicy
}
// AccumulatingBucketBuilder is a convenience method for generating a
// BucketBuilder that produces AccumulatingBucket entries with a certain
// behavior set.
func AccumulatingBucketBuilder(evictionPolicy EvictionPolicy, maximumSize int) BucketBuilder {
return func() Bucket {
return &AccumulatingBucket{
@ -87,21 +90,20 @@ func (b *AccumulatingBucket) ValueForIndex(index int) float64 {
return math.NaN()
}
rawData := make([]float64, elementCount)
sortedElements := make([]float64, elementCount)
for i, element := range b.elements {
rawData[i] = element.Value.(float64)
sortedElements[i] = element.Value.(float64)
}
sort.Float64s(rawData)
sort.Float64s(sortedElements)
// N.B.(mtp): Interfacing components should not need to comprehend what
// evictions strategy is used; therefore, we adjust this silently.
if index >= elementCount {
return rawData[elementCount-1]
}
// eviction and storage container strategies used; therefore,
// we adjust this silently.
targetIndex := int(float64(elementCount-1) * (float64(index) / float64(b.observations)))
return rawData[index]
return sortedElements[targetIndex]
}
func (b *AccumulatingBucket) Observations() int {

View File

@ -139,8 +139,7 @@ func (s *S) TestAccumulatingBucketValueForIndex(c *C) {
}
c.Check(b.ValueForIndex(0), Equals, 1.0)
c.Check(b.ValueForIndex(50), Equals, 51.0)
c.Check(b.ValueForIndex(99), Equals, 100.0)
c.Check(b.ValueForIndex(50), Equals, 50.0)
c.Check(b.ValueForIndex(100), Equals, 100.0)
for i := 101.0; i <= 150; i += 1 {
@ -149,8 +148,11 @@ func (s *S) TestAccumulatingBucketValueForIndex(c *C) {
time.Sleep(1 * time.Millisecond)
}
// The bucket's capacity has been exceeded by inputs at this point;
// consequently, we search for a given element by percentage offset
// therein.
c.Check(b.ValueForIndex(0), Equals, 51.0)
c.Check(b.ValueForIndex(50), Equals, 101.0)
c.Check(b.ValueForIndex(99), Equals, 150.0)
c.Check(b.ValueForIndex(100), Equals, 150.0)
c.Check(b.ValueForIndex(50), Equals, 84.0)
c.Check(b.ValueForIndex(99), Equals, 116.0)
c.Check(b.ValueForIndex(100), Equals, 117.0)
}

View File

@ -8,10 +8,10 @@
package metrics
// Metric is something that can be exposed via the registry framework.
type Metric interface {
// Humanize produces a human-consumable representation of the metric.
Humanize() string
// Marshallable produces a JSON-consumable representation of the metric,
// returned as a map from string keys to arbitrary values.
// TODO(mtp): left unspecified in the original comment; confirm what
// remains to be done here.
Marshallable() map[string]interface{}
}

View File

@ -14,6 +14,9 @@ type BucketBuilder func() Bucket
// This defines the base Bucket type. The exact behaviors of the bucket are
// at the whim of the implementor.
//
// A Bucket is used as a container by Histogram as a collection for its
// accumulated samples.
type Bucket interface {
// Add a value to the bucket.
Add(value float64)
@ -22,6 +25,8 @@ type Bucket interface {
// Provide a count of observations throughout the bucket's lifetime.
Observations() int
// Provide the value from the given in-memory value cache or an estimate
// thereof.
// thereof for the given index. The consumer of the bucket's data makes
// no assumptions about the underlying storage mechanisms that the bucket
// employs.
ValueForIndex(index int) float64
}

View File

@ -42,8 +42,6 @@ func (s *S) TestEvictOldest(c *C) {
c.Check(heap.Pop(&q), utility.ValueEquals, 0.0)
}
// TODO(mtp): Extract reduction mechanisms into local variables.
func (s *S) TestEvictAndReplaceWithAverage(c *C) {
q := make(utility.PriorityQueue, 0, 10)
heap.Init(&q)

View File

@ -108,6 +108,7 @@ func (h *Histogram) Humanize() string {
return string(stringBuffer.Bytes())
}
// Determine the number of previous observations up to a given index.
func previousCumulativeObservations(cumulativeObservations []int, bucketIndex int) int {
if bucketIndex == 0 {
return 0
@ -116,8 +117,22 @@ func previousCumulativeObservations(cumulativeObservations []int, bucketIndex in
return cumulativeObservations[bucketIndex-1]
}
// prospectiveIndexForPercentile determines the zero-based index of the
// element that sits at the given percentile of a run of totalObservations
// elements.
//
// percentile is a decimal fraction (0.5 is the median). It is scaled against
// the last valid index, totalObservations-1, and the product is truncated
// toward zero by the integer conversion. For percentiles in [0, 1] the result
// therefore never points one past the final element.
func prospectiveIndexForPercentile(percentile float64, totalObservations int) int {
	return int(percentile * float64(totalObservations-1))
}
// nextNonEmptyBucketElement scans forward from currentIndex (inclusive) up to
// bucketCount (exclusive) and returns a pointer to the first bucket whose
// entry in observationsByBucket is non-zero, together with element index 0,
// i.e. the first element of that bucket.
//
// It panics when every remaining bucket in that range is empty.
func (h *Histogram) nextNonEmptyBucketElement(currentIndex, bucketCount int, observationsByBucket []int) (*Bucket, int) {
	for candidate := currentIndex; candidate < bucketCount; candidate++ {
		if observationsByBucket[candidate] != 0 {
			return &h.buckets[candidate], 0
		}
	}

	panic("Illegal Condition: There were no remaining buckets to provide a value.")
}
// Find what bucket and element index contains a given percentile value.
@ -125,7 +140,7 @@ func prospectiveIndexForPercentile(percentile float64, totalObservations int) in
// longer contained by the bucket, the index of the last item is returned. This
// may occur if the underlying bucket catalogs values and employs an eviction
// strategy.
func (h *Histogram) bucketForPercentile(percentile float64) (bucket *Bucket, index int) {
func (h *Histogram) bucketForPercentile(percentile float64) (*Bucket, int) {
bucketCount := len(h.buckets)
// This captures the quantity of samples in a given bucket's range.
@ -159,11 +174,11 @@ func (h *Histogram) bucketForPercentile(percentile float64) (bucket *Bucket, ind
// This calculates the index within the current bucket where the given
// percentile may be found.
subIndex = prospectiveIndex - previousCumulativeObservations(cumulativeObservationsByBucket, i)
// Sometimes the index may be the last item, in which case we need to
// take this into account. This is probably indicative of an underlying
// problem.
// take this into account.
if observationsByBucket[i] == subIndex {
subIndex--
return h.nextNonEmptyBucketElement(i+1, bucketCount, observationsByBucket)
}
return &h.buckets[i], subIndex

View File

@ -124,7 +124,7 @@ func (s *S) TestBucketForPercentile(c *C) {
bucket, subindex = h.bucketForPercentile(0.51)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 2)
c.Check(subindex, Equals, 1)
c.Check((*bucket).Observations(), Equals, 51)
}
@ -263,7 +263,7 @@ func (s *S) TestBucketForPercentileDoubleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(0.5)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(0.01)
@ -295,7 +295,7 @@ func (s *S) TestBucketForPercentileDoubleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(0.5)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(0.01)
@ -327,7 +327,7 @@ func (s *S) TestBucketForPercentileDoubleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(0.5)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(0.01)
@ -370,13 +370,13 @@ func (s *S) TestBucketForPercentileTripleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(0.67)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 2)
c.Check(subindex, Equals, 1)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(2.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 2)
c.Check(subindex, Equals, 1)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(0.5)
@ -388,7 +388,7 @@ func (s *S) TestBucketForPercentileTripleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(1.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(0.01)
@ -412,13 +412,13 @@ func (s *S) TestBucketForPercentileTripleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(0.67)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 2)
c.Check(subindex, Equals, 1)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(2.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 2)
c.Check(subindex, Equals, 1)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(0.5)
@ -430,7 +430,7 @@ func (s *S) TestBucketForPercentileTripleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(1.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(0.01)
@ -454,13 +454,13 @@ func (s *S) TestBucketForPercentileTripleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(0.67)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 2)
c.Check(subindex, Equals, 1)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(2.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 2)
c.Check(subindex, Equals, 1)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(0.5)
@ -472,7 +472,7 @@ func (s *S) TestBucketForPercentileTripleInSingleBucket(c *C) {
bucket, subindex = h.bucketForPercentile(1.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 3)
bucket, subindex = h.bucketForPercentile(0.01)
@ -634,7 +634,7 @@ func (s *S) TestBucketForPercentileTwoAdjacenciesUnequal(c *C) {
bucket, subindex = h.bucketForPercentile(1.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(0.01)
@ -658,20 +658,20 @@ func (s *S) TestBucketForPercentileTwoAdjacenciesUnequal(c *C) {
bucket, subindex = h.bucketForPercentile(0.67)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(2.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(0.5)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 1)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(1.0 / 3.0)
@ -718,7 +718,7 @@ func (s *S) TestBucketForPercentileTwoAdjacenciesUnequal(c *C) {
bucket, subindex = h.bucketForPercentile(1.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(0.01)
@ -742,20 +742,20 @@ func (s *S) TestBucketForPercentileTwoAdjacenciesUnequal(c *C) {
bucket, subindex = h.bucketForPercentile(0.67)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(2.0 / 3.0)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 1)
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(0.5)
c.Assert(*bucket, Not(IsNil))
c.Check(subindex, Equals, 0)
c.Check((*bucket).Observations(), Equals, 1)
c.Check((*bucket).Observations(), Equals, 2)
bucket, subindex = h.bucketForPercentile(1.0 / 3.0)

View File

@ -18,8 +18,8 @@ import (
)
// lowerThird and upperThird mark the one-third and two-thirds boundaries.
// They are expressed as decimal fractions in [0, 1] rather than as
// percentages in [0, 100].
const (
	lowerThird = 1.0 / 3.0
	upperThird = 2.0 * lowerThird
)
// A TallyingIndexEstimator is responsible for estimating the value of index for

View File

@ -35,18 +35,9 @@ func (s *S) TestTallyingPercentilesEstimatorAverage(c *C) {
// TestTallyingPercentilesEstimatorUniform checks Uniform over the range
// [-5, 5] for a handful of fixed inputs.
func (s *S) TestTallyingPercentilesEstimatorUniform(c *C) {
// With both trailing arguments zero the result is expected to be NaN.
c.Assert(Uniform(-5, 5, 0, 0), maths.IsNaN)
// With the last argument fixed at 2, third arguments of 0, 1 and 2 are
// expected to yield the lower bound, the midpoint and the upper bound.
c.Check(Uniform(-5, 5, 0, 2), Equals, -5.0)
c.Check(Uniform(-5, 5, 1, 2), Equals, 0.0)
c.Check(Uniform(-5, 5, 2, 2), Equals, 5.0)
}
func (s *S) TestTallyingBucketBuilder(c *C) {

View File

@ -4,7 +4,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// registry.go provides a container for centralization exposition of metrics to
// registry.go provides a container for centralized exposition of metrics to
// their prospective consumers.
package registry
@ -20,6 +20,9 @@ import (
"time"
)
// Boilerplate metrics describing the metrics reporting subservice itself.
// They are only exposed when the DefaultRegistry's exporter is hooked into
// the HTTP request handler.
var (
	requestCount                                = &metrics.GaugeMetric{}
	requestLatencyLogarithmicBuckets []float64 = metrics.LogarithmicSizedBucketsFor(0, 1000)
	requestLatencyEqualBuckets       []float64 = metrics.EquallySizedBucketsFor(0, 1000, 10)
)
@ -44,13 +47,15 @@ var requestLatencyEqualTallying *metrics.Histogram = metrics.CreateHistogram(&me
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.9, 0.99},
})
// requestLatencyAccumulator is the completion callback that records the
// reporting framework's own overhead so that it can be reported. Each
// duration is added exactly once to each of the four request latency
// histograms.
var requestLatencyAccumulator metrics.CompletionCallback = func(duration time.Duration) {
	// The integer division truncates the nanosecond duration to whole
	// microseconds before it is converted to a float.
	microseconds := float64(int64(duration) / 1E3)

	requestLatencyLogarithmicAccumulating.Add(microseconds)
	requestLatencyEqualAccumulating.Add(microseconds)
	requestLatencyLogarithmicTallying.Add(microseconds)
	requestLatencyEqualTallying.Add(microseconds)
}
// Registry is, as the name implies, a registrar where metrics are listed.