Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/notifier/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ go_test(
],
embed = [":notifier"],
flaky = True,
shard_count = 10,
shard_count = 12,
deps = [
"//pkg/ddl",
"//pkg/ddl/session",
Expand Down
24 changes: 18 additions & 6 deletions pkg/ddl/notifier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type Store interface {
se *sess.Session,
ddlJobID int64,
multiSchemaChangeID int64,
processedBy uint64,
oldProcessedBy uint64,
newProcessedBy uint64,
) error
DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
// List will start a transaction of given session and read all schema changes
Expand Down Expand Up @@ -87,17 +88,28 @@ func (t *tableStore) UpdateProcessed(
se *sess.Session,
ddlJobID int64,
multiSchemaChangeID int64,
processedBy uint64,
oldProcessedBy uint64,
newProcessedBy uint64,
) error {
sql := fmt.Sprintf(`
UPDATE %s.%s
SET processed_by_flag = %d
WHERE ddl_job_id = %d AND sub_job_id = %d`,
WHERE ddl_job_id = %d AND sub_job_id = %d AND processed_by_flag = %d`,
t.db, t.table,
processedBy,
ddlJobID, multiSchemaChangeID)
newProcessedBy,
ddlJobID, multiSchemaChangeID, oldProcessedBy,
)
_, err := se.Execute(ctx, sql, "ddl_notifier")
return err
if err != nil {
return errors.Trace(err)
}
if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
return errors.Errorf(
"failed to update processed_by_flag, maybe the row has been updated by other owner. ddl_job_id: %d, sub_job_id: %d",
ddlJobID, multiSchemaChangeID,
)
}
return nil
}

// DeleteAndCommit implements Store interface.
Expand Down
24 changes: 12 additions & 12 deletions pkg/ddl/notifier/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,20 @@ func (n *DDLNotifier) processEventForHandler(
if (change.processedByFlag & (1 << handlerID)) != 0 {
return nil
}
newFlag := change.processedByFlag | (1 << handlerID)

if err = session.Begin(ctx); err != nil {
if err = session.BeginPessimistic(ctx); err != nil {
return errors.Trace(err)
}
defer func() {
if err == nil {
err = errors.Trace(session.Commit(ctx))
} else {
if err != nil {
session.Rollback()
return
}

err = errors.Trace(session.Commit(ctx))
if err == nil {
change.processedByFlag = newFlag
}
}()

Expand All @@ -293,19 +298,14 @@ func (n *DDLNotifier) processEventForHandler(
zap.Duration("duration", time.Since(now)))
}

newFlag := change.processedByFlag | (1 << handlerID)
if err = n.store.UpdateProcessed(
return errors.Trace(n.store.UpdateProcessed(
ctx,
session,
change.ddlJobID,
change.subJobID,
change.processedByFlag,
newFlag,
); err != nil {
return errors.Trace(err)
}
change.processedByFlag = newFlag

return nil
))
}

// Stop stops the background loop.
Expand Down
107 changes: 104 additions & 3 deletions pkg/ddl/notifier/testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ func Test2OwnerForAShortTime(t *testing.T) {

s := notifier.OpenTableStore("test", ddl.NotifierTableName)
sessionPool := util.NewSessionPool(
1,
4,
func() (pools.Resource, error) {
return tk.Session(), nil
return testkit.NewTestKit(t, store).Session(), nil
},
nil,
nil,
Expand Down Expand Up @@ -360,7 +360,7 @@ func Test2OwnerForAShortTime(t *testing.T) {
if !bytes.Contains(content, []byte("Error processing change")) {
return false
}
return bytes.Contains(content, []byte("Write conflict"))
return bytes.Contains(content, []byte("maybe the row has been updated by other owner"))
}, time.Second, 25*time.Millisecond)
// the handler should not commit
tk2.MustQuery("SELECT * FROM test.result").Check(testkit.Rows())
Expand Down Expand Up @@ -489,3 +489,104 @@ func TestBeginTwice(t *testing.T) {
require.NoError(t, err)
require.NotContains(t, string(content), "context provider not set")
}

func TestHandlersSeePessimisticTxnError(t *testing.T) {
// 1. One always fails
// 2. One always succeeds
// Make sure events don't get lost after the second handler succeeds.
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("USE test")
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
tk.MustExec(ddl.NotifierTableSQL)
ctx := context.Background()
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
sessionPool := util.NewSessionPool(
4,
func() (pools.Resource, error) {
return testkit.NewTestKit(t, store).Session(), nil
},
nil,
nil,
)
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
// Always fails
failHandler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
// Mock a duplicate key error
_, err := sctx.GetSQLExecutor().Execute(ctx, "INSERT INTO test."+ddl.NotifierTableName+" VALUES(1, -1, 'some', 0)")
return err
}
// Always succeeds
successHandler := func(context.Context, sessionctx.Context, *notifier.SchemaChangeEvent) error {
return nil
}
n.RegisterHandler(2, successHandler)
n.RegisterHandler(1, failHandler)
n.OnBecomeOwner()
tk2 := testkit.NewTestKit(t, store)
se := sess.NewSession(tk2.Session())
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: ast.NewCIStr("t1")})
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
require.NoError(t, err)
require.Never(t, func() bool {
changes := make([]*notifier.SchemaChange, 8)
result, closeFn := s.List(ctx, se)
count, err2 := result.Read(changes)
require.NoError(t, err2)
closeFn()
return count == 0
}, time.Second, 50*time.Millisecond)
}

func TestCommitFailed(t *testing.T) {
// Make sure events don't get lost if internal txn commit failed.
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("USE test")
tk.MustExec("set global tidb_enable_metadata_lock=0")
t.Cleanup(func() {
tk.MustExec("set global tidb_enable_metadata_lock=1")
})
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
tk.MustExec(ddl.NotifierTableSQL)
tk.MustExec("CREATE TABLE subscribe_table (id INT PRIMARY KEY, c INT)")
tk.MustExec("INSERT INTO subscribe_table VALUES (1, 1)")

ctx := context.Background()
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
sessionPool := util.NewSessionPool(
4,
func() (pools.Resource, error) {
return testkit.NewTestKit(t, store).Session(), nil
},
nil,
nil,
)
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
handler := func(_ context.Context, sctx sessionctx.Context, _ *notifier.SchemaChangeEvent) error {
// pessimistic + DDL will cause an "infoschema is changed" error at commit time.
_, err := sctx.GetSQLExecutor().Execute(
ctx, "UPDATE test.subscribe_table SET c = c + 1 WHERE id = 1",
)
require.NoError(t, err)

tk.MustExec("TRUNCATE test.subscribe_table")
return nil
}
n.RegisterHandler(notifier.TestHandlerID, handler)
n.OnBecomeOwner()
tk2 := testkit.NewTestKit(t, store)
se := sess.NewSession(tk2.Session())
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: ast.NewCIStr("t1")})
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
require.NoError(t, err)
require.Never(t, func() bool {
changes := make([]*notifier.SchemaChange, 8)
result, closeFn := s.List(ctx, se)
count, err2 := result.Read(changes)
require.NoError(t, err2)
closeFn()
return count == 0
}, time.Second, 50*time.Millisecond)
n.OnRetireOwner()
}
1 change: 1 addition & 0 deletions pkg/ddl/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/domain/infosync",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/sessionctx",
Expand Down
14 changes: 14 additions & 0 deletions pkg/ddl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
Expand Down Expand Up @@ -49,6 +50,19 @@ func (s *Session) Begin(ctx context.Context) error {
return nil
}

// BeginPessimistic starts a pessimistic transaction.
func (s *Session) BeginPessimistic(ctx context.Context) error {
err := sessiontxn.GetTxnManager(s.Context).EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{
Type: sessiontxn.EnterNewTxnDefault,
TxnMode: ast.Pessimistic,
})
if err != nil {
return err
}
s.GetSessionVars().SetInTxn(true)
return nil
}

// Commit commits the transaction.
func (s *Session) Commit(ctx context.Context) error {
s.StmtCommit(ctx)
Expand Down
50 changes: 50 additions & 0 deletions pkg/ddl/session/session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package session_test
import (
"context"
"testing"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl/session"
Expand Down Expand Up @@ -66,3 +67,52 @@ func TestSessionPool(t *testing.T) {
}
require.Equal(t, uint64(0), targetTS)
}

func TestPessimisticTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int primary key, b int)")
tk.MustExec("insert into t values (1, 1)")

resourcePool := pools.NewResourcePool(func() (pools.Resource, error) {
newTk := testkit.NewTestKit(t, store)
return newTk.Session(), nil
}, 4, 4, 0)
pool := session.NewSessionPool(resourcePool)
ctx := context.Background()

sessCtx, err := pool.Get()
require.NoError(t, err)
se := session.NewSession(sessCtx)
sessCtx2, err := pool.Get()
require.NoError(t, err)
se2 := session.NewSession(sessCtx2)

err = se.BeginPessimistic(ctx)
require.NoError(t, err)
err = se2.BeginPessimistic(ctx)
require.NoError(t, err)
_, err = se.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut")
require.NoError(t, err)
done := make(chan struct{}, 1)
go func() {
_, err := se2.Execute(ctx, "update test.t set b = b + 1 where a = 1", "ut")
require.NoError(t, err)
done <- struct{}{}
err = se2.Commit(ctx)
require.NoError(t, err)
close(done)
}()

time.Sleep(100 * time.Millisecond)
// because this is a pessimistic transaction, the second transaction should be blocked
require.Len(t, done, 0)
err = se.Commit(ctx)
require.NoError(t, err)
<-done
_, ok := <-done
require.False(t, ok)
pool.Put(sessCtx)
pool.Put(sessCtx2)
}
6 changes: 2 additions & 4 deletions tests/integrationtest/r/executor/analyze.result
Original file line number Diff line number Diff line change
Expand Up @@ -739,16 +739,14 @@ create index idxc on t (c);
analyze table t partition p0 index idxa;
analyze table t partition p1 index idxb;
analyze table t partition p2 index idxc;
show warnings;
SHOW WARNINGS WHERE Level IN ('Warning', 'Error');
Level Code Message
Warning 1105 No predicate column has been collected yet for table executor__analyze.t, so only indexes and the columns composing the indexes will be analyzed
Warning 1105 The version 2 would collect all statistics not only the selected indexes
Note 1105 Analyze use auto adjusted sample rate 1.000000 for table executor__analyze.t's partition p2, reason to use this rate is "use min(1, 110000/10000) as the sample-rate=1"
analyze table t partition p0;
show warnings;
SHOW WARNINGS WHERE Level IN ('Warning', 'Error');
Level Code Message
Warning 1105 No predicate column has been collected yet for table executor__analyze.t, so only indexes and the columns composing the indexes will be analyzed
Note 1105 Analyze use auto adjusted sample rate 1.000000 for table executor__analyze.t's partition p0, reason to use this rate is "use min(1, 110000/2) as the sample-rate=1"
set tidb_partition_prune_mode=default;
set @@session.tidb_enable_fast_analyze=1;
show warnings;
Expand Down
28 changes: 22 additions & 6 deletions tests/integrationtest/r/executor/issues.result
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ insert into pt (val) select (val) from pt;
split table pt between (0) and (40960) regions 30;
TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO
203 1
analyze table pt;
analyze table pt all columns;
set @@tidb_distsql_scan_concurrency = default;
explain analyze select * from t order by id; # expected distsql concurrency 2
id estRows actRows task access object execution info operator info memory disk
Expand Down Expand Up @@ -969,6 +969,7 @@ Limit_7 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_9 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt where val = 125 limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
<<<<<<< HEAD
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please resolve this conflict.

Limit_8 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
Expand All @@ -982,6 +983,21 @@ Limit_8 204.80 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 204.80 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
explain analyze select * from pt order by id limit 100; # expected distsql concurrency 7, but currently get 1, see issue #55190
=======
Limit_8 1.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 1.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 1.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_11 1.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt where val = 125 limit 100000;
id estRows actRows task access object execution info operator info memory disk
Limit_8 1.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_13 1.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 1.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 1.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt order by id limit 100;
>>>>>>> 0fdb32530d6 (ddl notifier: use pessimistic txn and fix updating memory state too early (#59157))
id estRows actRows task access object execution info operator info memory disk
Limit_10 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_17 100.00 <actRows> root partition:all max_distsql_concurrency: 1 NULL <memory> <disk>
Expand All @@ -995,11 +1011,11 @@ Limit_11 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_19 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt where val = 126 order by id limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
└─TableFullScan_17 125.00 <actRows> cop[tikv] table:pt NULL keep order:true, stats:partial[val:missing] <memory> <disk>
Limit_11 1.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 1.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 1.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 1.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
└─TableFullScan_17 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
CREATE TABLE test_55837 (col1 int(4) NOT NULL, col2 bigint(4) NOT NULL, KEY col2_index (col2));
insert into test_55837 values(0,1725292800),(0,1725292800);
select from_unixtime( if(col2 >9999999999, col2/1000, col2), '%Y-%m-%d %H:%i:%s') as result from test_55837;
Expand Down
Loading