Skip to content

Commit a8d0ff4

Browse files
committed
Merge branch 'tidb_velox' of github.com:guo-shaoge/tidb into tidb_plan_to_substrait
2 parents 7c8725d + eb06cae commit a8d0ff4

File tree

3 files changed

+311
-142
lines changed

3 files changed

+311
-142
lines changed

executor/builder.go

Lines changed: 164 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ type executorBuilder struct {
111111
// can return a correct value even if the session context has already been destroyed
112112
forDataReaderBuilder bool
113113
dataReaderTS uint64
114+
115+
veloxExec *VeloxExec
114116
}
115117

116118
// CTEStorages stores resTbl and iterInTbl for CTEExec.
@@ -161,150 +163,152 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
161163
switch v := p.(type) {
162164
case nil:
163165
return nil
164-
case *plannercore.Change:
165-
return b.buildChange(v)
166-
case *plannercore.CheckTable:
167-
return b.buildCheckTable(v)
168-
case *plannercore.RecoverIndex:
169-
return b.buildRecoverIndex(v)
170-
case *plannercore.CleanupIndex:
171-
return b.buildCleanupIndex(v)
172-
case *plannercore.CheckIndexRange:
173-
return b.buildCheckIndexRange(v)
174-
case *plannercore.ChecksumTable:
175-
return b.buildChecksumTable(v)
176-
case *plannercore.ReloadExprPushdownBlacklist:
177-
return b.buildReloadExprPushdownBlacklist(v)
178-
case *plannercore.ReloadOptRuleBlacklist:
179-
return b.buildReloadOptRuleBlacklist(v)
180-
case *plannercore.AdminPlugins:
181-
return b.buildAdminPlugins(v)
182-
case *plannercore.DDL:
183-
return b.buildDDL(v)
184-
case *plannercore.Deallocate:
185-
return b.buildDeallocate(v)
186-
case *plannercore.Delete:
187-
return b.buildDelete(v)
188-
case *plannercore.Execute:
189-
return b.buildExecute(v)
190-
case *plannercore.Trace:
191-
return b.buildTrace(v)
192-
case *plannercore.Explain:
193-
return b.buildExplain(v)
194-
case *plannercore.PointGetPlan:
195-
return b.buildPointGet(v)
196-
case *plannercore.BatchPointGetPlan:
197-
return b.buildBatchPointGet(v)
198-
case *plannercore.Insert:
199-
return b.buildInsert(v)
200-
case *plannercore.LoadData:
201-
return b.buildLoadData(v)
202-
case *plannercore.LoadStats:
203-
return b.buildLoadStats(v)
204-
case *plannercore.IndexAdvise:
205-
return b.buildIndexAdvise(v)
206-
case *plannercore.PlanReplayer:
207-
return b.buildPlanReplayer(v)
208-
case *plannercore.PhysicalLimit:
209-
return b.buildLimit(v)
210-
case *plannercore.Prepare:
211-
return b.buildPrepare(v)
212-
case *plannercore.PhysicalLock:
213-
return b.buildSelectLock(v)
214-
case *plannercore.CancelDDLJobs:
215-
return b.buildCancelDDLJobs(v)
216-
case *plannercore.ShowNextRowID:
217-
return b.buildShowNextRowID(v)
218-
case *plannercore.ShowDDL:
219-
return b.buildShowDDL(v)
220-
case *plannercore.PhysicalShowDDLJobs:
221-
return b.buildShowDDLJobs(v)
222-
case *plannercore.ShowDDLJobQueries:
223-
return b.buildShowDDLJobQueries(v)
224-
case *plannercore.ShowDDLJobQueriesWithRange:
225-
return b.buildShowDDLJobQueriesWithRange(v)
226-
case *plannercore.ShowSlow:
227-
return b.buildShowSlow(v)
228-
case *plannercore.PhysicalShow:
229-
return b.buildShow(v)
230-
case *plannercore.Simple:
231-
return b.buildSimple(v)
232-
case *plannercore.PhysicalSimpleWrapper:
233-
return b.buildSimple(&v.Inner)
234-
case *plannercore.Set:
235-
return b.buildSet(v)
236-
case *plannercore.SetConfig:
237-
return b.buildSetConfig(v)
238-
case *plannercore.PhysicalSort:
239-
return b.buildSort(v)
240-
case *plannercore.PhysicalTopN:
241-
return b.buildTopN(v)
242-
case *plannercore.PhysicalUnionAll:
243-
return b.buildUnionAll(v)
244-
case *plannercore.Update:
245-
return b.buildUpdate(v)
246-
case *plannercore.PhysicalUnionScan:
247-
return b.buildUnionScanExec(v)
248-
case *plannercore.PhysicalHashJoin:
249-
return b.buildHashJoin(v)
250-
case *plannercore.PhysicalMergeJoin:
251-
return b.buildMergeJoin(v)
252-
case *plannercore.PhysicalIndexJoin:
253-
return b.buildIndexLookUpJoin(v)
254-
case *plannercore.PhysicalIndexMergeJoin:
255-
return b.buildIndexLookUpMergeJoin(v)
256-
case *plannercore.PhysicalIndexHashJoin:
257-
return b.buildIndexNestedLoopHashJoin(v)
258-
case *plannercore.PhysicalSelection:
259-
return b.buildSelection(v)
260-
case *plannercore.PhysicalHashAgg:
261-
return b.buildHashAgg(v)
262-
case *plannercore.PhysicalStreamAgg:
263-
return b.buildStreamAgg(v)
166+
// case *plannercore.Change:
167+
// return b.buildChange(v)
168+
// case *plannercore.CheckTable:
169+
// return b.buildCheckTable(v)
170+
// case *plannercore.RecoverIndex:
171+
// return b.buildRecoverIndex(v)
172+
// case *plannercore.CleanupIndex:
173+
// return b.buildCleanupIndex(v)
174+
// case *plannercore.CheckIndexRange:
175+
// return b.buildCheckIndexRange(v)
176+
// case *plannercore.ChecksumTable:
177+
// return b.buildChecksumTable(v)
178+
// case *plannercore.ReloadExprPushdownBlacklist:
179+
// return b.buildReloadExprPushdownBlacklist(v)
180+
// case *plannercore.ReloadOptRuleBlacklist:
181+
// return b.buildReloadOptRuleBlacklist(v)
182+
// case *plannercore.AdminPlugins:
183+
// return b.buildAdminPlugins(v)
184+
// case *plannercore.DDL:
185+
// return b.buildDDL(v)
186+
// case *plannercore.Deallocate:
187+
// return b.buildDeallocate(v)
188+
// case *plannercore.Delete:
189+
// return b.buildDelete(v)
190+
// case *plannercore.Execute:
191+
// return b.buildExecute(v)
192+
// case *plannercore.Trace:
193+
// return b.buildTrace(v)
194+
// case *plannercore.Explain:
195+
// return b.buildExplain(v)
196+
// case *plannercore.PointGetPlan:
197+
// return b.buildPointGet(v)
198+
// case *plannercore.BatchPointGetPlan:
199+
// return b.buildBatchPointGet(v)
200+
// case *plannercore.Insert:
201+
// return b.buildInsert(v)
202+
// case *plannercore.LoadData:
203+
// return b.buildLoadData(v)
204+
// case *plannercore.LoadStats:
205+
// return b.buildLoadStats(v)
206+
// case *plannercore.IndexAdvise:
207+
// return b.buildIndexAdvise(v)
208+
// case *plannercore.PlanReplayer:
209+
// return b.buildPlanReplayer(v)
210+
// case *plannercore.PhysicalLimit:
211+
// return b.buildLimit(v)
212+
// case *plannercore.Prepare:
213+
// return b.buildPrepare(v)
214+
// case *plannercore.PhysicalLock:
215+
// return b.buildSelectLock(v)
216+
// case *plannercore.CancelDDLJobs:
217+
// return b.buildCancelDDLJobs(v)
218+
// case *plannercore.ShowNextRowID:
219+
// return b.buildShowNextRowID(v)
220+
// case *plannercore.ShowDDL:
221+
// return b.buildShowDDL(v)
222+
// case *plannercore.PhysicalShowDDLJobs:
223+
// return b.buildShowDDLJobs(v)
224+
// case *plannercore.ShowDDLJobQueries:
225+
// return b.buildShowDDLJobQueries(v)
226+
// case *plannercore.ShowDDLJobQueriesWithRange:
227+
// return b.buildShowDDLJobQueriesWithRange(v)
228+
// case *plannercore.ShowSlow:
229+
// return b.buildShowSlow(v)
230+
// case *plannercore.PhysicalShow:
231+
// return b.buildShow(v)
232+
// case *plannercore.Simple:
233+
// return b.buildSimple(v)
234+
// case *plannercore.PhysicalSimpleWrapper:
235+
// return b.buildSimple(&v.Inner)
236+
// case *plannercore.Set:
237+
// return b.buildSet(v)
238+
// case *plannercore.SetConfig:
239+
// return b.buildSetConfig(v)
240+
// case *plannercore.PhysicalSort:
241+
// return b.buildSort(v)
242+
// case *plannercore.PhysicalTopN:
243+
// return b.buildTopN(v)
244+
// case *plannercore.PhysicalUnionAll:
245+
// return b.buildUnionAll(v)
246+
// case *plannercore.Update:
247+
// return b.buildUpdate(v)
248+
// case *plannercore.PhysicalUnionScan:
249+
// return b.buildUnionScanExec(v)
250+
// case *plannercore.PhysicalHashJoin:
251+
// return b.buildHashJoin(v)
252+
// case *plannercore.PhysicalMergeJoin:
253+
// return b.buildMergeJoin(v)
254+
// case *plannercore.PhysicalIndexJoin:
255+
// return b.buildIndexLookUpJoin(v)
256+
// case *plannercore.PhysicalIndexMergeJoin:
257+
// return b.buildIndexLookUpMergeJoin(v)
258+
// case *plannercore.PhysicalIndexHashJoin:
259+
// return b.buildIndexNestedLoopHashJoin(v)
260+
// case *plannercore.PhysicalSelection:
261+
// return b.buildSelection(v)
262+
// case *plannercore.PhysicalHashAgg:
263+
// return b.buildHashAgg(v)
264+
// case *plannercore.PhysicalStreamAgg:
265+
// return b.buildStreamAgg(v)
264266
case *plannercore.PhysicalProjection:
265-
return b.buildProjection(v)
266-
case *plannercore.PhysicalMemTable:
267-
return b.buildMemTable(v)
268-
case *plannercore.PhysicalTableDual:
269-
return b.buildTableDual(v)
270-
case *plannercore.PhysicalApply:
271-
return b.buildApply(v)
272-
case *plannercore.PhysicalMaxOneRow:
273-
return b.buildMaxOneRow(v)
274-
case *plannercore.Analyze:
275-
return b.buildAnalyze(v)
267+
return b.buildProjectionVelox(v)
268+
// case *plannercore.PhysicalMemTable:
269+
// return b.buildMemTable(v)
270+
// case *plannercore.PhysicalTableDual:
271+
// return b.buildTableDual(v)
272+
// case *plannercore.PhysicalApply:
273+
// return b.buildApply(v)
274+
// case *plannercore.PhysicalMaxOneRow:
275+
// return b.buildMaxOneRow(v)
276+
// case *plannercore.Analyze:
277+
// return b.buildAnalyze(v)
278+
// case *plannercore.PhysicalTableReader:
279+
// return b.buildTableReader(v)
280+
// case *plannercore.PhysicalTableSample:
281+
// return b.buildTableSample(v)
282+
// case *plannercore.PhysicalIndexReader:
283+
// return b.buildIndexReader(v)
284+
// case *plannercore.PhysicalIndexLookUpReader:
285+
// return b.buildIndexLookUpReader(v)
286+
// case *plannercore.PhysicalWindow:
287+
// return b.buildWindow(v)
288+
// case *plannercore.PhysicalShuffle:
289+
// return b.buildShuffle(v)
290+
// case *plannercore.PhysicalShuffleReceiverStub:
291+
// return b.buildShuffleReceiverStub(v)
292+
// case *plannercore.SQLBindPlan:
293+
// return b.buildSQLBindExec(v)
294+
// case *plannercore.SplitRegion:
295+
// return b.buildSplitRegion(v)
296+
// case *plannercore.PhysicalIndexMergeReader:
297+
// return b.buildIndexMergeReader(v)
298+
// case *plannercore.SelectInto:
299+
// return b.buildSelectInto(v)
300+
// case *plannercore.AdminShowTelemetry:
301+
// return b.buildAdminShowTelemetry(v)
302+
// case *plannercore.AdminResetTelemetryID:
303+
// return b.buildAdminResetTelemetryID(v)
304+
// case *plannercore.PhysicalCTE:
305+
// return b.buildCTE(v)
306+
// case *plannercore.PhysicalCTETable:
307+
// return b.buildCTETableReader(v)
308+
// case *plannercore.CompactTable:
309+
// return b.buildCompactTable(v)
276310
case *plannercore.PhysicalTableReader:
277-
return b.buildTableReader(v)
278-
case *plannercore.PhysicalTableSample:
279-
return b.buildTableSample(v)
280-
case *plannercore.PhysicalIndexReader:
281-
return b.buildIndexReader(v)
282-
case *plannercore.PhysicalIndexLookUpReader:
283-
return b.buildIndexLookUpReader(v)
284-
case *plannercore.PhysicalWindow:
285-
return b.buildWindow(v)
286-
case *plannercore.PhysicalShuffle:
287-
return b.buildShuffle(v)
288-
case *plannercore.PhysicalShuffleReceiverStub:
289-
return b.buildShuffleReceiverStub(v)
290-
case *plannercore.SQLBindPlan:
291-
return b.buildSQLBindExec(v)
292-
case *plannercore.SplitRegion:
293-
return b.buildSplitRegion(v)
294-
case *plannercore.PhysicalIndexMergeReader:
295-
return b.buildIndexMergeReader(v)
296-
case *plannercore.SelectInto:
297-
return b.buildSelectInto(v)
298-
case *plannercore.AdminShowTelemetry:
299-
return b.buildAdminShowTelemetry(v)
300-
case *plannercore.AdminResetTelemetryID:
301-
return b.buildAdminResetTelemetryID(v)
302-
case *plannercore.PhysicalCTE:
303-
return b.buildCTE(v)
304-
case *plannercore.PhysicalCTETable:
305-
return b.buildCTETableReader(v)
306-
case *plannercore.CompactTable:
307-
return b.buildCompactTable(v)
311+
return b.buildTableReaderVelox(v)
308312
default:
309313
if mp, ok := p.(MockPhysicalPlan); ok {
310314
return mp.GetExecutor()
@@ -315,6 +319,24 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
315319
}
316320
}
317321

322+
func (b *executorBuilder) buildProjectionVelox(v *plannercore.PhysicalProjection) Executor {
323+
return b.build(v.Children()[0])
324+
}
325+
326+
func (b *executorBuilder) buildTableReaderVelox(v *plannercore.PhysicalTableReader) Executor {
327+
if b.veloxExec == nil {
328+
b.veloxExec = &VeloxExec{
329+
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
330+
}
331+
}
332+
tableReader := b.build(v)
333+
if b.err != nil {
334+
return nil
335+
}
336+
b.veloxExec.tableReaders = append(b.veloxExec.tableReaders, tableReader.(*TableReaderExecutor))
337+
return b.veloxExec
338+
}
339+
318340
func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Executor {
319341
e := &CancelDDLJobsExec{
320342
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package tidb_velox_wrapper
2+
3+
//#cgo CXXFLAGS: -std=c++17
4+
//#cgo CFLAGS: -I/home/guojiangtao/code/velox
5+
//#cgo LDFLAGS: -L${SRCDIR} -ltidb_velox -lvelox_vector -lstdc++ -lvelox_exception -lglog -lgflags -lfolly -lm -lunwind -ldouble-conversion -lfmt -liberty
6+
//
7+
//#include <velox/connectors/tidb/tidb_velox_wrapper.h>
8+
import "C"
9+
10+
type CGoRowVector C.CGoRowVector
11+
12+
type CGoVeloxDataSource struct {
13+
tidbDataSource *C.CGoTiDBDataSource
14+
id int64
15+
}
16+
17+
func NewCGoVeloxDataSource(id int64) *CGoVeloxDataSource {
18+
s := C.get_tidb_data_source(C.long(id))
19+
return &CGoVeloxDataSource{
20+
tidbDataSource: &s,
21+
id: id,
22+
}
23+
}
24+
25+
func (s *CGoVeloxDataSource) Enqueue(data CGoRowVector) {
26+
C.enqueue_tidb_data_source(C.long(s.id), C.CGoRowVector(data))
27+
}
28+
29+
func (s *CGoVeloxDataSource) Destroy() {
30+
}

0 commit comments

Comments
 (0)