diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index c17868b427..4e969694dc 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -9,8 +9,10 @@ import ( "reflect" "sync" + "connectrpc.com/connect" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" "github.com/grafana/alloy/internal/component" @@ -20,6 +22,9 @@ import ( "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/util" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" ) const ( @@ -137,14 +142,81 @@ func (c *Component) Update(args component.Arguments) error { c.server = srv return c.server.MountAndRun(func(router *mux.Router) { + // this mounts the og pyroscope ingest API, mostly used by SDKs router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost) + + // mount connect go pushv1 + pathPush, handlePush := pushv1connect.NewPusherServiceHandler(c) + router.PathPrefix(pathPush).Handler(handlePush).Methods(http.MethodPost) }) } -func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { +func setLabelBuilderFromAPI(lb *labels.Builder, api []*typesv1.LabelPair) { + for i := range api { + lb.Set(api[i].Name, api[i].Value) + } +} + +func apiToAlloySamples(api []*pushv1.RawSample) []*pyroscope.RawSample { + var ( + alloy = make([]*pyroscope.RawSample, len(api)) + ) + for i := range alloy { + alloy[i] = &pyroscope.RawSample{ + RawProfile: api[i].RawProfile, + } + } + return alloy +} + +func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest], +) (*connect.Response[pushv1.PushResponse], error) { + appendables := c.getAppendables() + + // Create an errgroup with the timeout context + g, ctx := errgroup.WithContext(ctx) + + // Start copying the request body to all pipes + for i := range appendables { + appendable := appendables[i].Appender() + g.Go(func() error { + var ( + errs error + lb = labels.NewBuilder(nil) + ) + + for idx := range req.Msg.Series { + lb.Reset(nil) + setLabelBuilderFromAPI(lb, req.Msg.Series[idx].Labels) + err := appendable.Append(ctx, lb.Labels(), apiToAlloySamples(req.Msg.Series[idx].Samples)) + if err != nil { + errs = errors.Join( + errs, + fmt.Errorf("unable to append series %s to appendable %d: %w", lb.Labels().String(), i, err), + ) + } + } + return errs + }) + } + if err := g.Wait(); err != nil { + level.Error(c.opts.Logger).Log("msg", "Failed to forward profiles requests", "err", err) + return nil, connect.NewError(connect.CodeInternal, err) + } + + level.Debug(c.opts.Logger).Log("msg", "Profiles successfully forwarded") + return connect.NewResponse(&pushv1.PushResponse{}), nil +} + +func (c *Component) getAppendables() []pyroscope.Appendable { c.mut.Lock() + defer c.mut.Unlock() appendables := c.appendables - c.mut.Unlock() + return appendables +} + +func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { + appendables := c.getAppendables() // Create a pipe for each appendable pipeWriters := make([]io.Writer, len(appendables)) diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go index 5c507016cb..628de88aec 100644 --- a/internal/component/pyroscope/receive_http/receive_http_test.go +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "errors" "fmt" "io" "net/http" @@ -11,6 +12,7 @@ import ( "testing" "time" + "connectrpc.com/connect" "github.com/phayes/freeport" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" @@ -20,13 +22,18 @@ import ( fnet "github.com/grafana/alloy/internal/component/common/net" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/util" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" ) -// TestForwardsProfiles verifies the behavior of the pyroscope.receive_http component -// under various scenarios. It tests different profile sizes, HTTP methods, paths, -// query parameters, and error conditions to ensure correct forwarding behavior -// and proper error handling. -func TestForwardsProfiles(t *testing.T) { +// TestForwardsProfilesIngest verifies the behavior of the +// pyroscope.receive_http component under various scenarios. It tests different +// profile sizes, HTTP methods, paths, query parameters, and error conditions +// to ensure correct forwarding behavior and proper error handling, when +// clients use the legacy OG Pyroscope /ingest API, which is predominentaly +// used by the SDKs. +func TestForwardsProfilesIngest(t *testing.T) { tests := []struct { name string profileSize int @@ -135,6 +142,121 @@ func TestForwardsProfiles(t *testing.T) { } } +// TestForwardsProfilesPushV1 verifies the behavior of the +// pyroscope.receive_http using the connect pushv1 API. This is predominentaly +// used by other alloy components like pyrscope.ebpf. +func TestForwardsProfilesPushV1(t *testing.T) { + for _, tc := range []struct { + name string + clientOpts []connect.ClientOption + appendableErrors []error + + numSeries int + numSamplesPerSeries int + SampleSize int + + expectedSeries []string + expectedError error + }{ + { + name: "One series, one small profile, one appendables", + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "Two series, one small profile, one appendables", + numSeries: 2, + expectedSeries: []string{ + `{app="app-0"}`, + `{app="app-1"}`, + }, + }, + { + name: "One series, two small profile, one appendable", + numSamplesPerSeries: 2, + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, two small profile, two appendable", + numSamplesPerSeries: 2, + appendableErrors: []error{nil, nil}, + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, two small profile, two appendable one with errors", + numSamplesPerSeries: 2, + appendableErrors: []error{nil, errors.New("wtf")}, + expectedSeries: []string{`{app="app-0"}`}, + expectedError: errors.New(`internal: unable to append series {app="app-0"} to appendable 1: wtf`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + if tc.SampleSize == 0 { + tc.SampleSize = 1024 + } + if tc.numSeries == 0 { + tc.numSeries = 1 + } + if len(tc.appendableErrors) == 0 { + tc.appendableErrors = []error{nil} + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + appendables := createTestAppendables(tc.appendableErrors) + port := startComponent(t, appendables) + + c := pushv1connect.NewPusherServiceClient( + http.DefaultClient, + fmt.Sprintf("http://127.0.0.1:%d", port), + tc.clientOpts...) + + var series []*pushv1.RawProfileSeries + for i := 0; i < tc.numSeries; i++ { + var samples []*pushv1.RawSample + for j := 0; j < tc.numSamplesPerSeries; j++ { + samples = append(samples, &pushv1.RawSample{ + RawProfile: bytes.Repeat([]byte{0xde, 0xad}, tc.SampleSize/2), + }) + } + + series = append(series, &pushv1.RawProfileSeries{ + Labels: []*typesv1.LabelPair{ + {Name: "app", Value: fmt.Sprintf("app-%d", i)}, + }, + Samples: samples, + }) + } + + _, err := c.Push(ctx, connect.NewRequest(&pushv1.PushRequest{ + Series: series, + })) + if tc.expectedError != nil { + require.ErrorContains(t, err, tc.expectedError.Error()) + } else { + require.NoError(t, err) + } + + for idx := range appendables { + a := appendables[idx].(*testAppender) + + // check series match + require.Equal(t, a.series(), tc.expectedSeries) + + // check number of samples is correct + require.Equal(t, tc.numSeries*tc.numSamplesPerSeries, a.samples()) + + // check samples are received in full + for _, samples := range a.pushedSamples { + for _, sample := range samples { + require.Len(t, sample.RawProfile, tc.SampleSize) + } + } + } + }) + } +} + func createTestAppendables(errors []error) []pyroscope.Appendable { var appendables []pyroscope.Appendable for _, err := range errors { @@ -153,7 +275,13 @@ func countForwardedProfiles(appendables []pyroscope.Appendable) int { return count } -func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, expectedProfile []byte, expectedHeaders map[string]string, expectedQueryParams string) { +func verifyForwardedProfiles( + t *testing.T, + appendables []pyroscope.Appendable, + expectedProfile []byte, + expectedHeaders map[string]string, + expectedQueryParams string, +) { for i, app := range appendables { testApp, ok := app.(*testAppender) require.True(t, ok, "Appendable is not a testAppender") @@ -166,7 +294,14 @@ func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, e // Verify headers for key, value := range expectedHeaders { - require.Equal(t, value, testApp.lastProfile.Headers.Get(key), "Header mismatch for key %s in appendable %d", key, i) + require.Equal( + t, + value, + testApp.lastProfile.Headers.Get(key), + "Header mismatch for key %s in appendable %d", + key, + i, + ) } // Verify query parameters @@ -208,7 +343,13 @@ func startComponent(t *testing.T, appendables []pyroscope.Appendable) int { return port } -func sendCustomRequest(t *testing.T, port int, method, path, queryParams string, headers map[string]string, profileSize int) ([]byte, *http.Response) { +func sendCustomRequest( + t *testing.T, + port int, + method, path, queryParams string, + headers map[string]string, + profileSize int, +) ([]byte, *http.Response) { t.Helper() testProfile := make([]byte, profileSize) _, err := rand.Read(testProfile) @@ -251,14 +392,35 @@ func testAppendable(appendErr error) pyroscope.Appendable { type testAppender struct { appendErr error lastProfile *pyroscope.IncomingProfile + + pushedLabels []labels.Labels + pushedSamples [][]*pyroscope.RawSample +} + +func (a *testAppender) samples() int { + var c = 0 + for _, x := range a.pushedSamples { + c += len(x) + } + return c +} + +func (a *testAppender) series() []string { + var series []string + for _, labels := range a.pushedLabels { + series = append(series, labels.String()) + } + return series } func (a *testAppender) Appender() pyroscope.Appender { return a } -func (a *testAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error { - return fmt.Errorf("Append method not implemented for test") +func (a *testAppender) Append(_ context.Context, lbls labels.Labels, samples []*pyroscope.RawSample) error { + a.pushedLabels = append(a.pushedLabels, lbls) + a.pushedSamples = append(a.pushedSamples, samples) + return a.appendErr } func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error { @@ -288,7 +450,7 @@ func testOptions(t *testing.T) component.Options { } } -// TestUpdateArgs verifies that the component can be updated with new arguments. This explictly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts. +// TestUpdateArgs verifies that the component can be updated with new arguments. This explicitly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts. func TestUpdateArgs(t *testing.T) { ports, err := freeport.GetFreePorts(2) require.NoError(t, err)