From 814169efe0cea2313a2bc81f5704655ee41f9c7a Mon Sep 17 00:00:00 2001 From: Emmanuel Nyachoke Date: Mon, 24 Jun 2024 14:41:03 +0300 Subject: [PATCH] KH-560: Allow setting parallelism and number of task slots via envs (#29) --- .../java/com/ozonehis/data/pipelines/utils/Environment.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java b/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java index c997ca2..d6d1dce 100644 --- a/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java +++ b/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java @@ -35,7 +35,7 @@ public static MiniCluster initMiniClusterWithEnv(Boolean isStreaming) throws Exc Configuration flinkConfig = new Configuration(); String port = System.getProperty(Constants.PROP_FLINK_REST_PORT); if (StringUtils.isBlank(port)) { - port = Environment.getEnv("FLINK_REST_PORT", null); + port = Environment.getEnv("FLINK_REST_PORT", "8081"); } if (StringUtils.isNotBlank(port)) { @@ -79,7 +79,7 @@ public static MiniCluster initMiniClusterWithEnv(Boolean isStreaming) throws Exc flinkConfig.setString("execution.checkpointing.unaligned.enabled", "true"); flinkConfig.setString("execution.checkpointing.tolerable-failed-checkpoints", "50"); flinkConfig.setString("table.dynamic-table-options.enabled", "true"); - flinkConfig.setString("table.exec.resource.default-parallelism", "1"); + flinkConfig.setString("table.exec.resource.default-parallelism", System.getenv().getOrDefault("TASK_PARALLELISM", "1")); flinkConfig.setString("state.backend.type", "rocksdb"); flinkConfig.setString("state.backend.incremental", "true"); flinkConfig.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoints/"); @@ -101,7 +101,7 @@ public static MiniCluster initMiniClusterWithEnv(Boolean isStreaming) throws Exc .setString("port", "9250"); MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) - .setNumSlotsPerTaskManager(20) + .setNumSlotsPerTaskManager(Integer.parseInt(System.getenv().getOrDefault("TASK_MANAGER_SLOTS", "20"))) .setConfiguration(flinkConfig) .build(); MiniCluster cluster = new MiniCluster(clusterConfig);