Skip to content

Commit f00cd31

Browse files
committed
br: sort meta kv by ts when updating table history (pingcap#61366)
close pingcap#61367
1 parent 74a1553 commit f00cd31

File tree

8 files changed

+679
-218
lines changed

8 files changed

+679
-218
lines changed

br/pkg/restore/log_client/batch_meta_processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func (mp *MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo(
150150
); err != nil {
151151
return errors.Trace(err)
152152
}
153+
mp.tableMappingManager.CleanTempKV()
153154
return nil
154155
}
155156

@@ -168,7 +169,8 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
168169
// process entries to collect table IDs
169170
for _, entry := range curSortedEntries {
170171
// parse entry and do the table mapping, using tableHistoryManager as the collector
171-
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf, mp.tableHistoryManager); err != nil {
172+
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf, entry.Ts,
173+
mp.tableHistoryManager); err != nil {
172174
return nil, errors.Trace(err)
173175
}
174176
}

br/pkg/stream/meta_kv.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ func (v *RawWriteCFValue) IsDelete() bool {
220220
return v.GetWriteType() == WriteTypeDelete
221221
}
222222

223+
// IsPut checks whether the value in cf is a `put` record.
224+
func (v *RawWriteCFValue) IsPut() bool {
225+
return v.GetWriteType() == WriteTypePut
226+
}
227+
223228
// HasShortValue checks whether short value is stored in write cf.
224229
func (v *RawWriteCFValue) HasShortValue() bool {
225230
return len(v.shortValue) > 0

br/pkg/stream/rewrite_meta_rawkv_test.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/pingcap/tidb/pkg/parser/mysql"
1919
"github.com/pingcap/tidb/pkg/tablecodec"
2020
"github.com/pingcap/tidb/pkg/types"
21+
"github.com/pingcap/tidb/pkg/util/codec"
2122
"github.com/stretchr/testify/require"
2223
)
2324

@@ -407,6 +408,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {
407408

408409
tm := NewTableMappingManager()
409410
tm.MergeBaseDBReplace(dbMap)
411+
collector := NewMockMetaInfoCollector()
410412

411413
// exchange partition: swap partition0 of t1 with t2
412414
t1Copy := t1.Clone()
@@ -416,15 +418,32 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {
416418
value, err := json.Marshal(&t1Copy)
417419
require.Nil(t, err)
418420

419-
// Create an entry for parsing
421+
// Create an entry for parsing with DefaultCF first
420422
txnKey := utils.EncodeTxnMetaKey(meta.DBkey(dbID1), meta.TableKey(tableID1), ts)
421-
entry := &kv.Entry{
423+
defaultCFEntry := &kv.Entry{
422424
Key: txnKey,
423425
Value: value,
424426
}
425-
err = tm.ParseMetaKvAndUpdateIdMapping(entry, consts.DefaultCF, NewMockMetaInfoCollector())
427+
err = tm.ParseMetaKvAndUpdateIdMapping(defaultCFEntry, consts.DefaultCF, ts, collector)
426428
require.Nil(t, err)
427429

430+
// Verify that collector is not called for DefaultCF
431+
require.NotContains(t, collector.tableInfos, dbID1)
432+
433+
// Now process with WriteCF to make table info visible
434+
writeCFData := []byte{WriteTypePut}
435+
writeCFData = codec.EncodeUvarint(writeCFData, ts)
436+
writeCFEntry := &kv.Entry{
437+
Key: txnKey,
438+
Value: writeCFData,
439+
}
440+
err = tm.ParseMetaKvAndUpdateIdMapping(writeCFEntry, consts.WriteCF, ts+1, collector)
441+
require.Nil(t, err)
442+
443+
// Verify that collector is now called for WriteCF
444+
require.Contains(t, collector.tableInfos, dbID1)
445+
require.Contains(t, collector.tableInfos[dbID1], tableID1)
446+
428447
sr := NewSchemasReplace(
429448
tm.DBReplaceMap,
430449
nil,
@@ -446,15 +465,32 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {
446465
value, err = json.Marshal(&t2Copy)
447466
require.Nil(t, err)
448467

449-
// Create an entry for parsing the second table
468+
// Create an entry for parsing the second table with DefaultCF first
450469
txnKey = utils.EncodeTxnMetaKey(meta.DBkey(dbID2), meta.TableKey(pt1ID), ts)
451-
entry = &kv.Entry{
470+
defaultCFEntry2 := &kv.Entry{
452471
Key: txnKey,
453472
Value: value,
454473
}
455-
err = tm.ParseMetaKvAndUpdateIdMapping(entry, consts.DefaultCF, NewMockMetaInfoCollector())
474+
err = tm.ParseMetaKvAndUpdateIdMapping(defaultCFEntry2, consts.DefaultCF, ts, collector)
456475
require.Nil(t, err)
457476

477+
// Verify that collector is not called for DefaultCF for the second table
478+
require.NotContains(t, collector.tableInfos[dbID2], pt1ID)
479+
480+
// Now process with WriteCF for the second table
481+
writeCFData2 := []byte{WriteTypePut}
482+
writeCFData2 = codec.EncodeUvarint(writeCFData2, ts)
483+
writeCFEntry2 := &kv.Entry{
484+
Key: txnKey,
485+
Value: writeCFData2,
486+
}
487+
err = tm.ParseMetaKvAndUpdateIdMapping(writeCFEntry2, consts.WriteCF, ts+1, collector)
488+
require.Nil(t, err)
489+
490+
// Verify that collector is now called for WriteCF for the second table
491+
require.Contains(t, collector.tableInfos, dbID2)
492+
require.Contains(t, collector.tableInfos[dbID2], pt1ID)
493+
458494
value, err = sr.rewriteTableInfo(value, dbID2)
459495
require.Nil(t, err)
460496
err = json.Unmarshal(value, &tableInfo)

br/pkg/stream/table_history.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,50 +14,52 @@
1414

1515
package stream
1616

17-
import (
18-
"github.com/pingcap/tidb/pkg/meta/model"
19-
)
20-
2117
// TableLocationInfo stores the table name, db id, and the parent table id if it is a partition
2218
type TableLocationInfo struct {
2319
DbID int64
2420
TableName string
2521
IsPartition bool
26-
ParentTableID int64 // only meaningful when IsPartition is true
22+
ParentTableID int64 // only meaningful when IsPartition is true
23+
Timestamp uint64 // timestamp when this location info was recorded
2724
}
2825

2926
type LogBackupTableHistoryManager struct {
3027
// maps table/partition ID to [original, current] location info
3128
tableNameHistory map[int64][2]TableLocationInfo
3229
dbIdToName map[int64]string
30+
// maps db ID to timestamp when it was last updated
31+
dbTimestamps map[int64]uint64
3332
}
3433

3534
func NewTableHistoryManager() *LogBackupTableHistoryManager {
3635
return &LogBackupTableHistoryManager{
3736
tableNameHistory: make(map[int64][2]TableLocationInfo),
3837
dbIdToName: make(map[int64]string),
38+
dbTimestamps: make(map[int64]uint64),
3939
}
4040
}
4141

4242
// AddTableHistory adds or updates history for a regular table
43-
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64) {
43+
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64, ts uint64) {
4444
locationInfo := TableLocationInfo{
4545
DbID: dbID,
4646
TableName: tableName,
4747
IsPartition: false,
4848
ParentTableID: 0,
49+
Timestamp: ts,
4950
}
5051
info.addHistory(tableId, locationInfo)
5152
}
5253

5354
// AddPartitionHistory adds or updates history for a partition
5455
func (info *LogBackupTableHistoryManager) AddPartitionHistory(partitionID int64, tableName string,
55-
dbID int64, parentTableID int64) {
56+
dbID int64, parentTableID int64, ts uint64) {
5657
locationInfo := TableLocationInfo{
5758
DbID: dbID,
5859
TableName: tableName,
5960
IsPartition: true,
6061
ParentTableID: parentTableID,
62+
Timestamp: ts,
6163
}
6264
info.addHistory(partitionID, locationInfo)
6365
}
@@ -69,12 +71,20 @@ func (info *LogBackupTableHistoryManager) addHistory(id int64, locationInfo Tabl
6971
// first occurrence - store as both original and current
7072
info.tableNameHistory[id] = [2]TableLocationInfo{locationInfo, locationInfo}
7173
} else {
72-
info.tableNameHistory[id] = [2]TableLocationInfo{existing[0], locationInfo}
74+
// only update if the new timestamp is not older than the current one (on equal timestamps the later call wins)
75+
if locationInfo.Timestamp >= existing[1].Timestamp {
76+
info.tableNameHistory[id] = [2]TableLocationInfo{existing[0], locationInfo}
77+
}
78+
// if timestamp is older, don't update (keep the newer entry)
7379
}
7480
}
7581

76-
func (info *LogBackupTableHistoryManager) RecordDBIdToName(dbId int64, dbName string) {
77-
info.dbIdToName[dbId] = dbName
82+
func (info *LogBackupTableHistoryManager) RecordDBIdToName(dbId int64, dbName string, ts uint64) {
83+
// only update if the db has no recorded timestamp yet or the new timestamp is not older than the existing one
84+
if existingTs, exists := info.dbTimestamps[dbId]; !exists || ts >= existingTs {
85+
info.dbIdToName[dbId] = dbName
86+
info.dbTimestamps[dbId] = ts
87+
}
7888
}
7989

8090
// GetTableHistory returns information about all tables that have been renamed.
@@ -93,18 +103,17 @@ func (info *LogBackupTableHistoryManager) GetNewlyCreatedDBHistory() map[int64]s
93103
}
94104

95105
// OnDatabaseInfo implements MetaInfoCollector.OnDatabaseInfo
96-
func (info *LogBackupTableHistoryManager) OnDatabaseInfo(dbInfo *model.DBInfo) {
97-
info.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
106+
func (info *LogBackupTableHistoryManager) OnDatabaseInfo(dbId int64, dbName string, ts uint64) {
107+
info.RecordDBIdToName(dbId, dbName, ts)
98108
}
99109

100110
// OnTableInfo implements MetaInfoCollector.OnTableInfo
101-
func (info *LogBackupTableHistoryManager) OnTableInfo(dbID int64, tableInfo *model.TableInfo) {
102-
info.AddTableHistory(tableInfo.ID, tableInfo.Name.O, dbID)
111+
func (info *LogBackupTableHistoryManager) OnTableInfo(
112+
dbID, tableId int64, tableSimpleInfo *tableSimpleInfo, commitTs uint64) {
113+
info.AddTableHistory(tableId, tableSimpleInfo.Name, dbID, commitTs)
103114

104115
// add history for all partitions if this is a partitioned table
105-
if tableInfo.Partition != nil && tableInfo.Partition.Definitions != nil {
106-
for _, partition := range tableInfo.Partition.Definitions {
107-
info.AddPartitionHistory(partition.ID, tableInfo.Name.O, dbID, tableInfo.ID)
108-
}
116+
for _, partitionId := range tableSimpleInfo.PartitionIds {
117+
info.AddPartitionHistory(partitionId, tableSimpleInfo.Name, dbID, tableId, commitTs)
109118
}
110119
}

0 commit comments

Comments
 (0)