@@ -1439,6 +1439,11 @@ func handleTableRenames(
1439
1439
start := dbIDAndTableName [0 ]
1440
1440
end := dbIDAndTableName [1 ]
1441
1441
1442
+ // skip if it contains partition
1443
+ if start .IsPartition || end .IsPartition {
1444
+ continue
1445
+ }
1446
+
1442
1447
startDBName , exists := getDBNameFromIDInBackup (start .DbID , snapshotDBMap , history )
1443
1448
if ! exists {
1444
1449
continue
@@ -1488,6 +1493,84 @@ func handleTableRenames(
1488
1493
}
1489
1494
}
1490
1495
1496
+ // shouldRestoreTable checks if a table or partition is being tracked for restore
1497
+ func shouldRestoreTable (
1498
+ dbID int64 ,
1499
+ tableName string ,
1500
+ isPartition bool ,
1501
+ parentTableID int64 ,
1502
+ snapshotDBMap map [int64 ]* metautil.Database ,
1503
+ history * stream.LogBackupTableHistoryManager ,
1504
+ cfg * RestoreConfig ,
1505
+ ) bool {
1506
+ if isPartition {
1507
+ return cfg .PiTRTableTracker .ContainsTableId (dbID , parentTableID )
1508
+ }
1509
+ dbName , exists := getDBNameFromIDInBackup (dbID , snapshotDBMap , history )
1510
+ if ! exists {
1511
+ return false
1512
+ }
1513
+ return utils .MatchTable (cfg .TableFilter , dbName , tableName , cfg .WithSysTable )
1514
+ }
1515
+
1516
+ // handlePartitionExchanges checks for partition exchanges and returns an error if a partition
1517
+ // was exchanged between tables where one is in the filter and one is not
1518
+ func handlePartitionExchanges (
1519
+ history * stream.LogBackupTableHistoryManager ,
1520
+ snapshotDBMap map [int64 ]* metautil.Database ,
1521
+ cfg * RestoreConfig ,
1522
+ ) error {
1523
+ for tableId , dbIDAndTableName := range history .GetTableHistory () {
1524
+ start := dbIDAndTableName [0 ]
1525
+ end := dbIDAndTableName [1 ]
1526
+
1527
+ // skip if both are not partition
1528
+ if ! start .IsPartition && ! end .IsPartition {
1529
+ continue
1530
+ }
1531
+
1532
+ // skip if parent table id are the same (if it's a table, parent table id will be 0)
1533
+ if start .ParentTableID == end .ParentTableID {
1534
+ continue
1535
+ }
1536
+
1537
+ restoreStart := shouldRestoreTable (start .DbID , start .TableName , start .IsPartition , start .ParentTableID ,
1538
+ snapshotDBMap , history , cfg )
1539
+ restoreEnd := shouldRestoreTable (end .DbID , end .TableName , end .IsPartition , end .ParentTableID ,
1540
+ snapshotDBMap , history , cfg )
1541
+
1542
+ // error out if partition is exchanged between tables where one should restore and one shouldn't
1543
+ if restoreStart != restoreEnd {
1544
+ startDBName , exists := getDBNameFromIDInBackup (start .DbID , snapshotDBMap , history )
1545
+ if ! exists {
1546
+ startDBName = fmt .Sprintf ("(unknown db name %d)" , start .DbID )
1547
+ }
1548
+ endDBName , exists := getDBNameFromIDInBackup (end .DbID , snapshotDBMap , history )
1549
+ if ! exists {
1550
+ endDBName = fmt .Sprintf ("(unknown db name %d)" , end .DbID )
1551
+ }
1552
+
1553
+ return errors .Annotatef (berrors .ErrRestoreModeMismatch ,
1554
+ "partition exchange detected: partition ID %d was exchanged from table '%s.%s' (ID: %d) " +
1555
+ "eventually to table '%s.%s' (ID: %d), which is not supported in table filter" ,
1556
+ tableId , startDBName , start .TableName , start .ParentTableID ,
1557
+ endDBName , end .TableName , end .ParentTableID )
1558
+ }
1559
+
1560
+ // if we reach here, it will only be both are restore or not restore,
1561
+ // if it's table, need to add to table tracker, this is for table created during log backup.
1562
+ // if it's table and exist in snapshot, the actual table and files should already been added
1563
+ // since matches filter.
1564
+ if restoreStart && ! start .IsPartition {
1565
+ cfg .PiTRTableTracker .TrackTableId (start .DbID , tableId )
1566
+ }
1567
+ if restoreEnd && ! end .IsPartition {
1568
+ cfg .PiTRTableTracker .TrackTableId (end .DbID , tableId )
1569
+ }
1570
+ }
1571
+ return nil
1572
+ }
1573
+
1491
1574
func AdjustTablesToRestoreAndCreateTableTracker (
1492
1575
logBackupTableHistory * stream.LogBackupTableHistoryManager ,
1493
1576
cfg * RestoreConfig ,
@@ -1498,6 +1581,7 @@ func AdjustTablesToRestoreAndCreateTableTracker(
1498
1581
) (err error ) {
1499
1582
// build tracker for pitr restore to use later
1500
1583
piTRIdTracker := utils .NewPiTRIdTracker ()
1584
+ cfg .PiTRTableTracker = piTRIdTracker
1501
1585
1502
1586
// track newly created databases
1503
1587
newlyCreatedDBs := logBackupTableHistory .GetNewlyCreatedDBHistory ()
@@ -1510,15 +1594,17 @@ func AdjustTablesToRestoreAndCreateTableTracker(
1510
1594
// first handle table renames to determine which tables we need
1511
1595
handleTableRenames (logBackupTableHistory , snapshotDBMap , cfg , tableMap , dbMap , fileMap , piTRIdTracker )
1512
1596
1513
- // handle partition exchange if needed in future
1597
+ // handle partition exchange after all tables are tracked
1598
+ if err := handlePartitionExchanges (logBackupTableHistory , snapshotDBMap , cfg ); err != nil {
1599
+ return err
1600
+ }
1514
1601
1515
1602
// track all snapshot tables that's going to restore in PiTR tracker
1516
1603
for tableID , table := range tableMap {
1517
1604
piTRIdTracker .TrackTableId (table .DB .ID , tableID )
1518
1605
}
1519
1606
1520
1607
log .Info ("pitr table tracker" , zap .String ("map" , piTRIdTracker .String ()))
1521
- cfg .PiTRTableTracker = piTRIdTracker
1522
1608
return nil
1523
1609
}
1524
1610
0 commit comments