Skip to content

Commit c6dc5fc

Browse files
Support non-recursive CTE in TiFlash (#10086)
ref #10085
1 parent ef34db9 commit c6dc5fc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2070
-33
lines changed

dbms/src/Common/TiFlashMetrics.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
9595
F(type_exchange_receiver, {"type", "exchange_receiver"}), \
9696
F(type_projection, {"type", "projection"}), \
9797
F(type_partition_ts, {"type", "partition_table_scan"}), \
98+
F(type_cte_sink, {"type", "cte_sink"}), \
99+
F(type_cte_source, {"type", "cte_source"}), \
98100
F(type_window, {"type", "window"}), \
99101
F(type_window_sort, {"type", "window_sort"}), \
100102
F(type_expand, {"type", "expand"})) \
@@ -775,7 +777,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
775777
F(type_wait_on_tunnel_sender_write, {"type", "wait_on_tunnel_sender_write"}), \
776778
F(type_wait_on_join_build, {"type", "wait_on_join_build"}), \
777779
F(type_wait_on_join_probe, {"type", "wait_on_join_probe"}), \
778-
F(type_wait_on_result_queue_write, {"type", "wait_on_result_queue_write"})) \
780+
F(type_wait_on_result_queue_write, {"type", "wait_on_result_queue_write"}), \
781+
F(type_type_wait_on_cte_read, {"type", "type_wait_on_cte_read"})) \
779782
M(tiflash_pipeline_task_duration_seconds, \
780783
"Bucketed histogram of pipeline task duration in seconds", \
781784
Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \

dbms/src/Common/WeakHash.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class WeakHash32
2828
public:
2929
using Container = PaddedPODArray<UInt32>;
3030

31-
static constexpr UInt32 initial_hash = ~UInt32(0);
31+
static constexpr UInt32 initial_hash = ~static_cast<UInt32>(0);
3232

3333
explicit WeakHash32(size_t size)
3434
: data(size, initial_hash)

dbms/src/Flash/Coprocessor/DAGContext.h

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
#pragma once
1616

17+
#include <cstddef>
18+
#include <mutex>
19+
#include <unordered_map>
1720
#pragma GCC diagnostic push
1821
#pragma GCC diagnostic ignored "-Wunused-parameter"
1922
#ifdef __clang__
@@ -23,6 +26,7 @@
2326
#pragma GCC diagnostic pop
2427

2528
#include <Common/ConcurrentBoundedQueue.h>
29+
#include <Common/Exception.h>
2630
#include <Common/Logger.h>
2731
#include <Core/QueryOperatorSpillContexts.h>
2832
#include <Core/TaskOperatorSpillContexts.h>
@@ -55,6 +59,7 @@ class CoprocessorReader;
5559
using CoprocessorReaderPtr = std::shared_ptr<CoprocessorReader>;
5660

5761
class AutoSpillTrigger;
62+
class CTE;
5863

5964
struct JoinProfileInfo
6065
{
@@ -359,6 +364,70 @@ class DAGContext
359364

360365
MPPReceiverSetPtr getMPPReceiverSet() const { return mpp_receiver_set; }
361366

367+
String getQueryIDAndCTEIDForSink()
368+
{
369+
std::lock_guard<std::mutex> lock(this->cte_mu);
370+
return this->query_id_and_cte_id_for_sink;
371+
}
372+
373+
String getQueryIDAndCTEIDForSource(size_t cte_id)
374+
{
375+
std::lock_guard<std::mutex> lock(this->cte_mu);
376+
return this->query_id_and_cte_id_for_sources[cte_id];
377+
}
378+
379+
void setQueryIDAndCTEIDForSink(const String & query_id_and_cte_id)
380+
{
381+
std::lock_guard<std::mutex> lock(this->cte_mu);
382+
383+
// MPP Task has only one CTESink, it's impossible to set query_id_and_cte_id_for_sink twice
384+
RUNTIME_CHECK(this->query_id_and_cte_id_for_sink.empty(), this->query_id_and_cte_id_for_sink);
385+
this->query_id_and_cte_id_for_sink = query_id_and_cte_id;
386+
}
387+
388+
void addQueryIDAndCTEIDForSource(size_t cte_id, const String & query_id_and_cte_id)
389+
{
390+
std::lock_guard<std::mutex> lock(this->cte_mu);
391+
auto iter = this->query_id_and_cte_id_for_sources.find(cte_id);
392+
if (iter != this->query_id_and_cte_id_for_sources.end())
393+
{
394+
RUNTIME_CHECK_MSG(iter->second == query_id_and_cte_id, "{} vs {}", iter->second, query_id_and_cte_id);
395+
return;
396+
}
397+
this->query_id_and_cte_id_for_sources.insert(std::make_pair(cte_id, query_id_and_cte_id));
398+
}
399+
400+
std::shared_ptr<CTE> getCTESink()
401+
{
402+
std::lock_guard<std::mutex> lock(this->cte_mu);
403+
return this->sink_cte;
404+
}
405+
406+
std::unordered_map<size_t, std::shared_ptr<CTE>> getCTESource()
407+
{
408+
std::lock_guard<std::mutex> lock(this->cte_mu);
409+
return this->source_ctes;
410+
}
411+
412+
void setCTESink(std::shared_ptr<CTE> & cte)
413+
{
414+
std::lock_guard<std::mutex> lock(this->cte_mu);
415+
RUNTIME_CHECK(!this->sink_cte);
416+
this->sink_cte = cte;
417+
}
418+
419+
void addCTESource(size_t cte_id, std::shared_ptr<CTE> & cte)
420+
{
421+
std::lock_guard<std::mutex> lock(this->cte_mu);
422+
auto iter = this->source_ctes.find(cte_id);
423+
if (iter != this->source_ctes.end())
424+
{
425+
RUNTIME_CHECK(iter->second.get() == cte.get());
426+
return;
427+
}
428+
this->source_ctes.insert(std::make_pair(cte_id, cte));
429+
}
430+
362431
public:
363432
DAGRequest dag_request;
364433
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
@@ -475,6 +544,13 @@ class DAGContext
475544
UInt64 connection_id;
476545
// It's the session alias between mysql client and tidb
477546
String connection_alias;
547+
548+
String query_id_and_cte_id_for_sink;
549+
std::unordered_map<size_t, String> query_id_and_cte_id_for_sources;
550+
551+
std::mutex cte_mu;
552+
std::shared_ptr<CTE> sink_cte;
553+
std::unordered_map<size_t, std::shared_ptr<CTE>> source_ctes;
478554
};
479555

480556
} // namespace DB

dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ String genNameForExchangeReceiver(Int32 col_index)
6161
return fmt::format("exchange_receiver_{}", col_index);
6262
}
6363

64+
String genNameForCTESource(Int32 cte_id, Int32 col_index)
65+
{
66+
return fmt::format("cte_source_{}_{}", cte_id, col_index);
67+
}
68+
6469
NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const StringRef & column_prefix)
6570
{
6671
NamesAndTypes names_and_types;

dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ namespace DB
2727
NamesAndTypes genNamesAndTypesForExchangeReceiver(const TiDBTableScan & table_scan);
2828
NamesAndTypes genNamesAndTypesForTableScan(const TiDBTableScan & table_scan);
2929
String genNameForExchangeReceiver(Int32 col_index);
30+
String genNameForCTESource(Int32 cte_id, Int32 col_index);
3031

3132
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix);
3233
NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const StringRef & column_prefix);

dbms/src/Flash/Executor/PipelineExecutor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <Flash/Coprocessor/DAGContext.h>
1616
#include <Flash/Executor/PipelineExecutor.h>
17+
#include <Flash/Executor/PipelineExecutorContext.h>
1718
#include <Flash/Pipeline/Pipeline.h>
1819
#include <Flash/Pipeline/Schedule/Events/Event.h>
1920
#include <Flash/Planner/PhysicalPlan.h>

dbms/src/Flash/Executor/PipelineExecutorContext.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <Flash/Mpp/Utils.h>
2121
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
2222
#include <Flash/Pipeline/Schedule/Tasks/OneTimeNotifyFuture.h>
23+
#include <Operators/CTE.h>
2324
#include <Operators/SharedQueue.h>
2425

2526
#include <exception>
@@ -185,6 +186,14 @@ void PipelineExecutorContext::cancel()
185186
// pipeline tasks waiting in the WAIT_FOR_NOTIFY state from never being notified.
186187
if (dag_context->tunnel_set)
187188
dag_context->tunnel_set->close(getTrimmedErrMsg(), false);
189+
190+
auto cte = dag_context->getCTESink();
191+
if (cte)
192+
cte->notifyCancel<false>(getTrimmedErrMsg());
193+
194+
for (auto & p : dag_context->getCTESource())
195+
p.second->notifyCancel<false>(getTrimmedErrMsg());
196+
188197
if (auto mpp_receiver_set = dag_context->getMPPReceiverSet(); mpp_receiver_set)
189198
mpp_receiver_set->cancel();
190199
}

dbms/src/Flash/Executor/PipelineExecutorContext.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <Common/Logger.h>
1818
#include <Common/MemoryTracker.h>
1919
#include <Core/AutoSpillTrigger.h>
20+
#include <Flash/Coprocessor/DAGContext.h>
2021
#include <Flash/Executor/ExecutionResult.h>
2122
#include <Flash/Executor/ResultHandler.h>
2223
#include <Flash/Executor/ResultQueue_fwd.h>

dbms/src/Flash/Mpp/CTEManager.cpp

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <Flash/Mpp/CTEManager.h>
16+
#include <fmt/core.h>
17+
#include <tipb/select.pb.h>
18+
19+
#include <mutex>
20+
#include <utility>
21+
22+
namespace DB
23+
{
24+
void CTEManager::releaseCTEBySource(const String & query_id_and_cte_id)
25+
{
26+
std::lock_guard<std::mutex> lock(this->mu);
27+
auto iter = this->ctes.find(query_id_and_cte_id);
28+
if unlikely (iter == this->ctes.end())
29+
// Maybe the task is cancelled and the cte has been released
30+
return;
31+
32+
iter->second->sourceExit();
33+
if (iter->second->allExit())
34+
this->ctes.erase(iter);
35+
}
36+
37+
void CTEManager::releaseCTEBySink(const tipb::SelectResponse & resp, const String & query_id_and_cte_id)
38+
{
39+
std::lock_guard<std::mutex> lock(this->mu);
40+
auto iter = this->ctes.find(query_id_and_cte_id);
41+
if unlikely (iter == this->ctes.end())
42+
// Maybe the task is cancelled and the cte has been released
43+
return;
44+
45+
std::shared_ptr<CTE> cte = iter->second;
46+
cte->addResp(resp);
47+
cte->sinkExit<false>();
48+
if (cte->allExit())
49+
this->ctes.erase(iter);
50+
}
51+
52+
void CTEManager::releaseCTE(const String & query_id_and_cte_id)
53+
{
54+
std::lock_guard<std::mutex> lock(this->mu);
55+
this->ctes.erase(query_id_and_cte_id);
56+
}
57+
58+
std::shared_ptr<CTE> CTEManager::getOrCreateCTE(
59+
const String & query_id_and_cte_id,
60+
Int32 concurrency,
61+
Int32 expected_sink_num,
62+
Int32 expected_source_num)
63+
{
64+
std::lock_guard<std::mutex> lock(this->mu);
65+
auto iter = this->ctes.find(query_id_and_cte_id);
66+
std::shared_ptr<CTE> cte;
67+
if (iter == this->ctes.end())
68+
{
69+
cte = std::make_shared<CTE>(concurrency, expected_sink_num, expected_source_num);
70+
this->ctes.insert(std::make_pair(query_id_and_cte_id, cte));
71+
}
72+
else
73+
{
74+
cte = iter->second;
75+
}
76+
77+
return cte;
78+
}
79+
80+
bool CTEManager::hasCTEForTest(const String & query_id_and_cte_id)
81+
{
82+
std::lock_guard<std::mutex> lock(this->mu);
83+
return !(this->ctes.find(query_id_and_cte_id) == this->ctes.end());
84+
}
85+
} // namespace DB

dbms/src/Flash/Mpp/CTEManager.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <Operators/CTE.h>
18+
19+
#include <memory>
20+
#include <mutex>
21+
22+
namespace DB
23+
{
24+
class CTEManager
25+
{
26+
public:
27+
std::shared_ptr<CTE> getOrCreateCTE(
28+
const String & query_id_and_cte_id,
29+
Int32 concurrency,
30+
Int32 expected_sink_num,
31+
Int32 expected_source_num);
32+
void releaseCTEBySource(const String & query_id_and_cte_id);
33+
void releaseCTEBySink(const tipb::SelectResponse & resp, const String & query_id_and_cte_id);
34+
void releaseCTE(const String & query_id_and_cte_id);
35+
36+
bool hasCTEForTest(const String & query_id_and_cte_id);
37+
38+
private:
39+
std::mutex mu;
40+
std::unordered_map<String, std::shared_ptr<CTE>> ctes;
41+
};
42+
} // namespace DB

0 commit comments

Comments
 (0)