added support backward compatibility

Signed-off-by: Darya Melentsova <ifireice@gmail.com>
This commit is contained in:
Darya Melentsova 2023-05-14 18:07:07 +03:00
parent a7ff852155
commit 8bc96219a0
2 changed files with 115 additions and 40 deletions

View File

@ -38,11 +38,22 @@ const (
millisecondsPerSecond = 1000 millisecondsPerSecond = 1000
) )
// ErrorHandler is a function that handles errors // HandlerErrorHandling defines how a Handler serving metrics will handle
type ErrorHandler func(err error) // errors.
type HandlerErrorHandling int
// DefaultErrorHandler skips received errors // These constants cause handlers serving metrics to behave as described if
var DefaultErrorHandler = func(err error) {} // errors are encountered.
const (
// Ignore errors and try to push as many metrics to Graphite as possible.
ContinueOnError HandlerErrorHandling = iota
// Abort the push to Graphite upon the first error encountered.
AbortOnError
// Execute callback function on error.
CallbackOnError
)
// Config defines the Graphite bridge config. // Config defines the Graphite bridge config.
type Config struct { type Config struct {
@ -64,8 +75,16 @@ type Config struct {
// The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer. // The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer.
Gatherer prometheus.Gatherer Gatherer prometheus.Gatherer
// ErrorHandler defines how errors are handled. // The logger that messages are written to. Defaults to no logging.
ErrorHandler ErrorHandler Logger Logger
// ErrorHandling defines how errors are handled. Note that errors are
// logged regardless of the configured ErrorHandling provided Logger
// is not nil.
ErrorHandling HandlerErrorHandling
// ErrorCallbackFunc is a callback function that can be executed when error is occurred
ErrorCallbackFunc CallbackFunc
} }
// Bridge pushes metrics to the configured Graphite server. // Bridge pushes metrics to the configured Graphite server.
@ -76,11 +95,23 @@ type Bridge struct {
interval time.Duration interval time.Duration
timeout time.Duration timeout time.Duration
errorHandler ErrorHandler errorHandling HandlerErrorHandling
errorCallbackFunc CallbackFunc
logger Logger
g prometheus.Gatherer g prometheus.Gatherer
} }
// Logger is the minimal interface Bridge needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
type Logger interface {
Println(v ...interface{})
}
// CallbackFunc is a special type for callback functions
type CallbackFunc func(error)
// NewBridge returns a pointer to a new Bridge struct. // NewBridge returns a pointer to a new Bridge struct.
func NewBridge(c *Config) (*Bridge, error) { func NewBridge(c *Config) (*Bridge, error) {
b := &Bridge{} b := &Bridge{}
@ -98,6 +129,10 @@ func NewBridge(c *Config) (*Bridge, error) {
b.g = c.Gatherer b.g = c.Gatherer
} }
if c.Logger != nil {
b.logger = c.Logger
}
if c.Prefix != "" { if c.Prefix != "" {
b.prefix = c.Prefix b.prefix = c.Prefix
} }
@ -115,7 +150,11 @@ func NewBridge(c *Config) (*Bridge, error) {
b.timeout = c.Timeout b.timeout = c.Timeout
} }
b.errorHandler = c.ErrorHandler b.errorHandling = c.ErrorHandling
if c.ErrorCallbackFunc != nil {
b.errorCallbackFunc = c.ErrorCallbackFunc
}
return b, nil return b, nil
} }
@ -128,7 +167,9 @@ func (b *Bridge) Run(ctx context.Context) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
b.errorHandler(b.Push()) if err := b.Push(); err != nil && b.logger != nil {
b.logger.Println("error pushing to Graphite:", err)
}
case <-ctx.Done(): case <-ctx.Done():
return return
} }
@ -137,11 +178,27 @@ func (b *Bridge) Run(ctx context.Context) {
// Push pushes Prometheus metrics to the configured Graphite server. // Push pushes Prometheus metrics to the configured Graphite server.
func (b *Bridge) Push() error { func (b *Bridge) Push() error {
err := b.push()
switch b.errorHandling {
case AbortOnError:
return err
case ContinueOnError:
if b.logger != nil {
b.logger.Println("continue on error:", err)
}
case CallbackOnError:
if b.errorCallbackFunc != nil {
b.errorCallbackFunc(err)
}
}
return nil
}
func (b *Bridge) push() error {
mfs, err := b.g.Gather() mfs, err := b.g.Gather()
if err != nil { if err != nil {
return err return err
} }
if len(mfs) == 0 { if len(mfs) == 0 {
return nil return nil
} }

View File

@ -19,7 +19,9 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"os"
"reflect" "reflect"
"regexp" "regexp"
"sort" "sort"
@ -441,7 +443,8 @@ func ExampleBridge() {
Prefix: "prefix", Prefix: "prefix",
Interval: 15 * time.Second, Interval: 15 * time.Second,
Timeout: 10 * time.Second, Timeout: 10 * time.Second,
ErrorHandler: func(err error) {}, ErrorHandling: AbortOnError,
Logger: log.New(os.Stdout, "graphite bridge: ", log.Lshortfile),
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -465,32 +468,47 @@ func ExampleBridge() {
b.Run(ctx) b.Run(ctx)
} }
func TestErrorHandler(t *testing.T) { func TestErrorHandling(t *testing.T) {
var internalError error var testCases = []struct {
errorHandling HandlerErrorHandling
receivedError error
interceptedError error
}{
{
errorHandling: ContinueOnError,
receivedError: nil,
interceptedError: nil,
},
{
errorHandling: AbortOnError,
receivedError: &net.OpError{},
interceptedError: nil,
},
{
errorHandling: CallbackOnError,
receivedError: nil,
interceptedError: &net.OpError{},
},
}
for _, testCase := range testCases {
var interceptedError error
c := &Config{ c := &Config{
URL: "localhost", URL: "localhost",
Gatherer: prometheus.DefaultGatherer, ErrorHandling: testCase.errorHandling,
Prefix: "prefix", ErrorCallbackFunc: func(err error) { interceptedError = err },
Interval: 5 * time.Second,
Timeout: 2 * time.Second,
ErrorHandler: func(err error) { internalError = err },
} }
b, err := NewBridge(c) b, err := NewBridge(c)
if err != nil { if err != nil {
panic(err) t.Fatal(err)
} }
// Create a Context to control stopping the Run() loop that pushes receivedError := b.Push()
// metrics to Graphite. Multiplied by 2, because we need Run to be executed at least one time. if reflect.TypeOf(receivedError) != reflect.TypeOf(testCase.receivedError) {
ctx, cancel := context.WithTimeout(context.Background(), c.Interval*2) t.Errorf("expected to receive: %T, received: %T", testCase.receivedError, receivedError)
defer cancel() }
if reflect.TypeOf(interceptedError) != reflect.TypeOf(testCase.interceptedError) {
// Start pushing metrics to Graphite in the Run() loop. t.Errorf("expected to intercept: %T, intercepted: %T", testCase.interceptedError, interceptedError)
b.Run(ctx) }
// We haven't specified port
expError := fmt.Errorf("dial tcp: address localhost: missing port in address")
if internalError.Error() != expError.Error() {
t.Fatalf("Expected: '%s', actual: '%s'", expError, internalError)
} }
} }