Skip to content

Commit e0c8d61

Browse files
authored
Merge pull request #5 from spongedu/win_sort
Support window sort on stream agg
2 parents d0deb5a + d36b70b commit e0c8d61

File tree

7 files changed

+101
-9
lines changed

7 files changed

+101
-9
lines changed

executor/builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1254,6 +1254,7 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor {
12541254
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
12551255
ByItems: v.ByItems,
12561256
schema: v.Schema(),
1257+
streamWindowSort: v.StreamWinSort,
12571258
}
12581259
metrics.ExecutorCounter.WithLabelValues("SortExec").Inc()
12591260
return &sortExec

executor/sort.go

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,12 @@ type SortExec struct {
4848
rowChunks *chunk.List
4949
// rowPointer store the chunk index and row index for each row.
5050
rowPtrs []chunk.RowPtr
51-
5251
memTracker *memory.Tracker
52+
53+
streamWindowSort bool
54+
55+
sterminated bool
56+
chk *chunk.Chunk
5357
}
5458

5559
// Close implements the Executor Close interface.
@@ -63,6 +67,9 @@ func (e *SortExec) Close() error {
6367
func (e *SortExec) Open(ctx context.Context) error {
6468
e.fetched = false
6569
e.Idx = 0
70+
e.sterminated = false
71+
e.chk = nil
72+
6673

6774
// To avoid duplicated initialization for TopNExec.
6875
if e.memTracker == nil {
@@ -74,6 +81,18 @@ func (e *SortExec) Open(ctx context.Context) error {
7481

7582
// Next implements the Executor Next interface.
7683
func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
84+
if !e.streamWindowSort {
85+
return e.next(ctx, chk)
86+
} else {
87+
if e.sterminated {
88+
return nil
89+
}
90+
e.sreset()
91+
return e.snext(ctx, chk)
92+
}
93+
}
94+
95+
func (e *SortExec) next(ctx context.Context, chk *chunk.Chunk) error {
7796
if e.runtimeStats != nil {
7897
start := time.Now()
7998
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
@@ -110,6 +129,74 @@ func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
110129
return nil
111130
}
112131

132+
func (e *SortExec) snext(ctx context.Context, chk *chunk.Chunk) error {
133+
if e.runtimeStats != nil {
134+
start := time.Now()
135+
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
136+
}
137+
chk.Reset()
138+
if !e.fetched {
139+
err := e.fetchWindow(ctx)
140+
if err != nil {
141+
return errors.Trace(err)
142+
}
143+
e.initPointers()
144+
e.initCompareFuncs()
145+
allColumnExpr := e.buildKeyColumns()
146+
if allColumnExpr {
147+
sort.Slice(e.rowPtrs, e.keyColumnsLess)
148+
} else {
149+
e.buildKeyExprsAndTypes()
150+
err = e.buildKeyChunks()
151+
if err != nil {
152+
return errors.Trace(err)
153+
}
154+
sort.Slice(e.rowPtrs, e.keyChunksLess)
155+
}
156+
e.fetched = true
157+
}
158+
for chk.NumRows() < e.maxChunkSize {
159+
if e.Idx >= len(e.rowPtrs) {
160+
break
161+
}
162+
rowPtr := e.rowPtrs[e.Idx]
163+
chk.AppendRow(e.rowChunks.GetRow(rowPtr))
164+
e.Idx++
165+
}
166+
return nil
167+
}
168+
169+
func (e *SortExec) sreset() {
170+
e.Idx = 0
171+
e.fetched = false
172+
}
173+
174+
func (e *SortExec) fetchWindow(ctx context.Context) error {
175+
fields := e.retTypes()
176+
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
177+
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
178+
e.rowChunks.GetMemTracker().SetLabel("rowChunks")
179+
for {
180+
if e.chk == nil {
181+
e.chk = e.children[0].newFirstChunk()
182+
}
183+
err := e.children[0].Next(ctx, e.chk)
184+
if err != nil {
185+
return errors.Trace(err)
186+
}
187+
rowCount := e.chk.NumRows()
188+
if rowCount == 0 {
189+
e.sterminated = true
190+
break
191+
}
192+
e.rowChunks.Add(e.chk)
193+
if rowCount < e.maxChunkSize {
194+
break
195+
}
196+
}
197+
return nil
198+
}
199+
113200
func (e *SortExec) fetchRowChunks(ctx context.Context) error {
114201
fields := e.retTypes()
115202
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
@@ -125,7 +212,6 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
125212
if rowCount == 0 {
126213
break
127214
}
128-
e.rowChunks.Add(chk)
129215
}
130216
return nil
131217
}
@@ -222,6 +308,9 @@ func (e *SortExec) keyChunksLess(i, j int) bool {
222308
return e.lessRow(keyRowI, keyRowJ)
223309
}
224310

311+
312+
313+
225314
// TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT.
226315
// Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage.
227316
type TopNExec struct {

expression/aggregation/descriptor.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/pingcap/tidb/types"
3232
)
3333

34-
// TODO: Complete here
3534
type AggWindowDesc struct {
3635
WinColName string
3736
Size uint64

planner/core/exhaust_physical_plans.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
905905
}
906906

907907
func (ls *LogicalSort) getPhysicalSort(prop *property.PhysicalProperty) *PhysicalSort {
908-
ps := PhysicalSort{ByItems: ls.ByItems}.Init(ls.ctx, ls.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64})
908+
ps := PhysicalSort{ByItems: ls.ByItems, StreamWinSort: ls.StreamWindowSort}.Init(ls.ctx, ls.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64})
909909
return ps
910910
}
911911

planner/core/logical_plan_builder.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ func (b *PlanBuilder) buildUnion(union *ast.UnionStmt) (LogicalPlan, error) {
742742
oldLen := unionPlan.Schema().Len()
743743

744744
if union.OrderBy != nil {
745-
unionPlan, err = b.buildSort(unionPlan, union.OrderBy.Items, nil)
745+
unionPlan, err = b.buildSort(unionPlan, union.OrderBy.Items, nil, false)
746746
if err != nil {
747747
return nil, errors.Trace(err)
748748
}
@@ -830,7 +830,7 @@ func (by *ByItems) Clone() *ByItems {
830830
return &ByItems{Expr: by.Expr.Clone(), Desc: by.Desc}
831831
}
832832

833-
func (b *PlanBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int) (*LogicalSort, error) {
833+
func (b *PlanBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int, streamWinSort bool) (*LogicalSort, error) {
834834
b.curClause = orderByClause
835835
sort := LogicalSort{}.Init(b.ctx)
836836
exprs := make([]*ByItems, 0, len(byItems))
@@ -845,6 +845,7 @@ func (b *PlanBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper
845845
}
846846
sort.ByItems = exprs
847847
sort.SetChildren(p)
848+
sort.StreamWindowSort = streamWinSort
848849
return sort, nil
849850
}
850851

@@ -1761,7 +1762,7 @@ func (b *PlanBuilder) buildSelect(sel *ast.SelectStmt) (p LogicalPlan, err error
17611762
}
17621763

17631764
if sel.OrderBy != nil {
1764-
p, err = b.buildSort(p, sel.OrderBy.Items, orderMap)
1765+
p, err = b.buildSort(p, sel.OrderBy.Items, orderMap, sel.StreamWindowSpec != nil)
17651766
if err != nil {
17661767
return nil, errors.Trace(err)
17671768
}
@@ -2120,7 +2121,7 @@ func (b *PlanBuilder) buildUpdate(update *ast.UpdateStmt) (Plan, error) {
21202121
}
21212122
}
21222123
if sel.OrderBy != nil {
2123-
p, err = b.buildSort(p, sel.OrderBy.Items, nil)
2124+
p, err = b.buildSort(p, sel.OrderBy.Items, nil, false)
21242125
if err != nil {
21252126
return nil, errors.Trace(err)
21262127
}
@@ -2280,7 +2281,7 @@ func (b *PlanBuilder) buildDelete(delete *ast.DeleteStmt) (Plan, error) {
22802281
}
22812282

22822283
if sel.OrderBy != nil {
2283-
p, err = b.buildSort(p, sel.OrderBy.Items, nil)
2284+
p, err = b.buildSort(p, sel.OrderBy.Items, nil, false)
22842285
if err != nil {
22852286
return nil, errors.Trace(err)
22862287
}

planner/core/logical_plans.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,7 @@ type LogicalSort struct {
581581
baseLogicalPlan
582582

583583
ByItems []*ByItems
584+
StreamWindowSort bool
584585
}
585586

586587
func (ls *LogicalSort) extractCorrelatedCols() []*expression.CorrelatedColumn {

planner/core/physical_plans.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ type PhysicalSort struct {
339339
basePhysicalPlan
340340

341341
ByItems []*ByItems
342+
StreamWinSort bool
342343
}
343344

344345
// NominalSort asks sort properties for its child. It is a fake operator that will not

0 commit comments

Comments
 (0)