diff --git a/internal/app/push/cli.go b/internal/app/push/cli.go index 5fa345e4..e653e145 100755 --- a/internal/app/push/cli.go +++ b/internal/app/push/cli.go @@ -132,7 +132,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra os.Exit(1) } - plan, e2 := getPlan(idStorageFactory(table, ingressDescriptor), autoTruncate) + plan, formats, e2 := getPlan(idStorageFactory(table, ingressDescriptor), autoTruncate) if e2 != nil { fmt.Fprintln(err, e2.Error()) os.Exit(2) @@ -161,7 +161,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra observers = append(observers, observer) } - e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, observers...) + e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, formats, observers...) if e3 != nil { log.Fatal().AnErr("error", e3).Msg("Fatal error stop the push command") os.Exit(1) @@ -253,20 +253,20 @@ func getDataDestination(dataconnectorName string) (push.DataDestination, *push.E return datadestinationFactory.New(u.URL.String(), alias.Schema), nil } -func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) { +func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, map[string]string, *push.Error) { id, err1 := idStorage.Read() if err1 != nil { - return nil, &push.Error{Description: err1.Error()} + return nil, nil, &push.Error{Description: err1.Error()} } relations, err2 := relStorage.List() if err2 != nil { - return nil, &push.Error{Description: err2.Error()} + return nil, nil, &push.Error{Description: err2.Error()} } tables, err3 := tabStorage.List() if err3 != nil { - return nil, &push.Error{Description: err3.Error()} + return nil, nil, &push.Error{Description: err3.Error()} } rmap := map[string]relation.Relation{} @@ -286,7 +286,14 @@ func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) { pushtmap: map[string]push.Table{}, } - return converter.getPlan(id, autoTruncate), nil + formats := map[string]string{} + if id.Formats() != nil { + for _, column := range id.Formats().Columns() { + formats[column] = id.Formats().Get(column).Export() + } + } + + return converter.getPlan(id, autoTruncate), formats, nil } type idToPushConverter struct { diff --git a/internal/app/push/http.go b/internal/app/push/http.go index 92b732f3..42933a56 100644 --- a/internal/app/push/http.go +++ b/internal/app/push/http.go @@ -97,7 +97,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc } } - plan, e2 := getPlan(idStorageFactory(query.Get("table"), ingressDescriptor), autoTruncate) + plan, formats, e2 := getPlan(idStorageFactory(query.Get("table"), ingressDescriptor), autoTruncate) if e2 != nil { log.Error().Err(e2).Msg("") w.WriteHeader(http.StatusNotFound) @@ -146,7 +146,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc log.Debug().Msg(fmt.Sprintf("call Push with mode %s", mode)) - e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false) + e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false, formats) if e3 != nil { log.Error().Err(e3).Msg("") w.WriteHeader(http.StatusNotFound) diff --git a/pkg/push/driver.go b/pkg/push/driver.go index 96800ee4..d3b0d6a6 100755 --- a/pkg/push/driver.go +++ b/pkg/push/driver.go @@ -26,7 +26,7 @@ import ( ) // Push write rows to target table -func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool, observers ...Observer) (err *Error) { +func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool, formats map[string]string, observers ...Observer) (err *Error) { //nolint:gocyclo defer func() { for _, observer := range observers { if observer != nil { @@ -35,6 +35,10 @@ func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, com } }() + if table, ok := plan.FirstTable().(table); ok { + table.applyFormats(formats) + } + err1 := destination.Open(plan, mode, disableConstraints) if err1 != nil { return err1 diff --git a/pkg/push/driver_test.go b/pkg/push/driver_test.go index 0aaef714..332878db 100755 --- a/pkg/push/driver_test.go +++ b/pkg/push/driver_test.go @@ -49,7 +49,7 @@ func TestSimplePush(t *testing.T) { } dest := memoryDataDestination{tables, false, false, false} - err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil) assert.Nil(t, err) assert.Equal(t, true, dest.closed) @@ -88,7 +88,7 @@ func TestRelationPush(t *testing.T) { } dest := memoryDataDestination{tables, false, false, false} - err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil) // no error assert.Nil(t, err) @@ -137,7 +137,7 @@ func TestRelationPushWithEmptyRelation(t *testing.T) { } dest := memoryDataDestination{tables, false, false, false} - err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil) // no error assert.Nil(t, err) @@ -188,7 +188,7 @@ func TestInversseRelationPush(t *testing.T) { } dest := memoryDataDestination{tables, false, false, false} - err := push.Push(&ri, &dest, plan, push.Insert, 5, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 5, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, nil) // no error assert.Nil(t, err) diff --git a/pkg/push/model_table.go b/pkg/push/model_table.go index 3e37217a..08ea0df9 100755 --- a/pkg/push/model_table.go +++ b/pkg/push/model_table.go @@ -114,6 +114,27 @@ type ImportedRow struct { jsonline.Row } +func (t *table) applyFormats(formats map[string]string) { + if t.columns == nil { + return + } + + if l := int(t.columns.Len()); l > 0 { + columns := []Column{} + for idx := 0; idx < l; idx++ { + col := t.columns.Column(uint(idx)) + key := col.Name() + + if format, exist := formats[key]; exist { + columns = append(columns, NewColumn(col.Name(), col.Export(), format, col.Length(), col.LengthInBytes(), col.Truncate())) + } else { + columns = append(columns, NewColumn(col.Name(), col.Export(), col.Import(), col.Length(), col.LengthInBytes(), col.Truncate())) + } + } + t.columns = NewColumnList(columns) + } +} + func (t *table) initTemplate() { t.template = jsonline.NewTemplate()