Skip to content

Commit

Permalink
[api_v3][query] Change api_v3 http handler to use v2 query service (#…
Browse files Browse the repository at this point in the history
…6459)

## Which problem is this PR solving?
- Part of #6460

## Description of the changes
- This PR migrates the v3_api HTTP handler to use the new v2 query
service.

## How was this change tested?
- Added unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
  • Loading branch information
mahadzaryab1 authored Jan 3, 2025
1 parent d796192 commit 0790c2f
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 147 deletions.
57 changes: 21 additions & 36 deletions cmd/query/app/apiv3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
)

// Utility functions used from http_gateway_test.go.
Expand All @@ -43,7 +43,7 @@ var (
)

type testGateway struct {
reader *spanstoremocks.Reader
reader *tracestoremocks.Reader
url string
router *mux.Router
// used to set a tenancy header when executing requests
Expand Down Expand Up @@ -85,25 +85,6 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) {
require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj))
}

func makeTestTrace() (*model.Trace, spanstore.GetTraceParameters) {
traceID := model.NewTraceID(150, 160)
query := spanstore.GetTraceParameters{TraceID: traceID}
return &model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: model.NewSpanID(180),
OperationName: "foobar",
Tags: []model.KeyValue{
model.SpanKindTag(model.SpanKindServer),
model.Bool("error", true),
},
Process: &model.Process{},
},
},
}, query
}

func makeTestTraceV2() ptrace.Traces {
trace := ptrace.NewTraces()
resources := trace.ResourceSpans().AppendEmpty()
Expand All @@ -112,9 +93,9 @@ func makeTestTraceV2() ptrace.Traces {
spanA := scopes.Spans().AppendEmpty()
spanA.SetName("foobar")
spanA.SetTraceID(traceID)
spanA.SetSpanID(pcommon.SpanID([8]byte{180}))
spanA.SetSpanID(pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}))
spanA.SetKind(ptrace.SpanKindServer)
spanA.Attributes().PutBool("error", true)
spanA.Status().SetCode(ptrace.StatusCodeError)

return trace
}
Expand Down Expand Up @@ -145,10 +126,10 @@ func (gw *testGateway) runGatewayGetServices(t *testing.T) {
}

func (gw *testGateway) runGatewayGetOperations(t *testing.T) {
qp := spanstore.OperationQueryParameters{ServiceName: "foo", SpanKind: "server"}
qp := tracestore.OperationQueryParams{ServiceName: "foo", SpanKind: "server"}
gw.reader.
On("GetOperations", matchContext, qp).
Return([]spanstore.Operation{{Name: "get_users", SpanKind: "server"}}, nil).Once()
Return([]tracestore.Operation{{Name: "get_users", SpanKind: "server"}}, nil).Once()

body, statusCode := gw.execRequest(t, "/api/v3/operations?service=foo&span_kind=server")
require.Equal(t, http.StatusOK, statusCode)
Expand All @@ -162,21 +143,25 @@ func (gw *testGateway) runGatewayGetOperations(t *testing.T) {
}

func (gw *testGateway) runGatewayGetTrace(t *testing.T) {
trace, query := makeTestTrace()
gw.reader.On("GetTrace", matchContext, query).Return(trace, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/"+query.TraceID.String(), query.TraceID)
query := tracestore.GetTraceParams{TraceID: traceID}
gw.reader.
On("GetTraces", matchContext, query).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
})).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/1", traceID)
}

func (gw *testGateway) runGatewayFindTraces(t *testing.T) {
trace, query := makeTestTrace()
q, qp := mockFindQueries()
gw.reader.
On("FindTraces", matchContext, qp).
Return([]*model.Trace{trace}, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), query.TraceID)
gw.reader.On("FindTraces", matchContext, qp).
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield([]ptrace.Traces{makeTestTraceV2()}, nil)
})).Once()
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), traceID)
}

func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTraceID model.TraceID) {
func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTraceID pcommon.TraceID) {
body, statusCode := gw.execRequest(t, url)
require.Equal(t, http.StatusOK, statusCode, "response=%s", string(body))
body = gw.verifySnapshot(t, body)
Expand Down
5 changes: 1 addition & 4 deletions cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import (
tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
)

var (
matchContext = mock.AnythingOfType("*context.valueCtx")
matchGetTraceParameters = mock.AnythingOfType("spanstore.GetTraceParameters")
)
var matchContext = mock.AnythingOfType("*context.valueCtx")

func newGrpcServer(t *testing.T, handler *Handler) (*grpc.Server, net.Addr) {
server := grpc.NewServer()
Expand Down
86 changes: 50 additions & 36 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

const (
Expand Down Expand Up @@ -108,24 +110,43 @@ func (h *HTTPGateway) tryParamError(w http.ResponseWriter, err error, paramName
return h.tryHandleError(w, fmt.Errorf("malformed parameter %s: %w", paramName, err), http.StatusBadRequest)
}

func (h *HTTPGateway) returnSpans(spans []*model.Span, w http.ResponseWriter) {
// modelToOTLP does not easily return an error, so allow mocking it
h.returnSpansTestable(spans, w, modelToOTLP)
}

func (h *HTTPGateway) returnSpansTestable(
spans []*model.Span,
w http.ResponseWriter,
modelToOTLP func(_ []*model.Span) ptrace.Traces,
) {
td := modelToOTLP(spans)
func (h *HTTPGateway) returnTrace(td ptrace.Traces, w http.ResponseWriter) {
tracesData := api_v3.TracesData(td)
response := &api_v3.GRPCGatewayWrapper{
Result: &tracesData,
}
h.marshalResponse(response, w)
}

func (h *HTTPGateway) returnTraces(traces []ptrace.Traces, err error, w http.ResponseWriter) {
// TODO how do we distinguish internal error from bad parameters?
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
if len(traces) == 0 {
errorResponse := api_v3.GRPCGatewayError{
Error: &api_v3.GRPCGatewayError_GRPCGatewayErrorDetails{
HttpCode: http.StatusNotFound,
Message: "No traces found",
},
}
resp, _ := json.Marshal(&errorResponse)
http.Error(w, string(resp), http.StatusNotFound)
return
}
// TODO: the response should be streamed back to the client
// https://github.com/jaegertracing/jaeger/issues/6467
combinedTrace := ptrace.NewTraces()
for _, t := range traces {
resources := t.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
resource := resources.At(i)
resource.CopyTo(combinedTrace.ResourceSpans().AppendEmpty())
}
}
h.returnTrace(combinedTrace, w)
}

func (*HTTPGateway) marshalResponse(response proto.Message, w http.ResponseWriter) {
_ = new(jsonpb.Marshaler).Marshal(w, response)
}
Expand All @@ -137,9 +158,11 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramTraceID) {
return
}
request := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
request := querysvc.GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{
{
TraceID: traceID.ToOTELTraceID(),
},
},
}
http_query := r.URL.Query()
Expand All @@ -149,15 +172,15 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramStartTime) {
return
}
request.StartTime = timeParsed.UTC()
request.TraceIDs[0].Start = timeParsed.UTC()
}
endTime := http_query.Get(paramEndTime)
if endTime != "" {
timeParsed, err := time.Parse(time.RFC3339Nano, endTime)
if h.tryParamError(w, err, paramEndTime) {
return
}
request.EndTime = timeParsed.UTC()
request.TraceIDs[0].End = timeParsed.UTC()
}
if r := http_query.Get(paramRawTraces); r != "" {
rawTraces, err := strconv.ParseBool(r)
Expand All @@ -166,11 +189,9 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
}
request.RawTraces = rawTraces
}
trc, err := h.QueryService.GetTrace(r.Context(), request)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
h.returnSpans(trc.Spans, w)
getTracesIter := h.QueryService.GetTraces(r.Context(), request)
trc, err := iter.FlattenWithErrors(getTracesIter)
h.returnTraces(trc, err, w)
}

func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
Expand All @@ -179,21 +200,14 @@ func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
return
}

traces, err := h.QueryService.FindTraces(r.Context(), queryParams)
// TODO how do we distinguish internal error from bad parameters for FindTrace?
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
var spans []*model.Span
for _, t := range traces {
spans = append(spans, t.Spans...)
}
h.returnSpans(spans, w)
findTracesIter := h.QueryService.FindTraces(r.Context(), *queryParams)
traces, err := iter.FlattenWithErrors(findTracesIter)
h.returnTraces(traces, err, w)
}

func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParameters, bool) {
queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParams, bool) {
queryParams := &querysvc.TraceQueryParams{
TraceQueryParams: tracestore.TraceQueryParams{
ServiceName: q.Get(paramServiceName),
OperationName: q.Get(paramOperationName),
Tags: nil, // most curiously not supported by grpc-gateway
Expand Down Expand Up @@ -262,7 +276,7 @@ func (h *HTTPGateway) getServices(w http.ResponseWriter, r *http.Request) {

func (h *HTTPGateway) getOperations(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
queryParams := spanstore.OperationQueryParameters{
queryParams := tracestore.OperationQueryParams{
ServiceName: query.Get("service"),
SpanKind: query.Get("span_kind"),
}
Expand Down
Loading

0 comments on commit 0790c2f

Please sign in to comment.