From 810a7f4be2fa0d0c9bc0bcc266c3d702064c69f4 Mon Sep 17 00:00:00 2001 From: Anand Swaminathan Date: Mon, 29 Apr 2024 13:17:41 -0700 Subject: [PATCH] Add support for SchedulerName for Flink pods --- pkg/controller/config/config.go | 1 + pkg/controller/config/config_flags.go | 1 + pkg/controller/flink/job_manager_controller.go | 1 + pkg/controller/flink/task_manager_controller.go | 1 + 4 files changed, 4 insertions(+) diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 88948939..b3fd3832 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -25,6 +25,7 @@ type Config struct { MaxBackoffDuration config.Duration `json:"maxBackoffDuration" pflag:"\"30s\",Determines the max backoff for exponential retries."` MaxErrDuration config.Duration `json:"maxErrDuration" pflag:"\"5m\",Determines the max time to wait on errors."` FlinkJobVertexTimeout config.Duration `json:"flinkJobVertexTimeout" pflag:"\"3m\",Determines the max time to wait on job vertex state turns into RUNNING."` + SchedulerName string `json:"schedulerName"` } func GetConfig() *Config { diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go index 7e631bcb..cc389078 100755 --- a/pkg/controller/config/config_flags.go +++ b/pkg/controller/config/config_flags.go @@ -54,5 +54,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "maxBackoffDuration"), "30s", "Determines the max backoff for exponential retries.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "maxErrDuration"), "5m", "Determines the max time to wait on errors.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "flinkJobVertexTimeout"), "3m", "Determines the max time to wait on job vertex state turns into RUNNING.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "schedulerName"), *new(string), "") return cmdFlags } diff --git a/pkg/controller/flink/job_manager_controller.go b/pkg/controller/flink/job_manager_controller.go index 3fd8a521..d5d78ccb 100644 --- a/pkg/controller/flink/job_manager_controller.go +++ b/pkg/controller/flink/job_manager_controller.go @@ -371,6 +371,7 @@ func jobmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment { NodeSelector: app.Spec.JobManagerConfig.NodeSelector, Tolerations: app.Spec.JobManagerConfig.Tolerations, Affinity: app.Spec.JobManagerConfig.Affinity, + SchedulerName: config.GetConfig().SchedulerName, }, }, }, diff --git a/pkg/controller/flink/task_manager_controller.go b/pkg/controller/flink/task_manager_controller.go index 1cff4507..ce7dd229 100644 --- a/pkg/controller/flink/task_manager_controller.go +++ b/pkg/controller/flink/task_manager_controller.go @@ -216,6 +216,7 @@ func taskmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment { NodeSelector: app.Spec.TaskManagerConfig.NodeSelector, Tolerations: app.Spec.TaskManagerConfig.Tolerations, Affinity: app.Spec.TaskManagerConfig.Affinity, + SchedulerName: config.GetConfig().SchedulerName, }, }, },