From 21680259811cce3e1d62b9354109d0995dfce1d3 Mon Sep 17 00:00:00 2001
From: hongyunyan <649330952@qq.com>
Date: Thu, 24 Oct 2024 09:17:24 +0800
Subject: [PATCH] Support Canal Json Protocol (Draft) (#407)

---
 pkg/common/event/row_change.go                |  17 +
 pkg/sink/codec/canal/canal_encoder.go         | 176 ----------
 pkg/sink/codec/canal/canal_entry.go           | 276 ++++++++++-----
 pkg/sink/codec/canal/canal_json_decoder.go    | 324 ------------------
 pkg/sink/codec/canal/canal_json_message.go    |  98 +++---
 .../canal/canal_json_row_event_encoder.go     | 242 +++++++------
 pkg/sink/codec/decoder/decoder.go             |   3 +-
 pkg/sink/codec/encoder_builder.go             |   7 +-
 8 files changed, 402 insertions(+), 741 deletions(-)
 delete mode 100644 pkg/sink/codec/canal/canal_encoder.go
 delete mode 100644 pkg/sink/codec/canal/canal_json_decoder.go

diff --git a/pkg/common/event/row_change.go b/pkg/common/event/row_change.go
index d25a38304..4e04c3a69 100644
--- a/pkg/common/event/row_change.go
+++ b/pkg/common/event/row_change.go
@@ -243,3 +243,20 @@ func (e *RowEvent) GetRows() *chunk.Row {
 func (e *RowEvent) GetPreRows() *chunk.Row {
 	return &e.Event.PreRow
 }
+
+// PrimaryKeyColumnNames returns the names of all primary key columns.
+// TODO: add tests covering delete / insert / update events
+// (in theory the result is the same for all of them, since without DDL no schema change happens).
+func (e *RowEvent) PrimaryKeyColumnNames() []string {
+	// collect the primary key column names in column order
+	result := make([]string, 0)
+
+	tableInfo := e.TableInfo
+	columns := e.TableInfo.Columns
+	for _, col := range columns {
+		if col != nil && tableInfo.ForceGetColumnFlagType(col.ID).IsPrimaryKey() {
+			result = append(result, tableInfo.ForceGetColumnName(col.ID))
+		}
+	}
+	return result
+}
diff --git a/pkg/sink/codec/canal/canal_encoder.go b/pkg/sink/codec/canal/canal_encoder.go
deleted file mode 100644
index f543e33cd..000000000
--- a/pkg/sink/codec/canal/canal_encoder.go
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright 2022 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package canal
-
-import (
-	"context"
-
-	commonEvent "github.com/flowbehappy/tigate/pkg/common/event"
-	"github.com/flowbehappy/tigate/pkg/sink/codec/encoder"
-	"github.com/golang/protobuf/proto"
-	"github.com/pingcap/errors"
-	"github.com/pingcap/log"
-	"github.com/pingcap/tiflow/cdc/model"
-	"github.com/pingcap/tiflow/pkg/config"
-	cerror "github.com/pingcap/tiflow/pkg/errors"
-	ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common"
-	canal "github.com/pingcap/tiflow/proto/canal"
-	"go.uber.org/zap"
-)
-
-// BatchEncoder encodes the events into the byte of a batch into.
-type BatchEncoder struct {
-	messages     *canal.Messages
-	callbackBuf  []func()
-	packet       *canal.Packet
-	entryBuilder *canalEntryBuilder
-
-	config *ticommon.Config
-}
-
-// EncodeCheckpointEvent implements the RowEventEncoder interface
-func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Message, error) {
-	// For canal now, there is no such a corresponding type to ResolvedEvent so far.
-	// Therefore, the event is ignored.
- return nil, nil -} - -// AppendRowChangedEvent implements the RowEventEncoder interface -func (d *BatchEncoder) AppendRowChangedEvent( - _ context.Context, - _ string, - e *commonEvent.RowChangedEvent, - callback func(), -) error { - entry, err := d.entryBuilder.fromRowEvent(e, d.config.DeleteOnlyHandleKeyColumns) - if err != nil { - return errors.Trace(err) - } - b, err := proto.Marshal(entry) - if err != nil { - return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - } - d.messages.Messages = append(d.messages.Messages, b) - if callback != nil { - d.callbackBuf = append(d.callbackBuf, callback) - } - return nil -} - -// EncodeDDLEvent implements the RowEventEncoder interface -func (d *BatchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*ticommon.Message, error) { - // entry, err := d.entryBuilder.fromDDLEvent(e) - // if err != nil { - // return nil, errors.Trace(err) - // } - // b, err := proto.Marshal(entry) - // if err != nil { - // return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - // } - - // messages := new(canal.Messages) - // messages.Messages = append(messages.Messages, b) - // b, err = messages.Marshal() - // if err != nil { - // return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - // } - - // packet := &canal.Packet{ - // VersionPresent: &canal.Packet_Version{ - // Version: CanalPacketVersion, - // }, - // Type: canal.PacketType_MESSAGES, - // } - // packet.Body = b - // b, err = packet.Marshal() - // if err != nil { - // return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - // } - - // return ticommon.NewDDLMsg(config.ProtocolCanal, nil, b, e), nil - return nil, nil -} - -// Build implements the RowEventEncoder interface -func (d *BatchEncoder) Build() []*ticommon.Message { - rowCount := len(d.messages.Messages) - if rowCount == 0 { - return nil - } - - err := d.refreshPacketBody() - if err != nil { - log.Panic("Error when generating Canal packet", zap.Error(err)) - } - - value, err := proto.Marshal(d.packet) - if err != nil { - log.Panic("Error when serializing Canal packet", zap.Error(err)) - } - ret := ticommon.NewMsg(config.ProtocolCanal, nil, value, 0, model.MessageTypeRow, nil, nil) - ret.SetRowsCount(rowCount) - d.messages.Reset() - d.resetPacket() - - if len(d.callbackBuf) != 0 && len(d.callbackBuf) == rowCount { - callbacks := d.callbackBuf - ret.Callback = func() { - for _, cb := range callbacks { - cb() - } - } - d.callbackBuf = make([]func(), 0) - } - return []*ticommon.Message{ret} -} - -// refreshPacketBody() marshals the messages to the packet body -func (d *BatchEncoder) refreshPacketBody() error { - oldSize := len(d.packet.Body) - newSize := proto.Size(d.messages) - if newSize > oldSize { - // resize packet body slice - d.packet.Body = append(d.packet.Body, make([]byte, newSize-oldSize)...) - } else { - d.packet.Body = d.packet.Body[:newSize] - } - - _, err := d.messages.MarshalToSizedBuffer(d.packet.Body) - return err -} - -func (d *BatchEncoder) resetPacket() { - d.packet = &canal.Packet{ - VersionPresent: &canal.Packet_Version{ - Version: CanalPacketVersion, - }, - Type: canal.PacketType_MESSAGES, - } -} - -func (d *BatchEncoder) Clean() {} - -// newBatchEncoder creates a new canalBatchEncoder. 
-func NewBatchEncoder(config *ticommon.Config) (encoder.EventEncoder, error) { - encoder := &BatchEncoder{ - messages: &canal.Messages{}, - callbackBuf: make([]func(), 0), - entryBuilder: newCanalEntryBuilder(config), - - config: config, - } - - encoder.resetPacket() - return encoder, nil -} diff --git a/pkg/sink/codec/canal/canal_entry.go b/pkg/sink/codec/canal/canal_entry.go index d80aa1b3e..eff2dad8b 100644 --- a/pkg/sink/codec/canal/canal_entry.go +++ b/pkg/sink/codec/canal/canal_entry.go @@ -20,15 +20,16 @@ import ( "strconv" "github.com/flowbehappy/tigate/pkg/common" + commonEvent "github.com/flowbehappy/tigate/pkg/common/event" + ticommon "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/flowbehappy/tigate/pkg/sink/codec/internal" "github.com/golang/protobuf/proto" // nolint:staticcheck "github.com/pingcap/errors" mm "github.com/pingcap/tidb/pkg/parser/model" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tidb/pkg/util/chunk" cerror "github.com/pingcap/tiflow/pkg/errors" - ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/utils" canal "github.com/pingcap/tiflow/proto/canal" "golang.org/x/text/encoding" @@ -78,64 +79,171 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st return h } -// In the official canal-json implementation, value were extracted from binlog buffer. -// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L276-L1147 -// all value will be represented in string type -// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L760-L855 -func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (result string, err error) { - // value would be nil, if no value insert for the column. 
-	if value == nil {
-		return "", nil
-	}
+func formatColumnValue(row *chunk.Row, idx int, columnInfo *timodel.ColumnInfo, flag *common.ColumnFlagType) (string, internal.JavaSQLType, error) {
+	colType := columnInfo.GetType()
 
-	switch v := value.(type) {
-	case int64:
-		result = strconv.FormatInt(v, 10)
-	case uint64:
-		result = strconv.FormatUint(v, 10)
-	case float32:
-		result = strconv.FormatFloat(float64(v), 'f', -1, 32)
-	case float64:
-		result = strconv.FormatFloat(v, 'f', -1, 64)
-	case string:
-		result = v
-	case []byte:
-		// see https://github.com/alibaba/canal/blob/9f6021cf36f78cc8ac853dcf37a1769f359b868b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L801
-		if isBinary {
-			decoded, err := b.bytesDecoder.Bytes(v)
-			if err != nil {
-				return "", err
-			}
-			result = string(decoded)
+	var value string
+	var javaType internal.JavaSQLType
+
+	switch colType {
+	case mysql.TypeBit:
+		javaType = internal.JavaSQLTypeBIT
+		d := row.GetDatum(idx, &columnInfo.FieldType)
+		if d.IsNull() {
+			value = "null"
+		} else {
+			dp := &d
+			// Encode bits as integers to avoid pingcap/tidb#10988 (which also affects MySQL itself)
+			value = dp.GetMysqlBit().String()
+		}
+	case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
+		if flag.IsBinary() {
+			javaType = internal.JavaSQLTypeBLOB
+		} else {
+			javaType = internal.JavaSQLTypeCLOB
+		}
+		value = string(row.GetBytes(idx))
+	case mysql.TypeVarchar, mysql.TypeVarString:
+		if flag.IsBinary() {
+			javaType = internal.JavaSQLTypeBLOB
+		} else {
+			javaType = internal.JavaSQLTypeVARCHAR
+		}
+		value = string(row.GetBytes(idx)) // TODO: add a test for the binary case
+	case mysql.TypeString:
+		javaType = internal.JavaSQLTypeCHAR
+		if flag.IsBinary() {
+			javaType = internal.JavaSQLTypeBLOB
+		}
+		value = string(row.GetBytes(idx)) // TODO: add a test for the binary case
+	case mysql.TypeEnum:
+		javaType = internal.JavaSQLTypeINTEGER
+		enumValue := row.GetEnum(idx).Value
+		if enumValue == 0 { // TODO: verify whether a zero enum value should encode as "0" or ""
+			value = "null"
+		} else {
+			value = fmt.Sprintf("%d", enumValue)
+		}
+	case mysql.TypeSet:
+		javaType = internal.JavaSQLTypeBIT
+		bitValue := row.GetEnum(idx).Value
+		if bitValue == 0 { // TODO: verify whether a zero set value should encode as "0" or "null"
+			value = "null"
+		} else {
+			value = fmt.Sprintf("%d", bitValue)
+		}
+	case mysql.TypeDate, mysql.TypeNewDate:
+		javaType = internal.JavaSQLTypeDATE
+		timeValue := row.GetTime(idx)
+		if timeValue.IsZero() {
+			value = "null"
+		} else {
+			value = timeValue.String()
+		}
+	case mysql.TypeDatetime, mysql.TypeTimestamp:
+		javaType = internal.JavaSQLTypeTIMESTAMP
+		timeValue := row.GetTime(idx)
+		if timeValue.IsZero() {
+			value = "null"
 		} else {
-			result = string(v)
+			value = timeValue.String()
 		}
+	case mysql.TypeDuration:
+		javaType = internal.JavaSQLTypeTIME
+		durationValue := row.GetDuration(idx, 0)
+		if durationValue.ToNumber().IsZero() {
+			value = "null"
+		} else {
+			value = durationValue.String()
+		}
+	case mysql.TypeJSON:
+		javaType = internal.JavaSQLTypeVARCHAR
+		jsonValue := row.GetJSON(idx)
+		if jsonValue.IsZero() {
+			value = "null"
+		} else {
+			value = jsonValue.String()
+		}
+	case mysql.TypeNewDecimal:
+		javaType = internal.JavaSQLTypeDECIMAL
+		decimalValue := row.GetMyDecimal(idx)
+		if decimalValue.IsZero() {
+			value = "null"
+		} else {
+			value = decimalValue.String()
+		}
+	case mysql.TypeTiny:
+		javaType = internal.JavaSQLTypeTINYINT
+		d := row.GetDatum(idx, &columnInfo.FieldType)
+		uintValue := d.GetUint64()
+		value = strconv.FormatUint(uintValue, 10)
+	case 
mysql.TypeShort: + javaType = internal.JavaSQLTypeSMALLINT + d := row.GetDatum(idx, &columnInfo.FieldType) + uintValue := d.GetUint64() + value = strconv.FormatUint(uintValue, 10) + case mysql.TypeLong: + javaType = internal.JavaSQLTypeINTEGER + d := row.GetDatum(idx, &columnInfo.FieldType) + uintValue := d.GetUint64() + value = strconv.FormatUint(uintValue, 10) + case mysql.TypeFloat: + javaType = internal.JavaSQLTypeREAL + d := row.GetDatum(idx, &columnInfo.FieldType) + floatValue := d.GetFloat32() + value = strconv.FormatFloat(float64(floatValue), 'f', -1, 32) + case mysql.TypeDouble: + javaType = internal.JavaSQLTypeDOUBLE + d := row.GetDatum(idx, &columnInfo.FieldType) + floatValue := d.GetFloat64() + value = strconv.FormatFloat(floatValue, 'f', -1, 64) + case mysql.TypeNull: + javaType = internal.JavaSQLTypeNULL + value = "null" + case mysql.TypeLonglong: + javaType = internal.JavaSQLTypeBIGINT + d := row.GetDatum(idx, &columnInfo.FieldType) + uintValue := d.GetUint64() + value = strconv.FormatUint(uintValue, 10) + case mysql.TypeInt24: + javaType = internal.JavaSQLTypeINTEGER + d := row.GetDatum(idx, &columnInfo.FieldType) + uintValue := d.GetUint64() + value = strconv.FormatUint(uintValue, 10) + case mysql.TypeYear: + javaType = internal.JavaSQLTypeVARCHAR + d := row.GetDatum(idx, &columnInfo.FieldType) + yearValue := d.GetInt64() + value = strconv.FormatInt(yearValue, 10) default: - result = fmt.Sprintf("%v", v) + javaType = internal.JavaSQLTypeVARCHAR + d := row.GetDatum(idx, &columnInfo.FieldType) + // NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail + // Make specified convert upper if you need + // Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 + value = fmt.Sprintf("%v", d.GetValue()) } - return result, nil + return value, javaType, nil } // build the Column in the canal RowData // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872 -func (b *canalEntryBuilder) buildColumn(c *common.Column, columnInfo *timodel.ColumnInfo, updated bool) (*canal.Column, error) { +func (b *canalEntryBuilder) buildColumn(row *chunk.Row, idx int, columnInfo *timodel.ColumnInfo, tableInfo *common.TableInfo, updated bool) (*canal.Column, error) { mysqlType := utils.GetMySQLType(columnInfo, b.config.ContentCompatible) - javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag) - if err != nil { - return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - } - value, err := b.formatValue(c.Value, c.Flag.IsBinary()) + flag := tableInfo.ColumnsFlag[columnInfo.ID] + + value, javaType, err := formatColumnValue(row, idx, columnInfo, flag) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } canalColumn := &canal.Column{ SqlType: int32(javaType), - Name: c.Name, - IsKey: c.Flag.IsPrimaryKey(), + Name: columnInfo.Name.O, + IsKey: flag.IsPrimaryKey(), Updated: updated, - IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil}, + IsNullPresent: &canal.Column_IsNull{IsNull: row.IsNull(idx)}, Value: value, MysqlType: mysqlType, } @@ -143,44 +251,48 @@ func (b *canalEntryBuilder) buildColumn(c *common.Column, columnInfo *timodel.Co } // build the RowData of a canal entry -func (b *canalEntryBuilder) buildRowData(e *common.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.RowData, error) { +func (b *canalEntryBuilder) buildRowData(e 
*commonEvent.RowEvent, onlyHandleKeyColumns bool) (*canal.RowData, error) {
 	var columns []*canal.Column
-	colInfos := e.TableInfo.GetColInfosForRowChangedEvent()
-	for idx, column := range e.GetColumns() {
-		if column == nil {
-			continue
-		}
-		columnInfo, ok := e.TableInfo.GetColumnInfo(colInfos[idx].ID)
-		if !ok {
-			return nil, cerror.ErrCanalEncodeFailed.GenWithStack(
-				"column info not found for column id: %d", colInfos[idx].ID)
-		}
-		c, err := b.buildColumn(column, columnInfo, !e.IsDelete())
-		if err != nil {
-			return nil, errors.Trace(err)
+
+	// build after columns
+	flag := false // whether any column has been written
+
+	colInfo := e.TableInfo.Columns
+	for idx, col := range colInfo {
+		if e.ColumnSelector.Select(col) {
+			flag = true
+			column, err := b.buildColumn(e.GetRows(), idx, col, e.TableInfo, !e.IsDelete())
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			columns = append(columns, column)
 		}
-		columns = append(columns, c)
+
+	}
+	if !flag {
+		return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("no columns found for the row event")
 	}
 
-	onlyHandleKeyColumns = onlyHandleKeyColumns && e.IsDelete()
+	// build before columns
 	var preColumns []*canal.Column
-	for idx, column := range e.GetPreColumns() {
-		if column == nil {
-			continue
-		}
-		if onlyHandleKeyColumns && !column.Flag.IsHandleKey() {
-			continue
-		}
-		columnInfo, ok := e.TableInfo.GetColumnInfo(colInfos[idx].ID)
-		if !ok {
-			return nil, cerror.ErrCanalEncodeFailed.GenWithStack(
-				"column info not found for column id: %d", colInfos[idx].ID)
-		}
-		c, err := b.buildColumn(column, columnInfo, !e.IsDelete())
-		if err != nil {
-			return nil, errors.Trace(err)
+	flag = false
+	onlyHandleKeyColumns = onlyHandleKeyColumns && e.IsDelete()
+	for idx, col := range colInfo {
+		if e.ColumnSelector.Select(col) {
+			if onlyHandleKeyColumns && !e.TableInfo.ColumnsFlag[col.ID].IsHandleKey() {
+				continue
+			}
+			flag = true
+			column, err := b.buildColumn(e.GetPreRows(), idx, col, e.TableInfo, !e.IsDelete())
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			preColumns = append(preColumns, column)
 		}
-		preColumns = append(preColumns, c)
+
+	}
+	if !flag {
+		return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("no handle key columns found for the delete event")
 	}
 
 	rowData := &canal.RowData{}
@@ -190,7 +302,7 @@ func (b *canalEntryBuilder) buildRowData(e *common.RowChangedKe
 }
 
 // fromRowEvent builds canal entry from cdc RowChangedEvent
-func (b *canalEntryBuilder) fromRowEvent(e *common.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.Entry, error) {
+func (b *canalEntryBuilder) fromRowEvent(e *commonEvent.RowEvent, onlyHandleKeyColumns bool) (*canal.Entry, error) {
 	eventType := convertRowEventType(e)
 	header := b.buildHeader(e.CommitTs, e.TableInfo.GetSchemaName(), e.TableInfo.GetTableName(), eventType, 1)
 	isDdl := isCanalDDL(eventType) // false
@@ -218,16 +330,16 @@ func (b *canalEntryBuilder) fromRowEvent(e *common.RowChangedEve
 }
 
 // fromDDLEvent builds canal entry from cdc DDLEvent
-func (b *canalEntryBuilder) fromDDLEvent(e *model.DDLEvent) (*canal.Entry, error) {
+func (b *canalEntryBuilder) fromDDLEvent(e *commonEvent.DDLEvent) (*canal.Entry, error) {
 	eventType := convertDdlEventType(e)
-	header := b.buildHeader(e.CommitTs, e.TableInfo.TableName.Schema, e.TableInfo.TableName.Table, eventType, -1)
+	header := b.buildHeader(e.GetCommitTs(), e.SchemaName, e.TableName, eventType, -1)
 	isDdl := isCanalDDL(eventType)
 	rc := &canal.RowChange{
 		EventTypePresent: 
&canal.RowChange_EventType{EventType: eventType}, IsDdlPresent: &canal.RowChange_IsDdl{IsDdl: isDdl}, Sql: e.Query, RowDatas: nil, - DdlSchemaName: e.TableInfo.TableName.Schema, + DdlSchemaName: e.SchemaName, } rcBytes, err := proto.Marshal(rc) if err != nil { @@ -249,20 +361,20 @@ func convertToCanalTs(commitTs uint64) int64 { } // get the canal EventType according to the RowChangedEvent -func convertRowEventType(e *common.RowChangedEvent) canal.EventType { +func convertRowEventType(e *commonEvent.RowEvent) canal.EventType { if e.IsDelete() { return canal.EventType_DELETE } - if len(e.PreColumns) == 0 { + if e.IsInsert() { return canal.EventType_INSERT } return canal.EventType_UPDATE } // get the canal EventType according to the DDLEvent -func convertDdlEventType(e *model.DDLEvent) canal.EventType { +func convertDdlEventType(e *commonEvent.DDLEvent) canal.EventType { // see https://github.com/alibaba/canal/blob/d53bfd7ee76f8fe6eb581049d64b07d4fcdd692d/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java#L59-L178 - switch e.Type { + switch mm.ActionType(e.Type) { case mm.ActionCreateSchema, mm.ActionDropSchema, mm.ActionShardRowID, mm.ActionCreateView, mm.ActionDropView, mm.ActionRecoverTable, mm.ActionModifySchemaCharsetAndCollate, mm.ActionLockTable, mm.ActionUnlockTable, mm.ActionRepairTable, mm.ActionSetTiFlashReplica, diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go deleted file mode 100644 index 51dcf1e9d..000000000 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package canal - -import ( - "bytes" - "context" - "database/sql" - "path/filepath" - "strconv" - "strings" - "time" - - "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec/decoder" - "github.com/goccy/go-json" - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/cdc/model" - cerror "github.com/pingcap/tiflow/pkg/errors" - ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" - "github.com/pingcap/tiflow/pkg/sink/codec/utils" - "github.com/pingcap/tiflow/pkg/util" - "go.uber.org/zap" - "golang.org/x/text/encoding" - "golang.org/x/text/encoding/charmap" -) - -// batchDecoder decodes the byte into the original message. 
-type batchDecoder struct { - data []byte - msg canalJSONMessageInterface - - config *ticommon.Config - - storage storage.ExternalStorage - - upstreamTiDB *sql.DB - bytesDecoder *encoding.Decoder -} - -// NewBatchDecoder return a decoder for canal-json -func NewBatchDecoder( - ctx context.Context, codecConfig *ticommon.Config, db *sql.DB, -) (decoder.RowEventDecoder, error) { - var ( - externalStorage storage.ExternalStorage - err error - ) - if codecConfig.LargeMessageHandle.EnableClaimCheck() { - storageURI := codecConfig.LargeMessageHandle.ClaimCheckStorageURI - externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10*time.Second, 10*time.Second)) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - } - - if codecConfig.LargeMessageHandle.HandleKeyOnly() && db == nil { - return nil, cerror.ErrCodecDecode. - GenWithStack("handle-key-only is enabled, but upstream TiDB is not provided") - } - - return &batchDecoder{ - config: codecConfig, - storage: externalStorage, - upstreamTiDB: db, - bytesDecoder: charmap.ISO8859_1.NewDecoder(), - }, nil -} - -// AddKeyValue implements the ticommonDecoder interface -func (b *batchDecoder) AddKeyValue(_, value []byte) error { - value, err := ticommon.Decompress(b.config.LargeMessageHandle.LargeMessageHandleCompression, value) - if err != nil { - log.Error("decompress data failed", - zap.String("compression", b.config.LargeMessageHandle.LargeMessageHandleCompression), - zap.Error(err)) - - return errors.Trace(err) - } - b.data = value - return nil -} - -// HasNext implements the ticommonDecoder interface -func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { - if b.data == nil { - return model.MessageTypeUnknown, false, nil - } - var ( - msg canalJSONMessageInterface = &JSONMessage{} - encodedData []byte - ) - - if b.config.EnableTiDBExtension { - msg = &canalJSONMessageWithTiDBExtension{ - JSONMessage: &JSONMessage{}, - Extensions: &tidbExtension{}, - } - } - - if len(b.config.Terminator) > 0 { - idx := bytes.IndexAny(b.data, b.config.Terminator) - if idx >= 0 { - encodedData = b.data[:idx] - b.data = b.data[idx+len(b.config.Terminator):] - } else { - encodedData = b.data - b.data = nil - } - } else { - encodedData = b.data - b.data = nil - } - - if len(encodedData) == 0 { - return model.MessageTypeUnknown, false, nil - } - - if err := json.Unmarshal(encodedData, msg); err != nil { - log.Error("canal-json decoder unmarshal data failed", - zap.Error(err), zap.ByteString("data", encodedData)) - return model.MessageTypeUnknown, false, err - } - b.msg = msg - return b.msg.messageType(), true, nil -} - -func (b *batchDecoder) assembleClaimCheckRowChangedEvent(ctx context.Context, claimCheckLocation string) (*common.RowChangedEvent, error) { - _, claimCheckFileName := filepath.Split(claimCheckLocation) - data, err := b.storage.ReadFile(ctx, claimCheckFileName) - if err != nil { - return nil, err - } - - if !b.config.LargeMessageHandle.ClaimCheckRawValue { - claimCheckM, err := ticommon.UnmarshalClaimCheckMessage(data) - if err != nil { - return nil, err - } - data = claimCheckM.Value - } - - value, err := ticommon.Decompress(b.config.LargeMessageHandle.LargeMessageHandleCompression, data) - if err != nil { - return nil, err - } - message := &canalJSONMessageWithTiDBExtension{} - err = json.Unmarshal(value, message) - if err != nil { - return nil, err - } - - b.msg = message - return b.NextRowChangedEvent() -} - -func (b *batchDecoder) buildData(holder 
*ticommon.ColumnsHolder) (map[string]interface{}, map[string]string, error) { - columnsCount := holder.Length() - data := make(map[string]interface{}, columnsCount) - mysqlTypeMap := make(map[string]string, columnsCount) - - for i := 0; i < columnsCount; i++ { - t := holder.Types[i] - name := holder.Types[i].Name() - mysqlType := strings.ToLower(t.DatabaseTypeName()) - - var value string - rawValue := holder.Values[i].([]uint8) - if utils.IsBinaryMySQLType(mysqlType) { - rawValue, err := b.bytesDecoder.Bytes(rawValue) - if err != nil { - return nil, nil, errors.Trace(err) - } - value = string(rawValue) - } else if strings.Contains(mysqlType, "bit") || strings.Contains(mysqlType, "set") { - bitValue := ticommon.MustBinaryLiteralToInt(rawValue) - value = strconv.FormatUint(bitValue, 10) - } else { - value = string(rawValue) - } - mysqlTypeMap[name] = mysqlType - data[name] = value - } - - return data, mysqlTypeMap, nil -} - -func (b *batchDecoder) assembleHandleKeyOnlyRowChangedEvent( - ctx context.Context, message *canalJSONMessageWithTiDBExtension, -) (*common.RowChangedEvent, error) { - var ( - commitTs = message.Extensions.CommitTs - schema = message.Schema - table = message.Table - eventType = message.EventType - ) - - handleKeyData := message.getData() - pkNames := make([]string, 0, len(handleKeyData)) - for name := range handleKeyData { - pkNames = append(pkNames, name) - } - - result := &canalJSONMessageWithTiDBExtension{ - JSONMessage: &JSONMessage{ - Schema: schema, - Table: table, - PKNames: pkNames, - - EventType: eventType, - }, - Extensions: &tidbExtension{ - CommitTs: commitTs, - }, - } - switch eventType { - case "INSERT": - holder := ticommon.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) - data, mysqlType, err := b.buildData(holder) - if err != nil { - return nil, err - } - result.MySQLType = mysqlType - result.Data = []map[string]interface{}{data} - case "UPDATE": - holder := ticommon.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs, schema, table, handleKeyData) - data, mysqlType, err := b.buildData(holder) - if err != nil { - return nil, err - } - result.MySQLType = mysqlType - result.Data = []map[string]interface{}{data} - - holder = ticommon.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, message.getOld()) - old, _, err := b.buildData(holder) - if err != nil { - return nil, err - } - result.Old = []map[string]interface{}{old} - case "DELETE": - holder := ticommon.MustSnapshotQuery(ctx, b.upstreamTiDB, commitTs-1, schema, table, handleKeyData) - data, mysqlType, err := b.buildData(holder) - if err != nil { - return nil, err - } - result.MySQLType = mysqlType - result.Data = []map[string]interface{}{data} - } - - b.msg = result - return b.NextRowChangedEvent() -} - -// NextRowChangedEvent implements the ticommonDecoder interface -// `HasNext` should be called before this. -func (b *batchDecoder) NextRowChangedEvent() (*common.RowChangedEvent, error) { - if b.msg == nil || b.msg.messageType() != model.MessageTypeRow { - return nil, cerror.ErrCanalDecodeFailed. 
- GenWithStack("not found row changed event message") - } - - message, withExtension := b.msg.(*canalJSONMessageWithTiDBExtension) - if withExtension { - ctx := context.Background() - if message.Extensions.OnlyHandleKey { - return b.assembleHandleKeyOnlyRowChangedEvent(ctx, message) - } - if message.Extensions.ClaimCheckLocation != "" { - return b.assembleClaimCheckRowChangedEvent(ctx, message.Extensions.ClaimCheckLocation) - } - } - - result, err := canalJSONMessage2RowChange(b.msg) - if err != nil { - return nil, err - } - b.msg = nil - return result, nil -} - -// NextDDLEvent implements the ticommonDecoder interface -// `HasNext` should be called before this. -func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { - if b.msg == nil || b.msg.messageType() != model.MessageTypeDDL { - return nil, cerror.ErrCanalDecodeFailed. - GenWithStack("not found ddl event message") - } - - result := canalJSONMessage2DDLEvent(b.msg) - b.msg = nil - return result, nil -} - -// NextResolvedEvent implements the ticommonDecoder interface -// `HasNext` should be called before this. -func (b *batchDecoder) NextResolvedEvent() (uint64, error) { - if b.msg == nil || b.msg.messageType() != model.MessageTypeResolved { - return 0, cerror.ErrCanalDecodeFailed. - GenWithStack("not found resolved event message") - } - - withExtensionEvent, ok := b.msg.(*canalJSONMessageWithTiDBExtension) - if !ok { - log.Error("canal-json resolved event message should have tidb extension, but not found", - zap.Any("msg", b.msg)) - return 0, cerror.ErrCanalDecodeFailed. - GenWithStack("MessageTypeResolved tidb extension not found") - } - b.msg = nil - return withExtensionEvent.Extensions.WatermarkTs, nil -} diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 635f61c9e..c2490a5f4 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -155,55 +155,55 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 { return c.Extensions.CommitTs } -func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*common.RowChangedEvent, error) { - result := new(common.RowChangedEvent) - result.CommitTs = msg.getCommitTs() - mysqlType := msg.getMySQLType() - var err error - if msg.eventType() == canal.EventType_DELETE { - // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` - preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - result.TableInfo = common.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet()) - result.PreColumns = preCols - return result, err - } - - // for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns` - cols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) - result.TableInfo = common.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet()) - result.Columns = cols - if err != nil { - return nil, err - } - - // for `UPDATE`, `old` contain old data, set it as the `PreColumns` - if msg.eventType() == canal.EventType_UPDATE { - preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType) - if len(preCols) < len(cols) { - newPreCols := make([]*common.Column, 0, len(preCols)) - j := 0 - // Columns are ordered by name - for _, col := range cols { - if j < len(preCols) && col.Name == preCols[j].Name { - newPreCols = append(newPreCols, preCols[j]) - j += 1 - } else { - newPreCols = append(newPreCols, col) - } - } - preCols = 
newPreCols - } - if len(preCols) != len(cols) { - log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", cols)) - } - result.PreColumns = preCols - if err != nil { - return nil, err - } - } - - return result, nil -} +// func canalJSONMessage2RowChange(msg canalJSONMessageInterface) (*commonEvent.RowEvent, error) { +// result := new(common.RowChangedEvent) +// result.CommitTs = msg.getCommitTs() +// mysqlType := msg.getMySQLType() +// var err error +// if msg.eventType() == canal.EventType_DELETE { +// // for `DELETE` event, `data` contain the old data, set it as the `PreColumns` +// preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) +// result.TableInfo = common.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), preCols, msg.pkNameSet()) +// result.PreColumns = preCols +// return result, err +// } + +// // for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns` +// cols, err := canalJSONColumnMap2RowChangeColumns(msg.getData(), mysqlType) +// result.TableInfo = common.BuildTableInfoWithPKNames4Test(*msg.getSchema(), *msg.getTable(), cols, msg.pkNameSet()) +// result.Columns = cols +// if err != nil { +// return nil, err +// } + +// // for `UPDATE`, `old` contain old data, set it as the `PreColumns` +// if msg.eventType() == canal.EventType_UPDATE { +// preCols, err := canalJSONColumnMap2RowChangeColumns(msg.getOld(), mysqlType) +// if len(preCols) < len(cols) { +// newPreCols := make([]*common.Column, 0, len(preCols)) +// j := 0 +// // Columns are ordered by name +// for _, col := range cols { +// if j < len(preCols) && col.Name == preCols[j].Name { +// newPreCols = append(newPreCols, preCols[j]) +// j += 1 +// } else { +// newPreCols = append(newPreCols, col) +// } +// } +// preCols = newPreCols +// } +// if len(preCols) != len(cols) { +// log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", cols)) +// } +// result.PreColumns = preCols +// if err != nil { +// return nil, err +// } +// } + +// return result, nil +// } func canalJSONColumnMap2RowChangeColumns(cols map[string]interface{}, mysqlType map[string]string) ([]*common.Column, error) { result := make([]*common.Column, 0, len(cols)) diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index be511fe8f..957004d23 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -19,7 +19,9 @@ import ( "github.com/flowbehappy/tigate/pkg/common" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" + newcommon "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" + "github.com/flowbehappy/tigate/pkg/sink/codec/internal" "github.com/goccy/go-json" "github.com/mailru/easyjson/jwriter" "github.com/pingcap/errors" @@ -34,27 +36,62 @@ import ( ) func fillColumns( - columns []*common.Column, - onlyOutputUpdatedColumn bool, + valueMap map[int64]string, + tableInfo *common.TableInfo, onlyHandleKeyColumn bool, - newColumnMap map[string]*common.Column, out *jwriter.Writer, - builder *canalEntryBuilder, ) error { - if len(columns) == 0 { + if len(tableInfo.Columns) == 0 { out.RawString("null") return nil } out.RawByte('[') out.RawByte('{') isFirst := true - for _, col := range columns { + for _, col := range tableInfo.Columns { if col != nil { + colID := col.ID + if onlyHandleKeyColumn && !tableInfo.ColumnsFlag[colID].IsHandleKey() { + continue + 
}
+			if isFirst {
+				isFirst = false
+			} else {
+				out.RawByte(',')
+			}
+			out.String(col.Name.O)
+			out.RawByte(':')
+			out.String(valueMap[colID])
+		}
+	}
+	out.RawByte('}')
+	out.RawByte(']')
+	return nil
+}
+
+func fillUpdateColumns(
+	newValueMap map[int64]string,
+	oldValueMap map[int64]string,
+	tableInfo *common.TableInfo,
+	onlyHandleKeyColumn bool,
+	onlyOutputUpdatedColumn bool,
+	out *jwriter.Writer,
+) error {
+	if len(tableInfo.Columns) == 0 {
+		out.RawString("null")
+		return nil
+	}
+	out.RawByte('[')
+	out.RawByte('{')
+	isFirst := true
+	for _, col := range tableInfo.Columns {
+		if col != nil {
+			colID := col.ID
 			// column equal, do not output it
-			if onlyOutputUpdatedColumn && shouldIgnoreColumn(col, newColumnMap) {
+			if onlyOutputUpdatedColumn && newValueMap[colID] == oldValueMap[colID] {
 				continue
 			}
-			if onlyHandleKeyColumn && !col.Flag.IsHandleKey() {
+			if onlyHandleKeyColumn && !tableInfo.ColumnsFlag[colID].IsHandleKey() {
 				continue
 			}
 			if isFirst {
@@ -62,17 +99,9 @@ func fillColumns(
 			} else {
 				out.RawByte(',')
 			}
-			value, err := builder.formatValue(col.Value, col.Flag.IsBinary())
-			if err != nil {
-				return cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
-			}
-			out.String(col.Name)
+			out.String(col.Name.O)
 			out.RawByte(':')
-			if col.Value == nil {
-				out.RawString("null")
-			} else {
-				out.String(value)
-			}
+			out.String(oldValueMap[colID])
 		}
 	}
 	out.RawByte('}')
@@ -82,8 +111,8 @@ func fillColumns(
 
 func newJSONMessageForDML(
 	builder *canalEntryBuilder,
-	e *commonEvent.RowChangedEvent,
-	config *ticommon.Config,
+	e *commonEvent.RowEvent,
+	config *newcommon.Config,
 	messageTooLarge bool,
 	claimCheckFileName string,
 ) ([]byte, error) {
@@ -94,7 +123,18 @@ func newJSONMessageForDML(
 		onlyHandleKey = true
 	}
 
-	mysqlTypeMap := make(map[string]string, len(e.Columns))
+	columnLen := 0
+	for _, col := range e.TableInfo.Columns {
+		if e.ColumnSelector.Select(col) {
+			columnLen += 1
+		}
+	}
+	if columnLen == 0 {
+		return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("no valid columns found for the event")
+	}
+
+	mysqlTypeMap := make(map[string]string, columnLen)
+	// TODO: use util.JsonWriter
 	out := &jwriter.Writer{}
 	out.RawByte('{')
 	{
@@ -154,22 +194,36 @@ func newJSONMessageForDML(
 		out.RawString(prefix)
 		out.String("")
 	}
-	{
-		columns := e.PreColumns
-		if !isDelete {
-			columns = e.Columns
+
+	valueMap := make(map[int64]string, 0)                  // colId -> value
+	javaTypeMap := make(map[int64]internal.JavaSQLType, 0) // colId -> javaType
+
+	// TODO: this is clumsy; clean it up
+	row := e.GetRows()
+	if e.IsDelete() {
+		row = e.GetPreRows()
+	}
+	for idx, col := range e.TableInfo.Columns {
+		flag := e.TableInfo.ColumnsFlag[col.ID]
+		value, javaType, err := formatColumnValue(row, idx, col, flag)
+		if err != nil {
+			return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
 		}
+		valueMap[col.ID] = value
+		javaTypeMap[col.ID] = javaType
+	}
+
+	{
 		const prefix string = ",\"sqlType\":"
 		out.RawString(prefix)
 		emptyColumn := true
 		tableInfo := e.TableInfo
-		for _, col := range columns {
+		columnInfos := tableInfo.Columns
+		for _, col := range columnInfos {
 			if col != nil {
-				colFlag := col.Flag
-				colID := tableInfo.ForceGetColumnIDByName(col.Name)
-				columnInfo := tableInfo.ForceGetColumnInfo(colID)
-				colType := col.Type
-				colName := col.Name
+				colID := col.ID
+				colFlag := tableInfo.ColumnsFlag[colID]
+				colName := col.Name.O
 				if onlyHandleKey && !colFlag.IsHandleKey() {
 					continue
 				}
@@ -179,14 +233,11 @@ func newJSONMessageForDML(
 				} else {
 					out.RawByte(',')
 				}
-				javaType, err := getJavaSQLType(col.Value, colType, colFlag)
- if err != nil { - return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - } + out.String(colName) out.RawByte(':') - out.Int32(int32(javaType)) - mysqlTypeMap[colName] = utils.GetMySQLType(columnInfo, config.ContentCompatible) + out.Int32(int32(javaTypeMap[colID])) + mysqlTypeMap[colName] = utils.GetMySQLType(col, config.ContentCompatible) } } if emptyColumn { @@ -220,41 +271,35 @@ func newJSONMessageForDML( if e.IsDelete() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := fillColumns( - e.GetPreColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(valueMap, e.TableInfo, onlyHandleKey, out); err != nil { return nil, err } } else if e.IsInsert() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := fillColumns( - e.GetColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(valueMap, e.TableInfo, onlyHandleKey, out); err != nil { return nil, err } } else if e.IsUpdate() { - var newColsMap map[string]*common.Column - if config.OnlyOutputUpdatedColumns { - newColsMap = make(map[string]*common.Column, len(e.Columns)) - for _, col := range e.GetColumns() { - newColsMap[col.Name] = col + out.RawString(",\"old\":") + + oldValueMap := make(map[int64]string, 0) // colId -> value + preRow := e.GetPreRows() + for idx, col := range e.TableInfo.Columns { + flag := e.TableInfo.ColumnsFlag[col.ID] + value, _, err := formatColumnValue(preRow, idx, col, flag) + if err != nil { + return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } + oldValueMap[col.ID] = value } - out.RawString(",\"old\":") - if err := fillColumns( - e.GetPreColumns(), - config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder, - ); err != nil { + + if err := fillUpdateColumns(valueMap, oldValueMap, e.TableInfo, onlyHandleKey, + config.OnlyOutputUpdatedColumns, out); err != nil { return nil, err } out.RawString(",\"data\":") - if err := fillColumns( - e.GetColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(valueMap, e.TableInfo, onlyHandleKey, out); err != nil { return nil, err } } else { @@ -293,11 +338,11 @@ func newJSONMessageForDML( return value, nil } -func eventTypeString(e *commonEvent.RowChangedEvent) string { +func eventTypeString(e *commonEvent.RowEvent) string { if e.IsDelete() { return "DELETE" } - if len(e.PreColumns) == 0 { + if e.IsInsert() { return "INSERT" } return "UPDATE" @@ -310,11 +355,11 @@ type JSONRowEventEncoder struct { claimCheck *claimcheck.ClaimCheck - config *ticommon.Config + config *newcommon.Config } // newJSONRowEventEncoder creates a new JSONRowEventEncoder -func NewJSONRowEventEncoder(ctx context.Context, config *ticommon.Config) (encoder.EventEncoder, error) { +func NewJSONRowEventEncoder(ctx context.Context, config *newcommon.Config) (encoder.EventEncoder, error) { claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) if err != nil { return nil, errors.Trace(err) @@ -330,14 +375,14 @@ func NewJSONRowEventEncoder(ctx context.Context, config *ticommon.Config) (encod }, nil } -func (c *JSONRowEventEncoder) newJSONMessageForDDL(e *model.DDLEvent) canalJSONMessageInterface { +func (c *JSONRowEventEncoder) newJSONMessageForDDL(e *commonEvent.DDLEvent) canalJSONMessageInterface { msg := &JSONMessage{ ID: 0, // ignored by both Canal Adapter and Flink - Schema: e.TableInfo.TableName.Schema, - Table: e.TableInfo.TableName.Table, + Schema: e.SchemaName, + Table: e.TableName, 
IsDDL: true, EventType: convertDdlEventType(e).String(), - ExecutionTime: convertToCanalTs(e.CommitTs), + ExecutionTime: convertToCanalTs(e.GetCommitTs()), BuildTime: time.Now().UnixMilli(), // timestamp Query: e.Query, } @@ -348,7 +393,7 @@ func (c *JSONRowEventEncoder) newJSONMessageForDDL(e *model.DDLEvent) canalJSONM return &canalJSONMessageWithTiDBExtension{ JSONMessage: msg, - Extensions: &tidbExtension{CommitTs: e.CommitTs}, + Extensions: &tidbExtension{CommitTs: e.GetCommitTs()}, } } @@ -393,8 +438,7 @@ func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Messag func (c *JSONRowEventEncoder) AppendRowChangedEvent( ctx context.Context, _ string, - e *commonEvent.RowChangedEvent, - callback func(), + e *commonEvent.RowEvent, ) error { value, err := newJSONMessageForDML(c.builder, e, c.config, false, "") if err != nil { @@ -415,7 +459,7 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( Table: e.TableInfo.GetTableNamePtr(), Type: model.MessageTypeRow, Protocol: config.ProtocolCanalJSON, - Callback: callback, + Callback: e.Callback, } m.IncRowsCount() @@ -465,7 +509,7 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( return errors.Trace(err) } - m, err = c.newClaimCheckLocationMessage(e, callback, claimCheckFileName) + m, err = c.newClaimCheckLocationMessage(e, claimCheckFileName) if err != nil { return errors.Trace(err) } @@ -477,7 +521,7 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( } func (c *JSONRowEventEncoder) newClaimCheckLocationMessage( - event *commonEvent.RowChangedEvent, callback func(), fileName string, + event *commonEvent.RowEvent, fileName string, ) (*ticommon.Message, error) { claimCheckLocation := c.claimCheck.FileNameWithPrefix(fileName) value, err := newJSONMessageForDML(c.builder, event, c.config, true, claimCheckLocation) @@ -493,7 +537,7 @@ func (c *JSONRowEventEncoder) newClaimCheckLocationMessage( } result := ticommon.NewMsg(config.ProtocolCanalJSON, nil, value, 0, model.MessageTypeRow, nil, nil) - result.Callback = callback + result.Callback = event.Callback result.IncRowsCount() length := result.Length() @@ -520,19 +564,26 @@ func (c *JSONRowEventEncoder) Build() []*ticommon.Message { // EncodeDDLEvent encodes DDL events func (c *JSONRowEventEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*ticommon.Message, error) { - // message := c.newJSONMessageForDDL(e) - // value, err := json.Marshal(message) - // if err != nil { - // return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) - // } - // value, err = ticommon.Compress( - // c.config.ChangefeedID, c.config.LargeMessageHandle.LargeMessageHandleCompression, value, - // ) - // if err != nil { - // return nil, errors.Trace(err) - // } - // return ticommon.NewDDLMsg(config.ProtocolCanalJSON, nil, value, e), nil - return nil, nil + message := c.newJSONMessageForDDL(e) + value, err := json.Marshal(message) + if err != nil { + return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) + } + value, err = ticommon.Compress( + c.config.ChangefeedID, c.config.LargeMessageHandle.LargeMessageHandleCompression, value, + ) + if err != nil { + return nil, errors.Trace(err) + } + + return &ticommon.Message{ + Key: nil, + Value: value, + Type: model.MessageTypeDDL, + Protocol: config.ProtocolCanalJSON, + Table: &e.TableName, + Schema: &e.SchemaName, + }, nil } func (b *JSONRowEventEncoder) Clean() { @@ -540,20 +591,3 @@ func (b *JSONRowEventEncoder) Clean() { b.claimCheck.CleanMetrics() } } - -func shouldIgnoreColumn(col *common.Column, - newColumnMap 
map[string]*common.Column, -) bool { - newCol, ok := newColumnMap[col.Name] - if ok && newCol != nil { - // sql type is not equal - if newCol.Type != col.Type { - return false - } - // value equal - if encoder.IsColumnValueEqual(newCol.Value, col.Value) { - return true - } - } - return false -} diff --git a/pkg/sink/codec/decoder/decoder.go b/pkg/sink/codec/decoder/decoder.go index b4ac8cac7..77d02af1c 100644 --- a/pkg/sink/codec/decoder/decoder.go +++ b/pkg/sink/codec/decoder/decoder.go @@ -14,7 +14,6 @@ package decoder import ( - "github.com/flowbehappy/tigate/pkg/common" "github.com/pingcap/tiflow/cdc/model" ) @@ -34,7 +33,7 @@ type RowEventDecoder interface { // NextResolvedEvent returns the next resolved event if exists NextResolvedEvent() (uint64, error) // NextRowChangedEvent returns the next row changed event if exists - NextRowChangedEvent() (*common.RowChangedEvent, error) + // NextRowChangedEvent() (*common.RowChangedEvent, error) // NextDDLEvent returns the next DDL event if exists NextDDLEvent() (*model.DDLEvent, error) } diff --git a/pkg/sink/codec/encoder_builder.go b/pkg/sink/codec/encoder_builder.go index d1166ee03..fe1d10c42 100644 --- a/pkg/sink/codec/encoder_builder.go +++ b/pkg/sink/codec/encoder_builder.go @@ -3,6 +3,7 @@ package codec import ( "context" + "github.com/flowbehappy/tigate/pkg/sink/codec/canal" "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/flowbehappy/tigate/pkg/sink/codec/open" @@ -14,12 +15,10 @@ func NewEventEncoder(ctx context.Context, cfg *common.Config) (encoder.EventEnco switch cfg.Protocol { case config.ProtocolDefault, config.ProtocolOpen: return open.NewBatchEncoder(ctx, cfg) - // case config.ProtocolCanal: - // return canal.NewBatchEncoder(cfg) // case config.ProtocolAvro: // return avro.NewAvroEncoder(ctx, cfg) - // case config.ProtocolCanalJSON: - // return canal.NewJSONRowEventEncoder(ctx, cfg) + case config.ProtocolCanalJSON: + return canal.NewJSONRowEventEncoder(ctx, cfg) // case config.ProtocolCraft: // return craft.NewBatchEncoder(cfg), nil // case config.ProtocolDebezium: