Skip to content

Commit

Permalink
operator v1: add support for host-index-offset for advertised listeners
Browse files Browse the repository at this point in the history
This is required for the combination of NodePools and Private Link.
PrivateLink has a different advertised port per broker, based on Pod ordinal.
Since we can have multiple STS, we need an additional offset that makes sure
basePort + ordinal does not overlap. Therefore, nodePool Spec can provide an offset.
It defaults to 0, so nothing changes for existing clusters. In Gen2 NodePools,
cloud controlplane provides the offset for each nodepool, e.g. 100.
  • Loading branch information
birdayz committed Nov 15, 2024
1 parent 385cee7 commit 85317aa
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 19 deletions.
6 changes: 6 additions & 0 deletions operator/api/vectorized/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ type NodePoolSpec struct {
// These are applied last and will override any other command line arguments that may be defined,
// including the ones added when setting `DeveloperMode` to `true`.
AdditionalCommandlineArguments map[string]string `json:"additionalCommandlineArguments,omitempty"`

// HostIndexOffset is an additional offset on top of the host index - which
// is practically the pod ordinal.
// This makes it possible to have PrivateLink separate port ranges per NodePool.
// +optional
HostIndexOffset int `json:"hostIndexOffset"`
}

// RestartConfig contains strategies to configure how the cluster behaves when restarting, because of upgrades
Expand Down
2 changes: 1 addition & 1 deletion operator/api/vectorized/v1alpha1/cluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (r *Cluster) validateKafkaListeners(l logr.Logger) field.ErrorList {

func checkValidEndpointTemplate(tmpl string) error {
// Using an example input to ensure that the template expression is allowed
data := utils.NewEndpointTemplateData(0, "1.2.3.4")
data := utils.NewEndpointTemplateData(0, "1.2.3.4", 0)
_, err := utils.ComputeEndpoint(tmpl, data)
return err
}
Expand Down
30 changes: 23 additions & 7 deletions operator/cmd/configurator/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
hostIPEnvVar = "HOST_IP_ADDRESS"
hostNameEnvVar = "HOSTNAME"
hostPortEnvVar = "HOST_PORT"
hostIndexOffsetEnvVar = "HOST_INDEX_OFFSET"
nodeNameEnvVar = "NODE_NAME"
proxyHostPortEnvVar = "PROXY_HOST_PORT"
rackAwarenessEnvVar = "RACK_AWARENESS"
Expand Down Expand Up @@ -79,6 +80,7 @@ type configuratorConfig struct {
subdomain string
svcFQDN string
additionalListeners string
hostIndexOffset int
}

func (c *configuratorConfig) String() string {
Expand All @@ -96,7 +98,8 @@ func (c *configuratorConfig) String() string {
"proxyHostPort: %d\n"+
"rackAwareness: %t\n"+
"validateMountedVolume: %t\n"+
"additionalListeners: %s\n",
"additionalListeners: %s\n"+
"hostIndexOffset: %d\n",
c.hostName,
c.svcFQDN,
c.configSourceDir,
Expand All @@ -110,7 +113,8 @@ func (c *configuratorConfig) String() string {
c.proxyHostPort,
c.rackAwareness,
c.validateMountedVolume,
c.additionalListeners)
c.additionalListeners,
c.hostIndexOffset)
}

var errorMissingEnvironmentVariable = errors.New("missing environment variable")
Expand Down Expand Up @@ -199,7 +203,7 @@ func run(cmd *cobra.Command, args []string) {
populateRack(cfg, zone, zoneID)
}

if err = setAdditionalListeners(c.additionalListeners, c.hostIP, int(hostIndex), cfg); err != nil {
if err = setAdditionalListeners(c.additionalListeners, c.hostIP, int(hostIndex), cfg, c.hostIndexOffset); err != nil {
log.Fatalf("%s", fmt.Errorf("unable to set additional listeners: %w", err))
}

Expand Down Expand Up @@ -343,7 +347,7 @@ func registerAdvertisedKafkaAPI(
}

if c.subdomain != "" {
data := utils.NewEndpointTemplateData(int(index), c.hostIP)
data := utils.NewEndpointTemplateData(int(index), c.hostIP, c.hostIndexOffset)
ep, err := utils.ComputeEndpoint(c.externalConnectivityKafkaEndpointTemplate, data)
if err != nil {
return err
Expand Down Expand Up @@ -388,7 +392,7 @@ func registerAdvertisedPandaproxyAPI(

// Pandaproxy uses the Kafka API subdomain.
if c.subdomain != "" {
data := utils.NewEndpointTemplateData(int(index), c.hostIP)
data := utils.NewEndpointTemplateData(int(index), c.hostIP, c.hostIndexOffset)
ep, err := utils.ComputeEndpoint(c.externalConnectivityPandaProxyEndpointTemplate, data)
if err != nil {
return err
Expand Down Expand Up @@ -434,6 +438,7 @@ func checkEnvVars() (configuratorConfig, error) {
var extCon string
var rpcPort string
var hostPort string
var hostIndexOffset string

c := configuratorConfig{}

Expand Down Expand Up @@ -489,6 +494,10 @@ func checkEnvVars() (configuratorConfig, error) {
value: &c.hostIP,
name: hostIPEnvVar,
},
{
value: &hostIndexOffset,
name: hostIndexOffsetEnvVar,
},
}
for _, envVar := range envVarList {
v, exist := os.LookupEnv(envVar.name)
Expand Down Expand Up @@ -538,6 +547,13 @@ func checkEnvVars() (configuratorConfig, error) {
result = errors.Join(result, fmt.Errorf("unable to convert rpc port from string to int: %w", err))
}

if hostIndexOffset != "" {
c.hostIndexOffset, err = strconv.Atoi(hostIndexOffset)
if err != nil {
result = errors.Join(result, fmt.Errorf("unable to convert HOST_INDEX_OFFSET env var from string to int: %w", err))
}
}

c.hostPort, err = strconv.Atoi(hostPort)
if err != nil && c.externalConnectivity {
result = errors.Join(result, fmt.Errorf("unable to convert host port from string to int: %w", err))
Expand Down Expand Up @@ -573,7 +589,7 @@ func hostIndex(hostName string) (brokerID, error) {
// setAdditionalListeners sets the additional listeners in the input Redpanda config.
// sample additional listeners config string:
// {"pandaproxy.advertised_pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{39282 | add .Index}}}]","pandaproxy.pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '0.0.0.0','port': 'port': {{39282 | add .Index}}}]","redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{30092 | add .Index}}}]","redpanda.kafka_api":"[{'name': 'private-link-kakfa', 'address': '0.0.0.0', 'port': {{30092 | add .Index}}}]"}
func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int, cfg *config.RedpandaYaml) error {
func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int, cfg *config.RedpandaYaml, hostIndexOffset int) error {
if additionalListenersCfg == "" || additionalListenersCfg == "{}" {
return nil
}
Expand All @@ -588,7 +604,7 @@ func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int
nodeConfig := config.ProdDefault()
for _, k := range additionalListenerCfgNames {
if v, found := additionalListeners[k]; found {
res, err := utils.Compute(v, utils.NewEndpointTemplateData(hostIndex, hostIP), false)
res, err := utils.Compute(v, utils.NewEndpointTemplateData(hostIndex, hostIP, hostIndexOffset), false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion operator/cmd/configurator/configurator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func TestAdditionalListeners(t *testing.T) { //nolint
}
for i := 0; i < len(tests); i++ {
tt := &tests[i]
err := setAdditionalListeners(tt.addtionalListenersCfg, tt.hostIP, tt.hostIndex, &tt.nodeCfg)
err := setAdditionalListeners(tt.addtionalListenersCfg, tt.hostIP, tt.hostIndex, &tt.nodeCfg, 0)
if tt.expectedError {
assert.Error(t, err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,12 @@ spec:
description: Storage class name - https://kubernetes.io/docs/concepts/storage/storage-classes/
type: string
type: object
hostIndexOffset:
description: |-
HostIndexOffset is an additional offset on top of the host index - which
is practically the pod ordinal.
This makes it possible to have PrivateLink separate port ranges per NodePool.
type: integer
name:
description: Name of the NodePool. Must be unique, and must
not be "default".
Expand Down
19 changes: 14 additions & 5 deletions operator/internal/controller/vectorized/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func (r *ClusterReconciler) createExternalNodesList(
}

if externalKafkaListener != nil && externalKafkaListener.External.Subdomain != "" {
address, err := subdomainAddress(externalKafkaListener.External.EndpointTemplate, &pod, externalKafkaListener.External.Subdomain, getNodePort(&nodePortSvc, resources.ExternalListenerName))
address, err := subdomainAddress(externalKafkaListener.External.EndpointTemplate, &pod, externalKafkaListener.External.Subdomain, getNodePort(&nodePortSvc, resources.ExternalListenerName), pandaCluster)
if err != nil {
return nil, err
}
Expand All @@ -726,7 +726,7 @@ func (r *ClusterReconciler) createExternalNodesList(
}

if externalAdminListener != nil && externalAdminListener.External.Subdomain != "" {
address, err := subdomainAddress(externalAdminListener.External.EndpointTemplate, &pod, externalAdminListener.External.Subdomain, getNodePort(&nodePortSvc, resources.AdminPortExternalName))
address, err := subdomainAddress(externalAdminListener.External.EndpointTemplate, &pod, externalAdminListener.External.Subdomain, getNodePort(&nodePortSvc, resources.AdminPortExternalName), pandaCluster)
if err != nil {
return nil, err
}
Expand All @@ -740,7 +740,7 @@ func (r *ClusterReconciler) createExternalNodesList(
}

if externalProxyListener != nil && externalProxyListener.External.Subdomain != "" {
address, err := subdomainAddress(externalProxyListener.External.EndpointTemplate, &pod, externalProxyListener.External.Subdomain, getNodePort(&nodePortSvc, resources.PandaproxyPortExternalName))
address, err := subdomainAddress(externalProxyListener.External.EndpointTemplate, &pod, externalProxyListener.External.Subdomain, getNodePort(&nodePortSvc, resources.PandaproxyPortExternalName), pandaCluster)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1032,14 +1032,23 @@ func needExternalIP(external vectorizedv1alpha1.ExternalConnectivityConfig) bool
}

func subdomainAddress(
tmpl string, pod *corev1.Pod, subdomain string, port int32,
tmpl string, pod *corev1.Pod, subdomain string, port int32, pandaCluster *vectorizedv1alpha1.Cluster,
) (string, error) {
prefixLen := len(pod.GenerateName)
index, err := strconv.Atoi(pod.Name[prefixLen:])
if err != nil {
return "", fmt.Errorf("could not parse node ID from pod name %s: %w", pod.Name, err)
}
data := utils.NewEndpointTemplateData(index, pod.Status.HostIP)
var hostIndexOffset int
if val, ok := pod.GetAnnotations()[labels.NodePoolKey]; ok {
for _, np := range pandaCluster.GetNodePoolsFromSpec() {
if np.Name == val {
hostIndexOffset = np.HostIndexOffset
break
}
}
}
data := utils.NewEndpointTemplateData(index, pod.Status.HostIP, hostIndexOffset)
ep, err := utils.ComputeEndpoint(tmpl, data)
if err != nil {
return "", err
Expand Down
6 changes: 5 additions & 1 deletion operator/pkg/resources/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ func (r *StatefulSetResource) obj(
Image: r.fullConfiguratorImage(),
ImagePullPolicy: r.configuratorSettings.ImagePullPolicy,
Env: append([]corev1.EnvVar{
{
Name: "HOST_INDEX_OFFSET",
Value: strconv.Itoa(r.nodePool.NodePoolSpec.HostIndexOffset),
},
{
Name: "SERVICE_FQDN",
Value: r.serviceFQDN,
Expand Down Expand Up @@ -1025,7 +1029,7 @@ func (r *StatefulSetResource) GetPortsForListenersInAdditionalConfig() []corev1.
additionalNode0Config := config.ProdDefault()
for _, k := range additionalListenerCfgNames {
if v, found := r.pandaCluster.Spec.AdditionalConfiguration[k]; found {
res, err := utils.Compute(v, utils.NewEndpointTemplateData(0, "dummy"), false)
res, err := utils.Compute(v, utils.NewEndpointTemplateData(0, "dummy", 0), false)
if err != nil {
r.logger.Error(err, "failed to evaluate template", "template", v)
continue
Expand Down
12 changes: 9 additions & 3 deletions operator/pkg/utils/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ type EndpointTemplateData struct {
// value of hostIP that is also available in pod status (status.hostIP,
// available also in Kubernetes downward API).
HostIP string

// HostIndexOffset is an offset based on the NodePool.
// This allows users to build endpoint templates, that have non-overlapping
// port ranges on different nodepools.
HostIndexOffset int
}

// NewEndpointTemplateData creates endpoint template data with all required fields.
func NewEndpointTemplateData(index int, hostIP string) EndpointTemplateData {
func NewEndpointTemplateData(index int, hostIP string, hostIndexOffset int) EndpointTemplateData {
return EndpointTemplateData{
Index: index,
HostIP: hostIP,
Index: index,
HostIP: hostIP,
HostIndexOffset: hostIndexOffset,
}
}

Expand Down
7 changes: 6 additions & 1 deletion operator/pkg/utils/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestTemplateGen(t *testing.T) {
data := utils.NewEndpointTemplateData(2, "1.1.1.1")
data := utils.NewEndpointTemplateData(2, "1.1.1.1", 100)
tests := []struct {
tmpl string
endpointContainsPort bool
Expand Down Expand Up @@ -67,6 +67,11 @@ func TestTemplateGen(t *testing.T) {
expected: "'address': '2-f1412386.redpanda.com', 'port': 30092",
endpointContainsPort: true,
},
{
tmpl: "'address': '{{.Index}}-{{.HostIP | sha256sum | substr 0 8}}.redpanda.com', 'port': {{30092 | add (.Index | sub .Index | add .HostIndexOffset )}}",
expected: "'address': '2-f1412386.redpanda.com', 'port': 30192",
endpointContainsPort: true,
},
}

for _, tc := range tests {
Expand Down

0 comments on commit 85317aa

Please sign in to comment.