Skip to content

Commit

Permalink
docs: add demo for OceanBase EE (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe authored Apr 24, 2024
1 parent db1bb47 commit 0554b05
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 106 deletions.
96 changes: 43 additions & 53 deletions docs/sink/flink-connector-oceanbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ To use this connector through Flink SQL directly, you need to download the shade
- Release versions: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase
- Snapshot versions: https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase

This project has built-in MySQL driver 8.0.28. For users of OceanBase EE who want to use OceanBase JDBC driver, they need to manually introduce the following dependencies:

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">Dependency Item</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><a href="https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client/2.4.9">com.oceanbase:oceanbase-client:2.4.9</a></td>
<td>Used for connecting to OceanBase EE.</td>
</tr>
</tbody>
</table>
</div>

### Demo

#### Preparation
Expand All @@ -68,59 +87,6 @@ CREATE TABLE `t_sink` (
);
```

#### Java Demo

Take Maven project for example, add the required dependencies to the pom.xml, and then use the following code.

```java
package com.oceanbase;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
"CREATE TABLE t_sink ( "
+ " id INT,"
+ " username VARCHAR,"
+ " score INT,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") with ("
+ " 'connector' = 'oceanbase',"
+ " 'url' = 'jdbc:mysql://127.0.0.1:2881/test',"
+ " 'schema-name'= 'test',"
+ " 'table-name' = 't_sink',"
+ " 'username' = 'root@test#obcluster',"
+ " 'password' = 'pswd',"
+ " 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100',"
+ " 'buffer-flush.interval' = '1s',"
+ " 'buffer-flush.buffer-size' = '5000',"
+ " 'max-retries' = '3'"
+ " );");

tEnv.executeSql(
"INSERT INTO t_sink VALUES "
+ "(1, 'Tom', 99),"
+ "(2, 'Jerry', 88),"
+ "(1, 'Tom', 89);")
.await();
}
}

```

Once executed, the records should have been written to OceanBase.

For more information please refer to [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java).

#### Flink SQL Demo

Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client.
Expand Down Expand Up @@ -156,6 +122,30 @@ VALUES (1, 'Tom', 99),

Once executed, the records should have been written to OceanBase.

For users of OceanBase EE, you need to specify the `url` and `driver-class-name` corresponding to the OceanBase JDBC driver.

```sql
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SINK',
'username' = 'SYS@test#obcluster',
'password' = 'pswd',
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
```

## Configuration

| Option | Required by Table API | Required by DataStream | Default | Type | Description |
Expand Down
96 changes: 43 additions & 53 deletions docs/sink/flink-connector-oceanbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ mvn clean package -DskipTests
- 正式版本:https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase
- 快照版本:https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-sql-connector-oceanbase

本项目内置了 MySQL 驱动 8.0.28,对于想使用 OceanBase JDBC 驱动的 OceanBase 数据库企业版的用户,需要手动引入以下依赖:

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">依赖名称</th>
<th class="text-left">说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><a href="https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client/2.4.9">com.oceanbase:oceanbase-client:2.4.9</a></td>
<td>用于连接到 OceanBase 数据库企业版。</td>
</tr>
</tbody>
</table>
</div>

### 示例

#### 准备
Expand All @@ -69,59 +88,6 @@ CREATE TABLE `t_sink`
);
```

#### Java 应用示例

以 Maven 项目为例,将需要的依赖加入到应用的 pom.xml 文件中,然后使用以下代码。

```java
package com.oceanbase;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
"CREATE TABLE t_sink ( "
+ " id INT,"
+ " username VARCHAR,"
+ " score INT,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") with ("
+ " 'connector' = 'oceanbase',"
+ " 'url' = 'jdbc:mysql://127.0.0.1:2881/test',"
+ " 'schema-name'= 'test',"
+ " 'table-name' = 't_sink',"
+ " 'username' = 'root@test#obcluster',"
+ " 'password' = 'pswd',"
+ " 'druid-properties' = 'druid.initialSize=10;druid.maxActive=100',"
+ " 'buffer-flush.interval' = '1s',"
+ " 'buffer-flush.buffer-size' = '5000',"
+ " 'max-retries' = '3'"
+ " );");

tEnv.executeSql(
"INSERT INTO t_sink VALUES "
+ "(1, 'Tom', 99),"
+ "(2, 'Jerry', 88),"
+ "(1, 'Tom', 89);")
.await();
}
}

```

执行完成后,即可在 OceanBase 中检索验证。

更多信息请参考 [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java)

#### Flink SQL 示例

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。
Expand Down Expand Up @@ -158,6 +124,30 @@ VALUES (1, 'Tom', 99),

执行完成后,即可在 OceanBase 中检索验证。

对于 OceanBase 数据库企业版的用户,需要指定 OceanBase JDBC 驱动对应的 `url``driver-class-name`

```sql
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SINK',
'username' = 'SYS@test#obcluster',
'password' = 'pswd',
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
```

## 配置项

| 参数名 | Table API 必需 | DataStream 必需 | 默认值 | 类型 | 描述 |
Expand Down

0 comments on commit 0554b05

Please sign in to comment.