Skip to content

Commit

Permalink
perf: Reduce Context Allocations in ActiveJob (#1018)
Browse files Browse the repository at this point in the history
* perf: Reduce Context Allocations in ActiveJob

After seeing changes to Rack::Events that reduced the number of active contexts required for ingress spans,
I decided to apply the same changes to ActiveJob

* doc: Update default.rb

Co-authored-by: Steven Harman <steven@harmanly.com>

---------

Co-authored-by: Steven Harman <steven@harmanly.com>
  • Loading branch information
arielvalentin and stevenharman authored Jun 20, 2024
1 parent a324938 commit 989da17
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
4 changes: 4 additions & 0 deletions instrumentation/active_job/Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
gem 'activejob', "~> #{version}"
end
end

appraise 'activejob-latest' do
gem 'activejob'
end
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def start(name, id, payload)
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
span = tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))

{ span: span, ctx_tokens: tokens }
{ span: span, ctx_token: token }
end

# Creates a span and registers it with the current context
Expand All @@ -55,20 +55,20 @@ def start_span(name, _id, payload)
def finish(_name, _id, payload)
otel = payload.delete(:__otel)
span = otel&.fetch(:span)
tokens = otel&.fetch(:ctx_tokens)
token = otel&.fetch(:ctx_token)

on_exception((payload[:error] || payload[:exception_object]), span)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
ensure
finish_span(span, tokens)
finish_span(span, token)
end

# Finishes the provided spans and also detaches the associated contexts
#
# @param span [OpenTelemetry::Trace::Span]
# @param tokens [Array] to unregister
def finish_span(span, tokens)
# @param token [Numeric] to unregister
def finish_span(span, token)
# closes the span after all attributes have been finalized
begin
if span&.recording?
Expand All @@ -79,8 +79,7 @@ def finish_span(span, tokens)
OpenTelemetry.handle_error(exception: e)
end

# pops the context stack
tokens&.reverse_each do |token|
begin
OpenTelemetry::Context.detach(token)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ def initialize(...)
def start_span(name, _id, payload)
job = payload.fetch(:job)
span = tracer.start_span(@span_name_formatter.call(job), kind: :producer, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
{ span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) }
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def initialize(...)
# @param payload [Hash] containing job run information
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
tokens = []
job = payload.fetch(:job)
parent_context = OpenTelemetry.propagation.extract(job.__otel_headers)

Expand All @@ -36,30 +35,27 @@ def start_span(name, _id, payload)
# TODO: Refactor into a propagation strategy
propagation_style = @config[:propagation_style]
if propagation_style == :child
tokens << OpenTelemetry::Context.attach(parent_context)
span = tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload))
else
span_context = OpenTelemetry::Trace.current_span(parent_context).context
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link
span = tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links)
end

tokens.concat(attach_consumer_context(span))

{ span: span, ctx_tokens: tokens }
{ span: span, ctx_token: attach_consumer_context(span) }
end

# This method attaches a span to multiple contexts:
# 1. Registers the ingress span as the top level ActiveJob span.
# This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error
# 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK.
# @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status
# @return [Array] Context tokens that must be detached when finished
# @return [Numeric] Context token that must be detached when finished
def attach_consumer_context(span)
consumer_context = OpenTelemetry::Trace.context_with_span(span)
internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context)

[consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) }
OpenTelemetry::Context.attach(internal_context)
end
end
end
Expand Down

0 comments on commit 989da17

Please sign in to comment.