Skip to content

Commit

Permalink
Merge pull request #920 from treeverse/fix/export-s3
Browse files Browse the repository at this point in the history
fix export to s3 not working due to wrong paths
  • Loading branch information
guy-har authored Nov 13, 2020
2 parents 19a4357 + 1aca707 commit 1d79c2d
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 43 deletions.
37 changes: 24 additions & 13 deletions export/export_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func PathToPointer(path string) (block.ObjectPointer, error) {
return block.ObjectPointer{}, err
}
return block.ObjectPointer{
StorageNamespace: fmt.Sprintf("%s://%s", u.Scheme, u.Host),
Identifier: u.Path,
StorageNamespace: fmt.Sprintf("%s://%s/", u.Scheme, u.Host),
Identifier: strings.TrimPrefix(u.Path, "/"),
}, err
}

Expand All @@ -61,11 +61,15 @@ func (h *Handler) start(body *string) error {
if err != nil {
return err
}
return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr)
repo, err := h.cataloger.GetRepository(context.Background(), startData.Repo)
if err != nil {
return err
}
return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr, repo.StorageNamespace)
}

func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfiguration, finishBodyStr *string) error {
tasksGenerator := NewTasksGenerator(startData.ExportID, config.Path, getGenerateSuccess(config.LastKeysInPrefixRegexp), finishBodyStr)
func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfiguration, finishBodyStr *string, storageNamespace string) error {
tasksGenerator := NewTasksGenerator(startData.ExportID, config.Path, getGenerateSuccess(config.LastKeysInPrefixRegexp), finishBodyStr, storageNamespace)
var diffs catalog.Differences
var err error
var hasMore bool
Expand Down Expand Up @@ -215,22 +219,29 @@ func getStatus(signalledErrors int) (catalog.CatalogBranchExportStatus, *string)
}
return catalog.ExportStatusSuccess, nil
}
func (h *Handler) done(body *string, signalledErrors int) error {
var finishData FinishData
err := json.Unmarshal([]byte(*body), &finishData)
if err != nil {
return err
}

status, msg := getStatus(signalledErrors)
func (h *Handler) updateStatus(finishData FinishData, status catalog.CatalogBranchExportStatus, signalledErrors int) error {
if finishData.StatusPath == "" {
return nil
}
fileName := fmt.Sprintf("%s-%s-%s", finishData.Repo, finishData.Branch, finishData.CommitRef)
path, err := PathToPointer(fmt.Sprintf("%s/%s", finishData.StatusPath, fileName))
if err != nil {
return err
}
data := fmt.Sprintf("status: %s, signalled_errors: %d\n", status, signalledErrors)
reader := strings.NewReader(data)
err = h.adapter.Put(path, reader.Size(), reader, block.PutOpts{})
return h.adapter.Put(path, reader.Size(), reader, block.PutOpts{})
}

func (h *Handler) done(body *string, signalledErrors int) error {
var finishData FinishData
err := json.Unmarshal([]byte(*body), &finishData)
if err != nil {
return err
}
status, msg := getStatus(signalledErrors)
err = h.updateStatus(finishData, status, signalledErrors)
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions export/export_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
func TestCopy(t *testing.T) {
adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, mem.BlockstoreType)
sourcePointer := block.ObjectPointer{
StorageNamespace: "mem://lakeFS-bucket",
Identifier: "/one/two",
StorageNamespace: "mem://lakeFS-bucket/",
Identifier: "one/two",
}
destinationPointer := block.ObjectPointer{
StorageNamespace: "mem://external-bucket",
Identifier: "/one/two",
StorageNamespace: "mem://external-bucket/",
Identifier: "one/two",
}
from := sourcePointer.StorageNamespace + sourcePointer.Identifier
to := destinationPointer.StorageNamespace + destinationPointer.Identifier
Expand Down Expand Up @@ -70,8 +70,8 @@ func TestDelete(t *testing.T) {
adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, mem.BlockstoreType)

destinationPointer := block.ObjectPointer{
StorageNamespace: "mem://external-bucket",
Identifier: "/one/two",
StorageNamespace: "mem://external-bucket/",
Identifier: "one/two",
}
path := destinationPointer.StorageNamespace + destinationPointer.Identifier

Expand Down Expand Up @@ -108,8 +108,8 @@ func TestDelete(t *testing.T) {
func TestTouch(t *testing.T) {
adapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, mem.BlockstoreType)
destinationPointer := block.ObjectPointer{
StorageNamespace: "mem://external-bucket",
Identifier: "/one/two",
StorageNamespace: "mem://external-bucket/",
Identifier: "one/two",
}
path := destinationPointer.StorageNamespace + destinationPointer.Identifier

Expand Down
23 changes: 14 additions & 9 deletions export/tasks_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ func (s *SuccessTasksTreeGenerator) GenerateTasksTo(tasks []parade.TaskData) []p

// makeDiffTaskBody fills TaskData *out with id, action and a body to make it a task to
// perform diff.
func makeDiffTaskBody(out *parade.TaskData, idGen TaskIDGenerator, diff catalog.Difference, makeDestination func(string) string) error {
func makeDiffTaskBody(out *parade.TaskData, idGen TaskIDGenerator, diff catalog.Difference, makeDestination func(string) string, makeSource func(string) string) error {
var data interface{}
switch diff.Type {
case catalog.DifferenceTypeAdded, catalog.DifferenceTypeChanged:
data = CopyData{
From: diff.PhysicalAddress,
From: makeSource(diff.PhysicalAddress),
To: makeDestination(diff.Path),
}
out.ID = idGen.CopyTaskID(diff.Path)
Expand All @@ -252,11 +252,11 @@ func makeDiffTaskBody(out *parade.TaskData, idGen TaskIDGenerator, diff catalog.

// TasksGenerator generates tasks from diffs iteratively.
type TasksGenerator struct {
ExportID string
DstPrefix string
GenerateSuccessFor func(path string) bool
NumTries int

ExportID string
DstPrefix string
GenerateSuccessFor func(path string) bool
NumTries int
makeSource func(string) string
makeDestination func(string) string
idGen TaskIDGenerator
successTasksGenerator SuccessTasksTreeGenerator
Expand Down Expand Up @@ -294,9 +294,13 @@ func GetStartTasks(repo, branch, fromCommitRef, toCommitRef, exportID string, co
// NewTasksGenerator returns a generator that exports tasks from diffs to file operations under
// dstPrefix. It generates success files for files in directories matched by
// "generateSuccessFor".
func NewTasksGenerator(exportID string, dstPrefix string, generateSuccessFor func(path string) bool, finishBody *string) *TasksGenerator {
func NewTasksGenerator(exportID string, dstPrefix string, generateSuccessFor func(path string) bool, finishBody *string, storageNamespace string) *TasksGenerator {
const numTries = 5
dstPrefix = strings.TrimRight(dstPrefix, "/")
storageNamespace = strings.TrimRight(storageNamespace, "/")
makeSource := func(path string) string {
return fmt.Sprintf("%s/%s", storageNamespace, path)
}
makeDestination := func(path string) string {
return fmt.Sprintf("%s/%s", dstPrefix, path)
}
Expand All @@ -307,6 +311,7 @@ func NewTasksGenerator(exportID string, dstPrefix string, generateSuccessFor fun
GenerateSuccessFor: generateSuccessFor,
NumTries: numTries,
makeDestination: makeDestination,
makeSource: makeSource,
idGen: TaskIDGenerator(exportID),
successTasksGenerator: NewSuccessTasksTreeGenerator(exportID, generateSuccessFor, makeDestination, finishBody),
}
Expand All @@ -332,7 +337,7 @@ func (e *TasksGenerator) Add(diffs catalog.Differences) ([]parade.TaskData, erro
MaxTries: &e.NumTries,
TotalDependencies: &zero, // Depends only on a start task
}
err := makeDiffTaskBody(&task, e.idGen, diff, e.makeDestination)
err := makeDiffTaskBody(&task, e.idGen, diff, e.makeDestination, e.makeSource)
if err != nil {
return ret, err
}
Expand Down
18 changes: 5 additions & 13 deletions export/tasks_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ var zero int = 0
var one int = 1

func TestTasksGenerator_Empty(t *testing.T) {
gen := export.NewTasksGenerator("empty", "testfs://prefix/", func(_ string) bool { return true }, nil)
gen := export.NewTasksGenerator("empty", "testfs://prefix/", func(_ string) bool { return true }, nil, "")

tasks, err := gen.Finish()
if err != nil {
Expand Down Expand Up @@ -260,10 +260,7 @@ func TestTasksGenerator_Simple(t *testing.T) {
Type: catalog.DifferenceTypeRemoved,
Entry: catalog.Entry{Path: "remove1", PhysicalAddress: "remove1"},
}}
gen := export.NewTasksGenerator(
"simple",
"testfs://prefix/",
func(_ string) bool { return false }, nil)
gen := export.NewTasksGenerator("simple", "testfs://prefix/", func(_ string) bool { return false }, nil, "testsrc://prefix/")
tasksWithIDs, err := gen.Add(catalogDiffs)
if err != nil {
t.Fatalf("failed to add tasks: %s", err)
Expand All @@ -282,7 +279,7 @@ func TestTasksGenerator_Simple(t *testing.T) {
ID: idGen.CopyTaskID("add1"),
Action: export.CopyAction,
Body: toJSON(t, export.CopyData{
From: "add1",
From: "testsrc://prefix/add1",
To: "testfs://prefix/add1",
}),
StatusCode: parade.TaskPending,
Expand All @@ -293,7 +290,7 @@ func TestTasksGenerator_Simple(t *testing.T) {
ID: idGen.CopyTaskID("change1"),
Action: export.CopyAction,
Body: toJSON(t, export.CopyData{
From: "change1",
From: "testsrc://prefix/change1",
To: "testfs://prefix/change1",
}),
StatusCode: parade.TaskPending,
Expand Down Expand Up @@ -383,12 +380,7 @@ func TestTasksGenerator_SuccessFiles(t *testing.T) {
{before: idGen.DeleteTaskID("a/plain/2"), after: "foo:make-success:a/plain", avoid: true},
{before: idGen.DeleteTaskID("a/plain/2"), after: "foo:finish"},
}
gen := export.NewTasksGenerator(
"foo",
"testfs://prefix/",
func(path string) bool { return strings.HasSuffix(path, "success") },
nil,
)
gen := export.NewTasksGenerator("foo", "testfs://prefix/", func(path string) bool { return strings.HasSuffix(path, "success") }, nil, "")

tasksWithIDs := make([]parade.TaskData, 0, len(catalogDiffs))

Expand Down

0 comments on commit 1d79c2d

Please sign in to comment.