From 2aae630ba8cf1d572d8515ed015763f650eba0f5 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 19 Aug 2015 15:33:59 +0200 Subject: [PATCH] Add initial HTTP API v1 package --- api/prometheus/api.go | 362 +++++++++++++++++++++++++++++ api/prometheus/api_test.go | 453 +++++++++++++++++++++++++++++++++++++ 2 files changed, 815 insertions(+) create mode 100644 api/prometheus/api.go create mode 100644 api/prometheus/api_test.go diff --git a/api/prometheus/api.go b/api/prometheus/api.go new file mode 100644 index 0000000..ea98d43 --- /dev/null +++ b/api/prometheus/api.go @@ -0,0 +1,362 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package prometheus provides bindings to the Prometheus HTTP API: +// http://prometheus.io/docs/querying/api/ +package prometheus + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/prometheus/common/model" + "golang.org/x/net/context" +) + +const ( + statusAPIError = 422 + apiPrefix = "/api/v1" + + epQuery = "/query" + epQueryRange = "/query_range" + epLabelValues = "/label/:name/values" + epSeries = "/series" +) + +type ErrorType string + +const ( + // The different API error types. + ErrBadData ErrorType = "bad_data" + ErrTimeout = "timeout" + ErrCanceled = "canceled" + ErrExec = "execution" + ErrBadResponse = "bad_response" +) + +// Error is an error returned by the API. +type Error struct { + Type ErrorType + Msg string +} + +func (e *Error) Error() string { + return fmt.Sprintf("%s: %s", e.Type, e.Msg) +} + +// CancelableTransport is like net.Transport but provides +// per-request cancelation functionality. +type CancelableTransport interface { + http.RoundTripper + CancelRequest(req *http.Request) +} + +var DefaultTransport CancelableTransport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, +} + +// Config defines configuration parameters for a new client. +type Config struct { + // The address of the Prometheus to connect to. + Address string + + // Transport is used by the Client to drive HTTP requests. If not + // provided, DefaultTransport will be used. + Transport CancelableTransport +} + +func (cfg *Config) transport() CancelableTransport { + if cfg.Transport == nil { + return DefaultTransport + } + return cfg.Transport +} + +type Client interface { + url(ep string, args map[string]string) *url.URL + do(context.Context, *http.Request) (*http.Response, []byte, error) +} + +// New returns a new Client. +func New(cfg Config) (Client, error) { + u, err := url.Parse(cfg.Address) + if err != nil { + return nil, err + } + u.Path = apiPrefix + + return &httpClient{ + endpoint: u, + transport: cfg.transport(), + }, nil +} + +type httpClient struct { + endpoint *url.URL + transport CancelableTransport +} + +func (c *httpClient) url(ep string, args map[string]string) *url.URL { + p := path.Join(c.endpoint.Path, ep) + + for arg, val := range args { + arg = ":" + arg + p = strings.Replace(p, arg, val, -1) + } + + u := *c.endpoint + u.Path = p + + return &u +} + +func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) { + type roundTripResponse struct { + resp *http.Response + err error + } + + rtchan := make(chan roundTripResponse, 1) + go func() { + resp, err := c.transport.RoundTrip(req) + rtchan <- roundTripResponse{resp: resp, err: err} + close(rtchan) + }() + + var resp *http.Response + var err error + + select { + case rtresp := <-rtchan: + resp, err = rtresp.resp, rtresp.err + case <-ctx.Done(): + // Cancel request and wait until it terminated. + c.transport.CancelRequest(req) + resp, err = (<-rtchan).resp, ctx.Err() + } + + defer func() { + if resp != nil { + resp.Body.Close() + } + }() + + if err != nil { + return nil, nil, err + } + + var body []byte + done := make(chan struct{}) + go func() { + body, err = ioutil.ReadAll(resp.Body) + close(done) + }() + + select { + case <-ctx.Done(): + err = resp.Body.Close() + <-done + if err == nil { + err = ctx.Err() + } + case <-done: + } + + return resp, body, err +} + +// apiClient wraps a regular client and processes successful API responses. +// Successful also includes responses that errored at the API level. +type apiClient struct { + Client +} + +type apiResponse struct { + Status string `json:"status"` + Data json.RawMessage `json:"data"` + ErrorType ErrorType `json:"errorType"` + Error string `json:"error"` +} + +func (c apiClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) { + resp, body, err := c.Client.do(ctx, req) + if err != nil { + return resp, body, err + } + + code := resp.StatusCode + + if code/100 != 2 && code != statusAPIError { + return resp, body, &Error{ + Type: ErrBadResponse, + Msg: fmt.Sprintf("bad response code %d", resp.StatusCode), + } + } + + var result apiResponse + + if err = json.Unmarshal(body, &result); err != nil { + return resp, body, &Error{ + Type: ErrBadResponse, + Msg: err.Error(), + } + } + + if (code == statusAPIError) != (result.Status == "error") { + err = &Error{ + Type: ErrBadResponse, + Msg: "inconsistent body for response code", + } + } + + if code == statusAPIError && result.Status == "error" { + err = &Error{ + Type: result.ErrorType, + Msg: result.Error, + } + } + + return resp, []byte(result.Data), err +} + +// Range represents a sliced time range. +type Range struct { + // The boundaries of the time range. + Start, End time.Time + // The maximum time between two slices within the boundaries. + Step time.Duration +} + +// queryResult contains result data for a query. +type queryResult struct { + Type model.ValueType `json:"resultType"` + Result interface{} `json:"result"` + + // The decoded value. + v model.Value +} + +func (qr *queryResult) UnmarshalJSON(b []byte) error { + v := struct { + Type model.ValueType `json:"resultType"` + Result json.RawMessage `json:"result"` + }{} + + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + + switch v.Type { + case model.ValScalar: + var sv model.Scalar + err = json.Unmarshal(v.Result, &sv) + qr.v = &sv + + case model.ValVector: + var vv model.Vector + err = json.Unmarshal(v.Result, &vv) + qr.v = vv + + case model.ValMatrix: + var mv model.Matrix + err = json.Unmarshal(v.Result, &mv) + qr.v = mv + + default: + err = fmt.Errorf("unexpected value type %q", v.Type) + } + return err +} + +// QueryAPI provides bindings the Prometheus's query API. +type QueryAPI interface { + // Query performs a query for the given time. + Query(ctx context.Context, query string, ts time.Time) (model.Value, error) + // Query performs a query for the given range. + QueryRange(ctx context.Context, query string, r Range) (model.Value, error) +} + +// NewQueryAPI returns a new QueryAPI for the client. +func NewQueryAPI(c Client) QueryAPI { + return &httpQueryAPI{client: apiClient{c}} +} + +type httpQueryAPI struct { + client Client +} + +func (h *httpQueryAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, error) { + u := h.client.url(epQuery, nil) + q := u.Query() + + q.Set("query", query) + q.Set("time", ts.Format(time.RFC3339Nano)) + + u.RawQuery = q.Encode() + + req, _ := http.NewRequest("GET", u.String(), nil) + + _, body, err := h.client.do(ctx, req) + if err != nil { + return nil, err + } + + var qres queryResult + err = json.Unmarshal(body, &qres) + + return model.Value(qres.v), err +} + +func (h *httpQueryAPI) QueryRange(ctx context.Context, query string, r Range) (model.Value, error) { + u := h.client.url(epQueryRange, nil) + q := u.Query() + + var ( + start = r.Start.Format(time.RFC3339Nano) + end = r.End.Format(time.RFC3339Nano) + step = strconv.FormatFloat(r.Step.Seconds(), 'f', 3, 64) + ) + + q.Set("query", query) + q.Set("start", start) + q.Set("end", end) + q.Set("step", step) + + u.RawQuery = q.Encode() + + req, _ := http.NewRequest("GET", u.String(), nil) + + _, body, err := h.client.do(ctx, req) + if err != nil { + return nil, err + } + + var qres queryResult + err = json.Unmarshal(body, &qres) + + return model.Value(qres.v), err +} diff --git a/api/prometheus/api_test.go b/api/prometheus/api_test.go new file mode 100644 index 0000000..87d3e40 --- /dev/null +++ b/api/prometheus/api_test.go @@ -0,0 +1,453 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "reflect" + "testing" + "time" + + "github.com/prometheus/common/model" + "golang.org/x/net/context" +) + +func TestConfig(t *testing.T) { + c := Config{} + if c.transport() != DefaultTransport { + t.Fatalf("expected default transport for nil Transport field") + } +} + +func TestClientURL(t *testing.T) { + tests := []struct { + address string + endpoint string + args map[string]string + expected string + }{ + { + address: "http://localhost:9090", + endpoint: "/test", + expected: "http://localhost:9090/test", + }, + { + address: "http://localhost", + endpoint: "/test", + expected: "http://localhost/test", + }, + { + address: "http://localhost:9090", + endpoint: "test", + expected: "http://localhost:9090/test", + }, + { + address: "http://localhost:9090/prefix", + endpoint: "/test", + expected: "http://localhost:9090/prefix/test", + }, + { + address: "https://localhost:9090/", + endpoint: "/test/", + expected: "https://localhost:9090/test", + }, + { + address: "http://localhost:9090", + endpoint: "/test/:param", + args: map[string]string{ + "param": "content", + }, + expected: "http://localhost:9090/test/content", + }, + { + address: "http://localhost:9090", + endpoint: "/test/:param/more/:param", + args: map[string]string{ + "param": "content", + }, + expected: "http://localhost:9090/test/content/more/content", + }, + { + address: "http://localhost:9090", + endpoint: "/test/:param/more/:foo", + args: map[string]string{ + "param": "content", + "foo": "bar", + }, + expected: "http://localhost:9090/test/content/more/bar", + }, + { + address: "http://localhost:9090", + endpoint: "/test/:param", + args: map[string]string{ + "nonexistant": "content", + }, + expected: "http://localhost:9090/test/:param", + }, + } + + for _, test := range tests { + ep, err := url.Parse(test.address) + if err != nil { + t.Fatal(err) + } + + hclient := &httpClient{ + endpoint: ep, + transport: DefaultTransport, + } + + u := hclient.url(test.endpoint, test.args) + if u.String() != test.expected { + t.Errorf("unexpected result: got %s, want %s", u, test.expected) + continue + } + + // The apiClient must return exactly the same result as the httpClient. + aclient := &apiClient{hclient} + + u = aclient.url(test.endpoint, test.args) + if u.String() != test.expected { + t.Errorf("unexpected result: got %s, want %s", u, test.expected) + } + } +} + +type testClient struct { + *testing.T + + ch chan apiClientTest + req *http.Request +} + +type apiClientTest struct { + code int + response interface{} + expected string + err *Error +} + +func (c *testClient) url(ep string, args map[string]string) *url.URL { + return nil +} + +func (c *testClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) { + if ctx == nil { + c.Fatalf("context was not passed down") + } + if req != c.req { + c.Fatalf("request was not passed down") + } + + test := <-c.ch + + var b []byte + var err error + + switch v := test.response.(type) { + case string: + b = []byte(v) + default: + b, err = json.Marshal(v) + if err != nil { + c.Fatal(err) + } + } + + resp := &http.Response{ + StatusCode: test.code, + } + + return resp, b, nil +} + +func TestAPIClientDo(t *testing.T) { + tests := []apiClientTest{ + { + response: &apiResponse{ + Status: "error", + Data: json.RawMessage(`null`), + ErrorType: ErrBadData, + Error: "failed", + }, + err: &Error{ + Type: ErrBadData, + Msg: "failed", + }, + code: statusAPIError, + expected: `null`, + }, + { + response: &apiResponse{ + Status: "error", + Data: json.RawMessage(`"test"`), + ErrorType: ErrTimeout, + Error: "timed out", + }, + err: &Error{ + Type: ErrTimeout, + Msg: "timed out", + }, + code: statusAPIError, + expected: `test`, + }, + { + response: "bad json", + err: &Error{ + Type: ErrBadResponse, + Msg: "bad response code 400", + }, + code: http.StatusBadRequest, + }, + { + response: "bad json", + err: &Error{ + Type: ErrBadResponse, + Msg: "invalid character 'b' looking for beginning of value", + }, + code: statusAPIError, + }, + { + response: &apiResponse{ + Status: "success", + Data: json.RawMessage(`"test"`), + }, + err: &Error{ + Type: ErrBadResponse, + Msg: "inconsistent body for response code", + }, + code: statusAPIError, + }, + { + response: &apiResponse{ + Status: "success", + Data: json.RawMessage(`"test"`), + ErrorType: ErrTimeout, + Error: "timed out", + }, + err: &Error{ + Type: ErrBadResponse, + Msg: "inconsistent body for response code", + }, + code: statusAPIError, + }, + { + response: &apiResponse{ + Status: "error", + Data: json.RawMessage(`"test"`), + ErrorType: ErrTimeout, + Error: "timed out", + }, + err: &Error{ + Type: ErrBadResponse, + Msg: "inconsistent body for response code", + }, + code: http.StatusOK, + }, + } + + tc := &testClient{ + T: t, + ch: make(chan apiClientTest, 1), + req: &http.Request{}, + } + client := &apiClient{tc} + + for _, test := range tests { + + tc.ch <- test + + _, body, err := client.do(context.Background(), tc.req) + + if test.err != nil { + if err == nil { + t.Errorf("expected error %q but got none", test.err) + continue + } + if test.err.Error() != err.Error() { + t.Errorf("unexpected error: want %q, got %q", test.err, err) + } + continue + } + if err != nil { + t.Errorf("unexpeceted error %s", err) + continue + } + + want, got := test.expected, string(body) + if want != got { + t.Errorf("unexpected body: want %q, got %q", want, got) + } + } +} + +type apiTestClient struct { + *testing.T + curTest apiTest +} + +type apiTest struct { + do func() (interface{}, error) + inErr error + inRes interface{} + + reqPath string + reqParam url.Values + reqMethod string + res interface{} + err error +} + +func (c *apiTestClient) url(ep string, args map[string]string) *url.URL { + u := &url.URL{ + Host: "test:9090", + Path: apiPrefix + ep, + } + return u +} + +func (c *apiTestClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) { + + test := c.curTest + + if req.URL.Path != test.reqPath { + c.Errorf("unexpected request path: want %s, got %s", test.reqPath, req.URL.Path) + } + if req.Method != test.reqMethod { + c.Errorf("unexpected request method: want %s, got %s", test.reqMethod, req.Method) + } + + b, err := json.Marshal(test.inRes) + if err != nil { + c.Fatal(err) + } + + resp := &http.Response{} + if test.inErr != nil { + resp.StatusCode = statusAPIError + } else { + resp.StatusCode = http.StatusOK + } + + return resp, b, test.inErr +} + +func TestAPIs(t *testing.T) { + + testTime := time.Now() + + client := &apiTestClient{T: t} + + queryApi := &httpQueryAPI{ + client: client, + } + + doQuery := func(q string, ts time.Time) func() (interface{}, error) { + return func() (interface{}, error) { + return queryApi.Query(context.Background(), q, ts) + } + } + + doQueryRange := func(q string, rng Range) func() (interface{}, error) { + return func() (interface{}, error) { + return queryApi.QueryRange(context.Background(), q, rng) + } + } + + queryTests := []apiTest{ + { + do: doQuery("2", testTime), + inRes: &queryResult{ + Type: model.ValScalar, + Result: &model.Scalar{ + Value: 2, + Timestamp: model.TimeFromUnix(testTime.Unix()), + }, + }, + + reqMethod: "GET", + reqPath: "/api/v1/query", + reqParam: url.Values{ + "query": []string{"2"}, + "time": []string{testTime.Format(time.RFC3339Nano)}, + }, + res: &model.Scalar{ + Value: 2, + Timestamp: model.TimeFromUnix(testTime.Unix()), + }, + }, + { + do: doQuery("2", testTime), + inErr: fmt.Errorf("some error"), + + reqMethod: "GET", + reqPath: "/api/v1/query", + reqParam: url.Values{ + "query": []string{"2"}, + "time": []string{testTime.Format(time.RFC3339Nano)}, + }, + err: fmt.Errorf("some error"), + }, + + { + do: doQueryRange("2", Range{ + Start: testTime.Add(-time.Minute), + End: testTime, + Step: time.Minute, + }), + inErr: fmt.Errorf("some error"), + + reqMethod: "GET", + reqPath: "/api/v1/query_range", + reqParam: url.Values{ + "query": []string{"2"}, + "start": []string{testTime.Add(-time.Minute).Format(time.RFC3339Nano)}, + "end": []string{testTime.Format(time.RFC3339Nano)}, + "step": []string{time.Minute.String()}, + }, + err: fmt.Errorf("some error"), + }, + } + + var tests []apiTest + tests = append(tests, queryTests...) + + for _, test := range tests { + client.curTest = test + + res, err := test.do() + + if test.err != nil { + if err == nil { + t.Errorf("expected error %q but got none", test.err) + continue + } + if err.Error() != test.err.Error() { + t.Errorf("unexpected error: want %s, got %s", test.err, err) + } + continue + } + if err != nil { + t.Errorf("unexpected error: %s", err) + continue + } + + if !reflect.DeepEqual(res, test.res) { + t.Errorf("unexpected result: want %v, got %v", test.res, res) + } + } +}