diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index f84848a52d4..e5718a2afff 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -133,7 +133,8 @@ by default uses only in-memory database.`, if err != nil { logger.Fatal("Failed to initialize collector", zap.Error(err)) } - qOpts, err := new(queryApp.QueryOptions).InitFromViper(v, logger) + defaultOpts := queryApp.DefaultQueryOptions() + qOpts, err := defaultOpts.InitFromViper(v, logger) if err != nil { logger.Fatal("Failed to configure query service", zap.Error(err)) } @@ -220,11 +221,11 @@ func startQuery( spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) - server, err := queryApp.NewServer(qs, metricsQueryService, qOpts, tm, telset) + server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset) if err != nil { svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err)) } - if err := server.Start(); err != nil { + if err := server.Start(context.Background()); err != nil { svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err)) } diff --git a/cmd/jaeger/internal/extension/jaegerquery/factory.go b/cmd/jaeger/internal/extension/jaegerquery/factory.go index 2404fa671d8..b5fff8b4ef8 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/factory.go +++ b/cmd/jaeger/internal/extension/jaegerquery/factory.go @@ -7,13 +7,9 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/extension" "github.com/jaegertracing/jaeger/cmd/query/app" - "github.com/jaegertracing/jaeger/ports" ) // componentType is the name of this extension in configuration. @@ -28,17 +24,7 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - QueryOptions: app.QueryOptions{ - HTTP: confighttp.ServerConfig{ - Endpoint: ports.PortToHostPort(ports.QueryHTTP), - }, - GRPC: configgrpc.ServerConfig{ - NetAddr: confignet.AddrConfig{ - Endpoint: ports.PortToHostPort(ports.QueryGRPC), - Transport: confignet.TransportTypeTCP, - }, - }, - }, + QueryOptions: app.DefaultQueryOptions(), } } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 9d1a968d2cd..46bcfc4cc87 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -51,7 +51,7 @@ func (*server) Dependencies() []component.ID { return []component.ID{jaegerstorage.ID} } -func (s *server) Start(_ context.Context, host component.Host) error { +func (s *server) Start(ctx context.Context, host component.Host) error { mf := otelmetrics.NewFactory(s.telset.MeterProvider) baseFactory := mf.Namespace(metrics.NSOptions{Name: "jaeger"}) queryMetricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"}) @@ -100,11 +100,11 @@ func (s *server) Start(_ context.Context, host component.Host) error { ReportStatus: func(event *componentstatus.Event) { componentstatus.ReportStatus(host, event) }, + Host: host, } - // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. - //nolint s.server, err = queryApp.NewServer( + ctx, // TODO propagate healthcheck updates up to the collector's runtime qs, mqs, @@ -116,7 +116,7 @@ func (s *server) Start(_ context.Context, host component.Host) error { return fmt.Errorf("could not create jaeger-query: %w", err) } - if err := s.server.Start(); err != nil { + if err := s.server.Start(ctx); err != nil { return fmt.Errorf("could not start jaeger-query: %w", err) } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index 213ae878379..e17875a707a 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" @@ -211,6 +212,7 @@ func TestServerStart(t *testing.T) { } tt.config.HTTP.Endpoint = ":0" tt.config.GRPC.NetAddr.Endpoint = ":0" + tt.config.GRPC.NetAddr.Transport = confignet.TransportTypeTCP server := newServer(tt.config, telemetrySettings) err := server.Start(context.Background(), host) if tt.expectedErr == "" { diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 01cb8ae2e5e..c3fd2e0cdab 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -18,6 +18,7 @@ import ( "github.com/spf13/viper" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.uber.org/zap" @@ -100,6 +101,11 @@ func AddFlags(flagSet *flag.FlagSet) { func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*QueryOptions, error) { qOpts.HTTP.Endpoint = v.GetString(queryHTTPHostPort) qOpts.GRPC.NetAddr.Endpoint = v.GetString(queryGRPCHostPort) + // TODO: drop support for same host ports + // https://github.com/jaegertracing/jaeger/issues/6117 + if qOpts.HTTP.Endpoint == qOpts.GRPC.NetAddr.Endpoint { + logger.Warn("using the same port for gRPC and HTTP is deprecated; please use dedicated ports instead; support for shared ports will be removed in Feb 2025") + } tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v) if err != nil { return qOpts, fmt.Errorf("failed to process gRPC TLS options: %w", err) @@ -169,3 +175,17 @@ func mapHTTPHeaderToOTELHeaders(h http.Header) map[string]configopaque.String { return otelHeaders } + +func DefaultQueryOptions() QueryOptions { + return QueryOptions{ + HTTP: confighttp.ServerConfig{ + Endpoint: ports.PortToHostPort(ports.QueryHTTP), + }, + GRPC: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ports.PortToHostPort(ports.QueryGRPC), + Transport: confignet.TransportTypeTCP, + }, + }, + } +} diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index d18ece51699..c8e623ed07c 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/storage/mocks" spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" @@ -183,3 +184,27 @@ func TestQueryOptions_FailedTLSFlags(t *testing.T) { }) } } + +func TestQueryOptions_SamePortsLogsWarning(t *testing.T) { + logger, logBuf := testutils.NewLogger() + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--query.http-server.host-port=127.0.0.1:8081", + "--query.grpc-server.host-port=127.0.0.1:8081", + }) + _, err := new(QueryOptions).InitFromViper(v, logger) + require.NoError(t, err) + + require.Contains( + t, + logBuf.String(), + "using the same port for gRPC and HTTP is deprecated", + ) +} + +func TestDefaultQueryOptions(t *testing.T) { + qo := DefaultQueryOptions() + require.Equal(t, ":16686", qo.HTTP.Endpoint) + require.Equal(t, ":16685", qo.GRPC.NetAddr.Endpoint) + require.EqualValues(t, "tcp", qo.GRPC.NetAddr.Transport) +} diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 4c854096db4..aa4647590c7 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -16,7 +16,12 @@ import ( "github.com/gorilla/handlers" "github.com/soheilhy/cmux" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" @@ -54,7 +59,9 @@ type Server struct { } // NewServer creates and initializes Server -func NewServer(querySvc *querysvc.QueryService, +func NewServer( + ctx context.Context, + querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, @@ -74,12 +81,18 @@ func NewServer(querySvc *querysvc.QueryService, return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead") } - grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, telset) + var grpcServer *grpc.Server + if separatePorts { + grpcServer, err = createGRPCServerLegacy(ctx, options, tm) + } else { + grpcServer, err = createGRPCServerOTEL(ctx, options, tm, telset) + } if err != nil { return nil, err } + registerGRPCHandlers(grpcServer, querySvc, metricsQuerySvc, telset) - httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, telset) + httpServer, err := createHTTPServer(ctx, querySvc, metricsQuerySvc, options, tm, telset) if err != nil { return nil, err } @@ -94,11 +107,15 @@ func NewServer(querySvc *querysvc.QueryService, }, nil } -func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting) (*grpc.Server, error) { +func createGRPCServerLegacy( + ctx context.Context, + options *QueryOptions, + tm *tenancy.Manager, +) (*grpc.Server, error) { var grpcOpts []grpc.ServerOption if options.GRPC.TLSSetting != nil { - tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(context.Background()) + tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(ctx) if err != nil { return nil, err } @@ -108,6 +125,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. grpcOpts = append(grpcOpts, grpc.Creds(creds)) } if tm.Enabled { + //nolint:contextcheck grpcOpts = append(grpcOpts, grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)), grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)), @@ -115,8 +133,16 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. } server := grpc.NewServer(grpcOpts...) - reflection.Register(server) + return server, nil +} +func registerGRPCHandlers( + server *grpc.Server, + querySvc *querysvc.QueryService, + metricsQuerySvc querysvc.MetricsQueryService, + telset telemetery.Setting, +) { + reflection.Register(server) handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{ Logger: telset.Logger, }) @@ -131,7 +157,33 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. healthServer.SetServingStatus("jaeger.api_v3.QueryService", grpc_health_v1.HealthCheckResponse_SERVING) grpc_health_v1.RegisterHealthServer(server, healthServer) - return server, nil +} + +func createGRPCServerOTEL( + ctx context.Context, + options *QueryOptions, + tm *tenancy.Manager, + telset telemetery.Setting, +) (*grpc.Server, error) { + var grpcOpts []configgrpc.ToServerOption + if tm.Enabled { + //nolint:contextcheck + grpcOpts = append(grpcOpts, + configgrpc.WithGrpcServerOption(grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm))), + configgrpc.WithGrpcServerOption(grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm))), + ) + } + return options.GRPC.ToServer( + ctx, + telset.Host, + component.TelemetrySettings{ + Logger: telset.Logger, + TracerProvider: telset.TracerProvider, + LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { + return noop.NewMeterProvider() + }, + }, + grpcOpts...) } type httpServer struct { @@ -142,6 +194,7 @@ type httpServer struct { var _ io.Closer = (*httpServer)(nil) func createHTTPServer( + ctx context.Context, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, @@ -189,7 +242,7 @@ func createHTTPServer( } if queryOpts.HTTP.TLSSetting != nil { - tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(context.Background()) // This checks if the certificates are correctly provided + tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(ctx) // This checks if the certificates are correctly provided if err != nil { return nil, err } @@ -209,10 +262,10 @@ func (hS httpServer) Close() error { } // initListener initialises listeners of the server -func (s *Server) initListener() (cmux.CMux, error) { +func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) { if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests var err error - s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPC.NetAddr.Endpoint) + s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx) if err != nil { return nil, err } @@ -260,8 +313,8 @@ func (s *Server) initListener() (cmux.CMux, error) { } // Start http, GRPC and cmux servers concurrently -func (s *Server) Start() error { - cmuxServer, err := s.initListener() +func (s *Server) Start(ctx context.Context) error { + cmuxServer, err := s.initListener(ctx) if err != nil { return fmt.Errorf("query server failed to initialize listener: %w", err) } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index ec7de1fc773..5a0d4216f47 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/confignet" @@ -47,6 +48,7 @@ func initTelSet(logger *zap.Logger, tracerProvider *jtracer.JTracer, hc *healthc Logger: logger, TracerProvider: tracerProvider.OTEL, ReportStatus: telemetery.HCAdapter(hc), + Host: componenttest.NewNopHost(), } } @@ -56,7 +58,7 @@ func TestServerError(t *testing.T) { HTTP: confighttp.ServerConfig{Endpoint: ":-1"}, }, } - require.Error(t, srv.Start()) + require.Error(t, srv.Start(context.Background())) } func TestCreateTLSServerSinglePortError(t *testing.T) { @@ -69,10 +71,10 @@ func TestCreateTLSServerSinglePortError(t *testing.T) { }, } telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) - _, err := NewServer(&querysvc.QueryService{}, nil, + _, err := NewServer(context.Background(), &querysvc.QueryService{}, nil, &QueryOptions{ HTTP: confighttp.ServerConfig{Endpoint: ":8080", TLSSetting: &tlsCfg}, - GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8080"}, TLSSetting: &tlsCfg}, + GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8080", Transport: confignet.TransportTypeTCP}, TLSSetting: &tlsCfg}, }, tenancy.NewManager(&tenancy.Options{}), telset) require.Error(t, err) @@ -87,10 +89,10 @@ func TestCreateTLSGrpcServerError(t *testing.T) { }, } telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) - _, err := NewServer(&querysvc.QueryService{}, nil, + _, err := NewServer(context.Background(), &querysvc.QueryService{}, nil, &QueryOptions{ HTTP: confighttp.ServerConfig{Endpoint: ":8080"}, - GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8081"}, TLSSetting: &tlsCfg}, + GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8081", Transport: confignet.TransportTypeTCP}, TLSSetting: &tlsCfg}, }, tenancy.NewManager(&tenancy.Options{}), telset) require.Error(t, err) @@ -105,10 +107,10 @@ func TestCreateTLSHttpServerError(t *testing.T) { }, } telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) - _, err := NewServer(&querysvc.QueryService{}, nil, + _, err := NewServer(context.Background(), &querysvc.QueryService{}, nil, &QueryOptions{ HTTP: confighttp.ServerConfig{Endpoint: ":8080", TLSSetting: &tlsCfg}, - GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8081"}}, + GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8081", Transport: confignet.TransportTypeTCP}}, }, tenancy.NewManager(&tenancy.Options{}), telset) require.Error(t, err) } @@ -380,7 +382,8 @@ func TestServerHTTPTLS(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ":0", + Endpoint: ":0", + Transport: confignet.TransportTypeTCP, }, TLSSetting: tlsGrpc, }, @@ -389,11 +392,11 @@ func TestServerHTTPTLS(t *testing.T) { flagsSvc.Logger = zaptest.NewLogger(t) telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) querySvc := makeQuerySvc() - server, err := NewServer(querySvc.qs, + server, err := NewServer(context.Background(), querySvc.qs, nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), telset) require.NoError(t, err) - require.NoError(t, server.Start()) + require.NoError(t, server.Start(context.Background())) t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -517,7 +520,8 @@ func TestServerGRPCTLS(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ":0", + Endpoint: ":0", + Transport: confignet.TransportTypeTCP, }, TLSSetting: test.TLS, }, @@ -527,11 +531,11 @@ func TestServerGRPCTLS(t *testing.T) { querySvc := makeQuerySvc() telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) - server, err := NewServer(querySvc.qs, + server, err := NewServer(context.Background(), querySvc.qs, nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), telset) require.NoError(t, err) - require.NoError(t, server.Start()) + require.NoError(t, server.Start(context.Background())) t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -569,7 +573,7 @@ func TestServerGRPCTLS(t *testing.T) { func TestServerBadHostPort(t *testing.T) { telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) - _, err := NewServer(&querysvc.QueryService{}, nil, + _, err := NewServer(context.Background(), &querysvc.QueryService{}, nil, &QueryOptions{ BearerTokenPropagation: true, HTTP: confighttp.ServerConfig{ @@ -577,7 +581,8 @@ func TestServerBadHostPort(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: "127.0.0.1:8081", + Endpoint: "127.0.0.1:8081", + Transport: confignet.TransportTypeTCP, }, }, }, @@ -585,7 +590,7 @@ func TestServerBadHostPort(t *testing.T) { telset) require.Error(t, err) - _, err = NewServer(&querysvc.QueryService{}, nil, + _, err = NewServer(context.Background(), &querysvc.QueryService{}, nil, &QueryOptions{ BearerTokenPropagation: true, HTTP: confighttp.ServerConfig{ @@ -593,7 +598,8 @@ func TestServerBadHostPort(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: "9123", // bad string, not :port + Endpoint: "9123", // bad string, not :port + Transport: confignet.TransportTypeTCP, }, }, }, @@ -620,6 +626,7 @@ func TestServerInUseHostPort(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { server, err := NewServer( + context.Background(), &querysvc.QueryService{}, nil, &QueryOptions{ @@ -629,7 +636,8 @@ func TestServerInUseHostPort(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: tc.grpcHostPort, + Endpoint: tc.grpcHostPort, + Transport: confignet.TransportTypeTCP, }, }, }, @@ -637,7 +645,7 @@ func TestServerInUseHostPort(t *testing.T) { telset, ) require.NoError(t, err) - require.Error(t, server.Start()) + require.Error(t, server.Start(context.Background())) server.Close() }) } @@ -649,7 +657,7 @@ func TestServerSinglePort(t *testing.T) { hostPort := ports.PortToHostPort(ports.QueryHTTP) querySvc := makeQuerySvc() telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) - server, err := NewServer(querySvc.qs, nil, + server, err := NewServer(context.Background(), querySvc.qs, nil, &QueryOptions{ BearerTokenPropagation: true, HTTP: confighttp.ServerConfig{ @@ -657,14 +665,15 @@ func TestServerSinglePort(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: hostPort, + Endpoint: hostPort, + Transport: confignet.TransportTypeTCP, }, }, }, tenancy.NewManager(&tenancy.Options{}), telset) require.NoError(t, err) - require.NoError(t, server.Start()) + require.NoError(t, server.Start(context.Background())) t.Cleanup(func() { require.NoError(t, server.Close()) }) @@ -694,20 +703,21 @@ func TestServerGracefulExit(t *testing.T) { querySvc := makeQuerySvc() telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) - server, err := NewServer(querySvc.qs, nil, + server, err := NewServer(context.Background(), querySvc.qs, nil, &QueryOptions{ HTTP: confighttp.ServerConfig{ Endpoint: hostPort, }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: hostPort, + Endpoint: hostPort, + Transport: confignet.TransportTypeTCP, }, }, }, tenancy.NewManager(&tenancy.Options{}), telset) require.NoError(t, err) - require.NoError(t, server.Start()) + require.NoError(t, server.Start(context.Background())) // Wait for servers to come up before we can call .Close() { @@ -736,21 +746,22 @@ func TestServerHandlesPortZero(t *testing.T) { querySvc := &querysvc.QueryService{} telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) - server, err := NewServer(querySvc, nil, + server, err := NewServer(context.Background(), querySvc, nil, &QueryOptions{ HTTP: confighttp.ServerConfig{ Endpoint: ":0", }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ":0", + Endpoint: ":0", + Transport: confignet.TransportTypeTCP, }, }, }, tenancy.NewManager(&tenancy.Options{}), telset) require.NoError(t, err) - require.NoError(t, server.Start()) + require.NoError(t, server.Start(context.Background())) defer server.Close() message := logs.FilterMessage("Query server started") @@ -794,7 +805,8 @@ func TestServerHTTPTenancy(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ":8080", + Endpoint: ":8080", + Transport: confignet.TransportTypeTCP, }, }, } @@ -802,10 +814,10 @@ func TestServerHTTPTenancy(t *testing.T) { querySvc := makeQuerySvc() querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once() telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) - server, err := NewServer(querySvc.qs, + server, err := NewServer(context.Background(), querySvc.qs, nil, serverOptions, tenancyMgr, telset) require.NoError(t, err) - require.NoError(t, server.Start()) + require.NoError(t, server.Start(context.Background())) t.Cleanup(func() { require.NoError(t, server.Close()) }) diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 8565667a19b..81ec248e4c6 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -4,6 +4,7 @@ package app import ( + "context" "encoding/json" "fmt" "net/http" @@ -89,7 +90,7 @@ func runQueryService(t *testing.T, esURL string) *Server { TracerProvider: jtracer.NoOp().OTEL, ReportStatus: telemetery.HCAdapter(flagsSvc.HC()), } - server, err := NewServer(querySvc, nil, + server, err := NewServer(context.Background(), querySvc, nil, &QueryOptions{ BearerTokenPropagation: true, HTTP: confighttp.ServerConfig{ @@ -97,7 +98,8 @@ func runQueryService(t *testing.T, esURL string) *Server { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ":0", + Endpoint: ":0", + Transport: confignet.TransportTypeTCP, }, }, }, @@ -105,7 +107,7 @@ func runQueryService(t *testing.T, esURL string) *Server { telset, ) require.NoError(t, err) - require.NoError(t, server.Start()) + require.NoError(t, server.Start(context.Background())) return server } diff --git a/cmd/query/main.go b/cmd/query/main.go index 77f77b5e2b6..1e8197d3e3f 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -64,7 +64,8 @@ func main() { metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"}) version.NewInfoMetrics(metricsFactory) - queryOpts, err := new(app.QueryOptions).InitFromViper(v, logger) + defaultOpts := app.DefaultQueryOptions() + queryOpts, err := defaultOpts.InitFromViper(v, logger) if err != nil { logger.Fatal("Failed to configure query service", zap.Error(err)) } @@ -108,12 +109,12 @@ func main() { TracerProvider: jt.OTEL, ReportStatus: telemetery.HCAdapter(svc.HC()), } - server, err := app.NewServer(queryService, metricsQueryService, queryOpts, tm, telset) + server, err := app.NewServer(context.Background(), queryService, metricsQueryService, queryOpts, tm, telset) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } - if err := server.Start(); err != nil { + if err := server.Start(context.Background()); err != nil { logger.Fatal("Could not start servers", zap.Error(err)) } diff --git a/pkg/telemetery/settings.go b/pkg/telemetery/settings.go index f93e561360f..eef11f4a77f 100644 --- a/pkg/telemetery/settings.go +++ b/pkg/telemetery/settings.go @@ -4,6 +4,7 @@ package telemetery import ( + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -17,6 +18,7 @@ type Setting struct { TracerProvider trace.TracerProvider Metrics metrics.Factory ReportStatus func(*componentstatus.Event) + Host component.Host } func HCAdapter(hc *healthcheck.HealthCheck) func(*componentstatus.Event) {