From 0554b0589e1523f35c4a175ae559630acc6d01f4 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 24 Apr 2024 16:20:24 +0800 Subject: [PATCH] docs: add demo for OceanBase EE (#63) --- docs/sink/flink-connector-oceanbase.md | 96 ++++++++++------------- docs/sink/flink-connector-oceanbase_cn.md | 96 ++++++++++------------- 2 files changed, 86 insertions(+), 106 deletions(-) diff --git a/docs/sink/flink-connector-oceanbase.md b/docs/sink/flink-connector-oceanbase.md index da4abcf5..3c824622 100644 --- a/docs/sink/flink-connector-oceanbase.md +++ b/docs/sink/flink-connector-oceanbase.md @@ -52,6 +52,25 @@ To use this connector through Flink SQL directly, you need to download the shade - Release versions: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase - Snapshot versions: https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase +This project has built-in MySQL driver 8.0.28. For users of OceanBase EE who want to use OceanBase JDBC driver, they need to manually introduce the following dependencies: + +
+ + + + + + + + + + + + + +
Dependency ItemDescription
com.oceanbase:oceanbase-client:2.4.9Used for connecting to OceanBase EE.
+
+ ### Demo #### Preparation @@ -68,59 +87,6 @@ CREATE TABLE `t_sink` ( ); ``` -#### Java Demo - -Take Maven project for example, add the required dependencies to the pom.xml, and then use the following code. - -```java -package com.oceanbase; - -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; - -public class Main { - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create( - env, EnvironmentSettings.newInstance().inStreamingMode().build()); - - tEnv.executeSql( - "CREATE TABLE t_sink ( " - + " id INT," - + " username VARCHAR," - + " score INT," - + " PRIMARY KEY (id) NOT ENFORCED" - + ") with (" - + " 'connector' = 'oceanbase'," - + " 'url' = 'jdbc:mysql://127.0.0.1:2881/test'," - + " 'schema-name'= 'test'," - + " 'table-name' = 't_sink'," - + " 'username' = 'root@test#obcluster'," - + " 'password' = 'pswd'," - + " 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100'," - + " 'buffer-flush.interval' = '1s'," - + " 'buffer-flush.buffer-size' = '5000'," - + " 'max-retries' = '3'" - + " );"); - - tEnv.executeSql( - "INSERT INTO t_sink VALUES " - + "(1, 'Tom', 99)," - + "(2, 'Jerry', 88)," - + "(1, 'Tom', 89);") - .await(); - } -} - -``` - -Once executed, the records should have been written to OceanBase. - -For more information please refer to [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java). - #### Flink SQL Demo Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client. @@ -156,6 +122,30 @@ VALUES (1, 'Tom', 99), Once executed, the records should have been written to OceanBase. +For users of OceanBase EE, you need to specify the `url` and `driver-class-name` corresponding to the OceanBase JDBC driver. + +```sql +CREATE TABLE t_sink +( + id INT, + username VARCHAR, + score INT, + PRIMARY KEY (id) NOT ENFORCED +) with ( + 'connector' = 'oceanbase', + 'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS', + 'driver-class-name' = 'com.oceanbase.jdbc.Driver', + 'schema-name' = 'SYS', + 'table-name' = 'T_SINK', + 'username' = 'SYS@test#obcluster', + 'password' = 'pswd', + 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;', + 'buffer-flush.interval' = '1s', + 'buffer-flush.buffer-size' = '5000', + 'max-retries' = '3' + ); +``` + ## Configuration | Option | Required by Table API | Required by DataStream | Default | Type | Description | diff --git a/docs/sink/flink-connector-oceanbase_cn.md b/docs/sink/flink-connector-oceanbase_cn.md index 41ed780c..16e40975 100644 --- a/docs/sink/flink-connector-oceanbase_cn.md +++ b/docs/sink/flink-connector-oceanbase_cn.md @@ -52,6 +52,25 @@ mvn clean package -DskipTests - 正式版本:https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase - 快照版本:https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase +本项目内置了 MySQL 驱动 8.0.28,对于想使用 OceanBase JDBC 驱动的 OceanBase 数据库企业版的用户,需要手动引入以下依赖: + +
+ + + + + + + + + + + + + +
依赖名称说明
com.oceanbase:oceanbase-client:2.4.9用于连接到 OceanBase 数据库企业版。
+
+ ### 示例 #### 准备 @@ -69,59 +88,6 @@ CREATE TABLE `t_sink` ); ``` -#### Java 应用示例 - -以 Maven 项目为例,将需要的依赖加入到应用的 pom.xml 文件中,然后使用以下代码。 - -```java -package com.oceanbase; - -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; - -public class Main { - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create( - env, EnvironmentSettings.newInstance().inStreamingMode().build()); - - tEnv.executeSql( - "CREATE TABLE t_sink ( " - + " id INT," - + " username VARCHAR," - + " score INT," - + " PRIMARY KEY (id) NOT ENFORCED" - + ") with (" - + " 'connector' = 'oceanbase'," - + " 'url' = 'jdbc:mysql://127.0.0.1:2881/test'," - + " 'schema-name'= 'test'," - + " 'table-name' = 't_sink'," - + " 'username' = 'root@test#obcluster'," - + " 'password' = 'pswd'," - + " 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100'," - + " 'buffer-flush.interval' = '1s'," - + " 'buffer-flush.buffer-size' = '5000'," - + " 'max-retries' = '3'" - + " );"); - - tEnv.executeSql( - "INSERT INTO t_sink VALUES " - + "(1, 'Tom', 99)," - + "(2, 'Jerry', 88)," - + "(1, 'Tom', 89);") - .await(); - } -} - -``` - -执行完成后,即可在 OceanBase 中检索验证。 - -更多信息请参考 [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java)。 - #### Flink SQL 示例 将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。 @@ -158,6 +124,30 @@ VALUES (1, 'Tom', 99), 执行完成后,即可在 OceanBase 中检索验证。 +对于 OceanBase 数据库企业版的用户,需要指定 OceanBase JDBC 驱动对应的 `url` 和 `driver-class-name`。 + +```sql +CREATE TABLE t_sink +( + id INT, + username VARCHAR, + score INT, + PRIMARY KEY (id) NOT ENFORCED +) with ( + 'connector' = 'oceanbase', + 'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS', + 'driver-class-name' = 'com.oceanbase.jdbc.Driver', + 'schema-name' = 'SYS', + 'table-name' = 'T_SINK', + 'username' = 'SYS@test#obcluster', + 'password' = 'pswd', + 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;', + 'buffer-flush.interval' = '1s', + 'buffer-flush.buffer-size' = '5000', + 'max-retries' = '3' + ); +``` + ## 配置项 | 参数名 | Table API 必需 | DataStream 必需 | 默认值 | 类型 | 描述 |