Skip to content

Commit

Permalink
CPDEV-108110. Pass quoted dbs and new API contract
Browse files Browse the repository at this point in the history
  • Loading branch information
alty1224 committed Dec 25, 2024
1 parent 895a55e commit 7a74d43
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 20 deletions.
60 changes: 40 additions & 20 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ func (bp BackupProvider) RestorationBackupHandler(repo string, basePath string)
return
}

changedNameDb, err := bp.ProcessRestorationRequest(backupID, req, ctx)
response := bp.TrackRestore(backupID, ctx, changedNameDb)
changedNameDb, err, trackId := bp.ProcessRestorationRequest(backupID, req, ctx)
response := bp.TrackRestore(trackId, ctx, changedNameDb)
if err != nil {
logger.ErrorContext(ctx, "Failed to restore backup", slog.Any("error", err))
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -504,30 +504,38 @@ func (bp BackupProvider) RestoreBackup(backupId string, dbs []string, fromRepo s
return nil, err
}

func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationRequest RestorationRequest, ctx context.Context) (map[string]string, error) {
func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationRequest RestorationRequest, ctx context.Context) (map[string]string, error, string) {
if len(restorationRequest.Databases) == 0 {
logger.ErrorContext(ctx, "Databases to restore are not specified")
return nil, errors.New("database to restore are not specified")
return nil, errors.New("database to restore are not specified"), ""
}
var renames, dbs []string
var changedDbNames map[string]string
prefixes := make(map[string]struct{})
for _, dabatase := range restorationRequest.Databases {
dbs = append(dbs, fmt.Sprintf(`"%s"`, dabatase.Name))
if restorationRequest.RegenerateNames {
if dabatase.Prefix != "" {
if ok, err := bp.checkPrefixUniqueness(dabatase.Prefix, ctx); ok {
if err != nil {
return nil, err
return nil, err, ""
}
renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, dabatase.Prefix))
}
} else {
prefix, err := core.PrepareDatabaseName(dabatase.Namespace, dabatase.Microservice, 64)
if _, ok := prefixes[prefix]; ok {
// Make an artificial delay for prefix creation, since it happens too fast
// currently we can't include nanoseconds into pattern for prefix creation
time.Sleep(1 * time.Millisecond)
prefix, err = core.PrepareDatabaseName(dabatase.Namespace, dabatase.Microservice, 64)
}
if err != nil {
logger.ErrorContext(ctx, fmt.Sprintf("Failed to regenerate name for provided database: %v", dabatase), slog.Any("error", err))
return nil, err
return nil, err, ""
}
renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, prefix))
prefixes[prefix] = struct{}{}
}
}
}
Expand All @@ -538,19 +546,22 @@ func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationR
changedDbNames[parts[0]] = parts[1]
}
}
err := bp.requestRestoration(ctx, dbs, backupId, renames)
return changedDbNames, err
err, trackId := bp.requestRestoration(ctx, dbs, backupId, renames)
if err != nil {
return nil, err, trackId
}
return changedDbNames, err, trackId
}

func (bp BackupProvider) TrackRestore(backupId string, ctx context.Context, changedNameDb map[string]string) ActionTrack {
logger.InfoContext(ctx, fmt.Sprintf("Request to track '%s' restoration is received", backupId))
jobStatus, err := bp.getJobStatus(backupId, ctx)
func (bp BackupProvider) TrackRestore(trackId string, ctx context.Context, changedNameDb map[string]string) ActionTrack {
logger.InfoContext(ctx, fmt.Sprintf("Request to track '%s' restoration is received", trackId))
jobStatus, err := bp.getJobStatus(trackId, ctx)
if err != nil {
logger.ErrorContext(ctx, "Failed to find snapshot", slog.Any("error", err))
return backupTrack(backupId, "FAIL")
return backupTrack(trackId, "FAIL")
}
logger.DebugContext(ctx, fmt.Sprintf("'%s' backup status is %s", backupId, jobStatus))
return restoreTrack(backupId, jobStatus, changedNameDb)
logger.DebugContext(ctx, fmt.Sprintf("'%s' backup status is %s", trackId, jobStatus))
return restoreTrack(trackId, jobStatus, changedNameDb)
}
func (bp BackupProvider) checkPrefixUniqueness(prefix string, ctx context.Context) (bool, error) {
logger.InfoContext(ctx, "Checking user prefix uniqueness during restoration with renaming")
Expand Down Expand Up @@ -622,7 +633,8 @@ func (bp BackupProvider) TrackRestoreIndices(ctx context.Context, backupId strin
func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backupId string, pattern, replacement string) error {
body := strings.NewReader(fmt.Sprintf(`
{
"vault": "%s",
"vault": "%s",
"skip_users_recovery": "true",
"dbs": ["%s"]
%s
}
Expand All @@ -638,10 +650,11 @@ func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backu
return nil
}

func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, backupId string, replacement []string) error {
func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, backupId string, replacement []string) (error, string) {
body := strings.NewReader(fmt.Sprintf(`
{
"vault": "%s",
"vault": "%s",
"skip_users_recovery": "true",
"dbs": [%s]
%s
}
Expand All @@ -651,10 +664,17 @@ func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, b
logger.DebugContext(ctx, fmt.Sprintf("Request body built to restore '%s' backup: %v", backupId, body))
response, err := bp.Curator.client.Do(request)
if err != nil {
return err
return err, ""
}
logger.InfoContext(ctx, fmt.Sprintf("'%s' snapshot restoration is started: %s", backupId, response.Body))
return nil
defer response.Body.Close()
trackId, err := io.ReadAll(response.Body)
if err != nil {
logger.ErrorContext(ctx, "Error reading body", "error", err)
return err, ""
}

logger.InfoContext(ctx, fmt.Sprintf("'%s' snapshot restoration is started: %s", backupId, trackId))
return nil, string(trackId)
}

func (bp BackupProvider) prepareRestoreRequest(ctx context.Context, url string, body io.Reader) *http.Request {
Expand Down
6 changes: 6 additions & 0 deletions basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ func (bp BaseProvider) createDatabase(requestOnCreateDb DbCreateRequest, ctx con
}
}

if ok, err := common.CheckPrefixUniqueness(prefix, ctx, bp.opensearch.Client); !ok {
if err != nil {
return nil, err
}
}

resourcesToCreate := requestOnCreateDb.Settings.CreateOnly
if len(resourcesToCreate) == 0 {
resourcesToCreate = []string{common.UserKind, common.IndexKind}
Expand Down
38 changes: 38 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"git.netcracker.com/PROD.Platform.ElasticStack/dbaas-opensearch-adapter/api"
"io"
"io/fs"
"log"
Expand Down Expand Up @@ -56,6 +57,7 @@ const (

var logger = GetLogger()
var BasePath = GetBasePath()
var resourcePrefixAttributeName = "resource_prefix"

type Component struct {
Address string `json:"address"`
Expand Down Expand Up @@ -89,6 +91,12 @@ type CustomLogHandler struct {
l *log.Logger
}

type User struct {
Attributes map[string]string `json:"attributes,omitempty"`
Hash string `json:"hash"`
Roles []string `json:"backend_roles"`
}

func GetBasePath() string {
return fmt.Sprintf("/api/%s/dbaas/adapter/opensearch", GetEnv("API_VERSION", ApiV2))
}
Expand Down Expand Up @@ -205,3 +213,33 @@ func PrepareContext(r *http.Request) context.Context {
}
return context.WithValue(r.Context(), RequestIdKey, requestId)
}

func CheckPrefixUniqueness(prefix string, ctx context.Context, opensearchcli Client) (bool, error) {
logger.InfoContext(ctx, "Checking user prefix uniqueness during restoration with renaming")
getUsersRequest := api.GetUsersRequest{}
response, err := getUsersRequest.Do(context.Background(), opensearchcli)
if err != nil {
return false, fmt.Errorf("failed to receive users: %+v", err)
}
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
var users map[string]User
err = ProcessBody(response.Body, &users)
if err != nil {
return false, err
}
for element, user := range users {
if strings.HasPrefix(element, prefix) {
logger.ErrorContext(ctx, fmt.Sprintf("provided prefix already exists or a part of another prefix: %+v", prefix))
return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix)
}
if user.Attributes[resourcePrefixAttributeName] != "" && strings.HasPrefix(user.Attributes[resourcePrefixAttributeName], prefix) {
logger.ErrorContext(ctx, fmt.Sprintf("provided prefix already exists or a part of another prefix: %+v", prefix))
return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix)
}
}
} else if response.StatusCode == http.StatusNotFound {
return true, nil
}
return true, nil
}

0 comments on commit 7a74d43

Please sign in to comment.