Skip to content

Commit 47179ae

Browse files
authored
table: introduce RowIDShardGenerator and ReservedRowIDAlloc to alloc auto row id (#54789)
ref #54397
1 parent e6e8f7f commit 47179ae

File tree

18 files changed

+223
-41
lines changed

18 files changed

+223
-41
lines changed

pkg/ddl/partition.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3526,15 +3526,16 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
35263526
// Non-clustered table / not unique _tidb_rowid for the whole table
35273527
// Generate new _tidb_rowid if exists.
35283528
// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
3529-
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
3530-
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
3529+
if reserved, ok := w.tblCtx.GetReservedRowIDAlloc(); ok && reserved.Exhausted() {
35313530
// TODO: Which autoid allocator to use?
35323531
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
35333532
// Keep using the original table's allocator
3534-
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.tblCtx, w.reorgedTbl, ids)
3533+
var baseRowID, maxRowID int64
3534+
baseRowID, maxRowID, err = tables.AllocHandleIDs(w.ctx, w.tblCtx, w.reorgedTbl, ids)
35353535
if err != nil {
35363536
return false, errors.Trace(err)
35373537
}
3538+
reserved.Reset(baseRowID, maxRowID)
35383539
}
35393540
recordID, err := tables.AllocHandle(w.ctx, w.tblCtx, w.reorgedTbl)
35403541
if err != nil {

pkg/executor/insert_common.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1067,7 +1067,7 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F
10671067
if err != nil {
10681068
return 0, err
10691069
}
1070-
currentShard := e.Ctx().GetSessionVars().GetCurrentShard(1)
1070+
currentShard := e.Ctx().GetSessionVars().GetRowIDShardGenerator().GetCurrentShard(1)
10711071
return shardFmt.Compose(currentShard, autoRandomID), nil
10721072
}
10731073

pkg/sessionctx/stmtctx/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ go_test(
4444
],
4545
embed = [":stmtctx"],
4646
flaky = True,
47-
shard_count = 12,
47+
shard_count = 13,
4848
deps = [
4949
"//pkg/errctx",
5050
"//pkg/kv",

pkg/sessionctx/stmtctx/stmtctx.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,33 @@ func (rf *ReferenceCount) UnFreeze() {
105105
atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference)
106106
}
107107

108+
// ReservedRowIDAlloc is used to reserve autoID for the auto_increment column.
109+
type ReservedRowIDAlloc struct {
110+
base int64
111+
max int64
112+
}
113+
114+
// Reset resets the base and max of reserved rowIDs.
115+
func (r *ReservedRowIDAlloc) Reset(base int64, max int64) {
116+
r.base = base
117+
r.max = max
118+
}
119+
120+
// Consume consumes a reserved rowID.
121+
// If the second return value is false, it means the reserved rowID is exhausted.
122+
func (r *ReservedRowIDAlloc) Consume() (int64, bool) {
123+
if r.base < r.max {
124+
r.base++
125+
return r.base, true
126+
}
127+
return 0, false
128+
}
129+
130+
// Exhausted returns whether the reserved rowID is exhausted.
131+
func (r *ReservedRowIDAlloc) Exhausted() bool {
132+
return r.base >= r.max
133+
}
134+
108135
// StatementContext contains variables for a statement.
109136
// It should be reset before executing a statement.
110137
type StatementContext struct {
@@ -223,8 +250,8 @@ type StatementContext struct {
223250
// InsertID is the given insert ID of an auto_increment column.
224251
InsertID uint64
225252

226-
BaseRowID int64
227-
MaxRowID int64
253+
// ReservedRowIDAlloc is used to alloc auto ID from the reserved IDs.
254+
ReservedRowIDAlloc ReservedRowIDAlloc
228255

229256
// Copied from SessionVars.TimeZone.
230257
Priority mysql.PriorityEnum
@@ -972,8 +999,7 @@ func (sc *StatementContext) resetMuForRetry() {
972999
// ResetForRetry resets the changed states during execution.
9731000
func (sc *StatementContext) ResetForRetry() {
9741001
sc.resetMuForRetry()
975-
sc.MaxRowID = 0
976-
sc.BaseRowID = 0
1002+
sc.ReservedRowIDAlloc.Reset(0, 0)
9771003
sc.TableIDs = sc.TableIDs[:0]
9781004
sc.IndexNames = sc.IndexNames[:0]
9791005
sc.TaskID = AllocateTaskID()

pkg/sessionctx/stmtctx/stmtctx_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,32 @@ func TestErrCtx(t *testing.T) {
464464
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
465465
}
466466

467+
func TestReservedRowIDAlloc(t *testing.T) {
468+
var reserved stmtctx.ReservedRowIDAlloc
469+
// no reserved by default
470+
require.True(t, reserved.Exhausted())
471+
id, ok := reserved.Consume()
472+
require.False(t, ok)
473+
require.Equal(t, int64(0), id)
474+
// reset some ids
475+
reserved.Reset(12, 15)
476+
require.False(t, reserved.Exhausted())
477+
id, ok = reserved.Consume()
478+
require.True(t, ok)
479+
require.Equal(t, int64(13), id)
480+
id, ok = reserved.Consume()
481+
require.True(t, ok)
482+
require.Equal(t, int64(14), id)
483+
id, ok = reserved.Consume()
484+
require.True(t, ok)
485+
require.Equal(t, int64(15), id)
486+
// exhausted
487+
require.True(t, reserved.Exhausted())
488+
id, ok = reserved.Consume()
489+
require.False(t, ok)
490+
require.Equal(t, int64(0), id)
491+
}
492+
467493
func BenchmarkErrCtx(b *testing.B) {
468494
sc := stmtctx.NewStmtCtx()
469495

pkg/sessionctx/variable/session.go

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
5454
"github.com/pingcap/tidb/pkg/util/disk"
5555
"github.com/pingcap/tidb/pkg/util/execdetails"
56+
"github.com/pingcap/tidb/pkg/util/intest"
5657
"github.com/pingcap/tidb/pkg/util/kvcache"
5758
"github.com/pingcap/tidb/pkg/util/mathutil"
5859
"github.com/pingcap/tidb/pkg/util/memory"
@@ -203,11 +204,6 @@ type TxnCtxNoNeedToRestore struct {
203204
StartTS uint64
204205
StaleReadTs uint64
205206

206-
// ShardStep indicates the max size of continuous rowid shard in one transaction.
207-
ShardStep int
208-
shardRemain int
209-
currentShard int64
210-
211207
// unchangedKeys is used to store the unchanged keys that needs to lock for pessimistic transaction.
212208
unchangedKeys map[string]struct{}
213209

@@ -269,24 +265,62 @@ type SavepointRecord struct {
269265
TxnCtxSavepoint TxnCtxNeedToRestore
270266
}
271267

272-
// GetCurrentShard returns the shard for the next `count` IDs.
273-
func (s *SessionVars) GetCurrentShard(count int) int64 {
274-
tc := s.TxnCtx
275-
if s.shardRand == nil {
276-
s.shardRand = rand.New(rand.NewSource(int64(tc.StartTS))) // #nosec G404
268+
// RowIDShardGenerator is used to generate shard for row id.
269+
type RowIDShardGenerator struct {
270+
// shardRand is used for generated rand shard
271+
shardRand *rand.Rand
272+
// shardStep indicates the max size of continuous rowid shard in one transaction.
273+
shardStep int
274+
shardRemain int
275+
currentShard int64
276+
}
277+
278+
// NewRowIDShardGenerator creates a new RowIDShardGenerator.
279+
func NewRowIDShardGenerator(shardRand *rand.Rand, step int) *RowIDShardGenerator {
280+
intest.AssertNotNil(shardRand)
281+
return &RowIDShardGenerator{
282+
shardRand: shardRand,
283+
shardStep: step,
277284
}
278-
if tc.shardRemain <= 0 {
279-
tc.updateShard(s.shardRand)
280-
tc.shardRemain = tc.ShardStep
285+
}
286+
287+
// SetShardStep sets the step of shard
288+
func (s *RowIDShardGenerator) SetShardStep(step int) {
289+
s.shardStep = step
290+
s.shardRemain = 0
291+
}
292+
293+
// GetShardStep returns the shard step
294+
func (s *RowIDShardGenerator) GetShardStep() int {
295+
return s.shardStep
296+
}
297+
298+
// GetCurrentShard returns the shard for the next `count` IDs.
299+
func (s *RowIDShardGenerator) GetCurrentShard(count int) int64 {
300+
if s.shardRemain <= 0 {
301+
s.updateShard(s.shardRand)
302+
s.shardRemain = s.GetShardStep()
281303
}
282-
tc.shardRemain -= count
283-
return tc.currentShard
304+
s.shardRemain -= count
305+
return s.currentShard
284306
}
285307

286-
func (tc *TransactionContext) updateShard(shardRand *rand.Rand) {
308+
func (s *RowIDShardGenerator) updateShard(shardRand *rand.Rand) {
287309
var buf [8]byte
288310
binary.LittleEndian.PutUint64(buf[:], shardRand.Uint64())
289-
tc.currentShard = int64(murmur3.Sum32(buf[:]))
311+
s.currentShard = int64(murmur3.Sum32(buf[:]))
312+
}
313+
314+
// GetRowIDShardGenerator shard row id generator
315+
func (s *SessionVars) GetRowIDShardGenerator() *RowIDShardGenerator {
316+
if s.shardGenerator != nil {
317+
return s.shardGenerator
318+
}
319+
320+
intest.Assert(s.TxnCtx.StartTS > 0)
321+
r := rand.New(rand.NewSource(int64(s.TxnCtx.StartTS))) // #nosec G404
322+
s.shardGenerator = NewRowIDShardGenerator(r, int(s.ShardAllocateStep))
323+
return s.shardGenerator
290324
}
291325

292326
// AddUnchangedKeyForLock adds an unchanged key for pessimistic lock.
@@ -1514,8 +1548,8 @@ type SessionVars struct {
15141548
// StoreBatchSize indicates the batch size limit of store batch, set this field to 0 to disable store batch.
15151549
StoreBatchSize int
15161550

1517-
// shardRand is used by TxnCtx, for the GetCurrentShard() method.
1518-
shardRand *rand.Rand
1551+
// shardGenerator indicates to generate shard for row id.
1552+
shardGenerator *RowIDShardGenerator
15191553

15201554
// Resource group name
15211555
// NOTE: all statement relate operation should use StmtCtx.ResourceGroupName instead.

pkg/sessionctx/variable/session_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package variable_test
1616

1717
import (
1818
"context"
19+
"math/rand"
1920
"strconv"
2021
"sync"
2122
"testing"
@@ -602,3 +603,27 @@ func TestMapDeltaCols(t *testing.T) {
602603
}
603604
}
604605
}
606+
607+
func TestRowIDShardGenerator(t *testing.T) {
608+
g := variable.NewRowIDShardGenerator(rand.New(rand.NewSource(12345)), 128) // #nosec G404)
609+
// default settings
610+
require.Equal(t, 128, g.GetShardStep())
611+
shard := g.GetCurrentShard(127)
612+
require.Equal(t, int64(3535546008), shard)
613+
require.Equal(t, shard, g.GetCurrentShard(1))
614+
// reset alloc step
615+
g.SetShardStep(5)
616+
require.Equal(t, 5, g.GetShardStep())
617+
// generate shard in step
618+
shard = g.GetCurrentShard(1)
619+
require.Equal(t, int64(1371624976), shard)
620+
require.Equal(t, shard, g.GetCurrentShard(1))
621+
require.Equal(t, shard, g.GetCurrentShard(1))
622+
require.Equal(t, shard, g.GetCurrentShard(2))
623+
// generate shard in next step
624+
shard = g.GetCurrentShard(1)
625+
require.Equal(t, int64(895725277), shard)
626+
// set step will reset clear remain
627+
g.SetShardStep(5)
628+
require.NotEqual(t, shard, g.GetCurrentShard(1))
629+
}

pkg/sessiontxn/isolation/base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
120120
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
121121
CreateTime: time.Now(),
122122
InfoSchema: p.infoSchema,
123-
ShardStep: int(sessVars.ShardAllocateStep),
124123
TxnScope: sessVars.CheckAndGetTxnScope(),
125124
},
126125
}
@@ -295,6 +294,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
295294
sessVars := p.sctx.GetSessionVars()
296295
sessVars.TxnCtxMu.Lock()
297296
sessVars.TxnCtx.StartTS = txn.StartTS()
297+
sessVars.GetRowIDShardGenerator().SetShardStep(int(sessVars.ShardAllocateStep))
298298
sessVars.TxnCtxMu.Unlock()
299299
if sessVars.MemDBFootprint != nil {
300300
sessVars.MemDBFootprint.Detach()

pkg/sessiontxn/isolation/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (a *txnAssert[T]) Check(t testing.TB) {
8181
require.Equal(t, a.isolation, txnCtx.Isolation)
8282
require.Equal(t, a.isolation != "", txnCtx.IsPessimistic)
8383
require.Equal(t, sessVars.CheckAndGetTxnScope(), txnCtx.TxnScope)
84-
require.Equal(t, sessVars.ShardAllocateStep, int64(txnCtx.ShardStep))
84+
require.Equal(t, sessVars.ShardAllocateStep, int64(sessVars.GetRowIDShardGenerator().GetShardStep()))
8585
require.False(t, txnCtx.IsStaleness)
8686
require.GreaterOrEqual(t, txnCtx.CreateTime.UnixNano(), a.minStartTime.UnixNano())
8787
require.Equal(t, a.inTxn, sessVars.InTxn())

pkg/sessiontxn/staleread/provider.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,11 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error {
120120
InfoSchema: is,
121121
CreateTime: time.Now(),
122122
StartTS: txn.StartTS(),
123-
ShardStep: int(sessVars.ShardAllocateStep),
124123
IsStaleness: true,
125124
TxnScope: txnScope,
126125
},
127126
}
127+
sessVars.GetRowIDShardGenerator().SetShardStep(int(sessVars.ShardAllocateStep))
128128
sessVars.TxnCtxMu.Unlock()
129129

130130
if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, is); interceptor != nil {

0 commit comments

Comments
 (0)