Skip to content

Commit

Permalink
Merge branch 'main' into fix/synthetic_source_concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
javanna committed Jan 2, 2024
2 parents c143d0d + d57b034 commit 6635b8d
Show file tree
Hide file tree
Showing 230 changed files with 4,605 additions and 1,694 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/101640.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101640
summary: Support cross clusters query in ESQL
area: ES|QL
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/103670.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103670
summary: "ESQL: Improve local folding of aggregates"
area: ES|QL
type: bug
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/103673.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103673
summary: "ESQL: Infer not null for aggregated fields"
area: ES|QL
type: enhancement
issues:
- 102787
5 changes: 5 additions & 0 deletions docs/changelog/103710.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103710
summary: List hidden shard stores by default
area: Store
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/103720.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103720
summary: Add "step":"ERROR" to ILM explain response for missing policy
area: ILM+SLM
type: enhancement
issues:
- 99030
5 changes: 5 additions & 0 deletions docs/changelog/103727.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103727
summary: "ESQL: Track the rest of `DocVector`"
area: ES|QL
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/103758.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103758
summary: Fix the transport version of `PlanStreamOutput`
area: ES|QL
type: bug
issues: []
2 changes: 1 addition & 1 deletion docs/reference/esql/esql-async-query-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ complete results in the `wait_for_completion_timeout` parameter.

[source,console]
----
GET /_query/async/get/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=?wait_for_completion_timeout=30s
GET /_query/async/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=?wait_for_completion_timeout=30s
----
// TEST[skip: no access to search ID - may return response values]

Expand Down
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_degrees.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 7 additions & 1 deletion docs/reference/esql/functions/types/add.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
|===
lhs | rhs | result
date_period | date_period | date_period
date_period | datetime | datetime
datetime | date_period | datetime
datetime | time_duration | datetime
double | double | double
double | integer | double
double | long | double
integer | double | double
integer | integer | integer
integer | long | long
long | double | double
long | integer | long
long | long | long
time_duration | time_duration | time_duration
unsigned_long | unsigned_long | unsigned_long
|===
8 changes: 8 additions & 0 deletions docs/reference/esql/functions/types/to_degrees.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
double | double
integer | double
long | double
unsigned_long | double
|===
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.tracing.SpanId;
import org.elasticsearch.telemetry.tracing.TraceContext;
import org.elasticsearch.telemetry.tracing.Traceable;

import java.security.AccessController;
import java.security.PrivilegedAction;
Expand All @@ -61,7 +61,7 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic
private static final Logger logger = LogManager.getLogger(APMTracer.class);

/** Holds in-flight span information. */
private final Map<SpanId, Context> spans = ConcurrentCollections.newConcurrentMap();
private final Map<String, Context> spans = ConcurrentCollections.newConcurrentMap();

private volatile boolean enabled;
private volatile APMServices services;
Expand Down Expand Up @@ -160,8 +160,9 @@ private void destroyApmServices() {
}

@Override
public void startTrace(ThreadContext threadContext, SpanId spanId, String spanName, @Nullable Map<String, Object> attributes) {
assert threadContext != null;
public void startTrace(TraceContext traceContext, Traceable traceable, String spanName, @Nullable Map<String, Object> attributes) {
assert traceContext != null;
String spanId = traceable.getSpanId();
assert spanId != null;
assert spanName != null;

Expand All @@ -182,21 +183,21 @@ public void startTrace(ThreadContext threadContext, SpanId spanId, String spanNa

// A span can have a parent span, which here is modelled though a parent span context.
// Setting this is important for seeing a complete trace in the APM UI.
final Context parentContext = getParentContext(threadContext);
final Context parentContext = getParentContext(traceContext);
if (parentContext != null) {
spanBuilder.setParent(parentContext);
}

setSpanAttributes(threadContext, attributes, spanBuilder);
setSpanAttributes(traceContext, attributes, spanBuilder);

Instant startTime = threadContext.getTransient(Task.TRACE_START_TIME);
Instant startTime = traceContext.getTransient(Task.TRACE_START_TIME);
if (startTime != null) {
spanBuilder.setStartTimestamp(startTime);
}
final Span span = spanBuilder.startSpan();
final Context contextForNewSpan = Context.current().with(span);

updateThreadContext(threadContext, services, contextForNewSpan);
updateThreadContext(traceContext, services, contextForNewSpan);

return contextForNewSpan;
}));
Expand All @@ -221,29 +222,29 @@ public void startTrace(String name, Map<String, Object> attributes) {
spanBuilder.startSpan();
}

private static void updateThreadContext(ThreadContext threadContext, APMServices services, Context context) {
private static void updateThreadContext(TraceContext traceContext, APMServices services, Context context) {
// The new span context can be used as the parent context directly within the same Java process...
threadContext.putTransient(Task.APM_TRACE_CONTEXT, context);
traceContext.putTransient(Task.APM_TRACE_CONTEXT, context);

// ...whereas for tasks sent to other ES nodes, we need to put trace HTTP headers into the threadContext so
// ...whereas for tasks sent to other ES nodes, we need to put trace HTTP headers into the traceContext so
// that they can be propagated.
services.openTelemetry.getPropagators().getTextMapPropagator().inject(context, threadContext, (tc, key, value) -> {
services.openTelemetry.getPropagators().getTextMapPropagator().inject(context, traceContext, (tc, key, value) -> {
if (isSupportedContextKey(key)) {
tc.putHeader(key, value);
}
});
}

private Context getParentContext(ThreadContext threadContext) {
private Context getParentContext(TraceContext traceContext) {
// https://github.com/open-telemetry/opentelemetry-java/discussions/2884#discussioncomment-381870
// If you just want to propagate across threads within the same process, you don't need context propagators (extract/inject).
// You can just pass the Context object directly to another thread (it is immutable and thus thread-safe).

// Attempt to fetch a local parent context first, otherwise look for a remote parent
Context parentContext = threadContext.getTransient("parent_" + Task.APM_TRACE_CONTEXT);
Context parentContext = traceContext.getTransient("parent_" + Task.APM_TRACE_CONTEXT);
if (parentContext == null) {
final String traceParentHeader = threadContext.getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER);
final String traceStateHeader = threadContext.getTransient("parent_" + Task.TRACE_STATE);
final String traceParentHeader = traceContext.getTransient("parent_" + Task.TRACE_PARENT_HTTP_HEADER);
final String traceStateHeader = traceContext.getTransient("parent_" + Task.TRACE_STATE);

if (traceParentHeader != null) {
final Map<String, String> traceContextMap = Maps.newMapWithExpectedSize(2);
Expand Down Expand Up @@ -276,12 +277,12 @@ private Context getParentContext(ThreadContext threadContext) {
* However, if a scope is active, then the APM agent can capture additional information, so this method
* exists to make it possible to use scopes in the few situation where it makes sense.
*
* @param spanId the ID of a currently-open span for which to open a scope.
* @param traceable provides the ID of a currently-open span for which to open a scope.
* @return a method to close the scope when you are finished with it.
*/
@Override
public Releasable withScope(SpanId spanId) {
final Context context = spans.get(spanId);
public Releasable withScope(Traceable traceable) {
final Context context = spans.get(traceable.getSpanId());
if (context != null) {
var scope = AccessController.doPrivileged((PrivilegedAction<Scope>) context::makeCurrent);
return scope::close;
Expand Down Expand Up @@ -327,60 +328,60 @@ private void setSpanAttributes(@Nullable Map<String, Object> spanAttributes, Spa
spanBuilder.setAttribute(org.elasticsearch.telemetry.tracing.Tracer.AttributeKeys.CLUSTER_NAME, clusterName);
}

private void setSpanAttributes(ThreadContext threadContext, @Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
private void setSpanAttributes(TraceContext traceContext, @Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
setSpanAttributes(spanAttributes, spanBuilder);

final String xOpaqueId = threadContext.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER);
final String xOpaqueId = traceContext.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER);
if (xOpaqueId != null) {
spanBuilder.setAttribute("es.x-opaque-id", xOpaqueId);
}
}

@Override
public void addError(SpanId spanId, Throwable throwable) {
final var span = Span.fromContextOrNull(spans.get(spanId));
public void addError(Traceable traceable, Throwable throwable) {
final var span = Span.fromContextOrNull(spans.get(traceable.getSpanId()));
if (span != null) {
span.recordException(throwable);
}
}

@Override
public void setAttribute(SpanId spanId, String key, boolean value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
public void setAttribute(Traceable traceable, String key, boolean value) {
final var span = Span.fromContextOrNull(spans.get(traceable.getSpanId()));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void setAttribute(SpanId spanId, String key, double value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
public void setAttribute(Traceable traceable, String key, double value) {
final var span = Span.fromContextOrNull(spans.get(traceable.getSpanId()));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void setAttribute(SpanId spanId, String key, long value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
public void setAttribute(Traceable traceable, String key, long value) {
final var span = Span.fromContextOrNull(spans.get(traceable.getSpanId()));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void setAttribute(SpanId spanId, String key, String value) {
final var span = Span.fromContextOrNull(spans.get(spanId));
public void setAttribute(Traceable traceable, String key, String value) {
final var span = Span.fromContextOrNull(spans.get(traceable.getSpanId()));
if (span != null) {
span.setAttribute(key, value);
}
}

@Override
public void stopTrace(SpanId spanId) {
final var span = Span.fromContextOrNull(spans.remove(spanId));
public void stopTrace(Traceable traceable) {
final var span = Span.fromContextOrNull(spans.remove(traceable.getSpanId()));
if (span != null) {
logger.trace("Finishing trace [{}]", spanId);
logger.trace("Finishing trace [{}]", traceable);
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
span.end();
return null;
Expand All @@ -400,8 +401,8 @@ public void stopTrace() {
}

@Override
public void addEvent(SpanId spanId, String eventName) {
final var span = Span.fromContextOrNull(spans.get(spanId));
public void addEvent(Traceable traceable, String eventName) {
final var span = Span.fromContextOrNull(spans.get(traceable.getSpanId()));
if (span != null) {
span.addEvent(eventName);
}
Expand All @@ -425,7 +426,7 @@ private static boolean isSupportedContextKey(String key) {
}

// VisibleForTesting
Map<SpanId, Context> getSpans() {
Map<String, Context> getSpans() {
return spans;
}

Expand Down
Loading

0 comments on commit 6635b8d

Please sign in to comment.