Skip to content

Commit

Permalink
Support pushv1 in receive_http
Browse files Browse the repository at this point in the history
  • Loading branch information
simonswine committed Jan 17, 2025
1 parent c58c114 commit 7e2e94e
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 13 deletions.
76 changes: 74 additions & 2 deletions internal/component/pyroscope/receive_http/receive_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"reflect"
"sync"

"connectrpc.com/connect"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/errgroup"

"github.com/grafana/alloy/internal/component"
Expand All @@ -20,6 +22,9 @@ import (
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/util"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
)

const (
Expand Down Expand Up @@ -137,14 +142,81 @@ func (c *Component) Update(args component.Arguments) error {
c.server = srv

return c.server.MountAndRun(func(router *mux.Router) {
// this mounts the og pyroscope ingest API, mostly used by SDKs
router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost)

// mount connect go pushv1
pathPush, handlePush := pushv1connect.NewPusherServiceHandler(c)
router.PathPrefix(pathPush).Handler(handlePush).Methods(http.MethodPost)
})
}

func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
func setLabelBuilderFromAPI(lb *labels.Builder, api []*typesv1.LabelPair) {
for i := range api {
lb.Set(api[i].Name, api[i].Value)
}
}

func apiToAlloySamples(api []*pushv1.RawSample) []*pyroscope.RawSample {
var (
alloy = make([]*pyroscope.RawSample, len(api))
)
for i := range alloy {
alloy[i] = &pyroscope.RawSample{
RawProfile: api[i].RawProfile,
}
}
return alloy
}

func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest],
) (*connect.Response[pushv1.PushResponse], error) {
appendables := c.getAppendables()

// Create an errgroup with the timeout context
g, ctx := errgroup.WithContext(ctx)

// Start copying the request body to all pipes
for i := range appendables {
appendable := appendables[i].Appender()
g.Go(func() error {
var (
errs error
lb = labels.NewBuilder(nil)
)

for idx := range req.Msg.Series {
lb.Reset(nil)
setLabelBuilderFromAPI(lb, req.Msg.Series[idx].Labels)
err := appendable.Append(ctx, lb.Labels(), apiToAlloySamples(req.Msg.Series[idx].Samples))
if err != nil {
errs = errors.Join(
errs,
fmt.Errorf("unable to append series %s to appendable %d: %w", lb.Labels().String(), i, err),
)
}
}
return errs
})
}
if err := g.Wait(); err != nil {
level.Error(c.opts.Logger).Log("msg", "Failed to forward profiles requests", "err", err)
return nil, connect.NewError(connect.CodeInternal, err)
}

level.Debug(c.opts.Logger).Log("msg", "Profiles successfully forwarded")
return connect.NewResponse(&pushv1.PushResponse{}), nil
}

func (c *Component) getAppendables() []pyroscope.Appendable {
c.mut.Lock()
defer c.mut.Unlock()
appendables := c.appendables
c.mut.Unlock()
return appendables
}

func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
appendables := c.getAppendables()

// Create a pipe for each appendable
pipeWriters := make([]io.Writer, len(appendables))
Expand Down
184 changes: 173 additions & 11 deletions internal/component/pyroscope/receive_http/receive_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"testing"
"time"

"connectrpc.com/connect"
"github.com/phayes/freeport"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -20,13 +22,18 @@ import (
fnet "github.com/grafana/alloy/internal/component/common/net"
"github.com/grafana/alloy/internal/component/pyroscope"
"github.com/grafana/alloy/internal/util"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
)

// TestForwardsProfiles verifies the behavior of the pyroscope.receive_http component
// under various scenarios. It tests different profile sizes, HTTP methods, paths,
// query parameters, and error conditions to ensure correct forwarding behavior
// and proper error handling.
func TestForwardsProfiles(t *testing.T) {
// TestForwardsProfilesIngest verifies the behavior of the
// pyroscope.receive_http component under various scenarios. It tests different
// profile sizes, HTTP methods, paths, query parameters, and error conditions
// to ensure correct forwarding behavior and proper error handling, when
// clients use the legacy OG Pyroscope /ingest API, which is predominentaly
// used by the SDKs.
func TestForwardsProfilesIngest(t *testing.T) {
tests := []struct {
name string
profileSize int
Expand Down Expand Up @@ -135,6 +142,121 @@ func TestForwardsProfiles(t *testing.T) {
}
}

// TestForwardsProfilesPushV1 verifies the behavior of the
// pyroscope.receive_http using the connect pushv1 API. This is predominentaly
// used by other alloy components like pyrscope.ebpf.
func TestForwardsProfilesPushV1(t *testing.T) {
for _, tc := range []struct {
name string
clientOpts []connect.ClientOption
appendableErrors []error

numSeries int
numSamplesPerSeries int
SampleSize int

expectedSeries []string
expectedError error
}{
{
name: "One series, one small profile, one appendables",
expectedSeries: []string{`{app="app-0"}`},
},
{
name: "Two series, one small profile, one appendables",
numSeries: 2,
expectedSeries: []string{
`{app="app-0"}`,
`{app="app-1"}`,
},
},
{
name: "One series, two small profile, one appendable",
numSamplesPerSeries: 2,
expectedSeries: []string{`{app="app-0"}`},
},
{
name: "One series, two small profile, two appendable",
numSamplesPerSeries: 2,
appendableErrors: []error{nil, nil},
expectedSeries: []string{`{app="app-0"}`},
},
{
name: "One series, two small profile, two appendable one with errors",
numSamplesPerSeries: 2,
appendableErrors: []error{nil, errors.New("wtf")},
expectedSeries: []string{`{app="app-0"}`},
expectedError: errors.New(`internal: unable to append series {app="app-0"} to appendable 1: wtf`),
},
} {
t.Run(tc.name, func(t *testing.T) {
if tc.SampleSize == 0 {
tc.SampleSize = 1024
}
if tc.numSeries == 0 {
tc.numSeries = 1
}
if len(tc.appendableErrors) == 0 {
tc.appendableErrors = []error{nil}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

appendables := createTestAppendables(tc.appendableErrors)
port := startComponent(t, appendables)

c := pushv1connect.NewPusherServiceClient(
http.DefaultClient,
fmt.Sprintf("http://127.0.0.1:%d", port),
tc.clientOpts...)

var series []*pushv1.RawProfileSeries
for i := 0; i < tc.numSeries; i++ {
var samples []*pushv1.RawSample
for j := 0; j < tc.numSamplesPerSeries; j++ {
samples = append(samples, &pushv1.RawSample{
RawProfile: bytes.Repeat([]byte{0xde, 0xad}, tc.SampleSize/2),
})
}

series = append(series, &pushv1.RawProfileSeries{
Labels: []*typesv1.LabelPair{
{Name: "app", Value: fmt.Sprintf("app-%d", i)},
},
Samples: samples,
})
}

_, err := c.Push(ctx, connect.NewRequest(&pushv1.PushRequest{
Series: series,
}))
if tc.expectedError != nil {
require.ErrorContains(t, err, tc.expectedError.Error())
} else {
require.NoError(t, err)
}

for idx := range appendables {
a := appendables[idx].(*testAppender)

// check series match
require.Equal(t, a.series(), tc.expectedSeries)

// check number of samples is correct
require.Equal(t, tc.numSeries*tc.numSamplesPerSeries, a.samples())

// check samples are received in full
for _, samples := range a.pushedSamples {
for _, sample := range samples {
require.Len(t, sample.RawProfile, tc.SampleSize)
}
}
}
})
}
}

func createTestAppendables(errors []error) []pyroscope.Appendable {
var appendables []pyroscope.Appendable
for _, err := range errors {
Expand All @@ -153,7 +275,13 @@ func countForwardedProfiles(appendables []pyroscope.Appendable) int {
return count
}

func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, expectedProfile []byte, expectedHeaders map[string]string, expectedQueryParams string) {
func verifyForwardedProfiles(
t *testing.T,
appendables []pyroscope.Appendable,
expectedProfile []byte,
expectedHeaders map[string]string,
expectedQueryParams string,
) {
for i, app := range appendables {
testApp, ok := app.(*testAppender)
require.True(t, ok, "Appendable is not a testAppender")
Expand All @@ -166,7 +294,14 @@ func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, e

// Verify headers
for key, value := range expectedHeaders {
require.Equal(t, value, testApp.lastProfile.Headers.Get(key), "Header mismatch for key %s in appendable %d", key, i)
require.Equal(
t,
value,
testApp.lastProfile.Headers.Get(key),
"Header mismatch for key %s in appendable %d",
key,
i,
)
}

// Verify query parameters
Expand Down Expand Up @@ -208,7 +343,13 @@ func startComponent(t *testing.T, appendables []pyroscope.Appendable) int {
return port
}

func sendCustomRequest(t *testing.T, port int, method, path, queryParams string, headers map[string]string, profileSize int) ([]byte, *http.Response) {
func sendCustomRequest(
t *testing.T,
port int,
method, path, queryParams string,
headers map[string]string,
profileSize int,
) ([]byte, *http.Response) {
t.Helper()
testProfile := make([]byte, profileSize)
_, err := rand.Read(testProfile)
Expand Down Expand Up @@ -251,14 +392,35 @@ func testAppendable(appendErr error) pyroscope.Appendable {
type testAppender struct {
appendErr error
lastProfile *pyroscope.IncomingProfile

pushedLabels []labels.Labels
pushedSamples [][]*pyroscope.RawSample
}

func (a *testAppender) samples() int {
var c = 0
for _, x := range a.pushedSamples {
c += len(x)
}
return c
}

func (a *testAppender) series() []string {
var series []string
for _, labels := range a.pushedLabels {
series = append(series, labels.String())
}
return series
}

func (a *testAppender) Appender() pyroscope.Appender {
return a
}

func (a *testAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error {
return fmt.Errorf("Append method not implemented for test")
func (a *testAppender) Append(_ context.Context, lbls labels.Labels, samples []*pyroscope.RawSample) error {
a.pushedLabels = append(a.pushedLabels, lbls)
a.pushedSamples = append(a.pushedSamples, samples)
return a.appendErr
}

func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error {
Expand Down Expand Up @@ -288,7 +450,7 @@ func testOptions(t *testing.T) component.Options {
}
}

// TestUpdateArgs verifies that the component can be updated with new arguments. This explictly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts.
// TestUpdateArgs verifies that the component can be updated with new arguments. This explicitly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts.
func TestUpdateArgs(t *testing.T) {
ports, err := freeport.GetFreePorts(2)
require.NoError(t, err)
Expand Down

0 comments on commit 7e2e94e

Please sign in to comment.