feat: autotruncate (#261)
* feat(autotruncate): add venom test

* feat(autotruncate): implement autotruncate

* feat(autotruncate): read autotruncate flag

* feat(autotruncate): add byte based truncate mode

* feat(autotruncate): add flags to add-column command

* feat(autotruncate): fix truncate in bytes

* feat(autotruncate): docs

* fix(table): export defaults to make push work

* fix(table): export defaults to make push work
adrienaury authored Feb 21, 2024
1 parent b367c39 commit 3c24865
Showing 16 changed files with 325 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,9 @@ Types of changes
- `Added` columns information and export type using the `lino table extract` command, columns and keys organized according to the database order.
- `Added` flag `--only-tables` to `lino table extract` command. This flag allows for the extraction of table information exclusively, excluding columns. It has been included to maintain the previous behavior.
- `Added` flag `--with-db-infos` to `lino table extract` command. This flag enables the extraction of information regarding column types, length, size, and precision if the column has been configured with these specifications.
- `Added` flag `--autotruncate` to `lino push` command. This flag enables truncation of each value based on the `dbinfo`.`length` parameter set in the table.yaml file for each column.
- `Added` property `dbinfo`.`bytes` to column definition in table.yaml file. Set it to true to truncate the value based on a maximum number of bytes rather than characters (assuming UTF-8 encoding for now).
- `Added` flags `--max-length` and `--bytes` to `lino table add-column` command. Use them to edit the properties `dbinfo`.`length` and `dbinfo`.`bytes` in the table.yaml file.

## [2.6.1]

30 changes: 30 additions & 0 deletions README.md
@@ -343,6 +343,36 @@ Each line is a filter and `lino` applies it to the start table to extract data.
The `push` sub-command imports a **json** line stream (jsonline format http://jsonlines.org/) into each table, following the ingress descriptor defined in the current directory.
### Autotruncate values
Use the `--autotruncate` flag to automatically truncate string values that overflow the maximum length accepted by the database.
```
$ lino push truncate dest --table actor --autotruncate < actors.jsonl
```
LINO will truncate each value based on the `dbinfo`.`length` parameter set in the table.yaml file for each column.
Additionally, if your database's maximum length is defined as a number of bytes rather than characters, set `dbinfo`.`bytes` to true. LINO will then truncate the value based on a maximum number of bytes instead of characters (assuming UTF-8 encoding for now).
```yaml
version: v1
tables:
  - name: actor
    keys:
      - actor_id
    columns:
      - name: actor_id
        dbinfo:
          type: INT4
      - name: first_name
        export: string
        dbinfo:
          type: VARCHAR
          length: 45
          bytes: true
```
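
To make the difference concrete, here is a minimal standalone Go sketch (illustrative only, not LINO's actual implementation) of how a character-based limit and a byte-based limit can cut the same UTF-8 value differently:

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

func main() {
	v := "Pénélope" // 8 characters, but 10 bytes in UTF-8 (each "é" takes 2 bytes)

	// With `length: 9` and the default character-based mode, the value fits:
	// 8 runes <= 9, so nothing is truncated.
	fmt.Println(utf8.RuneCountInString(v) <= 9) // true

	// With `length: 9` and `bytes: true`, the limit is counted in bytes:
	// the value is 10 bytes long, so it is cut, backing up if necessary so
	// that a multi-byte rune is never split.
	n := 9
	for n > 0 && !utf8.RuneStart(v[n]) {
		n--
	}
	fmt.Println(v[:n]) // Pénélop
}
```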

### How to update primary key

Let's say you have this record in database :
26 changes: 14 additions & 12 deletions internal/app/push/cli.go
@@ -80,6 +80,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
pkTranslations map[string]string
whereField string
savepoint string
autoTruncate bool
)

cmd := &cobra.Command{
@@ -127,7 +128,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
os.Exit(1)
}

plan, e2 := getPlan(idStorageFactory(table, ingressDescriptor))
plan, e2 := getPlan(idStorageFactory(table, ingressDescriptor), autoTruncate)
if e2 != nil {
fmt.Fprintln(err, e2.Error())
os.Exit(2)
@@ -151,7 +152,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
os.Exit(1)
}

e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint)
e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate)
if e3 != nil {
log.Fatal().AnErr("error", e3).Msg("Fatal error stop the push command")
os.Exit(1)
@@ -172,6 +173,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
cmd.Flags().StringToStringVar(&pkTranslations, "pk-translation", map[string]string{}, "list of dictionaries old value / new value for primary key update")
cmd.Flags().StringVar(&whereField, "using-pk-field", "__usingpk__", "Name of the data field that can be used as pk for update queries")
cmd.Flags().StringVar(&savepoint, "savepoint", "", "Name of a file to write primary keys of effectively processed lines (commit to database)")
cmd.Flags().BoolVarP(&autoTruncate, "autotruncate", "a", false, "Automatically truncate values to the maximum length defined in table.yaml")
cmd.SetOut(out)
cmd.SetErr(err)
cmd.SetIn(in)
@@ -241,7 +243,7 @@ func getDataDestination(dataconnectorName string) (push.DataDestination, *push.E
return datadestinationFactory.New(u.URL.String(), alias.Schema), nil
}

func getPlan(idStorage id.Storage) (push.Plan, *push.Error) {
func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) {
id, err1 := idStorage.Read()
if err1 != nil {
return nil, &push.Error{Description: err1.Error()}
@@ -274,7 +276,7 @@ func getPlan(idStorage id.Storage) (push.Plan, *push.Error) {
pushtmap: map[string]push.Table{},
}

return converter.getPlan(id), nil
return converter.getPlan(id, autoTruncate), nil
}

type idToPushConverter struct {
@@ -285,7 +287,7 @@ type idToPushConverter struct {
pushtmap map[string]push.Table
}

func (c idToPushConverter) getTable(name string) push.Table {
func (c idToPushConverter) getTable(name string, autoTruncate bool) push.Table {
if pushtable, ok := c.pushtmap[name]; ok {
return pushtable
}
@@ -300,13 +302,13 @@ func (c idToPushConverter) getTable(name string) push.Table {

columns := []push.Column{}
for _, col := range table.Columns {
columns = append(columns, push.NewColumn(col.Name, col.Export, col.Import))
columns = append(columns, push.NewColumn(col.Name, col.Export, col.Import, col.DBInfo.Length, col.DBInfo.ByteBased, autoTruncate))
}

return push.NewTable(table.Name, table.Keys, push.NewColumnList(columns))
}

func (c idToPushConverter) getRelation(name string) push.Relation {
func (c idToPushConverter) getRelation(name string, autoTruncate bool) push.Relation {
if pushrelation, ok := c.pushrmap[name]; ok {
return pushrelation
}
@@ -321,12 +323,12 @@ func (c idToPushConverter) getRelation(name string) push.Relation {

return push.NewRelation(
relation.Name,
c.getTable(relation.Parent.Name),
c.getTable(relation.Child.Name),
c.getTable(relation.Parent.Name, autoTruncate),
c.getTable(relation.Child.Name, autoTruncate),
)
}

func (c idToPushConverter) getPlan(idesc id.IngressDescriptor) push.Plan {
func (c idToPushConverter) getPlan(idesc id.IngressDescriptor, autoTruncate bool) push.Plan {
relations := []push.Relation{}

activeTables, err := id.GetActiveTables(idesc)
@@ -338,9 +340,9 @@ func (c idToPushConverter) getPlan(idesc id.IngressDescriptor) push.Plan {
rel := idesc.Relations().Relation(idx)
if (activeTables.Contains(rel.Child().Name()) && rel.LookUpChild()) ||
(activeTables.Contains(rel.Parent().Name()) && rel.LookUpParent()) {
relations = append(relations, c.getRelation(rel.Name()))
relations = append(relations, c.getRelation(rel.Name(), autoTruncate))
}
}

return push.NewPlan(c.getTable(idesc.StartTable().Name()), relations)
return push.NewPlan(c.getTable(idesc.StartTable().Name(), autoTruncate), relations)
}
20 changes: 18 additions & 2 deletions internal/app/push/http.go
@@ -81,7 +81,23 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc
return
}

plan, e2 := getPlan(idStorageFactory(query.Get("table"), ingressDescriptor))
autoTruncate := false
if query.Get("auto-truncate") != "" {
var err error
autoTruncate, err = strconv.ParseBool(query.Get("auto-truncate"))
if err != nil {
log.Error().Err(err).Msg("can't parse auto-truncate")
w.WriteHeader(http.StatusBadRequest)
_, ew := w.Write([]byte("{\"error\" : \"param auto-truncate must be a boolean\"}\n"))
if ew != nil {
log.Error().Err(ew).Msg("Write failed")
return
}
return
}
}

plan, e2 := getPlan(idStorageFactory(query.Get("table"), ingressDescriptor), autoTruncate)
if e2 != nil {
log.Error().Err(e2).Msg("")
w.WriteHeader(http.StatusNotFound)
@@ -130,7 +146,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc

log.Debug().Msg(fmt.Sprintf("call Push with mode %s", mode))

e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "")
e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false)
if e3 != nil {
log.Error().Err(e3).Msg("")
w.WriteHeader(http.StatusNotFound)
6 changes: 5 additions & 1 deletion internal/app/table/add-columns.go
@@ -29,6 +29,8 @@ import (
func newAddColumnCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra.Command {
// local flags
var exportType, importType string
var maxLength int64
var byteBased bool

cmd := &cobra.Command{
Use: "add-column [Table Name] [Column Name]",
@@ -40,7 +42,7 @@ func newAddColumnCommand(fullName string, err *os.File, out *os.File, in *os.Fil
tableName := args[0]
columnName := args[1]

_, e1 := table.AddOrUpdateColumn(tableStorage, tableName, columnName, exportType, importType)
_, e1 := table.AddOrUpdateColumn(tableStorage, tableName, columnName, exportType, importType, maxLength, byteBased)
if e1 != nil {
fmt.Fprintln(err, e1.Description)
os.Exit(1)
@@ -54,5 +56,7 @@ func newAddColumnCommand(fullName string, err *os.File, out *os.File, in *os.Fil
cmd.SetIn(in)
cmd.Flags().StringVarP(&exportType, "export", "e", "", "export type for the column")
cmd.Flags().StringVarP(&importType, "import", "i", "", "import type for the column")
cmd.Flags().Int64VarP(&maxLength, "max-length", "l", 0, "set an optional maximum length for this column, used by the --autotruncate flag on push")
cmd.Flags().BoolVarP(&byteBased, "bytes", "b", false, "maximum length is expressed in bytes, not in characters")
return cmd
}
6 changes: 3 additions & 3 deletions internal/infra/table/extractor_postgres.go
@@ -79,19 +79,19 @@ func (d PostgresDialect) GetExportType(dbtype string) (string, bool) {
return "string", true
// Numeric types
case "NUMERIC", "DECIMAL", "FLOAT", "REAL", "DOUBLE PRECISION", "MONEY", "INTEGER", "BIGINT",
"NUMBER", "BINARY_FLOAT", "BINARY_DOUBLE", "INT", "TINYINT", "SMALLINT", "MEDIUMINT":
"NUMBER", "BINARY_FLOAT", "BINARY_DOUBLE", "INT", "TINYINT", "SMALLINT", "MEDIUMINT", "INT4", "INT2", "BOOL":
return "numeric", true
// Timestamp types
case "TIMESTAMP", "TIMESTAMPTZ",
"TIMESTAMP WITH TIME ZONE", "TIMESTAMP WITH LOCAL TIME ZONE":
return "timestamp", true
return "datetime", true // export timestamps to datetime is the only option that works well with lino push
// Datetime types
case "DATE", "DATETIME2", "SMALLDATETIME", "DATETIME":
return "datetime", true
// Base64 types
case "BYTEA", "BLOB":
return "base64", true
default:
return "", false
return "string", true // default to export string since it will work most of the time (binary types are already handled)
}
}
1 change: 1 addition & 0 deletions internal/infra/table/storage_yaml.go
@@ -56,6 +56,7 @@ type YAMLDBInfo struct {
Length int64 `yaml:"length,omitempty"`
Size int64 `yaml:"size,omitempty"`
Precision int64 `yaml:"precision,omitempty"`
ByteBased bool `yaml:"bytes,omitempty"`
}

// YAMLStorage provides storage in a local YAML file
2 changes: 1 addition & 1 deletion pkg/push/driver.go
@@ -26,7 +26,7 @@ import (
)

// Push write rows to target table
func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string) (err *Error) {
func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool) (err *Error) {
err1 := destination.Open(plan, mode, disableConstraints)
if err1 != nil {
return err1
8 changes: 4 additions & 4 deletions pkg/push/driver_test.go
@@ -49,7 +49,7 @@ func TestSimplePush(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "")
err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)

assert.Nil(t, err)
assert.Equal(t, true, dest.closed)
@@ -88,7 +88,7 @@ func TestRelationPush(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "")
err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)

// no error
assert.Nil(t, err)
@@ -137,7 +137,7 @@ func TestRelationPushWithEmptyRelation(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "")
err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)

// no error
assert.Nil(t, err)
@@ -188,7 +188,7 @@ func TestInversseRelationPush(t *testing.T) {
}
dest := memoryDataDestination{tables, false, false, false}

err := push.Push(&ri, &dest, plan, push.Insert, 5, true, push.NoErrorCaptureRowWriter{}, nil, "", "")
err := push.Push(&ri, &dest, plan, push.Insert, 5, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false)

// no error
assert.Nil(t, err)
3 changes: 3 additions & 0 deletions pkg/push/model.go
@@ -45,6 +45,9 @@ type Column interface {
Name() string
Export() string
Import() string
Length() int64
LengthInBytes() bool
Truncate() bool
}

// Plan describe how to push data
60 changes: 52 additions & 8 deletions pkg/push/model_table.go
@@ -22,6 +22,7 @@ import (
"fmt"
"strings"
"time"
"unicode/utf8"

"github.com/cgi-fr/jsonline/pkg/jsonline"
"github.com/rs/zerolog/log"
@@ -89,19 +90,25 @@ func (l columnList) String() string {
}

type column struct {
name string
exp string
imp string
name string
exp string
imp string
lgth int64
inbytes bool
truncate bool
}

// NewColumn initialize a new Column object
func NewColumn(name string, exp string, imp string) Column {
return column{name, exp, imp}
func NewColumn(name string, exp string, imp string, lgth int64, inbytes bool, truncate bool) Column {
return column{name, exp, imp, lgth, inbytes, truncate}
}

func (c column) Name() string { return c.name }
func (c column) Export() string { return c.exp }
func (c column) Import() string { return c.imp }
func (c column) Name() string { return c.name }
func (c column) Export() string { return c.exp }
func (c column) Import() string { return c.imp }
func (c column) Length() int64 { return c.lgth }
func (c column) LengthInBytes() bool { return c.inbytes }
func (c column) Truncate() bool { return c.truncate }

type ImportedRow struct {
jsonline.Row
@@ -176,6 +183,16 @@ func (t table) Import(row map[string]interface{}) (ImportedRow, *Error) {
}
result.SetValue(key, jsonline.NewValueAuto(bytes))
}

// autotruncate
value, exists := result.GetValue(key)
if exists && col.Truncate() && col.Length() > 0 && value.GetFormat() == jsonline.String {
if col.LengthInBytes() {
result.Set(key, truncateUTF8String(result.GetString(key), int(col.Length())))
} else {
result.Set(key, truncateRuneString(result.GetString(key), int(col.Length())))
}
}
}
}

@@ -234,3 +251,30 @@ func parseFormatWithType(option string) (string, string) {
}
return parts[0], strings.Trim(parts[1], ")")
}

// truncateUTF8String truncates s to at most n bytes. If len(s) is more than n,
// it truncates before the start of the first rune that doesn't fit. s should
// consist of valid UTF-8.
func truncateUTF8String(s string, n int) string {
if len(s) <= n {
return s
}
for n > 0 && !utf8.RuneStart(s[n]) {
n--
}

return s[:n]
}

// truncateRuneString truncates s to at most n runes.
func truncateRuneString(s string, n int) string {
if n <= 0 {
return ""
}

if utf8.RuneCountInString(s) < n {
return s
}

return string([]rune(s)[:n])
}
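
For reference, a hypothetical example-style test (not part of this commit, and assumed to live in the same `push` package so it can reach the unexported helpers) illustrating the expected behaviour of the two functions above, in particular how the byte-based variant backs up to a rune boundary:

```go
func Example_truncate() {
	s := "héllo" // "é" is 2 bytes in UTF-8, so len(s) == 6 while utf8.RuneCountInString(s) == 5

	fmt.Println(truncateRuneString(s, 2)) // counts runes: keeps "hé"
	fmt.Println(truncateUTF8String(s, 2)) // counts bytes: byte 2 falls inside "é", so it backs up and keeps only "h"

	// Output:
	// hé
	// h
}
```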