
Commit 2b78bb4

wshwsh12 authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#45549
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 42d2f9f commit 2b78bb4

File tree

3 files changed (+294, -1 lines changed)

executor/executor.go

Lines changed: 59 additions & 1 deletion
@@ -115,7 +115,7 @@ type baseExecutor struct {
 
 const (
 	// globalPanicStorageExceed represents the panic message when out of storage quota.
-	globalPanicStorageExceed string = "Out Of Global Storage Quota!"
+	globalPanicStorageExceed string = "Out Of Quota For Local Temporary Space!"
 	// globalPanicMemoryExceed represents the panic message when out of memory limit.
 	globalPanicMemoryExceed string = "Out Of Global Memory Limit!"
 )
@@ -1705,6 +1705,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
 	if globalConfig.OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
 		sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
 	}
+<<<<<<< HEAD
 	switch globalConfig.OOMAction {
 	case config.OOMActionCancel:
 		action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
@@ -1716,6 +1717,63 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
 		action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
 		action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
 		sc.MemTracker.SetActionOnExceed(action)
+=======
+
+	sc.StatsLoad.Timeout = 0
+	sc.StatsLoad.NeededItems = nil
+	sc.StatsLoad.ResultCh = nil
+
+	sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow
+
+	vars.MemTracker.Detach()
+	vars.MemTracker.UnbindActions()
+	vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
+	vars.MemTracker.ResetMaxConsumed()
+	vars.DiskTracker.Detach()
+	vars.DiskTracker.ResetMaxConsumed()
+	vars.MemTracker.SessionID.Store(vars.ConnectionID)
+	vars.StmtCtx.TableStats = make(map[int64]interface{})
+
+	isAnalyze := false
+	if execStmt, ok := s.(*ast.ExecuteStmt); ok {
+		prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
+		if err != nil {
+			return err
+		}
+		_, isAnalyze = prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt)
+	} else if _, ok := s.(*ast.AnalyzeTableStmt); ok {
+		isAnalyze = true
+	}
+	if isAnalyze {
+		sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1)
+		vars.MemTracker.SetBytesLimit(-1)
+		vars.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker)
+	} else {
+		sc.InitMemTracker(memory.LabelForSQLText, -1)
+	}
+	logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota
+	switch variable.OOMAction.Load() {
+	case variable.OOMActionCancel:
+		action := &memory.PanicOnExceed{ConnID: vars.ConnectionID}
+		action.SetLogHook(logOnQueryExceedMemQuota)
+		vars.MemTracker.SetActionOnExceed(action)
+	case variable.OOMActionLog:
+		fallthrough
+	default:
+		action := &memory.LogOnExceed{ConnID: vars.ConnectionID}
+		action.SetLogHook(logOnQueryExceedMemQuota)
+		vars.MemTracker.SetActionOnExceed(action)
+	}
+	sc.MemTracker.SessionID.Store(vars.ConnectionID)
+	sc.MemTracker.AttachTo(vars.MemTracker)
+	sc.InitDiskTracker(memory.LabelForSQLText, -1)
+	globalConfig := config.GetGlobalConfig()
+	if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil {
+		sc.DiskTracker.AttachTo(vars.DiskTracker)
+		if GlobalDiskUsageTracker != nil {
+			vars.DiskTracker.AttachTo(GlobalDiskUsageTracker)
+		}
+>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))
 	}
 	if execStmt, ok := s.(*ast.ExecuteStmt); ok {
 		prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
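The upstream side of this conflict rewires the per-statement memory and disk trackers and then picks the action to fire when the memory quota is exceeded: the cancel action panics to abort the statement, anything else falls back to logging, and both share the same expensive-query log hook. Below is a minimal, self-contained sketch of that selection pattern; the types and the oomAction value are simplified stand-ins for illustration, not TiDB's memory package API.

package main

import "fmt"

// ActionOnExceed is what a tracker invokes once its byte quota is crossed.
type ActionOnExceed interface{ Act() }

// PanicOnExceed cancels the query by panicking; LogOnExceed only logs.
type PanicOnExceed struct{ LogHook func() }
type LogOnExceed struct{ LogHook func() }

func (a PanicOnExceed) Act() { a.LogHook(); panic("Out Of Global Memory Limit!") }
func (a LogOnExceed) Act()   { a.LogHook() }

// MemTracker holds the single action fired when its limit is exceeded.
type MemTracker struct{ action ActionOnExceed }

func (t *MemTracker) SetActionOnExceed(a ActionOnExceed) { t.action = a }

func main() {
	logHook := func() { fmt.Println("query exceeded the memory quota") }
	tracker := &MemTracker{}

	// oomAction stands in for the configured OOM action ("CANCEL" or "LOG").
	oomAction := "LOG"
	switch oomAction {
	case "CANCEL":
		tracker.SetActionOnExceed(PanicOnExceed{LogHook: logHook})
	default: // "LOG" and any unknown value fall back to logging only
		tracker.SetActionOnExceed(LogOnExceed{LogHook: logHook})
	}
	tracker.action.Act()
}

With oomAction set to "CANCEL", the same Act call would log and then panic, cancelling the statement instead of merely reporting it.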

util/chunk/row_container.go

Lines changed: 17 additions & 0 deletions
@@ -16,6 +16,7 @@ package chunk
 
 import (
 	"errors"
+	"fmt"
 	"sort"
 	"sync"
 	"time"
@@ -137,7 +138,23 @@ func (c *RowContainer) SpillToDisk() {
 	N := c.m.records.inMemory.NumChunks()
 	c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes())
 	c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker)
+<<<<<<< HEAD
 	for i := 0; i < N; i++ {
+=======
+	defer func() {
+		if r := recover(); r != nil {
+			err := fmt.Errorf("%v", r)
+			c.m.records.spillError = err
+			logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err))
+		}
+	}()
+	failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) {
+		if val.(bool) {
+			panic("out of disk quota when spilling")
+		}
+	})
+	for i := 0; i < n; i++ {
+>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))
 		chk := c.m.records.inMemory.GetChunk(i)
 		err = c.m.records.inDisk.Add(chk)
 		if err != nil {
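The upstream side of this hunk wraps the spill loop so that a panic raised while spilling (for example, when the temporary-storage quota is exhausted) is recovered, recorded as the container's spill error, and surfaced to later reads instead of killing the goroutine. A minimal sketch of that recover-and-record pattern follows; spiller, spillToDisk, and getRow are illustrative names, not the chunk package API.

package main

import (
	"errors"
	"fmt"
)

// spiller records a panic that happens during spilling as spillError, so the
// goroutine survives and later accesses can report the failure as an error.
type spiller struct {
	spillError error
}

func (s *spiller) spillToDisk(overQuota bool) {
	defer func() {
		if r := recover(); r != nil {
			// Convert the panic value into an error and remember it.
			s.spillError = fmt.Errorf("%v", r)
		}
	}()
	if overQuota {
		panic("out of disk quota when spilling")
	}
	// ... write chunks to disk here ...
}

func (s *spiller) getRow() error {
	if s.spillError != nil {
		return s.spillError // every later access reports the spill failure
	}
	return errors.New("not implemented in this sketch")
}

func main() {
	s := &spiller{}
	s.spillToDisk(true)
	fmt.Println(s.getRow()) // prints: out of disk quota when spilling
}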

util/chunk/row_container_test.go

Lines changed: 218 additions & 0 deletions
@@ -303,3 +303,221 @@ func TestActionBlocked(t *testing.T) {
 	ac.Action(tracker)
 	require.GreaterOrEqual(t, time.Since(starttime), 200*time.Millisecond)
 }
+<<<<<<< HEAD
+=======
+
+func insertBytesRowsIntoRowContainer(t *testing.T, chkCount int, rowPerChk int) (*RowContainer, [][]byte) {
+	longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(4096).Build()
+	fields := []*types.FieldType{&longVarCharTyp}
+
+	rc := NewRowContainer(fields, chkCount)
+
+	allRows := [][]byte{}
+	// insert chunks
+	for i := 0; i < chkCount; i++ {
+		chk := NewChunkWithCapacity(fields, rowPerChk)
+		// insert rows for each chunk
+		for j := 0; j < rowPerChk; j++ {
+			length := rand2.Uint32()
+			randomBytes := make([]byte, length%4096)
+			_, err := rand.Read(randomBytes)
+			require.NoError(t, err)
+
+			chk.AppendBytes(0, randomBytes)
+			allRows = append(allRows, randomBytes)
+		}
+		require.NoError(t, rc.Add(chk))
+	}
+
+	return rc, allRows
+}
+
+func TestRowContainerReaderInDisk(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16)
+	rc.SpillToDisk()
+
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	for i := 0; i < 16; i++ {
+		for j := 0; j < 16; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*16+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+}
+
+func TestCloseRowContainerReader(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16)
+	rc.SpillToDisk()
+
+	// read 8.5 of these chunks
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	for i := 0; i < 8; i++ {
+		for j := 0; j < 16; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*16+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+	for j := 0; j < 8; j++ {
+		row := reader.Current()
+		require.Equal(t, allRows[8*16+j], row.GetBytes(0))
+		reader.Next()
+	}
+}
+
+func TestConcurrentSpillWithRowContainerReader(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024)
+
+	var wg sync.WaitGroup
+	// concurrently read and spill to disk
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		reader := NewRowContainerReader(rc)
+		defer reader.Close()
+
+		for i := 0; i < 16; i++ {
+			for j := 0; j < 1024; j++ {
+				row := reader.Current()
+				require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
+				reader.Next()
+			}
+		}
+	}()
+	rc.SpillToDisk()
+	wg.Wait()
+}
+
+func TestReadAfterSpillWithRowContainerReader(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024)
+
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	for i := 0; i < 8; i++ {
+		for j := 0; j < 1024; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+	rc.SpillToDisk()
+	for i := 8; i < 16; i++ {
+		for j := 0; j < 1024; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+}
+
+func TestPanicWhenSpillToDisk(t *testing.T) {
+	fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
+	sz := 20
+	chk := NewChunkWithCapacity(fields, sz)
+	for i := 0; i < sz; i++ {
+		chk.AppendInt64(0, int64(i))
+	}
+
+	rc := NewRowContainer(fields, sz)
+	tracker := rc.GetMemTracker()
+	tracker.SetBytesLimit(chk.MemoryUsage() + 1)
+	tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
+	require.False(t, rc.AlreadySpilledSafeForTest())
+
+	require.NoError(t, rc.Add(chk))
+	rc.actionSpill.WaitForTest()
+	require.False(t, rc.AlreadySpilledSafeForTest())
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota"))
+	}()
+	require.NoError(t, rc.Add(chk))
+	rc.actionSpill.WaitForTest()
+	require.True(t, rc.AlreadySpilledSafeForTest())
+
+	_, err := rc.GetRow(RowPtr{})
+	require.EqualError(t, err, "out of disk quota when spilling")
+	require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
+}
+
+func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
+	benchmarkRowContainerReaderInDiskWithRowLength(b, 512)
+}
+
+func BenchmarkRowContainerReaderInDiskWithRowSize1024(b *testing.B) {
+	benchmarkRowContainerReaderInDiskWithRowLength(b, 1024)
+}
+
+func BenchmarkRowContainerReaderInDiskWithRowSize4096(b *testing.B) {
+	benchmarkRowContainerReaderInDiskWithRowLength(b, 4096)
+}
+
+func benchmarkRowContainerReaderInDiskWithRowLength(b *testing.B, rowLength int) {
+	b.StopTimer()
+
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = b.TempDir()
+	})
+
+	longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(rowLength).Build()
+	fields := []*types.FieldType{&longVarCharTyp}
+
+	randomBytes := make([]byte, rowLength)
+	_, err := rand.Read(randomBytes)
+	require.NoError(b, err)
+
+	// create a row container which stores the data in disk
+	rc := NewRowContainer(fields, 1<<10)
+	rc.SpillToDisk()
+
+	// insert `b.N * 1<<10` rows (`b.N` chunks) into the rc
+	for i := 0; i < b.N; i++ {
+		chk := NewChunkWithCapacity(fields, 1<<10)
+		for j := 0; j < 1<<10; j++ {
+			chk.AppendBytes(0, randomBytes)
+		}
+
+		rc.Add(chk)
+	}
+
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	b.StartTimer()
+	for n := 0; n < b.N; n++ {
+		for i := 0; i < 1<<10; i++ {
+			reader.Next()
+		}
+	}
+	require.NoError(b, reader.Error())
+}
+>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))
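These tests walk the spilled container through a reader with the Current/Next/Close/Error calls seen above: Current returns the row under the cursor, Next advances it, Error reports any failure seen while reading, and Close releases the reader when the test ends. A minimal sketch of that cursor-style iteration follows; rowReader and its methods are hypothetical stand-ins for illustration, not the chunk package API.

package main

import "fmt"

// rowReader iterates rows through a cursor: Current reads, Next advances,
// Error reports a failure seen during reading, Close releases the reader.
type rowReader struct {
	rows []string
	pos  int
	err  error
}

func (r *rowReader) Current() string {
	if r.pos >= len(r.rows) {
		r.err = fmt.Errorf("read past the end of the container")
		return ""
	}
	return r.rows[r.pos]
}

func (r *rowReader) Next()        { r.pos++ }
func (r *rowReader) Error() error { return r.err }
func (r *rowReader) Close()       {}

func main() {
	reader := &rowReader{rows: []string{"a", "b", "c"}}
	defer reader.Close()
	for i := 0; i < 3; i++ {
		fmt.Println(reader.Current()) // a, b, c
		reader.Next()
	}
	fmt.Println(reader.Error()) // <nil>
}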
