Skip to content

Commit

Permalink
Backport changes to confighttp and configgrpc (#2743)
Browse files Browse the repository at this point in the history
Upstream changes to address potential decompression-related failures in
the `confighttp` and `configgrpc` modules are backported.  See
open-telemetry/opentelemetry-collector#10289 and
open-telemetry/opentelemetry-collector#10323 for
more details.

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>
  • Loading branch information
Aneurysm9 authored Jun 6, 2024
1 parent fb02e65 commit b89839e
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ for-all-target: $(GOMODULES)

PATCHES := $(shell find ./patches -name *.patch)
apply-patches: $(PATCHES)
$(foreach patch,$(PATCHES), patch --posix --forward -p1 < $(patch);)
$(foreach patch,$(PATCHES), patch -V none --forward -p1 < $(patch);)

.PHONY: apply-patches

Expand Down
118 changes: 118 additions & 0 deletions patches/configgrpc.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
diff --git a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go
index 87e7b83d7..e64b87142 100644
--- a/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go
+++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/configgrpc.go
@@ -12,7 +12,6 @@ import (
"time"

"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
- "github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
@@ -28,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
+ grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtelemetry"
@@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
case configcompression.TypeSnappy:
return snappy.Name, nil
case configcompression.TypeZstd:
- return zstd.Name, nil
+ return grpcInternal.ZstdName, nil
default:
return "", fmt.Errorf("unsupported compression type %q", compressionType)
}
diff --git /dev/null b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go
new file mode 100644
index 000000000..0718b7353
--- /dev/null
+++ b/vendor/go.opentelemetry.io/collector/config/configgrpc/internal/zstd.go
@@ -0,0 +1,83 @@
+// Copyright The OpenTelemetry Authors
+// Copyright 2017 gRPC authors
+// SPDX-License-Identifier: Apache-2.0
+
+package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal"
+
+import (
+ "errors"
+ "io"
+ "sync"
+
+ "github.com/klauspost/compress/zstd"
+ "google.golang.org/grpc/encoding"
+)
+
+const ZstdName = "zstd"
+
+func init() {
+ encoding.RegisterCompressor(NewZstdCodec())
+}
+
+type writer struct {
+ *zstd.Encoder
+ pool *sync.Pool
+}
+
+func NewZstdCodec() encoding.Compressor {
+ c := &compressor{}
+ c.poolCompressor.New = func() any {
+ zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024))
+ return &writer{Encoder: zw, pool: &c.poolCompressor}
+ }
+ return c
+}
+
+func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
+ z := c.poolCompressor.Get().(*writer)
+ z.Encoder.Reset(w)
+ return z, nil
+}
+
+func (z *writer) Close() error {
+ defer z.pool.Put(z)
+ return z.Encoder.Close()
+}
+
+type reader struct {
+ *zstd.Decoder
+ pool *sync.Pool
+}
+
+func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
+ z, inPool := c.poolDecompressor.Get().(*reader)
+ if !inPool {
+ newZ, err := zstd.NewReader(r)
+ if err != nil {
+ return nil, err
+ }
+ return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
+ }
+ if err := z.Reset(r); err != nil {
+ c.poolDecompressor.Put(z)
+ return nil, err
+ }
+ return z, nil
+}
+
+func (z *reader) Read(p []byte) (n int, err error) {
+ n, err = z.Decoder.Read(p)
+ if errors.Is(err, io.EOF) {
+ z.pool.Put(z)
+ }
+ return n, err
+}
+
+func (c *compressor) Name() string {
+ return ZstdName
+}
+
+type compressor struct {
+ poolCompressor sync.Pool
+ poolDecompressor sync.Pool
+}
80 changes: 80 additions & 0 deletions patches/confighttp.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go
index 88ecafe78..a700bec84 100644
--- a/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go
+++ b/vendor/go.opentelemetry.io/collector/config/confighttp/compression.go
@@ -67,24 +67,26 @@ func (r *compressRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
}

type decompressor struct {
- errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
- base http.Handler
- decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
+ errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
+ base http.Handler
+ decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
+ maxRequestBodySize int64
}

// httpContentDecompressor offloads the task of handling compressed HTTP requests
// by identifying the compression format in the "Content-Encoding" header and re-writing
// request body so that the handlers further in the chain can work on decompressed data.
// It supports gzip and deflate/zlib compression.
-func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
+func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
errHandler := defaultErrorHandler
if eh != nil {
errHandler = eh
}

d := &decompressor{
- errHandler: errHandler,
- base: h,
+ maxRequestBodySize: maxRequestBodySize,
+ errHandler: errHandler,
+ base: h,
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
@@ -155,7 +157,7 @@ func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// "Content-Length" is set to -1 as the size of the decompressed body is unknown.
r.Header.Del("Content-Length")
r.ContentLength = -1
- r.Body = newBody
+ r.Body = http.MaxBytesReader(w, newBody, d.maxRequestBodySize)
}
d.base.ServeHTTP(w, r)
}
diff --git a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go
index b210fa0dd..71b2f17ee 100644
--- a/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go
+++ b/vendor/go.opentelemetry.io/collector/config/confighttp/confighttp.go
@@ -30,6 +30,7 @@ import (
)

const headerContentEncoding = "Content-Encoding"
+const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB

// ClientConfig defines settings for creating an HTTP client.
type ClientConfig struct {
@@ -269,7 +270,7 @@ type ServerConfig struct {
// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth"`

- // MaxRequestBodySize sets the maximum request body size in bytes
+ // MaxRequestBodySize sets the maximum request body size in bytes. Default: 20MiB.
MaxRequestBodySize int64 `mapstructure:"max_request_body_size"`

// IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers
@@ -340,7 +341,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin
o(serverOpts)
}

- handler = httpContentDecompressor(handler, serverOpts.errHandler, serverOpts.decoders)
+ if hss.MaxRequestBodySize <= 0 {
+ hss.MaxRequestBodySize = defaultMaxRequestBodySize
+ }
+
+ handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders)

if hss.MaxRequestBodySize > 0 {
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)

0 comments on commit b89839e

Please sign in to comment.