Skip to content

Commit bb60bfe

Browse files
authored
[mysql-cdc] Supports MYSQL_TYPE_TYPED_ARRAY column type when parsing the table map event
This closes #2001
1 parent 892f2d6 commit bb60bfe

File tree

3 files changed

+354
-0
lines changed

3 files changed

+354
-0
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2022 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.github.shyiko.mysql.binlog.event.deserialization;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
/**
 * Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY.
 *
 * <p>The {@link #TYPED_ARRAY} constant adds support for the mysql data type
 * MYSQL_TYPE_TYPED_ARRAY. Its type code is changed to 20 in <a
 * href="https://github.com/mysql/mysql-server/commit/9082b6a820f3948fd563cc32a050f5e8775f2855">MySql
 * Bug#29948925</a> since mysql 8.0.18+.
 *
 * <p>Remove this file once <a
 * href="https://github.com/osheroff/mysql-binlog-connector-java/issues/104">mysql-binlog-connector-java#104</a>
 * is fixed.
 */
public enum ColumnType {
    DECIMAL(0),
    TINY(1),
    SHORT(2),
    LONG(3),
    FLOAT(4),
    DOUBLE(5),
    NULL(6),
    TIMESTAMP(7),
    LONGLONG(8),
    INT24(9),
    DATE(10),
    TIME(11),
    DATETIME(12),
    YEAR(13),
    NEWDATE(14),
    VARCHAR(15),
    BIT(16),
    // (TIMESTAMP|DATETIME|TIME)_V2 data types appeared in MySQL 5.6.4
    // @see http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
    TIMESTAMP_V2(17),
    DATETIME_V2(18),
    TIME_V2(19),
    TYPED_ARRAY(20),
    JSON(245),
    NEWDECIMAL(246),
    ENUM(247),
    SET(248),
    TINY_BLOB(249),
    MEDIUM_BLOB(250),
    LONG_BLOB(251),
    BLOB(252),
    VAR_STRING(253),
    STRING(254),
    GEOMETRY(255);

    /** Numeric type code of this constant; fixed at construction. */
    private final int code;

    ColumnType(int code) {
        this.code = code;
    }

    /**
     * Returns the numeric type code of this column type.
     *
     * @return the code passed to this constant's constructor
     */
    public int getCode() {
        return code;
    }

    /** Lookup table from type code to constant, filled once during class initialization. */
    private static final Map<Integer, ColumnType> INDEX_BY_CODE;

    static {
        INDEX_BY_CODE = new HashMap<>();
        for (ColumnType columnType : values()) {
            INDEX_BY_CODE.put(columnType.code, columnType);
        }
    }

    /**
     * Looks up the column type declared for the given type code.
     *
     * @param code the numeric type code to resolve
     * @return the matching constant, or {@code null} when no constant declares that code; callers
     *     must check for {@code null} before switching on the result
     */
    public static ColumnType byCode(int code) {
        return INDEX_BY_CODE.get(code);
    }
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2022 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.github.shyiko.mysql.binlog.event.deserialization;
18+
19+
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
20+
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
21+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
22+
23+
import java.io.IOException;
24+
25+
import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY;
26+
27+
/**
28+
* Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY.
29+
*
30+
* <p>Line 93 ~ 98: process MYSQL_TYPE_TYPED_ARRAY metadata, imitated the code in canal <a
31+
* href="https://github.com/alibaba/canal/blob/master/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java#L546">TableMapLogEvent#decodeFields</a>.
32+
*
33+
* <p>Remove this file once <a
34+
* href="https://github.com/osheroff/mysql-binlog-connector-java/issues/104">mysql-binlog-connector-java#104</a>
35+
* fixed.
36+
*/
37+
public class TableMapEventDataDeserializer implements EventDataDeserializer<TableMapEventData> {
38+
39+
private final TableMapEventMetadataDeserializer metadataDeserializer =
40+
new TableMapEventMetadataDeserializer();
41+
42+
@Override
43+
public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
44+
TableMapEventData eventData = new TableMapEventData();
45+
eventData.setTableId(inputStream.readLong(6));
46+
inputStream.skip(3); // 2 bytes reserved for future use + 1 for the length of database name
47+
eventData.setDatabase(inputStream.readZeroTerminatedString());
48+
inputStream.skip(1); // table name
49+
eventData.setTable(inputStream.readZeroTerminatedString());
50+
int numberOfColumns = inputStream.readPackedInteger();
51+
eventData.setColumnTypes(inputStream.read(numberOfColumns));
52+
inputStream.readPackedInteger(); // metadata length
53+
eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes()));
54+
eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true));
55+
int metadataLength = inputStream.available();
56+
TableMapEventMetadata metadata = null;
57+
if (metadataLength > 0) {
58+
metadata =
59+
metadataDeserializer.deserialize(
60+
new ByteArrayInputStream(inputStream.read(metadataLength)),
61+
eventData.getColumnTypes().length,
62+
numericColumnCount(eventData.getColumnTypes()));
63+
}
64+
eventData.setEventMetadata(metadata);
65+
return eventData;
66+
}
67+
68+
private int numericColumnCount(byte[] types) {
69+
int count = 0;
70+
for (int i = 0; i < types.length; i++) {
71+
switch (ColumnType.byCode(types[i] & 0xff)) {
72+
case TINY:
73+
case SHORT:
74+
case INT24:
75+
case LONG:
76+
case LONGLONG:
77+
case NEWDECIMAL:
78+
case FLOAT:
79+
case DOUBLE:
80+
count++;
81+
break;
82+
default:
83+
break;
84+
}
85+
}
86+
return count;
87+
}
88+
89+
private int[] readMetadata(ByteArrayInputStream inputStream, byte[] columnTypes)
90+
throws IOException {
91+
int[] metadata = new int[columnTypes.length];
92+
for (int i = 0; i < columnTypes.length; i++) {
93+
ColumnType columnType = ColumnType.byCode(columnTypes[i] & 0xFF);
94+
if (columnType == TYPED_ARRAY) {
95+
byte[] arrayType = inputStream.read(1);
96+
columnType = ColumnType.byCode(arrayType[0] & 0xFF);
97+
}
98+
switch (columnType) {
99+
case FLOAT:
100+
case DOUBLE:
101+
case BLOB:
102+
case JSON:
103+
case GEOMETRY:
104+
metadata[i] = inputStream.readInteger(1);
105+
break;
106+
case BIT:
107+
case VARCHAR:
108+
case NEWDECIMAL:
109+
metadata[i] = inputStream.readInteger(2);
110+
break;
111+
case SET:
112+
case ENUM:
113+
case STRING:
114+
metadata[i] = bigEndianInteger(inputStream.read(2), 0, 2);
115+
break;
116+
case TIME_V2:
117+
case DATETIME_V2:
118+
case TIMESTAMP_V2:
119+
metadata[i] = inputStream.readInteger(1); // fsp (@see {@link ColumnType})
120+
break;
121+
default:
122+
metadata[i] = 0;
123+
}
124+
}
125+
return metadata;
126+
}
127+
128+
private static int bigEndianInteger(byte[] bytes, int offset, int length) {
129+
int result = 0;
130+
for (int i = offset; i < (offset + length); i++) {
131+
byte b = bytes[i];
132+
result = (result << 8) | (b >= 0 ? (int) b : (b + 256));
133+
}
134+
return result;
135+
}
136+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2022 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.github.shyiko.mysql.binlog.event.deserialization;
18+
19+
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
20+
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
21+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
22+
import org.junit.Test;
23+
24+
import java.io.IOException;
25+
import java.util.BitSet;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
29+
/** Tests for the copied class {@link TableMapEventDataDeserializer}. */
30+
public class TableMapEventDataDeserializerTest {
31+
@Test
32+
public void testDeserialize() throws IOException {
33+
TableMapEventDataDeserializer deserializer = new TableMapEventDataDeserializer();
34+
// The Table_map_event data. See its format at
35+
// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Table__map__event.html
36+
byte[] data = {
37+
// table_id : 6 bytes
38+
1,
39+
0,
40+
0,
41+
0,
42+
0,
43+
0,
44+
// flags : 2 bytes
45+
1,
46+
0,
47+
// database_name string length : 1 byte
48+
6,
49+
// database_name null-terminated string, end with 0
50+
116,
51+
101,
52+
115,
53+
116,
54+
68,
55+
98,
56+
0,
57+
// table_name string length : 1 byte
58+
9,
59+
// table_name null-terminated string, end with 0
60+
116,
61+
101,
62+
115,
63+
116,
64+
84,
65+
97,
66+
98,
67+
108,
68+
101,
69+
0,
70+
// column_count
71+
3,
72+
// column_type list
73+
8,
74+
1,
75+
20,
76+
// metadata_length
77+
1,
78+
// metadata
79+
8,
80+
// null_bits
81+
80,
82+
// optional metadata fields stored in Type, Length, Value(TLV) format.
83+
// Type takes 1 byte. Length is a packed integer value. Values takes Length bytes.
84+
85+
// SIGNEDNESS
86+
1,
87+
1,
88+
0,
89+
// DEFAULT_CHARSET
90+
2,
91+
1,
92+
45
93+
};
94+
TableMapEventData eventData = deserializer.deserialize(new ByteArrayInputStream(data));
95+
assertThat(eventData.toString()).isEqualTo(getExpectedEventData().toString());
96+
}
97+
98+
private TableMapEventData getExpectedEventData() {
99+
TableMapEventData eventData = new TableMapEventData();
100+
// table_id
101+
eventData.setTableId(1);
102+
// database_name
103+
eventData.setDatabase("testDb");
104+
// table_name
105+
eventData.setTable("testTable");
106+
107+
// column_type
108+
// 3 column types: MYSQL_TYPE_LONGLONG, MYSQL_TYPE_TINY, MYSQL_TYPE_TYPED_ARRAY<LONGLONG>
109+
eventData.setColumnTypes(new byte[] {8, 1, 20});
110+
111+
// metadata of the column types
112+
eventData.setColumnMetadata(new int[] {0, 0, 0});
113+
114+
// null_bits
115+
eventData.setColumnNullability(new BitSet());
116+
117+
// optional metadata fields
118+
TableMapEventMetadata metadata = new TableMapEventMetadata();
119+
metadata.setSignedness(new BitSet());
120+
TableMapEventMetadata.DefaultCharset charset = new TableMapEventMetadata.DefaultCharset();
121+
charset.setDefaultCharsetCollation(45);
122+
metadata.setDefaultCharset(charset);
123+
eventData.setEventMetadata(metadata);
124+
return eventData;
125+
}
126+
}

0 commit comments

Comments
 (0)