Skip to content

Commit 8f9949c

Browse files
authored
Merge branch 'pingcap:master' into fix_rollback_reorganize
2 parents d66d757 + 094f4df commit 8f9949c

File tree

416 files changed

+6793
-2996
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

416 files changed

+6793
-2996
lines changed

DEPS.bzl

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6157,13 +6157,13 @@ def go_deps():
61576157
name = "com_github_prometheus_common",
61586158
build_file_proto_mode = "disable_global",
61596159
importpath = "github.com/prometheus/common",
6160-
sha256 = "62f8ef01a9303e8767a035de11b10ef05ec100275109ab17734af21dbc22fa09",
6161-
strip_prefix = "github.com/prometheus/common@v0.48.0",
6160+
sha256 = "f5fdc8e60b2e3e4fa56d9af84d4a2d49039a08f0d2b63c732399d735360b1ba2",
6161+
strip_prefix = "github.com/prometheus/common@v0.51.0",
61626162
urls = [
6163-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
6164-
"http://ats.apps.svc/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
6165-
"https://cache.hawkingrei.com/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
6166-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
6163+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
6164+
"http://ats.apps.svc/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
6165+
"https://cache.hawkingrei.com/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
6166+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
61676167
],
61686168
)
61696169
go_repository(
@@ -7145,26 +7145,26 @@ def go_deps():
71457145
name = "com_github_tikv_client_go_v2",
71467146
build_file_proto_mode = "disable_global",
71477147
importpath = "github.com/tikv/client-go/v2",
7148-
sha256 = "1838f5b1e46ccef68f651efd1a01a2b70037062ba6a08d58b2bd6aa8d3580e0c",
7149-
strip_prefix = "github.com/tikv/client-go/[email protected].20240318065517-a9128e8200ab",
7148+
sha256 = "86c2da8180c318c8258d4759fcae926a5613f5ff929dfb3461ce307c15cc44a5",
7149+
strip_prefix = "github.com/tikv/client-go/[email protected].20240322070737-05aaba6cc6f7",
71507150
urls = [
7151-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
7152-
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
7153-
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
7154-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240318065517-a9128e8200ab.zip",
7151+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
7152+
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
7153+
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
7154+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
71557155
],
71567156
)
71577157
go_repository(
71587158
name = "com_github_tikv_pd_client",
71597159
build_file_proto_mode = "disable_global",
71607160
importpath = "github.com/tikv/pd/client",
7161-
sha256 = "d146feec4d22cef3825cf50e6bf5e6ab10179678ccb389ca5bf6e88f3c625294",
7162-
strip_prefix = "github.com/tikv/pd/[email protected]20240229065730-92a31c12238e",
7161+
sha256 = "9ebd4a2ecfd4d7c03f59deafc9d901e3c2ea308870e4dbaf91db1295b24e7ac1",
7162+
strip_prefix = "github.com/tikv/pd/[email protected]20240322051414-fb9e2d561b6e",
71637163
urls = [
7164-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240229065730-92a31c12238e.zip",
7165-
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240229065730-92a31c12238e.zip",
7166-
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240229065730-92a31c12238e.zip",
7167-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240229065730-92a31c12238e.zip",
7164+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
7165+
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
7166+
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
7167+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
71687168
],
71697169
)
71707170
go_repository(

Makefile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,13 @@ bazel_importintotest4: failpoint-enable bazel_ci_simple_prepare
628628
-- //tests/realtikvtest/importintotest4/...
629629
./build/jenkins_collect_coverage.sh
630630

631+
# on timeout, bazel won't print log sometimes, so we use --test_output=all to print log always
632+
bazel_pipelineddmltest: failpoint-enable bazel_ci_simple_prepare
633+
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) $(BAZEL_INSTRUMENTATION_FILTER) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
634+
--@io_bazel_rules_go//go/config:cover_format=go_cover \
635+
-- //tests/realtikvtest/pipelineddmltest/...
636+
./build/jenkins_collect_coverage.sh
637+
631638
bazel_lint: bazel_prepare
632639
bazel build //... --//build:with_nogo_flag=true
633640

OWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ approvers:
5858
- Leavrth
5959
- leoppro
6060
- lichunzhu
61+
- lidezhu
6162
- Little-Wallace
6263
- liuzix
6364
- lonng

br/pkg/backup/prepare_snap/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ go_test(
3535
timeout = "short",
3636
srcs = ["prepare_test.go"],
3737
flaky = True,
38-
shard_count = 8,
38+
shard_count = 9,
3939
deps = [
4040
":prepare_snap",
4141
"//br/pkg/utils",

br/pkg/backup/prepare_snap/env.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package preparesnap
1717
import (
1818
"context"
1919
"slices"
20+
"sync"
2021
"time"
2122

2223
"github.com/docker/go-units"
@@ -110,6 +111,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
110111
return withoutTiFlash, err
111112
}
112113

114+
func AdaptForGRPCInTest(p PrepareClient) PrepareClient {
115+
return &gRPCGoAdapter{
116+
inner: p,
117+
}
118+
}
119+
120+
// GrpcGoAdapter makes the `Send` call synchronous.
121+
// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe.
122+
// But concurrency call to `send` and `recv` is safe.
123+
// This type is exported for testing.
124+
type gRPCGoAdapter struct {
125+
inner PrepareClient
126+
sendMu sync.Mutex
127+
recvMu sync.Mutex
128+
}
129+
130+
func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error {
131+
s.sendMu.Lock()
132+
defer s.sendMu.Unlock()
133+
return s.inner.Send(req)
134+
}
135+
136+
func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
137+
s.recvMu.Lock()
138+
defer s.recvMu.Unlock()
139+
return s.inner.Recv()
140+
}
141+
113142
func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
114143
var cli brpb.Backup_PrepareSnapshotBackupClient
115144
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
@@ -124,7 +153,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie
124153
if err != nil {
125154
return nil, err
126155
}
127-
return cli, nil
156+
return &gRPCGoAdapter{inner: cli}, nil
128157
}
129158

130159
func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {

br/pkg/backup/prepare_snap/prepare.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ type Preparer struct {
9191
RetryBackoff time.Duration
9292
RetryLimit int
9393
LeaseDuration time.Duration
94+
95+
/* Observers. Initialize them before starting.*/
96+
AfterConnectionsEstablished func()
9497
}
9598

9699
func New(env Env) *Preparer {
@@ -159,6 +162,9 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
159162
log.Error("failed to prepare connections", logutil.ShortError(err))
160163
return errors.Annotate(err, "failed to prepare connections")
161164
}
165+
if p.AfterConnectionsEstablished != nil {
166+
p.AfterConnectionsEstablished()
167+
}
162168
if err := p.AdvanceState(ctx); err != nil {
163169
log.Error("failed to check the progress of our work", logutil.ShortError(err))
164170
return errors.Annotate(err, "failed to begin step")

br/pkg/backup/prepare_snap/prepare_test.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"io"
2222
"slices"
2323
"sync"
24+
"sync/atomic"
2425
"testing"
2526
"time"
2627

@@ -177,7 +178,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
177178
}
178179
m.onCreateStore(m.stores[storeID])
179180
}
180-
return m.stores[storeID], nil
181+
return AdaptForGRPCInTest(m.stores[storeID]), nil
181182
}
182183

183184
func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
@@ -474,7 +475,6 @@ func TestSplitEnv(t *testing.T) {
474475
}
475476

476477
func TestConnectionDelay(t *testing.T) {
477-
log.SetLevel(zapcore.DebugLevel)
478478
req := require.New(t)
479479
pdc := fakeCluster(t, 3, dummyRegions(100)...)
480480
ms := newTestEnv(pdc)
@@ -510,3 +510,31 @@ func TestConnectionDelay(t *testing.T) {
510510
delayConn <- struct{}{}
511511
req.NoError(<-connectionPrepareResult)
512512
}
513+
514+
func TestHooks(t *testing.T) {
515+
req := require.New(t)
516+
pdc := fakeCluster(t, 3, dummyRegions(100)...)
517+
pauseWaitApply := make(chan struct{})
518+
ms := newTestEnv(pdc)
519+
ms.onCreateStore = func(ms *mockStore) {
520+
ms.onWaitApply = func(r *metapb.Region) error {
521+
<-pauseWaitApply
522+
return nil
523+
}
524+
}
525+
adv := New(ms)
526+
connectionsEstablished := new(atomic.Bool)
527+
adv.AfterConnectionsEstablished = func() {
528+
connectionsEstablished.Store(true)
529+
}
530+
errCh := make(chan error, 1)
531+
go func() {
532+
errCh <- adv.DriveLoopAndWaitPrepare(context.Background())
533+
}()
534+
req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond)
535+
close(pauseWaitApply)
536+
req.NoError(<-errCh)
537+
ms.AssertSafeForBackup(t)
538+
req.NoError(adv.Finalize(context.Background()))
539+
ms.AssertIsNormalMode(t)
540+
}

br/pkg/checkpoint/checkpoint.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math/rand"
2525
"strings"
2626
"sync"
27+
"sync/atomic"
2728
"time"
2829

2930
"github.com/pingcap/errors"
@@ -846,27 +847,18 @@ func removeCheckpointData(ctx context.Context, s storage.ExternalStorage, subDir
846847
zap.Int64("remove-size", removeSize),
847848
)
848849

849-
maxFailedFilesNum := 16
850-
failedFilesCount := struct {
851-
lock sync.Mutex
852-
count int
853-
}{
854-
count: 0,
855-
}
850+
maxFailedFilesNum := int64(16)
851+
var failedFilesCount atomic.Int64
856852
pool := utils.NewWorkerPool(4, "checkpoint remove worker")
857853
eg, gCtx := errgroup.WithContext(ctx)
858854
for _, filename := range removedFileNames {
859855
name := filename
860856
pool.ApplyOnErrorGroup(eg, func() error {
861857
if err := s.DeleteFile(gCtx, name); err != nil {
862858
log.Warn("failed to remove the file", zap.String("filename", name), zap.Error(err))
863-
failedFilesCount.lock.Lock()
864-
failedFilesCount.count += 1
865-
if failedFilesCount.count >= maxFailedFilesNum {
866-
failedFilesCount.lock.Unlock()
859+
if failedFilesCount.Add(1) >= maxFailedFilesNum {
867860
return errors.Annotate(err, "failed to delete too many files")
868861
}
869-
failedFilesCount.lock.Unlock()
870862
}
871863
return nil
872864
})

br/pkg/lightning/backend/external/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ go_test(
6262
"engine_test.go",
6363
"file_test.go",
6464
"iter_test.go",
65+
"merge_test.go",
6566
"misc_bench_test.go",
6667
"onefile_writer_test.go",
6768
"reader_test.go",

br/pkg/lightning/backend/external/bench_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -500,13 +500,8 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
500500
datas,
501501
s.store,
502502
int64(5*size.MB),
503-
64*1024,
504503
mergeOutput,
505504
DefaultBlockSize,
506-
DefaultMemSizeLimit,
507-
8*1024,
508-
1*size.MB,
509-
8*1024,
510505
onClose,
511506
s.concurrency,
512507
s.mergeIterHotspot,

0 commit comments

Comments
 (0)