Skip to content

Commit 9f5f53a

Browse files
authored
statistics: add Destroy method and handle session recycling (#59546)
close #59524, close #59560
1 parent 5a166e1 commit 9f5f53a

File tree

21 files changed

+152
-31
lines changed

21 files changed

+152
-31
lines changed

pkg/bindinfo/global_handle.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,7 @@ type GlobalBindingHandle interface {
8888

8989
// globalBindingHandle is used to handle all global sql bind operations.
9090
type globalBindingHandle struct {
91-
sPool util.SessionPool
92-
91+
sPool util.DestroyableSessionPool
9392
bindingCache BindingCache
9493

9594
// lastTaskTime records the last update time for the global sql bind cache.
@@ -121,7 +120,7 @@ const (
121120
)
122121

123122
// NewGlobalBindingHandle creates a new GlobalBindingHandle.
124-
func NewGlobalBindingHandle(sPool util.SessionPool) GlobalBindingHandle {
123+
func NewGlobalBindingHandle(sPool util.DestroyableSessionPool) GlobalBindingHandle {
125124
h := &globalBindingHandle{sPool: sPool}
126125
h.lastUpdateTime.Store(types.ZeroTimestamp)
127126
h.bindingCache = newBindCache()
@@ -484,6 +483,9 @@ func (h *globalBindingHandle) callWithSCtx(wrapTxn bool, f func(sctx sessionctx.
484483
defer func() {
485484
if err == nil { // only recycle when no error
486485
h.sPool.Put(resource)
486+
} else {
487+
// Note: Otherwise, the session will be leaked.
488+
h.sPool.Destroy(resource)
487489
}
488490
}()
489491
sctx := resource.(sessionctx.Context)

pkg/bindinfo/global_handle_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,8 @@ func (p *mockSessionPool) Get() (pools.Resource, error) {
607607

608608
func (p *mockSessionPool) Put(pools.Resource) {}
609609

610+
func (p *mockSessionPool) Destroy(pools.Resource) {}
611+
610612
func (p *mockSessionPool) Close() {}
611613

612614
func TestShowBindingDigestField(t *testing.T) {

pkg/bindinfo/tests/cross_db_binding_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,4 +342,6 @@ func (p *mockSessionPool) Get() (pools.Resource, error) {
342342

343343
func (p *mockSessionPool) Put(pools.Resource) {}
344344

345+
func (p *mockSessionPool) Destroy(pools.Resource) {}
346+
345347
func (p *mockSessionPool) Close() {}

pkg/ddl/notifier/testkit_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func TestBasicPubSub(t *testing.T) {
7979
},
8080
nil,
8181
nil,
82+
nil,
8283
)
8384

8485
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
@@ -150,6 +151,7 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
150151
},
151152
nil,
152153
nil,
154+
nil,
153155
)
154156
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
155157

@@ -324,6 +326,7 @@ func Test2OwnerForAShortTime(t *testing.T) {
324326
},
325327
nil,
326328
nil,
329+
nil,
327330
)
328331

329332
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
@@ -458,6 +461,7 @@ func TestBeginTwice(t *testing.T) {
458461
},
459462
nil,
460463
nil,
464+
nil,
461465
)
462466

463467
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
@@ -508,6 +512,7 @@ func TestHandlersSeePessimisticTxnError(t *testing.T) {
508512
},
509513
nil,
510514
nil,
515+
nil,
511516
)
512517
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
513518
// Always fails
@@ -561,6 +566,7 @@ func TestCommitFailed(t *testing.T) {
561566
},
562567
nil,
563568
nil,
569+
nil,
564570
)
565571
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
566572
handler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {

pkg/domain/domain.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,10 @@ type Domain struct {
158158
m syncutil.Mutex
159159
SchemaValidator SchemaValidator
160160
schemaLease time.Duration
161-
sysSessionPool util.SessionPool
162-
exit chan struct{}
161+
// Note: If you no longer need the session, you must call Destroy to release it.
162+
// Otherwise, the session will be leaked. Because there is a strong reference from the domain to the session.
163+
sysSessionPool util.DestroyableSessionPool
164+
exit chan struct{}
163165
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
164166
etcdClient *clientv3.Client
165167
// autoidClient is used when there are tables with AUTO_ID_CACHE=1, it is the client to the autoid service.
@@ -1345,6 +1347,10 @@ func NewDomainWithEtcdClient(store kv.Storage, schemaLease time.Duration, statsL
13451347
})
13461348
infosync.DeleteInternalSession(r)
13471349
},
1350+
func(r pools.Resource) {
1351+
intest.Assert(r != nil)
1352+
infosync.DeleteInternalSession(r)
1353+
},
13481354
),
13491355
statsLease: statsLease,
13501356
schemaLease: schemaLease,
@@ -1814,7 +1820,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
18141820
}
18151821

18161822
// SysSessionPool returns the system session pool.
1817-
func (do *Domain) SysSessionPool() util.SessionPool {
1823+
func (do *Domain) SysSessionPool() util.DestroyableSessionPool {
18181824
return do.sysSessionPool
18191825
}
18201826

@@ -2550,7 +2556,7 @@ func (do *Domain) initStats(ctx context.Context) {
25502556
}
25512557
initstats.InitStatsPercentage.Store(100)
25522558
if err != nil {
2553-
logutil.BgLogger().Error("init stats info failed", zap.Bool("lite", liteInitStats), zap.Duration("take time", time.Since(t)), zap.Error(err))
2559+
logutil.ErrVerboseLogger().Error("init stats info failed", zap.Bool("lite", liteInitStats), zap.Duration("take time", time.Since(t)), zap.Error(err))
25542560
} else {
25552561
logutil.BgLogger().Info("init stats info time", zap.Bool("lite", liteInitStats), zap.Duration("take time", time.Since(t)))
25562562
}
@@ -2612,7 +2618,7 @@ func (do *Domain) asyncLoadHistogram() {
26122618
case <-cleanupTicker.C:
26132619
err = statsHandle.LoadNeededHistograms(do.InfoSchema())
26142620
if err != nil {
2615-
logutil.BgLogger().Warn("load histograms failed", zap.Error(err))
2621+
logutil.ErrVerboseLogger().Warn("load histograms failed", zap.Error(err))
26162622
}
26172623
case <-do.exit:
26182624
return

pkg/domain/infosync/info.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1339,6 +1339,21 @@ func DeleteInternalSession(se any) {
13391339
sm.DeleteInternalSession(se)
13401340
}
13411341

1342+
// ContainsInternalSessionForTest is the entry function for check whether an internal session is in SessionManager.
1343+
// It is only used for test.
1344+
func ContainsInternalSessionForTest(se any) bool {
1345+
is, err := getGlobalInfoSyncer()
1346+
if err != nil {
1347+
return false
1348+
}
1349+
sm := is.GetSessionManager()
1350+
if sm == nil {
1351+
return false
1352+
}
1353+
1354+
return sm.ContainsInternalSession(se)
1355+
}
1356+
13421357
// SetEtcdClient is only used for test.
13431358
func SetEtcdClient(etcdCli *clientv3.Client) {
13441359
is, err := getGlobalInfoSyncer()

pkg/planner/core/rule_collect_plan_stats.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,11 @@ func RequestLoadStats(ctx base.PlanContext, neededHistItems []model.StatsLoadIte
259259
if err != nil {
260260
stmtCtx.IsSyncStatsFailed = true
261261
if vardef.StatsLoadPseudoTimeout.Load() {
262-
logutil.BgLogger().Warn("RequestLoadStats failed", zap.Error(err))
262+
logutil.ErrVerboseLogger().Warn("RequestLoadStats failed", zap.Error(err))
263263
stmtCtx.AppendWarning(err)
264264
return nil
265265
}
266-
logutil.BgLogger().Warn("RequestLoadStats failed", zap.Error(err))
266+
logutil.ErrVerboseLogger().Warn("RequestLoadStats failed", zap.Error(err))
267267
return err
268268
}
269269
return nil

pkg/server/server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,14 @@ func (s *Server) StoreInternalSession(se any) {
10641064
s.sessionMapMutex.Unlock()
10651065
}
10661066

1067+
// ContainsInternalSession implements SessionManager interface.
1068+
func (s *Server) ContainsInternalSession(se any) bool {
1069+
s.sessionMapMutex.Lock()
1070+
defer s.sessionMapMutex.Unlock()
1071+
_, ok := s.internalSessions[se]
1072+
return ok
1073+
}
1074+
10671075
// DeleteInternalSession implements SessionManager interface.
10681076
// @param addr The address of a session.session struct variable
10691077
func (s *Server) DeleteInternalSession(se any) {

pkg/statistics/handle/bootstrap.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,9 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
342342
defer func() {
343343
if err == nil { // only recycle when no error
344344
h.Pool.SPool().Put(se)
345+
} else {
346+
// Note: Otherwise, the session will be leaked.
347+
h.Pool.SPool().Destroy(se)
345348
}
346349
}()
347350

@@ -468,6 +471,9 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta
468471
defer func() {
469472
if err == nil { // only recycle when no error
470473
h.Pool.SPool().Put(se)
474+
} else {
475+
// Note: Otherwise, the session will be leaked.
476+
h.Pool.SPool().Destroy(se)
471477
}
472478
}()
473479
sctx := se.(sessionctx.Context)
@@ -667,6 +673,9 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init
667673
defer func() {
668674
if err == nil { // only recycle when no error
669675
h.Pool.SPool().Put(se)
676+
} else {
677+
// Note: Otherwise, the session will be leaked.
678+
h.Pool.SPool().Destroy(se)
670679
}
671680
}()
672681
sctx := se.(sessionctx.Context)

pkg/statistics/handle/handle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func NewHandle(
115115
initStatsCtx sessionctx.Context,
116116
lease time.Duration,
117117
is infoschema.InfoSchema,
118-
pool pkgutil.SessionPool,
118+
pool pkgutil.DestroyableSessionPool,
119119
tracker sysproctrack.Tracker,
120120
ddlNotifier *notifier.DDLNotifier,
121121
autoAnalyzeProcIDGetter func() uint64,

0 commit comments

Comments
 (0)