Skip to content

Commit 2de4903

Browse files
authored
[oceanbase] Introduce 'table-list' option to support capture list of tables (#1369)
This closes #1369.
1 parent 0438416 commit 2de4903

File tree

9 files changed

+275
-151
lines changed

9 files changed

+275
-151
lines changed

docs/content/connectors/oceanbase-cdc.md

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ Flink SQL> SELECT * FROM orders;
9494
Connector Options
9595
----------------
9696

97+
The OceanBase CDC Connector provides the following options for both the SQL and stream APIs, as listed in the table below.
98+
99+
*Note*: The connector supports two ways to specify the list of tables to listen to, and will take the union of the results when both ways are used at the same time.
100+
1. Use `database-name` and `table-name` to match database and table names by regular expression. Because `obcdc` (formerly `liboblog`) currently supports only `fnmatch`, regular expressions cannot be used directly to filter change events, so these two options can only be used in `initial` startup mode.
101+
2. Use `table-list` to match the exact value of database and table names.
102+
97103
<div class="highlight">
98104
<table class="colwidths-auto docutils">
99105
<thead>
@@ -148,21 +154,28 @@ Connector Options
148154
<td>required</td>
149155
<td style="word-wrap: break-word;">(none)</td>
150156
<td>String</td>
151-
<td>Tenant name of OceanBase to monitor.</td>
157+
<td>Tenant name of OceanBase to monitor; should be an exact value.</td>
152158
</tr>
153159
<tr>
154160
<td>database-name</td>
155-
<td>required</td>
161+
<td>optional</td>
156162
<td style="word-wrap: break-word;">(none)</td>
157163
<td>String</td>
158-
<td>Database name of OceanBase to monitor.</td>
164+
<td>Database name of OceanBase to monitor; should be a regular expression. Can only be used with 'initial' mode.</td>
159165
</tr>
160166
<tr>
161167
<td>table-name</td>
162-
<td>required</td>
168+
<td>optional</td>
169+
<td style="word-wrap: break-word;">(none)</td>
170+
<td>String</td>
171+
<td>Table name of OceanBase to monitor; should be a regular expression. Can only be used with 'initial' mode.</td>
172+
</tr>
173+
<tr>
174+
<td>table-list</td>
175+
<td>optional</td>
163176
<td style="word-wrap: break-word;">(none)</td>
164177
<td>String</td>
165-
<td>Table name of OceanBase to monitor.</td>
178+
<td>List of full names of tables, separated by commas, e.g. "db1.table1, db2.table2".</td>
166179
</tr>
167180
<tr>
168181
<td>hostname</td>
@@ -211,7 +224,7 @@ Connector Options
211224
<td>optional</td>
212225
<td style="word-wrap: break-word;">By rule.</td>
213226
<td>String</td>
214-
<td>Id of a log proxy client connection, will be in format {flink_ip}_{process_id}_{timestamp}_{tenant}.{db}.{table} by default.</td>
227+
<td>ID of a log proxy client connection; by default it is in the format {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}.</td>
215228
</tr>
216229
<tr>
217230
<td>rootserver-list</td>

docs/content/quickstart/oceanbase-tutorial.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ Flink SQL> CREATE TABLE orders (
133133
'username' = 'root',
134134
'password' = 'pswd',
135135
'tenant-name' = 'sys',
136-
'database-name' = 'ob',
137-
'table-name' = 'orders',
136+
'database-name' = '^ob$',
137+
'table-name' = '^orders$',
138138
'hostname' = 'localhost',
139139
'port' = '2881',
140140
'rootserver-list' = '127.0.0.1:2882:2881',
@@ -153,8 +153,8 @@ Flink SQL> CREATE TABLE products (
153153
'username' = 'root',
154154
'password' = 'pswd',
155155
'tenant-name' = 'sys',
156-
'database-name' = 'ob',
157-
'table-name' = 'products',
156+
'database-name' = '^ob$',
157+
'table-name' = '^products$',
158158
'hostname' = 'localhost',
159159
'port' = '2881',
160160
'rootserver-list' = '127.0.0.1:2882:2881',

docs/content/快速上手/oceanbase-tutorial-zh.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ Flink SQL> CREATE TABLE orders (
131131
'username' = 'root',
132132
'password' = 'pswd',
133133
'tenant-name' = 'sys',
134-
'database-name' = 'ob',
135-
'table-name' = 'orders',
134+
'database-name' = '^ob$',
135+
'table-name' = '^orders$',
136136
'hostname' = 'localhost',
137137
'port' = '2881',
138138
'rootserver-list' = '127.0.0.1:2882:2881',
@@ -151,8 +151,8 @@ Flink SQL> CREATE TABLE products (
151151
'username' = 'root',
152152
'password' = 'pswd',
153153
'tenant-name' = 'sys',
154-
'database-name' = 'ob',
155-
'table-name' = 'products',
154+
'database-name' = '^ob$',
155+
'table-name' = '^products$',
156156
'hostname' = 'localhost',
157157
'port' = '2881',
158158
'rootserver-list' = '127.0.0.1:2882:2881',

flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static class Builder<T> {
5454
private String tenantName;
5555
private String databaseName;
5656
private String tableName;
57+
private String tableList;
5758
private String serverTimeZone;
5859
private Duration connectTimeout;
5960

@@ -102,6 +103,11 @@ public Builder<T> tableName(String tableName) {
102103
return this;
103104
}
104105

106+
public Builder<T> tableList(String tableList) {
107+
this.tableList = tableList;
108+
return this;
109+
}
110+
105111
public Builder<T> serverTimeZone(String serverTimeZone) {
106112
this.serverTimeZone = serverTimeZone;
107113
return this;
@@ -182,6 +188,17 @@ public SourceFunction<T> build() {
182188
startupMode + " mode is not supported.");
183189
}
184190

191+
if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) {
192+
if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) {
193+
throw new IllegalArgumentException(
194+
"'database-name' and 'table-name' should be configured at the same time");
195+
}
196+
} else {
197+
checkNotNull(
198+
tableList,
199+
"'database-name', 'table-name' or 'table-list' should be configured");
200+
}
201+
185202
if (serverTimeZone == null) {
186203
serverTimeZone = "UTC";
187204
}
@@ -191,15 +208,13 @@ public SourceFunction<T> build() {
191208
connectTimeout = Duration.ofSeconds(30);
192209
}
193210

194-
String tableWhiteList =
195-
String.format(
196-
"%s.%s.%s",
197-
checkNotNull(tenantName),
198-
checkNotNull(databaseName),
199-
checkNotNull(tableName));
200-
201211
if (logProxyClientId == null) {
202-
logProxyClientId = ClientIdGenerator.generate() + "_" + tableWhiteList;
212+
logProxyClientId =
213+
String.format(
214+
"%s_%s_%s",
215+
ClientIdGenerator.generate(),
216+
Thread.currentThread().getId(),
217+
checkNotNull(tenantName));
203218
}
204219
ClientConf clientConf =
205220
ClientConf.builder()
@@ -219,7 +234,6 @@ public SourceFunction<T> build() {
219234
}
220235
obReaderConfig.setUsername(username);
221236
obReaderConfig.setPassword(password);
222-
obReaderConfig.setTableWhiteList(tableWhiteList);
223237
obReaderConfig.setStartTimestamp(startupTimestamp);
224238
obReaderConfig.setTimezone(zoneOffset.getId());
225239

@@ -230,6 +244,7 @@ public SourceFunction<T> build() {
230244
tenantName,
231245
databaseName,
232246
tableName,
247+
tableList,
233248
zoneOffset,
234249
connectTimeout,
235250
hostname,

0 commit comments

Comments
 (0)