From f65041b57875f2dcfeb3a39ab00498f0e8442641 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 13 Mar 2024 17:36:58 +0800 Subject: [PATCH] hotfix: reset bufferCount after buffers are cleared --- .../com/oceanbase/connector/flink/sink/OceanBaseWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java index c482d18d..73f863be 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java @@ -204,6 +204,7 @@ public synchronized void flush(boolean endOfInput) throws IOException, Interrupt reducedBuffer.clear(); } } + bufferCount = 0; // sync write current record Record record = currentRecord.get(); @@ -221,7 +222,6 @@ public synchronized void flush(boolean endOfInput) throws IOException, Interrupt } metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc(); currentRecord.compareAndSet(record, null); - bufferCount = 0; break; } catch (Exception e) { LOG.error("OceanBaseWriter flush error, retry times = {}", i, e);