diff --git a/instrumentation/base/lib/opentelemetry/instrumentation/base.rb b/instrumentation/base/lib/opentelemetry/instrumentation/base.rb index dc2abd3ba..ae8aa60f4 100644 --- a/instrumentation/base/lib/opentelemetry/instrumentation/base.rb +++ b/instrumentation/base/lib/opentelemetry/instrumentation/base.rb @@ -166,13 +166,44 @@ def option(name, default:, validate:) def instance @instance || SINGLETON_MUTEX.synchronize do @instance ||= new(instrumentation_name, instrumentation_version, install_blk, - present_blk, compatible_blk, options) + present_blk, compatible_blk, options, instrument_configs) end end + if defined?(OpenTelemetry::Metrics) + %i[ + counter asynchronous_counter + histogram gauge asynchronous_gauge + updown_counter asynchronous_updown_counter + ].each do |instrument_kind| + define_method(instrument_kind) do |name, **opts| + register_instrument(instrument_kind, name, **opts) + end + end + + def register_instrument(kind, name, **opts) + @instrument_configs ||= {} + + key = [kind, name] + if @instrument_configs.key?(key) + warn("Duplicate instrument configured for #{self}: #{key.inspect}") + else + @instrument_configs[key] = opts + end + end + else + def counter(*, **); end + def asynchronous_counter(*, **); end + def histogram(*, **); end + def gauge(*, **); end + def asynchronous_gauge(*, **); end + def updown_counter(*, **); end + def asynchronous_updown_counter(*, **); end + end + private - attr_reader :install_blk, :present_blk, :compatible_blk, :options + attr_reader :install_blk, :present_blk, :compatible_blk, :options, :instrument_configs def infer_name @inferred_name ||= if (md = name.match(NAME_REGEX)) # rubocop:disable Naming/MemoizedInstanceVariableName @@ -192,13 +223,13 @@ def infer_version end end - attr_reader :name, :version, :config, :installed, :tracer, :meter + attr_reader :name, :version, :config, :installed, :tracer, :meter, :instrument_configs alias installed? installed # rubocop:disable Metrics/ParameterLists def initialize(name, version, install_blk, present_blk, - compatible_blk, options) + compatible_blk, options, instrument_configs) @name = name @version = version @install_blk = install_blk @@ -209,7 +240,8 @@ def initialize(name, version, install_blk, present_blk, @options = options @tracer = OpenTelemetry::Trace::Tracer.new # default no-op tracer - @meter = OpenTelemetry::Metrics::Meter.new if defined?(OpenTelemetry::Meter) # default no-op meter + @meter = OpenTelemetry::Metrics::Meter.new if defined?(OpenTelemetry::Metrics::Meter) # default no-op meter + @instrument_configs = instrument_configs || {} end # rubocop:enable Metrics/ParameterLists @@ -277,6 +309,7 @@ def metrics_enabled? end # @api private + # ONLY yields if the meter is enabled. def with_meter yield @meter if metrics_enabled? end diff --git a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/instrumentation.rb b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/instrumentation.rb index eff8e8e29..40f19dc48 100644 --- a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/instrumentation.rb +++ b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/instrumentation.rb @@ -109,26 +109,61 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base option :peer_service, default: nil, validate: :string option :metrics, default: false, validate: :boolean + # FIXME: descriptions? + + if defined?(OpenTelemetry::Metrics) + counter 'messaging.client.sent.messages' + histogram 'messaging.client.operation.duration', unit: 's' # FIXME: UCUM::S + counter 'messaging.client.consumed.messages' + histogram 'messaging.process.duration', unit: 's' + + # FIXME: not semconv + gauge 'messaging.queue.latency', unit: 's' + end + # FIXME: upstream - def get_counter(name, description: nil) - return unless metrics_enabled? + def counter(name) + get_instrument(:counter, name) + end - binding.pry - # FIXME: structural keys - # FIXME: mutex counter creation (& reads?) - INSTRUMENTS[[name, description]] ||= meter.create_counter(name, description: description) + # FIXME: upstream + def histogram(name) + get_instrument(:histogram, name) + end + + # FIXME: upstream + def gauge(name) + get_instrument(:gauge, name) end private - # FIXME: upstream - INSTRUMENTS = {} + def get_instrument(kind, name) + return unless metrics_enabled? + + @instruments ||= {} + @instruments[[kind, name]] ||= create_configured_instrument(kind, name) + end + + def create_configured_instrument(kind, name) + config = @instrument_configs[[kind, name]] + + if config.nil? + Kernel.warn("unconfigured instrument requested: #{kind} of '#{name}'") + return + end + + # FIXME: some of these have different opts; + # should verify that they work before this point. + meter.public_send(:"create_#{kind}", name, **config) + end def gem_version Gem::Version.new(::Sidekiq::VERSION) end def require_dependencies + require_relative 'middlewares/common' require_relative 'middlewares/client/tracer_middleware' require_relative 'middlewares/server/tracer_middleware' diff --git a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/client/tracer_middleware.rb b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/client/tracer_middleware.rb index 99b39b077..0792cc895 100644 --- a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/client/tracer_middleware.rb +++ b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/client/tracer_middleware.rb @@ -34,27 +34,41 @@ def call(_worker_class, job, _queue, _redis_pool) span.add_event('created_at', timestamp: job['created_at']) yield end.tap do - # FIXME: is it possible to detect failures here? Does sidekiq bubble them up the middlewares? - with_meter do |meter| - counter_attributes = metrics_attributes(job).merge( - { - 'messaging.operation.name' => 'enqueue' # FIXME: metrics semconv - # server.address => # FIXME: required if available - # messaging.destination.partition.id => FIXME: recommended - # server.port => # FIXME: recommended - } - ) - - # FIXME: avoid create_counter repetition? - binding.pry - counter = instrumentation.get_counter('messaging.client.sent.messages') - counter.add(1, attributes: counter_attributes) - end + # FIXME: is it possible/necessary to detect failures here? Does sidekiq bubble them up the middlewares? + count_sent_message(job) end end private + def count_sent_message(job) + with_meter do |_meter| + counter_attributes = metrics_attributes(job).merge( + { + 'messaging.operation.name' => 'create' + # server.address => # FIXME: required if available + # messaging.destination.partition.id => FIXME: recommended + # server.port => # FIXME: recommended + } + ) + + counter = messaging_client_sent_messages_counter + counter.add(1, attributes: counter_attributes) + end + end + + def messaging_client_sent_messages_counter + instrumentation.counter('messaging.client.sent.messages') + end + + def messaging_client_operation_duration_histogram + instrumentation.histogram('messaging.client.operation.duration') + end + + def messaging_client_consumed_messages_counter + instrumentation.counter('messaging.client.consumed.messages') + end + def instrumentation Sidekiq::Instrumentation.instance end diff --git a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb index 90da96ea3..a89f4bab6 100644 --- a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb +++ b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb @@ -12,6 +12,7 @@ module Server # TracerMiddleware propagates context and instruments Sidekiq requests # by way of its middleware system class TracerMiddleware + include OpenTelemetry::Instrumentation::Sidekiq::Middlewares::Common include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware) def call(_worker, msg, _queue) @@ -32,27 +33,31 @@ def call(_worker, msg, _queue) extracted_context = OpenTelemetry.propagation.extract(msg) OpenTelemetry::Context.with_current(extracted_context) do - if instrumentation_config[:propagation_style] == :child - tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span| - span.add_event('created_at', timestamp: msg['created_at']) - span.add_event('enqueued_at', timestamp: msg['enqueued_at']) - yield - end - else - links = [] - span_context = OpenTelemetry::Trace.current_span(extracted_context).context - links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid? - span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer) - OpenTelemetry::Trace.with_span(span) do - span.add_event('created_at', timestamp: msg['created_at']) - span.add_event('enqueued_at', timestamp: msg['enqueued_at']) - yield - rescue Exception => e # rubocop:disable Lint/RescueException - span.record_exception(e) - span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") - raise e - ensure - span.finish + track_queue_latency(msg) + + timed(track_process_time_callback(msg)) do + if instrumentation_config[:propagation_style] == :child + tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span| + span.add_event('created_at', timestamp: msg['created_at']) + span.add_event('enqueued_at', timestamp: msg['enqueued_at']) + yield + end + else + links = [] + span_context = OpenTelemetry::Trace.current_span(extracted_context).context + links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid? + span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer) + OpenTelemetry::Trace.with_span(span) do + span.add_event('created_at', timestamp: msg['created_at']) + span.add_event('enqueued_at', timestamp: msg['enqueued_at']) + yield + rescue Exception => e # rubocop:disable Lint/RescueException + span.record_exception(e) + span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") + raise e + ensure + span.finish + end end end end @@ -60,12 +65,47 @@ def call(_worker, msg, _queue) private - def instrumentation_config - Sidekiq::Instrumentation.instance.config + def track_queue_latency(msg) + with_meter do + return unless (enqueued_at = msg['enqueued_at']) + return unless enqueued_at.is_a?(Numeric) + + latency = (realtime_now - enqueued_at).abs + + queue_latency_gauge&.record(latency, attributes: metrics_attributes(msg)) + end + end + + def track_process_time_callback(msg) + ->(duration) { track_process_time(msg, duration) } end - def tracer - Sidekiq::Instrumentation.instance.tracer + def track_process_time(msg, duration) + with_meter do + attributes = metrics_attributes(msg).merge( + { 'messaging.operation.name' => 'process' } + ) + messaging_process_duration_histogram&.record(duration, attributes: attributes) + end + end + + def messaging_process_duration_histogram + instrumentation.histogram('messaging.process.duration') + end + + def queue_latency_gauge + instrumentation.gauge('messaging.queue.latency') + end + + # FIXME: dedupe + def metrics_attributes(msg) + { + 'messaging.system' => 'sidekiq', # FIXME: metrics semconv + 'messaging.destination.name' => msg['queue'] # FIXME: metrics semconv + # server.address => # FIXME: required if available + # messaging.destination.partition.id => FIXME: recommended + # server.port => # FIXME: recommended + } end end end