// Copyright 2024 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package remotewrite import ( "context" "fmt" "io" "log/slog" "net/http" "strings" "github.com/klauspost/compress/snappy" writev1 "github.com/prometheus/client_golang/api/remotewrite/genproto/v1" writev2 "github.com/prometheus/client_golang/api/remotewrite/genproto/v2" ) type Storage interface { Store(ctx context.Context, proto ProtoMsg, serializedRequest []byte) (_ WriteResponseStats, code int, _ error) } type Handler struct { logger *slog.Logger store Storage } type DecodedStorage interface { StoreV1(ctx context.Context, reqV1 *writev1.WriteRequest) (_ *WriteResponseStats, code *int, _ error) StoreV2(ctx context.Context, reqV2 *writev2.Request) (_ *WriteResponseStats, code *int, _ error) } func NewDecodingStore(store DecodedStorage) Storage { return &decodingStore{store: store} } type decodingStore struct { store DecodedStorage } func (s *decodingStore) Store(ctx context.Context, proto ProtoMsg, serializedRequest []byte) (stats WriteResponseStats, code int, _ error) { var ( maybeStats *WriteResponseStats maybeCode *int err error ) switch proto { case ProtoMsgV1: var req writev1.WriteRequest if err := req.UnmarshalVT(serializedRequest); err != nil { return stats, http.StatusBadRequest, fmt.Errorf("decoding v1 remote write request: %w", err) } maybeStats, maybeCode, err = s.store.StoreV1(ctx, &req) if maybeStats != nil { stats = *maybeStats } else { stats = stats.AddV1(&req) } case ProtoMsgV2: var req writev2.Request if err := req.UnmarshalVT(serializedRequest); err != nil { return stats, http.StatusBadRequest, fmt.Errorf("decoding v2 remote write request: %w", err) } maybeStats, maybeCode, err = s.store.StoreV2(ctx, &req) if maybeStats != nil { stats = *maybeStats } else { stats = stats.AddV2(&req) } default: return stats, http.StatusUnsupportedMediaType, fmt.Errorf("unsupported proto format %v", string(proto)) } if err != nil { if maybeCode == nil { return stats, http.StatusInternalServerError, err } return stats, *maybeCode, err } if maybeCode == nil { return stats, http.StatusOK, nil } return stats, *maybeCode, nil } // TODO(bwplotka): Add variadic options if needed. func NewHandler(logger *slog.Logger, store Storage) *Handler { return &Handler{logger: logger, store: store} } func parseProtoMsg(contentType string) (ProtoMsg, error) { contentType = strings.TrimSpace(contentType) parts := strings.Split(contentType, ";") if parts[0] != appProtoContentType { return "", fmt.Errorf("expected %v as the first (media) part, got %v content-type", appProtoContentType, contentType) } // Parse potential https://www.rfc-editor.org/rfc/rfc9110#parameter for _, p := range parts[1:] { pair := strings.Split(p, "=") if len(pair) != 2 { return "", fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", p, contentType) } if pair[0] == "proto" { ret := ProtoMsg(pair[1]) if err := ret.Validate(); err != nil { return "", fmt.Errorf("got %v content type; %w", contentType, err) } return ret, nil } } // No "proto=" parameter, assuming v1. return ProtoMsgV1, nil } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { contentType := r.Header.Get("Content-Type") if contentType == "" { // Don't break yolo 1.0 clients if not needed. // We could give http.StatusUnsupportedMediaType, but let's assume 1.0 message by default. contentType = appProtoContentType } msgType, err := parseProtoMsg(contentType) if err != nil { h.logger.Error("Error decoding remote write request", "err", err) http.Error(w, err.Error(), http.StatusUnsupportedMediaType) return } enc := r.Header.Get("Content-Encoding") if enc == "" { // Don't break yolo 1.0 clients if not needed. This is similar to what we did // before 2.0: https://github.com/prometheus/prometheus/blob/d78253319daa62c8f28ed47e40bafcad2dd8b586/storage/remote/write_handler.go#L62 // We could give http.StatusUnsupportedMediaType, but let's assume snappy by default. } else if enc != string(SnappyBlockCompression) { err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, SnappyBlockCompression) h.logger.Error("Error decoding remote write request", "err", err) http.Error(w, err.Error(), http.StatusUnsupportedMediaType) } // Read the request body. body, err := io.ReadAll(r.Body) if err != nil { h.logger.Error("Error decoding remote write request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } decompressed, err := snappy.Decode(nil, body) if err != nil { // TODO(bwplotka): Add more context to responded error? h.logger.Error("Error decompressing remote write request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } stats, code, storeErr := h.store.Store(r.Context(), msgType, decompressed) // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases. stats.SetHeaders(w) if storeErr != nil { if code == 0 { code = http.StatusInternalServerError } if code/5 == 100 { // 5xx h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error()) } http.Error(w, storeErr.Error(), code) return } w.WriteHeader(http.StatusNoContent) }