Skip to content

Commit d08472b

Browse files
committed
Merge remote-tracking branch 'origin/master' into vectorindex-nonintpk
2 parents ccb08e7 + 3016889 commit d08472b

File tree

118 files changed

+1561
-474
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+1561
-474
lines changed

.bazelrc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ run:ci --color=yes
66

77
build --announce_rc
88
build --experimental_guard_against_concurrent_changes
9-
build --experimental_remote_merkle_tree_cache
109
build --java_language_version=17
1110
build --java_runtime_version=17
1211
build --tool_java_language_version=17

DEPS.bzl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4970,13 +4970,13 @@ def go_deps():
49704970
name = "com_github_mgechev_revive",
49714971
build_file_proto_mode = "disable_global",
49724972
importpath = "github.com/mgechev/revive",
4973-
sha256 = "3d46bef25a1a3822b922a67b98d0d984cebf6828ee90e0db260cb05bb89351bf",
4974-
strip_prefix = "github.com/mgechev/[email protected].0",
4973+
sha256 = "a349123cc03e6d0a89328aebe884aa8b3e405d9548a44a4d20437dec7e7b11d0",
4974+
strip_prefix = "github.com/mgechev/[email protected].1",
49754975
urls = [
4976-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.0.zip",
4977-
"http://ats.apps.svc/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.0.zip",
4978-
"https://cache.hawkingrei.com/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.0.zip",
4979-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.0.zip",
4976+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.1.zip",
4977+
"http://ats.apps.svc/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.1.zip",
4978+
"https://cache.hawkingrei.com/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.1.zip",
4979+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/mgechev/revive/com_github_mgechev_revive-v1.5.1.zip",
49804980
],
49814981
)
49824982
go_repository(

br/pkg/backup/store.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (r ResponseAndStore) GetStoreID() uint64 {
6060

6161
// timeoutRecv cancel the context if `Refresh()` is not called within the specified time `timeout`.
6262
type timeoutRecv struct {
63+
storeID uint64
6364
wg sync.WaitGroup
6465
parentCtx context.Context
6566
cancel context.CancelCauseFunc
@@ -98,15 +99,17 @@ func (trecv *timeoutRecv) loop(timeout time.Duration) {
9899
return
99100
}
100101
case <-ticker.C:
101-
log.Warn("receive a backup response timeout")
102+
log.Warn("wait backup response timeout, cancel the backup",
103+
zap.Duration("timeout", timeout), zap.Uint64("storeID", trecv.storeID))
102104
trecv.cancel(errors.Errorf("receive a backup response timeout"))
103105
}
104106
}
105107
}
106108

107-
func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Context, *timeoutRecv) {
109+
func StartTimeoutRecv(ctx context.Context, timeout time.Duration, storeID uint64) (context.Context, *timeoutRecv) {
108110
cctx, cancel := context.WithCancelCause(ctx)
109111
trecv := &timeoutRecv{
112+
storeID: storeID,
110113
parentCtx: ctx,
111114
cancel: cancel,
112115
refresh: make(chan struct{}),
@@ -117,15 +120,11 @@ func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Conte
117120
}
118121

119122
func doSendBackup(
120-
pctx context.Context,
123+
ctx context.Context,
121124
client backuppb.BackupClient,
122125
req backuppb.BackupRequest,
123126
respFn func(*backuppb.BackupResponse) error,
124127
) error {
125-
// Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
126-
// terminate the backup if it does not receive any new response for a long time.
127-
ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse)
128-
defer timerecv.Stop()
129128
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
130129
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
131130
"process will notify the shell.")
@@ -170,7 +169,6 @@ func doSendBackup(
170169

171170
for {
172171
resp, err := bCli.Recv()
173-
timerecv.Refresh()
174172
if err != nil {
175173
if errors.Cause(err) == io.EOF { // nolint:errorlint
176174
logutil.CL(ctx).Debug("backup streaming finish",
@@ -193,7 +191,7 @@ func doSendBackup(
193191
}
194192

195193
func startBackup(
196-
ctx context.Context,
194+
pctx context.Context,
197195
storeID uint64,
198196
backupReq backuppb.BackupRequest,
199197
backupCli backuppb.BackupClient,
@@ -202,14 +200,21 @@ func startBackup(
202200
) error {
203201
// this goroutine handle the response from a single store
204202
select {
205-
case <-ctx.Done():
206-
return ctx.Err()
203+
case <-pctx.Done():
204+
return pctx.Err()
207205
default:
208-
logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID))
209206
// Send backup request to the store.
210207
// handle the backup response or internal error here.
211208
// handle the store error(reboot or network partition) outside.
212209
reqs := SplitBackupReqRanges(backupReq, concurrency)
210+
logutil.CL(pctx).Info("starting backup to the corresponding store", zap.Uint64("storeID", storeID),
211+
zap.Int("requestCount", len(reqs)), zap.Uint("concurrency", concurrency))
212+
213+
// Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
214+
// terminate the backup if it does not receive any new response for a long time.
215+
ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse, storeID)
216+
defer timerecv.Stop()
217+
213218
pool := tidbutil.NewWorkerPool(concurrency, "store_backup")
214219
eg, ectx := errgroup.WithContext(ctx)
215220
for i, req := range reqs {
@@ -219,8 +224,10 @@ func startBackup(
219224
retry := -1
220225
return utils.WithRetry(ectx, func() error {
221226
retry += 1
222-
logutil.CL(ectx).Info("backup to store", zap.Uint64("storeID", storeID),
223-
zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
227+
if retry > 1 {
228+
logutil.CL(ectx).Info("retry backup to store", zap.Uint64("storeID", storeID),
229+
zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
230+
}
224231
return doSendBackup(ectx, backupCli, bkReq, func(resp *backuppb.BackupResponse) error {
225232
// Forward all responses (including error).
226233
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
@@ -263,6 +270,8 @@ func startBackup(
263270
Resp: resp,
264271
StoreID: storeID,
265272
}:
273+
// reset timeout when receive a response
274+
timerecv.Refresh()
266275
}
267276
return nil
268277
})
@@ -326,7 +335,7 @@ func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetr
326335
// reset the state
327336
sendAll = false
328337
clear(newJoinStoresMap)
329-
logutil.CL(ctx).Info("check store changes every tick")
338+
logutil.CL(ctx).Info("check store changes every 30s")
330339
err := watcher.Step(ctx)
331340
if err != nil {
332341
logutil.CL(ctx).Warn("failed to watch store changes, ignore it", zap.Error(err))

br/pkg/backup/store_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package backup
1616

1717
import (
1818
"context"
19-
"io"
2019
"testing"
2120
"time"
2221

@@ -58,41 +57,42 @@ func TestTimeoutRecv(t *testing.T) {
5857
TimeoutOneResponse = time.Millisecond * 800
5958
// Just Timeout Once
6059
{
61-
err := doSendBackup(ctx, &MockBackupClient{
60+
err := startBackup(ctx, 0, backuppb.BackupRequest{}, &MockBackupClient{
6261
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
6362
time.Sleep(time.Second)
6463
require.Error(t, ctx.Err())
65-
return nil, io.EOF
64+
return nil, ctx.Err()
6665
},
67-
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
68-
require.NoError(t, err)
66+
}, 1, nil)
67+
require.Error(t, err)
6968
}
7069

7170
// Timeout Not At First
7271
{
7372
count := 0
74-
err := doSendBackup(ctx, &MockBackupClient{
73+
err := startBackup(ctx, 0, backuppb.BackupRequest{}, &MockBackupClient{
7574
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
7675
require.NoError(t, ctx.Err())
7776
if count == 15 {
7877
time.Sleep(time.Second)
7978
require.Error(t, ctx.Err())
80-
return nil, io.EOF
79+
return nil, ctx.Err()
8180
}
8281
count += 1
8382
time.Sleep(time.Millisecond * 80)
8483
return &backuppb.BackupResponse{}, nil
8584
},
86-
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
87-
require.NoError(t, err)
85+
}, 1, make(chan *ResponseAndStore, 15))
86+
require.Error(t, err)
87+
require.Equal(t, count, 15)
8888
}
8989
}
9090

9191
func TestTimeoutRecvCancel(t *testing.T) {
9292
ctx := context.Background()
9393
cctx, cancel := context.WithCancel(ctx)
9494

95-
_, trecv := StartTimeoutRecv(cctx, time.Hour)
95+
_, trecv := StartTimeoutRecv(cctx, time.Hour, 0)
9696
cancel()
9797
trecv.wg.Wait()
9898
}
@@ -102,7 +102,7 @@ func TestTimeoutRecvCanceled(t *testing.T) {
102102
cctx, cancel := context.WithCancel(ctx)
103103
defer cancel()
104104

105-
tctx, trecv := StartTimeoutRecv(cctx, time.Hour)
105+
tctx, trecv := StartTimeoutRecv(cctx, time.Hour, 0)
106106
trecv.Stop()
107107
require.Equal(t, "context canceled", tctx.Err().Error())
108108
}

br/pkg/restore/snap_client/systable_restore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
116116
//
117117
// The above variables are in the file br/pkg/restore/systable_restore.go
118118
func TestMonitorTheSystemTableIncremental(t *testing.T) {
119-
require.Equal(t, int64(218), session.CurrentBootstrapVersion)
119+
require.Equal(t, int64(239), session.CurrentBootstrapVersion)
120120
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
create table `pt_case_0` (a int, b int, unique index idx(a) global) partition by hash(b) partitions 5;
2+
insert into `pt_case_0` values
3+
(0, 10),
4+
(1, 9),
5+
(2, 8),
6+
(3, 7),
7+
(4, 6),
8+
(5, 5),
9+
(6, 4),
10+
(7, 3),
11+
(8, 2),
12+
(9, 1),
13+
(10, 0);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
create table `pt_case_1` (a int, b int, unique index idx(a) global) partition by list(b)
2+
(partition p0 values in (0, 1, 2, 3),
3+
partition p1 values in (4, 5, 6),
4+
partition p2 values in (7, 8, 9, 10));
5+
insert into `pt_case_1` values
6+
(0, 10),
7+
(1, 9),
8+
(2, 8),
9+
(3, 7),
10+
(4, 6),
11+
(5, 5),
12+
(6, 4),
13+
(7, 3),
14+
(8, 2),
15+
(9, 1),
16+
(10, 0);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
create table `pt_case_2` (a int, b int, unique index idx(a) global) partition by range(b)
2+
(partition p0 values less than (4),
3+
partition p1 values less than (7),
4+
partition p2 values less than (11));
5+
insert into `pt_case_2` values
6+
(0, 10),
7+
(1, 9),
8+
(2, 8),
9+
(3, 7),
10+
(4, 6),
11+
(5, 5),
12+
(6, 4),
13+
(7, 3),
14+
(8, 2),
15+
(9, 1),
16+
(10, 0);
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/*!40014 SET FOREIGN_KEY_CHECKS=0*/;
2+
/*!40101 SET NAMES binary*/;
3+
CREATE TABLE `pt_case_0` (
4+
`a` int DEFAULT NULL,
5+
`b` int DEFAULT NULL,
6+
UNIQUE KEY `idx` (`a`) /*T![global_index] GLOBAL */
7+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
8+
PARTITION BY HASH (`b`) PARTITIONS 5;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*!40014 SET FOREIGN_KEY_CHECKS=0*/;
2+
/*!40101 SET NAMES binary*/;
3+
INSERT INTO `pt_case_0` VALUES
4+
(0,10),
5+
(1,9),
6+
(2,8),
7+
(3,7),
8+
(4,6),
9+
(5,5),
10+
(6,4),
11+
(7,3),
12+
(8,2),
13+
(9,1),
14+
(10,0);

0 commit comments

Comments
 (0)