Skip to content

Commit

Permalink
Migration commits
Browse files Browse the repository at this point in the history
Co-authored-by: alty1224 <alnur.tynyshbek@netcracker.com>
  • Loading branch information
alnur05 and alty1224 authored Dec 27, 2024
1 parent 895a55e commit e667943
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 36 deletions.
60 changes: 40 additions & 20 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ func (bp BackupProvider) RestorationBackupHandler(repo string, basePath string)
return
}

changedNameDb, err := bp.ProcessRestorationRequest(backupID, req, ctx)
response := bp.TrackRestore(backupID, ctx, changedNameDb)
changedNameDb, err, trackId := bp.ProcessRestorationRequest(backupID, req, ctx)
response := bp.TrackRestore(trackId, ctx, changedNameDb)
if err != nil {
logger.ErrorContext(ctx, "Failed to restore backup", slog.Any("error", err))
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -504,30 +504,38 @@ func (bp BackupProvider) RestoreBackup(backupId string, dbs []string, fromRepo s
return nil, err
}

func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationRequest RestorationRequest, ctx context.Context) (map[string]string, error) {
func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationRequest RestorationRequest, ctx context.Context) (map[string]string, error, string) {
if len(restorationRequest.Databases) == 0 {
logger.ErrorContext(ctx, "Databases to restore are not specified")
return nil, errors.New("database to restore are not specified")
return nil, errors.New("database to restore are not specified"), ""
}
var renames, dbs []string
var changedDbNames map[string]string
prefixes := make(map[string]struct{})
for _, dabatase := range restorationRequest.Databases {
dbs = append(dbs, fmt.Sprintf(`"%s"`, dabatase.Name))
if restorationRequest.RegenerateNames {
if dabatase.Prefix != "" {
if ok, err := bp.checkPrefixUniqueness(dabatase.Prefix, ctx); ok {
if err != nil {
return nil, err
return nil, err, ""
}
renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, dabatase.Prefix))
}
} else {
prefix, err := core.PrepareDatabaseName(dabatase.Namespace, dabatase.Microservice, 64)
if _, ok := prefixes[prefix]; ok {
// Make an artificial delay for prefix creation, since it happens too fast
// currently we can't include nanoseconds into pattern for prefix creation
time.Sleep(1 * time.Millisecond)
prefix, err = core.PrepareDatabaseName(dabatase.Namespace, dabatase.Microservice, 64)
}
if err != nil {
logger.ErrorContext(ctx, fmt.Sprintf("Failed to regenerate name for provided database: %v", dabatase), slog.Any("error", err))
return nil, err
return nil, err, ""
}
renames = append(renames, fmt.Sprintf("%s:%s", dabatase.Name, prefix))
prefixes[prefix] = struct{}{}
}
}
}
Expand All @@ -538,19 +546,22 @@ func (bp BackupProvider) ProcessRestorationRequest(backupId string, restorationR
changedDbNames[parts[0]] = parts[1]
}
}
err := bp.requestRestoration(ctx, dbs, backupId, renames)
return changedDbNames, err
err, trackId := bp.requestRestoration(ctx, dbs, backupId, renames)
if err != nil {
return nil, err, trackId
}
return changedDbNames, err, trackId
}

func (bp BackupProvider) TrackRestore(backupId string, ctx context.Context, changedNameDb map[string]string) ActionTrack {
logger.InfoContext(ctx, fmt.Sprintf("Request to track '%s' restoration is received", backupId))
jobStatus, err := bp.getJobStatus(backupId, ctx)
func (bp BackupProvider) TrackRestore(trackId string, ctx context.Context, changedNameDb map[string]string) ActionTrack {
logger.InfoContext(ctx, fmt.Sprintf("Request to track '%s' restoration is received", trackId))
jobStatus, err := bp.getJobStatus(trackId, ctx)
if err != nil {
logger.ErrorContext(ctx, "Failed to find snapshot", slog.Any("error", err))
return backupTrack(backupId, "FAIL")
return backupTrack(trackId, "FAIL")
}
logger.DebugContext(ctx, fmt.Sprintf("'%s' backup status is %s", backupId, jobStatus))
return restoreTrack(backupId, jobStatus, changedNameDb)
logger.DebugContext(ctx, fmt.Sprintf("'%s' backup status is %s", trackId, jobStatus))
return restoreTrack(trackId, jobStatus, changedNameDb)
}
func (bp BackupProvider) checkPrefixUniqueness(prefix string, ctx context.Context) (bool, error) {
logger.InfoContext(ctx, "Checking user prefix uniqueness during restoration with renaming")
Expand Down Expand Up @@ -622,7 +633,8 @@ func (bp BackupProvider) TrackRestoreIndices(ctx context.Context, backupId strin
func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backupId string, pattern, replacement string) error {
body := strings.NewReader(fmt.Sprintf(`
{
"vault": "%s",
"vault": "%s",
"skip_users_recovery": "true",
"dbs": ["%s"]
%s
}
Expand All @@ -638,10 +650,11 @@ func (bp BackupProvider) requestRestore(ctx context.Context, dbs []string, backu
return nil
}

func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, backupId string, replacement []string) error {
func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, backupId string, replacement []string) (error, string) {
body := strings.NewReader(fmt.Sprintf(`
{
"vault": "%s",
"vault": "%s",
"skip_users_recovery": "true",
"dbs": [%s]
%s
}
Expand All @@ -651,10 +664,17 @@ func (bp BackupProvider) requestRestoration(ctx context.Context, dbs []string, b
logger.DebugContext(ctx, fmt.Sprintf("Request body built to restore '%s' backup: %v", backupId, body))
response, err := bp.Curator.client.Do(request)
if err != nil {
return err
return err, ""
}
logger.InfoContext(ctx, fmt.Sprintf("'%s' snapshot restoration is started: %s", backupId, response.Body))
return nil
defer response.Body.Close()
trackId, err := io.ReadAll(response.Body)
if err != nil {
logger.ErrorContext(ctx, "Error reading body", "error", err)
return err, ""
}

logger.InfoContext(ctx, fmt.Sprintf("'%s' snapshot restoration is started: %s", backupId, trackId))
return nil, string(trackId)
}

func (bp BackupProvider) prepareRestoreRequest(ctx context.Context, url string, body io.Reader) *http.Request {
Expand Down
12 changes: 12 additions & 0 deletions basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,18 @@ func (bp BaseProvider) createDatabase(requestOnCreateDb DbCreateRequest, ctx con
}
}

if ok, err := common.CheckPrefixUniqueness(prefix, ctx, bp.opensearch.Client); !ok {
if err != nil {
return nil, err
}
}

if ok, err := common.CheckPrefixUniqueness(prefix, ctx, bp.opensearch.Client); !ok {
if err != nil {
return nil, err
}
}

resourcesToCreate := requestOnCreateDb.Settings.CreateOnly
if len(resourcesToCreate) == 0 {
resourcesToCreate = []string{common.UserKind, common.IndexKind}
Expand Down
2 changes: 2 additions & 0 deletions basic/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (bp BaseProvider) CreateRoleWithDMLPermissions() error {
ClusterScrollClearPermission,
ClusterMonitorTaskGetPermission,
ClusterMonitorStatePermission,
ClusterMonitorMainPermission,
}
return bp.createRole(clusterPermissions, indexPermissions, []string{}, DmlRoleType)
}
Expand All @@ -161,6 +162,7 @@ func (bp BaseProvider) CreateRoleWithReadOnlyPermissions() error {
strings.ToUpper(ClusterReadOnlyPermissions),
ClusterScrollClearPermission,
ClusterMonitorStatePermission,
ClusterMonitorMainPermission,
}
return bp.createRole(clusterPermissions, indexPermissions, []string{}, ReadOnlyRoleType)
}
Expand Down
13 changes: 7 additions & 6 deletions basic/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,21 @@ func (bp BaseProvider) CreateUserHandler() func(w http.ResponseWriter, r *http.R
}

func (bp BaseProvider) ensureUser(username string, userCreateRequest dao.UserCreateRequest, ctx context.Context) (*CreatedUser, error) {
indexName := userCreateRequest.DbName
dbName := userCreateRequest.DbName
roleType := userCreateRequest.Role
if roleType == "" {
roleType = AdminRoleType
}
username, password, resources, err :=
bp.createOrUpdateUser(username, userCreateRequest.Password, indexName, roleType, ctx)
bp.createOrUpdateUser(username, userCreateRequest.Password, dbName, roleType, ctx)
if err != nil {
return nil, err
}
if indexName != "" {
resources = append(resources, dao.DbResource{Kind: common.IndexKind, Name: indexName})
if dbName != "" {
resources = append(resources, dao.DbResource{Kind: common.MetadataKind, Name: dbName})
resources = append(resources, dao.DbResource{Kind: common.ResourcePrefixKind, Name: dbName})
}
connectionProperties := bp.GetExtendedConnectionProperties(indexName, username, password, "", roleType)
connectionProperties := bp.GetExtendedConnectionProperties("", username, password, "", roleType)
user, err := bp.GetUser(username)
if err != nil {
return nil, err
Expand All @@ -125,7 +126,7 @@ func (bp BaseProvider) ensureUser(username string, userCreateRequest dao.UserCre

response := &CreatedUser{
ConnectionProperties: connectionProperties,
Name: indexName,
Name: dbName,
Resources: resources,
}
return response, nil
Expand Down
22 changes: 12 additions & 10 deletions basic/user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package basic

import (
"fmt"
"github.com/Netcracker/dbaas-opensearch-adapter/common"
"github.com/Netcracker/qubership-dbaas-adapter-core/pkg/dao"
"github.com/stretchr/testify/assert"
Expand All @@ -32,15 +31,16 @@ func TestUpdateUserWithDbNameAndPassword(t *testing.T) {
assert.Empty(t, err)
assert.Equal(t, username, response.ConnectionProperties.ResourcePrefix)
assert.Equal(t, userCreateRequest.DbName, response.Name)
assert.Equal(t, userCreateRequest.DbName, response.ConnectionProperties.DbName)
expectedUrl := fmt.Sprintf("http://localhost:9200/%s", userCreateRequest.DbName)
assert.Equal(t, "", response.ConnectionProperties.DbName)
expectedUrl := "http://localhost:9200/"
assert.Equal(t, expectedUrl, response.ConnectionProperties.Url)
assert.Equal(t, username, response.ConnectionProperties.Username)
assert.Equal(t, userCreateRequest.Password, response.ConnectionProperties.Password)
assert.Equal(t, AdminRoleType, response.ConnectionProperties.Role)
expectedResources := []dao.DbResource{
{Kind: common.UserKind, Name: username},
{Kind: common.IndexKind, Name: userCreateRequest.DbName},
{Kind: common.MetadataKind, Name: userCreateRequest.DbName},
{Kind: common.ResourcePrefixKind, Name: userCreateRequest.DbName},
}
assert.ElementsMatch(t, expectedResources, response.Resources)
}
Expand All @@ -54,15 +54,16 @@ func TestCreateUserWithoutUsername(t *testing.T) {
assert.Empty(t, err)
assert.Empty(t, response.ConnectionProperties.ResourcePrefix)
assert.Equal(t, userCreateRequest.DbName, response.Name)
assert.Equal(t, userCreateRequest.DbName, response.ConnectionProperties.DbName)
expectedUrl := fmt.Sprintf("http://localhost:9200/%s", userCreateRequest.DbName)
assert.Equal(t, "", response.ConnectionProperties.DbName)
expectedUrl := "http://localhost:9200/"
assert.Equal(t, expectedUrl, response.ConnectionProperties.Url)
assert.Contains(t, response.ConnectionProperties.Username, "dbaas_")
assert.Equal(t, userCreateRequest.Password, response.ConnectionProperties.Password)
assert.Equal(t, AdminRoleType, response.ConnectionProperties.Role)
expectedResources := []dao.DbResource{
{Kind: common.UserKind, Name: response.ConnectionProperties.Username},
{Kind: common.IndexKind, Name: userCreateRequest.DbName},
{Kind: common.MetadataKind, Name: userCreateRequest.DbName},
{Kind: common.ResourcePrefixKind, Name: userCreateRequest.DbName},
}
assert.ElementsMatch(t, expectedResources, response.Resources)
}
Expand All @@ -76,15 +77,16 @@ func TestUpdateUserWithDbName(t *testing.T) {
assert.Empty(t, err)
assert.Equal(t, username, response.ConnectionProperties.ResourcePrefix)
assert.Equal(t, userCreateRequest.DbName, response.Name)
assert.Equal(t, userCreateRequest.DbName, response.ConnectionProperties.DbName)
expectedUrl := fmt.Sprintf("http://localhost:9200/%s", userCreateRequest.DbName)
assert.Equal(t, "", response.ConnectionProperties.DbName)
expectedUrl := "http://localhost:9200/"
assert.Equal(t, expectedUrl, response.ConnectionProperties.Url)
assert.Equal(t, username, response.ConnectionProperties.Username)
assert.NotEmpty(t, response.ConnectionProperties.Password)
assert.Equal(t, AdminRoleType, response.ConnectionProperties.Role)
expectedResources := []dao.DbResource{
{Kind: common.UserKind, Name: username},
{Kind: common.IndexKind, Name: userCreateRequest.DbName},
{Kind: common.MetadataKind, Name: userCreateRequest.DbName},
{Kind: common.ResourcePrefixKind, Name: userCreateRequest.DbName},
}
assert.ElementsMatch(t, expectedResources, response.Resources)
}
Expand Down
38 changes: 38 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/Netcracker/dbaas-opensearch-adapter/api"
"io"
"io/fs"
"log"
Expand Down Expand Up @@ -56,6 +57,7 @@ const (

var logger = GetLogger()
var BasePath = GetBasePath()
var resourcePrefixAttributeName = "resource_prefix"

type Component struct {
Address string `json:"address"`
Expand Down Expand Up @@ -89,6 +91,12 @@ type CustomLogHandler struct {
l *log.Logger
}

type User struct {
Attributes map[string]string `json:"attributes,omitempty"`
Hash string `json:"hash"`
Roles []string `json:"backend_roles"`
}

func GetBasePath() string {
return fmt.Sprintf("/api/%s/dbaas/adapter/opensearch", GetEnv("API_VERSION", ApiV2))
}
Expand Down Expand Up @@ -205,3 +213,33 @@ func PrepareContext(r *http.Request) context.Context {
}
return context.WithValue(r.Context(), RequestIdKey, requestId)
}

func CheckPrefixUniqueness(prefix string, ctx context.Context, opensearchcli Client) (bool, error) {
logger.InfoContext(ctx, "Checking user prefix uniqueness during restoration with renaming")
getUsersRequest := api.GetUsersRequest{}
response, err := getUsersRequest.Do(context.Background(), opensearchcli)
if err != nil {
return false, fmt.Errorf("failed to receive users: %+v", err)
}
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
var users map[string]User
err = ProcessBody(response.Body, &users)
if err != nil {
return false, err
}
for element, user := range users {
if strings.HasPrefix(element, prefix) {
logger.ErrorContext(ctx, fmt.Sprintf("provided prefix already exists or a part of another prefix: %+v", prefix))
return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix)
}
if user.Attributes[resourcePrefixAttributeName] != "" && strings.HasPrefix(user.Attributes[resourcePrefixAttributeName], prefix) {
logger.ErrorContext(ctx, fmt.Sprintf("provided prefix already exists or a part of another prefix: %+v", prefix))
return false, fmt.Errorf("provided prefix already exists or a part of another prefix: %+v", prefix)
}
}
} else if response.StatusCode == http.StatusNotFound {
return true, nil
}
return true, nil
}

0 comments on commit e667943

Please sign in to comment.