From 7a74d43e1c01410f53a6aaac1dd1029ab023cfb3 Mon Sep 17 00:00:00 2001 From: alty1224 Date: Wed, 25 Dec 2024 14:21:08 +0500 Subject: [PATCH] CPDEV-108110. Pass quoted dbs and new API contract --- backup/backup.go | 60 ++++++++++++++++++++++++++++++++---------------- basic/basic.go | 6 +++++ common/common.go | 38 ++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 20 deletions(-) diff --git a/backup/backup.go b/backup/backup.go index 01ae711..e3faf5c 100644 --- a/backup/backup.go +++ b/backup/backup.go @@ -293,8 +293,8 @@ func (bp BackupProvider) RestorationBackupHandler(repo string, basePath string) return } - changedNameDb, err := bp.ProcessRestorationRequest(backupID, req, ctx) - response := bp.TrackRestore(backupID, ctx, changedNameDb) + changedNameDb, err, trackId := bp.ProcessRestorationRequest(backupID, req, ctx) + response := bp.TrackRestore(trackId, ctx, changedNameDb) if err != nil { logger.ErrorContext(ctx, "Failed to restore backup", slog.Any("error", err)) w.WriteHeader(http.StatusInternalServerError) @@ -504,30 +504,38 @@ func (bp BackupProvider) RestoreBackup(backupId string, dbs []string, fromRepo s return nil, err } -func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationRequest RestorationRequest, ctx context.Context) (map[string]string, error) { +func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationRequest RestorationRequest, ctx context.Context) (map[string]string, error, string) { if len(restorationRequest.Databases) == 0 { logger.ErrorContext(ctx, "Databases to restore are not specified") - return nil, errors.New("database to restore are not specified") + return nil, errors.New("database to restore are not specified"), "" } var renames, dbs []string var changedDbNames map[string]string + prefixes := make(map[string]struct{}) for _, dabatase := range restorationRequest.Databases { dbs = append(dbs, fmt.Sprintf(`"%s"`, dabatase.Name)) if restorationRequest.RegenerateNames { if dabatase.Prefix != "" { if ok, err := bp.checkPrefixUniqueness(dabatase.Prefix, ctx); ok { if err != nil { - return nil, err + return nil, err, "" } renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, dabatase.Prefix)) } } else { prefix, err := core.PrepareDatabaseName(dabatase.Namespace, dabatase.Microservice, 64) + if _, ok := prefixes[prefix]; ok { + // Make an artificial delay for prefix creation, since it happens too fast + // currently we can't include nanoseconds into pattern for prefix creation + time.Sleep(1 * time.Millisecond) + prefix, err = core.PrepareDatabaseName(dabatase.Namespace, dabatase.Microservice, 64) + } if err != nil { logger.ErrorContext(ctx, fmt.Sprintf("Failed to regenerate name for provided database: %v", dabatase), slog.Any("error", err)) - return nil, err + return nil, err, "" } renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, prefix)) + prefixes[prefix] = struct{}{} } } } @@ -538,19 +546,22 @@ func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationR changedDbNames[parts[0]] = parts[1] } } - err := bp.requestRestoration(ctx, dbs, backupId, renames) - return changedDbNames, err + err, trackId := bp.requestRestoration(ctx, dbs, backupId, renames) + if err != nil { + return nil, err, trackId + } + return changedDbNames, err, trackId } -func (bp BackupProvider) TrackRestore(backupId string, ctx context.Context, changedNameDb map[string]string) ActionTrack { - logger.InfoContext(ctx, fmt.Sprintf("Request to track '%s' restoration is received", backupId)) - jobStatus, err := bp.getJobStatus(backupId, ctx) +func (bp BackupProvider) TrackRestore(trackId string, ctx context.Context, changedNameDb map[string]string) ActionTrack { + logger.InfoContext(ctx, fmt.Sprintf("Request to track '%s' restoration is received", trackId)) + jobStatus, err := bp.getJobStatus(trackId, ctx) if err != nil { logger.ErrorContext(ctx, "Failed to find snapshot", slog.Any("error", err)) - return backupTrack(backupId, "FAIL") + return backupTrack(trackId, "FAIL") } - logger.DebugContext(ctx, fmt.Sprintf("'%s' backup status is %s", backupId, jobStatus)) - return restoreTrack(backupId, jobStatus, changedNameDb) + logger.DebugContext(ctx, fmt.Sprintf("'%s' backup status is %s", trackId, jobStatus)) + return restoreTrack(trackId, jobStatus, changedNameDb) } func (bp BackupProvider) checkPrefixUniqueness(prefix string, ctx context.Context) (bool, error) { logger.InfoContext(ctx, "Checking user prefix uniqueness during restoration with renaming") @@ -622,7 +633,8 @@ func (bp BackupProvider) TrackRestoreIndices(ctx context.Context, backupId strin func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backupId string, pattern, replacement string) error { body := strings.NewReader(fmt.Sprintf(` { - "vault": "%s", + "vault": "%s", + "skip_users_recovery": "true", "dbs": ["%s"] %s } @@ -638,10 +650,11 @@ func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backu return nil } -func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, backupId string, replacement []string) error { +func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, backupId string, replacement []string) (error, string) { body := strings.NewReader(fmt.Sprintf(` { - "vault": "%s", + "vault": "%s", + "skip_users_recovery": "true", "dbs": [%s] %s } @@ -651,10 +664,17 @@ func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, b logger.DebugContext(ctx, fmt.Sprintf("Request body built to restore '%s' backup: %v", backupId, body)) response, err := bp.Curator.client.Do(request) if err != nil { - return err + return err, "" } - logger.InfoContext(ctx, fmt.Sprintf("'%s' snapshot restoration is started: %s", backupId, response.Body)) - return nil + defer response.Body.Close() + trackId, err := io.ReadAll(response.Body) + if err != nil { + logger.ErrorContext(ctx, "Error reading body", "error", err) + return err, "" + } + + logger.InfoContext(ctx, fmt.Sprintf("'%s' snapshot restoration is started: %s", backupId, trackId)) + return nil, string(trackId) } func (bp BackupProvider) prepareRestoreRequest(ctx context.Context, url string, body io.Reader) *http.Request { diff --git a/basic/basic.go b/basic/basic.go index a77fd62..a52a56e 100644 --- a/basic/basic.go +++ b/basic/basic.go @@ -326,6 +326,12 @@ func (bp BaseProvider) createDatabase(requestOnCreateDb DbCreateRequest, ctx con } } + if ok, err := common.CheckPrefixUniqueness(prefix, ctx, bp.opensearch.Client); !ok { + if err != nil { + return nil, err + } + } + resourcesToCreate := requestOnCreateDb.Settings.CreateOnly if len(resourcesToCreate) == 0 { resourcesToCreate = []string{common.UserKind, common.IndexKind} diff --git a/common/common.go b/common/common.go index 8b809f1..deee308 100644 --- a/common/common.go +++ b/common/common.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "git.netcracker.com/PROD.Platform.ElasticStack/dbaas-opensearch-adapter/api" "io" "io/fs" "log" @@ -56,6 +57,7 @@ const ( var logger = GetLogger() var BasePath = GetBasePath() +var resourcePrefixAttributeName = "resource_prefix" type Component struct { Address string `json:"address"` @@ -89,6 +91,12 @@ type CustomLogHandler struct { l *log.Logger } +type User struct { + Attributes map[string]string `json:"attributes,omitempty"` + Hash string `json:"hash"` + Roles []string `json:"backend_roles"` +} + func GetBasePath() string { return fmt.Sprintf("/api/%s/dbaas/adapter/opensearch", GetEnv("API_VERSION", ApiV2)) } @@ -205,3 +213,33 @@ func PrepareContext(r *http.Request) context.Context { } return context.WithValue(r.Context(), RequestIdKey, requestId) } + +func CheckPrefixUniqueness(prefix string, ctx context.Context, opensearchcli Client) (bool, error) { + logger.InfoContext(ctx, "Checking user prefix uniqueness during restoration with renaming") + getUsersRequest := api.GetUsersRequest{} + response, err := getUsersRequest.Do(context.Background(), opensearchcli) + if err != nil { + return false, fmt.Errorf("failed to receive users: %+v", err) + } + defer response.Body.Close() + if response.StatusCode == http.StatusOK { + var users map[string]User + err = ProcessBody(response.Body, &users) + if err != nil { + return false, err + } + for element, user := range users { + if strings.HasPrefix(element, prefix) { + logger.ErrorContext(ctx, fmt.Sprintf("provided prefix already exists or a part of another prefix: %+v", prefix)) + return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix) + } + if user.Attributes[resourcePrefixAttributeName] != "" && strings.HasPrefix(user.Attributes[resourcePrefixAttributeName], prefix) { + logger.ErrorContext(ctx, fmt.Sprintf("provided prefix already exists or a part of another prefix: %+v", prefix)) + return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix) + } + } + } else if response.StatusCode == http.StatusNotFound { + return true, nil + } + return true, nil +}