- # Streaming ETL from MySQL and Postgres to Elasticsearch
+ # Streaming ETL for MySQL and Postgres with Flink CDC

- 1. Create `docker-compose.yml` file using the following contents:
+ This tutorial shows how to quickly build a streaming ETL pipeline for MySQL and Postgres with Flink CDC.

+ Assume we are running an e-commerce business. The product and order data are stored in MySQL, and the shipment data related to the orders is stored in Postgres.
+ We want to enrich the orders using the product and shipment tables, and then load the enriched orders into Elasticsearch in real time.
+
+ In the following sections, we will describe how to use the Flink MySQL/Postgres CDC connectors to implement this.
+ All exercises in this tutorial are performed in the Flink SQL CLI; the entire process uses standard SQL syntax, without a single line of Java/Scala code or any IDE installation.
+
+ The overview of the architecture is as follows:
+ ![Flink CDC Streaming ETL](/_static/fig/mysql-postgress-tutorial/flink-cdc-streaming-etl.png "Flink CDC Streaming ETL")
+
+ ## Preparation
+ Prepare a Linux or MacOS computer with Docker installed.
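+
+ For example, you can confirm that both tools are available from a terminal (a minimal sanity check; the exact versions are not important for this tutorial):
+ ```shell
+ docker --version          # Docker Engine must be installed
+ docker-compose --version  # docker-compose is used to start the demo containers
+ ```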
+
+ ### Starting components required
+ The components required in this demo are all managed in containers, so we will use `docker-compose` to start them.
+
+ Create a `docker-compose.yml` file with the following contents:
```
version: '2.1'
services:
@@ -43,112 +59,139 @@ services:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
-   zookeeper:
-     image: wurstmeister/zookeeper:3.4.6
-     ports:
-       - "2181:2181"
-   kafka:
-     image: wurstmeister/kafka:2.12-2.2.1
-     ports:
-       - "9092:9092"
-       - "9094:9094"
-     depends_on:
-       - zookeeper
-     environment:
-       - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
-       - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
-       - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
-       - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
-       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
-       - KAFKA_CREATE_TOPICS="user_behavior:1:1"
-     volumes:
-       - /var/run/docker.sock:/var/run/docker.sock
```
-
- 2. Enter MySQL's container and initialize data:
-
+ The Docker Compose environment consists of the following containers:
+ - MySQL: the `products` and `orders` tables will be stored in this database. They will be joined with data in Postgres to enrich the orders.
+ - Postgres: the `shipments` table will be stored in this database.
+ - Elasticsearch: mainly used as a data sink to store the enriched orders.
+ - Kibana: used to visualize the data in Elasticsearch.
+
+ To start all containers, run the following command in the directory that contains the `docker-compose.yml` file:
+ ```shell
+ docker-compose up -d
```
- docker-compose exec mysql mysql -uroot -p123456
- ```
-
- ```sql
- -- MySQL
- CREATE DATABASE mydb;
- USE mydb;
- CREATE TABLE products (
-   id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-   name VARCHAR(255) NOT NULL,
-   description VARCHAR(512)
- );
- ALTER TABLE products AUTO_INCREMENT = 101;
-
- INSERT INTO products
- VALUES (default,"scooter","Small 2-wheel scooter"),
-        (default,"car battery","12V car battery"),
-        (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
-        (default,"hammer","12oz carpenter's hammer"),
-        (default,"hammer","14oz carpenter's hammer"),
-        (default,"hammer","16oz carpenter's hammer"),
-        (default,"rocks","box of assorted rocks"),
-        (default,"jacket","water resistent black wind breaker"),
-        (default,"spare tire","24 inch spare tire");
-
- CREATE TABLE orders (
-   order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-   order_date DATETIME NOT NULL,
-   customer_name VARCHAR(255) NOT NULL,
-   price DECIMAL(10, 5) NOT NULL,
-   product_id INTEGER NOT NULL,
-   order_status BOOLEAN NOT NULL -- Whether order has been placed
- ) AUTO_INCREMENT = 10001;
-
- INSERT INTO orders
- VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
-        (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
-        (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
+ This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether these containers are running properly.
+ We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kibana is running normally.
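+
+ For example, a quick check from the command line might look like this (a minimal sketch; it assumes the ports are mapped to the host as in the compose file above):
+ ```shell
+ docker ps                    # every container defined in docker-compose.yml should be listed as "Up"
+ curl http://localhost:9200   # Elasticsearch should answer with its cluster and version info
+ ```
+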
+ Don’t forget to run the following command to stop all containers after finishing the tutorial:
+ ```shell
+ docker-compose down
```

- 3. Enter Postgres's container and initialize data:
-
- ```
- docker-compose exec postgres psql -h localhost -U postgres
- ```
-
+ ### Preparing Flink and the required JAR packages
+ 1. Download [Flink 1.13.2](https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz) and unzip it to the directory `flink-1.13.2`.
+ 2. Download the following required JAR packages and put them under `flink-1.13.2/lib/` (a download sketch follows this list):
+
+ **Download links are available only for stable releases.**
+ - [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
+ - [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar)
+ - [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar)
+
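+ For example, the downloads could be scripted roughly as follows (a sketch only; it assumes `wget` is available, and as noted above the SNAPSHOT links may not resolve outside stable releases):
+ ```shell
+ cd flink-1.13.2/lib/
+ wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
+ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar
+ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar
+ cd ../..
+ ```
+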
+ ### Preparing data in databases
+ #### Preparing data in MySQL
+ 1. Enter MySQL's container:
+ ```shell
+ docker-compose exec mysql mysql -uroot -p123456
+ ```
+ 2. Create tables and populate data:
+ ```sql
+ -- MySQL
+ CREATE DATABASE mydb;
+ USE mydb;
+ CREATE TABLE products (
+   id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+   name VARCHAR(255) NOT NULL,
+   description VARCHAR(512)
+ );
+ ALTER TABLE products AUTO_INCREMENT = 101;
+
+ INSERT INTO products
+ VALUES (default,"scooter","Small 2-wheel scooter"),
+        (default,"car battery","12V car battery"),
+        (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
+        (default,"hammer","12oz carpenter's hammer"),
+        (default,"hammer","14oz carpenter's hammer"),
+        (default,"hammer","16oz carpenter's hammer"),
+        (default,"rocks","box of assorted rocks"),
+        (default,"jacket","water resistent black wind breaker"),
+        (default,"spare tire","24 inch spare tire");
+
+ CREATE TABLE orders (
+   order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+   order_date DATETIME NOT NULL,
+   customer_name VARCHAR(255) NOT NULL,
+   price DECIMAL(10, 5) NOT NULL,
+   product_id INTEGER NOT NULL,
+   order_status BOOLEAN NOT NULL -- Whether order has been placed
+ ) AUTO_INCREMENT = 10001;
+
+ INSERT INTO orders
+ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
+        (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
+        (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
+ ```
+ #### Preparing data in Postgres
+ 1. Enter Postgres's container:
+ ```shell
+ docker-compose exec postgres psql -h localhost -U postgres
+ ```
+ 2. Create tables and populate data:
+ ```sql
+ -- PG
+ CREATE TABLE shipments (
+   shipment_id SERIAL NOT NULL PRIMARY KEY,
+   order_id SERIAL NOT NULL,
+   origin VARCHAR(255) NOT NULL,
+   destination VARCHAR(255) NOT NULL,
+   is_arrived BOOLEAN NOT NULL
+ );
+ ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
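+ -- REPLICA IDENTITY FULL makes UPDATE/DELETE changes carry the complete old row, which the Postgres CDC connector relies on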
+ ALTER TABLE public.shipments REPLICA IDENTITY FULL;
+ INSERT INTO shipments
+ VALUES (default,10001,'Beijing','Shanghai',false),
+        (default,10002,'Hangzhou','Shanghai',false),
+        (default,10003,'Shanghai','Hangzhou',false);
+ ```
+
+ ## Starting Flink cluster and Flink SQL CLI
+
+ 1. Use the following command to change to the Flink directory:
+ ```shell
+ cd flink-1.13.2
+ ```
+ 2. Change the value of `taskmanager.numberOfTaskSlots` to 2 in `conf/flink-conf.yaml`, because we will run two tasks at the same time (a scripted way to do this is sketched after this list).
+
+ 3. Use the following command to start a Flink cluster:
+ ```shell
+ ./bin/start-cluster.sh
+ ```
+ Then we can visit [http://localhost:8081/](http://localhost:8081/) to see if Flink is running normally, and the web page looks like:
+
+ 
+
+ Don’t forget to run the following command to stop the Flink cluster after finishing the tutorial:
+ ```shell
+ ./bin/stop-cluster.sh
+ ```
+
+ 4. Use the following command to start a Flink SQL CLI:
+ ```shell
+ ./bin/sql-client.sh
+ ```
+ We should see the welcome screen of the CLI client.
+
+ 
+
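+ As a scripted alternative to step 2 above, the slot configuration could be changed like this (a minimal sketch assuming GNU sed; on macOS, pass `-i ''` instead of `-i`):
+ ```shell
+ sed -i 's/^taskmanager.numberOfTaskSlots:.*/taskmanager.numberOfTaskSlots: 2/' conf/flink-conf.yaml
+ ```
+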
+ ## Creating tables using Flink DDL in Flink SQL CLI
+ First, enable checkpoints every 3000 milliseconds:
```sql
- -- PG
- CREATE TABLE shipments (
-   shipment_id SERIAL NOT NULL PRIMARY KEY,
-   order_id SERIAL NOT NULL,
-   origin VARCHAR(255) NOT NULL,
-   destination VARCHAR(255) NOT NULL,
-   is_arrived BOOLEAN NOT NULL
- );
- ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
- ALTER TABLE public.shipments REPLICA IDENTITY FULL;
-
- INSERT INTO shipments
- VALUES (default,10001,'Beijing','Shanghai',false),
-        (default,10002,'Hangzhou','Shanghai',false),
-        (default,10003,'Shanghai','Hangzhou',false);
+ -- Flink SQL
+ Flink SQL> SET execution.checkpointing.interval = 3s;
```

-
-
- 4. Download following JAR package to `<FLINK_HOME>/lib/`:
-
- ``` Download links are available only for stable releases. ```
-
- - [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- - [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar)
- - [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar)
-
- 5. Launch a Flink cluster, then start a Flink SQL CLI and execute the following SQL statements inside:
-
+ Then, create tables that capture the change data from the corresponding database tables.
```sql
-- Flink SQL
- -- checkpoint every 3000 milliseconds
- Flink SQL> SET execution.checkpointing.interval = 3s;
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
@@ -199,7 +242,11 @@ Flink SQL> CREATE TABLE shipments (
    'schema-name' = 'public',
    'table-name' = 'shipments'
  );
+ ```

+ Finally, create an `enriched_orders` table that is used to load the data into Elasticsearch.
+ ```sql
+ -- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
    order_id INT,
    order_date TIMESTAMP(0),
@@ -219,78 +266,54 @@ Flink SQL> CREATE TABLE enriched_orders (
    'hosts' = 'http://localhost:9200',
    'index' = 'enriched_orders'
  );
+ ```

+ ## Enriching orders and loading to Elasticsearch
+ Use Flink SQL to join the `orders` table with the `products` and `shipments` tables to enrich the orders, and write the result to Elasticsearch.
+ ```sql
+ -- Flink SQL
Flink SQL> INSERT INTO enriched_orders
 SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id
 LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```
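+
+ As an optional sanity check before opening Kibana, you can query the sink index directly over the Elasticsearch REST API (a sketch; host and index name are taken from the DDL above):
+ ```shell
+ curl 'http://localhost:9200/enriched_orders/_search?pretty&size=1'
+ ```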
-
- 6. Make some changes in MySQL and Postgres, then check the result in Elasticsearch:
-
- ```sql
- -- MySQL
- INSERT INTO orders
- VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
-
- -- PG
- INSERT INTO shipments
- VALUES (default,10004,'Shanghai','Beijing',false);
-
- -- MySQL
- UPDATE orders SET order_status = true WHERE order_id = 10004;
-
- -- PG
- UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
-
- -- MySQL
- DELETE FROM orders WHERE order_id = 10004;
- ```
-
- 7. Kafka Changelog JSON format
-
- Execute the following SQL in Flink SQL CLI:
-
- ```sql
- -- Flink SQL
- Flink SQL> CREATE TABLE kafka_gmv (
-   day_str STRING,
-   gmv DECIMAL(10, 5)
- ) WITH (
-   'connector' = 'kafka',
-   'topic' = 'kafka_gmv',
-   'scan.startup.mode' = 'earliest-offset',
-   'properties.bootstrap.servers' = 'localhost:9092',
-   'format' = 'changelog-json'
- );
-
- Flink SQL> INSERT INTO kafka_gmv
- SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
- FROM orders
- WHERE order_status = true
- GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-
- -- Consume changelog data from Kafka, and check the result of the materialized view:
- Flink SQL> SELECT * FROM kafka_gmv;
- ```
- To consume records in Kafka using `kafka-console-consumer`:
-
- ```
- docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic kafka_gmv --bootstrap-server kafka:9094 --from-beginning'
- ```
-
- Update orders data and check the output in Flink SQL CLI and Kafka console consumer:
- ```sql
- -- MySQL
- UPDATE orders SET order_status = true WHERE order_id = 10001;
- UPDATE orders SET order_status = true WHERE order_id = 10002;
- UPDATE orders SET order_status = true WHERE order_id = 10003;
-
- INSERT INTO orders
- VALUES (default, '2020-07-30 17:33:00', 'Timo', 50.00, 104, true);
-
- UPDATE orders SET price = 40.00 WHERE order_id = 10005;
-
- DELETE FROM orders WHERE order_id = 10005;
- ```
+ Now, the enriched orders should be shown in Kibana.
+ Visit [http://localhost:5601/app/kibana#/management/kibana/index_pattern](http://localhost:5601/app/kibana#/management/kibana/index_pattern) to create an index pattern `enriched_orders`.
+
+ 
+
+ Visit [http://localhost:5601/app/kibana#/discover](http://localhost:5601/app/kibana#/discover) to find the enriched orders.
+
+ 
+
+ Next, make some changes in the databases, and the enriched orders shown in Kibana will be updated after each step in real time.
+ 1. Insert a new order in MySQL
+ ```sql
+ --MySQL
+ INSERT INTO orders
+ VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
+ ```
+ 2. Insert a shipment in Postgres
+ ```sql
+ --PG
+ INSERT INTO shipments
+ VALUES (default,10004,'Shanghai','Beijing',false);
+ ```
+ 3. Update the order status in MySQL
+ ```sql
+ --MySQL
+ UPDATE orders SET order_status = true WHERE order_id = 10004;
+ ```
+ 4. Update the shipment status in Postgres
+ ```sql
+ --PG
+ UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
+ ```
+ 5. Delete the order in MySQL
+ ```sql
+ --MySQL
+ DELETE FROM orders WHERE order_id = 10004;
+ ```
+ The changes of the enriched orders in Kibana are as follows:
+ 