From 488571ab1658db40d93b3b9b83095629c26adaba Mon Sep 17 00:00:00 2001 From: Gabriel Saratura Date: Wed, 8 Jan 2025 14:29:11 +0100 Subject: [PATCH] Add webhook logic --- pkg/controller/webhooks/postgresql.go | 287 +++++++++++--------------- 1 file changed, 120 insertions(+), 167 deletions(-) diff --git a/pkg/controller/webhooks/postgresql.go b/pkg/controller/webhooks/postgresql.go index 21c50f4f92..a2eb988499 100644 --- a/pkg/controller/webhooks/postgresql.go +++ b/pkg/controller/webhooks/postgresql.go @@ -4,17 +4,18 @@ import ( "context" "encoding/json" "fmt" - - vshnv1 "github.com/vshn/appcat/v4/apis/vshn/v1" "github.com/vshn/appcat/v4/pkg/common/quotas" "github.com/vshn/appcat/v4/pkg/common/utils" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + "sigs.k8s.io/controller-runtime/pkg/webhook" + "strconv" + + vshnv1 "github.com/vshn/appcat/v4/apis/vshn/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation/field" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) @@ -28,39 +29,40 @@ import ( //+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;patch;update;delete //+kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;patch;update;delete +const ( + maxResourceNameLength = 30 +) + var ( pgGK = schema.GroupKind{Group: "vshn.appcat.vshn.io", Kind: "VSHNPostgreSQL"} pgGR = schema.GroupResource{Group: pgGK.Group, Resource: "vshnpostgresqls"} -) -var _ webhook.CustomValidator = &PostgreSQLWebhookHandler{} - -var blocklist = map[string]string{ - "listen_addresses": "", - "port": "", - "cluster_name": "", - "hot_standby": "", - "fsync": "", - "full_page_writes": "", - "log_destination": "", - "logging_collector": "", - "max_replication_slots": "", - "max_wal_senders": "", - "wal_keep_segments": "", - "wal_level": "", - "wal_log_hints": "", - "archive_mode": "", - "archive_command": "", -} + _ webhook.CustomValidator = &PostgreSQLWebhookHandler{} + + blocklist = map[string]string{ + "listen_addresses": "", + "port": "", + "cluster_name": "", + "hot_standby": "", + "fsync": "", + "full_page_writes": "", + "log_destination": "", + "logging_collector": "", + "max_replication_slots": "", + "max_wal_senders": "", + "wal_keep_segments": "", + "wal_level": "", + "wal_log_hints": "", + "archive_mode": "", + "archive_command": "", + } +) -// PostgreSQLWebhookHandler handles all quota webhooks concerning postgresql by vshn. type PostgreSQLWebhookHandler struct { DefaultWebhookHandler } -// SetupPostgreSQLWebhookHandlerWithManager registers the validation webhook with the manager. func SetupPostgreSQLWebhookHandlerWithManager(mgr ctrl.Manager, withQuota bool) error { - return ctrl.NewWebhookManagedBy(mgr). For(&vshnv1.VSHNPostgreSQL{}). WithValidator(&PostgreSQLWebhookHandler{ @@ -77,145 +79,73 @@ func SetupPostgreSQLWebhookHandlerWithManager(mgr ctrl.Manager, withQuota bool) Complete() } -// ValidateCreate implements webhook.CustomValidator so a webhook will be registered for the type func (p *PostgreSQLWebhookHandler) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { - allErrs := field.ErrorList{} - pg, ok := obj.(*vshnv1.VSHNPostgreSQL) - if !ok { - return nil, fmt.Errorf("provided manifest is not a valid VSHNPostgreSQL object") - } - - err := validateVacuumRepack(pg.Spec.Parameters.Service.VacuumEnabled, pg.Spec.Parameters.Service.RepackEnabled) - if err != nil { - allErrs = append(allErrs, &field.Error{ - Field: "spec.parameters.service", - Detail: fmt.Sprintf("pg.Spec.Parameters.Service.VacuumEnabled and pg.Spec.Parameters.Service.RepackEnabled settings can't be both disabled: %s", err.Error()), - Type: field.ErrorTypeForbidden, - }) - } - - if p.withQuota { - quotaErrs, fieldErrs := p.checkPostgreSQLQuotas(ctx, pg, true) - if quotaErrs != nil { - allErrs = append(allErrs, &field.Error{ - Field: "quota", - Detail: fmt.Sprintf("quota check failed: %s", - quotaErrs.Error()), - BadValue: "*your namespace quota*", - Type: field.ErrorTypeForbidden, - }) - } - allErrs = append(allErrs, fieldErrs...) - } - - instancesError := p.checkGuaranteedAvailability(pg) - - allErrs = append(allErrs, instancesError...) - - // longest postfix is 26 chars for the sgbackup object (eg. "-952zx-2024-07-25-12-50-10"). Max SgBackup length is 56, therefore 30 characters is the maximum length - err = p.validateResourceNameLength(pg.GetName(), 30) - if err != nil { - allErrs = append(allErrs, &field.Error{ - Field: ".metadata.name", - Detail: fmt.Sprintf("Please shorten PostgreSQL name to 30 characters or less: %s", - err.Error()), - BadValue: pg.GetName(), - Type: field.ErrorTypeTooLong, - }) - } - - errList := validatePgConf(pg) - if errList != nil { - allErrs = append(allErrs, errList...) - } - - if len(allErrs) != 0 { - return nil, apierrors.NewInvalid( - pgGK, - pg.GetName(), - allErrs, - ) - } - - return nil, nil + return p.validatePostgreSQL(ctx, obj, nil, true) } -// ValidateUpdate implements webhook.CustomValidator so a webhook will be registered for the type func (p *PostgreSQLWebhookHandler) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + return p.validatePostgreSQL(ctx, newObj, oldObj, false) +} +func (p *PostgreSQLWebhookHandler) validatePostgreSQL(ctx context.Context, newObj, oldObj runtime.Object, isCreate bool) (admission.Warnings, error) { allErrs := field.ErrorList{} - pg, ok := newObj.(*vshnv1.VSHNPostgreSQL) + newPg, ok := newObj.(*vshnv1.VSHNPostgreSQL) + oldPg, ok := oldObj.(*vshnv1.VSHNPostgreSQL) if !ok { return nil, fmt.Errorf("provided manifest is not a valid VSHNPostgreSQL object") } - if pg.DeletionTimestamp != nil { - return nil, nil + if !isCreate { + if newPg.DeletionTimestamp != nil { + return nil, nil + } + + // Validate major upgrades + allErrs = append(allErrs, validateMajorVersionUpgrade(newPg, oldPg)) } - err := validateVacuumRepack(pg.Spec.Parameters.Service.VacuumEnabled, pg.Spec.Parameters.Service.RepackEnabled) - if err != nil { - allErrs = append(allErrs, &field.Error{ - Field: "spec.parameters.service", - Detail: fmt.Sprintf("pg.Spec.Parameters.Service.VacuumEnabled and pg.Spec.Parameters.Service.RepackEnabled settings can't be both disabled: %s", err.Error()), - Type: field.ErrorTypeForbidden, - }) + // Validate Vacuum and Repack settings + if err := validateVacuumRepack(newPg.Spec.Parameters.Service.VacuumEnabled, newPg.Spec.Parameters.Service.RepackEnabled); err != nil { + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec.parameters.service"), + fmt.Sprintf("pg.Spec.Parameters.Service.VacuumEnabled and pg.Spec.Parameters.Service.RepackEnabled settings can't be both disabled: %s", err.Error()), + )) } + // Validate quotas if enabled if p.withQuota { - quotaErrs, fieldErrs := p.checkPostgreSQLQuotas(ctx, pg, false) + quotaErrs, fieldErrs := p.checkPostgreSQLQuotas(ctx, newPg, isCreate) if quotaErrs != nil { - allErrs = append(allErrs, &field.Error{ - Field: "quota", - Detail: fmt.Sprintf("quota check failed: %s", - quotaErrs.Error()), - BadValue: "*your namespace quota*", - Type: field.ErrorTypeForbidden, - }) + allErrs = append(allErrs, field.Forbidden(field.NewPath("quota"), fmt.Sprintf("quota check failed: %s", quotaErrs.Error()))) } allErrs = append(allErrs, fieldErrs...) } - instancesError := p.checkGuaranteedAvailability(pg) - allErrs = append(allErrs, instancesError...) + // Validate guaranteed availability + allErrs = append(allErrs, p.checkGuaranteedAvailability(newPg)...) - // longest postfix is 26 chars for the sgbackup object (eg. "-952zx-2024-07-25-12-50-10"). Max SgBackup length is 56, therefore 30 characters is the maximum length - err = p.validateResourceNameLength(pg.GetName(), 30) - if err != nil { - allErrs = append(allErrs, &field.Error{ - Field: ".metadata.name", - Detail: fmt.Sprintf("Please shorten PostgreSQL name, currently it is: %s", - err.Error()), - BadValue: pg.GetName(), - Type: field.ErrorTypeTooLong, - }) + // Validate name length + if err := p.validateResourceNameLength(newPg.GetName(), maxResourceNameLength); err != nil { + allErrs = append(allErrs, field.TooLong(field.NewPath(".metadata.name"), newPg.GetName(), maxResourceNameLength)) } - errList := validatePgConf(pg) - if errList != nil { - allErrs = append(allErrs, errList...) - } + // Validate PostgreSQL configuration + allErrs = append(allErrs, validatePgConf(newPg)...) - // We aggregate and return all errors at the same time. - // So the user is aware of all broken parameters. - // But at the same time, if any of these fail we cannot do proper quota checks anymore. - if len(allErrs) != 0 { - return nil, apierrors.NewInvalid( - pgGK, - pg.GetName(), - allErrs, - ) + if len(allErrs) > 0 { + return nil, apierrors.NewInvalid(pgGK, newPg.GetName(), allErrs) } return nil, nil } -// checkPostgreSQLQuotas will read the plan if it's set and then check if any other size parameters are overwriten +// checkPostgreSQLQuotas will read the plan if it's set and then check if any other size parameters are overwritten func (p *PostgreSQLWebhookHandler) checkPostgreSQLQuotas(ctx context.Context, pg *vshnv1.VSHNPostgreSQL, checkNamespaceQuota bool) (quotaErrs *apierrors.StatusError, fieldErrs field.ErrorList) { var fieldErr *field.Error instances := int64(pg.Spec.Parameters.Instances) resources := utils.Resources{} + // Fetch plans if specified if pg.Spec.Parameters.Size.Plan != "" { var err error resources, err = utils.FetchPlansFromCluster(ctx, p.client, "vshnpostgresqlplans", pg.Spec.Parameters.Size.Plan) @@ -223,27 +153,31 @@ func (p *PostgreSQLWebhookHandler) checkPostgreSQLQuotas(ctx context.Context, pg return apierrors.NewInternalError(err), fieldErrs } } - s, err := utils.FetchSidecarsFromCluster(ctx, p.client, "vshnpostgresqlplans") + + // Fetch sidecars from the cluster + sidecars, err := utils.FetchSidecarsFromCluster(ctx, p.client, "vshnpostgresqlplans") if err != nil { return apierrors.NewInternalError(err), fieldErrs } - resourcesSidecars, err := utils.GetAllSideCarsResources(s) + // Aggregate resources from sidecars + resourcesSidecars, err := utils.GetAllSideCarsResources(sidecars) if err != nil { return apierrors.NewInternalError(err), fieldErrs } p.addPathsToResources(&resources, false) + // Parse and validate resource requests and limits if pg.Spec.Parameters.Size.CPU != "" { - resources.CPULimits, fieldErr = parseResource(resources.CPULimitsPath, pg.Spec.Parameters.Size.CPU, "not a valid cpu size") + resources.CPULimits, fieldErr = parseResource(resources.CPULimitsPath, pg.Spec.Parameters.Size.CPU, "not a valid CPU size") if fieldErr != nil { fieldErrs = append(fieldErrs, fieldErr) } } if pg.Spec.Parameters.Size.Requests.CPU != "" { - resources.CPURequests, fieldErr = parseResource(resources.CPURequestsPath, pg.Spec.Parameters.Size.Requests.CPU, "not a valid cpu size") + resources.CPURequests, fieldErr = parseResource(resources.CPURequestsPath, pg.Spec.Parameters.Size.Requests.CPU, "not a valid CPU size") if fieldErr != nil { fieldErrs = append(fieldErrs, fieldErr) } @@ -264,15 +198,17 @@ func (p *PostgreSQLWebhookHandler) checkPostgreSQLQuotas(ctx context.Context, pg } if pg.Spec.Parameters.Size.Disk != "" { - resources.Disk, fieldErr = parseResource(resources.DiskPath, pg.Spec.Parameters.Size.Disk, "not a valid cpu size") + resources.Disk, fieldErr = parseResource(resources.DiskPath, pg.Spec.Parameters.Size.Disk, "not a valid disk size") if fieldErr != nil { fieldErrs = append(fieldErrs, fieldErr) } } + // Add aggregated sidecar resources resources.AddResources(resourcesSidecars) resources.MultiplyBy(instances) + // Perform quota checks checker := quotas.NewQuotaChecker( p.client, pg.GetName(), @@ -296,20 +232,18 @@ func parseResource(childPath *field.Path, value, errMessage string) (resource.Qu return quantity, nil } -func (p *PostgreSQLWebhookHandler) checkGuaranteedAvailability(pg *vshnv1.VSHNPostgreSQL) (fieldErrs field.ErrorList) { - // service level and instances are verified in the CRD validation, therefore I skip checking them +func (p *PostgreSQLWebhookHandler) checkGuaranteedAvailability(pg *vshnv1.VSHNPostgreSQL) field.ErrorList { + allErrs := field.ErrorList{} if pg.Spec.Parameters.Service.ServiceLevel == "guaranteed" && pg.Spec.Parameters.Instances < 2 { - fieldErrs = append(fieldErrs, &field.Error{ - Field: "spec.parameters.instances", - Detail: "PostgreSQL instances with service level Guaranteed Availability must have at least 2 replicas. Please set .spec.parameters.instances: [2,3]. Additional costs will apply, please refer to: https://products.vshn.ch/appcat/pricing.html", - Type: field.ErrorTypeInvalid, - BadValue: pg.Spec.Parameters.Instances, - }) + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec.parameters.instances"), + pg.Spec.Parameters.Instances, + "PostgreSQL instances with service level Guaranteed Availability must have at least 2 replicas. Please set .spec.parameters.instances: [2,3]. Additional costs will apply, please refer to: https://products.vshn.ch/appcat/pricing.html", + )) } - return fieldErrs + return allErrs } -// validate vacuum and repack settings func validateVacuumRepack(vacuum, repack bool) error { if !vacuum && !repack { return fmt.Errorf("repack cannot be enabled without vacuum") @@ -317,33 +251,52 @@ func validateVacuumRepack(vacuum, repack bool) error { return nil } -func validatePgConf(pg *vshnv1.VSHNPostgreSQL) (fErros field.ErrorList) { - +func validatePgConf(pg *vshnv1.VSHNPostgreSQL) field.ErrorList { + allErrs := field.ErrorList{} pgConfBytes := pg.Spec.Parameters.Service.PostgreSQLSettings - pgConf := map[string]string{} + if pgConfBytes.Raw != nil { - err := json.Unmarshal(pgConfBytes.Raw, &pgConf) - if err != nil { - fErros = append(fErros, &field.Error{ - Field: "spec.parameters.service.postgresqlSettings", - Detail: fmt.Sprintf("Error parsing pgConf: %s", err.Error()), - Type: field.ErrorTypeInvalid, - BadValue: pgConfBytes, - }) - return fErros + if err := json.Unmarshal(pgConfBytes.Raw, &pgConf); err != nil { + return append(allErrs, field.Invalid(field.NewPath("spec.parameters.service.postgresqlSettings"), pgConfBytes, fmt.Sprintf("error parsing pgConf: %v", err))) } } for key := range pgConf { - if _, ok := blocklist[key]; ok { - fErros = append(fErros, &field.Error{ - Field: fmt.Sprintf("spec.parameters.service.postgresqlSettings[%s]", key), - Type: field.ErrorTypeForbidden, - BadValue: key, - Detail: "https://stackgres.io/doc/latest/api/responses/error/#postgres-blocklist", - }) + if _, blocked := blocklist[key]; blocked { + allErrs = append(allErrs, field.Forbidden(field.NewPath(fmt.Sprintf("spec.parameters.service.postgresqlSettings[%s]", key)), "https://stackgres.io/doc/latest/api/responses/error/#postgres-blocklist")) + } + } + + return allErrs +} + +func validateMajorVersionUpgrade(newPg *vshnv1.VSHNPostgreSQL, oldPg *vshnv1.VSHNPostgreSQL) *field.Error { + newVersion, err := strconv.Atoi(newPg.Spec.Parameters.Service.MajorVersion) + if err != nil { + return field.Invalid( + field.NewPath("spec.parameters.service.majorVersion"), + newPg.Spec.Parameters.Service.MajorVersion, + fmt.Sprintf("invalid major version: %s", err.Error()), + ) + } + oldVersion, err := strconv.Atoi(oldPg.Spec.Parameters.Service.MajorVersion) + if err != nil { + return field.Invalid( + field.NewPath("spec.parameters.service.majorVersion"), + oldPg.Spec.Parameters.Service.MajorVersion, + fmt.Sprintf("invalid major version: %s", err.Error()), + ) + } + + // Check if the upgrade is allowed + if newVersion != oldVersion { + if oldVersion != newVersion-1 { + return field.Forbidden( + field.NewPath("spec.parameters.service.majorVersion"), + "only one major version upgrade at a time is allowed", + ) } } - return fErros + return nil }