diff --git a/br/pkg/restore/log_client/batch_meta_processor.go b/br/pkg/restore/log_client/batch_meta_processor.go
index f6fe3841affb8..7ba3c4f5c9213 100644
--- a/br/pkg/restore/log_client/batch_meta_processor.go
+++ b/br/pkg/restore/log_client/batch_meta_processor.go
@@ -206,6 +206,13 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
 				// add to table rename history
 				mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)
+
+				// track partitions if this is a partitioned table
+				if tableInfo.Partition != nil {
+					for _, def := range tableInfo.Partition.Definitions {
+						mp.tableHistoryManager.AddPartitionHistory(def.ID, tableInfo.Name.String(), dbID, tableInfo.ID)
+					}
+				}
 			}
 		}
 	}
diff --git a/br/pkg/stream/table_history.go b/br/pkg/stream/table_history.go
index e90cd1bccddfa..89382700c85e4 100644
--- a/br/pkg/stream/table_history.go
+++ b/br/pkg/stream/table_history.go
@@ -16,8 +16,10 @@ package stream
 // TableLocationInfo stores the table name, the db id, and the parent table id if it is a partition
 type TableLocationInfo struct {
-	DbID      int64
-	TableName string
+	DbID          int64
+	TableName     string
+	IsPartition   bool
+	ParentTableID int64 // only meaningful when IsPartition is true
 }
 
 type LogBackupTableHistoryManager struct {
@@ -36,12 +38,26 @@ func NewTableHistoryManager() *LogBackupTableHistoryManager {
 // AddTableHistory adds or updates history for a regular table
 func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64) {
 	locationInfo := TableLocationInfo{
-		DbID:      dbID,
-		TableName: tableName,
+		DbID:          dbID,
+		TableName:     tableName,
+		IsPartition:   false,
+		ParentTableID: 0,
 	}
 	info.addHistory(tableId, locationInfo)
 }
 
+// AddPartitionHistory adds or updates history for a partition
+func (info *LogBackupTableHistoryManager) AddPartitionHistory(partitionID int64, tableName string,
+	dbID int64, parentTableID int64) {
+	locationInfo := TableLocationInfo{
+		DbID:          dbID,
+		TableName:     tableName,
+		IsPartition:   true,
+		ParentTableID: parentTableID,
+	}
+	info.addHistory(partitionID, locationInfo)
+}
+
 // addHistory is a helper method to maintain the history
 func (info *LogBackupTableHistoryManager) addHistory(id int64, locationInfo TableLocationInfo) {
 	existing, exists := info.tableNameHistory[id]
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index d120054aa292e..f4fad8624d975 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -1434,6 +1434,11 @@ func handleTableRenames(
 		start := dbIDAndTableName[0]
 		end := dbIDAndTableName[1]
 
+		// skip if either end of the history is a partition
+		if start.IsPartition || end.IsPartition {
+			continue
+		}
+
 		startDBName, exists := getDBNameFromIDInBackup(start.DbID, snapshotDBMap, history)
 		if !exists {
 			continue
@@ -1483,6 +1488,84 @@ func handleTableRenames(
 	}
 }
 
+// shouldRestoreTable reports whether a table or partition is tracked for restore
+func shouldRestoreTable(
+	dbID int64,
+	tableName string,
+	isPartition bool,
+	parentTableID int64,
+	snapshotDBMap map[int64]*metautil.Database,
+	history *stream.LogBackupTableHistoryManager,
+	cfg *RestoreConfig,
+) bool {
+	if isPartition {
+		return cfg.PiTRTableTracker.ContainsTableId(dbID, parentTableID)
+	}
+	dbName, exists := getDBNameFromIDInBackup(dbID, snapshotDBMap, history)
+	if !exists {
+		return false
+	}
+	return utils.MatchTable(cfg.TableFilter, dbName, tableName, cfg.WithSysTable)
+}
+
+// handlePartitionExchanges checks for partition exchanges and returns an error if a partition
+// was exchanged between tables where one is in the filter and one is not.
+func handlePartitionExchanges(
+	history *stream.LogBackupTableHistoryManager,
+	snapshotDBMap map[int64]*metautil.Database,
+	cfg *RestoreConfig,
+) error {
+	for tableId, dbIDAndTableName := range history.GetTableHistory() {
+		start := dbIDAndTableName[0]
+		end := dbIDAndTableName[1]
+
+		// skip if neither end of the history is a partition
+		if !start.IsPartition && !end.IsPartition {
+			continue
+		}
+
+		// skip if the parent table IDs are the same (for a plain table the parent table ID is 0)
+		if start.ParentTableID == end.ParentTableID {
+			continue
+		}
+
+		restoreStart := shouldRestoreTable(start.DbID, start.TableName, start.IsPartition, start.ParentTableID,
+			snapshotDBMap, history, cfg)
+		restoreEnd := shouldRestoreTable(end.DbID, end.TableName, end.IsPartition, end.ParentTableID,
+			snapshotDBMap, history, cfg)
+
+		// error out if the partition was exchanged between a table that should be restored and one that shouldn't
+		if restoreStart != restoreEnd {
+			startDBName, exists := getDBNameFromIDInBackup(start.DbID, snapshotDBMap, history)
+			if !exists {
+				startDBName = fmt.Sprintf("(unknown db name %d)", start.DbID)
+			}
+			endDBName, exists := getDBNameFromIDInBackup(end.DbID, snapshotDBMap, history)
+			if !exists {
+				endDBName = fmt.Sprintf("(unknown db name %d)", end.DbID)
+			}
+
+			return errors.Annotatef(berrors.ErrRestoreModeMismatch,
+				"partition exchange detected: partition ID %d was exchanged from table '%s.%s' (ID: %d) "+
+					"eventually to table '%s.%s' (ID: %d), which is not supported when restoring with a table filter",
+				tableId, startDBName, start.TableName, start.ParentTableID,
+				endDBName, end.TableName, end.ParentTableID)
+		}
+
+		// At this point either both ends are restored or both are skipped. If an
+		// end is a table rather than a partition, add it to the table tracker;
+		// this covers tables created during log backup. Tables that exist in the
+		// snapshot and match the filter already had their metadata and files added.
+		if restoreStart && !start.IsPartition {
+			cfg.PiTRTableTracker.TrackTableId(start.DbID, tableId)
+		}
+		if restoreEnd && !end.IsPartition {
+			cfg.PiTRTableTracker.TrackTableId(end.DbID, tableId)
+		}
+	}
+	return nil
+}
+
 func AdjustTablesToRestoreAndCreateTableTracker(
 	logBackupTableHistory *stream.LogBackupTableHistoryManager,
 	cfg *RestoreConfig,
@@ -1493,6 +1576,7 @@ func AdjustTablesToRestoreAndCreateTableTracker(
 ) (err error) {
 	// build tracker for pitr restore to use later
 	piTRIdTracker := utils.NewPiTRIdTracker()
+	cfg.PiTRTableTracker = piTRIdTracker
 
 	// track newly created databases
 	newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory()
@@ -1505,7 +1589,10 @@ func AdjustTablesToRestoreAndCreateTableTracker(
 	// first handle table renames to determine which tables we need
 	handleTableRenames(logBackupTableHistory, snapshotDBMap, cfg, tableMap, dbMap, fileMap, piTRIdTracker)
 
-	// handle partition exchange if needed in future
+	// handle partition exchanges after all tables are tracked
+	if err := handlePartitionExchanges(logBackupTableHistory, snapshotDBMap, cfg); err != nil {
+		return err
+	}
 
 	// track all snapshot tables that are going to be restored in the PiTR tracker
 	for tableID, table := range tableMap {
@@ -1513,7 +1600,6 @@
 	}
 	log.Info("pitr table tracker", zap.String("map", piTRIdTracker.String()))
 
-	cfg.PiTRTableTracker = piTRIdTracker
 	return nil
 }
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index da18000ecce65..de08ce2213a1a 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1981,7 +1981,10 @@ func buildPauseSafePointName(taskName string) string {
 	return fmt.Sprintf("%s_pause_safepoint", taskName)
 }
 
-func checkPiTRRequirements(mgr *conn.Mgr) error {
+func checkPiTRRequirements(mgr *conn.Mgr, hasExplicitFilter bool) error {
+	if hasExplicitFilter {
+		return nil
+	}
 	return restore.AssertUserDBsEmpty(mgr.GetDomain())
 }
 
@@ -2055,7 +2058,7 @@ func generatePiTRTaskInfo(
 	// Only when using checkpoint and not on the first execution
 	// do we skip checking requirements.
 	log.Info("check pitr requirements for the first execution")
-	if err := checkPiTRRequirements(mgr); err != nil {
+	if err := checkPiTRRequirements(mgr, cfg.ExplicitFilter); err != nil {
 		// delay cluster checks until after we get the backupmeta:
 		// for the case of restoring inc + log backup,
 		// we can still restore them.
diff --git a/br/tests/br_pitr_table_filter/run.sh b/br/tests/br_pitr_table_filter/run.sh
index 07d739e52575c..e7ea36a26b17b 100755
--- a/br/tests/br_pitr_table_filter/run.sh
+++ b/br/tests/br_pitr_table_filter/run.sh
@@ -14,6 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# disable the global ENCRYPTION_ARGS and ENABLE_ENCRYPTION_CHECK for this script
+ENCRYPTION_ARGS=""
+ENABLE_ENCRYPTION_CHECK=false
+export ENCRYPTION_ARGS
+export ENABLE_ENCRYPTION_CHECK
+
 set -eux
 DB="$TEST_NAME"
 CUR=$(cd `dirname $0`; pwd)
@@ -704,6 +710,656 @@ test_index_filter() {
     echo "Index filter test passed"
 }
 
+test_partition_exchange() {
+    restart_services || { echo "Failed to restart services"; exit 1; }
+
+    echo "start testing partition exchange with filter"
+    run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"
+
+    run_sql "create schema $DB;"
+
+    # Create tables that will be in the snapshot backup
+    echo "creating tables for backup..."
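+    # The layout below is deliberate: a partitioned source table plus two plain
+    # target tables in the snapshot backup, mirrored later by log-only
+    # equivalents, so every exchange direction (backup<->backup, log<->log,
+    # and backup<->log) is exercised. Note that EXCHANGE PARTITION requires the
+    # plain table's definition to match the partitioned table's, which is why
+    # all tables share the same columns and primary key.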
+ run_sql "CREATE TABLE $DB.backup_source ( + id INT, + value INT, + PRIMARY KEY(id, value) + ) PARTITION BY RANGE (value) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (200) + );" + + run_sql "CREATE TABLE $DB.backup_target1 ( + id INT, + value INT, + PRIMARY KEY(id, value) + );" + + run_sql "CREATE TABLE $DB.backup_target2 ( + id INT, + value INT, + PRIMARY KEY(id, value) + );" + + # Insert data into backup tables + run_sql "INSERT INTO $DB.backup_source VALUES (1, 50), (2, 150);" + run_sql "INSERT INTO $DB.backup_target1 VALUES (3, 50);" + run_sql "INSERT INTO $DB.backup_target2 VALUES (4, 150);" + + # Take full backup + run_br backup full -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR + + # Create tables that will only exist in log + echo "creating tables that will only exist in log..." + run_sql "CREATE TABLE $DB.log_source ( + id INT, + value INT, + PRIMARY KEY(id, value) + ) PARTITION BY RANGE (value) ( + PARTITION p0 VALUES LESS THAN (100), + PARTITION p1 VALUES LESS THAN (200) + );" + + run_sql "CREATE TABLE $DB.log_target1 ( + id INT, + value INT, + PRIMARY KEY(id, value) + );" + + run_sql "CREATE TABLE $DB.log_target2 ( + id INT, + value INT, + PRIMARY KEY(id, value) + );" + + # Insert data into log-only tables + run_sql "INSERT INTO $DB.log_source VALUES (5, 50), (6, 150);" + run_sql "INSERT INTO $DB.log_target1 VALUES (7, 50);" + run_sql "INSERT INTO $DB.log_target2 VALUES (8, 150);" + + echo "performing all partition exchange operations..." + + # Case 1: Exchange between backup tables + run_sql "ALTER TABLE $DB.backup_source EXCHANGE PARTITION p0 WITH TABLE $DB.backup_target1;" + + # Case 2: Exchange between log-only tables + run_sql "ALTER TABLE $DB.log_source EXCHANGE PARTITION p0 WITH TABLE $DB.log_target1;" + + # Case 3: Exchange between backup source and log target + run_sql "ALTER TABLE $DB.backup_source EXCHANGE PARTITION p1 WITH TABLE $DB.log_target2;" + + # Case 4: Exchange between log source and backup target + run_sql "ALTER TABLE $DB.log_source EXCHANGE PARTITION p1 WITH TABLE $DB.backup_target2;" + + # Wait for log backup to catch up with all operations + . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" + + # Stop log backup before starting restore tests + run_br log stop --task-name $TASK_NAME + + echo "starting restore tests..." 
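+    # Expected outcomes, mirroring handlePartitionExchanges: a restore succeeds
+    # when the filter covers both ends of every exchange (or neither end), and
+    # fails with "partition exchange detected" when only one end is covered.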
+
+    # Test 1: Backup source and all in filter - should succeed
+    echo "test 1: backup source and all in filter"
+    run_sql "drop schema if exists $DB;"
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.backup_source" -f "$DB.backup_target1" -f "$DB.log_target2" || {
+        echo "Failed: backup source and all in filter should succeed"
+        exit 1
+    }
+    # Verify data after restore
+    run_sql "SELECT COUNT(*) = 1 FROM $DB.backup_source PARTITION (p0) WHERE id = 3 AND value = 50" || {
+        echo "backup_source p0 doesn't have expected data after restore"
+        exit 1
+    }
+    run_sql "SELECT COUNT(*) = 1 FROM $DB.backup_target1 WHERE id = 1 AND value = 50" || {
+        echo "backup_target1 doesn't have expected data after restore"
+        exit 1
+    }
+    run_sql "SELECT COUNT(*) = 1 FROM $DB.log_target2 WHERE id = 2 AND value = 150" || {
+        echo "log_target2 doesn't have expected data after restore"
+        exit 1
+    }
+
+    # Test 2: Log source and all in filter - should succeed
+    echo "test 2: log source and all in filter"
+    run_sql "drop schema if exists $DB;"
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.log_source" -f "$DB.log_target1" -f "$DB.backup_target2" || {
+        echo "Failed: log source and all in filter should succeed"
+        exit 1
+    }
+    # Verify data after restore
+    run_sql "SELECT COUNT(*) = 1 FROM $DB.log_source PARTITION (p0) WHERE id = 7 AND value = 50" || {
+        echo "log_source p0 doesn't have expected data after restore"
+        exit 1
+    }
+    run_sql "SELECT COUNT(*) = 1 FROM $DB.log_target1 WHERE id = 5 AND value = 50" || {
+        echo "log_target1 doesn't have expected data after restore"
+        exit 1
+    }
+    run_sql "SELECT COUNT(*) = 1 FROM $DB.backup_target2 WHERE id = 6 AND value = 150" || {
+        echo "backup_target2 doesn't have expected data after restore"
+        exit 1
+    }
+
+    # Test 3: Only backup source in filter - should fail
+    echo "test 3: only backup source in filter"
+    run_sql "drop schema if exists $DB;"
+    restore_fail=0
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.backup_source" || restore_fail=1
+    if [ $restore_fail -ne 1 ]; then
+        echo "Failed: backup source only in filter should fail"
+        exit 1
+    fi
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.backup_source" 2>&1 | grep "partition exchange detected" || {
+        echo "Error message does not contain partition exchange information"
+        exit 1
+    }
+
+    # Test 4: Only backup target in filter - should fail
+    echo "test 4: only backup target in filter"
+    run_sql "drop schema if exists $DB;"
+    restore_fail=0
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.backup_target1" || restore_fail=1
+    if [ $restore_fail -ne 1 ]; then
+        echo "Failed: backup target only in filter should fail"
+        exit 1
+    fi
+
+    restore_fail=0
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.backup_target2" || restore_fail=1
+    if [ $restore_fail -ne 1 ]; then
+        echo "Failed: backup target only in filter should fail"
+        exit 1
+    fi
+
+    # Test 5: Only log source in filter - should fail
+    echo "test 5: only log source in filter"
+    run_sql "drop schema if exists $DB;"
+    restore_fail=0
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.log_source" || restore_fail=1
+    if [ $restore_fail -ne 1 ]; then
+        echo "Failed: log source only in filter should fail"
+        exit 1
+    fi
+
+    # Test 6: Only log target in filter - should fail
+    echo "test 6: only log target in filter"
+    run_sql "drop schema if exists $DB;"
+    restore_fail=0
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.log_target1" || restore_fail=1
+    if [ $restore_fail -ne 1 ]; then
+        echo "Failed: log target only in filter should fail"
+        exit 1
+    fi
+
+    restore_fail=0
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.log_target2" || restore_fail=1
+    if [ $restore_fail -ne 1 ]; then
+        echo "Failed: log target only in filter should fail"
+        exit 1
+    fi
+
+    # Test 7: Neither table in filter - should succeed with no tables restored
+    echo "test 7: neither table in filter"
+    run_sql "drop schema if exists $DB;"
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.unrelated_table" || {
+        echo "Failed: neither table in filter should succeed"
+        exit 1
+    }
+    # Verify no tables were restored
+    verify_no_unexpected_tables 0 "$DB" || {
+        echo "Found unexpected tables after neither table in filter"
+        exit 1
+    }
+
+    # Test 8: Wildcard filter including all tables - should succeed
+    echo "test 8: wildcard filter including all tables"
+    run_sql "drop schema if exists $DB;"
+    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
+        --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
+        -f "$DB.*" || {
+        echo "Failed: wildcard filter should succeed"
+        exit 1
+    }
+    # Verify all tables are restored
+    verify_no_unexpected_tables 6 "$DB" || {
+        echo "Wrong number of tables restored with wildcard filter"
+        exit 1
+    }
+
+    # cleanup
+    rm -rf "$TEST_DIR/$TASK_NAME"
+
+    echo "partition exchange test passed"
+}
+
+test_table_truncation() {
+    restart_services || { echo "Failed to restart services"; exit 1; }
+
+    echo "start testing table truncation with filter"
+    run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"
+
+    run_sql "create schema $DB;"
+
+    # Create tables for snapshot backup
+    echo "creating tables for snapshot backup..."
+    run_sql "CREATE TABLE $DB.snapshot_truncate (
+        id INT PRIMARY KEY,
+        value VARCHAR(50)
+    );"
+
+    # Insert initial data
+    run_sql "INSERT INTO $DB.snapshot_truncate VALUES (1, 'initial data 1'), (2, 'initial data 2');"
+
+    # Take full backup
+    run_br backup full -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR
+
+    # Create tables during log backup phase
+    echo "creating tables during log backup phase..."
+    run_sql "CREATE TABLE $DB.log_truncate (
+        id INT PRIMARY KEY,
+        value VARCHAR(50)
+    );"
+
+    # Insert data into log-created table
+    run_sql "INSERT INTO $DB.log_truncate VALUES (1, 'log data 1'), (2, 'log data 2');"
+
+    # Add more data to snapshot table before truncation
+    run_sql "INSERT INTO $DB.snapshot_truncate VALUES (3, 'pre-truncate data');"
+
+    # Truncate both tables
+    echo "truncating tables..."
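+    # In TiDB, TRUNCATE TABLE assigns the table a brand-new table ID, so a
+    # filtered PiTR restore has to follow the table-ID history to attach the
+    # post-truncate data to the right table.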
+ run_sql "TRUNCATE TABLE $DB.snapshot_truncate;" + run_sql "TRUNCATE TABLE $DB.log_truncate;" + + # Insert new data after truncation + run_sql "INSERT INTO $DB.snapshot_truncate VALUES (10, 'post-truncate data 1'), (20, 'post-truncate data 2');" + run_sql "INSERT INTO $DB.log_truncate VALUES (10, 'post-truncate data 1'), (20, 'post-truncate data 2');" + + . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" + + # Stop log backup before starting restore operations + run_br log stop --task-name $TASK_NAME --pd $PD_ADDR + + run_sql "drop schema if exists $DB;" + + echo "Test 1: Restore both truncated tables" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.*" + + # Verify data after restore - should only have post-truncate data + run_sql "SELECT COUNT(*) = 2 FROM $DB.snapshot_truncate" || { + echo "snapshot_truncate doesn't have expected row count after restore" + exit 1 + } + + run_sql "SELECT COUNT(*) = 2 FROM $DB.log_truncate" || { + echo "log_truncate doesn't have expected row count after restore" + exit 1 + } + + # Verify specific values to ensure we have post-truncate data + run_sql "SELECT COUNT(*) = 1 FROM $DB.snapshot_truncate WHERE id = 10" || { + echo "snapshot_truncate doesn't have expected post-truncate data" + exit 1 + } + + run_sql "SELECT COUNT(*) = 1 FROM $DB.log_truncate WHERE id = 10" || { + echo "log_truncate doesn't have expected post-truncate data" + exit 1 + } + + # Verify pre-truncate data is gone + run_sql "SELECT COUNT(*) = 0 FROM $DB.snapshot_truncate WHERE id IN (1, 2, 3)" || { + echo "snapshot_truncate still has pre-truncate data which should be gone" + exit 1 + } + + run_sql "SELECT COUNT(*) = 0 FROM $DB.log_truncate WHERE id IN (1, 2)" || { + echo "log_truncate still has pre-truncate data which should be gone" + exit 1 + } + + # Test 2: Restore only snapshot table + run_sql "drop schema if exists $DB;" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.snapshot_truncate" + + # Verify only snapshot table exists + verify_no_unexpected_tables 1 "$DB" || { + echo "Wrong number of tables restored with snapshot_truncate filter" + exit 1 + } + + # Verify data is correct + run_sql "SELECT COUNT(*) = 2 FROM $DB.snapshot_truncate" || { + echo "snapshot_truncate doesn't have expected row count after filtered restore" + exit 1 + } + + # Test 3: Restore only log table + run_sql "drop schema if exists $DB;" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.log_truncate" + + # Verify only log table exists + verify_no_unexpected_tables 1 "$DB" || { + echo "Wrong number of tables restored with log_truncate filter" + exit 1 + } + + # Verify data is correct + run_sql "SELECT COUNT(*) = 2 FROM $DB.log_truncate" || { + echo "log_truncate doesn't have expected row count after filtered restore" + exit 1 + } + + # cleanup + rm -rf "$TEST_DIR/$TASK_NAME" + + echo "table truncation test passed" +} + +test_sequential_restore() { + restart_services || { echo "Failed to restart services"; exit 1; } + + echo "start testing sequential table restore with filter" + run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" + + run_sql "create schema $DB;" + + # Create multiple tables with different data + echo "creating tables for testing 
sequential restore..." + run_sql "CREATE TABLE $DB.table1 ( + id INT PRIMARY KEY, + value VARCHAR(50) + );" + run_sql "CREATE TABLE $DB.table2 ( + id INT PRIMARY KEY, + value VARCHAR(50) + );" + run_sql "CREATE TABLE $DB.table3 ( + id INT PRIMARY KEY, + value VARCHAR(50) + );" + + # Insert initial data + run_sql "INSERT INTO $DB.table1 VALUES (1, 'table1 data 1'), (2, 'table1 data 2');" + run_sql "INSERT INTO $DB.table2 VALUES (1, 'table2 data 1'), (2, 'table2 data 2');" + run_sql "INSERT INTO $DB.table3 VALUES (1, 'table3 data 1'), (2, 'table3 data 2');" + + # Take full backup + run_br backup full -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR + + # Add more data after backup + run_sql "INSERT INTO $DB.table1 VALUES (3, 'table1 data 3'), (4, 'table1 data 4');" + run_sql "INSERT INTO $DB.table2 VALUES (3, 'table2 data 3'), (4, 'table2 data 4');" + run_sql "INSERT INTO $DB.table3 VALUES (3, 'table3 data 3'), (4, 'table3 data 4');" + + # Wait for log backup to catch up with all operations + . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" + + # Stop log backup before starting restore operations + run_br log stop --task-name $TASK_NAME --pd $PD_ADDR + + # Clean up the database before starting the sequential restore tests + run_sql "drop schema if exists $DB;" + + echo "Test 1: Restore first table" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.table1" + + # Verify only table1 exists with correct data + verify_no_unexpected_tables 1 "$DB" || { + echo "Wrong number of tables after restoring table1" + exit 1 + } + + run_sql "SELECT COUNT(*) = 4 FROM $DB.table1" || { + echo "table1 doesn't have expected row count after restore" + exit 1 + } + + echo "Test 2: Restore second table without cleaning up" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.table2" + + # Verify both table1 and table2 exist with correct data + verify_no_unexpected_tables 2 "$DB" || { + echo "Wrong number of tables after restoring table2" + exit 1 + } + + run_sql "SELECT COUNT(*) = 4 FROM $DB.table1" || { + echo "table1 doesn't have expected row count after second restore" + exit 1 + } + + run_sql "SELECT COUNT(*) = 4 FROM $DB.table2" || { + echo "table2 doesn't have expected row count after restore" + exit 1 + } + + echo "Test 3: Restore third table without cleaning up" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.table3" + + # Verify all three tables exist with correct data + verify_no_unexpected_tables 3 "$DB" || { + echo "Wrong number of tables after restoring table3" + exit 1 + } + + run_sql "SELECT COUNT(*) = 4 FROM $DB.table1" || { + echo "table1 doesn't have expected row count after third restore" + exit 1 + } + + run_sql "SELECT COUNT(*) = 4 FROM $DB.table2" || { + echo "table2 doesn't have expected row count after third restore" + exit 1 + } + + run_sql "SELECT COUNT(*) = 4 FROM $DB.table3" || { + echo "table3 doesn't have expected row count after restore" + exit 1 + } + + # cleanup + rm -rf "$TEST_DIR/$TASK_NAME" + + echo "sequential restore test passed" +} + +test_log_compaction() { + restart_services || { echo "Failed to restart services"; exit 1; } + + echo "start testing table filter with log compaction" + run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s 
"local://$TEST_DIR/$TASK_NAME/log" + + run_sql "create schema $DB;" + + # Create tables for snapshot backup + echo "creating tables for snapshot backup..." + run_sql "CREATE TABLE $DB.compaction_snapshot ( + id INT PRIMARY KEY, + value VARCHAR(50) + );" + + # Insert initial data + run_sql "INSERT INTO $DB.compaction_snapshot VALUES (1, 'initial data 1'), (2, 'initial data 2');" + + # Take full backup + run_br backup full -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR + + # Create tables during log backup phase + echo "creating tables during log backup phase..." + run_sql "CREATE TABLE $DB.compaction_log ( + id INT PRIMARY KEY, + value VARCHAR(50) + );" + + # Insert data into log-created table + run_sql "INSERT INTO $DB.compaction_log VALUES (1, 'log data 1'), (2, 'log data 2');" + + # Add more data to snapshot table + run_sql "INSERT INTO $DB.compaction_snapshot VALUES (3, 'more data 1'), (4, 'more data 2');" + + # Wait for log backup to catch up with all operations + current_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)") + . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" + + # Verify no SST files exist before compaction + pre_compaction_files=$(find "$TEST_DIR/$TASK_NAME/log" -name "*.sst" | wc -l) + if [ "$pre_compaction_files" -ne 0 ]; then + echo "Found $pre_compaction_files SST files before compaction, expected 0" + exit 1 + fi + echo "Verified no SST files exist before compaction" + + # Step 1: Get the Base64 encoded storage URL + echo "Encoding storage URL to Base64" + + # Run the base64ify command and capture its output, redirecting stderr to stdout + base64_output=$(run_br operator base64ify --storage "local://$TEST_DIR/$TASK_NAME/log" 2>&1) + + # Extract only lines that look like Base64 (long string of base64 chars) + storage_base64=$(echo "$base64_output" | grep -o '[A-Za-z0-9+/]\{20,\}=\{0,2\}' | grep '^E' | head -1) + + # Verify that we got a valid Base64 string + if [ -z "$storage_base64" ]; then + echo "Failed to extract Base64 encoded storage URL. Full output:" + echo "$base64_output" + exit 1 + fi + + echo "Extracted Base64 encoded storage URL: $storage_base64" + + # Get current timestamp and a timestamp from 1 hour ago for compaction range + one_hour_ago_ts=$(python3 -c "import time; print(int((time.time() - 3600) * 1000) << 18)") + + echo "Current timestamp: $current_ts" + echo "One hour ago timestamp: $one_hour_ago_ts" + + echo "Compacting logs from $one_hour_ago_ts to $current_ts" + + # Run tikv-ctl to perform compaction + tikv-ctl --log-level=info compact-log-backup --from "$one_hour_ago_ts" --until "$current_ts" -s "$storage_base64" -N 4 --minimal-compaction-size 0 + + # Verify SST files exist after compaction + post_compaction_files=$(find "$TEST_DIR/$TASK_NAME/log" -name "*.sst" | wc -l) + if [ "$post_compaction_files" -eq 0 ]; then + echo "No SST files found after compaction, expected at least 1" + exit 1 + fi + echo "Verified $post_compaction_files SST files exist after compaction" + + # Add more data after compaction + run_sql "INSERT INTO $DB.compaction_snapshot VALUES (5, 'post-compaction data 1');" + run_sql "INSERT INTO $DB.compaction_log VALUES (3, 'post-compaction data 2');" + + # Wait for log backup to catch up again + . 
"$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" + + # Stop log backup before starting restore operations + run_br log stop --task-name $TASK_NAME --pd $PD_ADDR + + run_sql "drop schema if exists $DB;" + + echo "Test 1: Restore both tables with compacted logs" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.*" + + # Verify data after restore - should have all data including post-compaction + run_sql "SELECT COUNT(*) = 5 FROM $DB.compaction_snapshot" || { + echo "compaction_snapshot doesn't have expected row count after restore" + exit 1 + } + + run_sql "SELECT COUNT(*) = 3 FROM $DB.compaction_log" || { + echo "compaction_log doesn't have expected row count after restore" + exit 1 + } + + # Verify specific values to ensure we have post-compaction data + run_sql "SELECT COUNT(*) = 1 FROM $DB.compaction_snapshot WHERE id = 5" || { + echo "compaction_snapshot doesn't have expected post-compaction data" + exit 1 + } + + run_sql "SELECT COUNT(*) = 1 FROM $DB.compaction_log WHERE id = 3" || { + echo "compaction_log doesn't have expected post-compaction data" + exit 1 + } + + # Test 2: Restore only snapshot table with filter + run_sql "drop schema if exists $DB;" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.compaction_snapshot" + + # Verify only snapshot table exists + verify_no_unexpected_tables 1 "$DB" || { + echo "Wrong number of tables restored with compaction_snapshot filter" + exit 1 + } + + # Verify data is correct + run_sql "SELECT COUNT(*) = 5 FROM $DB.compaction_snapshot" || { + echo "compaction_snapshot doesn't have expected row count after filtered restore" + exit 1 + } + + # Test 3: Restore only log table with filter + run_sql "drop schema if exists $DB;" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \ + --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \ + -f "$DB.compaction_log" + + # Verify only log table exists + verify_no_unexpected_tables 1 "$DB" || { + echo "Wrong number of tables restored with compaction_log filter" + exit 1 + } + + # Verify data is correct + run_sql "SELECT COUNT(*) = 3 FROM $DB.compaction_log" || { + echo "compaction_log doesn't have expected row count after filtered restore" + exit 1 + } + + rm -rf "$TEST_DIR/$TASK_NAME" + + echo "log compaction with filter test passed" +} + echo "run all test cases" test_basic_filter test_with_full_backup_filter @@ -712,5 +1368,9 @@ test_with_checkpoint test_system_tables test_foreign_keys test_index_filter +test_partition_exchange +test_table_truncation +test_sequential_restore +test_log_compaction echo "br pitr table filter all tests passed"