Skip to content

Commit 4af46a5

Browse files
authored
txn: fix issue innodb_lock_wait_timeout doesn't work in some case (pingcap#56847) (pingcap#56919)
close pingcap#56688
1 parent 41f1b9e commit 4af46a5

File tree

5 files changed

+62
-0
lines changed

5 files changed

+62
-0
lines changed

executor/executor_txn_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/pingcap/failpoint"
2526
"github.com/pingcap/tidb/executor"
2627
"github.com/pingcap/tidb/sessionctx/binloginfo"
2728
"github.com/pingcap/tidb/testkit"
@@ -798,3 +799,40 @@ func (m mockPumpClient) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogR
798799
func (m mockPumpClient) PullBinlogs(ctx context.Context, in *binlog.PullBinlogReq, opts ...grpc.CallOption) (binlog.Pump_PullBinlogsClient, error) {
799800
return nil, nil
800801
}
802+
803+
func TestInnodbLockWaitTimeout(t *testing.T) {
804+
store := testkit.CreateMockStore(t)
805+
tk := testkit.NewTestKit(t, store)
806+
tk.MustExec("use test")
807+
tk.MustExec("drop table if exists t")
808+
tk.MustExec("create table t (id int auto_increment, k int,c varchar(255), unique index idx(id))")
809+
tk.MustExec("insert into t (k,c) values (1,'abcdefg');")
810+
for i := 0; i < 8; i++ {
811+
tk.MustExec("insert into t (k,c) select k,c from t;")
812+
}
813+
tk.MustExec("update t set k= id, c = id")
814+
tk.MustExec("split table t by (0), (50), (100);")
815+
tk.MustExec("split table t index idx by (0), (50), (100);")
816+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict", `return(true)`))
817+
defer func() {
818+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict"))
819+
}()
820+
tk.MustExec("set @@innodb_lock_wait_timeout=1")
821+
isolations := []string{"REPEATABLE READ", "READ COMMITTED"}
822+
for _, isolation := range isolations {
823+
tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL " + isolation)
824+
tk.MustExec("begin")
825+
start := time.Now()
826+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
827+
res, err := tk.ExecWithContext(ctx, "update t use index (idx) set k=k+1 where id >0;")
828+
cancel()
829+
if res != nil {
830+
require.NoError(t, res.Close())
831+
}
832+
require.Error(t, err)
833+
msg := fmt.Sprintf("cost: %v", time.Since(start))
834+
require.Equal(t, "lock wait timeout", err.Error(), msg)
835+
require.Less(t, time.Since(start), time.Second*2)
836+
tk.MustExec("commit")
837+
}
838+
}

sessiontxn/isolation/readcommitted.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(ctx co
230230
return sessiontxn.ErrorAction(err)
231231
}
232232
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
233+
sessVars := p.sctx.GetSessionVars()
234+
waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime())
235+
if waitTime.Milliseconds() >= sessVars.LockWaitTimeout {
236+
return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout)
237+
}
233238
logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement",
234239
zap.Uint64("txn", txnCtx.StartTS),
235240
zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()),

sessiontxn/isolation/repeatable_read.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ func (p *PessimisticRRTxnContextProvider) handleAfterPessimisticLockError(ctx co
257257
return sessiontxn.ErrorAction(err)
258258
}
259259
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
260+
waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime())
261+
if waitTime.Milliseconds() >= sessVars.LockWaitTimeout {
262+
return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout)
263+
}
264+
260265
// Always update forUpdateTS by getting a new timestamp from PD.
261266
// If we use the conflict commitTS as the new forUpdateTS and async commit
262267
// is used, the commitTS of this transaction may exceed the max timestamp

store/mockstore/unistore/tikv/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ go_library(
4141
"@com_github_pingcap_badger//:badger",
4242
"@com_github_pingcap_badger//y",
4343
"@com_github_pingcap_errors//:errors",
44+
"@com_github_pingcap_failpoint//:failpoint",
4445
"@com_github_pingcap_kvproto//pkg/coprocessor",
4546
"@com_github_pingcap_kvproto//pkg/deadlock",
4647
"@com_github_pingcap_kvproto//pkg/errorpb",

store/mockstore/unistore/tikv/server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"github.com/pingcap/errors"
24+
"github.com/pingcap/failpoint"
2425
"github.com/pingcap/kvproto/pkg/coprocessor"
2526
deadlockPb "github.com/pingcap/kvproto/pkg/deadlock"
2627
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -193,6 +194,18 @@ func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpc
193194

194195
// KvPessimisticLock implements the tikvpb.TikvServer interface.
195196
func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) {
197+
failpoint.Inject("pessimisticLockReturnWriteConflict", func(val failpoint.Value) {
198+
if val.(bool) {
199+
time.Sleep(time.Millisecond * 100)
200+
err := &kverrors.ErrConflict{
201+
StartTS: req.GetForUpdateTs(),
202+
ConflictTS: req.GetForUpdateTs() + 1,
203+
ConflictCommitTS: req.GetForUpdateTs() + 2,
204+
}
205+
failpoint.Return(&kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil)
206+
}
207+
})
208+
196209
reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticLock")
197210
if err != nil {
198211
return &kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil

0 commit comments

Comments
 (0)