Skip to content
2 changes: 2 additions & 0 deletions pkg/kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ const (
SessionID
// BackgroundGoroutineLifecycleHooks is the hooks to track the start and end of background goroutine
BackgroundGoroutineLifecycleHooks
// PrewriteEncounterLockPolicy is the policy to handle lock conflict during prewrite
PrewriteEncounterLockPolicy
)

// TxnSizeLimits is the argument type for `SizeLimits` option
Expand Down
18 changes: 9 additions & 9 deletions pkg/session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,23 +665,23 @@ type txnFuture struct {
}

func (tf *txnFuture) wait() (kv.Transaction, error) {
options := []tikv.TxnOption{tikv.WithTxnScope(tf.txnScope)}
startTS, err := tf.future.Wait()
failpoint.Inject("txnFutureWait", func() {})
if err == nil {
if tf.pipelined {
return tf.store.Begin(tikv.WithTxnScope(tf.txnScope), tikv.WithStartTS(startTS), tikv.WithDefaultPipelinedTxn())
options = append(options, tikv.WithStartTS(startTS))
} else {
if config.GetGlobalConfig().Store == config.StoreTypeUniStore {
return nil, err
}
return tf.store.Begin(tikv.WithTxnScope(tf.txnScope), tikv.WithStartTS(startTS))
} else if config.GetGlobalConfig().Store == config.StoreTypeUniStore {
return nil, err
logutil.BgLogger().Warn("wait tso failed", zap.Error(err))
}

logutil.BgLogger().Warn("wait tso failed", zap.Error(err))
// It would retry get timestamp.
if tf.pipelined {
return tf.store.Begin(tikv.WithTxnScope(tf.txnScope), tikv.WithDefaultPipelinedTxn())
options = append(options, tikv.WithDefaultPipelinedTxn())
}
return tf.store.Begin(tikv.WithTxnScope(tf.txnScope))

return tf.store.Begin(options...)
}

// HasDirtyContent checks whether there's dirty update on the given table.
Expand Down
15 changes: 15 additions & 0 deletions pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,21 @@ func (p *baseTxnContextProvider) SetOptionsBeforeCommit(
if commitTSChecker != nil {
txn.SetOption(kv.CommitTSUpperBoundCheck, commitTSChecker)
}

// Optimization:
// If an auto-commit optimistic transaction can retry in pessimistic mode,
// do not resolve locks when prewrite.
// 1. safety: The locks can be resolved later when it retries in pessimistic mode.
// 2. benefit: In high-contention scenarios, pessimistic transactions perform better.
prewriteEncounterLockPolicy := transaction.TryResolvePolicy
if sessVars.TxnCtx.CouldRetry &&
sessVars.IsAutocommit() &&
!sessVars.InTxn() &&
!sessVars.TxnCtx.IsPessimistic {
prewriteEncounterLockPolicy = transaction.NoResolvePolicy
}
txn.SetOption(kv.PrewriteEncounterLockPolicy, prewriteEncounterLockPolicy)

return nil
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/sessiontxn/isolation/optimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,32 @@ func (p *OptimisticTxnContextProvider) ResetForNewTxn(sctx sessionctx.Context, c
p.getStmtForUpdateTSFunc = p.getTxnStartTS
}

func (p *OptimisticTxnContextProvider) onTxnActive(_ kv.Transaction, tp sessiontxn.EnterNewTxnType) {
func (p *OptimisticTxnContextProvider) onTxnActive(
txn kv.Transaction,
tp sessiontxn.EnterNewTxnType,
) {
sessVars := p.sctx.GetSessionVars()
sessVars.TxnCtx.CouldRetry = isOptimisticTxnRetryable(sessVars, tp)
sessVars.TxnCtx.CouldRetry = isOptimisticTxnRetryable(sessVars, tp, txn.IsPipelined())
}

// isOptimisticTxnRetryable (if returns true) means the transaction could retry.
// We only consider retry in this optimistic mode.
// If the session is already in transaction, enable retry or internal SQL could retry.
// If not, the transaction could always retry, because it should be auto committed transaction.
// Anyway the retry limit is 0, the transaction could not retry.
func isOptimisticTxnRetryable(sessVars *variable.SessionVars, tp sessiontxn.EnterNewTxnType) bool {
func isOptimisticTxnRetryable(
sessVars *variable.SessionVars,
tp sessiontxn.EnterNewTxnType,
isPipelined bool,
) bool {
if tp == sessiontxn.EnterNewTxnDefault {
return false
}

if isPipelined {
return false
}

// If retry limit is 0, the transaction could not retry.
if sessVars.RetryLimit == 0 {
return false
Expand Down
2 changes: 2 additions & 0 deletions pkg/store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ func (txn *tikvTxn) SetOption(opt int, val any) {
txn.KVTxn.SetSessionID(val.(uint64))
case kv.BackgroundGoroutineLifecycleHooks:
txn.KVTxn.SetBackgroundGoroutineLifecycleHooks(val.(transaction.LifecycleHooks))
case kv.PrewriteEncounterLockPolicy:
txn.KVTxn.SetPrewriteEncounterLockPolicy(val.(transaction.PrewriteEncounterLockPolicy))
}
}

Expand Down