Skip to content

Commit

Permalink
KH-512: Fix HA not enabled for streaming, add incremental checkpoints…
Browse files Browse the repository at this point in the history
… and minibatching
  • Loading branch information
enyachoke committed Jul 5, 2024
1 parent 01d7b2c commit 62c9c38
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/ozonehis/data/pipelines/BaseJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void initConfig() {
*/
public MiniCluster startCluster() throws Exception {
LOG.info("Starting mini cluster");
cluster = Environment.initMiniClusterWithEnv(false);
cluster = Environment.initMiniClusterWithEnv(stream);
cluster.start();
return cluster;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,18 @@ public static MiniCluster initMiniClusterWithEnv(Boolean isStreaming) throws Exc
flinkConfig.setString("execution.checkpointing.timeout", "10min");
flinkConfig.setString("execution.checkpointing.unaligned.enabled", "true");
flinkConfig.setString("execution.checkpointing.tolerable-failed-checkpoints", "50");
flinkConfig.setString("execution.checkpointing.incremental", "true");
flinkConfig.setString("table.exec.mini-batch.enabled", "true");
flinkConfig.setString("table.exec.mini-batch.allow-latency", "5 s");
flinkConfig.setString("table.exec.mini-batch.size", "5000");
flinkConfig.setString("table.dynamic-table-options.enabled", "true");
flinkConfig.setString(
"table.exec.resource.default-parallelism", System.getenv().getOrDefault("TASK_PARALLELISM", "1"));
flinkConfig.setString("state.backend.type", "rocksdb");
flinkConfig.setString("state.backend.incremental", "true");
flinkConfig.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoints/");
flinkConfig.setString("state.savepoints.dir", "file:///tmp/flink/savepoints/");
flinkConfig.setInteger("state.checkpoints.num-retained", 4);
flinkConfig.setInteger("state.checkpoints.num-retained", 2);
flinkConfig.setString("taskmanager.network.numberOfBuffers", "20");
flinkConfig.setString("io.tmp.dirs", "/tmp/temp");
if (isStreaming) {
Expand Down

0 comments on commit 62c9c38

Please sign in to comment.