Skip to content

Commit 6d670a2

Browse files
authored
Support (left outer) (anti) semi join in hash join v2 (#10133)
ref #9060

Support (left outer) (anti) semi join in hash join v2

Signed-off-by: gengliqi <[email protected]>
1 parent 1c5cb13 commit 6d670a2

File tree

12 files changed

+1355
-40
lines changed

dbms/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ check_then_add_sources_compile_flag (
113113
src/Interpreters/JoinV2/HashJoin.cpp
114114
src/Interpreters/JoinV2/HashJoinBuild.cpp
115115
src/Interpreters/JoinV2/HashJoinProbe.cpp
116+
src/Interpreters/JoinV2/SemiJoinProbe.cpp
116117
src/IO/Compression/EncodingUtil.cpp
117118
src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
118119
src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp

dbms/src/Flash/Planner/Plans/PhysicalJoinV2.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,11 @@ bool PhysicalJoinV2::isSupported(const tipb::Join & join)
210210
{
211211
case Inner:
212212
case LeftOuter:
213-
if (!tiflash_join.getBuildJoinKeys().empty())
214-
return true;
215-
break;
216213
case Semi:
217214
case Anti:
218215
case LeftOuterSemi:
219216
case LeftOuterAnti:
220-
if (!tiflash_join.getBuildJoinKeys().empty() && join.other_conditions_size() == 0
221-
&& join.other_eq_conditions_from_in_size() == 0)
217+
if (!tiflash_join.getBuildJoinKeys().empty())
222218
return true;
223219
break;
224220
//case RightOuter:

dbms/src/Interpreters/JoinV2/HashJoin.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,10 @@ void HashJoin::workAfterBuildRowFinish()
498498
fiu_do_on(FailPoints::force_join_v2_probe_enable_lm, { late_materialization = true; });
499499
fiu_do_on(FailPoints::force_join_v2_probe_disable_lm, { late_materialization = false; });
500500

501-
join_probe_helper = std::make_unique<JoinProbeHelper>(this, late_materialization);
501+
if (SemiJoinProbeHelper::isSupported(kind, has_other_condition))
502+
semi_join_probe_helper = std::make_unique<SemiJoinProbeHelper>(this);
503+
else
504+
join_probe_helper = std::make_unique<JoinProbeHelper>(this, late_materialization);
502505

503506
LOG_INFO(
504507
log,
@@ -626,6 +629,7 @@ Block HashJoin::probeBlock(JoinProbeContext & ctx, size_t stream_index)
626629
method,
627630
kind,
628631
has_other_condition,
632+
!non_equal_conditions.other_eq_cond_from_in_name.empty(),
629633
key_names_left,
630634
non_equal_conditions.left_filter_column,
631635
probe_output_name_set,
@@ -636,7 +640,11 @@ Block HashJoin::probeBlock(JoinProbeContext & ctx, size_t stream_index)
636640
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint);
637641

638642
auto & wd = probe_workers_data[stream_index];
639-
Block res = join_probe_helper->probe(ctx, wd);
643+
Block res;
644+
if (semi_join_probe_helper)
645+
res = semi_join_probe_helper->probe(ctx, wd);
646+
else
647+
res = join_probe_helper->probe(ctx, wd);
640648
if (ctx.isAllFinished())
641649
wd.probe_handle_rows += ctx.rows;
642650
return res;

dbms/src/Interpreters/JoinV2/HashJoin.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <Interpreters/JoinV2/HashJoinProbe.h>
2727
#include <Interpreters/JoinV2/HashJoinRowLayout.h>
2828
#include <Interpreters/JoinV2/HashJoinSettings.h>
29+
#include <Interpreters/JoinV2/SemiJoinProbe.h>
2930

3031

3132
namespace DB
@@ -82,6 +83,7 @@ class HashJoin
8283

8384
private:
8485
friend JoinProbeHelper;
86+
friend SemiJoinProbeHelper;
8587

8688
static const DataTypePtr match_helper_type;
8789

@@ -151,6 +153,7 @@ class HashJoin
151153
std::vector<JoinProbeWorkerData> probe_workers_data;
152154
std::atomic<size_t> active_probe_worker = 0;
153155
std::unique_ptr<JoinProbeHelper> join_probe_helper;
156+
std::unique_ptr<SemiJoinProbeHelper> semi_join_probe_helper;
154157

155158
const JoinProfileInfoPtr profile_info = std::make_shared<JoinProfileInfo>();
156159

dbms/src/Interpreters/JoinV2/HashJoinKey.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ std::unique_ptr<void, std::function<void(void *)>> createHashJoinKeyGetter(
7979
using KeyGetterType##METHOD = typename HashJoinKeyGetterForType<HashJoinKeyMethod::METHOD>::Type; \
8080
return std::unique_ptr<void, std::function<void(void *)>>( \
8181
static_cast<void *>(new KeyGetterType##METHOD(collators)), \
82-
[](void * ptr) { delete reinterpret_cast<KeyGetterType##METHOD *>(ptr); }); \
83-
break;
82+
[](void * ptr) { delete reinterpret_cast<KeyGetterType##METHOD *>(ptr); });
8483
APPLY_FOR_HASH_JOIN_VARIANTS(M)
8584
#undef M
8685

dbms/src/Interpreters/JoinV2/HashJoinProbe.cpp

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <Interpreters/JoinUtils.h>
2222
#include <Interpreters/JoinV2/HashJoin.h>
2323
#include <Interpreters/JoinV2/HashJoinProbe.h>
24+
#include <Interpreters/JoinV2/SemiJoinProbe.h>
2425
#include <Interpreters/NullableUtils.h>
2526
#include <Parsers/ASTTablesInSelectQuery.h>
2627

@@ -39,7 +40,9 @@ bool JoinProbeContext::isProbeFinished() const
3940
{
4041
return current_row_idx >= rows
4142
// For prefetching
42-
&& prefetch_active_states == 0;
43+
&& prefetch_active_states == 0
44+
// For (left outer) (anti) semi join with other conditions
45+
&& (semi_join_probe_list == nullptr || semi_join_probe_list->activeSlots() == 0);
4346
}
4447

4548
bool JoinProbeContext::isAllFinished() const
@@ -71,6 +74,7 @@ void JoinProbeContext::prepareForHashProbe(
7174
HashJoinKeyMethod method,
7275
ASTTableJoin::Kind kind,
7376
bool has_other_condition,
77+
bool has_other_eq_cond_from_in,
7478
const Names & key_names,
7579
const String & filter_column,
7680
const NameSet & probe_output_name_set,
@@ -123,6 +127,23 @@ void JoinProbeContext::prepareForHashProbe(
123127
{
124128
left_semi_match_res.clear();
125129
left_semi_match_res.resize_fill_zero(rows);
130+
if (has_other_eq_cond_from_in)
131+
{
132+
left_semi_match_null_res.clear();
133+
left_semi_match_null_res.resize_fill_zero(rows);
134+
}
135+
}
136+
if ((kind == Semi || kind == Anti) && has_other_condition)
137+
{
138+
semi_selective_offsets.clear();
139+
semi_selective_offsets.reserve(rows);
140+
}
141+
142+
if (SemiJoinProbeHelper::isSupported(kind, has_other_condition))
143+
{
144+
if unlikely (!semi_join_probe_list)
145+
semi_join_probe_list = createSemiJoinProbeList(method);
146+
semi_join_probe_list->reset(rows);
126147
}
127148

128149
is_prepared = true;
@@ -634,16 +655,17 @@ void JoinProbeHelper::probeFillColumns(JoinProbeContext & ctx, JoinProbeWorkerDa
634655
using Adder = JoinProbeAdder<kind, has_other_condition, late_materialization>;
635656

636657
auto & key_getter = *static_cast<KeyGetterType *>(ctx.key_getter.get());
658+
// Some columns in wd.result_block may remain empty due to late materialization for join with other conditions.
659+
// But since all columns are cleared after handling other conditions, wd.result_block.rows() is always 0.
637660
size_t current_offset = wd.result_block.rows();
661+
if constexpr (has_other_condition)
662+
RUNTIME_CHECK(current_offset == 0);
638663
size_t idx = ctx.current_row_idx;
639664
RowPtr ptr = ctx.current_build_row_ptr;
640665
bool is_matched = ctx.current_row_is_matched;
641666
size_t collision = 0;
642-
size_t key_offset = sizeof(RowPtr);
643-
if constexpr (KeyGetterType::joinKeyCompareHashFirst())
644-
{
645-
key_offset += sizeof(HashValueType);
646-
}
667+
constexpr size_t key_offset
668+
= sizeof(RowPtr) + (KeyGetterType::joinKeyCompareHashFirst() ? sizeof(HashValueType) : 0);
647669

648670
#define NOT_MATCHED(not_matched) \
649671
if constexpr (Adder::need_not_matched) \
@@ -782,8 +804,10 @@ struct ProbePrefetchState
782804
KeyType key{};
783805
union
784806
{
785-
RowPtr ptr = nullptr;
786-
std::atomic<RowPtr> * pointer_ptr;
807+
/// Used when stage is FindHeader
808+
std::atomic<RowPtr> * pointer_ptr = nullptr;
809+
/// Used when stage is FindNext
810+
RowPtr ptr;
787811
};
788812
};
789813

@@ -801,24 +825,26 @@ void JoinProbeHelper::probeFillColumnsPrefetch(
801825
using Adder = JoinProbeAdder<kind, has_other_condition, late_materialization>;
802826

803827
auto & key_getter = *static_cast<KeyGetterType *>(ctx.key_getter.get());
804-
if (!ctx.prefetch_states)
828+
const size_t probe_prefetch_step = settings.probe_prefetch_step;
829+
if unlikely (!ctx.prefetch_states)
805830
{
806831
ctx.prefetch_states = decltype(ctx.prefetch_states)(
807-
static_cast<void *>(new ProbePrefetchState<KeyGetter>[settings.probe_prefetch_step]),
832+
static_cast<void *>(new ProbePrefetchState<KeyGetter>[probe_prefetch_step]),
808833
[](void * ptr) { delete[] static_cast<ProbePrefetchState<KeyGetter> *>(ptr); });
809834
}
810835
auto * states = static_cast<ProbePrefetchState<KeyGetter> *>(ctx.prefetch_states.get());
811836

812837
size_t idx = ctx.current_row_idx;
813838
size_t active_states = ctx.prefetch_active_states;
814839
size_t k = ctx.prefetch_iter;
840+
// Some columns in wd.result_block may remain empty due to late materialization for join with other conditions.
841+
// But since all columns are cleared after handling other conditions, wd.result_block.rows() is always 0.
815842
size_t current_offset = wd.result_block.rows();
843+
if constexpr (has_other_condition)
844+
RUNTIME_CHECK(current_offset == 0);
816845
size_t collision = 0;
817-
size_t key_offset = sizeof(RowPtr);
818-
if constexpr (KeyGetterType::joinKeyCompareHashFirst())
819-
{
820-
key_offset += sizeof(HashValueType);
821-
}
846+
constexpr size_t key_offset
847+
= sizeof(RowPtr) + (KeyGetterType::joinKeyCompareHashFirst() ? sizeof(HashValueType) : 0);
822848

823849
#define NOT_MATCHED(not_matched, idx) \
824850
if constexpr (Adder::need_not_matched) \
@@ -831,7 +857,6 @@ void JoinProbeHelper::probeFillColumnsPrefetch(
831857
} \
832858
}
833859

834-
const size_t probe_prefetch_step = settings.probe_prefetch_step;
835860
while (idx < ctx.rows || active_states > 0)
836861
{
837862
k = k == probe_prefetch_step ? 0 : k;
@@ -845,11 +870,10 @@ void JoinProbeHelper::probeFillColumnsPrefetch(
845870
const auto & key2 = key_getter.deserializeJoinKey(ptr + key_offset);
846871
bool key_is_equal = joinKeyIsEqual(key_getter, state->key, key2, state->hash, ptr);
847872
collision += !key_is_equal;
873+
if constexpr (Adder::need_not_matched)
874+
state->is_matched |= key_is_equal;
848875
if (key_is_equal)
849876
{
850-
if constexpr (Adder::need_not_matched)
851-
state->is_matched = true;
852-
853877
if constexpr (Adder::need_matched)
854878
{
855879
bool is_end = Adder::addMatched(
@@ -1026,6 +1050,17 @@ Block JoinProbeHelper::handleOtherConditions(
10261050

10271051
non_equal_conditions.other_cond_expr->execute(exec_block);
10281052

1053+
SCOPE_EXIT({
1054+
RUNTIME_CHECK(wd.result_block.columns() == left_columns + right_columns);
1055+
/// Clear the data in result_block.
1056+
for (size_t i = 0; i < left_columns + right_columns; ++i)
1057+
{
1058+
auto column = wd.result_block.getByPosition(i).column->assumeMutable();
1059+
column->popBack(column->size());
1060+
wd.result_block.getByPosition(i).column = std::move(column);
1061+
}
1062+
});
1063+
10291064
size_t rows = exec_block.rows();
10301065
// Ensure BASE_OFFSETS is accessed within bound.
10311066
// It must be true because max_block_size <= BASE_OFFSETS.size(HASH_JOIN_MAX_BLOCK_SIZE_UPPER_BOUND).
@@ -1212,17 +1247,6 @@ Block JoinProbeHelper::handleOtherConditions(
12121247
}
12131248
};
12141249

1215-
SCOPE_EXIT({
1216-
RUNTIME_CHECK(wd.result_block.columns() == left_columns + right_columns);
1217-
/// Clear the data in result_block.
1218-
for (size_t i = 0; i < left_columns + right_columns; ++i)
1219-
{
1220-
auto column = wd.result_block.getByPosition(i).column->assumeMutable();
1221-
column->popBack(column->size());
1222-
wd.result_block.getByPosition(i).column = std::move(column);
1223-
}
1224-
});
1225-
12261250
size_t length = std::min(result_size, remaining_insert_size);
12271251
fill_matched(0, length);
12281252
if (result_size >= remaining_insert_size)

dbms/src/Interpreters/JoinV2/HashJoinProbe.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <Interpreters/JoinV2/HashJoinKey.h>
2424
#include <Interpreters/JoinV2/HashJoinPointerTable.h>
2525
#include <Interpreters/JoinV2/HashJoinSettings.h>
26+
#include <Interpreters/JoinV2/SemiJoinProbeList.h>
2627
#include <Parsers/ASTTablesInSelectQuery.h>
2728
#include <absl/base/optimization.h>
2829

@@ -40,13 +41,19 @@ struct JoinProbeContext
4041
RowPtr current_build_row_ptr = nullptr;
4142
/// For left outer/(left outer) (anti) semi join without other conditions.
4243
bool current_row_is_matched = false;
43-
/// For left outer/(left outer) (anti) semi join with other conditions.
44+
/// For left outer with other conditions.
4445
IColumn::Filter rows_not_matched;
4546
/// < 0 means not_matched_offsets is not initialized.
4647
ssize_t not_matched_offsets_idx = -1;
4748
IColumn::Offsets not_matched_offsets;
4849
/// For left outer (anti) semi join.
4950
PaddedPODArray<Int8> left_semi_match_res;
51+
/// For left outer (anti) semi join with other-eq-from-in conditions.
52+
PaddedPODArray<UInt8> left_semi_match_null_res;
53+
/// For (anti) semi join with other conditions.
54+
IColumn::Offsets semi_selective_offsets;
55+
/// For (left outer) (anti) semi join with other conditions.
56+
std::unique_ptr<ISemiJoinProbeList> semi_join_probe_list;
5057

5158
size_t prefetch_active_states = 0;
5259
size_t prefetch_iter = 0;
@@ -69,6 +76,7 @@ struct JoinProbeContext
6976
HashJoinKeyMethod method,
7077
ASTTableJoin::Kind kind,
7178
bool has_other_condition,
79+
bool has_other_eq_cond_from_in,
7280
const Names & key_names,
7381
const String & filter_column,
7482
const NameSet & probe_output_name_set,

0 commit comments

Comments
 (0)