Skip to content

Commit

Permalink
feat(id): add formats to push command
Browse files Browse the repository at this point in the history
  • Loading branch information
adrienaury committed Dec 11, 2024
1 parent 398a140 commit 593b1f3
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 14 deletions.
21 changes: 14 additions & 7 deletions internal/app/push/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
os.Exit(1)
}

plan, e2 := getPlan(idStorageFactory(table, ingressDescriptor), autoTruncate)
plan, formats, e2 := getPlan(idStorageFactory(table, ingressDescriptor), autoTruncate)
if e2 != nil {
fmt.Fprintln(err, e2.Error())
os.Exit(2)
Expand Down Expand Up @@ -161,7 +161,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
observers = append(observers, observer)
}

e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, observers...)
e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, formats, observers...)
if e3 != nil {
log.Fatal().AnErr("error", e3).Msg("Fatal error stop the push command")
os.Exit(1)
Expand Down Expand Up @@ -253,20 +253,20 @@ func getDataDestination(dataconnectorName string) (push.DataDestination, *push.E
return datadestinationFactory.New(u.URL.String(), alias.Schema), nil
}

func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) {
func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, map[string]string, *push.Error) {
id, err1 := idStorage.Read()
if err1 != nil {
return nil, &push.Error{Description: err1.Error()}
return nil, nil, &push.Error{Description: err1.Error()}
}

relations, err2 := relStorage.List()
if err2 != nil {
return nil, &push.Error{Description: err2.Error()}
return nil, nil, &push.Error{Description: err2.Error()}
}

tables, err3 := tabStorage.List()
if err3 != nil {
return nil, &push.Error{Description: err3.Error()}
return nil, nil, &push.Error{Description: err3.Error()}
}

rmap := map[string]relation.Relation{}
Expand All @@ -286,7 +286,14 @@ func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) {
pushtmap: map[string]push.Table{},
}

return converter.getPlan(id, autoTruncate), nil
formats := map[string]string{}
if id.Formats() != nil {
for _, column := range id.Formats().Columns() {
formats[column] = id.Formats().Get(column).Export()
}
}

return converter.getPlan(id, autoTruncate), formats, nil
}

type idToPushConverter struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/app/push/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc
}
}

plan, e2 := getPlan(idStorageFactory(query.Get("table"), ingressDescriptor), autoTruncate)
plan, formats, e2 := getPlan(idStorageFactory(query.Get("table"), ingressDescriptor), autoTruncate)
if e2 != nil {
log.Error().Err(e2).Msg("")
w.WriteHeader(http.StatusNotFound)
Expand Down Expand Up @@ -146,7 +146,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc

log.Debug().Msg(fmt.Sprintf("call Push with mode %s", mode))

e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false)
e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false, formats)
if e3 != nil {
log.Error().Err(e3).Msg("")
w.WriteHeader(http.StatusNotFound)
Expand Down
6 changes: 5 additions & 1 deletion pkg/push/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// Push write rows to target table
func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool, observers ...Observer) (err *Error) {
func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool, formats map[string]string, observers ...Observer) (err *Error) { //nolint:gocyclo
defer func() {
for _, observer := range observers {
if observer != nil {
Expand All @@ -35,6 +35,10 @@ func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, com
}
}()

if table, ok := plan.FirstTable().(table); ok {
table.applyFormats(formats)
}

err1 := destination.Open(plan, mode, disableConstraints)
if err1 != nil {
return err1
Expand Down
8 changes: 4 additions & 4 deletions pkg/push/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestSimplePush(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)
err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil)

assert.Nil(t, err)
assert.Equal(t, true, dest.closed)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestRelationPush(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)
err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil)

// no error
assert.Nil(t, err)
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestRelationPushWithEmptyRelation(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)
err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil)

// no error
assert.Nil(t, err)
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestInversseRelationPush(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 5, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)
err := push.Push(&ri, &dest, plan, push.Insert, 5, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil)

// no error
assert.Nil(t, err)
Expand Down
21 changes: 21 additions & 0 deletions pkg/push/model_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ type ImportedRow struct {
jsonline.Row
}

func (t *table) applyFormats(formats map[string]string) {
if t.columns == nil {
return
}

if l := int(t.columns.Len()); l > 0 {
columns := []Column{}
for idx := 0; idx < l; idx++ {
col := t.columns.Column(uint(idx))
key := col.Name()

if format, exist := formats[key]; exist {
columns = append(columns, NewColumn(col.Name(), col.Export(), format, col.Length(), col.LengthInBytes(), col.Truncate()))
} else {
columns = append(columns, NewColumn(col.Name(), col.Export(), col.Import(), col.Length(), col.LengthInBytes(), col.Truncate()))
}
}
t.columns = NewColumnList(columns)
}
}

func (t *table) initTemplate() {
t.template = jsonline.NewTemplate()

Expand Down

0 comments on commit 593b1f3

Please sign in to comment.