Skip to content

Commit 3ebfad0

Browse files
authored
ddl: add the ddl notifier as a listener to the stats owner (#56795)
ref #55722
1 parent ed9a909 commit 3ebfad0

File tree

5 files changed

+109
-51
lines changed

5 files changed

+109
-51
lines changed

pkg/ddl/notifier/BUILD.bazel

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ go_library(
1414
"//pkg/ddl/session",
1515
"//pkg/kv",
1616
"//pkg/meta/model",
17-
"//pkg/session/types",
17+
"//pkg/owner",
1818
"//pkg/sessionctx",
19+
"//pkg/util",
1920
"//pkg/util/intest",
2021
"//pkg/util/logutil",
2122
"@com_github_pingcap_errors//:errors",
@@ -41,6 +42,8 @@ go_test(
4142
"//pkg/sessionctx",
4243
"//pkg/testkit",
4344
"//pkg/testkit/testfailpoint",
45+
"//pkg/util",
46+
"@com_github_ngaut_pools//:pools",
4447
"@com_github_stretchr_testify//require",
4548
],
4649
)

pkg/ddl/notifier/subscribe.go

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ import (
2323
"github.com/pingcap/errors"
2424
sess "github.com/pingcap/tidb/pkg/ddl/session"
2525
"github.com/pingcap/tidb/pkg/kv"
26-
"github.com/pingcap/tidb/pkg/session/types"
26+
"github.com/pingcap/tidb/pkg/owner"
2727
"github.com/pingcap/tidb/pkg/sessionctx"
28+
"github.com/pingcap/tidb/pkg/util"
2829
"github.com/pingcap/tidb/pkg/util/intest"
2930
"github.com/pingcap/tidb/pkg/util/logutil"
3031
"go.uber.org/zap"
@@ -79,9 +80,23 @@ func (id HandlerID) String() string {
7980
}
8081
}
8182

83+
// Ensure DDLNotifier implements the owner.Listener interface.
84+
// The DDLNotifier is started only when the stats owner is elected to ensure consistency.
85+
// This design is crucial because:
86+
// 1. The stats handler(priority queue) processes DDLNotifier events in memory.
87+
// 2. Keeping the stats handler and DDLNotifier on the same node maintains data integrity.
88+
// 3. It prevents potential race conditions or inconsistencies that could arise from
89+
// distributed processing of these events across multiple nodes.
90+
var _ owner.Listener = (*DDLNotifier)(nil)
91+
8292
// DDLNotifier implements the subscription on DDL events.
8393
type DDLNotifier struct {
84-
ownedSess types.Session
94+
// The context is initialized in Start and canceled in Stop and Close.
95+
ctx context.Context
96+
cancel context.CancelFunc
97+
wg util.WaitGroupWrapper
98+
sysSessionPool util.SessionPool
99+
85100
store Store
86101
handlers map[HandlerID]SchemaChangeHandler
87102
pollInterval time.Duration
@@ -90,19 +105,17 @@ type DDLNotifier struct {
90105
handlersBitMap uint64
91106
}
92107

93-
// NewDDLNotifier initializes the global DDLNotifier. It should be called only
94-
// once and before any RegisterHandler call. The ownership of the sctx is passed
95-
// to the DDLNotifier.
108+
// NewDDLNotifier initializes the global DDLNotifier.
96109
func NewDDLNotifier(
97-
sess types.Session,
110+
sysSessionPool util.SessionPool,
98111
store Store,
99112
pollInterval time.Duration,
100113
) *DDLNotifier {
101114
return &DDLNotifier{
102-
ownedSess: sess,
103-
store: store,
104-
handlers: make(map[HandlerID]SchemaChangeHandler),
105-
pollInterval: pollInterval,
115+
sysSessionPool: sysSessionPool,
116+
store: store,
117+
handlers: make(map[HandlerID]SchemaChangeHandler),
118+
pollInterval: pollInterval,
106119
}
107120
}
108121

@@ -125,13 +138,14 @@ func (n *DDLNotifier) RegisterHandler(id HandlerID, handler SchemaChangeHandler)
125138
n.handlers[id] = handler
126139
}
127140

128-
// Start starts the DDLNotifier. It will block until the context is canceled.
129-
func (n *DDLNotifier) Start(ctx context.Context) {
141+
// start starts the DDLNotifier. It will block until the context is canceled.
142+
// Do not call this function directly. Use owner.Listener interface instead.
143+
func (n *DDLNotifier) start() {
130144
for id := range n.handlers {
131145
n.handlersBitMap |= 1 << id
132146
}
133147

134-
ctx = kv.WithInternalSourceType(ctx, kv.InternalDDLNotifier)
148+
ctx := kv.WithInternalSourceType(n.ctx, kv.InternalDDLNotifier)
135149
ctx = logutil.WithCategory(ctx, "ddl-notifier")
136150
ticker := time.NewTicker(n.pollInterval)
137151
defer ticker.Stop()
@@ -148,7 +162,14 @@ func (n *DDLNotifier) Start(ctx context.Context) {
148162
}
149163

150164
func (n *DDLNotifier) processEvents(ctx context.Context) error {
151-
changes, err := n.store.List(ctx, sess.NewSession(n.ownedSess))
165+
sysSession, err := n.sysSessionPool.Get()
166+
if err != nil {
167+
return errors.Trace(err)
168+
}
169+
defer n.sysSessionPool.Put(sysSession)
170+
171+
session := sess.NewSession(sysSession.(sessionctx.Context))
172+
changes, err := n.store.List(ctx, session)
152173
if err != nil {
153174
return errors.Trace(err)
154175
}
@@ -161,7 +182,7 @@ func (n *DDLNotifier) processEvents(ctx context.Context) error {
161182
if _, ok := skipHandlers[handlerID]; ok {
162183
continue
163184
}
164-
if err2 := n.processEventForHandler(ctx, change, handlerID, handler); err2 != nil {
185+
if err2 := n.processEventForHandler(ctx, session, change, handlerID, handler); err2 != nil {
165186
skipHandlers[handlerID] = struct{}{}
166187

167188
if !goerr.Is(err2, ErrNotReadyRetryLater) {
@@ -187,7 +208,7 @@ func (n *DDLNotifier) processEvents(ctx context.Context) error {
187208
if change.processedByFlag == n.handlersBitMap {
188209
if err2 := n.store.DeleteAndCommit(
189210
ctx,
190-
sess.NewSession(n.ownedSess),
211+
session,
191212
change.ddlJobID,
192213
int(change.multiSchemaChangeSeq),
193214
); err2 != nil {
@@ -206,6 +227,7 @@ const slowHandlerLogThreshold = time.Second * 5
206227

207228
func (n *DDLNotifier) processEventForHandler(
208229
ctx context.Context,
230+
session *sess.Session,
209231
change *schemaChange,
210232
handlerID HandlerID,
211233
handler SchemaChangeHandler,
@@ -214,21 +236,19 @@ func (n *DDLNotifier) processEventForHandler(
214236
return nil
215237
}
216238

217-
se := sess.NewSession(n.ownedSess)
218-
219-
if err = se.Begin(ctx); err != nil {
239+
if err = session.Begin(ctx); err != nil {
220240
return errors.Trace(err)
221241
}
222242
defer func() {
223243
if err == nil {
224-
err = errors.Trace(se.Commit(ctx))
244+
err = errors.Trace(session.Commit(ctx))
225245
} else {
226-
se.Rollback()
246+
session.Rollback()
227247
}
228248
}()
229249

230250
now := time.Now()
231-
if err = handler(ctx, n.ownedSess, change.event); err != nil {
251+
if err = handler(ctx, session.Context, change.event); err != nil {
232252
return errors.Trace(err)
233253
}
234254
if time.Since(now) > slowHandlerLogThreshold {
@@ -243,7 +263,7 @@ func (n *DDLNotifier) processEventForHandler(
243263
newFlag := change.processedByFlag | (1 << handlerID)
244264
if err = n.store.UpdateProcessed(
245265
ctx,
246-
se,
266+
session,
247267
change.ddlJobID,
248268
change.multiSchemaChangeSeq,
249269
newFlag,
@@ -255,7 +275,36 @@ func (n *DDLNotifier) processEventForHandler(
255275
return nil
256276
}
257277

258-
// Close releases the resources.
259-
func (n *DDLNotifier) Close() {
260-
n.ownedSess.Close()
278+
// Stop stops the background loop.
279+
// Exposed for testing.
280+
// Do not call this function directly. Use owner.Listener interface instead.
281+
func (n *DDLNotifier) Stop() {
282+
// If the notifier is not started, the cancel function is nil.
283+
if n.cancel == nil {
284+
return
285+
}
286+
n.cancel()
287+
n.wg.Wait()
288+
}
289+
290+
// OnBecomeOwner implements the owner.Listener interface.
291+
// We need to make sure only one DDLNotifier is running at any time.
292+
func (n *DDLNotifier) OnBecomeOwner() {
293+
n.ctx, n.cancel = context.WithCancel(context.Background())
294+
n.wg.RunWithRecover(n.start, func(r any) {
295+
if r == nil {
296+
return
297+
}
298+
// In unit tests, we want to panic directly to find the root cause.
299+
if intest.InTest {
300+
panic(r)
301+
}
302+
logutil.BgLogger().Error("panic in ddl notifier", zap.Any("recover", r), zap.Stack("stack"))
303+
})
304+
}
305+
306+
// OnRetireOwner implements the owner.Listener interface.
307+
// After the owner is retired, we need to stop the DDLNotifier.
308+
func (n *DDLNotifier) OnRetireOwner() {
309+
n.Stop()
261310
}

pkg/ddl/notifier/testkit_test.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/ngaut/pools"
2526
"github.com/pingcap/tidb/pkg/ddl"
2627
"github.com/pingcap/tidb/pkg/ddl/notifier"
2728
sess "github.com/pingcap/tidb/pkg/ddl/session"
@@ -30,6 +31,7 @@ import (
3031
"github.com/pingcap/tidb/pkg/sessionctx"
3132
"github.com/pingcap/tidb/pkg/testkit"
3233
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
34+
"github.com/pingcap/tidb/pkg/util"
3335
"github.com/stretchr/testify/require"
3436
)
3537

@@ -61,10 +63,17 @@ func TestBasicPubSub(t *testing.T) {
6163
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
6264
tk.MustExec(ddl.NotifierTableSQL)
6365

64-
ctx, cancel := context.WithCancel(context.Background())
6566
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
67+
sessionPool := util.NewSessionPool(
68+
1,
69+
func() (pools.Resource, error) {
70+
return tk.Session(), nil
71+
},
72+
nil,
73+
nil,
74+
)
6675

67-
n := notifier.NewDDLNotifier(tk.Session(), s, 50*time.Millisecond)
76+
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
6877

6978
var seenChangesMu sync.Mutex
7079
seenChanges := make([]*notifier.SchemaChangeEvent, 0, 8)
@@ -94,12 +103,13 @@ func TestBasicPubSub(t *testing.T) {
94103

95104
done := make(chan struct{})
96105
go func() {
97-
n.Start(ctx)
106+
n.OnBecomeOwner()
98107
close(done)
99108
}()
100109

101110
tk2 := testkit.NewTestKit(t, store)
102111
se := sess.NewSession(tk2.Session())
112+
ctx := context.Background()
103113
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")})
104114
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
105115
require.NoError(t, err)
@@ -119,8 +129,7 @@ func TestBasicPubSub(t *testing.T) {
119129
require.Equal(t, event1, seenChanges[0])
120130
require.Equal(t, event2, seenChanges[1])
121131
require.Equal(t, event3, seenChanges[2])
122-
123-
cancel()
132+
n.OnRetireOwner()
124133
<-done
125134
}
126135

@@ -131,10 +140,16 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
131140
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
132141
tk.MustExec(ddl.NotifierTableSQL)
133142

134-
ctx, cancel := context.WithCancel(context.Background())
135143
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
136-
137-
n := notifier.NewDDLNotifier(tk.Session(), s, 50*time.Millisecond)
144+
sessionPool := util.NewSessionPool(
145+
1,
146+
func() (pools.Resource, error) {
147+
return tk.Session(), nil
148+
},
149+
nil,
150+
nil,
151+
)
152+
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
138153

139154
newRndFailHandler := func() (notifier.SchemaChangeHandler, *[]int64) {
140155
maxFail := 5
@@ -166,13 +181,13 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
166181

167182
done := make(chan struct{})
168183
go func() {
169-
n.Start(ctx)
184+
n.OnBecomeOwner()
170185
close(done)
171186
}()
172187

173188
tk2 := testkit.NewTestKit(t, store)
174189
se := sess.NewSession(tk2.Session())
175-
190+
ctx := context.Background()
176191
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")})
177192
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
178193
require.NoError(t, err)
@@ -193,7 +208,7 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
193208
require.Equal(t, []int64{1000, 1001, 1002}, *id2)
194209
require.Equal(t, []int64{1000, 1001, 1002}, *id3)
195210

196-
cancel()
211+
n.OnRetireOwner()
197212
<-done
198213
}
199214

pkg/domain/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ go_library(
5757
"//pkg/planner/core/metrics",
5858
"//pkg/privilege/privileges",
5959
"//pkg/resourcegroup/runaway",
60-
"//pkg/session/types",
6160
"//pkg/sessionctx",
6261
"//pkg/sessionctx/sessionstates",
6362
"//pkg/sessionctx/sysproctrack",

pkg/domain/domain.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ import (
6565
metrics2 "github.com/pingcap/tidb/pkg/planner/core/metrics"
6666
"github.com/pingcap/tidb/pkg/privilege/privileges"
6767
"github.com/pingcap/tidb/pkg/resourcegroup/runaway"
68-
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
6968
"github.com/pingcap/tidb/pkg/sessionctx"
7069
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
7170
"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
@@ -1380,12 +1379,8 @@ func (do *Domain) Init(
13801379
var ddlNotifierStore notifier.Store
13811380
if intest.InTest {
13821381
ddlNotifierStore = notifier.OpenTableStore("mysql", ddl.NotifierTableName)
1383-
se, err := do.sysExecutorFactory(do)
1384-
if err != nil {
1385-
return err
1386-
}
13871382
do.ddlNotifier = notifier.NewDDLNotifier(
1388-
se.(sessiontypes.Session),
1383+
do.sysSessionPool,
13891384
ddlNotifierStore,
13901385
time.Second,
13911386
)
@@ -1511,12 +1506,6 @@ func (do *Domain) Start() error {
15111506
return err
15121507
}
15131508
do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher()
1514-
if intest.InTest {
1515-
do.wg.Run(func() {
1516-
do.ddlNotifier.Start(do.ctx)
1517-
do.ddlNotifier.Close()
1518-
}, "ddlNotifier")
1519-
}
15201509

15211510
// Local store needs to get the change information for every DDL state in each session.
15221511
do.wg.Run(func() {
@@ -2350,6 +2339,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
23502339
variable.EnableStatsOwner = do.enableStatsOwner
23512340
variable.DisableStatsOwner = do.disableStatsOwner
23522341
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
2342+
if intest.InTest {
2343+
do.statsOwner.SetListener(do.ddlNotifier)
2344+
}
23532345
do.wg.Run(func() {
23542346
do.indexUsageWorker()
23552347
}, "indexUsageWorker")

0 commit comments

Comments
 (0)