Skip to content

Commit

Permalink
[FLINK-36624][checkpointing] Log JobID from the SourceCoordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Nov 4, 2024
1 parent 4e2057d commit 96bb45d
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.DynamicFilteringInfo;
Expand Down Expand Up @@ -49,6 +50,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.function.ThrowingRunnable;

Expand Down Expand Up @@ -103,6 +105,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>

private final WatermarkAlignmentParams watermarkAlignmentParams;

private final JobID jobID;
/** The name of the operator this SourceCoordinator is associated with. */
private final String operatorName;
/** The Source that is associated with this SourceCoordinator. */
Expand Down Expand Up @@ -133,6 +136,7 @@ public SourceCoordinator(
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore) {
this(
new JobID(),
operatorName,
source,
context,
Expand All @@ -142,12 +146,14 @@ public SourceCoordinator(
}

public SourceCoordinator(
JobID jobID,
String operatorName,
Source<?, SplitT, EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore,
WatermarkAlignmentParams watermarkAlignmentParams,
@Nullable String coordinatorListeningID) {
this.jobID = jobID;
this.operatorName = operatorName;
this.source = source;
this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
Expand Down Expand Up @@ -296,12 +302,15 @@ public void start() throws Exception {

@Override
public void close() throws Exception {
LOG.info("Closing SourceCoordinator for source {}.", operatorName);
if (started) {
closeQuietly(enumerator);
try (MdcUtils.MdcCloseable mdcCloseable =
MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
LOG.info("Closing SourceCoordinator for source {}.", operatorName);
if (started) {
closeQuietly(enumerator);
}
closeQuietly(context);
LOG.info("Source coordinator for source {} closed.", operatorName);
}
closeQuietly(context);
LOG.info("Source coordinator for source {} closed.", operatorName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
Expand All @@ -41,6 +42,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.ThrowableCatchingRunnable;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
Expand Down Expand Up @@ -120,12 +122,14 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
private volatile TernaryBoolean backlog = TernaryBoolean.UNDEFINED;

public SourceCoordinatorContext(
JobID jobID,
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
int numWorkerThreads,
OperatorCoordinator.Context operatorCoordinatorContext,
SimpleVersionedSerializer<SplitT> splitSerializer,
boolean supportsConcurrentExecutionAttempts) {
this(
jobID,
Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
Executors.newScheduledThreadPool(
numWorkerThreads,
Expand All @@ -141,6 +145,7 @@ public SourceCoordinatorContext(
// Package private method for unit test.
@VisibleForTesting
SourceCoordinatorContext(
JobID jobID,
ScheduledExecutorService coordinatorExecutor,
ScheduledExecutorService workerExecutor,
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
Expand All @@ -149,7 +154,7 @@ public SourceCoordinatorContext(
SplitAssignmentTracker<SplitT> splitAssignmentTracker,
boolean supportsConcurrentExecutionAttempts) {
this.workerExecutor = workerExecutor;
this.coordinatorExecutor = coordinatorExecutor;
this.coordinatorExecutor = MdcUtils.scopeToJob(jobID, coordinatorExecutor);
this.coordinatorThreadFactory = coordinatorThreadFactory;
this.operatorCoordinatorContext = operatorCoordinatorContext;
this.splitSerializer = splitSerializer;
Expand All @@ -160,7 +165,7 @@ public SourceCoordinatorContext(

final Executor errorHandlingCoordinatorExecutor =
(runnable) ->
coordinatorExecutor.execute(
this.coordinatorExecutor.execute(
new ThrowableCatchingRunnable(
this::handleUncaughtExceptionFromAsyncCall, runnable));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
new SourceCoordinatorContext<>(
context.getJobID(),
coordinatorThreadFactory,
numWorkerThreads,
context,
splitSerializer,
context.isConcurrentExecutionAttemptsSupported());
return new SourceCoordinator<>(
context.getJobID(),
operatorName,
source,
sourceCoordinatorContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.source.coordinator;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
Expand Down Expand Up @@ -166,6 +167,7 @@ void testAnnounceCombinedWatermarkWithoutStart() throws Exception {
AtomicInteger counter1 = new AtomicInteger(0);
sourceCoordinator =
new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
new JobID(),
OPERATOR_NAME,
mockSource,
getNewSourceCoordinatorContext(),
Expand All @@ -184,6 +186,7 @@ void announceCombinedWatermark() {
CountDownLatch latch = new CountDownLatch(2);
sourceCoordinator =
new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
new JobID(),
OPERATOR_NAME,
mockSource,
getNewSourceCoordinatorContext(),
Expand Down Expand Up @@ -218,6 +221,7 @@ void testSendWatermarkAlignmentEventFailed() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
sourceCoordinator =
new SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>(
new JobID(),
OPERATOR_NAME,
mockSource,
getNewSourceCoordinatorContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.source.coordinator;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
Expand Down Expand Up @@ -160,6 +161,7 @@ void testExceptionInRunnableFailsTheJob() throws InterruptedException, Execution
new ManuallyTriggeredScheduledExecutorService();
SourceCoordinatorContext<MockSourceSplit> testingContext =
new SourceCoordinatorContext<>(
new JobID(),
coordinatorExecutorWithExceptionHandler,
manualWorkerExecutor,
new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
Expand Down Expand Up @@ -195,6 +197,7 @@ void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedExcep

SourceCoordinatorContext<MockSourceSplit> testingContext =
new SourceCoordinatorContext<>(
new JobID(),
manualCoordinatorExecutor,
manualWorkerExecutor,
new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.source.coordinator;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.DynamicFilteringInfo;
Expand Down Expand Up @@ -290,6 +291,7 @@ public void start() {
};
final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
new JobID(),
OPERATOR_NAME,
new EnumeratorCreatingSource<>(() -> splitEnumerator),
context,
Expand All @@ -312,6 +314,7 @@ void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception {

final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
new JobID(),
OPERATOR_NAME,
new EnumeratorCreatingSource<>(
() -> {
Expand Down Expand Up @@ -342,6 +345,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
};
final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
new JobID(),
OPERATOR_NAME,
new EnumeratorCreatingSource<>(() -> splitEnumerator),
context,
Expand Down Expand Up @@ -562,6 +566,7 @@ public void testListeningEventsFromOtherCoordinators() throws Exception {
CoordinatorStore store = new CoordinatorStoreImpl();
final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
new JobID(),
OPERATOR_NAME,
createMockSource(),
context,
Expand All @@ -583,6 +588,7 @@ class TestDynamicFilteringEvent implements SourceEvent, DynamicFilteringInfo {}
store.putIfAbsent(listeningID, new SourceEventWrapper(new TestDynamicFilteringEvent()));
final SourceCoordinator<?, ?> coordinator =
new SourceCoordinator<>(
new JobID(),
OPERATOR_NAME,
createMockSource(),
context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.source.coordinator;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
Expand Down Expand Up @@ -181,6 +182,7 @@ protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> getNewSourceC
createMockSource();

return new SourceCoordinator<>(
new JobID(),
OPERATOR_NAME,
mockSource,
getNewSourceCoordinatorContext(),
Expand All @@ -201,6 +203,7 @@ protected SourceCoordinatorContext<MockSourceSplit> getNewSourceCoordinatorConte
coordinatorThreadName, operatorCoordinatorContext);
SourceCoordinatorContext<MockSourceSplit> coordinatorContext =
new SourceCoordinatorContext<>(
new JobID(),
Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory(coordinatorThreadName + "-worker")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
Expand All @@ -30,6 +33,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
Expand Down Expand Up @@ -70,6 +74,10 @@ class JobIDLoggingITCase {
public final LoggerAuditingExtension checkpointCoordinatorLogging =
new LoggerAuditingExtension(CheckpointCoordinator.class, DEBUG);

@RegisterExtension
public final LoggerAuditingExtension sourceCoordinatorLogging =
new LoggerAuditingExtension(SourceCoordinator.class, DEBUG);

@RegisterExtension
public final LoggerAuditingExtension streamTaskLogging =
new LoggerAuditingExtension(StreamTask.class, DEBUG);
Expand Down Expand Up @@ -136,6 +144,15 @@ void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) throw
"Completed checkpoint .*",
"Checkpoint state: .*"));

assertJobIDPresent(
jobID,
sourceCoordinatorLogging,
asList(
"Starting split enumerator.*",
"Distributing maxAllowedWatermark.*",
"Source .* registering reader for parallel task.*",
"Closing SourceCoordinator for source .*"));

assertJobIDPresent(
jobID,
streamTaskLogging,
Expand Down Expand Up @@ -272,7 +289,15 @@ private static boolean matchesAny(List<Pattern> patternStream, String message) {

private static JobID runJob(ClusterClient<?> clusterClient) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new DiscardingSink<>());

env.fromSource(
new NumberSequenceSource(0, Long.MAX_VALUE),
WatermarkStrategy.forGenerator(ctx -> new AscendingTimestampsWatermarks())
.withWatermarkAlignment(
"group-1", Duration.ofMillis(1000), Duration.ofMillis(1))
.withTimestampAssigner((r, t) -> (long) r),
"Source-42441337")
.addSink(new DiscardingSink<>());
JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
while (deadline.hasTimeLeft()
Expand Down

0 comments on commit 96bb45d

Please sign in to comment.