Merged
4 changes: 3 additions & 1 deletion br/pkg/restore/log_client/batch_meta_processor.go
@@ -144,6 +144,7 @@ func (mp *MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo(
); err != nil {
return errors.Trace(err)
}
mp.tableMappingManager.CleanTempKV()
return nil
}

@@ -162,7 +163,8 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
// process entries to collect table IDs
for _, entry := range curSortedEntries {
// parse entry and do the table mapping, using tableHistoryManager as the collector
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf, mp.tableHistoryManager); err != nil {
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf, entry.Ts,
mp.tableHistoryManager); err != nil {
return nil, errors.Trace(err)
}
}
5 changes: 5 additions & 0 deletions br/pkg/stream/meta_kv.go
@@ -220,6 +220,11 @@ func (v *RawWriteCFValue) IsDelete() bool {
return v.GetWriteType() == WriteTypeDelete
}

// IsPut checks whether the value in cf is a `put` record.
func (v *RawWriteCFValue) IsPut() bool {
return v.GetWriteType() == WriteTypePut
}

// HasShortValue checks whether short value is stored in write cf.
func (v *RawWriteCFValue) HasShortValue() bool {
return len(v.shortValue) > 0
48 changes: 42 additions & 6 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
@@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
)

@@ -406,6 +407,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {

tm := NewTableMappingManager()
tm.MergeBaseDBReplace(dbMap)
collector := NewMockMetaInfoCollector()

//exchange partition, t1 partition0 with the t2
t1Copy := t1.Clone()
@@ -415,15 +417,32 @@
value, err := json.Marshal(&t1Copy)
require.Nil(t, err)

// Create an entry for parsing
// Create an entry for parsing with DefaultCF first
txnKey := utils.EncodeTxnMetaKey(meta.DBkey(dbID1), meta.TableKey(tableID1), ts)
entry := &kv.Entry{
defaultCFEntry := &kv.Entry{
Key: txnKey,
Value: value,
}
err = tm.ParseMetaKvAndUpdateIdMapping(entry, consts.DefaultCF, NewMockMetaInfoCollector())
err = tm.ParseMetaKvAndUpdateIdMapping(defaultCFEntry, consts.DefaultCF, ts, collector)
require.Nil(t, err)

// Verify that collector is not called for DefaultCF
require.NotContains(t, collector.tableInfos, dbID1)

// Now process with WriteCF to make table info visible
writeCFData := []byte{WriteTypePut}
writeCFData = codec.EncodeUvarint(writeCFData, ts)
writeCFEntry := &kv.Entry{
Key: txnKey,
Value: writeCFData,
}
err = tm.ParseMetaKvAndUpdateIdMapping(writeCFEntry, consts.WriteCF, ts+1, collector)
require.Nil(t, err)

// Verify that collector is now called for WriteCF
require.Contains(t, collector.tableInfos, dbID1)
require.Contains(t, collector.tableInfos[dbID1], tableID1)

sr := NewSchemasReplace(
tm.DBReplaceMap,
nil,
@@ -445,15 +464,32 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {
value, err = json.Marshal(&t2Copy)
require.Nil(t, err)

// Create an entry for parsing the second table
// Create an entry for parsing the second table with DefaultCF first
txnKey = utils.EncodeTxnMetaKey(meta.DBkey(dbID2), meta.TableKey(pt1ID), ts)
entry = &kv.Entry{
defaultCFEntry2 := &kv.Entry{
Key: txnKey,
Value: value,
}
err = tm.ParseMetaKvAndUpdateIdMapping(entry, consts.DefaultCF, NewMockMetaInfoCollector())
err = tm.ParseMetaKvAndUpdateIdMapping(defaultCFEntry2, consts.DefaultCF, ts, collector)
require.Nil(t, err)

// Verify that collector is not called for DefaultCF for the second table
require.NotContains(t, collector.tableInfos[dbID2], pt1ID)

// Now process with WriteCF for the second table
writeCFData2 := []byte{WriteTypePut}
writeCFData2 = codec.EncodeUvarint(writeCFData2, ts)
writeCFEntry2 := &kv.Entry{
Key: txnKey,
Value: writeCFData2,
}
err = tm.ParseMetaKvAndUpdateIdMapping(writeCFEntry2, consts.WriteCF, ts+1, collector)
require.Nil(t, err)

// Verify that collector is now called for WriteCF for the second table
require.Contains(t, collector.tableInfos, dbID2)
require.Contains(t, collector.tableInfos[dbID2], pt1ID)

value, err = sr.rewriteTableInfo(value, dbID2)
require.Nil(t, err)
err = json.Unmarshal(value, &tableInfo)
45 changes: 27 additions & 18 deletions br/pkg/stream/table_history.go
@@ -14,50 +14,52 @@

package stream

import (
"github.com/pingcap/tidb/pkg/meta/model"
)

// TableLocationInfo stores the table name, db id, and parent table id if it is a partition
type TableLocationInfo struct {
DbID int64
TableName string
IsPartition bool
ParentTableID int64 // only meaningful when IsPartition is true
Timestamp uint64 // timestamp when this location info was recorded
}

type LogBackupTableHistoryManager struct {
// maps table/partition ID to [original, current] location info
tableNameHistory map[int64][2]TableLocationInfo
dbIdToName map[int64]string
// maps db ID to timestamp when it was last updated
dbTimestamps map[int64]uint64
}

func NewTableHistoryManager() *LogBackupTableHistoryManager {
return &LogBackupTableHistoryManager{
tableNameHistory: make(map[int64][2]TableLocationInfo),
dbIdToName: make(map[int64]string),
dbTimestamps: make(map[int64]uint64),
}
}

// AddTableHistory adds or updates history for a regular table
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64) {
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64, ts uint64) {
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: tableName,
IsPartition: false,
ParentTableID: 0,
Timestamp: ts,
}
info.addHistory(tableId, locationInfo)
}

// AddPartitionHistory adds or updates history for a partition
func (info *LogBackupTableHistoryManager) AddPartitionHistory(partitionID int64, tableName string,
dbID int64, parentTableID int64) {
dbID int64, parentTableID int64, ts uint64) {
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: tableName,
IsPartition: true,
ParentTableID: parentTableID,
Timestamp: ts,
}
info.addHistory(partitionID, locationInfo)
}
@@ -69,12 +71,20 @@ func (info *LogBackupTableHistoryManager) addHistory(id int64, locationInfo Tabl
// first occurrence - store as both original and current
info.tableNameHistory[id] = [2]TableLocationInfo{locationInfo, locationInfo}
} else {
info.tableNameHistory[id] = [2]TableLocationInfo{existing[0], locationInfo}
// only update if the new timestamp is newer than the current one
if locationInfo.Timestamp >= existing[1].Timestamp {

Review comment (Contributor): Now, only the write CF kv can trigger OnTableInfo or OnDatabaseInfo. Therefore, maybe it is always true.

info.tableNameHistory[id] = [2]TableLocationInfo{existing[0], locationInfo}
}
// if timestamp is older, don't update (keep the newer entry)
}
}

func (info *LogBackupTableHistoryManager) RecordDBIdToName(dbId int64, dbName string) {
info.dbIdToName[dbId] = dbName
func (info *LogBackupTableHistoryManager) RecordDBIdToName(dbId int64, dbName string, ts uint64) {
// only update if the new timestamp is newer than the existing one
if existingTs, exists := info.dbTimestamps[dbId]; !exists || ts >= existingTs {
info.dbIdToName[dbId] = dbName
info.dbTimestamps[dbId] = ts
}
}

// GetTableHistory returns information about all tables that have been renamed.
@@ -93,18 +103,17 @@ func (info *LogBackupTableHistoryManager) GetNewlyCreatedDBHistory() map[int64]s
}

// OnDatabaseInfo implements MetaInfoCollector.OnDatabaseInfo
func (info *LogBackupTableHistoryManager) OnDatabaseInfo(dbInfo *model.DBInfo) {
info.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
func (info *LogBackupTableHistoryManager) OnDatabaseInfo(dbId int64, dbName string, ts uint64) {
info.RecordDBIdToName(dbId, dbName, ts)
}

// OnTableInfo implements MetaInfoCollector.OnTableInfo
func (info *LogBackupTableHistoryManager) OnTableInfo(dbID int64, tableInfo *model.TableInfo) {
info.AddTableHistory(tableInfo.ID, tableInfo.Name.O, dbID)
func (info *LogBackupTableHistoryManager) OnTableInfo(
dbID, tableId int64, tableSimpleInfo *tableSimpleInfo, commitTs uint64) {
info.AddTableHistory(tableId, tableSimpleInfo.Name, dbID, commitTs)

// add history for all partitions if this is a partitioned table
if tableInfo.Partition != nil && tableInfo.Partition.Definitions != nil {
for _, partition := range tableInfo.Partition.Definitions {
info.AddPartitionHistory(partition.ID, tableInfo.Name.O, dbID, tableInfo.ID)
}
for _, partitionId := range tableSimpleInfo.PartitionIds {
info.AddPartitionHistory(partitionId, tableSimpleInfo.Name, dbID, tableId, commitTs)
}
}
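
The review comment above notes that only write-CF KVs now drive OnTableInfo/OnDatabaseInfo, so commit timestamps should arrive in non-decreasing order. The sketch below is not part of this PR; demoTimestampGatedHistory is a hypothetical helper assumed to live inside package stream (tableNameHistory is unexported). It illustrates how the timestamp gate in addHistory behaves in that case, and what it would do with an out-of-order entry.

package stream

import "fmt"

// demoTimestampGatedHistory is an illustrative sketch: when AddTableHistory
// is only driven by write-CF entries, commit timestamps arrive in
// non-decreasing order, so the locationInfo.Timestamp >= existing[1].Timestamp
// check in addHistory always passes — the "current" slot keeps moving forward
// while the "original" slot is preserved.
func demoTimestampGatedHistory() {
	m := NewTableHistoryManager()

	// First sighting of table 100 at commitTs=10: stored as both the
	// original and the current location.
	m.AddTableHistory(100, "t_old", 1, 10)

	// A later rename at commitTs=20: 20 >= 10, so only the current slot
	// is replaced; the original slot still records "t_old".
	m.AddTableHistory(100, "t_new", 1, 20)

	// A hypothetical out-of-order entry at commitTs=5 is ignored by the gate.
	m.AddTableHistory(100, "t_stale", 1, 5)

	pair := m.tableNameHistory[100]
	fmt.Println(pair[0].TableName, pair[1].TableName) // t_old t_new
}

In this sketch the final pair is [t_old, t_new]: the original location is preserved, the current one tracks the latest commit, and the stale entry is dropped — consistent with the comment's observation that, with write-CF-only triggers, the newer-timestamp branch should always be the one taken.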