Skip to content

Commit 532ca35

Browse files
authored
br/operator: fix adapt env for snapshot backup getting stuck when an error is encountered (pingcap#52607) (pingcap#56717)
close pingcap#52049
1 parent c08ecf1 commit 532ca35

File tree

5 files changed

+46
-12
lines changed

5 files changed

+46
-12
lines changed

br/pkg/backup/prepare_snap/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ go_library(
1717
"@com_github_docker_go_units//:go-units",
1818
"@com_github_google_btree//:btree",
1919
"@com_github_pingcap_errors//:errors",
20+
"@com_github_pingcap_failpoint//:failpoint",
2021
"@com_github_pingcap_kvproto//pkg/brpb",
2122
"@com_github_pingcap_kvproto//pkg/errorpb",
2223
"@com_github_pingcap_kvproto//pkg/metapb",

br/pkg/backup/prepare_snap/prepare.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/google/btree"
2424
"github.com/pingcap/errors"
25+
"github.com/pingcap/failpoint"
2526
brpb "github.com/pingcap/kvproto/pkg/brpb"
2627
"github.com/pingcap/kvproto/pkg/metapb"
2728
"github.com/pingcap/log"
@@ -453,6 +454,9 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
453454
// PrepareConnections prepares the connections for each store.
454455
// This will pause the admin commands for each store.
455456
func (p *Preparer) PrepareConnections(ctx context.Context) error {
457+
failpoint.Inject("PrepareConnectionsErr", func() {
458+
failpoint.Return(errors.New("mock PrepareConnectionsErr"))
459+
})
456460
log.Info("Preparing connections to stores.")
457461
stores, err := p.env.GetAllLiveStores(ctx)
458462
if err != nil {

br/pkg/task/operator/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"//br/pkg/task",
1717
"//br/pkg/utils",
1818
"@com_github_pingcap_errors//:errors",
19+
"@com_github_pingcap_failpoint//:failpoint",
1920
"@com_github_pingcap_log//:log",
2021
"@com_github_spf13_pflag//:pflag",
2122
"@com_github_tikv_client_go_v2//tikv",

br/pkg/task/operator/cmd.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@ package operator
55
import (
66
"context"
77
"crypto/tls"
8-
"fmt"
9-
"math/rand"
10-
"os"
118
"runtime/debug"
129
"sync"
1310
"time"
1411

1512
"github.com/pingcap/errors"
13+
"github.com/pingcap/failpoint"
1614
"github.com/pingcap/log"
1715
preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap"
1816
berrors "github.com/pingcap/tidb/br/pkg/errors"
@@ -138,12 +136,19 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
138136
cx.run(func() error { return pauseGCKeeper(cx) })
139137
cx.run(func() error {
140138
log.Info("Pause scheduler waiting all connections established.")
141-
<-initChan
139+
select {
140+
case <-initChan:
141+
case <-cx.Done():
142+
return cx.Err()
143+
}
142144
log.Info("Pause scheduler noticed connections established.")
143145
return pauseSchedulerKeeper(cx)
144146
})
145147
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
146148
go func() {
149+
failpoint.Inject("SkipReadyHint", func() {
150+
failpoint.Return()
151+
})
147152
cx.rdGrp.Wait()
148153
if cfg.OnAllReady != nil {
149154
cfg.OnAllReady()
@@ -192,14 +197,6 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectio
192197
return nil
193198
}
194199

195-
func getCallerName() string {
196-
name, err := os.Hostname()
197-
if err != nil {
198-
name = fmt.Sprintf("UNKNOWN-%d", rand.Int63())
199-
}
200-
return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid())
201-
}
202-
203200
func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
204201
// Note: should we remove the service safepoint as soon as this exits?
205202
sp := utils.BRServiceSafePoint{

tests/realtikvtest/brietest/operator_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
"github.com/google/uuid"
26+
"github.com/pingcap/failpoint"
2627
"github.com/pingcap/kvproto/pkg/import_sstpb"
2728
"github.com/pingcap/kvproto/pkg/kvrpcpb"
2829
"github.com/pingcap/tidb/br/pkg/task"
@@ -224,3 +225,33 @@ func TestOperator(t *testing.T) {
224225
verifySchedulerNotStopped(req, cfg)
225226
verifyGCNotStopped(req, cfg)
226227
}
228+
229+
func TestFailure(t *testing.T) {
230+
req := require.New(t)
231+
req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr", "return()"))
232+
// Make goleak happy.
233+
req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint", "return()"))
234+
defer func() {
235+
req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr"))
236+
req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint"))
237+
}()
238+
239+
cfg := operator.PauseGcConfig{
240+
Config: task.Config{
241+
PD: []string{"127.0.0.1:2379"},
242+
},
243+
TTL: 5 * time.Minute,
244+
SafePoint: oracle.GoTimeToTS(time.Now()),
245+
}
246+
247+
verifyGCNotStopped(req, cfg)
248+
verifySchedulerNotStopped(req, cfg)
249+
250+
ctx, cancel := context.WithCancel(context.Background())
251+
defer cancel()
252+
err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
253+
require.Error(t, err)
254+
255+
verifyGCNotStopped(req, cfg)
256+
verifySchedulerNotStopped(req, cfg)
257+
}

0 commit comments

Comments
 (0)