diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 0293853b14a87..29c790bec2e0a 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/copr" sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/distsql" @@ -46,11 +47,13 @@ func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error { return errors.Trace(err) } defer se.Rollback() - var startTS uint64 - sessVars := se.GetSessionVars() - sessVars.TxnCtxMu.Lock() - startTS = sessVars.TxnCtx.StartTS - sessVars.TxnCtxMu.Unlock() + + txn, err := se.Txn() + if err != nil { + return err + } + startTS := txn.StartTS() + failpoint.InjectCall("wrapInBeginRollbackStartTS", startTS) return f(startTS) } diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index 26f3a1c86a44a..5eba310809d4d 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -72,7 +72,7 @@ go_test( embed = [":ingest"], flaky = True, race = "on", - shard_count = 22, + shard_count = 23, deps = [ "//pkg/config", "//pkg/ddl/ingest/testutil", @@ -82,7 +82,6 @@ go_test( "//pkg/meta/model", "//pkg/testkit", "//pkg/testkit/testfailpoint", - "//tests/realtikvtest", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go index 8557a62936080..206b3e2d11b20 100644 --- a/pkg/ddl/ingest/integration_test.go +++ b/pkg/ddl/ingest/integration_test.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" - "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -121,7 +120,7 @@ func TestIngestError(t *testing.T) { } func TestAddIndexIngestPanic(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) + store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") defer ingesttestutil.InjectMockBackendMgr(t, store)() @@ -147,8 +146,34 @@ func TestAddIndexIngestPanic(t *testing.T) { }) } +func TestAddIndexSetInternalSessions(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + defer ingesttestutil.InjectMockBackendMgr(t, store)() + + tk.MustExec("set global tidb_enable_dist_task = 0;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1);") + expectInternalTS := []uint64{} + actualInternalTS := []uint64{} + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) { + expectInternalTS = append(expectInternalTS, startTS) + }) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/scanRecordExec", func() { + mgr := tk.Session().GetSessionManager() + actualInternalTS = mgr.GetInternalSessionStartTSList() + }) + tk.MustExec("alter table t add index idx(a);") + require.Len(t, expectInternalTS, 1) + for _, ts := range expectInternalTS { + require.Contains(t, actualInternalTS, ts) + } +} + func TestAddIndexIngestCancel(t *testing.T) { - store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) + store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") defer ingesttestutil.InjectMockBackendMgr(t, store)() diff --git a/pkg/ddl/session/session_pool.go b/pkg/ddl/session/session_pool.go index e34ffec959282..c09d787aa5a75 100644 --- a/pkg/ddl/session/session_pool.go +++ b/pkg/ddl/session/session_pool.go @@ -15,6 +15,7 @@ package session import ( + "context" "fmt" "sync" @@ -73,6 +74,11 @@ func (sg *Pool) Get() (sessionctx.Context, error) { func (sg *Pool) Put(ctx sessionctx.Context) { // no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to // Put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing. + intest.AssertFunc(func() bool { + txn, _ := ctx.Txn(false) + return txn == nil || !txn.Valid() + }) + ctx.RollbackTxn(context.Background()) sg.resPool.Put(ctx.(pools.Resource)) infosync.DeleteInternalSession(ctx) } diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index cfbfaa4add87c..f4e4796e4c38b 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1281,8 +1281,12 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura infosync.StoreInternalSession(r) }, func(r pools.Resource) { - _, ok := r.(sessionctx.Context) + sctx, ok := r.(sessionctx.Context) intest.Assert(ok) + intest.AssertFunc(func() bool { + txn, _ := sctx.Txn(false) + return txn == nil || !txn.Valid() + }) infosync.DeleteInternalSession(r) }, ), diff --git a/pkg/executor/infoschema_reader.go b/pkg/executor/infoschema_reader.go index a0a8fbdb4dcd1..e5c1ce49bd3d1 100644 --- a/pkg/executor/infoschema_reader.go +++ b/pkg/executor/infoschema_reader.go @@ -2900,7 +2900,7 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co for _, info := range infoList { // If you have the PROCESS privilege, you can see all running transactions. // Otherwise, you can see only your own transactions. - if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username { + if !hasProcessPriv && loginUser != nil && info.ProcessInfo.Username != loginUser.Username { continue } e.txnInfo = append(e.txnInfo, info) diff --git a/pkg/infoschema/test/clustertablestest/tables_test.go b/pkg/infoschema/test/clustertablestest/tables_test.go index 4b69412db69d4..9c3b62d1e1223 100644 --- a/pkg/infoschema/test/clustertablestest/tables_test.go +++ b/pkg/infoschema/test/clustertablestest/tables_test.go @@ -1232,9 +1232,11 @@ func TestTiDBTrx(t *testing.T) { CurrentSQLDigest: digest.String(), State: txninfo.TxnIdle, EntriesCount: 1, - ConnectionID: 2, - Username: "root", - CurrentDB: "test", + ProcessInfo: &txninfo.ProcessInfo{ + ConnectionID: 2, + Username: "root", + CurrentDB: "test", + }, } blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local) @@ -1243,9 +1245,11 @@ func TestTiDBTrx(t *testing.T) { CurrentSQLDigest: "", AllSQLDigests: []string{"sql1", "sql2", digest.String()}, State: txninfo.TxnLockAcquiring, - ConnectionID: 10, - Username: "user1", - CurrentDB: "db1", + ProcessInfo: &txninfo.ProcessInfo{ + ConnectionID: 10, + Username: "user1", + CurrentDB: "db1", + }, } sm.TxnInfo[1].BlockStartTime.Valid = true sm.TxnInfo[1].BlockStartTime.Time = blockTime2 diff --git a/pkg/server/server.go b/pkg/server/server.go index 34c199b4f4779..59e16a4f2a5cc 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -839,7 +839,8 @@ func (s *Server) getUserProcessList() map[uint64]*util.ProcessInfo { return rs } -// ShowTxnList shows all txn info for displaying in `TIDB_TRX` +// ShowTxnList shows all txn info for displaying in `TIDB_TRX`. +// Internal sessions are not taken into consideration. func (s *Server) ShowTxnList() []*txninfo.TxnInfo { s.rwlock.RLock() defer s.rwlock.RUnlock() @@ -847,7 +848,7 @@ func (s *Server) ShowTxnList() []*txninfo.TxnInfo { for _, client := range s.clients { if client.ctx.Session != nil { info := client.ctx.Session.TxnInfo() - if info != nil { + if info != nil && info.ProcessInfo != nil { rs = append(rs, info) } } diff --git a/pkg/session/session.go b/pkg/session/session.go index 9ee4edb6f5d2a..fa0e562083ce9 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -462,6 +462,7 @@ func (s *session) FieldList(tableName string) ([]*resolve.ResultField, error) { } // TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only* +// Process field may not initialize if this is a session used internally. func (s *session) TxnInfo() *txninfo.TxnInfo { s.txn.mu.RLock() // Copy on read to get a snapshot, this API shouldn't be frequently called. @@ -474,17 +475,18 @@ func (s *session) TxnInfo() *txninfo.TxnInfo { processInfo := s.ShowProcess() if processInfo == nil { - return nil + return &txnInfo + } + txnInfo.ProcessInfo = &txninfo.ProcessInfo{ + ConnectionID: processInfo.ID, + Username: processInfo.User, + CurrentDB: processInfo.DB, + RelatedTableIDs: make(map[int64]struct{}), } - txnInfo.ConnectionID = processInfo.ID - txnInfo.Username = processInfo.User - txnInfo.CurrentDB = processInfo.DB - txnInfo.RelatedTableIDs = make(map[int64]struct{}) s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, _ any) bool { - txnInfo.RelatedTableIDs[key.(int64)] = struct{}{} + txnInfo.ProcessInfo.RelatedTableIDs[key.(int64)] = struct{}{} return true }) - return &txnInfo } @@ -4062,9 +4064,10 @@ func GetStartTSFromSession(se any) (startTS, processInfoID uint64) { txnInfo := tmp.TxnInfo() if txnInfo != nil { startTS = txnInfo.StartTS - processInfoID = txnInfo.ConnectionID + if txnInfo.ProcessInfo != nil { + processInfoID = txnInfo.ProcessInfo.ConnectionID + } } - logutil.BgLogger().Debug( "GetStartTSFromSession getting startTS of internal session", zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS))) diff --git a/pkg/session/txninfo/txn_info.go b/pkg/session/txninfo/txn_info.go index 50e60da829a46..5067487ebe577 100644 --- a/pkg/session/txninfo/txn_info.go +++ b/pkg/session/txninfo/txn_info.go @@ -174,8 +174,12 @@ type TxnInfo struct { // How many entries are in MemDB EntriesCount uint64 - // The following fields will be filled in `session` instead of `LazyTxn` + // The following field will be filled in `session` instead of `LazyTxn` + ProcessInfo *ProcessInfo +} +// ProcessInfo is part of fields of txnInfo, which will be filled in `session` instead of `LazyTxn` +type ProcessInfo struct { // Which session this transaction belongs to ConnectionID uint64 // The user who open this session @@ -219,13 +223,25 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{ return types.NewDatum(info.EntriesCount) }, SessionIDStr: func(info *TxnInfo) types.Datum { - return types.NewDatum(info.ConnectionID) + var connectionID uint64 + if info.ProcessInfo != nil { + connectionID = info.ProcessInfo.ConnectionID + } + return types.NewDatum(connectionID) }, UserStr: func(info *TxnInfo) types.Datum { - return types.NewDatum(info.Username) + var userName string + if info.ProcessInfo != nil { + userName = info.ProcessInfo.Username + } + return types.NewDatum(userName) }, DBStr: func(info *TxnInfo) types.Datum { - return types.NewDatum(info.CurrentDB) + var currentDB string + if info.ProcessInfo != nil { + currentDB = info.ProcessInfo.CurrentDB + } + return types.NewDatum(currentDB) }, AllSQLDigestsStr: func(info *TxnInfo) types.Datum { allSQLDigests := info.AllSQLDigests @@ -241,7 +257,10 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{ return types.NewDatum(string(res)) }, RelatedTableIDsStr: func(info *TxnInfo) types.Datum { - relatedTableIDs := info.RelatedTableIDs + var relatedTableIDs map[int64]struct{} + if info.ProcessInfo != nil { + relatedTableIDs = info.ProcessInfo.RelatedTableIDs + } str := strings.Builder{} first := true for tblID := range relatedTableIDs { diff --git a/pkg/testkit/BUILD.bazel b/pkg/testkit/BUILD.bazel index 72eaed4b42af5..93e29f8184c73 100644 --- a/pkg/testkit/BUILD.bazel +++ b/pkg/testkit/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "//pkg/session", "//pkg/session/txninfo", "//pkg/session/types", - "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/store/driver", "//pkg/store/helper", diff --git a/pkg/testkit/mocksessionmanager.go b/pkg/testkit/mocksessionmanager.go index d1f2a41d64ba6..3f7504a5452a7 100644 --- a/pkg/testkit/mocksessionmanager.go +++ b/pkg/testkit/mocksessionmanager.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/session/txninfo" sessiontypes "github.com/pingcap/tidb/pkg/session/types" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util" ) @@ -53,7 +52,7 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo { rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn)) for _, se := range msm.Conn { info := se.TxnInfo() - if info != nil { + if info != nil && info.ProcessInfo != nil { rs = append(rs, info) } } @@ -161,13 +160,6 @@ func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 { } continue } - - se := internalSess.(sessionctx.Context) - sessVars := se.GetSessionVars() - sessVars.TxnCtxMu.Lock() - startTS := sessVars.TxnCtx.StartTS - sessVars.TxnCtxMu.Unlock() - ret = append(ret, startTS) } return ret } diff --git a/tests/realtikvtest/txntest/txn_state_test.go b/tests/realtikvtest/txntest/txn_state_test.go index 3df1e9f290305..88dbd8680fa7c 100644 --- a/tests/realtikvtest/txntest/txn_state_test.go +++ b/tests/realtikvtest/txntest/txn_state_test.go @@ -74,9 +74,9 @@ func TestBasicTxnState(t *testing.T) { require.Equal(t, []string{beginDigest.String(), selectTSDigest.String(), expectedDigest.String()}, info.AllSQLDigests) // len and size will be covered in TestLenAndSize - require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ConnectionID) - require.Equal(t, "", info.Username) - require.Equal(t, "test", info.CurrentDB) + require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ProcessInfo.ConnectionID) + require.Equal(t, "", info.ProcessInfo.Username) + require.Equal(t, "test", info.ProcessInfo.CurrentDB) require.Equal(t, startTS, info.StartTS) require.NoError(t, failpoint.Enable("tikvclient/beforePrewrite", "pause"))