diff --git a/operator/api/vectorized/v1alpha1/cluster_types.go b/operator/api/vectorized/v1alpha1/cluster_types.go index a072a3d43..437638ef4 100644 --- a/operator/api/vectorized/v1alpha1/cluster_types.go +++ b/operator/api/vectorized/v1alpha1/cluster_types.go @@ -228,6 +228,12 @@ type NodePoolSpec struct { // These are applied last and will override any other command line arguments that may be defined, // including the ones added when setting `DeveloperMode` to `true`. AdditionalCommandlineArguments map[string]string `json:"additionalCommandlineArguments,omitempty"` + + // HostIndexOffset is an additional offset on top of the host index - which + // is practically the pod ordinal. + // This makes it possible to have PrivateLink separate port ranges per NodePool. + // +optional + HostIndexOffset int `json:"hostIndexOffset"` } // RestartConfig contains strategies to configure how the cluster behaves when restarting, because of upgrades diff --git a/operator/api/vectorized/v1alpha1/cluster_webhook.go b/operator/api/vectorized/v1alpha1/cluster_webhook.go index 26ac478f7..fc8699e16 100644 --- a/operator/api/vectorized/v1alpha1/cluster_webhook.go +++ b/operator/api/vectorized/v1alpha1/cluster_webhook.go @@ -496,7 +496,7 @@ func (r *Cluster) validateKafkaListeners(l logr.Logger) field.ErrorList { func checkValidEndpointTemplate(tmpl string) error { // Using an example input to ensure that the template expression is allowed - data := utils.NewEndpointTemplateData(0, "1.2.3.4") + data := utils.NewEndpointTemplateData(0, "1.2.3.4", 0) _, err := utils.ComputeEndpoint(tmpl, data) return err } diff --git a/operator/cmd/configurator/configurator.go b/operator/cmd/configurator/configurator.go index 57c566b17..6a96fe9ad 100644 --- a/operator/cmd/configurator/configurator.go +++ b/operator/cmd/configurator/configurator.go @@ -50,6 +50,7 @@ const ( hostIPEnvVar = "HOST_IP_ADDRESS" hostNameEnvVar = "HOSTNAME" hostPortEnvVar = "HOST_PORT" + hostIndexOffsetEnvVar = "HOST_INDEX_OFFSET" nodeNameEnvVar = "NODE_NAME" proxyHostPortEnvVar = "PROXY_HOST_PORT" rackAwarenessEnvVar = "RACK_AWARENESS" @@ -79,6 +80,7 @@ type configuratorConfig struct { subdomain string svcFQDN string additionalListeners string + hostIndexOffset int } func (c *configuratorConfig) String() string { @@ -96,7 +98,8 @@ func (c *configuratorConfig) String() string { "proxyHostPort: %d\n"+ "rackAwareness: %t\n"+ "validateMountedVolume: %t\n"+ - "additionalListeners: %s\n", + "additionalListeners: %s\n"+ + "hostIndexOffset: %d\n", c.hostName, c.svcFQDN, c.configSourceDir, @@ -110,7 +113,8 @@ func (c *configuratorConfig) String() string { c.proxyHostPort, c.rackAwareness, c.validateMountedVolume, - c.additionalListeners) + c.additionalListeners, + c.hostIndexOffset) } var errorMissingEnvironmentVariable = errors.New("missing environment variable") @@ -199,7 +203,7 @@ func run(cmd *cobra.Command, args []string) { populateRack(cfg, zone, zoneID) } - if err = setAdditionalListeners(c.additionalListeners, c.hostIP, int(hostIndex), cfg); err != nil { + if err = setAdditionalListeners(c.additionalListeners, c.hostIP, int(hostIndex), cfg, c.hostIndexOffset); err != nil { log.Fatalf("%s", fmt.Errorf("unable to set additional listeners: %w", err)) } @@ -343,7 +347,7 @@ func registerAdvertisedKafkaAPI( } if c.subdomain != "" { - data := utils.NewEndpointTemplateData(int(index), c.hostIP) + data := utils.NewEndpointTemplateData(int(index), c.hostIP, c.hostIndexOffset) ep, err := utils.ComputeEndpoint(c.externalConnectivityKafkaEndpointTemplate, data) if err != nil { return err @@ -388,7 +392,7 @@ func registerAdvertisedPandaproxyAPI( // Pandaproxy uses the Kafka API subdomain. if c.subdomain != "" { - data := utils.NewEndpointTemplateData(int(index), c.hostIP) + data := utils.NewEndpointTemplateData(int(index), c.hostIP, c.hostIndexOffset) ep, err := utils.ComputeEndpoint(c.externalConnectivityPandaProxyEndpointTemplate, data) if err != nil { return err @@ -434,6 +438,7 @@ func checkEnvVars() (configuratorConfig, error) { var extCon string var rpcPort string var hostPort string + var hostIndexOffset string c := configuratorConfig{} @@ -489,6 +494,10 @@ func checkEnvVars() (configuratorConfig, error) { value: &c.hostIP, name: hostIPEnvVar, }, + { + value: &hostIndexOffset, + name: hostIndexOffsetEnvVar, + }, } for _, envVar := range envVarList { v, exist := os.LookupEnv(envVar.name) @@ -538,6 +547,13 @@ func checkEnvVars() (configuratorConfig, error) { result = errors.Join(result, fmt.Errorf("unable to convert rpc port from string to int: %w", err)) } + if hostIndexOffset != "" { + c.hostIndexOffset, err = strconv.Atoi(hostIndexOffset) + if err != nil { + result = errors.Join(result, fmt.Errorf("unable to convert HOST_INDEX_OFFSET env var from string to int: %w", err)) + } + } + c.hostPort, err = strconv.Atoi(hostPort) if err != nil && c.externalConnectivity { result = errors.Join(result, fmt.Errorf("unable to convert host port from string to int: %w", err)) @@ -573,7 +589,7 @@ func hostIndex(hostName string) (brokerID, error) { // setAdditionalListeners sets the additional listeners in the input Redpanda config. // sample additional listeners config string: // {"pandaproxy.advertised_pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{39282 | add .Index}}}]","pandaproxy.pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '0.0.0.0','port': 'port': {{39282 | add .Index}}}]","redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{30092 | add .Index}}}]","redpanda.kafka_api":"[{'name': 'private-link-kakfa', 'address': '0.0.0.0', 'port': {{30092 | add .Index}}}]"} -func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int, cfg *config.RedpandaYaml) error { +func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int, cfg *config.RedpandaYaml, hostIndexOffset int) error { if additionalListenersCfg == "" || additionalListenersCfg == "{}" { return nil } @@ -588,7 +604,7 @@ func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int nodeConfig := config.ProdDefault() for _, k := range additionalListenerCfgNames { if v, found := additionalListeners[k]; found { - res, err := utils.Compute(v, utils.NewEndpointTemplateData(hostIndex, hostIP), false) + res, err := utils.Compute(v, utils.NewEndpointTemplateData(hostIndex, hostIP, hostIndexOffset), false) if err != nil { return err } diff --git a/operator/cmd/configurator/configurator_test.go b/operator/cmd/configurator/configurator_test.go index 88c075ed9..1e3831d9b 100644 --- a/operator/cmd/configurator/configurator_test.go +++ b/operator/cmd/configurator/configurator_test.go @@ -348,7 +348,7 @@ func TestAdditionalListeners(t *testing.T) { //nolint } for i := 0; i < len(tests); i++ { tt := &tests[i] - err := setAdditionalListeners(tt.addtionalListenersCfg, tt.hostIP, tt.hostIndex, &tt.nodeCfg) + err := setAdditionalListeners(tt.addtionalListenersCfg, tt.hostIP, tt.hostIndex, &tt.nodeCfg, 0) if tt.expectedError { assert.Error(t, err) } else { diff --git a/operator/config/crd/bases/redpanda.vectorized.io_clusters.yaml b/operator/config/crd/bases/redpanda.vectorized.io_clusters.yaml index d81382ed0..1798f9309 100644 --- a/operator/config/crd/bases/redpanda.vectorized.io_clusters.yaml +++ b/operator/config/crd/bases/redpanda.vectorized.io_clusters.yaml @@ -1050,6 +1050,12 @@ spec: description: Storage class name - https://kubernetes.io/docs/concepts/storage/storage-classes/ type: string type: object + hostIndexOffset: + description: |- + HostIndexOffset is an additional offset on top of the host index - which + is practically the pod ordinal. + This makes it possible to have PrivateLink separate port ranges per NodePool. + type: integer name: description: Name of the NodePool. Must be unique, and must not be "default". diff --git a/operator/internal/controller/vectorized/cluster_controller.go b/operator/internal/controller/vectorized/cluster_controller.go index c902c9db9..2a4289cd2 100644 --- a/operator/internal/controller/vectorized/cluster_controller.go +++ b/operator/internal/controller/vectorized/cluster_controller.go @@ -712,7 +712,7 @@ func (r *ClusterReconciler) createExternalNodesList( } if externalKafkaListener != nil && externalKafkaListener.External.Subdomain != "" { - address, err := subdomainAddress(externalKafkaListener.External.EndpointTemplate, &pod, externalKafkaListener.External.Subdomain, getNodePort(&nodePortSvc, resources.ExternalListenerName)) + address, err := subdomainAddress(externalKafkaListener.External.EndpointTemplate, &pod, externalKafkaListener.External.Subdomain, getNodePort(&nodePortSvc, resources.ExternalListenerName), pandaCluster) if err != nil { return nil, err } @@ -726,7 +726,7 @@ func (r *ClusterReconciler) createExternalNodesList( } if externalAdminListener != nil && externalAdminListener.External.Subdomain != "" { - address, err := subdomainAddress(externalAdminListener.External.EndpointTemplate, &pod, externalAdminListener.External.Subdomain, getNodePort(&nodePortSvc, resources.AdminPortExternalName)) + address, err := subdomainAddress(externalAdminListener.External.EndpointTemplate, &pod, externalAdminListener.External.Subdomain, getNodePort(&nodePortSvc, resources.AdminPortExternalName), pandaCluster) if err != nil { return nil, err } @@ -740,7 +740,7 @@ func (r *ClusterReconciler) createExternalNodesList( } if externalProxyListener != nil && externalProxyListener.External.Subdomain != "" { - address, err := subdomainAddress(externalProxyListener.External.EndpointTemplate, &pod, externalProxyListener.External.Subdomain, getNodePort(&nodePortSvc, resources.PandaproxyPortExternalName)) + address, err := subdomainAddress(externalProxyListener.External.EndpointTemplate, &pod, externalProxyListener.External.Subdomain, getNodePort(&nodePortSvc, resources.PandaproxyPortExternalName), pandaCluster) if err != nil { return nil, err } @@ -1032,14 +1032,23 @@ func needExternalIP(external vectorizedv1alpha1.ExternalConnectivityConfig) bool } func subdomainAddress( - tmpl string, pod *corev1.Pod, subdomain string, port int32, + tmpl string, pod *corev1.Pod, subdomain string, port int32, pandaCluster *vectorizedv1alpha1.Cluster, ) (string, error) { prefixLen := len(pod.GenerateName) index, err := strconv.Atoi(pod.Name[prefixLen:]) if err != nil { return "", fmt.Errorf("could not parse node ID from pod name %s: %w", pod.Name, err) } - data := utils.NewEndpointTemplateData(index, pod.Status.HostIP) + var hostIndexOffset int + if val, ok := pod.GetAnnotations()[labels.NodePoolKey]; ok { + for _, np := range pandaCluster.GetNodePoolsFromSpec() { + if np.Name == val { + hostIndexOffset = np.HostIndexOffset + break + } + } + } + data := utils.NewEndpointTemplateData(index, pod.Status.HostIP, hostIndexOffset) ep, err := utils.ComputeEndpoint(tmpl, data) if err != nil { return "", err diff --git a/operator/pkg/resources/statefulset.go b/operator/pkg/resources/statefulset.go index 16e34d799..edb0822b3 100644 --- a/operator/pkg/resources/statefulset.go +++ b/operator/pkg/resources/statefulset.go @@ -497,6 +497,10 @@ func (r *StatefulSetResource) obj( Image: r.fullConfiguratorImage(), ImagePullPolicy: r.configuratorSettings.ImagePullPolicy, Env: append([]corev1.EnvVar{ + { + Name: "HOST_INDEX_OFFSET", + Value: strconv.Itoa(r.nodePool.NodePoolSpec.HostIndexOffset), + }, { Name: "SERVICE_FQDN", Value: r.serviceFQDN, @@ -1025,7 +1029,7 @@ func (r *StatefulSetResource) GetPortsForListenersInAdditionalConfig() []corev1. additionalNode0Config := config.ProdDefault() for _, k := range additionalListenerCfgNames { if v, found := r.pandaCluster.Spec.AdditionalConfiguration[k]; found { - res, err := utils.Compute(v, utils.NewEndpointTemplateData(0, "dummy"), false) + res, err := utils.Compute(v, utils.NewEndpointTemplateData(0, "dummy", 0), false) if err != nil { r.logger.Error(err, "failed to evaluate template", "template", v) continue diff --git a/operator/pkg/utils/template.go b/operator/pkg/utils/template.go index ef21496ca..5a1cf555f 100644 --- a/operator/pkg/utils/template.go +++ b/operator/pkg/utils/template.go @@ -30,13 +30,19 @@ type EndpointTemplateData struct { // value of hostIP that is also available in pod status (status.hostIP, // available also in Kubernetes downward API). HostIP string + + // HostIndexOffset is an offset based on the NodePool. + // This allows users to build endpoint templates, that have non-overlapping + // port ranges on different nodepools. + HostIndexOffset int } // NewEndpointTemplateData creates endpoint template data with all required fields. -func NewEndpointTemplateData(index int, hostIP string) EndpointTemplateData { +func NewEndpointTemplateData(index int, hostIP string, hostIndexOffset int) EndpointTemplateData { return EndpointTemplateData{ - Index: index, - HostIP: hostIP, + Index: index, + HostIP: hostIP, + HostIndexOffset: hostIndexOffset, } } diff --git a/operator/pkg/utils/template_test.go b/operator/pkg/utils/template_test.go index b8ebd8c01..f5df4feb9 100644 --- a/operator/pkg/utils/template_test.go +++ b/operator/pkg/utils/template_test.go @@ -18,7 +18,7 @@ import ( ) func TestTemplateGen(t *testing.T) { - data := utils.NewEndpointTemplateData(2, "1.1.1.1") + data := utils.NewEndpointTemplateData(2, "1.1.1.1", 100) tests := []struct { tmpl string endpointContainsPort bool @@ -67,6 +67,11 @@ func TestTemplateGen(t *testing.T) { expected: "'address': '2-f1412386.redpanda.com', 'port': 30092", endpointContainsPort: true, }, + { + tmpl: "'address': '{{.Index}}-{{.HostIP | sha256sum | substr 0 8}}.redpanda.com', 'port': {{30092 | add (.Index | sub .Index | add .HostIndexOffset )}}", + expected: "'address': '2-f1412386.redpanda.com', 'port': 30192", + endpointContainsPort: true, + }, } for _, tc := range tests {