Skip to content

Commit 681d79a

Browse files
YangKeaoti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#59985
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 6ab9aa3 commit 681d79a

File tree

3 files changed

+548
-0
lines changed

3 files changed

+548
-0
lines changed

pkg/server/conn_stmt.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,140 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
424424
return false, nil
425425
}
426426

427+
<<<<<<< HEAD
428+
=======
429+
func (cc *clientConn) executeWithCursor(ctx context.Context, stmt PreparedStatement, rs resultset.ResultSet) (lazy bool, err error) {
430+
vars := (&cc.ctx).GetSessionVars()
431+
if vars.EnableLazyCursorFetch {
432+
// try to execute with lazy cursor fetch
433+
ok, err := cc.executeWithLazyCursor(ctx, stmt, rs)
434+
435+
// if `ok` is false, should try to execute without lazy cursor fetch
436+
if ok {
437+
return true, err
438+
}
439+
}
440+
441+
failpoint.Inject("avoidEagerCursorFetch", func() {
442+
failpoint.Return(false, errors.New("failpoint avoids eager cursor fetch"))
443+
})
444+
cc.initResultEncoder(ctx)
445+
defer cc.rsEncoder.Clean()
446+
// fetch all results of the resultSet, and stored them locally, so that the future `FETCH` command can read
447+
// the rows directly to avoid running executor and accessing shared params/variables in the session
448+
// NOTE: chunk should not be allocated from the connection allocator, which will reset after executing this command
449+
// but the rows are still needed in the following FETCH command.
450+
451+
// create the row container to manage spill
452+
// this `rowContainer` will be released when the statement (or the connection) is closed.
453+
rowContainer := chunk.NewRowContainer(rs.FieldTypes(), vars.MaxChunkSize)
454+
rowContainer.GetMemTracker().AttachTo(vars.MemTracker)
455+
rowContainer.GetMemTracker().SetLabel(memory.LabelForCursorFetch)
456+
rowContainer.GetDiskTracker().AttachTo(vars.DiskTracker)
457+
rowContainer.GetDiskTracker().SetLabel(memory.LabelForCursorFetch)
458+
if vardef.EnableTmpStorageOnOOM.Load() {
459+
failpoint.Inject("testCursorFetchSpill", func(val failpoint.Value) {
460+
if val, ok := val.(bool); val && ok {
461+
actionSpill := rowContainer.ActionSpillForTest()
462+
defer actionSpill.WaitForTest()
463+
}
464+
})
465+
action := memory.NewActionWithPriority(rowContainer.ActionSpill(), memory.DefCursorFetchSpillPriority)
466+
vars.MemTracker.FallbackOldAndSetNewAction(action)
467+
}
468+
// store the rowContainer in the statement right after it's created, so that even if the logic in defer is not triggered,
469+
// the rowContainer will be released when the statement is closed.
470+
stmt.StoreRowContainer(rowContainer)
471+
defer func() {
472+
if err != nil {
473+
// if the execution panic, it'll not reach this branch. The `rowContainer` will be released in the `stmt.Close`.
474+
stmt.StoreRowContainer(nil)
475+
476+
rowContainer.GetMemTracker().Detach()
477+
rowContainer.GetDiskTracker().Detach()
478+
errCloseRowContainer := rowContainer.Close()
479+
if errCloseRowContainer != nil {
480+
logutil.Logger(ctx).Error("Fail to close rowContainer in error handler. May cause resource leak",
481+
zap.NamedError("original-error", err), zap.NamedError("close-error", errCloseRowContainer))
482+
}
483+
}
484+
}()
485+
486+
for {
487+
chk := rs.NewChunk(nil)
488+
489+
if err = rs.Next(ctx, chk); err != nil {
490+
return false, err
491+
}
492+
rowCount := chk.NumRows()
493+
if rowCount == 0 {
494+
break
495+
}
496+
497+
err = rowContainer.Add(chk)
498+
if err != nil {
499+
return false, err
500+
}
501+
}
502+
503+
reader := chunk.NewRowContainerReader(rowContainer)
504+
defer func() {
505+
if err != nil {
506+
reader.Close()
507+
}
508+
}()
509+
crs := resultset.WrapWithRowContainerCursor(rs, reader)
510+
if cl, ok := crs.(resultset.FetchNotifier); ok {
511+
cl.OnFetchReturned()
512+
}
513+
514+
err = cc.writeExecuteResultWithCursor(ctx, stmt, crs)
515+
return false, err
516+
}
517+
518+
// executeWithLazyCursor tries to detach the `ResultSet` and make it suitable to execute lazily.
519+
// Be careful that the return value `(bool, error)` has different meaning with other similar functions. The first `bool` represent whether
520+
// the `ResultSet` is suitable for lazy execution. If the return value is `(false, _)`, the `rs` in argument can still be used. If the
521+
// first return value is `true` and `err` is not nil, the `rs` cannot be used anymore and should return the error to the upper layer.
522+
func (cc *clientConn) executeWithLazyCursor(ctx context.Context, stmt PreparedStatement, rs resultset.ResultSet) (ok bool, err error) {
523+
drs, ok, err := rs.TryDetach()
524+
if !ok || err != nil {
525+
return false, err
526+
}
527+
528+
vars := (&cc.ctx).GetSessionVars()
529+
crs := resultset.WrapWithLazyCursor(drs, vars.InitChunkSize, vars.MaxChunkSize)
530+
err = cc.writeExecuteResultWithCursor(ctx, stmt, crs)
531+
return true, err
532+
}
533+
534+
// writeExecuteResultWithCursor will store the `ResultSet` in `stmt` and send the column info to the client. The logic is shared between
535+
// lazy cursor fetch and normal(eager) cursor fetch.
536+
func (cc *clientConn) writeExecuteResultWithCursor(ctx context.Context, stmt PreparedStatement, rs resultset.CursorResultSet) (err error) {
537+
stmt.StoreResultSet(rs)
538+
stmt.SetCursorActive(true)
539+
defer func() {
540+
if err != nil {
541+
// the resultSet and rowContainer have been closed in former "defer" statement.
542+
stmt.StoreResultSet(nil)
543+
stmt.SetCursorActive(false)
544+
}
545+
}()
546+
547+
if err = cc.writeColumnInfo(rs.Columns()); err != nil {
548+
return err
549+
}
550+
551+
// explicitly flush columnInfo to client.
552+
err = cc.writeEOF(ctx, cc.ctx.Status())
553+
if err != nil {
554+
return err
555+
}
556+
557+
return cc.flush(ctx)
558+
}
559+
560+
>>>>>>> dbf6a6e838b (stmt: clean `rowContainer` if the execution paniced (#59985))
427561
func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err error) {
428562
cc.ctx.GetSessionVars().StartTime = time.Now()
429563
cc.ctx.GetSessionVars().ClearAlloc(nil, false)

pkg/server/tests/cursor/BUILD.bazel

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,23 @@ go_test(
88
"main_test.go",
99
],
1010
flaky = True,
11+
<<<<<<< HEAD
1112
shard_count = 3,
13+
=======
14+
shard_count = 9,
15+
>>>>>>> dbf6a6e838b (stmt: clean `rowContainer` if the execution paniced (#59985))
1216
deps = [
1317
"//pkg/config",
18+
"//pkg/executor",
1419
"//pkg/metrics",
1520
"//pkg/parser/mysql",
1621
"//pkg/server",
22+
<<<<<<< HEAD
1723
"//pkg/session",
24+
=======
25+
"//pkg/server/internal/util",
26+
"//pkg/server/tests/servertestkit",
27+
>>>>>>> dbf6a6e838b (stmt: clean `rowContainer` if the execution paniced (#59985))
1828
"//pkg/store/mockstore/unistore",
1929
"//pkg/testkit",
2030
"//pkg/testkit/testsetup",

0 commit comments

Comments
 (0)