Skip to content

Commit c077d8b

Browse files
Fix cte concurrent unit test (#10377)
close #10373 We introduce a new variable to record how many threads have exited so that we can ensure that all blocks have been pushed into queue before `notifyEOF` is called.
1 parent 0dca5f8 commit c077d8b

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

dbms/src/Operators/tests/gtest_cte.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ void concurrentTest()
226226
cte->sourceExit();
227227
};
228228

229+
std::vector<size_t> exit_num_for_each_partition;
230+
exit_num_for_each_partition.resize(EXPECTED_SINK_NUM, 0);
231+
229232
auto sink_func = [&](size_t sink_idx, size_t partition_idx) {
230233
std::uniform_int_distribution<size_t> di1(1, 10);
231234
std::uniform_int_distribution<size_t> di2(10, 50);
@@ -243,7 +246,8 @@ void concurrentTest()
243246
next_sink_idx = next_sink_idxs[sink_idx]++;
244247
if unlikely (next_sink_idx >= sink_blocks[sink_idx].size())
245248
{
246-
if (partition_idx == 0)
249+
++exit_num_for_each_partition[sink_idx];
250+
if (exit_num_for_each_partition[sink_idx] == PARTITION_NUM)
247251
cte->sinkExit<true>();
248252
break;
249253
}
@@ -312,7 +316,7 @@ void concurrentTest()
312316
TEST_F(TestCTE, Concurrent)
313317
try
314318
{
315-
for (size_t i = 0; i < 5; i++)
319+
for (size_t i = 0; i < 10; i++)
316320
concurrentTest();
317321
}
318322
CATCH

0 commit comments

Comments
 (0)