From 82baf8886d27bb5a32245826dfcf180efc52cc17 Mon Sep 17 00:00:00 2001 From: Shuyi Chen Date: Fri, 17 Jan 2025 15:56:54 -0800 Subject: [PATCH] [FLINK-37084][python] Fix TimerRegistration concurrency issue in PyFlink (#26004) --- .../flink/python/PythonFunctionRunner.java | 2 + .../AbstractPythonFunctionOperator.java | 8 ++++ ...bstractExternalPythonFunctionOperator.java | 5 +++ .../ExternalPythonCoProcessOperator.java | 1 + .../ExternalPythonKeyedCoProcessOperator.java | 1 + .../ExternalPythonKeyedProcessOperator.java | 1 + .../ExternalPythonProcessOperator.java | 1 + .../timer/TimerRegistrationAction.java | 42 +++++++++++++++++++ .../BeamDataStreamPythonFunctionRunner.java | 3 ++ .../python/beam/BeamPythonFunctionRunner.java | 29 ++++++++++++- .../AbstractStatelessFunctionOperator.java | 1 + ...AbstractPythonStreamAggregateOperator.java | 1 + .../beam/BeamTablePythonFunctionRunner.java | 7 ++++ ...honStreamGroupWindowAggregateOperator.java | 1 + ...ythonStreamGroupAggregateOperatorTest.java | 1 + ...StreamGroupTableAggregateOperatorTest.java | 1 + ...honGroupAggregateFunctionOperatorTest.java | 1 + ...upWindowAggregateFunctionOperatorTest.java | 1 + ...erWindowAggregateFunctionOperatorTest.java | 1 + ...upWindowAggregateFunctionOperatorTest.java | 1 + ...ythonProcTimeBoundedRangeOperatorTest.java | 1 + ...PythonProcTimeBoundedRowsOperatorTest.java | 1 + ...PythonRowTimeBoundedRangeOperatorTest.java | 1 + ...wPythonRowTimeBoundedRowsOperatorTest.java | 1 + .../PythonScalarFunctionOperatorTest.java | 1 + ...ArrowPythonScalarFunctionOperatorTest.java | 1 + .../PythonTableFunctionOperatorTest.java | 1 + ...sThroughPythonAggregateFunctionRunner.java | 3 ++ ...PassThroughPythonScalarFunctionRunner.java | 3 ++ .../PassThroughPythonTableFunctionRunner.java | 3 ++ ...ghStreamAggregatePythonFunctionRunner.java | 3 ++ ...upWindowAggregatePythonFunctionRunner.java | 3 ++ ...eamTableAggregatePythonFunctionRunner.java | 3 ++ 33 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java index 6ef6a70b59ed3..c637c0e659c13 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java @@ -44,6 +44,8 @@ public interface PythonFunctionRunner extends AutoCloseable { /** Send the triggered timer to the Python function. */ void processTimer(byte[] timerData) throws Exception; + void drainUnregisteredTimers(); + /** * Retrieves the Python function result. * diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index 1a6e55f62f346..cff3f2f6b80d1 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -268,6 +268,8 @@ public Configuration getConfiguration() { protected abstract PythonEnvironmentManager createPythonEnvironmentManager(); + protected void drainUnregisteredTimers() {} + /** * Advances the watermark of all managed timer services, potentially firing event time timers. * It also ensures that the fired timers are processed in the Python user-defined functions. @@ -275,10 +277,16 @@ public Configuration getConfiguration() { private void advanceWatermark(Watermark watermark) throws Exception { if (getTimeServiceManager().isPresent()) { InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get(); + // make sure the registered timer are processed before advancing the watermark to + // ensure the timers could be triggered + drainUnregisteredTimers(); timeServiceManager.advanceWatermark(watermark); while (!isBundleFinished()) { invokeFinishBundle(); + // make sure the registered timer are processed before advancing the watermark to + // ensure the timers could be triggered + drainUnregisteredTimers(); timeServiceManager.advanceWatermark(watermark); } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java index 90991b57c4d00..da50b5335f22c 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/AbstractExternalPythonFunctionOperator.java @@ -136,6 +136,11 @@ protected ProcessPythonEnvironmentManager createPythonEnvironmentManager() { } } + @Override + protected void drainUnregisteredTimers() { + pythonFunctionRunner.drainUnregisteredTimers(); + } + protected void emitResults() throws Exception { Tuple3 resultTuple; while ((resultTuple = pythonFunctionRunner.pollResult()) != null && resultTuple.f2 != 0) { diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java index 05b2166cf62aa..71193ceed7c38 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonCoProcessOperator.java @@ -72,6 +72,7 @@ public void open() throws Exception { @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATELESS_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java index 6ce86e5a40855..dc13aafae19ae 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedCoProcessOperator.java @@ -113,6 +113,7 @@ public void open() throws Exception { @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATEFUL_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java index 2f2a785334ac7..e92301b6bf3d4 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonKeyedProcessOperator.java @@ -139,6 +139,7 @@ public void onProcessingTime(InternalTimer timer) throws Exception @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATEFUL_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java index 863edf7357bdf..07dc668a35235 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonProcessOperator.java @@ -68,6 +68,7 @@ public void open() throws Exception { @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new BeamDataStreamPythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), STATELESS_FUNCTION_URN, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java new file mode 100644 index 0000000000000..8a11779b61fe0 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.python.process.timer; + +public class TimerRegistrationAction { + + private final TimerRegistration timerRegistration; + + private final byte[] serializedTimerData; + + private boolean isRegistered; + + public TimerRegistrationAction( + TimerRegistration timerRegistration, byte[] serializedTimerData) { + this.timerRegistration = timerRegistration; + this.serializedTimerData = serializedTimerData; + this.isRegistered = false; + } + + public void run() { + if (!isRegistered) { + timerRegistration.setTimer(serializedTimerData); + isRegistered = true; + } + } +} diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java index 71bd715c47085..823e91ccfc58a 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java @@ -24,6 +24,7 @@ import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; import org.apache.flink.python.util.ProtoUtils; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -69,6 +70,7 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner private final List userDefinedDataStreamFunctions; public BeamDataStreamPythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String headOperatorFunctionUrn, @@ -86,6 +88,7 @@ public BeamDataStreamPythonFunctionRunner( @Nullable FlinkFnApi.CoderInfoDescriptor timerCoderDescriptor, Map sideOutputCoderDescriptors) { super( + environment, taskName, environmentManager, flinkMetricContainer, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java index 0029db937d4e9..4e52378f7a698 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java @@ -31,11 +31,13 @@ import org.apache.flink.python.env.process.ProcessPythonEnvironment; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistration; +import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistrationAction; import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; @@ -85,6 +87,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -190,7 +193,12 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { private transient Thread shutdownHook; + private transient Environment environment; + + private transient List unregisteredTimers; + public BeamPythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, @Nullable FlinkMetricContainer flinkMetricContainer, @@ -204,6 +212,7 @@ public BeamPythonFunctionRunner( FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor, Map sideOutputCoderDescriptors) { + this.environment = environment; this.taskName = Preconditions.checkNotNull(taskName); this.environmentManager = Preconditions.checkNotNull(environmentManager); this.flinkMetricContainer = flinkMetricContainer; @@ -301,6 +310,8 @@ public void open(ReadableConfig config) throws Exception { shutdownHook = ShutdownHookUtil.addShutdownHook( this, BeamPythonFunctionRunner.class.getSimpleName(), LOG); + + unregisteredTimers = new LinkedList<>(); } @Override @@ -339,6 +350,14 @@ public void process(byte[] data) throws Exception { mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(data)); } + @Override + public void drainUnregisteredTimers() { + for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) { + timerRegistrationAction.run(); + } + unregisteredTimers.clear(); + } + @Override public void processTimer(byte[] timerData) throws Exception { if (timerInputReceiver == null) { @@ -681,7 +700,15 @@ public void onCompleted(BeamFnApi.ProcessBundleResponse response) { private TimerReceiverFactory createTimerReceiverFactory() { BiConsumer, TimerInternals.TimerData> timerDataConsumer = - (timer, timerData) -> timerRegistration.setTimer((byte[]) timer.getUserKey()); + (timer, timerData) -> { + TimerRegistrationAction timerRegistrationAction = + new TimerRegistrationAction( + timerRegistration, (byte[]) timer.getUserKey()); + unregisteredTimers.add(timerRegistrationAction); + environment + .getMainMailboxExecutor() + .execute(timerRegistrationAction::run, "PythonTimerRegistration"); + }; return new TimerReceiverFactory(stageBundleFactory, timerDataConsumer, null); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java index 4b5c95701a908..9b53ca7550be3 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java @@ -105,6 +105,7 @@ public void processElement(StreamRecord element) throws Exception { @Override public PythonFunctionRunner createPythonFunctionRunner() throws IOException { return BeamTablePythonFunctionRunner.stateless( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), getFunctionUrn(), diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java index 5ee074cbfaf24..64381c06582ce 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java @@ -172,6 +172,7 @@ public void processElement(StreamRecord element) throws Exception { @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return BeamTablePythonFunctionRunner.stateful( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), createPythonEnvironmentManager(), getFunctionUrn(), diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java index 60158397e6286..50a2e16111cfc 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner; @@ -52,6 +53,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner { private final GeneratedMessageV3 userDefinedFunctionProto; public BeamTablePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String functionUrn, @@ -65,6 +67,7 @@ public BeamTablePythonFunctionRunner( FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) { super( + environment, taskName, environmentManager, flinkMetricContainer, @@ -117,6 +120,7 @@ public void processTimer(byte[] timerData) throws Exception { } public static BeamTablePythonFunctionRunner stateless( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String functionUrn, @@ -127,6 +131,7 @@ public static BeamTablePythonFunctionRunner stateless( FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) { return new BeamTablePythonFunctionRunner( + environment, taskName, environmentManager, functionUrn, @@ -142,6 +147,7 @@ public static BeamTablePythonFunctionRunner stateless( } public static BeamTablePythonFunctionRunner stateful( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, String functionUrn, @@ -155,6 +161,7 @@ public static BeamTablePythonFunctionRunner stateful( FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor, FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor) { return new BeamTablePythonFunctionRunner( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java index 5eee5f6958113..0f0df0691a351 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.java @@ -196,6 +196,7 @@ public void open() throws Exception { @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { return new PassThroughStreamGroupWindowAggregatePythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), userDefinedFunctionInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java index 8924e4e2a9251..f3a8c51f89424 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperatorTest.java @@ -254,6 +254,7 @@ private static class PassThroughPythonStreamGroupAggregateOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughStreamAggregatePythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), userDefinedFunctionInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java index b5e688fedb1fe..198f5b23effb4 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupTableAggregateOperatorTest.java @@ -268,6 +268,7 @@ private static class PassThroughPythonStreamGroupTableAggregateOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughStreamTableAggregatePythonFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), userDefinedFunctionInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java index 8ec9f5f750f2d..a1da0c17785c0 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java @@ -236,6 +236,7 @@ private static class PassThroughBatchArrowPythonGroupAggregateFunctionOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java index 571aee90e119c..3f8ad32a98f19 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -405,6 +405,7 @@ private static class PassThroughBatchArrowPythonGroupWindowAggregateFunctionOper @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java index 1c3830164992c..fc0b105598264 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java @@ -306,6 +306,7 @@ private static class PassThroughBatchArrowPythonOverWindowAggregateFunctionOpera @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java index b9da904e4b8fa..1e85d81e4443b 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -433,6 +433,7 @@ private static class PassThroughStreamArrowPythonGroupWindowAggregateFunctionOpe @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java index a3f6ac85f77e2..079848d3135a1 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java @@ -162,6 +162,7 @@ private static class PassThroughStreamArrowPythonProcTimeBoundedRangeOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java index 0fa82b65ee872..c468e1bd471ab 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRowsOperatorTest.java @@ -163,6 +163,7 @@ private static class PassThroughStreamArrowPythonProcTimeBoundedRowsOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java index ccf8bea34a631..52d2f77f77a46 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java @@ -281,6 +281,7 @@ private static class PassThroughStreamArrowPythonRowTimeBoundedRangeOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java index e644596162b69..84c421ee4ec95 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java @@ -247,6 +247,7 @@ private static class PassThroughStreamArrowPythonRowTimeBoundedRowsOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonAggregateFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java index 5e22a51d31cf4..2bb218b208b53 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java @@ -148,6 +148,7 @@ private static class PassThroughPythonScalarFunctionOperator @Override public PythonFunctionRunner createPythonFunctionRunner() throws IOException { return new PassThroughPythonScalarFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java index 0f926a342f97f..5aa7dd76e65aa 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java @@ -147,6 +147,7 @@ private static class PassThroughRowDataArrowPythonScalarFunctionOperator @Override public PythonFunctionRunner createPythonFunctionRunner() throws IOException { return new PassThroughPythonScalarFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java index a9b30714dd650..6f26bd82a3ff8 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java @@ -124,6 +124,7 @@ private static class PassThroughPythonTableFunctionOperator @Override public PythonFunctionRunner createPythonFunctionRunner() { return new PassThroughPythonTableFunctionRunner( + getContainingTask().getEnvironment(), getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), udfInputType, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java index 6eb8202a31ec5..af8c70a64e094 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java @@ -27,6 +27,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; @@ -68,6 +69,7 @@ public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonFun private transient ByteArrayOutputStreamWithPos baos; public PassThroughPythonAggregateFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -77,6 +79,7 @@ public PassThroughPythonAggregateFunctionRunner( FlinkMetricContainer flinkMetricContainer, boolean isBatchOverWindow) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java index e84ee4d1d656b..721fb9f2e4cb1 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java @@ -22,6 +22,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -44,6 +45,7 @@ public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonFuncti private final List buffer; public PassThroughPythonScalarFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -52,6 +54,7 @@ public PassThroughPythonScalarFunctionRunner( FlinkFnApi.UserDefinedFunctions userDefinedFunctions, FlinkMetricContainer flinkMetricContainer) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java index b94e765e41355..0c8f41a64231f 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java @@ -22,6 +22,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -46,6 +47,7 @@ public class PassThroughPythonTableFunctionRunner extends BeamTablePythonFunctio private final List buffer; public PassThroughPythonTableFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -54,6 +56,7 @@ public PassThroughPythonTableFunctionRunner( FlinkFnApi.UserDefinedFunctions userDefinedFunctions, FlinkMetricContainer flinkMetricContainer) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java index 7d970e2a5293f..6455cc52ef4c6 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -50,6 +51,7 @@ public class PassThroughStreamAggregatePythonFunctionRunner extends BeamTablePyt private final Function processFunction; public PassThroughStreamAggregatePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -61,6 +63,7 @@ public PassThroughStreamAggregatePythonFunctionRunner( TypeSerializer keySerializer, Function processFunction) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java index 48c56cd434d22..b41676141d7e3 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java @@ -22,6 +22,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; @@ -42,6 +43,7 @@ public class PassThroughStreamGroupWindowAggregatePythonFunctionRunner private final PassThroughPythonStreamGroupWindowAggregateOperator operator; public PassThroughStreamGroupWindowAggregatePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -53,6 +55,7 @@ public PassThroughStreamGroupWindowAggregatePythonFunctionRunner( TypeSerializer keySerializer, PassThroughPythonStreamGroupWindowAggregateOperator operator) { super( + environment, taskName, environmentManager, functionUrn, diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java index d962463d6803d..d6e2986cd3d58 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager; import org.apache.flink.python.metric.process.FlinkMetricContainer; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner; import org.apache.flink.table.types.logical.RowType; @@ -52,6 +53,7 @@ public class PassThroughStreamTableAggregatePythonFunctionRunner private final Function processFunction; public PassThroughStreamTableAggregatePythonFunctionRunner( + Environment environment, String taskName, ProcessPythonEnvironmentManager environmentManager, RowType inputType, @@ -63,6 +65,7 @@ public PassThroughStreamTableAggregatePythonFunctionRunner( TypeSerializer keySerializer, Function processFunction) { super( + environment, taskName, environmentManager, functionUrn,