Skip to content

Commit 6f761e2

Browse files
authored
ddl notifier: use pessimistic txn and fix updating memory state too early (#59157) (#59330)
close #59055
1 parent 47279b9 commit 6f761e2

File tree

11 files changed

+222
-46
lines changed

11 files changed

+222
-46
lines changed

pkg/ddl/notifier/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ go_test(
3636
],
3737
embed = [":notifier"],
3838
flaky = True,
39-
shard_count = 10,
39+
shard_count = 12,
4040
deps = [
4141
"//pkg/ddl",
4242
"//pkg/ddl/session",

pkg/ddl/notifier/store.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type Store interface {
3535
se *sess.Session,
3636
ddlJobID int64,
3737
multiSchemaChangeID int64,
38-
processedBy uint64,
38+
oldProcessedBy uint64,
39+
newProcessedBy uint64,
3940
) error
4041
DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
4142
// List will start a transaction of given session and read all schema changes
@@ -87,17 +88,28 @@ func (t *tableStore) UpdateProcessed(
8788
se *sess.Session,
8889
ddlJobID int64,
8990
multiSchemaChangeID int64,
90-
processedBy uint64,
91+
oldProcessedBy uint64,
92+
newProcessedBy uint64,
9193
) error {
9294
sql := fmt.Sprintf(`
9395
UPDATE %s.%s
9496
SET processed_by_flag = %d
95-
WHERE ddl_job_id = %d AND sub_job_id = %d`,
97+
WHERE ddl_job_id = %d AND sub_job_id = %d AND processed_by_flag = %d`,
9698
t.db, t.table,
97-
processedBy,
98-
ddlJobID, multiSchemaChangeID)
99+
newProcessedBy,
100+
ddlJobID, multiSchemaChangeID, oldProcessedBy,
101+
)
99102
_, err := se.Execute(ctx, sql, "ddl_notifier")
100-
return err
103+
if err != nil {
104+
return errors.Trace(err)
105+
}
106+
if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
107+
return errors.Errorf(
108+
"failed to update processed_by_flag, maybe the row has been updated by other owner. ddl_job_id: %d, sub_job_id: %d",
109+
ddlJobID, multiSchemaChangeID,
110+
)
111+
}
112+
return nil
101113
}
102114

103115
// DeleteAndCommit implements Store interface.

pkg/ddl/notifier/subscribe.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -268,15 +268,20 @@ func (n *DDLNotifier) processEventForHandler(
268268
if (change.processedByFlag & (1 << handlerID)) != 0 {
269269
return nil
270270
}
271+
newFlag := change.processedByFlag | (1 << handlerID)
271272

272-
if err = session.Begin(ctx); err != nil {
273+
if err = session.BeginPessimistic(ctx); err != nil {
273274
return errors.Trace(err)
274275
}
275276
defer func() {
276-
if err == nil {
277-
err = errors.Trace(session.Commit(ctx))
278-
} else {
277+
if err != nil {
279278
session.Rollback()
279+
return
280+
}
281+
282+
err = errors.Trace(session.Commit(ctx))
283+
if err == nil {
284+
change.processedByFlag = newFlag
280285
}
281286
}()
282287

@@ -293,19 +298,14 @@ func (n *DDLNotifier) processEventForHandler(
293298
zap.Duration("duration", time.Since(now)))
294299
}
295300

296-
newFlag := change.processedByFlag | (1 << handlerID)
297-
if err = n.store.UpdateProcessed(
301+
return errors.Trace(n.store.UpdateProcessed(
298302
ctx,
299303
session,
300304
change.ddlJobID,
301305
change.subJobID,
306+
change.processedByFlag,
302307
newFlag,
303-
); err != nil {
304-
return errors.Trace(err)
305-
}
306-
change.processedByFlag = newFlag
307-
308-
return nil
308+
))
309309
}
310310

311311
// Stop stops the background loop.

pkg/ddl/notifier/testkit_test.go

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,9 @@ func Test2OwnerForAShortTime(t *testing.T) {
318318

319319
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
320320
sessionPool := util.NewSessionPool(
321-
1,
321+
4,
322322
func() (pools.Resource, error) {
323-
return tk.Session(), nil
323+
return testkit.NewTestKit(t, store).Session(), nil
324324
},
325325
nil,
326326
nil,
@@ -360,7 +360,7 @@ func Test2OwnerForAShortTime(t *testing.T) {
360360
if !bytes.Contains(content, []byte("Error processing change")) {
361361
return false
362362
}
363-
return bytes.Contains(content, []byte("Write conflict"))
363+
return bytes.Contains(content, []byte("maybe the row has been updated by other owner"))
364364
}, time.Second, 25*time.Millisecond)
365365
// the handler should not commit
366366
tk2.MustQuery("SELECT * FROM test.result").Check(testkit.Rows())
@@ -489,3 +489,104 @@ func TestBeginTwice(t *testing.T) {
489489
require.NoError(t, err)
490490
require.NotContains(t, string(content), "context provider not set")
491491
}
492+
493+
func TestHandlersSeePessimisticTxnError(t *testing.T) {
494+
// 1. One always fails
495+
// 2. One always succeeds
496+
// Make sure events don't get lost after the second handler succeeds.
497+
store := testkit.CreateMockStore(t)
498+
tk := testkit.NewTestKit(t, store)
499+
tk.MustExec("USE test")
500+
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
501+
tk.MustExec(ddl.NotifierTableSQL)
502+
ctx := context.Background()
503+
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
504+
sessionPool := util.NewSessionPool(
505+
4,
506+
func() (pools.Resource, error) {
507+
return testkit.NewTestKit(t, store).Session(), nil
508+
},
509+
nil,
510+
nil,
511+
)
512+
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
513+
// Always fails
514+
failHandler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
515+
// Mock a duplicate key error
516+
_, err := sctx.GetSQLExecutor().Execute(ctx, "INSERT INTO test."+ddl.NotifierTableName+" VALUES(1, -1, 'some', 0)")
517+
return err
518+
}
519+
// Always succeeds
520+
successHandler := func(context.Context, sessionctx.Context, *notifier.SchemaChangeEvent) error {
521+
return nil
522+
}
523+
n.RegisterHandler(2, successHandler)
524+
n.RegisterHandler(1, failHandler)
525+
n.OnBecomeOwner()
526+
tk2 := testkit.NewTestKit(t, store)
527+
se := sess.NewSession(tk2.Session())
528+
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")})
529+
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
530+
require.NoError(t, err)
531+
require.Never(t, func() bool {
532+
changes := make([]*notifier.SchemaChange, 8)
533+
result, closeFn := s.List(ctx, se)
534+
count, err2 := result.Read(changes)
535+
require.NoError(t, err2)
536+
closeFn()
537+
return count == 0
538+
}, time.Second, 50*time.Millisecond)
539+
}
540+
541+
func TestCommitFailed(t *testing.T) {
542+
// Make sure events don't get lost if internal txn commit failed.
543+
store := testkit.CreateMockStore(t)
544+
tk := testkit.NewTestKit(t, store)
545+
tk.MustExec("USE test")
546+
tk.MustExec("set global tidb_enable_metadata_lock=0")
547+
t.Cleanup(func() {
548+
tk.MustExec("set global tidb_enable_metadata_lock=1")
549+
})
550+
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
551+
tk.MustExec(ddl.NotifierTableSQL)
552+
tk.MustExec("CREATE TABLE subscribe_table (id INT PRIMARY KEY, c INT)")
553+
tk.MustExec("INSERT INTO subscribe_table VALUES (1, 1)")
554+
555+
ctx := context.Background()
556+
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
557+
sessionPool := util.NewSessionPool(
558+
4,
559+
func() (pools.Resource, error) {
560+
return testkit.NewTestKit(t, store).Session(), nil
561+
},
562+
nil,
563+
nil,
564+
)
565+
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
566+
handler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
567+
// pessimistic + DDL will cause an "infoschema is changed" error at commit time.
568+
_, err := sctx.GetSQLExecutor().Execute(
569+
ctx, "UPDATE test.subscribe_table SET c = c + 1 WHERE id = 1",
570+
)
571+
require.NoError(t, err)
572+
573+
tk.MustExec("TRUNCATE test.subscribe_table")
574+
return nil
575+
}
576+
n.RegisterHandler(notifier.TestHandlerID, handler)
577+
n.OnBecomeOwner()
578+
tk2 := testkit.NewTestKit(t, store)
579+
se := sess.NewSession(tk2.Session())
580+
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")})
581+
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
582+
require.NoError(t, err)
583+
require.Never(t, func() bool {
584+
changes := make([]*notifier.SchemaChange, 8)
585+
result, closeFn := s.List(ctx, se)
586+
count, err2 := result.Read(changes)
587+
require.NoError(t, err2)
588+
closeFn()
589+
return count == 0
590+
}, time.Second, 50*time.Millisecond)
591+
n.OnRetireOwner()
592+
}

pkg/ddl/session/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"//pkg/domain/infosync",
1414
"//pkg/kv",
1515
"//pkg/metrics",
16+
"//pkg/parser/ast",
1617
"//pkg/parser/mysql",
1718
"//pkg/parser/terror",
1819
"//pkg/sessionctx",

pkg/ddl/session/session.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pingcap/failpoint"
2323
"github.com/pingcap/tidb/pkg/kv"
2424
"github.com/pingcap/tidb/pkg/metrics"
25+
"github.com/pingcap/tidb/pkg/parser/ast"
2526
"github.com/pingcap/tidb/pkg/parser/terror"
2627
"github.com/pingcap/tidb/pkg/sessionctx"
2728
"github.com/pingcap/tidb/pkg/sessiontxn"
@@ -49,6 +50,19 @@ func (s *Session) Begin(ctx context.Context) error {
4950
return nil
5051
}
5152

53+
// BeginPessimistic starts a pessimistic transaction.
54+
func (s *Session) BeginPessimistic(ctx context.Context) error {
55+
err := sessiontxn.GetTxnManager(s.Context).EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{
56+
Type: sessiontxn.EnterNewTxnDefault,
57+
TxnMode: ast.Pessimistic,
58+
})
59+
if err != nil {
60+
return err
61+
}
62+
s.GetSessionVars().SetInTxn(true)
63+
return nil
64+
}
65+
5266
// Commit commits the transaction.
5367
func (s *Session) Commit(ctx context.Context) error {
5468
s.StmtCommit(ctx)

pkg/ddl/session/session_pool_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package session_test
1717
import (
1818
"context"
1919
"testing"
20+
"time"
2021

2122
"github.com/ngaut/pools"
2223
"github.com/pingcap/tidb/pkg/ddl/session"
@@ -66,3 +67,52 @@ func TestSessionPool(t *testing.T) {
6667
}
6768
require.Equal(t, uint64(0), targetTS)
6869
}
70+
71+
func TestPessimisticTxn(t *testing.T) {
72+
store := testkit.CreateMockStore(t)
73+
tk := testkit.NewTestKit(t, store)
74+
tk.MustExec("use test")
75+
tk.MustExec("create table t (a int primary key, b int)")
76+
tk.MustExec("insert into t values (1, 1)")
77+
78+
resourcePool := pools.NewResourcePool(func() (pools.Resource, error) {
79+
newTk := testkit.NewTestKit(t, store)
80+
return newTk.Session(), nil
81+
}, 4, 4, 0)
82+
pool := session.NewSessionPool(resourcePool)
83+
ctx := context.Background()
84+
85+
sessCtx, err := pool.Get()
86+
require.NoError(t, err)
87+
se := session.NewSession(sessCtx)
88+
sessCtx2, err := pool.Get()
89+
require.NoError(t, err)
90+
se2 := session.NewSession(sessCtx2)
91+
92+
err = se.BeginPessimistic(ctx)
93+
require.NoError(t, err)
94+
err = se2.BeginPessimistic(ctx)
95+
require.NoError(t, err)
96+
_, err = se.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut")
97+
require.NoError(t, err)
98+
done := make(chan struct{}, 1)
99+
go func() {
100+
_, err := se2.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut")
101+
require.NoError(t, err)
102+
done <- struct{}{}
103+
err = se2.Commit(ctx)
104+
require.NoError(t, err)
105+
close(done)
106+
}()
107+
108+
time.Sleep(100 * time.Millisecond)
109+
// because this is a pessimistic transaction, the second transaction should be blocked
110+
require.Len(t, done, 0)
111+
err = se.Commit(ctx)
112+
require.NoError(t, err)
113+
<-done
114+
_, ok := <-done
115+
require.False(t, ok)
116+
pool.Put(sessCtx)
117+
pool.Put(sessCtx2)
118+
}

tests/integrationtest/r/executor/analyze.result

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -739,16 +739,14 @@ create index idxc on t (c);
739739
analyze table t partition p0 index idxa;
740740
analyze table t partition p1 index idxb;
741741
analyze table t partition p2 index idxc;
742-
show warnings;
742+
SHOW WARNINGS WHERE Level IN ('Warning', 'Error');
743743
Level Code Message
744744
Warning 1105 No predicate column has been collected yet for table executor__analyze.t, so only indexes and the columns composing the indexes will be analyzed
745745
Warning 1105 The version 2 would collect all statistics not only the selected indexes
746-
Note 1105 Analyze use auto adjusted sample rate 1.000000 for table executor__analyze.t's partition p2, reason to use this rate is "use min(1, 110000/10000) as the sample-rate=1"
747746
analyze table t partition p0;
748-
show warnings;
747+
SHOW WARNINGS WHERE Level IN ('Warning', 'Error');
749748
Level Code Message
750749
Warning 1105 No predicate column has been collected yet for table executor__analyze.t, so only indexes and the columns composing the indexes will be analyzed
751-
Note 1105 Analyze use auto adjusted sample rate 1.000000 for table executor__analyze.t's partition p0, reason to use this rate is "use min(1, 110000/2) as the sample-rate=1"
752750
set tidb_partition_prune_mode=default;
753751
set @@session.tidb_enable_fast_analyze=1;
754752
show warnings;

tests/integrationtest/r/executor/issues.result

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -897,7 +897,7 @@ insert into pt (val) select (val) from pt;
897897
split table pt between (0) and (40960) regions 30;
898898
TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO
899899
203 1
900-
analyze table pt;
900+
analyze table pt all columns;
901901
set @@tidb_distsql_scan_concurrency = default;
902902
explain analyze select * from t order by id; # expected distsql concurrency 2
903903
id estRows actRows task access object execution info operator info memory disk
@@ -969,18 +969,18 @@ Limit_7 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
969969
└─TableFullScan_9 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
970970
explain analyze select * from pt where val = 125 limit 100; # expected distsql concurrency 15
971971
id estRows actRows task access object execution info operator info memory disk
972-
Limit_8 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
973-
└─TableReader_13 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
974-
└─Limit_12 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
975-
└─Selection_11 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
976-
└─TableFullScan_10 125.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
972+
Limit_8 1.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
973+
└─TableReader_13 1.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
974+
└─Limit_12 1.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
975+
└─Selection_11 1.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
976+
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
977977
explain analyze select * from pt where val = 125 limit 100000; # expected distsql concurrency 15
978978
id estRows actRows task access object execution info operator info memory disk
979-
Limit_8 204.80 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
980-
└─TableReader_13 204.80 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
981-
└─Limit_12 204.80 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
982-
└─Selection_11 204.80 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
983-
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
979+
Limit_8 1.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
980+
└─TableReader_13 1.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
981+
└─Limit_12 1.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
982+
└─Selection_11 1.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
983+
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
984984
explain analyze select * from pt order by id limit 100; # expected distsql concurrency 7, but currently get 1, see issue #55190
985985
id estRows actRows task access object execution info operator info memory disk
986986
Limit_10 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
@@ -995,11 +995,11 @@ Limit_11 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
995995
└─TableFullScan_19 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
996996
explain analyze select * from pt where val = 126 order by id limit 100; # expected distsql concurrency 15
997997
id estRows actRows task access object execution info operator info memory disk
998-
Limit_11 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
999-
└─TableReader_20 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
1000-
└─Limit_19 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
1001-
└─Selection_18 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
1002-
└─TableFullScan_17 125.00 <actRows> cop[tikv] table:pt NULL keep order:true, stats:partial[val:missing] <memory> <disk>
998+
Limit_11 1.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
999+
└─TableReader_20 1.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
1000+
└─Limit_19 1.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
1001+
└─Selection_18 1.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
1002+
└─TableFullScan_17 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
10031003
CREATE TABLE test_55837 (col1 int(4) NOT NULL, col2 bigint(4) NOT NULL, KEY col2_index (col2));
10041004
insert into test_55837 values(0,1725292800),(0,1725292800);
10051005
select from_unixtime( if(col2 >9999999999, col2/1000, col2), '%Y-%m-%d %H:%i:%s') as result from test_55837;

0 commit comments

Comments
 (0)