Skip to content

Commit 16e0c28

Browse files
authored
executor, util: make tmp-storage-quota take affect (#45549) (#45732)
close #45161
1 parent 25e2528 commit 16e0c28

File tree

3 files changed

+49
-1
lines changed

3 files changed

+49
-1
lines changed

executor/executor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ type baseExecutor struct {
139139

140140
const (
141141
// globalPanicStorageExceed represents the panic message when out of storage quota.
142-
globalPanicStorageExceed string = "Out Of Global Storage Quota!"
142+
globalPanicStorageExceed string = "Out Of Quota For Local Temporary Space!"
143143
// globalPanicMemoryExceed represents the panic message when out of memory limit.
144144
globalPanicMemoryExceed string = "Out Of Global Memory Limit!"
145145
// globalPanicAnalyzeMemoryExceed represents the panic message when out of analyze memory limit.
@@ -2076,6 +2076,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
20762076
vars.MemTracker.UnbindActions()
20772077
vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
20782078
vars.MemTracker.ResetMaxConsumed()
2079+
vars.DiskTracker.Detach()
20792080
vars.DiskTracker.ResetMaxConsumed()
20802081
vars.MemTracker.SessionID.Store(vars.ConnectionID)
20812082
vars.StmtCtx.TableStats = make(map[int64]interface{})
@@ -2116,6 +2117,9 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
21162117
globalConfig := config.GetGlobalConfig()
21172118
if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil {
21182119
sc.DiskTracker.AttachTo(vars.DiskTracker)
2120+
if GlobalDiskUsageTracker != nil {
2121+
vars.DiskTracker.AttachTo(GlobalDiskUsageTracker)
2122+
}
21192123
}
21202124
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
21212125
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)

util/chunk/row_container.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package chunk
1616

1717
import (
1818
"errors"
19+
"fmt"
1920
"sort"
2021
"sync"
2122
"time"
@@ -140,6 +141,18 @@ func (c *RowContainer) SpillToDisk() {
140141
n := c.m.records.inMemory.NumChunks()
141142
c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes())
142143
c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker)
144+
defer func() {
145+
if r := recover(); r != nil {
146+
err := fmt.Errorf("%v", r)
147+
c.m.records.spillError = err
148+
logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err))
149+
}
150+
}()
151+
failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) {
152+
if val.(bool) {
153+
panic("out of disk quota when spilling")
154+
}
155+
})
143156
for i := 0; i < n; i++ {
144157
chk := c.m.records.inMemory.GetChunk(i)
145158
err = c.m.records.inDisk.Add(chk)

util/chunk/row_container_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,37 @@ func TestReadAfterSpillWithRowContainerReader(t *testing.T) {
439439
}
440440
}
441441

442+
func TestPanicWhenSpillToDisk(t *testing.T) {
443+
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
444+
sz := 20
445+
chk := NewChunkWithCapacity(fields, sz)
446+
for i := 0; i < sz; i++ {
447+
chk.AppendInt64(0, int64(i))
448+
}
449+
450+
rc := NewRowContainer(fields, sz)
451+
tracker := rc.GetMemTracker()
452+
tracker.SetBytesLimit(chk.MemoryUsage() + 1)
453+
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
454+
require.False(t, rc.AlreadySpilledSafeForTest())
455+
456+
require.NoError(t, rc.Add(chk))
457+
rc.actionSpill.WaitForTest()
458+
require.False(t, rc.AlreadySpilledSafeForTest())
459+
460+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)"))
461+
defer func() {
462+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota"))
463+
}()
464+
require.NoError(t, rc.Add(chk))
465+
rc.actionSpill.WaitForTest()
466+
require.True(t, rc.AlreadySpilledSafeForTest())
467+
468+
_, err := rc.GetRow(RowPtr{})
469+
require.EqualError(t, err, "out of disk quota when spilling")
470+
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
471+
}
472+
442473
func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
443474
benchmarkRowContainerReaderInDiskWithRowLength(b, 512)
444475
}

0 commit comments

Comments
 (0)