Skip to content

Commit

Permalink
chore: update cli module shaded config and add e2e test (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe authored Jan 23, 2025
1 parent e6543e8 commit 5771d1d
Show file tree
Hide file tree
Showing 23 changed files with 513 additions and 236 deletions.
17 changes: 8 additions & 9 deletions docs/cli/flink-cdc/flink-cdc-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,19 @@ Replace the following command with your real database information, and execute i
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.oceanbase.connector.flink.CdcCli \
lib/flink-connector-oceanbase-cli-xxx.jar \
mysql-cdc \
--database test_db \
--source-type mysql-cdc \
--source-conf hostname=xxxx \
--source-conf port=3306 \
--source-conf username=root \
--source-conf password=xxxx \
--source-conf database-name=test_db \
--source-conf table-name=.* \
--including-tables ".*" \
--sink-conf username=xxxx \
--sink-conf password=xxxx \
--sink-conf url=jdbc:mysql://xxxx:xxxx
--sink-conf url=jdbc:mysql://xxxx:xxxx \
--database test_db \
--including-tables ".*"
```

### Check and Verify
Expand All @@ -101,18 +100,18 @@ You can go on to insert test data into the MySQL database; since it is a CDC task, after
</thead>
<tbody>
<tr>
<td>${job-type}</td>
<td>--source-type</td>
<td>Yes</td>
<td>Enumeration value</td>
<td style="word-wrap: break-word;"></td>
<td>Job type, can be <code>mysql-cdc</code>.</td>
<td>Source type, can be <code>mysql-cdc</code>.</td>
</tr>
<tr>
<td>--source-conf</td>
<td>Yes</td>
<td>Multi-value parameter</td>
<td style="word-wrap: break-word;"></td>
<td>Configurations of specific Flink CDC Source.</td>
<td>Configurations of the specific source.</td>
</tr>
<tr>
<td>--sink-conf</td>
Expand All @@ -125,7 +124,7 @@ You can go on to insert test data into the MySQL database; since it is a CDC task, after
<td>--job-name</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;">${job-type} Sync</td>
<td style="word-wrap: break-word;">${source-type} Sync</td>
<td>The Flink job name.</td>
</tr>
<tr>
Expand Down
17 changes: 8 additions & 9 deletions docs/cli/flink-cdc/flink-cdc-source_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,19 @@ VALUES (default, "Sally", "Thomas", "sally.thomas@acme.com"),
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.oceanbase.connector.flink.CdcCli \
lib/flink-connector-oceanbase-cli-xxx.jar \
mysql-cdc \
--database test_db \
--source-type mysql-cdc \
--source-conf hostname=xxxx \
--source-conf port=3306 \
--source-conf username=root \
--source-conf password=xxxx \
--source-conf database-name=test_db \
--source-conf table-name=.* \
--including-tables ".*" \
--sink-conf username=xxxx \
--sink-conf password=xxxx \
--sink-conf url=jdbc:mysql://xxxx:xxxx
--sink-conf url=jdbc:mysql://xxxx:xxxx \
--database test_db \
--including-tables ".*"
```

请将以上的数据库信息替换为您真实的数据库信息,当出现类似于以下的信息时,任务构建成功并提交。
Expand All @@ -103,18 +102,18 @@ $FLINK_HOME/bin/flink run \
</thead>
<tbody>
<tr>
<td>${job-type}</td>
<td>--source-type</td>
<td>是</td>
<td>枚举值</td>
<td style="word-wrap: break-word;"></td>
<td>任务类型,可以是 <code>mysql-cdc</code>。</td>
<td>源端类型,可以是 <code>mysql-cdc</code>。</td>
</tr>
<tr>
<td>--source-conf</td>
<td>是</td>
<td>多值参数</td>
<td style="word-wrap: break-word;"></td>
<td>指定类型的 Flink CDC 源端连接器的配置参数。</td>
<td>指定类型的源端的配置参数。</td>
</tr>
<tr>
<td>--sink-conf</td>
Expand All @@ -127,7 +126,7 @@ $FLINK_HOME/bin/flink run \
<td>--job-name</td>
<td>否</td>
<td>String</td>
<td style="word-wrap: break-word;">${job-type} Sync</td>
<td style="word-wrap: break-word;">${source-type} Sync</td>
<td>Flink 任务名称。</td>
</tr>
<tr>
Expand Down
96 changes: 34 additions & 62 deletions flink-connector-oceanbase-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@ under the License.
<artifactId>flink-connector-oceanbase-cli</artifactId>
<packaging>jar</packaging>

<properties>
<flink.cdc.version>3.2.1</flink.cdc.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
</properties>

<dependencies>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase</artifactId>
<artifactId>flink-sql-connector-oceanbase</artifactId>
<version>${project.version}</version>
</dependency>

Expand All @@ -49,62 +44,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-db2-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.oracle.ojdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.oceanbase</groupId>
Expand All @@ -121,4 +60,37 @@ under the License.
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<shadeTestJar>false</shadeTestJar>
<createDependencyReducedPom>false</createDependencyReducedPom>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.oceanbase.connector.flink.Cli</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.source.cdc.CdcSync;
import com.oceanbase.connector.flink.source.cdc.CdcSyncConfig;
import com.oceanbase.connector.flink.config.CliConfig;
import com.oceanbase.connector.flink.process.Sync;
import com.oceanbase.connector.flink.source.cdc.mysql.MysqlCdcSync;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.StringUtils;

Expand All @@ -31,47 +33,60 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CdcCli {
private static final Logger LOG = LoggerFactory.getLogger(CdcCli.class);
public class Cli {
private static final Logger LOG = LoggerFactory.getLogger(Cli.class);

private static StreamExecutionEnvironment flinkEnvironmentForTesting;
private static JobClient jobClientForTesting;

@VisibleForTesting
public static void setStreamExecutionEnvironmentForTesting(StreamExecutionEnvironment env) {
flinkEnvironmentForTesting = env;
}

@VisibleForTesting
public static JobClient getJobClientForTesting() {
return jobClientForTesting;
}

public static void main(String[] args) throws Exception {
LOG.info("Starting CdcCli with args: {}", Arrays.toString(args));
LOG.info("Starting CLI with args: {}", Arrays.toString(args));

String jobType = args[0];
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
String sourceType = params.getRequired(CliConfig.SOURCE_TYPE);

CdcSync cdcSync;
switch (jobType.trim().toLowerCase()) {
case CdcSyncConfig.MYSQL_CDC:
cdcSync = new MysqlCdcSync();
Sync sync;
switch (sourceType.trim().toLowerCase()) {
case CliConfig.MYSQL_CDC:
sync = new MysqlCdcSync();
break;
default:
throw new RuntimeException("Unsupported job type: " + jobType);
throw new RuntimeException("Unsupported source type: " + sourceType);
}

Map<String, String> sourceConfigMap = getConfigMap(params, CdcSyncConfig.SOURCE_CONF);
Map<String, String> sourceConfigMap = getConfigMap(params, CliConfig.SOURCE_CONF);
Configuration sourceConfig = Configuration.fromMap(sourceConfigMap);

Map<String, String> sinkConfigMap = getConfigMap(params, CdcSyncConfig.SINK_CONF);
Map<String, String> sinkConfigMap = getConfigMap(params, CliConfig.SINK_CONF);
Configuration sinkConfig = Configuration.fromMap(sinkConfigMap);

String jobName = params.get(CdcSyncConfig.JOB_NAME);
String database = params.get(CdcSyncConfig.DATABASE);
String tablePrefix = params.get(CdcSyncConfig.TABLE_PREFIX);
String tableSuffix = params.get(CdcSyncConfig.TABLE_SUFFIX);
String includingTables = params.get(CdcSyncConfig.INCLUDING_TABLES);
String excludingTables = params.get(CdcSyncConfig.EXCLUDING_TABLES);
String multiToOneOrigin = params.get(CdcSyncConfig.MULTI_TO_ONE_ORIGIN);
String multiToOneTarget = params.get(CdcSyncConfig.MULTI_TO_ONE_TARGET);
String jobName = params.get(CliConfig.JOB_NAME);
String database = params.get(CliConfig.DATABASE);
String tablePrefix = params.get(CliConfig.TABLE_PREFIX);
String tableSuffix = params.get(CliConfig.TABLE_SUFFIX);
String includingTables = params.get(CliConfig.INCLUDING_TABLES);
String excludingTables = params.get(CliConfig.EXCLUDING_TABLES);
String multiToOneOrigin = params.get(CliConfig.MULTI_TO_ONE_ORIGIN);
String multiToOneTarget = params.get(CliConfig.MULTI_TO_ONE_TARGET);

boolean createTableOnly = params.has(CdcSyncConfig.CREATE_TABLE_ONLY);
boolean ignoreDefaultValue = params.has(CdcSyncConfig.IGNORE_DEFAULT_VALUE);
boolean ignoreIncompatible = params.has(CdcSyncConfig.IGNORE_INCOMPATIBLE);
boolean createTableOnly = params.has(CliConfig.CREATE_TABLE_ONLY);
boolean ignoreDefaultValue = params.has(CliConfig.IGNORE_DEFAULT_VALUE);
boolean ignoreIncompatible = params.has(CliConfig.IGNORE_INCOMPATIBLE);

cdcSync.setEnv(env)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
sync.setEnv(env)
.setSourceConfig(sourceConfig)
.setSinkConfig(sinkConfig)
.setDatabase(database)
Expand All @@ -87,9 +102,14 @@ public static void main(String[] args) throws Exception {
.build();

if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
jobName = String.format("%s Sync", jobType);
jobName = String.format("%s Sync", sourceType);
}

if (Objects.nonNull(flinkEnvironmentForTesting)) {
jobClientForTesting = env.executeAsync();
} else {
env.execute(jobName);
}
env.execute(jobName);
}

public static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* limitations under the License.
*/

package com.oceanbase.connector.flink.source.cdc;
package com.oceanbase.connector.flink.config;

public class CdcSyncConfig {
public class CliConfig {

/** Option key for source type. */
public static final String SOURCE_TYPE = "source-type";

/** Option key for cdc source. */
public static final String SOURCE_CONF = "source-conf";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.connector.flink.source.cdc;

import com.oceanbase.connector.flink.source.TableNameConverter;
package com.oceanbase.connector.flink.process;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;
Expand Down
Loading

0 comments on commit 5771d1d

Please sign in to comment.