// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite("test_variant_bloom_filter", "nonConcurrent") {

    def index_table = "test_variant_bloom_filter"

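    // Test plan: stream-load the same gharchive JSON file five times into a
    // table whose variant column `v` carries a bloom filter index, run a full
    // compaction that merges the five segments into one, then re-run point
    // queries on `v`'s subcolumns with the `bloom_filter_must_filter_data`
    // debug point enabled to make sure the index still works after compaction.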
    def load_json_data = { table_name, file_name ->
        // load the json data
        streamLoad {
            table "${table_name}"

            // set http request header params
            set 'read_json_by_line', 'true'
            set 'format', 'json'
            set 'max_filter_ratio', '0.1'
            set 'memtable_on_sink_node', 'true'
            file file_name // import json file
            time 10000 // limit inflight 10s

            // if a check callback is declared, the default check condition
            // is ignored, so all conditions must be checked here
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                logger.info("Stream load ${file_name} result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
            }
        }
    }

    sql """ DROP TABLE IF EXISTS ${index_table} """
    sql """
        CREATE TABLE IF NOT EXISTS ${index_table} (
            k bigint,
            v variant
        )
        DUPLICATE KEY(`k`)
        DISTRIBUTED BY HASH(k) BUCKETS 1
        properties("replication_num" = "1", "disable_auto_compaction" = "false", "bloom_filter_columns" = "v");
    """
    load_json_data.call(index_table, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
    load_json_data.call(index_table, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
    load_json_data.call(index_table, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
    load_json_data.call(index_table, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
    load_json_data.call(index_table, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
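    // Each stream load above is expected to create one rowset with a single
    // segment, so five loads should yield five segments (asserted below).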

    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
    def tablets = sql_return_maparray """ show tablets from ${index_table}; """
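    // The id -> IP / HTTP port maps let the test address each tablet's BE
    // directly over its HTTP interface to trigger and poll compaction.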

    // count segments across all rowsets before compaction
    int beforeSegmentCount = 0
    for (def tablet in tablets) {
        String tablet_id = tablet.TabletId
        def (code, out, err) = curl("GET", tablet.CompactionStatus)
        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def tabletJson = parseJson(out.trim())
        assert tabletJson.rowsets instanceof List
        for (String rowset in (List<String>) tabletJson.rowsets) {
            // the second field of each rowset line is its segment count
            beforeSegmentCount += Integer.parseInt(rowset.split(" ")[1])
        }
    }
    assertEquals(beforeSegmentCount, 5)

    // trigger full compaction for all tablets in ${index_table}
    for (def tablet in tablets) {
        String tablet_id = tablet.TabletId
        def backend_id = tablet.BackendId
        def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def compactJson = parseJson(out.trim())
        assertEquals("success", compactJson.status.toLowerCase())
    }

    // wait for all compactions done
    for (def tablet in tablets) {
        Awaitility.await().atMost(3, TimeUnit.MINUTES).untilAsserted(() -> {
            String tablet_id = tablet.TabletId
            def backend_id = tablet.BackendId
            def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
            logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)
            def compactionStatus = parseJson(out.trim())
            assertEquals("compaction task for this tablet is not running", compactionStatus.msg.toLowerCase())
        });
    }
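    // untilAsserted retries the closure until the assertions pass or the
    // 3-minute timeout elapses, i.e. until the BE reports that no compaction
    // task is running for the tablet anymore.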

    // count segments again after compaction
    int afterSegmentCount = 0
    for (def tablet in tablets) {
        String tablet_id = tablet.TabletId
        def (code, out, err) = curl("GET", tablet.CompactionStatus)
        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def tabletJson = parseJson(out.trim())
        assert tabletJson.rowsets instanceof List
        for (String rowset in (List<String>) tabletJson.rowsets) {
            logger.info("rowset is: " + rowset)
            afterSegmentCount += Integer.parseInt(rowset.split(" ")[1])
        }
    }
    assertEquals(afterSegmentCount, 1)
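    // Full compaction merged the five single-segment rowsets into one segment,
    // so the bloom filter index for v was rebuilt as part of the new segment.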
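    // Judging by its name, the `bloom_filter_must_filter_data` debug point
    // makes BEs assert that the bloom filter index actually filters rows, so
    // the point queries below should fail if the index were not being used.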
    try {
        GetDebugPoint().enableDebugPointForAllBEs("bloom_filter_must_filter_data")

        // number
        qt_sql1 """ select cast(v['repo']['id'] as int) from ${index_table} where cast(v['repo']['id'] as int) = 20291263; """

        // string
        qt_sql2 """ select cast(v['repo']['name'] as text) from ${index_table} where cast(v['repo']['name'] as text) = "ridget/dotfiles"; """
    } finally {
        GetDebugPoint().disableDebugPointForAllBEs("bloom_filter_must_filter_data")
    }
}