Skip to content

Commit e636139

Browse files
committed
Merge branch 'master' into regex
2 parents dd59683 + 657f436 commit e636139

File tree

7 files changed

+125
-2
lines changed

7 files changed

+125
-2
lines changed

docs/content/connectors/mongodb-cdc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,13 @@ Connector Options
226226
<td>Integer</td>
227227
<td>The max size of the queue to use when copying data.</td>
228228
</tr>
229+
<tr>
230+
<td>batch.size</td>
231+
<td>optional</td>
232+
<td style="word-wrap: break-word;">0</td>
233+
<td>Integer</td>
234+
<td>Change stream cursor batch size. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. The default is 0 meaning it uses the server's default value.</td>
235+
</tr>
229236
<tr>
230237
<td>poll.max.batch.size</td>
231238
<td>optional</td>

docs/content/connectors/sqlserver-cdc.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,40 @@ Download [flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar](https://repo1.mave
2626

2727
**Note:** flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-sqlserver-cdc-XXX.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc), the released version will be available in the Maven central warehouse.
2828

29+
Set up the SQLServer Database
30+
----------------
31+
A SQL Server administrator must enable change data capture on the source tables that you want to capture. The database must already be enabled for CDC. To enable CDC on a table, a SQL Server administrator runs the stored procedure ```sys.sp_cdc_enable_table``` for the table.
32+
33+
**Prerequisites:**
34+
* CDC is enabled on the SQL Server database.
35+
* The SQL Server Agent is running.
36+
* You are a member of the db_owner fixed database role for the database.
37+
38+
**Procedure:**
39+
* Connect to the SQL Server database by using a database management tool (for example, SQL Server Management Studio).
40+
* Run the following SQL statement to enable CDC on the table.
41+
```sql
42+
USE MyDB
43+
GO
44+
45+
EXEC sys.sp_cdc_enable_table
46+
@source_schema = N'dbo', -- Specifies the schema of the source table.
47+
@source_name = N'MyTable', -- Specifies the name of the table that you want to capture.
48+
@role_name = N'MyRole', -- Specifies a role MyRole to which you can add users to whom you want to grant SELECT permission on the captured columns of the source table. Users in the sysadmin or db_owner role also have access to the specified change tables. Set the value of @role_name to NULL, to allow only members in the sysadmin or db_owner to have full access to captured information.
49+
@filegroup_name = N'MyDB_CT',-- Specifies the filegroup where SQL Server places the change table for the captured table. The named filegroup must already exist. It is best not to locate change tables in the same filegroup that you use for source tables.
50+
@supports_net_changes = 0
51+
GO
52+
```
53+
* Verify that the user has access to the CDC table.
54+
```sql
55+
--The following example runs the stored procedure sys.sp_cdc_help_change_data_capture on the database MyDB:
56+
USE MyDB;
57+
GO
58+
EXEC sys.sp_cdc_help_change_data_capture
59+
GO
60+
```
61+
The query returns configuration information for each table in the database that is enabled for CDC and that contains change data that the caller is authorized to access. If the result is empty, verify that the user has privileges to access both the capture instance and the CDC tables.
62+
2963
How to create a SQLServer CDC table
3064
----------------
3165

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public class MongoDBSource {
5959

6060
public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
6161

62+
public static final int BATCH_SIZE_DEFAULT = 0;
63+
6264
public static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;
6365

6466
public static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = 1500;
@@ -128,7 +130,7 @@ public static class Builder<T> {
128130
private List<String> databaseList;
129131
private List<String> collectionList;
130132
private String connectionOptions;
131-
private Integer batchSize;
133+
private Integer batchSize = BATCH_SIZE_DEFAULT;
132134
private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
133135
private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
134136
private Boolean copyExisting = true;
@@ -186,7 +188,9 @@ public Builder<T> collectionList(String... collectionList) {
186188
/**
187189
* batch.size
188190
*
189-
* <p>The cursor batch size. Default: 0
191+
* <p>The change stream cursor batch size. Specifies the maximum number of change events to
192+
* return in each batch of the response from the MongoDB cluster. The default is 0 meaning
193+
* it uses the server's default value. Default: 0
190194
*/
191195
public Builder<T> batchSize(int batchSize) {
192196
checkArgument(batchSize >= 0);

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
6969
private final String copyExistingPipeline;
7070
private final Integer copyExistingMaxThreads;
7171
private final Integer copyExistingQueueSize;
72+
private final Integer batchSize;
7273
private final Integer pollMaxBatchSize;
7374
private final Integer pollAwaitTimeMillis;
7475
private final Integer heartbeatIntervalMillis;
@@ -98,6 +99,7 @@ public MongoDBTableSource(
9899
@Nullable String copyExistingPipeline,
99100
@Nullable Integer copyExistingMaxThreads,
100101
@Nullable Integer copyExistingQueueSize,
102+
@Nullable Integer batchSize,
101103
@Nullable Integer pollMaxBatchSize,
102104
@Nullable Integer pollAwaitTimeMillis,
103105
@Nullable Integer heartbeatIntervalMillis,
@@ -115,6 +117,7 @@ public MongoDBTableSource(
115117
this.copyExistingPipeline = copyExistingPipeline;
116118
this.copyExistingMaxThreads = copyExistingMaxThreads;
117119
this.copyExistingQueueSize = copyExistingQueueSize;
120+
this.batchSize = batchSize;
118121
this.pollMaxBatchSize = pollMaxBatchSize;
119122
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
120123
this.heartbeatIntervalMillis = heartbeatIntervalMillis;
@@ -175,6 +178,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
175178
Optional.ofNullable(copyExistingPipeline).ifPresent(builder::copyExistingPipeline);
176179
Optional.ofNullable(copyExistingMaxThreads).ifPresent(builder::copyExistingMaxThreads);
177180
Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
181+
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
178182
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
179183
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
180184
Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
@@ -232,6 +236,7 @@ public DynamicTableSource copy() {
232236
copyExistingPipeline,
233237
copyExistingMaxThreads,
234238
copyExistingQueueSize,
239+
batchSize,
235240
pollMaxBatchSize,
236241
pollAwaitTimeMillis,
237242
heartbeatIntervalMillis,
@@ -263,6 +268,7 @@ public boolean equals(Object o) {
263268
&& Objects.equals(copyExistingPipeline, that.copyExistingPipeline)
264269
&& Objects.equals(copyExistingMaxThreads, that.copyExistingMaxThreads)
265270
&& Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
271+
&& Objects.equals(batchSize, that.batchSize)
266272
&& Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
267273
&& Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
268274
&& Objects.equals(heartbeatIntervalMillis, that.heartbeatIntervalMillis)
@@ -287,6 +293,7 @@ public int hashCode() {
287293
copyExistingPipeline,
288294
copyExistingMaxThreads,
289295
copyExistingQueueSize,
296+
batchSize,
290297
pollMaxBatchSize,
291298
pollAwaitTimeMillis,
292299
heartbeatIntervalMillis,

flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.HashSet;
3131
import java.util.Set;
3232

33+
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.BATCH_SIZE_DEFAULT;
3334
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
3435
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
3536
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
@@ -148,6 +149,16 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
148149
.withDescription(
149150
"The max size of the queue to use when copying data. Defaults to 16000.");
150151

152+
private static final ConfigOption<Integer> BATCH_SIZE =
153+
ConfigOptions.key("batch.size")
154+
.intType()
155+
.defaultValue(BATCH_SIZE_DEFAULT)
156+
.withDescription(
157+
"Change stream cursor batch size. "
158+
+ "Specifies the maximum number of change events to return in each batch "
159+
+ "of the response from the MongoDB cluster. "
160+
+ "Defaults to 0 meaning it uses the server's default value.");
161+
151162
private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
152163
ConfigOptions.key("poll.max.batch.size")
153164
.intType()
@@ -196,6 +207,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
196207
String errorsTolerance = config.get(ERRORS_TOLERANCE);
197208
Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
198209

210+
Integer batchSize = config.get(BATCH_SIZE);
199211
Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
200212
Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
201213

@@ -232,6 +244,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
232244
copyExistingPipeline,
233245
copyExistingMaxThreads,
234246
copyExistingQueueSize,
247+
batchSize,
235248
pollMaxBatchSize,
236249
pollAwaitTimeMillis,
237250
heartbeatIntervalMillis,
@@ -270,6 +283,7 @@ public Set<ConfigOption<?>> optionalOptions() {
270283
options.add(COPY_EXISTING_PIPELINE);
271284
options.add(COPY_EXISTING_MAX_THREADS);
272285
options.add(COPY_EXISTING_QUEUE_SIZE);
286+
options.add(BATCH_SIZE);
273287
options.add(POLL_MAX_BATCH_SIZE);
274288
options.add(POLL_AWAIT_TIME_MILLIS);
275289
options.add(HEARTBEAT_INTERVAL_MILLIS);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2022 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.mongodb;
18+
19+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
20+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
21+
22+
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
23+
import org.junit.Ignore;
24+
import org.junit.Test;
25+
26+
/** Example Tests for {@link MongoDBSource}. */
27+
public class MongoDBSourceExampleTest extends MongoDBTestBase {
28+
29+
@Test
30+
@Ignore("Test ignored because it won't stop and is used for manual test")
31+
public void testConsumingAllEvents() throws Exception {
32+
String inventory = executeCommandFileInSeparateDatabase("inventory");
33+
34+
SourceFunction<String> sourceFunction =
35+
MongoDBSource.<String>builder()
36+
.hosts(MONGODB_CONTAINER.getHostAndPort())
37+
.username(FLINK_USER)
38+
.password(FLINK_USER_PASSWORD)
39+
.databaseList(inventory)
40+
.collectionList(inventory + ".products")
41+
.deserializer(new JsonDebeziumDeserializationSchema())
42+
.build();
43+
44+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
45+
46+
env.addSource(sourceFunction)
47+
.print()
48+
.setParallelism(1); // use parallelism 1 for sink to keep message ordering
49+
50+
env.execute("Print MongoDB Snapshot + Change Stream Events");
51+
}
52+
}

flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.HashMap;
4545
import java.util.Map;
4646

47+
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.BATCH_SIZE_DEFAULT;
4748
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_ALL;
4849
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
4950
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
@@ -110,6 +111,7 @@ public void testCommonProperties() {
110111
null,
111112
null,
112113
null,
114+
BATCH_SIZE_DEFAULT,
113115
POLL_MAX_BATCH_SIZE_DEFAULT,
114116
POLL_AWAIT_TIME_MILLIS_DEFAULT,
115117
null,
@@ -127,6 +129,7 @@ public void testOptionalProperties() {
127129
options.put("copy.existing.pipeline", "[ { \"$match\": { \"closed\": \"false\" } } ]");
128130
options.put("copy.existing.max.threads", "1");
129131
options.put("copy.existing.queue.size", "101");
132+
options.put("batch.size", "101");
130133
options.put("poll.max.batch.size", "102");
131134
options.put("poll.await.time.ms", "103");
132135
options.put("heartbeat.interval.ms", "104");
@@ -147,6 +150,7 @@ public void testOptionalProperties() {
147150
"[ { \"$match\": { \"closed\": \"false\" } } ]",
148151
1,
149152
101,
153+
101,
150154
102,
151155
103,
152156
104,
@@ -181,6 +185,7 @@ public void testMetadataColumns() {
181185
null,
182186
null,
183187
null,
188+
BATCH_SIZE_DEFAULT,
184189
POLL_MAX_BATCH_SIZE_DEFAULT,
185190
POLL_AWAIT_TIME_MILLIS_DEFAULT,
186191
null,

0 commit comments

Comments
 (0)