Skip to content

Commit d59431d

Browse files
authored
Merge pull request pingcap#3 from guo-shaoge/tidb_plan_to_substrait
2 parents eb06cae + a8d0ff4 commit d59431d

File tree

4 files changed

+264
-12
lines changed

4 files changed

+264
-12
lines changed

executor/adapter.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"fmt"
2121
substraitgo "github.com/AilinKid/substrait-go/proto"
22+
"github.com/substrait-io/substrait-go/proto/extensions"
2223
"runtime/trace"
2324
"strconv"
2425
"strings"
@@ -483,9 +484,16 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
483484
sctx.GetSessionVars().StmtCtx.MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery)
484485
}
485486
if pp, ok := a.Plan.(plannercore.PhysicalPlan); ok {
486-
rel, err := pp.ToSubstraitPB(sctx)
487-
if err == nil && sctx.GetSessionVars().ConnectionID != 0 {
487+
ssHandler := plannercore.NewSubstraitHandler()
488+
rel, err := pp.ToSubstraitPB(sctx, ssHandler)
489+
if rel != nil && err == nil && sctx.GetSessionVars().ConnectionID != 0 {
488490
plan := &substraitgo.Plan{
491+
ExtensionUris: []*extensions.SimpleExtensionURI{
492+
{
493+
ExtensionUriAnchor: uint32(1),
494+
Uri: "whatever",
495+
},
496+
},
489497
Relations: []*substraitgo.PlanRel{
490498
{
491499
RelType: &substraitgo.PlanRel_Root{
@@ -494,7 +502,19 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
494502
},
495503
},
496504
}
497-
// Pass this rel to velox.
505+
for funcName, funcAnchor := range ssHandler.SigMap {
506+
plan.Extensions = append(plan.Extensions, &extensions.SimpleExtensionDeclaration{
507+
MappingType: &extensions.SimpleExtensionDeclaration_ExtensionFunction_{
508+
ExtensionFunction: &extensions.SimpleExtensionDeclaration_ExtensionFunction{
509+
ExtensionUriReference: 1,
510+
FunctionAnchor: funcAnchor,
511+
Name: funcName,
512+
},
513+
},
514+
})
515+
}
516+
517+
// Pass this plan to velox.
498518
logutil.BgLogger().Error(plan.String())
499519
}
500520
}

planner/core/plan.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ type PhysicalPlan interface {
341341
// current task. If the child's task is cop task, some operator may close this task and return a new rootTask.
342342
attach2Task(...task) task
343343

344-
ToSubstraitPB(ctx sessionctx.Context) (*substraitgo.Rel, error)
344+
ToSubstraitPB(ctx sessionctx.Context, ssHandler *SubstraitHandler) (*substraitgo.Rel, error)
345345

346346
// ToPB converts physical plan to tipb executor.
347347
ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error)

planner/core/plan_to_pb.go

Lines changed: 238 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import (
2020
"github.com/pingcap/tidb/expression"
2121
"github.com/pingcap/tidb/expression/aggregation"
2222
"github.com/pingcap/tidb/kv"
23+
"github.com/pingcap/tidb/parser/ast"
2324
"github.com/pingcap/tidb/parser/model"
25+
"github.com/pingcap/tidb/parser/mysql"
2426
"github.com/pingcap/tidb/sessionctx"
2527
"github.com/pingcap/tidb/table"
2628
"github.com/pingcap/tidb/table/tables"
@@ -29,12 +31,14 @@ import (
2931
"github.com/pingcap/tidb/types"
3032
"github.com/pingcap/tidb/util"
3133
"github.com/pingcap/tidb/util/codec"
34+
"github.com/pingcap/tidb/util/logutil"
3235
"github.com/pingcap/tidb/util/ranger"
3336
"github.com/pingcap/tipb/go-tipb"
37+
"go.uber.org/zap"
3438
)
3539

3640
// ToSubstraitPB implements PhysicalPlan ToSubstraitPB interface.
37-
func (p *basePhysicalPlan) ToSubstraitPB(ctx sessionctx.Context) (_ *substraitgo.Rel, err error) {
41+
func (p *basePhysicalPlan) ToSubstraitPB(ctx sessionctx.Context, ssHandler *SubstraitHandler) (_ *substraitgo.Rel, err error) {
3842
return nil, errors.Errorf("plan %s fails converts to PB", p.basePlan.ExplainID())
3943
}
4044

@@ -125,11 +129,239 @@ func (p *PhysicalSelection) ToPB(ctx sessionctx.Context, storeType kv.StoreType)
125129
}
126130
return &tipb.Executor{Tp: tipb.ExecType_TypeSelection, Selection: selExec, ExecutorId: &executorID}, nil
127131
}
132+
func getSubStraitType(tp byte) string {
133+
if tp == mysql.TypeLong {
134+
return "i32"
135+
}
136+
if tp == mysql.TypeLonglong {
137+
return "i64"
138+
}
139+
if tp == mysql.TypeDouble {
140+
return "fp64"
141+
}
142+
panic("not supproted type")
143+
}
144+
145+
func getSubstraitPBFunctionArguments(offsets []int32) (funcArgs []*substraitgo.FunctionArgument) {
146+
getFunctionArgument := func(offset int32) *substraitgo.FunctionArgument {
147+
return &substraitgo.FunctionArgument{
148+
ArgType: &substraitgo.FunctionArgument_Value{
149+
Value: &substraitgo.Expression{
150+
RexType: &substraitgo.Expression_Selection{
151+
Selection: &substraitgo.Expression_FieldReference{
152+
ReferenceType: &substraitgo.Expression_FieldReference_DirectReference{
153+
DirectReference: &substraitgo.Expression_ReferenceSegment{
154+
ReferenceType: &substraitgo.Expression_ReferenceSegment_StructField_{
155+
StructField: &substraitgo.Expression_ReferenceSegment_StructField{
156+
// col offset from child chunk
157+
Field: offset,
158+
},
159+
},
160+
},
161+
},
162+
},
163+
},
164+
},
165+
},
166+
}
167+
}
168+
for _, off := range offsets {
169+
funcArgs = append(funcArgs, getFunctionArgument(off))
170+
}
171+
return
172+
}
173+
174+
// SubstraitHandler hands out numeric anchors for function signatures and
// remembers which anchor belongs to which signature.
type SubstraitHandler struct {
	// autoIncId is the anchor that the next unseen signature will receive.
	autoIncId uint32
	// SigMap maps every registered function signature to its anchor.
	SigMap map[string]uint32
}

// NewSubstraitHandler returns a handler with an empty signature map whose
// first assigned anchor is 1.
func NewSubstraitHandler() *SubstraitHandler {
	return &SubstraitHandler{
		autoIncId: 1,
		SigMap:    make(map[string]uint32),
	}
}

// insertSig registers sigName if it is not known yet and returns its anchor.
// Registering the same signature again returns the anchor assigned the first
// time. A zero-value handler is initialised lazily so that it behaves like one
// created by NewSubstraitHandler instead of panicking on a nil map.
func (h *SubstraitHandler) insertSig(sigName string) uint32 {
	if id, ok := h.SigMap[sigName]; ok {
		return id
	}
	if h.SigMap == nil {
		h.SigMap = make(map[string]uint32)
	}
	if h.autoIncId == 0 {
		// Anchors start at 1, matching NewSubstraitHandler.
		h.autoIncId = 1
	}
	id := h.autoIncId
	h.SigMap[sigName] = id
	h.autoIncId++
	return id
}

// getSigID returns the anchor registered for sigName, or 0 when the signature
// has never been registered.
func (h *SubstraitHandler) getSigID(sigName string) uint32 {
	return h.SigMap[sigName]
}
128197

129-
//func (p *PhysicalProjection) ToSubstraitPB() (*substraitgo.Rel, error){
130-
// proj := &substraitgo.ProjectRel{}
131-
// proj.Input
132-
//}
198+
func scalarFuncFieldTypeToSubstraitOutputType(sf *expression.ScalarFunction) (outputType *substraitgo.Type) {
199+
switch sf.GetType().EvalType() {
200+
case types.ETInt:
201+
outputType = &substraitgo.Type{
202+
Kind: &substraitgo.Type_I64_{
203+
I64: &substraitgo.Type_I64{},
204+
},
205+
}
206+
case types.ETReal:
207+
outputType = &substraitgo.Type{
208+
Kind: &substraitgo.Type_Fp64{
209+
Fp64: &substraitgo.Type_FP64{},
210+
},
211+
}
212+
}
213+
switch sf.FuncName.L {
214+
case "lt", "lte", "gt", "gte", "and", "or", "eq", "not":
215+
outputType = &substraitgo.Type{
216+
Kind: &substraitgo.Type_Bool{
217+
Bool: &substraitgo.Type_Boolean{
218+
Nullability: substraitgo.Type_NULLABILITY_NULLABLE,
219+
},
220+
},
221+
}
222+
}
223+
return
224+
}
225+
226+
func (h *SubstraitHandler) scalarFuncToSubstraitgoExpr(sf *expression.ScalarFunction) (*substraitgo.Expression, error) {
227+
funcSig := tiDBFuncNameToVeloxFuncName[sf.FuncName.L]
228+
var offsets []int32
229+
for _, arg := range sf.GetArgs() {
230+
col, ok := arg.(*expression.Column)
231+
if !ok {
232+
return nil, errors.Errorf("fail to convert proj to substrait, only support arg of column type")
233+
}
234+
offsets = append(offsets, int32(col.Index))
235+
}
236+
funcSig = funcSig + ":" + getSubStraitType(sf.GetArgs()[0].GetType().GetType()) + "_" + getSubStraitType(sf.GetArgs()[1].GetType().GetType())
237+
238+
sExpr := &substraitgo.Expression{
239+
RexType: &substraitgo.Expression_ScalarFunction_{
240+
ScalarFunction: &substraitgo.Expression_ScalarFunction{
241+
// function anchor
242+
FunctionReference: h.insertSig(funcSig),
243+
// function args
244+
Arguments: getSubstraitPBFunctionArguments(offsets),
245+
OutputType: scalarFuncFieldTypeToSubstraitOutputType(sf),
246+
},
247+
},
248+
}
249+
return sExpr, nil
250+
}
251+
252+
func (h *SubstraitHandler) columnToSubstraitgoExpr(col *expression.Column) *substraitgo.Expression {
253+
sExpr := &substraitgo.Expression{
254+
RexType: &substraitgo.Expression_Selection{
255+
Selection: &substraitgo.Expression_FieldReference{
256+
ReferenceType: &substraitgo.Expression_FieldReference_DirectReference{
257+
DirectReference: &substraitgo.Expression_ReferenceSegment{
258+
ReferenceType: &substraitgo.Expression_ReferenceSegment_StructField_{
259+
StructField: &substraitgo.Expression_ReferenceSegment_StructField{
260+
Field: int32(col.Index),
261+
},
262+
},
263+
},
264+
},
265+
},
266+
},
267+
}
268+
return sExpr
269+
}
270+
271+
func tidbConstToVeloxExpressionLiteral(cont *expression.Constant) (el *substraitgo.Expression_Literal) {
272+
switch cont.GetType().GetType() {
273+
case mysql.TypeLonglong:
274+
el = &substraitgo.Expression_Literal{
275+
LiteralType: &substraitgo.Expression_Literal_I64{
276+
I64: cont.Value.GetInt64(),
277+
},
278+
}
279+
case mysql.TypeFloat:
280+
el = &substraitgo.Expression_Literal{
281+
LiteralType: &substraitgo.Expression_Literal_Fp32{
282+
Fp32: cont.Value.GetFloat32(),
283+
},
284+
}
285+
case mysql.TypeDouble:
286+
el = &substraitgo.Expression_Literal{
287+
LiteralType: &substraitgo.Expression_Literal_Fp64{
288+
Fp64: cont.Value.GetFloat64(),
289+
},
290+
}
291+
}
292+
return
293+
}
294+
295+
func (h *SubstraitHandler) constToSubstraitgoExpr(cont *expression.Constant) *substraitgo.Expression {
296+
sExpr := &substraitgo.Expression{
297+
RexType: &substraitgo.Expression_Literal_{
298+
Literal: tidbConstToVeloxExpressionLiteral(cont),
299+
},
300+
}
301+
return sExpr
302+
}
303+
304+
func (h *SubstraitHandler) buildSubstraitProjExpression(tidbExpr []expression.Expression) (substraitgoExprs []*substraitgo.Expression, err error) {
305+
for _, expr := range tidbExpr {
306+
var sExpr *substraitgo.Expression
307+
var err error
308+
switch x := expr.(type) {
309+
case *expression.Column:
310+
sExpr = h.columnToSubstraitgoExpr(x)
311+
case *expression.Constant:
312+
sExpr = h.constToSubstraitgoExpr(x)
313+
case *expression.ScalarFunction:
314+
sExpr, err = h.scalarFuncToSubstraitgoExpr(x)
315+
if err != nil {
316+
return nil, err
317+
}
318+
}
319+
substraitgoExprs = append(substraitgoExprs, sExpr)
320+
}
321+
return substraitgoExprs, nil
322+
}
323+
324+
var tiDBFuncNameToVeloxFuncName = map[string]string{
325+
ast.LE: "lte",
326+
ast.LT: "lt",
327+
ast.GE: "gte",
328+
ast.GT: "gt",
329+
ast.Plus: "plus",
330+
ast.Minus: "minus",
331+
ast.Mul: "multiply",
332+
ast.Mod: "mod",
333+
ast.EQ: "eq",
334+
ast.UnaryNot: "not",
335+
ast.And: "and",
336+
ast.LogicOr: "or",
337+
}
338+
339+
// ToSubstraitPB implements the substraitgo interface.
340+
func (p *PhysicalProjection) ToSubstraitPB(ctx sessionctx.Context, ssHandler *SubstraitHandler) (*substraitgo.Rel, error) {
341+
childRel, err := p.children[0].ToSubstraitPB(ctx, ssHandler)
342+
if err != nil {
343+
return nil, err
344+
}
345+
sspb := &substraitgo.ProjectRel{
346+
Common: &substraitgo.RelCommon{
347+
EmitKind: &substraitgo.RelCommon_Direct_{
348+
Direct: &substraitgo.RelCommon_Direct{},
349+
},
350+
},
351+
Input: childRel,
352+
}
353+
sspbExpression, err := ssHandler.buildSubstraitProjExpression(p.Exprs)
354+
if err != nil {
355+
logutil.BgLogger().Error("error", zap.Error(err))
356+
return nil, err
357+
}
358+
sspb.Expressions = sspbExpression
359+
logutil.BgLogger().Warn("expression", zap.Int("len", len(sspb.Expressions)))
360+
// extract function map (assume there are all simple functions)
361+
return &substraitgo.Rel{
362+
RelType: &substraitgo.Rel_Project{Project: sspb},
363+
}, nil
364+
}
133365

134366
// ToPB implements PhysicalPlan ToPB interface.
135367
func (p *PhysicalProjection) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) {
@@ -195,7 +427,7 @@ func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*t
195427
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec, ExecutorId: &executorID}, nil
196428
}
197429

198-
func (p *PhysicalTableReader) ToSubstraitPB(ctx sessionctx.Context) (rel *substraitgo.Rel, err error) {
430+
func (p *PhysicalTableReader) ToSubstraitPB(ctx sessionctx.Context, ssHandler *SubstraitHandler) (rel *substraitgo.Rel, err error) {
199431
tableScan, ok := p.TablePlans[0].(*PhysicalTableScan)
200432
if !ok {
201433
return nil, nil

planner/core/point_get_plan.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ type PointGetPlan struct {
9393
accessCols []*expression.Column
9494
}
9595

96-
func (p *PointGetPlan) ToSubstraitPB(ctx sessionctx.Context) (*substraitgo.Rel, error) {
96+
func (p *PointGetPlan) ToSubstraitPB(ctx sessionctx.Context, ssHandler *SubstraitHandler) (*substraitgo.Rel, error) {
9797
return nil, nil
9898
}
9999

@@ -324,7 +324,7 @@ type BatchPointGetPlan struct {
324324
accessCols []*expression.Column
325325
}
326326

327-
func (p *BatchPointGetPlan) ToSubstraitPB(ctx sessionctx.Context) (*substraitgo.Rel, error) {
327+
func (p *BatchPointGetPlan) ToSubstraitPB(ctx sessionctx.Context, ssHandler *SubstraitHandler) (*substraitgo.Rel, error) {
328328
return nil, nil
329329
}
330330

0 commit comments

Comments
 (0)