23 changes: 23 additions & 0 deletions pkg/sink/cloudstorage/path.go
@@ -198,9 +198,19 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
return err
}
if exist {
log.Info("table schema file already exists",
zap.String("namespace", f.changefeedID.Namespace()),
zap.Stringer("changefeedID", f.changefeedID.ID()),
zap.Any("versionedTableName", table),
zap.String("path", tblSchemaFile))
f.versionMap[table] = table.TableInfoVersion
return nil
}
log.Info("table schema file not exists",
zap.String("namespace", f.changefeedID.Namespace()),
zap.Stringer("changefeedID", f.changefeedID.ID()),
zap.Any("versionedTableName", table),
zap.String("path", tblSchemaFile))

// walk the table meta path to find the last schema file
_, checksum := mustParseSchemaName(tblSchemaFile)
@@ -216,6 +226,11 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
if !strings.HasSuffix(path, checksumSuffix) {
return nil
}
log.Info("found schema file with the same checksum",
zap.String("namespace", f.changefeedID.Namespace()),
zap.Stringer("changefeedID", f.changefeedID.ID()),
zap.Any("versionedTableName", table),
zap.String("path", path))
version, parsedChecksum := mustParseSchemaName(path)
if parsedChecksum != checksum {
log.Error("invalid schema file name",
@@ -237,6 +252,14 @@ func (f *FilePathGenerator) CheckOrWriteSchema(

// Case 2: the table meta path is not empty.
if schemaFileCnt != 0 && lastVersion != 0 {
log.Info("table schema file with the same checksum already exists",
zap.String("namespace", f.changefeedID.Namespace()),
zap.Stringer("changefeedID", f.changefeedID.ID()),
zap.Any("versionedTableName", table),
zap.Uint64("tableVersion", lastVersion),
zap.Uint32("checksum", checksum))
// record the last version of the table schema file.
// we don't need to write schema file to external storage again.
f.versionMap[table] = lastVersion
return nil
}
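For context, the hunks above add logging around an existing fallback: when the exact schema file is missing, CheckOrWriteSchema walks the table's meta path, keeps only files whose name ends with the current schema checksum, and records the highest table version found so that the schema file does not have to be written to external storage again. Below is a minimal, standalone Go sketch of that lookup, not the code in path.go: the schema_{table-version}_{checksum}.json naming and the zero-padded checksum suffix are assumptions based on the storage sink's documented layout, and parseSchemaName / lastSchemaVersion are illustrative stand-ins for the helpers referenced in the diff.

package main

import (
	"fmt"
	"strings"
)

// parseSchemaName extracts the table version and checksum from a file name
// such as "schema_120_0123456789.json". It is a simplified stand-in for the
// mustParseSchemaName helper referenced in the diff above.
func parseSchemaName(path string) (version uint64, checksum uint32, err error) {
	name := path[strings.LastIndex(path, "/")+1:]
	name = strings.TrimSuffix(name, ".json")
	_, err = fmt.Sscanf(name, "schema_%d_%d", &version, &checksum)
	return version, checksum, err
}

// lastSchemaVersion scans file names under a table's meta path and returns the
// highest table version among schema files carrying the given checksum. If a
// match exists, the caller can cache that version and skip rewriting the file.
func lastSchemaVersion(files []string, checksum uint32) (uint64, bool) {
	// Assumed suffix format: a zero-padded checksum followed by ".json".
	checksumSuffix := fmt.Sprintf("%010d.json", checksum)
	var last uint64
	found := false
	for _, f := range files {
		if !strings.HasSuffix(f, checksumSuffix) {
			continue // schema file for a different table definition
		}
		version, _, err := parseSchemaName(f)
		if err != nil {
			continue
		}
		if version > last {
			last = version
		}
		found = true
	}
	return last, found
}

func main() {
	files := []string{
		"meta/schema_100_0123456789.json",
		"meta/schema_120_0123456789.json",
		"meta/schema_110_0987654321.json",
	}
	if version, ok := lastSchemaVersion(files, 123456789); ok {
		fmt.Println("reuse schema file, last version:", version) // prints 120
	}
}

Filtering on the checksum suffix first keeps the walk cheap: version parsing only happens for files that already describe the same table definition, which is exactly the case the new "found schema file with the same checksum" log reports.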
@@ -9,20 +9,20 @@ check-struct-only = false
[task]
output-dir = "/tmp/tidb_cdc_test/canal_json_storage_partition_table/sync_diff/output"

source-instances = ["mysql1"]
source-instances = ["tidb0"]

target-instance = "tidb0"
target-instance = "mysql1"

target-check-tables = ["partition_table.?*"]

[data-sources]
[data-sources.mysql1]
[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
33 changes: 16 additions & 17 deletions tests/integration_tests/run_heavy_it_in_ci.sh
@@ -145,39 +145,38 @@ pulsar_groups=(

storage_groups=(
# G00
'generate_column many_pk_or_uk multi_source'
'canal_json_storage_partition_table'
# G01
csv_storage_update_pk_clustered csv_storage_update_pk_nonclustered
''
'canal_json_storage_partition_table'
# G02
'canal_json_storage_basic canal_json_storage_partition_table'
'canal_json_storage_partition_table'
# G03
'csv_storage_basic storage_csv_update'
'canal_json_storage_partition_table'
# G04
'ddl_for_split_tables_with_random_move_table'
'canal_json_storage_partition_table'
# G05
'move_table drop_many_tables'
'canal_json_storage_partition_table'
# G06
'cdc default_value'
'canal_json_storage_partition_table'
# G07
'merge_table resolve_lock force_replicate_table'
'canal_json_storage_partition_table'
# G08
'tidb_mysql_test'
'canal_json_storage_partition_table'
# G09
'ddl_for_split_tables_with_merge_and_split'
'canal_json_storage_partition_table'
# G10
'ddl_for_split_tables_with_random_merge_and_split'
'canal_json_storage_partition_table'
# G11
# fail_over_ddl_mix_with_syncpoint
''
'canal_json_storage_partition_table'
# G12
'ddl_with_random_move_table'
'canal_json_storage_partition_table'
# G13
'fail_over region_merge'
'canal_json_storage_partition_table'
# G14
'fail_over_ddl_mix'
'canal_json_storage_partition_table'
# G15
''
'canal_json_storage_partition_table'
)

# Source shared functions and check test coverage