Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions br/pkg/restore/log_client/batch_meta_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(

// add to table rename history
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)

// track partitions if this is a partitioned table
if tableInfo.Partition != nil {
for _, def := range tableInfo.Partition.Definitions {
mp.tableHistoryManager.AddPartitionHistory(def.ID, tableInfo.Name.String(), dbID, tableInfo.ID)
}
}
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions br/pkg/stream/table_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package stream

// TableLocationInfo stores the table name, db id, and parent table id if is a partition
type TableLocationInfo struct {
DbID int64
TableName string
DbID int64
TableName string
IsPartition bool
ParentTableID int64 // only meaningful when IsPartition is true
}

type LogBackupTableHistoryManager struct {
Expand All @@ -36,12 +38,26 @@ func NewTableHistoryManager() *LogBackupTableHistoryManager {
// AddTableHistory adds or updates history for a regular table
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64) {
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: tableName,
DbID: dbID,
TableName: tableName,
IsPartition: false,
ParentTableID: 0,
}
info.addHistory(tableId, locationInfo)
}

// AddPartitionHistory adds or updates history for a partition
func (info *LogBackupTableHistoryManager) AddPartitionHistory(partitionID int64, tableName string,
dbID int64, parentTableID int64) {
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: tableName,
IsPartition: true,
ParentTableID: parentTableID,
}
info.addHistory(partitionID, locationInfo)
}

// addHistory is a helper method to maintain the history
func (info *LogBackupTableHistoryManager) addHistory(id int64, locationInfo TableLocationInfo) {
existing, exists := info.tableNameHistory[id]
Expand Down
90 changes: 88 additions & 2 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,11 @@ func handleTableRenames(
start := dbIDAndTableName[0]
end := dbIDAndTableName[1]

// skip if it contains partition
if start.IsPartition || end.IsPartition {
continue
}

startDBName, exists := getDBNameFromIDInBackup(start.DbID, snapshotDBMap, history)
if !exists {
continue
Expand Down Expand Up @@ -1483,6 +1488,84 @@ func handleTableRenames(
}
}

// shouldRestoreTable checks if a table or partition is being tracked for restore
func shouldRestoreTable(
dbID int64,
tableName string,
isPartition bool,
parentTableID int64,
snapshotDBMap map[int64]*metautil.Database,
history *stream.LogBackupTableHistoryManager,
cfg *RestoreConfig,
) bool {
if isPartition {
return cfg.PiTRTableTracker.ContainsTableId(dbID, parentTableID)
}
dbName, exists := getDBNameFromIDInBackup(dbID, snapshotDBMap, history)
if !exists {
return false
}
return utils.MatchTable(cfg.TableFilter, dbName, tableName, cfg.WithSysTable)
}

// handlePartitionExchanges checks for partition exchanges and returns an error if a partition
// was exchanged between tables where one is in the filter and one is not
func handlePartitionExchanges(
history *stream.LogBackupTableHistoryManager,
snapshotDBMap map[int64]*metautil.Database,
cfg *RestoreConfig,
) error {
for tableId, dbIDAndTableName := range history.GetTableHistory() {
start := dbIDAndTableName[0]
end := dbIDAndTableName[1]

// skip if both are not partition
if !start.IsPartition && !end.IsPartition {
continue
}

// skip if parent table id are the same (if it's a table, parent table id will be 0)
if start.ParentTableID == end.ParentTableID {
continue
}

restoreStart := shouldRestoreTable(start.DbID, start.TableName, start.IsPartition, start.ParentTableID,
snapshotDBMap, history, cfg)
restoreEnd := shouldRestoreTable(end.DbID, end.TableName, end.IsPartition, end.ParentTableID,
snapshotDBMap, history, cfg)

// error out if partition is exchanged between tables where one should restore and one shouldn't
if restoreStart != restoreEnd {
startDBName, exists := getDBNameFromIDInBackup(start.DbID, snapshotDBMap, history)
if !exists {
startDBName = fmt.Sprintf("(unknown db name %d)", start.DbID)
}
endDBName, exists := getDBNameFromIDInBackup(end.DbID, snapshotDBMap, history)
if !exists {
endDBName = fmt.Sprintf("(unknown db name %d)", end.DbID)
}

return errors.Annotatef(berrors.ErrRestoreModeMismatch,
"partition exchange detected: partition ID %d was exchanged from table '%s.%s' (ID: %d) "+
"eventually to table '%s.%s' (ID: %d), which is not supported in table filter",
tableId, startDBName, start.TableName, start.ParentTableID,
endDBName, end.TableName, end.ParentTableID)
}

// if we reach here, it will only be both are restore or not restore,
// if it's table, need to add to table tracker, this is for table created during log backup.
// if it's table and exist in snapshot, the actual table and files should already been added
// since matches filter.
if restoreStart && !start.IsPartition {
cfg.PiTRTableTracker.TrackTableId(start.DbID, tableId)
}
if restoreEnd && !end.IsPartition {
cfg.PiTRTableTracker.TrackTableId(end.DbID, tableId)
}
}
return nil
}

func AdjustTablesToRestoreAndCreateTableTracker(
logBackupTableHistory *stream.LogBackupTableHistoryManager,
cfg *RestoreConfig,
Expand All @@ -1493,6 +1576,7 @@ func AdjustTablesToRestoreAndCreateTableTracker(
) (err error) {
// build tracker for pitr restore to use later
piTRIdTracker := utils.NewPiTRIdTracker()
cfg.PiTRTableTracker = piTRIdTracker

// track newly created databases
newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory()
Expand All @@ -1505,15 +1589,17 @@ func AdjustTablesToRestoreAndCreateTableTracker(
// first handle table renames to determine which tables we need
handleTableRenames(logBackupTableHistory, snapshotDBMap, cfg, tableMap, dbMap, fileMap, piTRIdTracker)

// handle partition exchange if needed in future
// handle partition exchange after all tables are tracked
if err := handlePartitionExchanges(logBackupTableHistory, snapshotDBMap, cfg); err != nil {
return err
}

// track all snapshot tables that's going to restore in PiTR tracker
for tableID, table := range tableMap {
piTRIdTracker.TrackTableId(table.DB.ID, tableID)
}

log.Info("pitr table tracker", zap.String("map", piTRIdTracker.String()))
cfg.PiTRTableTracker = piTRIdTracker
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,10 @@ func buildPauseSafePointName(taskName string) string {
return fmt.Sprintf("%s_pause_safepoint", taskName)
}

func checkPiTRRequirements(mgr *conn.Mgr) error {
func checkPiTRRequirements(mgr *conn.Mgr, hasExplicitFilter bool) error {
if hasExplicitFilter {
return nil
}
return restore.AssertUserDBsEmpty(mgr.GetDomain())
}

Expand Down Expand Up @@ -2055,7 +2058,7 @@ func generatePiTRTaskInfo(
// Only when use checkpoint and not the first execution,
// skip checking requirements.
log.Info("check pitr requirements for the first execution")
if err := checkPiTRRequirements(mgr); err != nil {
if err := checkPiTRRequirements(mgr, cfg.ExplicitFilter); err != nil {
// delay cluster checks after we get the backupmeta.
// for the case that the restore inc + log backup,
// we can still restore them.
Expand Down
Loading