Skip to content

Commit

Permalink
Support new DBaaS contract and restoration api
Browse files Browse the repository at this point in the history
  • Loading branch information
paia0720 committed Dec 9, 2024
1 parent f5b0053 commit 6b7c0f1
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 15 deletions.
194 changes: 180 additions & 14 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
core "github.com/Netcracker/dbaas-adapter-core/pkg/utils"
"github.com/Netcracker/dbaas-opensearch-adapter/api"
"github.com/Netcracker/dbaas-opensearch-adapter/basic"
"io"
"log/slog"
"net/http"
Expand All @@ -16,7 +19,10 @@ import (
"github.com/opensearch-project/opensearch-go/opensearchapi"
)

var logger = common.GetLogger()
var (
logger = common.GetLogger()
resourcePrefixAttributeName = "resource_prefix"
)

type Repository struct {
Status int `json:"status"`
Expand All @@ -40,6 +46,18 @@ type JobStatus struct {
TaskId string `json:"trackPath"`
}

type Database struct {
Namespace string `json:"namespace"`
Microservice string `json:"microservice"`
Name string `json:"name"`
Prefix string `json:"prefix,omitempty"`
}

type RestorationRequest struct {
Databases []Database `json:"databases"`
RegenerateNames bool `json:"regenerateNames,omitempty"`
}

type TrackDetails struct {
LocalId string `json:"localId"`
}
Expand Down Expand Up @@ -210,7 +228,7 @@ func (bp BackupProvider) RestoreBackupHandler(repo string, basePath string) func
_, _ = w.Write([]byte(err.Error()))
return
}
response := bp.TrackRestore(backupID, ctx)
response := bp.TrackRestore(backupID, ctx, changedNameDb)
if regenerateNames {
indices, err := bp.getActualIndices(backupID, repo, changedNameDb, ctx)
if err != nil {
Expand Down Expand Up @@ -238,13 +256,55 @@ func (bp BackupProvider) RestoreBackupHandler(repo string, basePath string) func
}
}

func (bp BackupProvider) RestorationBackupHandler(repo string, basePath string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := common.PrepareContext(r)
vars := mux.Vars(r)
backupID := vars["backupID"]
logger.InfoContext(ctx, fmt.Sprintf("Request to restore '%s' backup is received", backupID))
body, err := io.ReadAll(r.Body)
if err != nil {
logger.ErrorContext(ctx, "Failed to decode request body", slog.Any("error", err))
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
defer r.Body.Close()
var req RestorationRequest
err = json.Unmarshal(body, &req)
if err != nil {
logger.ErrorContext(ctx, "Failed to unmarshal request from JSON", slog.Any("error", err))
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}

changedNameDb, err := bp.ProcessRestorationRequest(backupID, req, ctx)
response := bp.TrackRestore(backupID, ctx, changedNameDb)
if err != nil {
logger.ErrorContext(ctx, "Failed to restore backup", slog.Any("error", err))
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
responseBody, err := json.Marshal(response)
if err != nil {
logger.ErrorContext(ctx, "Failed to marshal response to JSON", slog.Any("error", err))
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
_, _ = w.Write(responseBody)
}
}

func (bp BackupProvider) TrackRestoreFromTrackIdHandler(fromRepo string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := common.PrepareContext(r)
logger.InfoContext(ctx, fmt.Sprintf("Request to track restore in '%s' in '%s' repository is received", r.URL.Path, fromRepo))
vars := mux.Vars(r)
backupID := vars["backupID"]
response := bp.TrackRestore(backupID, ctx)
response := bp.TrackRestore(backupID, ctx, nil)
responseBody, err := json.Marshal(response)
if err != nil {
logger.ErrorContext(ctx, "Failed to marshal response to JSON", slog.Any("error", err))
Expand Down Expand Up @@ -279,10 +339,15 @@ func (bp BackupProvider) TrackRestoreFromIndicesHandler(fromRepo string) func(w
func (bp BackupProvider) CollectBackup(dbs []string, ctx context.Context) (string, error) {
var body *strings.Reader
if len(dbs) != 0 {
quotedDbs := make([]string, len(dbs))
for i, db := range dbs {
quotedDbs[i] = fmt.Sprintf(`"%s"`, db)
}
body = strings.NewReader(fmt.Sprintf(`
{
"dbs": ["%s"]
}`, strings.Join(dbs, ",")))
{
"allow_eviction":"False",
"dbs": [%s]
}`, strings.Join(quotedDbs, ",")))
}
url := fmt.Sprintf("%s/%s", bp.Curator.url, "backup")
request, err := http.NewRequest(http.MethodPost, url, body)
Expand Down Expand Up @@ -427,15 +492,80 @@ func (bp BackupProvider) RestoreBackup(backupId string, dbs []string, fromRepo s
return nil, err
}

func (bp BackupProvider) TrackRestore(backupId string, ctx context.Context) ActionTrack {
func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationRequest RestorationRequest, ctx context.Context) (map[string]string, error) {
if len(restorationRequest.Databases) == 0 {
logger.ErrorContext(ctx, "Databases to restore are not specified")
return nil, errors.New("database to restore are not specified")
}
var renames, dbs []string
var changedDbNames map[string]string
for _, dabatase := range restorationRequest.Databases {
dbs = append(dbs, fmt.Sprintf(`"%s"`, dabatase.Name))
if restorationRequest.RegenerateNames {
if dabatase.Prefix != "" {
if ok, err := bp.checkPrefixUniqueness(dabatase.Prefix, ctx); ok {
if err != nil {
return nil, err
}
renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, dabatase.Prefix))
}
} else {
prefix, err := core.PrepareDatabaseName(dabatase.Namespace, dabatase.Microservice, 64)
if err != nil {
logger.ErrorContext(ctx, fmt.Sprintf("Failed to regenerate name for provided database: %v", dabatase), slog.Any("error", err))
return nil, err
}
renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, prefix))
}
}
}
if len(renames) != 0 {
changedDbNames = make(map[string]string)
for _, pair := range renames {
parts := strings.Split(pair, ":")
changedDbNames[parts[0]] = parts[1]
}
}
err := bp.requestRestoration(ctx, dbs, backupId, renames)
return changedDbNames, err
}

func (bp BackupProvider) TrackRestore(backupId string, ctx context.Context, changedNameDb map[string]string) ActionTrack {
logger.InfoContext(ctx, fmt.Sprintf("Request to track '%s' restoration is received", backupId))
jobStatus, err := bp.getJobStatus(backupId, ctx)
if err != nil {
logger.ErrorContext(ctx, "Failed to find snapshot", slog.Any("error", err))
return backupTrack(backupId, "FAIL")
}
logger.DebugContext(ctx, fmt.Sprintf("'%s' backup status is %s", backupId, jobStatus))
return restoreTrack(backupId, jobStatus, nil)
return restoreTrack(backupId, jobStatus, changedNameDb)
}
func (bp BackupProvider) checkPrefixUniqueness(prefix string, ctx context.Context) (bool, error) {
logger.InfoContext(ctx, "Checking user prefix uniqueness during restoration with renaming")
getUsersRequest := api.GetUsersRequest{}
response, err := getUsersRequest.Do(context.Background(), bp.client)
if err != nil {
return false, fmt.Errorf("failed to receive users: %+v", err)
}
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
var users map[string]basic.User
err = common.ProcessBody(response.Body, &users)
if err != nil {
return false, err
}
for element, user := range users {
if strings.HasPrefix(element, prefix) {
return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix)
}
if user.Attributes[resourcePrefixAttributeName] != "" && strings.HasPrefix(user.Attributes[resourcePrefixAttributeName], prefix) {
return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix)
}
}
} else if response.StatusCode == http.StatusNotFound {
return true, nil
}
return true, nil
}

// TrackRestoreIndices We keep this logic, but first we need to fix the problem with users and regenerate names for indexes, until then it will not work incorrectly.
Expand Down Expand Up @@ -487,14 +617,26 @@ func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backu
}
`, backupId, strings.Join(dbs, ","), namesRegenerateRequestPart(pattern, replacement)))
url := fmt.Sprintf("%s/%s", bp.Curator.url, "restore")
request, err := http.NewRequest(http.MethodPost, url, body)
request := bp.prepareRestoreRequest(ctx, url, body)
logger.DebugContext(ctx, fmt.Sprintf("Request body built to restore '%s' backup: %v", backupId, body))
response, err := bp.Curator.client.Do(request)
if err != nil {
logger.ErrorContext(ctx, "Failed to prepare request to restore backup", slog.Any("error", err))
panic(err)
return err
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set(common.RequestIdKey, ctx.Value(common.RequestIdKey).(string))
request.SetBasicAuth(bp.Curator.username, bp.Curator.password)
logger.InfoContext(ctx, fmt.Sprintf("'%s' snapshot restoration is started: %s", backupId, response.Body))
return nil
}

func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, backupId string, replacement []string) error {
body := strings.NewReader(fmt.Sprintf(`
{
"vault": "%s",
"dbs": [%s]
%s
}
`, backupId, strings.Join(dbs, ","), prepareChangeNameRequestPart(replacement)))
url := fmt.Sprintf("%s/%s", bp.Curator.url, "restore")
request := bp.prepareRestoreRequest(ctx, url, body)
logger.DebugContext(ctx, fmt.Sprintf("Request body built to restore '%s' backup: %v", backupId, body))
response, err := bp.Curator.client.Do(request)
if err != nil {
Expand All @@ -504,6 +646,18 @@ func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backu
return nil
}

func (bp BackupProvider) prepareRestoreRequest(ctx context.Context, url string, body io.Reader) *http.Request {
request, err := http.NewRequest(http.MethodPost, url, body)
if err != nil {
logger.ErrorContext(ctx, "Failed to prepare request to restore backup", slog.Any("error", err))
panic(err)
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set(common.RequestIdKey, ctx.Value(common.RequestIdKey).(string))
request.SetBasicAuth(bp.Curator.username, bp.Curator.password)
return request
}

func (bp BackupProvider) getJobStatus(snapshotName string, ctx context.Context) (string, error) {
url := fmt.Sprintf("%s/%s/%s", bp.Curator.url, "jobstatus", snapshotName)
request, err := http.NewRequest(http.MethodGet, url, nil)
Expand Down Expand Up @@ -617,3 +771,15 @@ func namesRegenerateRequestPart(pattern string, replacement string) string {
"rename_replacement": "%s"
`, pattern, replacement)
}

func prepareChangeNameRequestPart(renames []string) string {
if len(renames) == 0 {
return ""
}
entries := make([]string, len(renames))
for i, pair := range renames {
parts := strings.Split(pair, ":")
entries[i] = fmt.Sprintf(`"%s":"%s"`, parts[0], parts[1])
}
return fmt.Sprintf(`,"changeDbNames": {%s}`, strings.Join(entries, ","))
}
10 changes: 9 additions & 1 deletion physical/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,15 @@ func (rs *RegistrationProvider) prepareRequestParameters(ctx context.Context) (s
func (rs *RegistrationProvider) modifyReqParams(request *dao.PhysicalDatabaseRegistrationRequest) {
if rs.ApiVersion == common.ApiV2 {
request.Metadata = dao.Metadata{
ApiVersion: dao.ApiVersion(rs.ApiVersion),
ApiVersion: dao.ApiVersion(rs.ApiVersion),
ApiVersions: dao.ApiVersions{Specs: []dao.ApiVersionsSpec{
{
SpecRootUrl: dao.RootUrl,
Major: dao.MajorAPIVersion,
Minor: dao.MinorAPIVersion,
SupportedMajors: dao.SupportedMajorsVersions,
},
}},
SupportedRoles: rs.baseProvider.GetSupportedRoleTypes(),
Features: map[string]bool{
"multiusers": true,
Expand Down
4 changes: 4 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func Handlers(adapter common.Component) http.Handler {
handlers.LoggingHandler(os.Stdout, authorizer(backupProvider.RestoreBackupHandler(opensearchRepo, basePath))),
).Methods(http.MethodPost)

r.Handle(fmt.Sprintf("%s/backups/{backupID}/restoration", basePath),
handlers.LoggingHandler(os.Stdout, authorizer(backupProvider.RestorationBackupHandler(opensearchRepo, basePath))),
).Methods(http.MethodPost)

r.Handle(fmt.Sprintf("%s/backups/track/backup/{backupID}", basePath),
handlers.LoggingHandler(os.Stdout, authorizer(backupProvider.TrackBackupHandler())),
).Methods(http.MethodGet)
Expand Down

0 comments on commit 6b7c0f1

Please sign in to comment.