
Commit 7482c6f

[fix](inverted index) Fix the incorrect index size during compaction (#37232)
Index compaction didn't update the total size and index size.
1 parent ba7f5d8 commit 7482c6f
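
In rowset-metadata terms, the bug is that the bytes of the freshly compacted inverted index files were never folded back into the output rowset's size fields, so SHOW DATA under-reported the table. A minimal, self-contained sketch of the accounting this commit restores; RowsetMetaModel is a hypothetical stand-in (not the real Doris RowsetMeta API) and the sizes are made up:

#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical model of the three size fields the commit updates.
struct RowsetMetaModel {
    uint64_t data_disk_size = 0;
    uint64_t index_disk_size = 0;
    uint64_t total_disk_size = 0;
};

int main() {
    // Sizes reported by the closed inverted index file writers, one per file.
    std::vector<uint64_t> index_file_sizes = {4096, 8192};
    uint64_t inverted_index_file_size = 0;
    for (uint64_t s : index_file_sizes) {
        inverted_index_file_size += s;
    }

    RowsetMetaModel meta;
    meta.data_disk_size = 1 << 20; // segment data written before index compaction

    // Before the fix the three fields were left untouched here; after the fix
    // the index file bytes are added, mirroring the compaction.cpp hunk below.
    uint64_t original_data_size = meta.data_disk_size;
    meta.data_disk_size = original_data_size + inverted_index_file_size;
    meta.total_disk_size = original_data_size + inverted_index_file_size;
    meta.index_disk_size += inverted_index_file_size;

    assert(meta.total_disk_size == (1u << 20) + 4096 + 8192);
    return 0;
}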

File tree

3 files changed: +229 -5 lines changed

be/src/olap/compaction.cpp

Lines changed: 15 additions & 0 deletions
@@ -742,21 +742,36 @@ Status Compaction::do_inverted_index_compaction() {
             status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
         }
     }
+
+    uint64_t inverted_index_file_size = 0;
     for (auto& inverted_index_file_writer : inverted_index_file_writers) {
         if (Status st = inverted_index_file_writer->close(); !st.ok()) {
             status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
+        } else {
+            inverted_index_file_size += inverted_index_file_writer->get_index_file_size();
         }
     }
     // check index compaction status. If status is not ok, we should return error and end this compaction round.
     if (!status.ok()) {
         return status;
     }
 
+    // index compaction should update total disk size and index disk size
+    _output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
+                                                      inverted_index_file_size);
+    _output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
+                                                       inverted_index_file_size);
+    _output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
+                                                       inverted_index_file_size);
+
+    COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());
+
     LOG(INFO) << "succeed to do index compaction"
               << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
               << ", output row number=" << _output_rowset->num_rows()
               << ", input_rowset_size=" << _input_rowsets_size
               << ", output_rowset_size=" << _output_rowset->data_disk_size()
+              << ", inverted index file size=" << inverted_index_file_size
               << ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
 
     return Status::OK();

be/src/olap/rowset/vertical_beta_rowset_writer.cpp

Lines changed: 2 additions & 2 deletions
@@ -142,8 +142,7 @@ Status VerticalBetaRowsetWriter<T>::_flush_columns(segment_v2::SegmentWriter* se
         this->_segment_num_rows.resize(_cur_writer_idx + 1);
         this->_segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count();
     }
-    this->_total_index_size +=
-            static_cast<int64_t>(index_size) + segment_writer->get_inverted_index_file_size();
+    this->_total_index_size += static_cast<int64_t>(index_size);
     return Status::OK();
 }
 
@@ -217,6 +216,7 @@ Status VerticalBetaRowsetWriter<T>::final_flush() {
             return st;
         }
         this->_total_data_size += segment_size + segment_writer->get_inverted_index_file_size();
+        this->_total_index_size += segment_writer->get_inverted_index_file_size();
         segment_writer.reset();
     }
     return Status::OK();
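
Read together with the compaction.cpp hunk above, this moves the inverted index file bytes out of _flush_columns (which now counts only the plain index bytes) and into final_flush, where they are added once to both _total_data_size and _total_index_size. A minimal sketch of the resulting per-segment arithmetic; the variable names are illustrative stand-ins and the sizes are made up:

#include <cstdint>
#include <iostream>

int main() {
    // Made-up sizes for one flushed segment.
    int64_t segment_size = 1000;           // segment file bytes
    int64_t index_size = 100;              // non-inverted index bytes seen by _flush_columns
    int64_t inverted_index_file_size = 50; // inverted index file bytes

    // _flush_columns after this change: only the plain index size is counted.
    int64_t total_index_size = index_size;

    // final_flush: the inverted index file is counted once, into both totals.
    int64_t total_data_size = segment_size + inverted_index_file_size;
    total_index_size += inverted_index_file_size;

    std::cout << "data=" << total_data_size    // data=1050
              << " index=" << total_index_size // index=150
              << "\n";
    return 0;
}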

regression-test/suites/inverted_index_p0/test_show_data.groovy

Lines changed: 212 additions & 3 deletions
@@ -64,7 +64,7 @@ suite("test_show_data", "p0") {
     }
 
     def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
-                        expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+                        expected_succ_rows = -1 ->
 
         // load the json data
         streamLoad {

@@ -261,7 +261,7 @@ suite("test_show_data_for_bkd", "p0") {
     }
 
     def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
-                        expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+                        expected_succ_rows = -1 ->
 
         // load the json data
         streamLoad {

@@ -459,7 +459,7 @@ suite("test_show_data_multi_add", "p0") {
     }
 
     def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
-                        expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+                        expected_succ_rows = -1 ->
 
         // load the json data
         streamLoad {

@@ -611,3 +611,212 @@ suite("test_show_data_multi_add", "p0") {
         //try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
 }
+
+suite("test_show_data_with_compaction", "p0, nonConcurrent") {
+    // define a sql table
+    def tableWithIndexCompaction = "test_with_index_compaction"
+    def tableWithOutIndexCompaction = "test_without_index_compaction"
+    def delta_time = 5000
+    def timeout = 60000
+    def alter_res = "null"
+    def useTime = 0
+    String database = context.config.getDbNameByFile(context.file)
+    boolean invertedIndexCompactionEnable = true
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    backend_id = backendId_to_backendIP.keySet()[0]
+    def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+    logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    def configList = parseJson(out.trim())
+    assert configList instanceof List
+
+    for (Object ele in (List) configList) {
+        assert ele instanceof List<String>
+        if (((List<String>) ele)[0] == "inverted_index_compaction_enable") {
+            invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>) ele)[2])
+            logger.info("inverted_index_compaction_enable: ${((List<String>) ele)[2]}")
+        }
+    }
+
+    def set_be_config = { key, value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+
+    def create_table_with_index = {testTablex ->
+        // multi-line sql
+        def result = sql """
+            CREATE TABLE IF NOT EXISTS ${testTablex} (
+                `@timestamp` int(11) NULL,
+                `clientip` varchar(20) NULL,
+                `request` text NULL,
+                `status` int(11) NULL,
+                `size` int(11) NULL,
+                INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser"="english") COMMENT ''
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`@timestamp`)
+            DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true"
+            );
+        """
+    }
+
+    def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
+                        expected_succ_rows = -1 ->
+
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'label', label + "_" + UUID.randomUUID().toString()
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+            if (expected_succ_rows >= 0) {
+                set 'max_filter_ratio', '1'
+            }
+
+            // if declared a check callback, the default check condition will ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                if (ignore_failure && expected_succ_rows < 0) { return }
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                if (expected_succ_rows >= 0) {
+                    assertEquals(json.NumberLoadedRows, expected_succ_rows)
+                } else {
+                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                    assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+                }
+            }
+        }
+    }
+
+    def wait_for_show_data_finish = { table_name, OpTimeout, origin_size, maxRetries = 5 ->
+        def size = origin_size
+        def retries = 0
+        def last_size = origin_size
+
+        while (retries < maxRetries) {
+            for (int t = 0; t < OpTimeout; t += delta_time) {
+                def result = sql """show data from ${database}.${table_name};"""
+                if (result.size() > 0) {
+                    logger.info(table_name + " show data, detail: " + result[0].toString())
+                    size = result[0][2].replace(" KB", "").toDouble()
+                }
+                useTime += delta_time
+                Thread.sleep(delta_time)
+
+                // If size changes, break the for loop to check in the next while iteration
+                if (size != origin_size && size != last_size) {
+                    break
+                }
+            }
+
+            if (size != last_size) {
+                last_size = size
+            } else {
+                // If size didn't change during the last OpTimeout period, return size
+                if (size != origin_size) {
+                    return size
+                }
+            }
+
+            retries++
+        }
+        return "wait_timeout"
+    }
+
+
+    try {
+
+        def run_compaction_and_wait = { tableName ->
+            //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
+            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+            // trigger compactions for all tablets in ${tableName}
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactJson = parseJson(out.trim())
+                if (compactJson.status.toLowerCase() == "fail") {
+                    logger.info("Compaction was done automatically!")
+                } else {
+                    assertEquals("success", compactJson.status.toLowerCase())
+                }
+            }
+
+            // wait for all compactions done
+            for (def tablet in tablets) {
+                boolean running = true
+                do {
+                    Thread.sleep(1000)
+                    String tablet_id = tablet.TabletId
+                    backend_id = tablet.BackendId
+                    (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                    logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                    assertEquals(code, 0)
+                    def compactionStatus = parseJson(out.trim())
+                    assertEquals("success", compactionStatus.status.toLowerCase())
+                    running = compactionStatus.run_status
+                } while (running)
+            }
+        }
+
+        set_be_config.call("inverted_index_compaction_enable", "false")
+        sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+        create_table_with_index.call(tableWithIndexCompaction)
+
+        load_httplogs_data.call(tableWithIndexCompaction, '1', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '2', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '3', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '4', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '5', 'true', 'json', 'documents-1000.json')
+
+        sql "sync"
+
+        run_compaction_and_wait(tableWithIndexCompaction)
+        def with_index_size = wait_for_show_data_finish(tableWithIndexCompaction, 60000, 0)
+        assertTrue(with_index_size != "wait_timeout")
+
+        set_be_config.call("inverted_index_compaction_enable", "true")
+
+        sql "DROP TABLE IF EXISTS ${tableWithOutIndexCompaction}"
+        create_table_with_index.call(tableWithOutIndexCompaction)
+        load_httplogs_data.call(tableWithOutIndexCompaction, '6', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '7', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '8', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '9', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '10', 'true', 'json', 'documents-1000.json')
+
+        run_compaction_and_wait(tableWithOutIndexCompaction)
+        def another_with_index_size = wait_for_show_data_finish(tableWithOutIndexCompaction, 60000, 0)
+        assertTrue(another_with_index_size != "wait_timeout")
+
+        if (!isCloudMode()) {
+            assertEquals(another_with_index_size, with_index_size)
+        }
+    } finally {
+        // sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+        // sql "DROP TABLE IF EXISTS ${tableWithOutIndexCompaction}"
+        set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString())
+    }
+}
