Skip to content

Commit 33b7c85

Browse files
banmoyleonardBang
authored andcommitted
[pipeline-connector][starrocks] Add starrocks pipeline connector (#2765)
This closes #2645.
1 parent 4abd86a commit 33b7c85

File tree

18 files changed

+2279
-3
lines changed

18 files changed

+2279
-3
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Copyright 2023 Ververica Inc.
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing,
10+
software distributed under the License is distributed on an
11+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
12+
KIND, either express or implied. See the License for the
13+
specific language governing permissions and limitations
14+
under the License.
15+
-->
16+
<project xmlns="http://maven.apache.org/POM/4.0.0"
17+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
19+
<parent>
20+
<artifactId>flink-cdc-pipeline-connectors</artifactId>
21+
<groupId>com.ververica</groupId>
22+
<version>${revision}</version>
23+
</parent>
24+
<modelVersion>4.0.0</modelVersion>
25+
26+
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
27+
28+
<properties>
29+
<starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
30+
</properties>
31+
32+
<dependencies>
33+
<dependency>
34+
<groupId>com.starrocks</groupId>
35+
<artifactId>flink-connector-starrocks</artifactId>
36+
<version>${starrocks.connector.version}</version>
37+
</dependency>
38+
39+
<dependency>
40+
<!-- TODO connector 1.2.9 depends on this, but not package it, so add this dependency here.
41+
This dependency can be removed after upgrading connector to 1.2.10 which will not use
42+
commons-compress anymore. -->
43+
<groupId>org.apache.commons</groupId>
44+
<artifactId>commons-compress</artifactId>
45+
<version>1.21</version>
46+
</dependency>
47+
48+
<dependency>
49+
<groupId>com.ververica</groupId>
50+
<artifactId>flink-cdc-composer</artifactId>
51+
<version>${revision}</version>
52+
</dependency>
53+
</dependencies>
54+
55+
<build>
56+
<plugins>
57+
<plugin>
58+
<groupId>org.apache.maven.plugins</groupId>
59+
<artifactId>maven-shade-plugin</artifactId>
60+
<version>3.1.1</version>
61+
<executions>
62+
<execution>
63+
<id>shade-flink</id>
64+
<phase>package</phase>
65+
<goals>
66+
<goal>shade</goal>
67+
</goals>
68+
<configuration>
69+
<shadeTestJar>false</shadeTestJar>
70+
<artifactSet>
71+
<includes>
72+
<include>com.starrocks:*</include>
73+
<include>org.apache.commons:commons-compress</include>
74+
</includes>
75+
</artifactSet>
76+
<relocations>
77+
<relocation>
78+
<pattern>org.apache.commons.compress</pattern>
79+
<shadedPattern>com.starrocks.shade.org.apache.commons.compress</shadedPattern>
80+
</relocation>
81+
</relocations>
82+
</configuration>
83+
</execution>
84+
</executions>
85+
</plugin>
86+
</plugins>
87+
</build>
88+
</project>
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2023 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.starrocks.sink;
18+
19+
import org.apache.flink.api.common.serialization.SerializationSchema;
20+
21+
import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData;
22+
import com.starrocks.connector.flink.table.data.StarRocksRowData;
23+
import com.starrocks.connector.flink.table.sink.v2.RecordSerializationSchema;
24+
import com.starrocks.connector.flink.table.sink.v2.StarRocksSinkContext;
25+
import com.starrocks.connector.flink.tools.JsonWrapper;
26+
import com.ververica.cdc.common.data.RecordData;
27+
import com.ververica.cdc.common.event.CreateTableEvent;
28+
import com.ververica.cdc.common.event.DataChangeEvent;
29+
import com.ververica.cdc.common.event.Event;
30+
import com.ververica.cdc.common.event.SchemaChangeEvent;
31+
import com.ververica.cdc.common.event.TableId;
32+
import com.ververica.cdc.common.schema.Column;
33+
import com.ververica.cdc.common.schema.Schema;
34+
import com.ververica.cdc.common.utils.Preconditions;
35+
import com.ververica.cdc.common.utils.SchemaUtils;
36+
37+
import java.time.ZoneId;
38+
import java.util.HashMap;
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
import static com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.createFieldGetter;
43+
44+
/** Serializer for the input {@link Event}. It will serialize a row to a json string. */
45+
public class EventRecordSerializationSchema implements RecordSerializationSchema<Event> {
46+
47+
private static final long serialVersionUID = 1L;
48+
49+
/**
50+
* The local time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>.
51+
*/
52+
private final ZoneId zoneId;
53+
54+
/** keep the relationship of TableId and table information. */
55+
private transient Map<TableId, TableInfo> tableInfoMap;
56+
57+
private transient DefaultStarRocksRowData reusableRowData;
58+
private transient JsonWrapper jsonWrapper;
59+
60+
public EventRecordSerializationSchema(ZoneId zoneId) {
61+
this.zoneId = zoneId;
62+
}
63+
64+
@Override
65+
public void open(
66+
SerializationSchema.InitializationContext context, StarRocksSinkContext sinkContext) {
67+
this.tableInfoMap = new HashMap<>();
68+
this.reusableRowData = new DefaultStarRocksRowData();
69+
this.jsonWrapper = new JsonWrapper();
70+
}
71+
72+
@Override
73+
public StarRocksRowData serialize(Event record) {
74+
if (record instanceof SchemaChangeEvent) {
75+
applySchemaChangeEvent((SchemaChangeEvent) record);
76+
return null;
77+
} else if (record instanceof DataChangeEvent) {
78+
return applyDataChangeEvent((DataChangeEvent) record);
79+
} else {
80+
throw new UnsupportedOperationException("Don't support event " + record);
81+
}
82+
}
83+
84+
private void applySchemaChangeEvent(SchemaChangeEvent event) {
85+
TableId tableId = event.tableId();
86+
Schema newSchema;
87+
if (event instanceof CreateTableEvent) {
88+
newSchema = ((CreateTableEvent) event).getSchema();
89+
} else {
90+
TableInfo tableInfo = tableInfoMap.get(tableId);
91+
if (tableInfo == null) {
92+
throw new RuntimeException("schema of " + tableId + " is not existed.");
93+
}
94+
newSchema = SchemaUtils.applySchemaChangeEvent(tableInfo.schema, event);
95+
}
96+
TableInfo tableInfo = new TableInfo();
97+
tableInfo.schema = newSchema;
98+
tableInfo.fieldGetters = new RecordData.FieldGetter[newSchema.getColumnCount()];
99+
for (int i = 0; i < newSchema.getColumnCount(); i++) {
100+
tableInfo.fieldGetters[i] =
101+
createFieldGetter(newSchema.getColumns().get(i).getType(), i, zoneId);
102+
}
103+
tableInfoMap.put(tableId, tableInfo);
104+
}
105+
106+
private StarRocksRowData applyDataChangeEvent(DataChangeEvent event) {
107+
TableInfo tableInfo = tableInfoMap.get(event.tableId());
108+
Preconditions.checkNotNull(tableInfo, event.tableId() + " is not existed");
109+
reusableRowData.setDatabase(event.tableId().getSchemaName());
110+
reusableRowData.setTable(event.tableId().getTableName());
111+
String value;
112+
switch (event.op()) {
113+
case INSERT:
114+
case UPDATE:
115+
case REPLACE:
116+
value = serializeRecord(tableInfo, event.after(), false);
117+
break;
118+
case DELETE:
119+
value = serializeRecord(tableInfo, event.before(), true);
120+
break;
121+
default:
122+
throw new UnsupportedOperationException(
123+
"Don't support operation type " + event.op());
124+
}
125+
reusableRowData.setRow(value);
126+
return reusableRowData;
127+
}
128+
129+
private String serializeRecord(TableInfo tableInfo, RecordData record, boolean isDelete) {
130+
List<Column> columns = tableInfo.schema.getColumns();
131+
Preconditions.checkArgument(columns.size() == record.getArity());
132+
Map<String, Object> rowMap = new HashMap<>(record.getArity() + 1);
133+
for (int i = 0; i < record.getArity(); i++) {
134+
rowMap.put(columns.get(i).getName(), tableInfo.fieldGetters[i].getFieldOrNull(record));
135+
}
136+
rowMap.put("__op", isDelete ? 1 : 0);
137+
return jsonWrapper.toJSONString(rowMap);
138+
}
139+
140+
@Override
141+
public void close() {}
142+
143+
/** Table information. */
144+
private static class TableInfo {
145+
Schema schema;
146+
RecordData.FieldGetter[] fieldGetters;
147+
}
148+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2023 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.starrocks.sink;
18+
19+
import com.ververica.cdc.common.configuration.Configuration;
20+
import com.ververica.cdc.common.utils.Preconditions;
21+
22+
import java.io.Serializable;
23+
24+
import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_SCHEMA_CHANGE_TIMEOUT;
25+
26+
/** Configurations for schema change. */
27+
public class SchemaChangeConfig implements Serializable {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
/** Timeout for a schema change on StarRocks side. */
32+
private final long timeoutSecond;
33+
34+
public SchemaChangeConfig(long timeoutSecond) {
35+
Preconditions.checkArgument(
36+
timeoutSecond > 0, "Timeout must be positive, but actually is %s", timeoutSecond);
37+
this.timeoutSecond = timeoutSecond;
38+
}
39+
40+
public long getTimeoutSecond() {
41+
return timeoutSecond;
42+
}
43+
44+
public static SchemaChangeConfig from(Configuration config) {
45+
long timeoutSecond = Math.max(1, config.get(TABLE_SCHEMA_CHANGE_TIMEOUT).getSeconds());
46+
return new SchemaChangeConfig(timeoutSecond);
47+
}
48+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2023 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.starrocks.sink;
18+
19+
import com.starrocks.connector.flink.catalog.StarRocksCatalog;
20+
import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
21+
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
22+
import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
23+
import com.ververica.cdc.common.event.Event;
24+
import com.ververica.cdc.common.sink.DataSink;
25+
import com.ververica.cdc.common.sink.EventSinkProvider;
26+
import com.ververica.cdc.common.sink.FlinkSinkProvider;
27+
import com.ververica.cdc.common.sink.MetadataApplier;
28+
29+
import java.io.Serializable;
30+
import java.time.ZoneId;
31+
32+
/** A {@link DataSink} for StarRocks connector that supports schema evolution. */
33+
public class StarRocksDataSink implements DataSink, Serializable {
34+
35+
private static final long serialVersionUID = 1L;
36+
37+
/** Configurations for sink connector. */
38+
private final StarRocksSinkOptions sinkOptions;
39+
40+
/** Configurations for creating a StarRocks table. */
41+
private final TableCreateConfig tableCreateConfig;
42+
43+
/** Configurations for schema change. */
44+
private final SchemaChangeConfig schemaChangeConfig;
45+
46+
/**
47+
* The local time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>.
48+
*/
49+
private final ZoneId zoneId;
50+
51+
public StarRocksDataSink(
52+
StarRocksSinkOptions sinkOptions,
53+
TableCreateConfig tableCreateConfig,
54+
SchemaChangeConfig schemaChangeConfig,
55+
ZoneId zoneId) {
56+
this.sinkOptions = sinkOptions;
57+
this.tableCreateConfig = tableCreateConfig;
58+
this.schemaChangeConfig = schemaChangeConfig;
59+
this.zoneId = zoneId;
60+
}
61+
62+
@Override
63+
public EventSinkProvider getEventSinkProvider() {
64+
StarRocksSink<Event> starRocksSink =
65+
SinkFunctionFactory.createSink(
66+
sinkOptions, new EventRecordSerializationSchema(zoneId));
67+
return FlinkSinkProvider.of(starRocksSink);
68+
}
69+
70+
@Override
71+
public MetadataApplier getMetadataApplier() {
72+
StarRocksCatalog catalog =
73+
new StarRocksCatalog(
74+
sinkOptions.getJdbcUrl(),
75+
sinkOptions.getUsername(),
76+
sinkOptions.getPassword());
77+
return new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
78+
}
79+
}

0 commit comments

Comments
 (0)