Skip to content

Commit 9fd5f4a

Browse files
authored
lightning: fix "context cancel" overwrites the real error (#44734) (#45000)
close #44733
1 parent 54f275b commit 9fd5f4a

File tree

2 files changed

+66
-3
lines changed

2 files changed

+66
-3
lines changed

br/pkg/lightning/backend/local/local.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,14 +1145,16 @@ func (local *Backend) generateAndSendJob(
11451145
for _, jobRange := range jobRanges {
11461146
r := jobRange
11471147
eg.Go(func() error {
1148-
select {
1149-
case <-egCtx.Done():
1148+
if egCtx.Err() != nil {
11501149
return nil
1151-
default:
11521150
}
11531151

1152+
failpoint.Inject("beforeGenerateJob", nil)
11541153
jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys)
11551154
if err != nil {
1155+
if common.IsContextCanceledError(err) {
1156+
return nil
1157+
}
11561158
return err
11571159
}
11581160
for _, job := range jobs {
@@ -1189,6 +1191,9 @@ func (local *Backend) generateJobForRange(
11891191
regionSplitSize, regionSplitKeys int64,
11901192
) ([]*regionJob, error) {
11911193
failpoint.Inject("fakeRegionJobs", func() {
1194+
if ctx.Err() != nil {
1195+
failpoint.Return(nil, ctx.Err())
1196+
}
11921197
key := [2]string{string(keyRange.start), string(keyRange.end)}
11931198
injected := fakeRegionJobs[key]
11941199
// overwrite the stage to regionScanned, because some time same keyRange
@@ -1565,6 +1570,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
15651570
jobWg.Wait()
15661571
workerCancel()
15671572
firstErr.Set(workGroup.Wait())
1573+
firstErr.Set(ctx.Err())
15681574
return firstErr.Get()
15691575
}
15701576

br/pkg/lightning/backend/local/local_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2028,3 +2028,60 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
20282028
}
20292029
}
20302030
}
2031+
2032+
func TestCtxCancelIsIgnored(t *testing.T) {
2033+
backup := maxRetryBackoffSecond
2034+
maxRetryBackoffSecond = 1
2035+
t.Cleanup(func() {
2036+
maxRetryBackoffSecond = backup
2037+
})
2038+
2039+
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
2040+
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()")
2041+
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob", "sleep(1000)")
2042+
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
2043+
t.Cleanup(func() {
2044+
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
2045+
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs")
2046+
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob")
2047+
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
2048+
})
2049+
2050+
initRanges := []Range{
2051+
{start: []byte{'c'}, end: []byte{'d'}},
2052+
{start: []byte{'d'}, end: []byte{'e'}},
2053+
}
2054+
fakeRegionJobs = map[[2]string]struct {
2055+
jobs []*regionJob
2056+
err error
2057+
}{
2058+
{"c", "d"}: {
2059+
jobs: []*regionJob{
2060+
{
2061+
keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
2062+
engine: &Engine{},
2063+
injected: getSuccessInjectedBehaviour(),
2064+
},
2065+
},
2066+
},
2067+
{"d", "e"}: {
2068+
jobs: []*regionJob{
2069+
{
2070+
keyRange: Range{start: []byte{'d'}, end: []byte{'e'}},
2071+
engine: &Engine{},
2072+
injected: getSuccessInjectedBehaviour(),
2073+
},
2074+
},
2075+
},
2076+
}
2077+
2078+
ctx := context.Background()
2079+
l := &Backend{
2080+
BackendConfig: BackendConfig{
2081+
WorkerConcurrency: 1,
2082+
},
2083+
}
2084+
e := &Engine{}
2085+
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
2086+
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
2087+
}

0 commit comments

Comments
 (0)