diff --git a/src/main/java/com/ozonehis/data/pipelines/BaseJob.java b/src/main/java/com/ozonehis/data/pipelines/BaseJob.java index 4be650d..7545a21 100644 --- a/src/main/java/com/ozonehis/data/pipelines/BaseJob.java +++ b/src/main/java/com/ozonehis/data/pipelines/BaseJob.java @@ -56,7 +56,7 @@ public void initConfig() { */ public MiniCluster startCluster() throws Exception { LOG.info("Starting mini cluster"); - cluster = Environment.initMiniClusterWithEnv(false); + cluster = Environment.initMiniClusterWithEnv(stream); cluster.start(); return cluster; } diff --git a/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java b/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java index 69f3165..5aeceec 100644 --- a/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java +++ b/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java @@ -78,6 +78,10 @@ public static MiniCluster initMiniClusterWithEnv(Boolean isStreaming) throws Exc flinkConfig.setString("execution.checkpointing.timeout", "10min"); flinkConfig.setString("execution.checkpointing.unaligned.enabled", "true"); flinkConfig.setString("execution.checkpointing.tolerable-failed-checkpoints", "50"); + flinkConfig.setString("execution.checkpointing.incremental", "true"); + flinkConfig.setString("table.exec.mini-batch.enabled", "true"); + flinkConfig.setString("table.exec.mini-batch.allow-latency", "5 s"); + flinkConfig.setString("table.exec.mini-batch.size", "5000"); flinkConfig.setString("table.dynamic-table-options.enabled", "true"); flinkConfig.setString( "table.exec.resource.default-parallelism", System.getenv().getOrDefault("TASK_PARALLELISM", "1")); @@ -85,7 +89,7 @@ public static MiniCluster initMiniClusterWithEnv(Boolean isStreaming) throws Exc flinkConfig.setString("state.backend.incremental", "true"); flinkConfig.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoints/"); flinkConfig.setString("state.savepoints.dir", "file:///tmp/flink/savepoints/"); - flinkConfig.setInteger("state.checkpoints.num-retained", 4); + flinkConfig.setInteger("state.checkpoints.num-retained", 2); flinkConfig.setString("taskmanager.network.numberOfBuffers", "20"); flinkConfig.setString("io.tmp.dirs", "/tmp/temp"); if (isStreaming) {