Skip to content

Commit a255101

Browse files
authored
statistics: fix wrong singleflight implementation for stats' syncload (#52301) (#52374)
close #52294
1 parent 2d6302d commit a255101

File tree

5 files changed

+43
-65
lines changed

5 files changed

+43
-65
lines changed

pkg/parser/model/model.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,6 +1766,11 @@ type TableItemID struct {
17661766
IsIndex bool
17671767
}
17681768

1769+
// Key generates a unique string key for a TableItemID, for use in the stats sync load.
1770+
func (t TableItemID) Key() string {
1771+
return fmt.Sprintf("%d#%d#%t", t.ID, t.TableID, t.IsIndex)
1772+
}
1773+
17691774
// PolicyRefInfo is the struct to refer the placement policy.
17701775
type PolicyRefInfo struct {
17711776
ID int64 `json:"id"`

pkg/statistics/handle/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ go_library(
4040
"@com_github_pingcap_errors//:errors",
4141
"@com_github_pingcap_failpoint//:failpoint",
4242
"@com_github_tiancaiamao_gp//:gp",
43+
"@org_golang_x_sync//singleflight",
4344
"@org_uber_go_atomic//:atomic",
4445
"@org_uber_go_zap//:zap",
4546
],

pkg/statistics/handle/handle.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
ddlUtil "github.com/pingcap/tidb/pkg/ddl/util"
2323
"github.com/pingcap/tidb/pkg/parser/model"
2424
"github.com/pingcap/tidb/pkg/sessionctx"
25-
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
2625
"github.com/pingcap/tidb/pkg/statistics"
2726
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze"
2827
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
@@ -139,7 +138,6 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti
139138
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
140139
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
141140
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
142-
handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
143141
return handle, nil
144142
}
145143

pkg/statistics/handle/handle_hist.go

Lines changed: 34 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/pingcap/tidb/pkg/util"
3636
"github.com/pingcap/tidb/pkg/util/logutil"
3737
"go.uber.org/zap"
38+
"golang.org/x/sync/singleflight"
3839
)
3940

4041
type statsWrapper struct {
@@ -46,7 +47,7 @@ type statsWrapper struct {
4647
type StatsLoad struct {
4748
NeededItemsCh chan *NeededItemTask
4849
TimeoutItemsCh chan *NeededItemTask
49-
WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
50+
Singleflight singleflight.Group
5051
SubCtxs []sessionctx.Context
5152
sync.Mutex
5253
}
@@ -221,46 +222,60 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
221222
} else {
222223
task = lastTask
223224
}
224-
return h.handleOneItemTask(sctx, task)
225+
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
226+
return h.handleOneItemTask(sctx, task)
227+
})
228+
timeout := time.Until(task.ToTimeout)
229+
select {
230+
case result := <-resultChan:
231+
if result.Err == nil {
232+
slr := result.Val.(*stmtctx.StatsLoadResult)
233+
if slr.Error != nil {
234+
return task, slr.Error
235+
}
236+
task.ResultCh <- *slr
237+
return nil, nil
238+
}
239+
return task, result.Err
240+
case <-time.After(timeout):
241+
return task, nil
242+
}
225243
}
226244

227-
func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (*NeededItemTask, error) {
228-
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
245+
func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
246+
defer func() {
247+
// Recover from a panic in each task so that the worker keeps working.
248+
if r := recover(); r != nil {
249+
logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
250+
err = errors.Errorf("stats loading panicked: %v", r)
251+
}
252+
}()
253+
result = &stmtctx.StatsLoadResult{Item: task.TableItemID}
229254
item := result.Item
230255
tbl, ok := h.Get(item.TableID)
231256
if !ok {
232-
h.writeToResultChan(task.ResultCh, result)
233-
return nil, nil
257+
return result, nil
234258
}
235-
var err error
236259
wrapper := &statsWrapper{}
237260
if item.IsIndex {
238261
index, ok := tbl.Indices[item.ID]
239262
if !ok || index.IsFullLoad() {
240-
h.writeToResultChan(task.ResultCh, result)
241-
return nil, nil
263+
return result, nil
242264
}
243265
wrapper.idx = index
244266
} else {
245267
col, ok := tbl.Columns[item.ID]
246268
if !ok || col.IsFullLoad() {
247-
h.writeToResultChan(task.ResultCh, result)
248-
return nil, nil
269+
return result, nil
249270
}
250271
wrapper.col = col
251272
}
252-
// to avoid duplicated handling in concurrent scenario
253-
working := h.setWorking(result.Item, task.ResultCh)
254-
if !working {
255-
h.writeToResultChan(task.ResultCh, result)
256-
return nil, nil
257-
}
258273
t := time.Now()
259274
needUpdate := false
260275
wrapper, err = h.readStatsForOneItem(sctx, item, wrapper)
261276
if err != nil {
262277
result.Error = err
263-
return task, err
278+
return result, err
264279
}
265280
if item.IsIndex {
266281
if wrapper.idx != nil {
@@ -273,9 +288,8 @@ func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask
273288
}
274289
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
275290
if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
276-
h.writeToResultChan(task.ResultCh, result)
291+
return result, nil
277292
}
278-
h.finishWorking(result)
279293
return nil, nil
280294
}
281295

@@ -466,32 +480,3 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
466480
h.UpdateStatsCache([]*statistics.Table{tbl}, nil)
467481
return true
468482
}
469-
470-
func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
471-
h.StatsLoad.Lock()
472-
defer h.StatsLoad.Unlock()
473-
chList, ok := h.StatsLoad.WorkingColMap[item]
474-
if ok {
475-
if chList[0] == resultCh {
476-
return true // just return for duplicate setWorking
477-
}
478-
h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
479-
return false
480-
}
481-
chList = []chan stmtctx.StatsLoadResult{}
482-
chList = append(chList, resultCh)
483-
h.StatsLoad.WorkingColMap[item] = chList
484-
return true
485-
}
486-
487-
func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
488-
h.StatsLoad.Lock()
489-
defer h.StatsLoad.Unlock()
490-
if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
491-
list := chList[1:]
492-
for _, ch := range list {
493-
h.writeToResultChan(ch, result)
494-
}
495-
}
496-
delete(h.StatsLoad.WorkingColMap, result.Item)
497-
}

pkg/statistics/handle/handle_hist_test.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -206,26 +206,15 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
206206
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
207207
require.Error(t, err1)
208208
require.NotNil(t, task1)
209-
list, ok := h.StatsLoad.WorkingColMap[neededColumns[0]]
210-
require.True(t, ok)
211-
require.Len(t, list, 1)
212-
require.Equal(t, stmtCtx1.StatsLoad.ResultCh, list[0])
213-
214-
task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
215-
require.Nil(t, err2)
216-
require.Nil(t, task2)
217-
list, ok = h.StatsLoad.WorkingColMap[neededColumns[0]]
218-
require.True(t, ok)
219-
require.Len(t, list, 2)
220-
require.Equal(t, stmtCtx2.StatsLoad.ResultCh, list[1])
221209

222210
require.NoError(t, failpoint.Disable(fp.failPath))
223211
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
224212
require.NoError(t, err3)
225213
require.Nil(t, task3)
226214

227-
require.Len(t, stmtCtx1.StatsLoad.ResultCh, 1)
228-
require.Len(t, stmtCtx2.StatsLoad.ResultCh, 1)
215+
task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
216+
require.NoError(t, err3)
217+
require.Nil(t, task)
229218

230219
rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
231220
require.True(t, ok1)

0 commit comments

Comments
 (0)