From b89839e5bbcf4890972390e0f396ddf969236734 Mon Sep 17 00:00:00 2001 From: Anthony Mirabella Date: Wed, 5 Jun 2024 20:27:34 -0400 Subject: [PATCH] Backport changes to `confighttp` and `configgrpc` (#2743) Upstream changes to address potential decompression-related failures in the `confighttp` and `configgrpc` modules are backported. See https://github.com/open-telemetry/opentelemetry-collector/pull/10289 and https://github.com/open-telemetry/opentelemetry-collector/pull/10323 for more details. Signed-off-by: Anthony J Mirabella --- Makefile | 2 +- patches/configgrpc.patch | 118 +++++++++++++++++++++++++++++++++++++++ patches/confighttp.patch | 80 ++++++++++++++++++++++++++ 3 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 patches/configgrpc.patch create mode 100644 patches/confighttp.patch diff --git a/Makefile b/Makefile index 7c2751905a..e06be1dcaf 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ for-all-target: $(GOMODULES) PATCHES := $(shell find ./patches -name *.patch) apply-patches: $(PATCHES) - $(foreach patch,$(PATCHES), patch --posix --forward -p1 < $(patch);) + $(foreach patch,$(PATCHES), patch -V none --forward -p1 < $(patch);) .PHONY: apply-patches diff --git a/patches/configgrpc.patch b/patches/configgrpc.patch new file mode 100644 index 0000000000..c55a8b7e11 --- /dev/null +++ b/patches/configgrpc.patch @@ -0,0 +1,118 @@ +diff --git a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go +index 87e7b83d7..e64b87142 100644 +--- a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go ++++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go +@@ -12,7 +12,6 @@ import ( + "time" + + "github.com/mostynb/go-grpc-compression/nonclobbering/snappy" +- "github.com/mostynb/go-grpc-compression/nonclobbering/zstd" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" + "google.golang.org/grpc" +@@ -28,6 +27,7 @@ import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configauth" + "go.opentelemetry.io/collector/config/configcompression" ++ grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configtelemetry" +@@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err + case configcompression.TypeSnappy: + return snappy.Name, nil + case configcompression.TypeZstd: +- return zstd.Name, nil ++ return grpcInternal.ZstdName, nil + default: + return "", fmt.Errorf("unsupported compression type %q", compressionType) + } +diff --git /dev/null b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go +new file mode 100644 +index 000000000..0718b7353 +--- /dev/null ++++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go +@@ -0,0 +1,83 @@ ++// Copyright The OpenTelemetry Authors ++// Copyright 2017 gRPC authors ++// SPDX-License-Identifier: Apache-2.0 ++ ++package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal" ++ ++import ( ++ "errors" ++ "io" ++ "sync" ++ ++ "github.com/klauspost/compress/zstd" ++ "google.golang.org/grpc/encoding" ++) ++ ++const ZstdName = "zstd" ++ ++func init() { ++ encoding.RegisterCompressor(NewZstdCodec()) ++} ++ ++type writer struct { ++ *zstd.Encoder ++ pool *sync.Pool ++} ++ ++func NewZstdCodec() encoding.Compressor { ++ c := &compressor{} ++ c.poolCompressor.New = func() any { ++ zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024)) ++ return &writer{Encoder: zw, pool: &c.poolCompressor} ++ } ++ return c ++} ++ ++func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { ++ z := c.poolCompressor.Get().(*writer) ++ z.Encoder.Reset(w) ++ return z, nil ++} ++ ++func (z *writer) Close() error { ++ defer z.pool.Put(z) ++ return z.Encoder.Close() ++} ++ ++type reader struct { ++ *zstd.Decoder ++ pool *sync.Pool ++} ++ ++func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { ++ z, inPool := c.poolDecompressor.Get().(*reader) ++ if !inPool { ++ newZ, err := zstd.NewReader(r) ++ if err != nil { ++ return nil, err ++ } ++ return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil ++ } ++ if err := z.Reset(r); err != nil { ++ c.poolDecompressor.Put(z) ++ return nil, err ++ } ++ return z, nil ++} ++ ++func (z *reader) Read(p []byte) (n int, err error) { ++ n, err = z.Decoder.Read(p) ++ if errors.Is(err, io.EOF) { ++ z.pool.Put(z) ++ } ++ return n, err ++} ++ ++func (c *compressor) Name() string { ++ return ZstdName ++} ++ ++type compressor struct { ++ poolCompressor sync.Pool ++ poolDecompressor sync.Pool ++} diff --git a/patches/confighttp.patch b/patches/confighttp.patch new file mode 100644 index 0000000000..eb69a9b718 --- /dev/null +++ b/patches/confighttp.patch @@ -0,0 +1,80 @@ +diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go +index 88ecafe78..a700bec84 100644 +--- a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go ++++ b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go +@@ -67,24 +67,26 @@ func (r *compressRoundTripper) RoundTrip(req *http.Request) (*http.Response, err + } + + type decompressor struct { +- errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) +- base http.Handler +- decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error) ++ errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) ++ base http.Handler ++ decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error) ++ maxRequestBodySize int64 + } + + // httpContentDecompressor offloads the task of handling compressed HTTP requests + // by identifying the compression format in the "Content-Encoding" header and re-writing + // request body so that the handlers further in the chain can work on decompressed data. + // It supports gzip and deflate/zlib compression. +-func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler { ++func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler { + errHandler := defaultErrorHandler + if eh != nil { + errHandler = eh + } + + d := &decompressor{ +- errHandler: errHandler, +- base: h, ++ maxRequestBodySize: maxRequestBodySize, ++ errHandler: errHandler, ++ base: h, + decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){ + "": func(io.ReadCloser) (io.ReadCloser, error) { + // Not a compressed payload. Nothing to do. +@@ -155,7 +157,7 @@ func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // "Content-Length" is set to -1 as the size of the decompressed body is unknown. + r.Header.Del("Content-Length") + r.ContentLength = -1 +- r.Body = newBody ++ r.Body = http.MaxBytesReader(w, newBody, d.maxRequestBodySize) + } + d.base.ServeHTTP(w, r) + } +diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go +index b210fa0dd..71b2f17ee 100644 +--- a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go ++++ b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go +@@ -30,6 +30,7 @@ import ( + ) + + const headerContentEncoding = "Content-Encoding" ++const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB + + // ClientConfig defines settings for creating an HTTP client. + type ClientConfig struct { +@@ -269,7 +270,7 @@ type ServerConfig struct { + // Auth for this receiver + Auth *configauth.Authentication `mapstructure:"auth"` + +- // MaxRequestBodySize sets the maximum request body size in bytes ++ // MaxRequestBodySize sets the maximum request body size in bytes. Default: 20MiB. + MaxRequestBodySize int64 `mapstructure:"max_request_body_size"` + + // IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers +@@ -340,7 +341,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin + o(serverOpts) + } + +- handler = httpContentDecompressor(handler, serverOpts.errHandler, serverOpts.decoders) ++ if hss.MaxRequestBodySize <= 0 { ++ hss.MaxRequestBodySize = defaultMaxRequestBodySize ++ } ++ ++ handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders) + + if hss.MaxRequestBodySize > 0 { + handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)