Skip to content

Commit 74983c9

Browse files
authored
executor: fix data inconsistency after load data ... replace into ... (pingcap#56415) (pingcap#56508)
close pingcap#56408
1 parent cceec97 commit 74983c9

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

pkg/executor/insert_common.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,11 +1172,7 @@ func (e *InsertValues) handleDuplicateKey(ctx context.Context, txn kv.Transactio
11721172
if handle == nil {
11731173
return false, nil
11741174
}
1175-
_, err = e.removeRow(ctx, txn, handle, r, true)
1176-
if err != nil {
1177-
return false, err
1178-
}
1179-
return false, nil
1175+
return e.removeRow(ctx, txn, handle, r, true)
11801176
}
11811177

11821178
// batchCheckAndInsert checks rows with duplicate errors.

pkg/executor/test/loaddatatest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_test(
99
],
1010
flaky = True,
1111
race = "on",
12-
shard_count = 10,
12+
shard_count = 11,
1313
deps = [
1414
"//br/pkg/lightning/mydump",
1515
"//pkg/config",

pkg/executor/test/loaddatatest/load_data_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,3 +476,23 @@ func TestLoadDataIntoPartitionedTable(t *testing.T) {
476476
selectSQL := "select * from range_t order by a;"
477477
checkCases(tests, ld, t, tk, ctx, selectSQL, deleteSQL)
478478
}
479+
480+
func TestFix56408(t *testing.T) {
481+
store := testkit.CreateMockStore(t)
482+
tk := testkit.NewTestKit(t, store)
483+
tk.MustExec("USE test; DROP TABLE IF EXISTS load_data_replace;")
484+
tk.MustExec("create table a(id int,name varchar(20),addr varchar(100),primary key (id) nonclustered);")
485+
tk.MustExec("LOAD DATA LOCAL INFILE '/tmp/nonexistence.csv' REPLACE INTO TABLE a FIELDS terminated by '|';")
486+
ctx := tk.Session().(sessionctx.Context)
487+
ld := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataWorker)
488+
tests := []testCase{
489+
{[]byte("1|aa|beijing\n1|aa|beijing\n1|aa|beijing\n1|aa|beijing\n2|bb|shanghai\n2|bb|shanghai\n2|bb|shanghai\n3|cc|guangzhou\n"),
490+
[]string{"1 aa beijing", "2 bb shanghai", "3 cc guangzhou"},
491+
"Records: 8 Deleted: 0 Skipped: 5 Warnings: 0",
492+
},
493+
}
494+
deleteSQL := "delete from a;"
495+
selectSQL := "select * from a;"
496+
checkCases(tests, ld, t, tk, ctx, selectSQL, deleteSQL)
497+
tk.MustExec("ADMIN CHECK TABLE a")
498+
}

0 commit comments

Comments
 (0)