Skip to content

Commit 0e8ccbc

Browse files
authored
session: make TxnInfo() return even if process info is empty (#57044) (#57161)
close #57043
1 parent 62a1bb5 commit 0e8ccbc

File tree

13 files changed

+102
-47
lines changed

13 files changed

+102
-47
lines changed

pkg/ddl/index_cop.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/pingcap/errors"
22+
"github.com/pingcap/failpoint"
2223
"github.com/pingcap/tidb/pkg/ddl/copr"
2324
sess "github.com/pingcap/tidb/pkg/ddl/session"
2425
"github.com/pingcap/tidb/pkg/distsql"
@@ -46,11 +47,13 @@ func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
4647
return errors.Trace(err)
4748
}
4849
defer se.Rollback()
49-
var startTS uint64
50-
sessVars := se.GetSessionVars()
51-
sessVars.TxnCtxMu.Lock()
52-
startTS = sessVars.TxnCtx.StartTS
53-
sessVars.TxnCtxMu.Unlock()
50+
51+
txn, err := se.Txn()
52+
if err != nil {
53+
return err
54+
}
55+
startTS := txn.StartTS()
56+
failpoint.InjectCall("wrapInBeginRollbackStartTS", startTS)
5457
return f(startTS)
5558
}
5659

pkg/ddl/ingest/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ go_test(
7272
embed = [":ingest"],
7373
flaky = True,
7474
race = "on",
75-
shard_count = 22,
75+
shard_count = 23,
7676
deps = [
7777
"//pkg/config",
7878
"//pkg/ddl/ingest/testutil",
@@ -82,7 +82,6 @@ go_test(
8282
"//pkg/meta/model",
8383
"//pkg/testkit",
8484
"//pkg/testkit/testfailpoint",
85-
"//tests/realtikvtest",
8685
"@com_github_ngaut_pools//:pools",
8786
"@com_github_pingcap_failpoint//:failpoint",
8887
"@com_github_stretchr_testify//assert",

pkg/ddl/ingest/integration_test.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/pingcap/tidb/pkg/meta/model"
2929
"github.com/pingcap/tidb/pkg/testkit"
3030
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
31-
"github.com/pingcap/tidb/tests/realtikvtest"
3231
"github.com/stretchr/testify/assert"
3332
"github.com/stretchr/testify/require"
3433
)
@@ -121,7 +120,7 @@ func TestIngestError(t *testing.T) {
121120
}
122121

123122
func TestAddIndexIngestPanic(t *testing.T) {
124-
store := realtikvtest.CreateMockStoreAndSetup(t)
123+
store := testkit.CreateMockStore(t)
125124
tk := testkit.NewTestKit(t, store)
126125
tk.MustExec("use test;")
127126
defer ingesttestutil.InjectMockBackendMgr(t, store)()
@@ -147,8 +146,34 @@ func TestAddIndexIngestPanic(t *testing.T) {
147146
})
148147
}
149148

149+
func TestAddIndexSetInternalSessions(t *testing.T) {
150+
store := testkit.CreateMockStore(t)
151+
tk := testkit.NewTestKit(t, store)
152+
tk.MustExec("use test;")
153+
defer ingesttestutil.InjectMockBackendMgr(t, store)()
154+
155+
tk.MustExec("set global tidb_enable_dist_task = 0;")
156+
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
157+
tk.MustExec("create table t (a int);")
158+
tk.MustExec("insert into t values (1);")
159+
expectInternalTS := []uint64{}
160+
actualInternalTS := []uint64{}
161+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) {
162+
expectInternalTS = append(expectInternalTS, startTS)
163+
})
164+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/scanRecordExec", func() {
165+
mgr := tk.Session().GetSessionManager()
166+
actualInternalTS = mgr.GetInternalSessionStartTSList()
167+
})
168+
tk.MustExec("alter table t add index idx(a);")
169+
require.Len(t, expectInternalTS, 1)
170+
for _, ts := range expectInternalTS {
171+
require.Contains(t, actualInternalTS, ts)
172+
}
173+
}
174+
150175
func TestAddIndexIngestCancel(t *testing.T) {
151-
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
176+
store, dom := testkit.CreateMockStoreAndDomain(t)
152177
tk := testkit.NewTestKit(t, store)
153178
tk.MustExec("use test;")
154179
defer ingesttestutil.InjectMockBackendMgr(t, store)()

pkg/ddl/session/session_pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package session
1616

1717
import (
18+
"context"
1819
"fmt"
1920
"sync"
2021

@@ -73,6 +74,11 @@ func (sg *Pool) Get() (sessionctx.Context, error) {
7374
func (sg *Pool) Put(ctx sessionctx.Context) {
7475
// no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to
7576
// Put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
77+
intest.AssertFunc(func() bool {
78+
txn, _ := ctx.Txn(false)
79+
return txn == nil || !txn.Valid()
80+
})
81+
ctx.RollbackTxn(context.Background())
7682
sg.resPool.Put(ctx.(pools.Resource))
7783
infosync.DeleteInternalSession(ctx)
7884
}

pkg/domain/domain.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1281,8 +1281,12 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura
12811281
infosync.StoreInternalSession(r)
12821282
},
12831283
func(r pools.Resource) {
1284-
_, ok := r.(sessionctx.Context)
1284+
sctx, ok := r.(sessionctx.Context)
12851285
intest.Assert(ok)
1286+
intest.AssertFunc(func() bool {
1287+
txn, _ := sctx.Txn(false)
1288+
return txn == nil || !txn.Valid()
1289+
})
12861290
infosync.DeleteInternalSession(r)
12871291
},
12881292
),

pkg/executor/infoschema_reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2900,7 +2900,7 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
29002900
for _, info := range infoList {
29012901
// If you have the PROCESS privilege, you can see all running transactions.
29022902
// Otherwise, you can see only your own transactions.
2903-
if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
2903+
if !hasProcessPriv && loginUser != nil && info.ProcessInfo.Username != loginUser.Username {
29042904
continue
29052905
}
29062906
e.txnInfo = append(e.txnInfo, info)

pkg/infoschema/test/clustertablestest/tables_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,9 +1232,11 @@ func TestTiDBTrx(t *testing.T) {
12321232
CurrentSQLDigest: digest.String(),
12331233
State: txninfo.TxnIdle,
12341234
EntriesCount: 1,
1235-
ConnectionID: 2,
1236-
Username: "root",
1237-
CurrentDB: "test",
1235+
ProcessInfo: &txninfo.ProcessInfo{
1236+
ConnectionID: 2,
1237+
Username: "root",
1238+
CurrentDB: "test",
1239+
},
12381240
}
12391241

12401242
blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local)
@@ -1243,9 +1245,11 @@ func TestTiDBTrx(t *testing.T) {
12431245
CurrentSQLDigest: "",
12441246
AllSQLDigests: []string{"sql1", "sql2", digest.String()},
12451247
State: txninfo.TxnLockAcquiring,
1246-
ConnectionID: 10,
1247-
Username: "user1",
1248-
CurrentDB: "db1",
1248+
ProcessInfo: &txninfo.ProcessInfo{
1249+
ConnectionID: 10,
1250+
Username: "user1",
1251+
CurrentDB: "db1",
1252+
},
12491253
}
12501254
sm.TxnInfo[1].BlockStartTime.Valid = true
12511255
sm.TxnInfo[1].BlockStartTime.Time = blockTime2

pkg/server/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -839,15 +839,16 @@ func (s *Server) getUserProcessList() map[uint64]*util.ProcessInfo {
839839
return rs
840840
}
841841

842-
// ShowTxnList shows all txn info for displaying in `TIDB_TRX`
842+
// ShowTxnList shows all txn info for displaying in `TIDB_TRX`.
843+
// Internal sessions are not taken into consideration.
843844
func (s *Server) ShowTxnList() []*txninfo.TxnInfo {
844845
s.rwlock.RLock()
845846
defer s.rwlock.RUnlock()
846847
rs := make([]*txninfo.TxnInfo, 0, len(s.clients))
847848
for _, client := range s.clients {
848849
if client.ctx.Session != nil {
849850
info := client.ctx.Session.TxnInfo()
850-
if info != nil {
851+
if info != nil && info.ProcessInfo != nil {
851852
rs = append(rs, info)
852853
}
853854
}

pkg/session/session.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ func (s *session) FieldList(tableName string) ([]*resolve.ResultField, error) {
462462
}
463463

464464
// TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only*
465+
// Process field may not initialize if this is a session used internally.
465466
func (s *session) TxnInfo() *txninfo.TxnInfo {
466467
s.txn.mu.RLock()
467468
// Copy on read to get a snapshot, this API shouldn't be frequently called.
@@ -474,17 +475,18 @@ func (s *session) TxnInfo() *txninfo.TxnInfo {
474475

475476
processInfo := s.ShowProcess()
476477
if processInfo == nil {
477-
return nil
478+
return &txnInfo
479+
}
480+
txnInfo.ProcessInfo = &txninfo.ProcessInfo{
481+
ConnectionID: processInfo.ID,
482+
Username: processInfo.User,
483+
CurrentDB: processInfo.DB,
484+
RelatedTableIDs: make(map[int64]struct{}),
478485
}
479-
txnInfo.ConnectionID = processInfo.ID
480-
txnInfo.Username = processInfo.User
481-
txnInfo.CurrentDB = processInfo.DB
482-
txnInfo.RelatedTableIDs = make(map[int64]struct{})
483486
s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, _ any) bool {
484-
txnInfo.RelatedTableIDs[key.(int64)] = struct{}{}
487+
txnInfo.ProcessInfo.RelatedTableIDs[key.(int64)] = struct{}{}
485488
return true
486489
})
487-
488490
return &txnInfo
489491
}
490492

@@ -4062,9 +4064,10 @@ func GetStartTSFromSession(se any) (startTS, processInfoID uint64) {
40624064
txnInfo := tmp.TxnInfo()
40634065
if txnInfo != nil {
40644066
startTS = txnInfo.StartTS
4065-
processInfoID = txnInfo.ConnectionID
4067+
if txnInfo.ProcessInfo != nil {
4068+
processInfoID = txnInfo.ProcessInfo.ConnectionID
4069+
}
40664070
}
4067-
40684071
logutil.BgLogger().Debug(
40694072
"GetStartTSFromSession getting startTS of internal session",
40704073
zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS)))

pkg/session/txninfo/txn_info.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,12 @@ type TxnInfo struct {
174174
// How many entries are in MemDB
175175
EntriesCount uint64
176176

177-
// The following fields will be filled in `session` instead of `LazyTxn`
177+
// The following field will be filled in `session` instead of `LazyTxn`
178+
ProcessInfo *ProcessInfo
179+
}
178180

181+
// ProcessInfo is part of fields of txnInfo, which will be filled in `session` instead of `LazyTxn`
182+
type ProcessInfo struct {
179183
// Which session this transaction belongs to
180184
ConnectionID uint64
181185
// The user who open this session
@@ -219,13 +223,25 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
219223
return types.NewDatum(info.EntriesCount)
220224
},
221225
SessionIDStr: func(info *TxnInfo) types.Datum {
222-
return types.NewDatum(info.ConnectionID)
226+
var connectionID uint64
227+
if info.ProcessInfo != nil {
228+
connectionID = info.ProcessInfo.ConnectionID
229+
}
230+
return types.NewDatum(connectionID)
223231
},
224232
UserStr: func(info *TxnInfo) types.Datum {
225-
return types.NewDatum(info.Username)
233+
var userName string
234+
if info.ProcessInfo != nil {
235+
userName = info.ProcessInfo.Username
236+
}
237+
return types.NewDatum(userName)
226238
},
227239
DBStr: func(info *TxnInfo) types.Datum {
228-
return types.NewDatum(info.CurrentDB)
240+
var currentDB string
241+
if info.ProcessInfo != nil {
242+
currentDB = info.ProcessInfo.CurrentDB
243+
}
244+
return types.NewDatum(currentDB)
229245
},
230246
AllSQLDigestsStr: func(info *TxnInfo) types.Datum {
231247
allSQLDigests := info.AllSQLDigests
@@ -241,7 +257,10 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
241257
return types.NewDatum(string(res))
242258
},
243259
RelatedTableIDsStr: func(info *TxnInfo) types.Datum {
244-
relatedTableIDs := info.RelatedTableIDs
260+
var relatedTableIDs map[int64]struct{}
261+
if info.ProcessInfo != nil {
262+
relatedTableIDs = info.ProcessInfo.RelatedTableIDs
263+
}
245264
str := strings.Builder{}
246265
first := true
247266
for tblID := range relatedTableIDs {

0 commit comments

Comments
 (0)