Merged
6 changes: 0 additions & 6 deletions docs/content/about.md
@@ -15,12 +15,6 @@ The Flink CDC Connectors integrates Debezium as the engine to capture data chang
| Oracle | Database: 11, 12, 19 <br/>Oracle Driver: 19.3.0.0|
| Sqlserver | Database: 2017, 2019 <br/>JDBC Driver: 7.2.2.jre8|

## Supported Formats

| Format | Supported Connector | Flink Version |
| --- | --- | --- |
| <a href="https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format">Changelog Json</a> | <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/connectors/kafka.html">Apache Kafka</a> | 1.11+ |

## Supported Flink Versions
The following table shows the version mapping between Flink CDC Connectors and Flink:

119 changes: 4 additions & 115 deletions docs/content/formats/changelog-json.md
@@ -1,119 +1,8 @@
# Changelog JSON Format

Flink supports emitting changelogs in JSON format and interpreting the output back again.
**WARNING:** The CDC format `changelog-json` has been deprecated since Flink CDC version 2.2.
The CDC format `changelog-json` was introduced at a time when Flink did not offer any CDC format. Flink now offers several well-maintained CDC formats, e.g. [Debezium CDC, Maxwell CDC, Canal CDC](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/); we recommend that users use one of these formats instead.

Dependencies
------------

To set up the Changelog JSON format, the following sections provide dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

### Maven dependency

```
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <!-- the dependency is available only for stable releases. -->
  <version>2.2-SNAPSHOT</version>
</dependency>
```

### SQL Client JAR

```Download link is available only for stable releases.```

Download [flink-format-changelog-json-2.2-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-format-changelog-json/2.2-SNAPSHOT/flink-format-changelog-json-2.2-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.


How to use Changelog JSON format
----------------

```sql
-- assuming we have a user_behavior log table
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

-- we want to store the UV aggregation result in kafka using changelog-json format
CREATE TABLE day_uv (
    day_str STRING,
    uv BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'day_uv',
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'changelog-json'  -- the data format is changelog-json
);

-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') AS day_str, COUNT(DISTINCT user_id) AS uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- reading the changelog back again
SELECT * FROM day_uv;
```
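
Because `changelog-json` is deprecated as of Flink CDC 2.2, the same sink could instead be declared with one of Flink's built-in CDC formats. The following is only a sketch under stated assumptions: it uses `debezium-json` as the replacement, assumes a Flink version whose `debezium-json` format supports writing (serialization), and introduces a hypothetical table and topic named `day_uv_debezium` that do not appear in the original documentation.

```sql
-- Hypothetical sink table: same schema as day_uv, but using Flink's
-- built-in debezium-json format instead of the deprecated changelog-json.
CREATE TABLE day_uv_debezium (
    day_str STRING,
    uv BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'day_uv_debezium',  -- hypothetical topic name
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'  -- built-in CDC format
);

-- write the same UV aggregation as above into the new sink
INSERT INTO day_uv_debezium
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') AS day_str, COUNT(DISTINCT user_id) AS uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');
```

The message layout on the topic differs between the two formats, so any job that reads the topic would need to declare the same `format` as the writer.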

Format Options
----------------

<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>format</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify which format to use; here it should be 'changelog-json'.</td>
</tr>
<tr>
<td>changelog-json.ignore-parse-errors</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td>changelog-json.timestamp-format.standard</td>
<td>optional</td>
<td style="word-wrap: break-word;">'SQL'</td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601':
<ul>
<li>Option 'SQL' parses input timestamps in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123', and outputs timestamps in the same format.</li>
<li>Option 'ISO-8601' parses input timestamps in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123', and outputs timestamps in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>
</div>
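
The options in the table above are set in the `WITH` clause next to `format`. A minimal sketch, assuming a hypothetical table `user_events_changelog` backed by a hypothetical Kafka topic of the same name (neither is part of the original documentation); only the option keys and values come from the table above:

```sql
-- Hypothetical table used only to illustrate the format options.
CREATE TABLE user_events_changelog (
    user_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_changelog',  -- hypothetical topic name
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'changelog-json',
    -- skip fields and rows with parse errors instead of failing (default: false)
    'changelog-json.ignore-parse-errors' = 'true',
    -- read and write timestamps such as '2020-12-30T12:13:14.123' (default: 'SQL')
    'changelog-json.timestamp-format.standard' = 'ISO-8601'
);
```

With `ignore-parse-errors` enabled, malformed records are dropped or nulled silently, so it is probably best reserved for topics where occasional bad input is expected.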

Data Type Mapping
----------------

Currently, the Changelog JSON format uses the JSON format for serialization and deserialization. Please refer to the [JSON format documentation](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#data-type-mapping) for more details about the data type mapping.
### Compatibility Note

Users can still obtain and use the deprecated `changelog-json` format from an older Flink CDC version, e.g. [flink-format-changelog-json-2.1.1.jar](https://repo1.maven.org/maven2/com/ververica/flink-format-changelog-json/2.1.1/flink-format-changelog-json-2.1.1.jar).
76 changes: 0 additions & 76 deletions flink-format-changelog-json/pom.xml

This file was deleted.