From 5771d1d326eb78ad2d2381ab889a8d33322033a4 Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 23 Jan 2025 18:59:27 +0800 Subject: [PATCH] chore: update cli module shaded config and add e2e test (#121) --- docs/cli/flink-cdc/flink-cdc-source.md | 17 ++-- docs/cli/flink-cdc/flink-cdc-source_cn.md | 17 ++-- flink-connector-oceanbase-cli/pom.xml | 96 +++++++------------ .../connector/flink/{CdcCli.java => Cli.java} | 80 ++++++++++------ .../CliConfig.java} | 7 +- .../ParsingProcessFunction.java | 9 +- .../cdc/CdcSync.java => process/Sync.java} | 45 +++++---- .../TableNameConverter.java | 2 +- .../OceanBaseJsonDeserializationSchema.java | 10 +- .../flink/source/cdc/mysql/MysqlCdcSync.java | 6 +- .../source/cdc/mysql/MysqlDateConverter.java | 40 ++++---- .../OceanBaseJsonSerializationSchema.java | 7 +- .../connector/flink/MysqlCdcSyncITCase.java | 82 +++++++--------- .../src/test/resources/sql/mysql-cdc.sql | 4 - flink-connector-oceanbase-e2e-tests/pom.xml | 24 +++++ .../flink/MysqlCdcSyncE2eITCase.java | 92 ++++++++++++++++++ .../utils/FlinkContainerTestEnvironment.java | 74 ++++++++++++-- .../src/test/resources/docker/mysql/my.cnf | 65 +++++++++++++ .../src/test/resources/sql/mysql-cdc.sql | 49 ++++++++++ flink-sql-connector-obkv-hbase/pom.xml | 3 + .../pom.xml | 3 + flink-sql-connector-oceanbase/pom.xml | 3 + pom.xml | 14 +++ 23 files changed, 513 insertions(+), 236 deletions(-) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{CdcCli.java => Cli.java} (57%) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source/cdc/CdcSyncConfig.java => config/CliConfig.java} (95%) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source/cdc => process}/ParsingProcessFunction.java (92%) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source/cdc/CdcSync.java => process/Sync.java} (88%) rename 
flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source => process}/TableNameConverter.java (98%) create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql diff --git a/docs/cli/flink-cdc/flink-cdc-source.md b/docs/cli/flink-cdc/flink-cdc-source.md index bea8d180..108fbb89 100644 --- a/docs/cli/flink-cdc/flink-cdc-source.md +++ b/docs/cli/flink-cdc/flink-cdc-source.md @@ -64,20 +64,19 @@ Replace the following command with your real database information, and execute i $FLINK_HOME/bin/flink run \ -Dexecution.checkpointing.interval=10s \ -Dparallelism.default=1 \ - -c com.oceanbase.connector.flink.CdcCli \ lib/flink-connector-oceanbase-cli-xxx.jar \ - mysql-cdc \ - --database test_db \ + --source-type mysql-cdc \ --source-conf hostname=xxxx \ --source-conf port=3306 \ --source-conf username=root \ --source-conf password=xxxx \ --source-conf database-name=test_db \ --source-conf table-name=.* \ - --including-tables ".*" \ --sink-conf username=xxxx \ --sink-conf password=xxxx \ - --sink-conf url=jdbc:mysql://xxxx:xxxx + --sink-conf url=jdbc:mysql://xxxx:xxxx \ + --database test_db \ + --including-tables ".*" ``` ### Check and Verify @@ -101,18 +100,18 @@ You can go on insert test data to MySQL database, since it is a CDC task, after - ${job-type} + --source-type Yes Enumeration value - Job type, can be mysql-cdc. + Source type, can be mysql-cdc. --source-conf Yes Multi-value parameter - Configurations of specific Flink CDC Source. + Configurations of the specific source. --sink-conf @@ -125,7 +124,7 @@ You can go on insert test data to MySQL database, since it is a CDC task, after --job-name No String - ${job-type} Sync + ${source-type} Sync The Flink job name. 
diff --git a/docs/cli/flink-cdc/flink-cdc-source_cn.md b/docs/cli/flink-cdc/flink-cdc-source_cn.md index f9c51d0f..8398c57b 100644 --- a/docs/cli/flink-cdc/flink-cdc-source_cn.md +++ b/docs/cli/flink-cdc/flink-cdc-source_cn.md @@ -64,20 +64,19 @@ VALUES (default, "Sally", "Thomas", "sally.thomas@acme.com"), $FLINK_HOME/bin/flink run \ -Dexecution.checkpointing.interval=10s \ -Dparallelism.default=1 \ - -c com.oceanbase.connector.flink.CdcCli \ lib/flink-connector-oceanbase-cli-xxx.jar \ - mysql-cdc \ - --database test_db \ + --source-type mysql-cdc \ --source-conf hostname=xxxx \ --source-conf port=3306 \ --source-conf username=root \ --source-conf password=xxxx \ --source-conf database-name=test_db \ --source-conf table-name=.* \ - --including-tables ".*" \ --sink-conf username=xxxx \ --sink-conf password=xxxx \ - --sink-conf url=jdbc:mysql://xxxx:xxxx + --sink-conf url=jdbc:mysql://xxxx:xxxx \ + --database test_db \ + --including-tables ".*" ``` 请将以上的数据库信息替换为您真实的数据库信息,当出现类似于以下的信息时,任务构建成功并提交。 @@ -103,18 +102,18 @@ $FLINK_HOME/bin/flink run \ - ${job-type} + --source-type 是 枚举值 - 任务类型,可以是 mysql-cdc。 + 源端类型,可以是 mysql-cdc。 --source-conf 是 多值参数 - 指定类型的 Flink CDC 源端连接器的配置参数。 + 指定类型的源端的配置参数。 --sink-conf @@ -127,7 +126,7 @@ $FLINK_HOME/bin/flink run \ --job-name 否 String - ${job-type} Sync + ${source-type} Sync Flink 任务名称。 diff --git a/flink-connector-oceanbase-cli/pom.xml b/flink-connector-oceanbase-cli/pom.xml index cd270110..f51b4a53 100644 --- a/flink-connector-oceanbase-cli/pom.xml +++ b/flink-connector-oceanbase-cli/pom.xml @@ -25,15 +25,10 @@ under the License. flink-connector-oceanbase-cli jar - - 3.2.1 - 19.3.0.0 - - com.oceanbase - flink-connector-oceanbase + flink-sql-connector-oceanbase ${project.version} @@ -49,62 +44,6 @@ under the License. 
- - org.apache.flink - flink-sql-connector-oracle-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - org.apache.flink - flink-sql-connector-postgres-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - - org.apache.flink - flink-sql-connector-sqlserver-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - org.apache.flink - flink-sql-connector-db2-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - - com.oracle.ojdbc - ojdbc8 - ${ojdbc.version} - provided - com.oceanbase @@ -121,4 +60,37 @@ under the License. + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + + shade + + package + + false + false + false + + + *:* + + + + + com.oceanbase.connector.flink.Cli + + + + + + + + + diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CdcCli.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/Cli.java similarity index 57% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CdcCli.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/Cli.java index 97b852e5..5978e956 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CdcCli.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/Cli.java @@ -16,12 +16,14 @@ package com.oceanbase.connector.flink; -import com.oceanbase.connector.flink.source.cdc.CdcSync; -import com.oceanbase.connector.flink.source.cdc.CdcSyncConfig; +import com.oceanbase.connector.flink.config.CliConfig; +import com.oceanbase.connector.flink.process.Sync; import com.oceanbase.connector.flink.source.cdc.mysql.MysqlCdcSync; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.utils.MultipleParameterTool; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.StringUtils; @@ -31,47 +33,60 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Objects; -public class CdcCli { - private static final Logger LOG = LoggerFactory.getLogger(CdcCli.class); +public class Cli { + private static final Logger LOG = LoggerFactory.getLogger(Cli.class); + + private static StreamExecutionEnvironment flinkEnvironmentForTesting; + private static JobClient jobClientForTesting; + + @VisibleForTesting + public static void setStreamExecutionEnvironmentForTesting(StreamExecutionEnvironment env) { + flinkEnvironmentForTesting = env; + } + + @VisibleForTesting + public static JobClient getJobClientForTesting() { + return jobClientForTesting; + } public static void main(String[] args) throws Exception { - LOG.info("Starting CdcCli with args: {}", Arrays.toString(args)); + LOG.info("Starting CLI with args: {}", Arrays.toString(args)); - String jobType = args[0]; - String[] opArgs = Arrays.copyOfRange(args, 1, args.length); - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + String sourceType = params.getRequired(CliConfig.SOURCE_TYPE); - CdcSync cdcSync; - switch (jobType.trim().toLowerCase()) { - case CdcSyncConfig.MYSQL_CDC: - cdcSync = new MysqlCdcSync(); + Sync sync; + switch (sourceType.trim().toLowerCase()) { + case CliConfig.MYSQL_CDC: + sync = new MysqlCdcSync(); break; default: - throw new RuntimeException("Unsupported job type: " + jobType); + throw new RuntimeException("Unsupported source type: " + sourceType); } - Map sourceConfigMap = getConfigMap(params, CdcSyncConfig.SOURCE_CONF); + Map sourceConfigMap = 
getConfigMap(params, CliConfig.SOURCE_CONF); Configuration sourceConfig = Configuration.fromMap(sourceConfigMap); - Map sinkConfigMap = getConfigMap(params, CdcSyncConfig.SINK_CONF); + Map sinkConfigMap = getConfigMap(params, CliConfig.SINK_CONF); Configuration sinkConfig = Configuration.fromMap(sinkConfigMap); - String jobName = params.get(CdcSyncConfig.JOB_NAME); - String database = params.get(CdcSyncConfig.DATABASE); - String tablePrefix = params.get(CdcSyncConfig.TABLE_PREFIX); - String tableSuffix = params.get(CdcSyncConfig.TABLE_SUFFIX); - String includingTables = params.get(CdcSyncConfig.INCLUDING_TABLES); - String excludingTables = params.get(CdcSyncConfig.EXCLUDING_TABLES); - String multiToOneOrigin = params.get(CdcSyncConfig.MULTI_TO_ONE_ORIGIN); - String multiToOneTarget = params.get(CdcSyncConfig.MULTI_TO_ONE_TARGET); + String jobName = params.get(CliConfig.JOB_NAME); + String database = params.get(CliConfig.DATABASE); + String tablePrefix = params.get(CliConfig.TABLE_PREFIX); + String tableSuffix = params.get(CliConfig.TABLE_SUFFIX); + String includingTables = params.get(CliConfig.INCLUDING_TABLES); + String excludingTables = params.get(CliConfig.EXCLUDING_TABLES); + String multiToOneOrigin = params.get(CliConfig.MULTI_TO_ONE_ORIGIN); + String multiToOneTarget = params.get(CliConfig.MULTI_TO_ONE_TARGET); - boolean createTableOnly = params.has(CdcSyncConfig.CREATE_TABLE_ONLY); - boolean ignoreDefaultValue = params.has(CdcSyncConfig.IGNORE_DEFAULT_VALUE); - boolean ignoreIncompatible = params.has(CdcSyncConfig.IGNORE_INCOMPATIBLE); + boolean createTableOnly = params.has(CliConfig.CREATE_TABLE_ONLY); + boolean ignoreDefaultValue = params.has(CliConfig.IGNORE_DEFAULT_VALUE); + boolean ignoreIncompatible = params.has(CliConfig.IGNORE_INCOMPATIBLE); - cdcSync.setEnv(env) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + sync.setEnv(env) .setSourceConfig(sourceConfig) .setSinkConfig(sinkConfig) .setDatabase(database) 
@@ -87,9 +102,14 @@ public static void main(String[] args) throws Exception { .build(); if (StringUtils.isNullOrWhitespaceOnly(jobName)) { - jobName = String.format("%s Sync", jobType); + jobName = String.format("%s Sync", sourceType); + } + + if (Objects.nonNull(flinkEnvironmentForTesting)) { + jobClientForTesting = env.executeAsync(); + } else { + env.execute(jobName); } - env.execute(jobName); } public static Map getConfigMap(MultipleParameterTool params, String key) { diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSyncConfig.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/config/CliConfig.java similarity index 95% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSyncConfig.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/config/CliConfig.java index 286a0d86..158f6320 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSyncConfig.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/config/CliConfig.java @@ -14,9 +14,12 @@ * limitations under the License. */ -package com.oceanbase.connector.flink.source.cdc; +package com.oceanbase.connector.flink.config; -public class CdcSyncConfig { +public class CliConfig { + + /** Option key for source type. */ + public static final String SOURCE_TYPE = "source-type"; /** Option key for cdc source. 
*/ public static final String SOURCE_CONF = "source-conf"; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/ParsingProcessFunction.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/ParsingProcessFunction.java similarity index 92% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/ParsingProcessFunction.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/ParsingProcessFunction.java index 287f3a98..713fe189 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/ParsingProcessFunction.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/ParsingProcessFunction.java @@ -13,18 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package com.oceanbase.connector.flink.source.cdc; - -import com.oceanbase.connector.flink.source.TableNameConverter; +package com.oceanbase.connector.flink.process; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import java.util.HashMap; import java.util.Map; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSync.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/Sync.java similarity index 88% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSync.java rename to 
flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/Sync.java index 63aa717a..add966ed 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSync.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/Sync.java @@ -13,13 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.oceanbase.connector.flink.source.cdc; +package com.oceanbase.connector.flink.process; import com.oceanbase.connector.flink.OceanBaseConnectorOptions; import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider; import com.oceanbase.connector.flink.sink.OceanBaseRecordFlusher; import com.oceanbase.connector.flink.sink.OceanBaseSink; -import com.oceanbase.connector.flink.source.TableNameConverter; import com.oceanbase.connector.flink.source.TableSchema; import com.oceanbase.connector.flink.table.DataChangeRecord; import com.oceanbase.connector.flink.table.OceanBaseJsonSerializationSchema; @@ -53,8 +52,8 @@ import static com.oceanbase.connector.flink.utils.OceanBaseCatalogUtils.tableExists; import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; -public abstract class CdcSync { - private static final Logger LOG = LoggerFactory.getLogger(CdcSync.class); +public abstract class Sync { + private static final Logger LOG = LoggerFactory.getLogger(Sync.class); protected StreamExecutionEnvironment env; protected Configuration sourceConfig; @@ -71,67 +70,67 @@ public abstract class CdcSync { protected boolean ignoreDefaultValue; protected boolean ignoreIncompatible; - public CdcSync setEnv(StreamExecutionEnvironment env) { + public Sync setEnv(StreamExecutionEnvironment env) { this.env = env; return this; } - public CdcSync setSourceConfig(Configuration sourceConfig) { + public Sync setSourceConfig(Configuration sourceConfig) { this.sourceConfig = sourceConfig; return this; } - 
public CdcSync setSinkConfig(Configuration sinkConfig) { + public Sync setSinkConfig(Configuration sinkConfig) { this.sinkConfig = sinkConfig; return this; } - public CdcSync setDatabase(String database) { + public Sync setDatabase(String database) { this.database = database; return this; } - public CdcSync setTablePrefix(String tablePrefix) { + public Sync setTablePrefix(String tablePrefix) { this.tablePrefix = tablePrefix; return this; } - public CdcSync setTableSuffix(String tableSuffix) { + public Sync setTableSuffix(String tableSuffix) { this.tableSuffix = tableSuffix; return this; } - public CdcSync setIncludingTables(String includingTables) { + public Sync setIncludingTables(String includingTables) { this.includingTables = includingTables; return this; } - public CdcSync setExcludingTables(String excludingTables) { + public Sync setExcludingTables(String excludingTables) { this.excludingTables = excludingTables; return this; } - public CdcSync setMultiToOneOrigin(String multiToOneOrigin) { + public Sync setMultiToOneOrigin(String multiToOneOrigin) { this.multiToOneOrigin = multiToOneOrigin; return this; } - public CdcSync setMultiToOneTarget(String multiToOneTarget) { + public Sync setMultiToOneTarget(String multiToOneTarget) { this.multiToOneTarget = multiToOneTarget; return this; } - public CdcSync setCreateTableOnly(boolean createTableOnly) { + public Sync setCreateTableOnly(boolean createTableOnly) { this.createTableOnly = createTableOnly; return this; } - public CdcSync setIgnoreDefaultValue(boolean ignoreDefaultValue) { + public Sync setIgnoreDefaultValue(boolean ignoreDefaultValue) { this.ignoreDefaultValue = ignoreDefaultValue; return this; } - public CdcSync setIgnoreIncompatible(boolean ignoreIncompatible) { + public Sync setIgnoreIncompatible(boolean ignoreIncompatible) { this.ignoreIncompatible = ignoreIncompatible; return this; } @@ -174,7 +173,7 @@ protected boolean isSyncNeeded(String tableName) { protected abstract List getTableSchemas(); - 
protected abstract DataStreamSource buildCdcSource(); + protected abstract DataStreamSource buildSource(); public void build() { this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); @@ -213,13 +212,17 @@ public void build() { return; } - DataStreamSource cdcSource = buildCdcSource(); + DataStreamSource source = buildSource(); + SingleOutputStreamOperator parsedStream = - cdcSource.process(new ParsingProcessFunction(tableNameConverter)); + source.process(new ParsingProcessFunction(tableNameConverter)); + for (Tuple2 dbTbl : targetTables) { + String tableName = dbTbl.f1; OutputTag recordOutputTag = - ParsingProcessFunction.createRecordOutputTag(dbTbl.f1); + ParsingProcessFunction.createRecordOutputTag(tableName); DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag); + int sinkParallel = sinkConfig.getInteger( FactoryUtil.SINK_PARALLELISM, sideOutput.getParallelism()); diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/TableNameConverter.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/TableNameConverter.java similarity index 98% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/TableNameConverter.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/TableNameConverter.java index 7a7e512d..f5084217 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/TableNameConverter.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/TableNameConverter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.oceanbase.connector.flink.source; +package com.oceanbase.connector.flink.process; import org.apache.flink.util.StringUtils; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java index e72aaaef..c5789365 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java @@ -26,11 +26,11 @@ import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.util.Collector; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import java.math.BigDecimal; import java.nio.ByteBuffer; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java index 4fff93b6..5d7a935f 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java +++ 
b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java @@ -16,9 +16,9 @@ package com.oceanbase.connector.flink.source.cdc.mysql; +import com.oceanbase.connector.flink.process.Sync; import com.oceanbase.connector.flink.source.FieldSchema; import com.oceanbase.connector.flink.source.TableSchema; -import com.oceanbase.connector.flink.source.cdc.CdcSync; import com.oceanbase.connector.flink.source.cdc.OceanBaseJsonDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -55,7 +55,7 @@ import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; -public class MysqlCdcSync extends CdcSync { +public class MysqlCdcSync extends Sync { private static final Logger LOG = LoggerFactory.getLogger(MysqlCdcSync.class); public static final String JDBC_URL_PATTERN = @@ -134,7 +134,7 @@ protected List getTableSchemas() { } @Override - protected DataStreamSource buildCdcSource() { + protected DataStreamSource buildSource() { String databaseName = sourceConfig.get(MySqlSourceOptions.DATABASE_NAME); MySqlSourceBuilder sourceBuilder = MySqlSource.builder(); sourceBuilder diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java index fd080089..326149af 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java @@ -16,7 +16,7 @@ package com.oceanbase.connector.flink.source.cdc.mysql; -import com.oceanbase.connector.flink.source.cdc.CdcSyncConfig; +import com.oceanbase.connector.flink.config.CliConfig; import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; @@ -49,37 +49,29 @@ 
public class MysqlDateConverter implements CustomConverter dateFormatter = DateTimeFormatter.ofPattern(p)); + props, CliConfig.FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p)); readProps( - props, - CdcSyncConfig.FORMAT_TIME, - p -> timeFormatter = DateTimeFormatter.ofPattern(p)); + props, CliConfig.FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p)); readProps( props, - CdcSyncConfig.FORMAT_DATETIME, + CliConfig.FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p)); readProps( props, - CdcSyncConfig.FORMAT_TIMESTAMP, + CliConfig.FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); - readProps(props, CdcSyncConfig.FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z)); + readProps(props, CliConfig.FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z)); } private void readProps(Properties properties, String settingKey, Consumer consumer) { @@ -101,19 +93,19 @@ public void converterFor( String sqlType = column.typeName().toUpperCase(); SchemaBuilder schemaBuilder = null; Converter converter = null; - if (CdcSyncConfig.UPPERCASE_DATE.equals(sqlType)) { + if (CliConfig.UPPERCASE_DATE.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertDate; } - if (CdcSyncConfig.TIME.equals(sqlType)) { + if (CliConfig.TIME.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertTime; } - if (CdcSyncConfig.DATETIME.equals(sqlType)) { + if (CliConfig.DATETIME.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertDateTime; } - if (CdcSyncConfig.TIMESTAMP.equals(sqlType)) { + if (CliConfig.TIMESTAMP.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertTimestamp; } diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java 
b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java index 75c8f7a3..2fe07a42 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java @@ -27,9 +27,10 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java b/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java index d98f61ec..742c0fa1 100644 --- a/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java +++ b/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java @@ -16,11 +16,6 @@ package com.oceanbase.connector.flink; -import com.oceanbase.connector.flink.source.cdc.CdcSync; -import com.oceanbase.connector.flink.source.cdc.mysql.MysqlCdcSync; - -import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.AfterAll; @@ -32,8 +27,6 @@ import org.testcontainers.containers.MySQLContainer; import 
org.testcontainers.containers.output.Slf4jLogConsumer; -import java.util.HashMap; -import java.util.Map; import java.util.stream.Stream; public class MysqlCdcSyncITCase extends OceanBaseMySQLTestBase { @@ -64,50 +57,41 @@ public static void tearDown() { @Test public void testMysqlCdcSync() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - Map flinkMap = new HashMap<>(); - flinkMap.put("execution.checkpointing.interval", "10s"); - flinkMap.put("pipeline.operator-chaining", "false"); - flinkMap.put("parallelism.default", "1"); - Configuration configuration = Configuration.fromMap(flinkMap); - env.configure(configuration); - - // match all tables - String tableName = ".*"; - - Map mysqlConfig = new HashMap<>(); - mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), MYSQL_CONTAINER.getHost()); - mysqlConfig.put( - MySqlSourceOptions.PORT.key(), - String.valueOf(MYSQL_CONTAINER.getMappedPort(MySQLContainer.MYSQL_PORT))); - mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), MYSQL_CONTAINER.getUsername()); - mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), MYSQL_CONTAINER.getPassword()); - mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), MYSQL_CONTAINER.getDatabaseName()); - mysqlConfig.put(MySqlSourceOptions.TABLE_NAME.key(), tableName); - mysqlConfig.put("jdbc.properties.useSSL", "false"); - Configuration sourceConfig = Configuration.fromMap(mysqlConfig); - - Map sinkConfig = new HashMap<>(); - sinkConfig.put(OceanBaseConnectorOptions.USERNAME.key(), CONTAINER.getUsername()); - sinkConfig.put(OceanBaseConnectorOptions.PASSWORD.key(), CONTAINER.getPassword()); - sinkConfig.put(OceanBaseConnectorOptions.URL.key(), CONTAINER.getJdbcUrl()); - sinkConfig.put("sink.enable-delete", "false"); - Configuration sinkConf = Configuration.fromMap(sinkConfig); - - CdcSync cdcSync = new MysqlCdcSync(); - cdcSync.setEnv(env) - .setSourceConfig(sourceConfig) - .setSinkConfig(sinkConf) - 
.setDatabase(CONTAINER.getDatabaseName()) - .setIncludingTables(tableName) - .build(); - - env.executeAsync( - String.format( - "MySQL-OceanBase Database Sync: %s -> %s", - MYSQL_CONTAINER.getDatabaseName(), CONTAINER.getDatabaseName())); + Cli.setStreamExecutionEnvironmentForTesting(env); + + Cli.main( + new String[] { + "--source-type", + "mysql-cdc", + "--source-conf", + "hostname=" + getContainerIP(MYSQL_CONTAINER), + "--source-conf", + "port=" + MySQLContainer.MYSQL_PORT, + "--source-conf", + "username=" + MYSQL_CONTAINER.getUsername(), + "--source-conf", + "password=" + MYSQL_CONTAINER.getPassword(), + "--source-conf", + "database-name=" + MYSQL_CONTAINER.getDatabaseName(), + "--source-conf", + "table-name=.*", + "--sink-conf", + "url=" + CONTAINER.getJdbcUrl(), + "--sink-conf", + "username=" + CONTAINER.getUsername(), + "--sink-conf", + "password=" + CONTAINER.getPassword(), + "--job-name", + "test-mysql-cdc-sync", + "--database", + CONTAINER.getDatabaseName(), + "--including-tables", + ".*" + }); waitingAndAssertTableCount("products", 9); waitingAndAssertTableCount("customers", 4); + + Cli.getJobClientForTesting().cancel(); } } diff --git a/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql b/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql index ed7cedd3..c76db64e 100644 --- a/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql +++ b/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql @@ -11,10 +11,6 @@ -- specific language governing permissions and limitations -- under the License. 
--- ---------------------------------------------------------------------------------------------------------------- --- DATABASE: inventory --- ---------------------------------------------------------------------------------------------------------------- - -- Create and populate our products using a single insert with many rows CREATE TABLE products ( diff --git a/flink-connector-oceanbase-e2e-tests/pom.xml b/flink-connector-oceanbase-e2e-tests/pom.xml index 75acdff6..8a6ec9d5 100644 --- a/flink-connector-oceanbase-e2e-tests/pom.xml +++ b/flink-connector-oceanbase-e2e-tests/pom.xml @@ -30,12 +30,14 @@ under the License. mysql-connector-java test + com.oceanbase flink-connector-oceanbase-base ${project.version} test + com.oceanbase flink-connector-oceanbase-base @@ -43,6 +45,12 @@ under the License. test-jar test + + + org.testcontainers + mysql + test + @@ -108,6 +116,22 @@ under the License. jar ${project.build.directory}/dependencies + + com.oceanbase + flink-connector-oceanbase-cli + ${project.version} + flink-connector-oceanbase-cli.jar + jar + ${project.build.directory}/dependencies + + + org.apache.flink + flink-sql-connector-mysql-cdc + ${flink.cdc.version} + flink-sql-connector-mysql-cdc.jar + jar + ${project.build.directory}/dependencies + diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java new file mode 100644 index 00000000..5155e5e1 --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java @@ -0,0 +1,92 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.connector.flink; + +import com.oceanbase.connector.flink.utils.FlinkContainerTestEnvironment; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.util.Collections; +import java.util.stream.Stream; + +@DisabledIfSystemProperty( + named = "flink_version", + matches = "1.15.4", + disabledReason = "Flink 1.15.4 does not contain 'SideOutputDataStream'") +public class MysqlCdcSyncE2eITCase extends FlinkContainerTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlCdcSyncE2eITCase.class); + + private static final MySQLContainer MYSQL_CONTAINER = + new MySQLContainer<>("mysql:8.0.20") + .withConfigurationOverride("docker/mysql") + .withInitScript("sql/mysql-cdc.sql") + .withNetwork(NETWORK) + .withExposedPorts(3306) + .withDatabaseName("test") + .withUsername("root") + .withPassword("mysqlpw") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @BeforeAll + public static void setup() { + CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); + MYSQL_CONTAINER.start(); + } + + @AfterAll + public static void tearDown() { + Stream.of(CONTAINER, MYSQL_CONTAINER).forEach(GenericContainer::stop); + } + + @Test + public 
void testMysqlCdcSync() throws Exception { + submitJob( + Collections.singletonList(getResource("flink-sql-connector-mysql-cdc.jar")), + getResource("flink-connector-oceanbase-cli.jar"), + new String[] { + multipleParameterArg("source-type", "mysql-cdc"), + multipleParameterArg( + "source-conf", "hostname=" + getContainerIP(MYSQL_CONTAINER)), + multipleParameterArg("source-conf", "port=" + MySQLContainer.MYSQL_PORT), + multipleParameterArg( + "source-conf", "username=" + MYSQL_CONTAINER.getUsername()), + multipleParameterArg( + "source-conf", "password=" + MYSQL_CONTAINER.getPassword()), + multipleParameterArg( + "source-conf", "database-name=" + MYSQL_CONTAINER.getDatabaseName()), + multipleParameterArg("source-conf", "table-name=.*"), + multipleParameterArg("sink-conf", "url=" + getJdbcUrl()), + multipleParameterArg("sink-conf", "username=" + CONTAINER.getUsername()), + multipleParameterArg("sink-conf", "password=" + CONTAINER.getPassword()), + multipleParameterArg("job-name", "test-mysql-cdc-sync"), + multipleParameterArg("database", CONTAINER.getDatabaseName()), + multipleParameterArg("including-tables", ".*") + }); + + waitingAndAssertTableCount("products", 9); + waitingAndAssertTableCount("customers", 4); + } +} diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java index f22b799a..39a139ce 100644 --- a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java +++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java @@ -18,14 +18,22 @@ import com.oceanbase.connector.flink.OceanBaseMySQLTestBase; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import 
com.github.dockerjava.api.model.Volume; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.BindMode; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.FrameConsumerResultCallback; +import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.output.ToStringConsumer; import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.MountableFile; @@ -67,7 +75,8 @@ protected String getFlinkProperties() { "execution.checkpointing.interval: 300")); } - public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + protected final TemporaryFolder temporaryFolder = new TemporaryFolder(); + protected final Volume sharedVolume = new Volume("/tmp/shared"); public GenericContainer jobManager; public GenericContainer taskManager; @@ -75,9 +84,9 @@ protected String getFlinkProperties() { @SuppressWarnings("resource") @BeforeEach public void before() throws Exception { + LOG.info("Starting Flink containers..."); temporaryFolder.create(); - LOG.info("Starting Flink containers..."); jobManager = new GenericContainer<>(getFlinkDockerImageTag()) .withCommand("jobmanager") @@ -85,7 +94,10 @@ public void before() throws Exception { .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) .withExposedPorts(JOB_MANAGER_REST_PORT) .withEnv("FLINK_PROPERTIES", getFlinkProperties()) + .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume)) .withLogConsumer(new Slf4jLogConsumer(LOG)); + Startables.deepStart(jobManager).join(); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString()); taskManager = new 
GenericContainer<>(getFlinkDockerImageTag()) @@ -94,9 +106,11 @@ public void before() throws Exception { .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) .withEnv("FLINK_PROPERTIES", getFlinkProperties()) .dependsOn(jobManager) + .withVolumesFrom(jobManager, BindMode.READ_WRITE) .withLogConsumer(new Slf4jLogConsumer(LOG)); + Startables.deepStart(taskManager).join(); + runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString()); - Startables.deepStart(Stream.of(jobManager, taskManager)).join(); LOG.info("Flink containers started"); } @@ -194,8 +208,7 @@ public void submitSQLJob(List sqlLines, Path... jars) commands.add(FLINK_BIN + "/sql-client.sh"); for (Path jar : jars) { commands.add("--jar"); - String containerPath = - copyAndGetContainerPath(jobManager, jar.toAbsolutePath().toString()); + String containerPath = copyAndGetContainerPath(jar.toAbsolutePath().toString()); commands.add(containerPath); } @@ -208,10 +221,55 @@ public void submitSQLJob(List sqlLines, Path... jars) } } - private String copyAndGetContainerPath(GenericContainer container, String filePath) { + private String copyAndGetContainerPath(String filePath) { Path path = Paths.get(filePath); - String containerPath = "/tmp/" + path.getFileName(); - container.copyFileToContainer(MountableFile.forHostPath(path), containerPath); + String containerPath = sharedVolume + "/" + path.getFileName(); + jobManager.copyFileToContainer(MountableFile.forHostPath(path), containerPath); return containerPath; } + + private void runInContainerAsRoot(GenericContainer container, String... 
command) + throws InterruptedException { + ToStringConsumer stdoutConsumer = new ToStringConsumer(); + ToStringConsumer stderrConsumer = new ToStringConsumer(); + DockerClient dockerClient = DockerClientFactory.instance().client(); + ExecCreateCmdResponse execCreateCmdResponse = + dockerClient + .execCreateCmd(container.getContainerId()) + .withUser("root") + .withCmd(command) + .exec(); + FrameConsumerResultCallback callback = new FrameConsumerResultCallback(); + callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer); + callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer); + dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion(); + } + + public String multipleParameterArg(String key, String value) { + return String.format("--%s '%s'", key, value); + } + + public void submitJob(List dependencies, Path jar, String[] args) + throws IOException, InterruptedException { + final List commands = new ArrayList<>(); + commands.add(FLINK_BIN + "/flink run --detached"); + if (dependencies != null && !dependencies.isEmpty()) { + for (Path dependency : dependencies) { + commands.add( + "--classpath " + + "file://" + + copyAndGetContainerPath(dependency.toAbsolutePath().toString())); + } + } + commands.add(copyAndGetContainerPath(jar.toAbsolutePath().toString())); + commands.addAll(Arrays.asList(args)); + + Container.ExecResult execResult = + jobManager.execInContainer("bash", "-c", String.join(" ", commands)); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when submitting the job."); + } + } } diff --git a/flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf b/flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf new file mode 100644 index 00000000..a3908978 --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf @@ -0,0 +1,65 @@ +# +# 
Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. 
+# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql new file mode 100644 index 00000000..c76db64e --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql @@ -0,0 +1,49 @@ +-- Copyright 2024 OceanBase. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+ +-- Create and populate our products using a single insert with many rows +CREATE TABLE products +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT +); +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (default, "scooter", "Small 2-wheel scooter", 3.14), + (default, "car battery", "12V car battery", 8.1), + (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8), + (default, "hammer", "12oz carpenter's hammer", 0.75), + (default, "hammer", "14oz carpenter's hammer", 0.875), + (default, "hammer", "16oz carpenter's hammer", 1.0), + (default, "rocks", "box of assorted rocks", 5.3), + (default, "jacket", "water resistent black wind breaker", 0.1), + (default, "spare tire", "24 inch spare tire", 22.2); + +-- Create some customers ... +CREATE TABLE customers +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE KEY +) AUTO_INCREMENT = 1001; + + +INSERT INTO customers +VALUES (default, "Sally", "Thomas", "sally.thomas@acme.com"), + (default, "George", "Bailey", "gbailey@foobar.com"), + (default, "Edward", "Walker", "ed@walker.com"), + (default, "Anne", "Kretchmar", "annek@noanswer.org"); diff --git a/flink-sql-connector-obkv-hbase/pom.xml b/flink-sql-connector-obkv-hbase/pom.xml index ea562c18..70421329 100644 --- a/flink-sql-connector-obkv-hbase/pom.xml +++ b/flink-sql-connector-obkv-hbase/pom.xml @@ -61,6 +61,9 @@ under the License. package + false + false + false com.oceanbase:* diff --git a/flink-sql-connector-oceanbase-directload/pom.xml b/flink-sql-connector-oceanbase-directload/pom.xml index 80eb6b42..216bd5c5 100644 --- a/flink-sql-connector-oceanbase-directload/pom.xml +++ b/flink-sql-connector-oceanbase-directload/pom.xml @@ -61,6 +61,9 @@ under the License. 
package + false + false + false com.oceanbase:* diff --git a/flink-sql-connector-oceanbase/pom.xml b/flink-sql-connector-oceanbase/pom.xml index 5de1a38c..79fd04bb 100644 --- a/flink-sql-connector-oceanbase/pom.xml +++ b/flink-sql-connector-oceanbase/pom.xml @@ -61,6 +61,9 @@ under the License. package + false + false + false com.oceanbase:* diff --git a/pom.xml b/pom.xml index 6b57caf1..a16d574c 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,8 @@ under the License. 1.18 2.12 com.oceanbase.connector.flink.shaded + + 3.2.1 @@ -145,24 +147,35 @@ under the License. ${flink.version} provided + + + org.apache.flink + flink-core + ${flink.version} + provided + + org.apache.flink flink-connector-base ${flink.version} provided + org.apache.flink flink-test-utils ${flink.version} test + org.apache.flink flink-table-planner_${scala.binary.version} ${flink.version} test + org.apache.flink flink-table-planner_${scala.binary.version} @@ -170,6 +183,7 @@ under the License. test-jar test + org.testcontainers oceanbase