Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api_v3][query] Change api_v3 http handler to use v2 query service #6459

Merged
merged 12 commits into from
Jan 3, 2025
4 changes: 2 additions & 2 deletions cmd/query/app/apiv3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
)

// Utility functions used from http_gateway_test.go.
Expand All @@ -38,7 +38,7 @@ const (
var regenerateSnapshots = os.Getenv("REGENERATE_SNAPSHOTS") == "true"

type testGateway struct {
reader *spanstoremocks.Reader
reader *tracestoremocks.Reader
url string
router *mux.Router
// used to set a tenancy header when executing requests
Expand Down
64 changes: 36 additions & 28 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

const (
Expand Down Expand Up @@ -108,18 +111,8 @@ func (h *HTTPGateway) tryParamError(w http.ResponseWriter, err error, paramName
return h.tryHandleError(w, fmt.Errorf("malformed parameter %s: %w", paramName, err), http.StatusBadRequest)
}

func (h *HTTPGateway) returnSpans(spans []*model.Span, w http.ResponseWriter) {
// modelToOTLP does not easily return an error, so allow mocking it
h.returnSpansTestable(spans, w, modelToOTLP)
}

func (h *HTTPGateway) returnSpansTestable(
spans []*model.Span,
w http.ResponseWriter,
modelToOTLP func(_ []*model.Span) ptrace.Traces,
) {
td := modelToOTLP(spans)
tracesData := api_v3.TracesData(td)
func (h *HTTPGateway) returnTrace(t ptrace.Traces, w http.ResponseWriter) {
tracesData := api_v3.TracesData(t)
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
response := &api_v3.GRPCGatewayWrapper{
Result: &tracesData,
}
Expand All @@ -137,9 +130,11 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramTraceID) {
return
}
request := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
request := querysvc.GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{
{
TraceID: traceID.ToOTELTraceID(),
},
},
}
http_query := r.URL.Query()
Expand All @@ -149,15 +144,15 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramStartTime) {
return
}
request.StartTime = timeParsed.UTC()
request.TraceIDs[0].Start = timeParsed.UTC()
}
endTime := http_query.Get(paramEndTime)
if endTime != "" {
timeParsed, err := time.Parse(time.RFC3339Nano, endTime)
if h.tryParamError(w, err, paramEndTime) {
return
}
request.EndTime = timeParsed.UTC()
request.TraceIDs[0].End = timeParsed.UTC()
}
if r := http_query.Get(paramRawTraces); r != "" {
rawTraces, err := strconv.ParseBool(r)
Expand All @@ -166,11 +161,18 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
}
request.RawTraces = rawTraces
}
trc, err := h.QueryService.GetTrace(r.Context(), request)
getTracesIter := h.QueryService.GetTraces(r.Context(), request)
aggrTracesIter := jptrace.AggregateTraces(getTracesIter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to aggregate here? If QS needs to apply adjustments it would already aggregate, otherwise it will return raw chunks, and this API can just pass them through to the caller, it's not require to recombine them.

trc, err := iter.CollectWithErrors(aggrTracesIter)

if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
h.returnSpans(trc.Spans, w)
if len(trc) == 0 {
// TODO: should we return 404 if trace not found?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think this is the contract of v3 API (which is unfortunate since it doesn't match the Storage v2 API contract, but we'll keep it for backwards compatibility).

return
}
h.returnTrace(trc[0], w)
}

func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
Expand All @@ -179,21 +181,27 @@ func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
return
}

traces, err := h.QueryService.FindTraces(r.Context(), queryParams)
findTracesIter := h.QueryService.FindTraces(r.Context(), *queryParams)
aggrTracesIter := jptrace.AggregateTraces(findTracesIter)
traces, err := iter.CollectWithErrors(aggrTracesIter)
// TODO how do we distinguish internal error from bad parameters for FindTrace?
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
var spans []*model.Span
combinedTrace := ptrace.NewTraces()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should have a serious TODO, since it's clearly the wrong thing todo now.

for _, t := range traces {
spans = append(spans, t.Spans...)
resources := t.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
resource := resources.At(i)
resource.CopyTo(combinedTrace.ResourceSpans().AppendEmpty())
}
}
h.returnSpans(spans, w)
h.returnTrace(combinedTrace, w)
}

func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParameters, bool) {
queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParams, bool) {
queryParams := &querysvc.TraceQueryParams{
TraceQueryParams: tracestore.TraceQueryParams{
ServiceName: q.Get(paramServiceName),
OperationName: q.Get(paramOperationName),
Tags: nil, // most curiously not supported by grpc-gateway
Expand Down Expand Up @@ -262,7 +270,7 @@ func (h *HTTPGateway) getServices(w http.ResponseWriter, r *http.Request) {

func (h *HTTPGateway) getOperations(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
queryParams := spanstore.OperationQueryParameters{
queryParams := tracestore.OperationQueryParams{
ServiceName: query.Get("service"),
SpanKind: query.Get("span_kind"),
}
Expand Down
48 changes: 33 additions & 15 deletions cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,30 @@ import (
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
)

func setupHTTPGatewayNoServer(
_ *testing.T,
basePath string,
) *testGateway {
gw := &testGateway{
reader: &spanstoremocks.Reader{},
reader: &tracestoremocks.Reader{},
}

q := querysvc.NewQueryService(v1adapter.NewTraceReader(gw.reader),
q := querysvc.NewQueryService(gw.reader,
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down Expand Up @@ -93,16 +96,28 @@ func TestHTTPGatewayTryHandleError(t *testing.T) {

func TestHTTPGatewayGetTrace(t *testing.T) {
traceId, _ := model.TraceIDFromString("123")

makeTestTraces := func() ptrace.Traces {
traces := ptrace.NewTraces()
resources := traces.ResourceSpans().AppendEmpty()
scopes := resources.ScopeSpans().AppendEmpty()
span := scopes.Spans().AppendEmpty()
span.SetName("test-span")
span.SetTraceID(traceId.ToOTELTraceID())
span.SetSpanID(pcommon.SpanID{1})
return traces
}

testCases := []struct {
name string
params map[string]string
expectedQuery spanstore.GetTraceParameters
expectedQuery tracestore.GetTraceParams
}{
{
name: "TestGetTrace",
params: map[string]string{},
expectedQuery: spanstore.GetTraceParameters{
TraceID: traceId,
expectedQuery: tracestore.GetTraceParams{
TraceID: traceId.ToOTELTraceID(),
},
},
{
Expand All @@ -111,10 +126,10 @@ func TestHTTPGatewayGetTrace(t *testing.T) {
"start_time": "2000-01-02T12:30:08.999999998Z",
"end_time": "2000-04-05T21:55:16.999999992+08:00",
},
expectedQuery: spanstore.GetTraceParameters{
TraceID: traceId,
StartTime: time.Date(2000, time.January, 0o2, 12, 30, 8, 999999998, time.UTC),
EndTime: time.Date(2000, time.April, 0o5, 13, 55, 16, 999999992, time.UTC),
expectedQuery: tracestore.GetTraceParams{
TraceID: traceId.ToOTELTraceID(),
Start: time.Date(2000, time.January, 0o2, 12, 30, 8, 999999998, time.UTC),
End: time.Date(2000, time.April, 0o5, 13, 55, 16, 999999992, time.UTC),
},
},
}
Expand All @@ -124,9 +139,12 @@ func TestHTTPGatewayGetTrace(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gw := setupHTTPGatewayNoServer(t, "")

gw.reader.
On("GetTrace", matchContext, tc.expectedQuery).
Return(&model.Trace{}, nil).Once()
On("GetTraces", matchContext, tc.expectedQuery).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraces()}, nil)
})).Once()

q := url.Values{}
for k, v := range tc.params {
Expand All @@ -141,7 +159,7 @@ func TestHTTPGatewayGetTrace(t *testing.T) {
require.NoError(t, err)
w := httptest.NewRecorder()
gw.router.ServeHTTP(w, r)
gw.reader.AssertCalled(t, "GetTrace", matchContext, tc.expectedQuery)
gw.reader.AssertCalled(t, "GetTraces", matchContext, tc.expectedQuery)
})
}
}
Expand Down
17 changes: 0 additions & 17 deletions cmd/query/app/apiv3/otlp_translator.go

This file was deleted.

14 changes: 8 additions & 6 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/query/app/apiv3"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
querysvcv2 "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/netutils"
Expand Down Expand Up @@ -59,7 +59,7 @@ type Server struct {
func NewServer(
ctx context.Context,
querySvc *querysvc.QueryService,
v2QuerySvc *querysvcv2.QueryService,
v2QuerySvc *v2querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
options *QueryOptions,
tm *tenancy.Manager,
Expand All @@ -84,7 +84,7 @@ func NewServer(
return nil, err
}
registerGRPCHandlers(grpcServer, querySvc, v2QuerySvc, metricsQuerySvc, telset)
httpServer, err := createHTTPServer(ctx, querySvc, metricsQuerySvc, options, tm, telset)
httpServer, err := createHTTPServer(ctx, querySvc, v2QuerySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
}
Expand All @@ -102,7 +102,7 @@ func NewServer(
func registerGRPCHandlers(
server *grpc.Server,
querySvc *querysvc.QueryService,
v2QuerySvc *querysvcv2.QueryService,
v2QuerySvc *v2querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
telset telemetry.Settings,
) {
Expand Down Expand Up @@ -167,6 +167,7 @@ var _ io.Closer = (*httpServer)(nil)

func initRouter(
querySvc *querysvc.QueryService,
v2QuerySvc *v2querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
tenancyMgr *tenancy.Manager,
Expand All @@ -187,7 +188,7 @@ func initRouter(
}

(&apiv3.HTTPGateway{
QueryService: querySvc,
QueryService: v2QuerySvc,
Logger: telset.Logger,
Tracer: telset.TracerProvider,
}).RegisterRoutes(r)
Expand All @@ -209,12 +210,13 @@ func initRouter(
func createHTTPServer(
ctx context.Context,
querySvc *querysvc.QueryService,
v2QuerySvc *v2querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
tm *tenancy.Manager,
telset telemetry.Settings,
) (*httpServer, error) {
handler, staticHandlerCloser := initRouter(querySvc, metricsQuerySvc, queryOpts, tm, telset)
handler, staticHandlerCloser := initRouter(querySvc, v2QuerySvc, metricsQuerySvc, queryOpts, tm, telset)
handler = recoveryhandler.NewRecoveryHandler(telset.Logger, true)(handler)
hs, err := queryOpts.HTTP.ToServer(
ctx,
Expand Down
Loading