Skip to content

Commit

Permalink
speculative API for metrics in instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
zvkemp committed Dec 19, 2024
1 parent a745666 commit 0abb927
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 54 deletions.
43 changes: 38 additions & 5 deletions instrumentation/base/lib/opentelemetry/instrumentation/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,44 @@ def option(name, default:, validate:)
# Returns the memoized instrumentation instance, building it on first use.
# @instance is read without the lock first; on a miss SINGLETON_MUTEX is taken
# and @instance is checked again (||=) before calling new.
# NOTE(review): the two consecutive argument-continuation lines below look like
# the removed and added sides of a diff hunk; only the second (ending in
# instrument_configs) matches an initialize that accepts instrument_configs — confirm.
def instance
@instance || SINGLETON_MUTEX.synchronize do
@instance ||= new(instrumentation_name, instrumentation_version, install_blk,
present_blk, compatible_blk, options)
present_blk, compatible_blk, options, instrument_configs)
end
end

# Declaration helpers for metric instruments.
#
# When OpenTelemetry::Metrics is loaded, each helper (counter, histogram, ...)
# records the instrument's options under a [kind, name] key through
# register_instrument. When it is not loaded, the same helper names exist
# but accept any arguments and do nothing.
if defined?(OpenTelemetry::Metrics)
  instrument_kinds = %i[
    counter asynchronous_counter
    histogram gauge asynchronous_gauge
    updown_counter asynchronous_updown_counter
  ]

  instrument_kinds.each do |kind|
    define_method(kind) { |name, **opts| register_instrument(kind, name, **opts) }
  end

  # Stores +opts+ for the [kind, name] pair. The first registration wins;
  # a repeated registration only emits a warning and leaves the stored
  # options untouched.
  def register_instrument(kind, name, **opts)
    @instrument_configs ||= {}
    config_key = [kind, name]

    return warn("Duplicate instrument configured for #{self}: #{config_key.inspect}") if @instrument_configs.key?(config_key)

    @instrument_configs[config_key] = opts
  end
else
  # Metrics API unavailable: keep the helper names callable as no-ops.
  def counter(*, **); end

  def asynchronous_counter(*, **); end

  def histogram(*, **); end

  def gauge(*, **); end

  def asynchronous_gauge(*, **); end

  def updown_counter(*, **); end

  def asynchronous_updown_counter(*, **); end
end

private

attr_reader :install_blk, :present_blk, :compatible_blk, :options
attr_reader :install_blk, :present_blk, :compatible_blk, :options, :instrument_configs

def infer_name
@inferred_name ||= if (md = name.match(NAME_REGEX)) # rubocop:disable Naming/MemoizedInstanceVariableName
Expand All @@ -192,13 +223,13 @@ def infer_version
end
end

attr_reader :name, :version, :config, :installed, :tracer, :meter
attr_reader :name, :version, :config, :installed, :tracer, :meter, :instrument_configs

alias installed? installed

# rubocop:disable Metrics/ParameterLists
def initialize(name, version, install_blk, present_blk,
compatible_blk, options)
compatible_blk, options, instrument_configs)
@name = name
@version = version
@install_blk = install_blk
Expand All @@ -209,7 +240,8 @@ def initialize(name, version, install_blk, present_blk,
@options = options
@tracer = OpenTelemetry::Trace::Tracer.new # default no-op tracer

@meter = OpenTelemetry::Metrics::Meter.new if defined?(OpenTelemetry::Meter) # default no-op meter
@meter = OpenTelemetry::Metrics::Meter.new if defined?(OpenTelemetry::Metrics::Meter) # default no-op meter
@instrument_configs = instrument_configs || {}
end
# rubocop:enable Metrics/ParameterLists

Expand Down Expand Up @@ -277,6 +309,7 @@ def metrics_enabled?
end

# @api private
# Hands @meter to the given block, but only while metrics_enabled? is true.
# When metrics are disabled the block is never called and nil is returned;
# otherwise the block's value is returned.
def with_meter
  return unless metrics_enabled?

  yield @meter
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,26 +109,61 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
option :peer_service, default: nil, validate: :string
option :metrics, default: false, validate: :boolean

# FIXME: descriptions?

# Declares the metric instruments this instrumentation may create. The whole
# list is skipped when OpenTelemetry::Metrics is not loaded. Instruments are
# later looked up by [kind, name], so the names here must match the names
# requested through the counter/histogram/gauge instance helpers.
if defined?(OpenTelemetry::Metrics)
counter 'messaging.client.sent.messages'
histogram 'messaging.client.operation.duration', unit: 's' # FIXME: UCUM::S
counter 'messaging.client.consumed.messages'
histogram 'messaging.process.duration', unit: 's'

# FIXME: not semconv
gauge 'messaging.queue.latency', unit: 's'
end

# FIXME: upstream
def get_counter(name, description: nil)
return unless metrics_enabled?
# Returns the counter instrument for +name+ by delegating to
# get_instrument(:counter, name); nil when that lookup yields nothing.
def counter(name)
get_instrument(:counter, name)
end

binding.pry
# FIXME: structural keys
# FIXME: mutex counter creation (& reads?)
INSTRUMENTS[[name, description]] ||= meter.create_counter(name, description: description)
# FIXME: upstream
# Returns the histogram instrument for +name+ by delegating to
# get_instrument(:histogram, name); nil when that lookup yields nothing.
def histogram(name)
get_instrument(:histogram, name)
end

# FIXME: upstream
# Returns the gauge instrument for +name+ by delegating to
# get_instrument(:gauge, name); nil when that lookup yields nothing.
def gauge(name)
get_instrument(:gauge, name)
end

private

# FIXME: upstream
INSTRUMENTS = {}
# Returns the instrument of the given +kind+ and +name+, building it on the
# first request and reusing it afterwards.
#
# @param kind [Symbol] instrument kind, e.g. :counter, :histogram, :gauge
# @param name [String] instrument name as declared on the class
# @return [Object, nil] the instrument, or nil when metrics are disabled or
#   create_configured_instrument returned nil for this kind/name
def get_instrument(kind, name)
  return unless metrics_enabled?

  @instruments ||= {}
  key = [kind, name]

  # fetch-with-block (instead of ||=) caches a nil result too, so a request
  # for an unconfigured instrument warns once instead of on every call.
  @instruments.fetch(key) { @instruments[key] = create_configured_instrument(kind, name) }
end

# Builds the instrument declared for [kind, name] by calling the meter's
# create_<kind> method with the stored options. Warns and returns nil when
# no options were stored for that pair.
def create_configured_instrument(kind, name)
  instrument_opts = @instrument_configs[[kind, name]]

  unless instrument_opts.nil?
    # FIXME: some of these have different opts;
    # should verify that they work before this point.
    return meter.public_send(:"create_#{kind}", name, **instrument_opts)
  end

  Kernel.warn("unconfigured instrument requested: #{kind} of '#{name}'")
  nil
end

# Wraps ::Sidekiq::VERSION in a Gem::Version.
def gem_version
Gem::Version.new(::Sidekiq::VERSION)
end

def require_dependencies
require_relative 'middlewares/common'
require_relative 'middlewares/client/tracer_middleware'
require_relative 'middlewares/server/tracer_middleware'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,41 @@ def call(_worker_class, job, _queue, _redis_pool)
span.add_event('created_at', timestamp: job['created_at'])
yield
end.tap do
# FIXME: is it possible to detect failures here? Does sidekiq bubble them up the middlewares?
with_meter do |meter|
counter_attributes = metrics_attributes(job).merge(
{
'messaging.operation.name' => 'enqueue' # FIXME: metrics semconv
# server.address => # FIXME: required if available
# messaging.destination.partition.id => FIXME: recommended
# server.port => # FIXME: recommended
}
)

# FIXME: avoid create_counter repetition?
binding.pry
counter = instrumentation.get_counter('messaging.client.sent.messages')
counter.add(1, attributes: counter_attributes)
end
# FIXME: is it possible/necessary to detect failures here? Does sidekiq bubble them up the middlewares?
count_sent_message(job)
end
end

private

# Adds 1 to the 'messaging.client.sent.messages' counter for +job+, tagged
# with the job's metrics attributes plus the 'create' operation name.
# Runs only inside with_meter.
#
# @param job [Hash] the Sidekiq job payload passed to the client middleware
def count_sent_message(job)
  with_meter do |_meter|
    counter_attributes = metrics_attributes(job).merge(
      'messaging.operation.name' => 'create'
      # server.address => # FIXME: required if available
      # messaging.destination.partition.id => FIXME: recommended
      # server.port => # FIXME: recommended
    )

    # The instrument lookup returns nil when the counter was never configured;
    # use safe navigation (as the server middleware does for its instruments)
    # so a missing counter cannot raise out of the enqueue path.
    messaging_client_sent_messages_counter&.add(1, attributes: counter_attributes)
  end
end

# Looks up the 'messaging.client.sent.messages' counter on the instrumentation.
def messaging_client_sent_messages_counter
instrumentation.counter('messaging.client.sent.messages')
end

# Looks up the 'messaging.client.operation.duration' histogram on the instrumentation.
def messaging_client_operation_duration_histogram
instrumentation.histogram('messaging.client.operation.duration')
end

# Looks up the 'messaging.client.consumed.messages' counter on the instrumentation.
def messaging_client_consumed_messages_counter
instrumentation.counter('messaging.client.consumed.messages')
end

# Shortcut for Sidekiq::Instrumentation.instance.
def instrumentation
Sidekiq::Instrumentation.instance
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Server
# TracerMiddleware propagates context and instruments Sidekiq requests
# by way of its middleware system
class TracerMiddleware
include OpenTelemetry::Instrumentation::Sidekiq::Middlewares::Common
include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware)

def call(_worker, msg, _queue)
Expand All @@ -32,40 +33,79 @@ def call(_worker, msg, _queue)

extracted_context = OpenTelemetry.propagation.extract(msg)
OpenTelemetry::Context.with_current(extracted_context) do
if instrumentation_config[:propagation_style] == :child
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
end
else
links = []
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
OpenTelemetry::Trace.with_span(span) do
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
rescue Exception => e # rubocop:disable Lint/RescueException
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
raise e
ensure
span.finish
track_queue_latency(msg)

timed(track_process_time_callback(msg)) do
if instrumentation_config[:propagation_style] == :child
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
end
else
links = []
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
OpenTelemetry::Trace.with_span(span) do
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
rescue Exception => e # rubocop:disable Lint/RescueException
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
raise e
ensure
span.finish
end
end
end
end
end

private

def instrumentation_config
Sidekiq::Instrumentation.instance.config
# Records how long +msg+ waited in the queue on the queue-latency gauge.
# The value is the absolute difference between realtime_now and the job's
# 'enqueued_at'; nothing is recorded when 'enqueued_at' is missing or is
# not a Numeric. Runs only inside with_meter.
def track_queue_latency(msg)
  with_meter do
    enqueued_at = msg['enqueued_at']
    # nil and false are not Numeric, so this one check covers both the
    # "missing" and the "wrong type" cases.
    return unless enqueued_at.is_a?(Numeric)

    waited = (realtime_now - enqueued_at).abs
    queue_latency_gauge&.record(waited, attributes: metrics_attributes(msg))
  end
end

# Returns a one-argument lambda that forwards a measured duration, together
# with +msg+, to track_process_time.
def track_process_time_callback(msg)
  lambda do |elapsed|
    track_process_time(msg, elapsed)
  end
end

def tracer
Sidekiq::Instrumentation.instance.tracer
# Records +duration+ for +msg+ on the 'messaging.process.duration' histogram,
# tagged with the job's metrics attributes plus the 'process' operation name.
# Runs only inside with_meter; a nil histogram is skipped.
def track_process_time(msg, duration)
  with_meter do
    process_attributes = metrics_attributes(msg).merge('messaging.operation.name' => 'process')
    histogram = messaging_process_duration_histogram
    histogram&.record(duration, attributes: process_attributes)
  end
end

# Looks up the 'messaging.process.duration' histogram on the instrumentation.
def messaging_process_duration_histogram
instrumentation.histogram('messaging.process.duration')
end

# Looks up the 'messaging.queue.latency' gauge on the instrumentation.
def queue_latency_gauge
instrumentation.gauge('messaging.queue.latency')
end

# FIXME: dedupe
# Builds the base attribute hash attached to metrics recorded for +msg+:
# the messaging system name and the job's queue as the destination name.
#
# Not populated yet:
#   server.address                      FIXME: required if available
#   messaging.destination.partition.id  FIXME: recommended
#   server.port                         FIXME: recommended
def metrics_attributes(msg)
  attributes = {}
  attributes['messaging.system'] = 'sidekiq' # FIXME: metrics semconv
  attributes['messaging.destination.name'] = msg['queue'] # FIXME: metrics semconv
  attributes
end
end
end
Expand Down

0 comments on commit 0abb927

Please sign in to comment.