200 lines
6.7 KiB
Go
200 lines
6.7 KiB
Go
|
// Copyright 2024 The Prometheus Authors
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package remotewrite
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
|
||
|
writev1 "github.com/prometheus/client_golang/api/remotewrite/genproto/v1"
|
||
|
writev2 "github.com/prometheus/client_golang/api/remotewrite/genproto/v2"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
VersionHeader = "X-Prometheus-Remote-Write-Version"
|
||
|
Version1HeaderValue = "0.1.0"
|
||
|
Version20HeaderValue = "2.0.0"
|
||
|
appProtoContentType = "application/x-protobuf"
|
||
|
)
|
||
|
|
||
|
// Compression represents the encoding. Currently remote storage supports only
|
||
|
// one, but we experiment with more, thus leaving the compression scaffolding
|
||
|
// for now.
|
||
|
type Compression string
|
||
|
|
||
|
const (
|
||
|
// SnappyBlockCompression represents https://github.com/google/snappy/blob/2c94e11145f0b7b184b831577c93e5a41c4c0346/format_description.txt
|
||
|
SnappyBlockCompression Compression = "snappy"
|
||
|
)
|
||
|
|
||
|
// ProtoMsg represents the known protobuf message for the remote write 1.0 and 2.0 specs.
|
||
|
type ProtoMsg string
|
||
|
|
||
|
const (
|
||
|
// ProtoMsgV1 represents the `prometheus.WriteRequest` protobuf
|
||
|
// message introduced in the https://prometheus.io/docs/specs/remote_write_spec/,
|
||
|
// which will eventually be deprecated.
|
||
|
ProtoMsgV1 ProtoMsg = "prometheus.WriteRequest"
|
||
|
// ProtoMsgV2 represents the `io.prometheus.write.v2.Request` protobuf
|
||
|
// message introduced in https://prometheus.io/docs/specs/remote_write_spec_2_0/
|
||
|
ProtoMsgV2 ProtoMsg = "io.prometheus.write.v2.Request"
|
||
|
)
|
||
|
|
||
|
// Validate returns error if the given reference for the protobuf message is not supported.
|
||
|
func (s ProtoMsg) Validate() error {
|
||
|
switch s {
|
||
|
case ProtoMsgV1, ProtoMsgV2:
|
||
|
return nil
|
||
|
default:
|
||
|
return fmt.Errorf("unknown remote write protobuf message %v, supported: %v", s, protoMsgs{ProtoMsgV1, ProtoMsgV2}.String())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type protoMsgs []ProtoMsg
|
||
|
|
||
|
func (m protoMsgs) Strings() []string {
|
||
|
ret := make([]string, 0, len(m))
|
||
|
for _, typ := range m {
|
||
|
ret = append(ret, string(typ))
|
||
|
}
|
||
|
return ret
|
||
|
}
|
||
|
|
||
|
func (m protoMsgs) String() string {
|
||
|
return strings.Join(m.Strings(), ", ")
|
||
|
}
|
||
|
|
||
|
var contentTypeHeaders = map[ProtoMsg]string{
|
||
|
ProtoMsgV1: appProtoContentType, // Also application/x-protobuf;proto=prometheus.WriteRequest but simplified for compatibility with 1.x spec.
|
||
|
ProtoMsgV2: appProtoContentType + ";proto=io.prometheus.write.v2.Request",
|
||
|
}
|
||
|
|
||
|
// ContentTypeHeader returns content type header value for the given proto message
|
||
|
// or empty string for unknown proto message.
|
||
|
func ContentTypeHeader(m ProtoMsg) string {
|
||
|
return contentTypeHeaders[m]
|
||
|
}
|
||
|
|
||
|
const (
|
||
|
WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
|
||
|
WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
|
||
|
WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
|
||
|
)
|
||
|
|
||
|
// WriteResponseStats represents the response write statistics.
|
||
|
type WriteResponseStats struct {
|
||
|
// Samples represents X-Prometheus-Remote-Write-Written-Samples
|
||
|
Samples int
|
||
|
// Histograms represents X-Prometheus-Remote-Write-Written-Histograms
|
||
|
Histograms int
|
||
|
// Exemplars represents X-Prometheus-Remote-Write-Written-Exemplars
|
||
|
Exemplars int
|
||
|
|
||
|
// Confirmed means we can trust those statistics from the point of view
|
||
|
// of the PRW 2.0 spec. When parsed from headers, it means we got at least one
|
||
|
// response header from the Receiver to confirm those numbers, meaning it must
|
||
|
// be at least 2.0 Receiver. See ParseWriteResponseStats for details.
|
||
|
Confirmed bool
|
||
|
}
|
||
|
|
||
|
// NoDataWritten returns true if statistics indicate no data was written.
|
||
|
func (s WriteResponseStats) NoDataWritten() bool {
|
||
|
return (s.Samples + s.Histograms + s.Exemplars) == 0
|
||
|
}
|
||
|
|
||
|
// AllSamples returns both float and histogram sample numbers.
|
||
|
func (s WriteResponseStats) AllSamples() int {
|
||
|
return s.Samples + s.Histograms
|
||
|
}
|
||
|
|
||
|
// Add returns the sum of this WriteResponseStats plus the given WriteResponseStats.
|
||
|
func (s WriteResponseStats) Add(rs WriteResponseStats) WriteResponseStats {
|
||
|
s.Confirmed = rs.Confirmed
|
||
|
s.Samples += rs.Samples
|
||
|
s.Histograms += rs.Histograms
|
||
|
s.Exemplars += rs.Exemplars
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// AddV1 returns the sum of this WriteResponseStats plus the statistics from v1 request.
|
||
|
func (s WriteResponseStats) AddV1(req *writev1.WriteRequest) WriteResponseStats {
|
||
|
s.Confirmed = true
|
||
|
for _, ts := range req.Timeseries {
|
||
|
s.Samples += len(ts.Samples)
|
||
|
s.Histograms += len(ts.Histograms)
|
||
|
s.Exemplars += len(ts.Exemplars)
|
||
|
}
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// AddV2 returns the sum of this WriteResponseStats plus the statistics from v2 request.
|
||
|
func (s WriteResponseStats) AddV2(req *writev2.Request) WriteResponseStats {
|
||
|
s.Confirmed = true
|
||
|
for _, ts := range req.Timeseries {
|
||
|
s.Samples += len(ts.Samples)
|
||
|
s.Histograms += len(ts.Histograms)
|
||
|
s.Exemplars += len(ts.Exemplars)
|
||
|
}
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// SetHeaders sets response headers in a given response writer.
|
||
|
// Make sure to use it before http.ResponseWriter.WriteHeader and .Write.
|
||
|
func (s WriteResponseStats) SetHeaders(w http.ResponseWriter) {
|
||
|
h := w.Header()
|
||
|
h.Set(WrittenSamplesHeader, strconv.Itoa(s.Samples))
|
||
|
h.Set(WrittenHistogramsHeader, strconv.Itoa(s.Histograms))
|
||
|
h.Set(WrittenExemplarsHeader, strconv.Itoa(s.Exemplars))
|
||
|
}
|
||
|
|
||
|
// parseWriteResponseStats returns WriteResponseStats parsed from the response headers.
|
||
|
//
|
||
|
// As per 2.0 spec, missing header means 0. However, abrupt HTTP errors, 1.0 Receivers
|
||
|
// or buggy 2.0 Receivers might result in no response headers specified and that
|
||
|
// might NOT necessarily mean nothing was written. To represent that we set
|
||
|
// s.Confirmed = true only when see at least on response header.
|
||
|
//
|
||
|
// Error is returned when any of the header fails to parse as int64.
|
||
|
func parseWriteResponseStats(r *http.Response) (s WriteResponseStats, err error) {
|
||
|
var (
|
||
|
errs []error
|
||
|
h = r.Header
|
||
|
)
|
||
|
if v := h.Get(WrittenSamplesHeader); v != "" { // Empty means zero.
|
||
|
s.Confirmed = true
|
||
|
if s.Samples, err = strconv.Atoi(v); err != nil {
|
||
|
s.Samples = 0
|
||
|
errs = append(errs, err)
|
||
|
}
|
||
|
}
|
||
|
if v := h.Get(WrittenHistogramsHeader); v != "" { // Empty means zero.
|
||
|
s.Confirmed = true
|
||
|
if s.Histograms, err = strconv.Atoi(v); err != nil {
|
||
|
s.Histograms = 0
|
||
|
errs = append(errs, err)
|
||
|
}
|
||
|
}
|
||
|
if v := h.Get(WrittenExemplarsHeader); v != "" { // Empty means zero.
|
||
|
s.Confirmed = true
|
||
|
if s.Exemplars, err = strconv.Atoi(v); err != nil {
|
||
|
s.Exemplars = 0
|
||
|
errs = append(errs, err)
|
||
|
}
|
||
|
}
|
||
|
return s, errors.Join(errs...)
|
||
|
}
|