Skip to content

Commit b66a4db

Browse files
committed
use 'compatible mode' instead of 'database mode'
1 parent 47f0e53 commit b66a4db

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class OceanBaseConnection extends JdbcConnection {
4040
private static final String OB_URL_PATTERN =
4141
"jdbc:oceanbase://${hostname}:${port}/?connectTimeout=${connectTimeout}&serverTimezone=${serverTimezone}";
4242

43-
private String databaseMode;
43+
private String compatibleMode;
4444

4545
public OceanBaseConnection(
4646
String hostname,
@@ -108,16 +108,16 @@ private static JdbcConnection.ConnectionFactory factory(
108108
}
109109

110110
/**
111-
* Get the database mode of this connection, should be 'MySQL' or 'Oracle'.
111+
* Get the compatible mode of this connection, should be 'MySQL' or 'Oracle'.
112112
*
113-
* @return The database mode.
113+
* @return The compatible mode.
114114
* @throws SQLException If a database access error occurs.
115115
*/
116-
public String getDatabaseMode() throws SQLException {
117-
if (databaseMode == null) {
118-
databaseMode = connection().getMetaData().getDatabaseProductName();
116+
public String getCompatibleMode() throws SQLException {
117+
if (compatibleMode == null) {
118+
compatibleMode = connection().getMetaData().getDatabaseProductName();
119119
}
120-
return databaseMode;
120+
return compatibleMode;
121121
}
122122

123123
/**
@@ -131,7 +131,7 @@ public String getDatabaseMode() throws SQLException {
131131
public List<String> getTables(String dbPattern, String tbPattern) throws SQLException {
132132
List<String> result = new ArrayList<>();
133133
DatabaseMetaData metaData = connection().getMetaData();
134-
switch (getDatabaseMode().toLowerCase()) {
134+
switch (getCompatibleMode().toLowerCase()) {
135135
case "mysql":
136136
List<String> dbNames = getResultList(metaData.getCatalogs(), "TABLE_CAT");
137137
dbNames =
@@ -165,7 +165,8 @@ public List<String> getTables(String dbPattern, String tbPattern) throws SQLExce
165165
}
166166
break;
167167
default:
168-
throw new FlinkRuntimeException("Unsupported database mode: " + getDatabaseMode());
168+
throw new FlinkRuntimeException(
169+
"Unsupported database mode: " + getCompatibleMode());
169170
}
170171
return result;
171172
}

flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,9 @@ private void initTableWhiteList() {
227227
&& StringUtils.isNotBlank(databaseName)
228228
&& StringUtils.isNotBlank(tableName)) {
229229
try {
230-
LOG.info("Connection database mode: {}", getSnapshotConnection().getDatabaseMode());
230+
LOG.info(
231+
"Connection database mode: {}",
232+
getSnapshotConnection().getCompatibleMode());
231233
List<String> tables = getSnapshotConnection().getTables(databaseName, tableName);
232234
LOG.info("Pattern matched tables: {}", tables);
233235
localTableSet.addAll(tables);
@@ -263,7 +265,7 @@ private void readSnapshotRecordsByTable(String databaseName, String tableName) {
263265
new OceanBaseRecord.SourceInfo(
264266
tenantName, databaseName, tableName, resolvedTimestamp);
265267
try {
266-
String databaseMode = getSnapshotConnection().getDatabaseMode();
268+
String databaseMode = getSnapshotConnection().getCompatibleMode();
267269
String fullName;
268270
if ("mysql".equalsIgnoreCase(databaseMode)) {
269271
fullName = String.format("`%s`.`%s`", databaseName, tableName);

0 commit comments

Comments
 (0)