|
1 | 1 | # Changelog JSON Format
|
2 | 2 |
|
3 |
| -Flink supports to emit changelogs in JSON format and interpret the output back again. |
| 3 | +**WARNING:** The CDC format `changelog-json` is deprecated since Flink CDC version 2.2. |
| 4 | +The CDC format `changelog-json` was introduced at the point that Flink didn't offer any CDC format. Currently, Flink offers several well-maintained CDC formats i.e.[Debezium CDC, MAXWELL CDC, CANAL CDC](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/), we recommend user to use above CDC formats. |
4 | 5 |
|
5 |
| -Dependencies |
6 |
| ------------- |
7 |
| - |
8 |
| -In order to setup the Changelog JSON format, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. |
9 |
| - |
10 |
| -### Maven dependency |
11 |
| - |
12 |
| -``` |
13 |
| -<dependency> |
14 |
| - <groupId>com.ververica</groupId> |
15 |
| - <artifactId>flink-format-changelog-json</artifactId> |
16 |
| - <!-- the dependency is available only for stable releases. --> |
17 |
| - <version>2.2-SNAPSHOT</version> |
18 |
| -</dependency> |
19 |
| -``` |
20 |
| - |
21 |
| -### SQL Client JAR |
22 |
| - |
23 |
| -```Download link is available only for stable releases.``` |
24 |
| - |
25 |
| -Download [flink-format-changelog-json-2.2-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-format-changelog-json/2.2-SNAPSHOT/flink-format-changelog-json-2.2-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`. |
26 |
| - |
27 |
| - |
28 |
| -How to use Changelog JSON format |
29 |
| ----------------- |
30 |
| - |
31 |
| -```sql |
32 |
| --- assuming we have a user_behavior logs |
33 |
| -CREATE TABLE user_behavior ( |
34 |
| - user_id BIGINT, |
35 |
| - item_id BIGINT, |
36 |
| - category_id BIGINT, |
37 |
| - behavior STRING, |
38 |
| - ts TIMESTAMP(3) |
39 |
| -) WITH ( |
40 |
| - 'connector' = 'kafka', -- using kafka connector |
41 |
| - 'topic' = 'user_behavior', -- kafka topic |
42 |
| - 'scan.startup.mode' = 'earliest-offset', -- reading from the beginning |
43 |
| - 'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address |
44 |
| - 'format' = 'json' -- the data format is json |
45 |
| -); |
46 |
| - |
47 |
| --- we want to store the the UV aggregation result in kafka using changelog-json format |
48 |
| -create table day_uv ( |
49 |
| - day_str STRING, |
50 |
| - uv BIGINT |
51 |
| -) WITH ( |
52 |
| - 'connector' = 'kafka', |
53 |
| - 'topic' = 'day_uv', |
54 |
| - 'scan.startup.mode' = 'earliest-offset', -- reading from the beginning |
55 |
| - 'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address |
56 |
| - 'format' = 'changelog-json' -- the data format is changelog-json |
57 |
| -); |
58 |
| - |
59 |
| --- write the UV results into kafka using changelog-json format |
60 |
| -INSERT INTO day_uv |
61 |
| -SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv |
62 |
| -FROM user_behavior |
63 |
| -GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'); |
64 |
| - |
65 |
| --- reading the changelog back again |
66 |
| -SELECT * FROM day_uv; |
67 |
| -``` |
68 |
| - |
69 |
| -Format Options |
70 |
| ----------------- |
71 |
| - |
72 |
| -<div class="highlight"> |
73 |
| -<table class="colwidths-auto docutils"> |
74 |
| - <thead> |
75 |
| - <tr> |
76 |
| - <th class="text-left" style="width: 25%">Option</th> |
77 |
| - <th class="text-center" style="width: 8%">Required</th> |
78 |
| - <th class="text-center" style="width: 7%">Default</th> |
79 |
| - <th class="text-center" style="width: 10%">Type</th> |
80 |
| - <th class="text-center" style="width: 50%">Description</th> |
81 |
| - </tr> |
82 |
| - </thead> |
83 |
| - <tbody> |
84 |
| - <tr> |
85 |
| - <td>format</td> |
86 |
| - <td>required</td> |
87 |
| - <td style="word-wrap: break-word;">(none)</td> |
88 |
| - <td>String</td> |
89 |
| - <td>Specify what format to use, here should be 'changelog-json'.</td> |
90 |
| - </tr> |
91 |
| - <tr> |
92 |
| - <td>changelog-json.ignore-parse-errors</td> |
93 |
| - <td>optional</td> |
94 |
| - <td style="word-wrap: break-word;">false</td> |
95 |
| - <td>Boolean</td> |
96 |
| - <td>Skip fields and rows with parse errors instead of failing. |
97 |
| - Fields are set to null in case of errors.</td> |
98 |
| - </tr> |
99 |
| - <tr> |
100 |
| - <td>changelog-json.timestamp-format.standard</td> |
101 |
| - <td>optional</td> |
102 |
| - <td style="word-wrap: break-word;">'SQL'</td> |
103 |
| - <td>String</td> |
104 |
| - <td>Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601': |
105 |
| - <ul> |
106 |
| - <li>Option 'SQL' will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li> |
107 |
| - <li>Option 'ISO-8601'will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li> |
108 |
| - </ul> |
109 |
| - </td> |
110 |
| - </tr> |
111 |
| - </tbody> |
112 |
| -</table> |
113 |
| -</div> |
114 |
| - |
115 |
| -Data Type Mapping |
116 |
| ----------------- |
117 |
| - |
118 |
| -Currently, the Canal format uses JSON format for deserialization. Please refer to [JSON format documentation](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#data-type-mapping) for more details about the data type mapping. |
| 6 | +### Compatibility Note |
119 | 7 |
|
| 8 | +User can still obtain and use the deprecated `changelog-json` format from older Flink CDC version e.g. [flink-format-changelog-json-2.1.1.jar](https://repo1.maven.org/maven2/com/ververica/flink-format-changelog-json/2.1.1/flink-format-changelog-json-2.1.1-SNAPSHOT.jar). |
0 commit comments