
Commit 485c646

Author: cbcwestwolf

!426 Sync commits from open-source tag v8.5.0
* skip TestIndexJoin31494
* fix
* global sort: add boundaries to split keys when generating plan (pingcap#58323) (pingcap#58356)
* statistics: get right max table id when to init stats (pingcap#58280) (pingcap#58298)
* executor: Fix the parse problematic slow log panic issue due to empty …
* statstics: trigger evict by the timer (pingcap#58027) (pingcap#58268)
* br: make table existence check unified on different br client (pingcap#58211) (pingcap#58262)
* log backup: use global checkpoint ts as source of truth (pingcap#58135) (pingcap#58265)
* executor: skip execution when build query for VIEW in I_S (pingcap#58203) (pingcap#58236)
* statistics: copy stats when to update it for avoiding data race (pingcap#5810…
* domain,infoschema: make infoschema activity block GC safepoint advanci…
* planner: handle panic when loading bindings at startup (pingcap#58017) (pingcap#58035)
* statistics: right deal with error for reading stats from storage (pingcap#58…
* statistics: lite init used wrong value to build table stats ver (pingcap#5802…
* lightning, ddl: set TS to engineMeta after ResetEngineSkipAllocTS (pingcap#5…
* *: avoid unlock of unlocked mutex panic on TableDeltaMap (pingcap#57799) (pingcap#57997)
* ddl: handle context done after sending DDL jobs (pingcap#57945) (pingcap#57989)
* *: activate txn for query on infoschema tables (pingcap#57937) (pingcap#57951)
* lightning: add PK to internal tables (pingcap#57480) (pingcap#57932)
* statistics: correct behavior of non-lite InitStats and stats sync load…
* statistics: avoid stats meta full load after table analysis (pingcap#57756) (pingcap#57911)
* dumpling: use I_S to get table list for TiDB and add database to WHERE…
* br: fix insert gc failed due to slow schema reload (pingcap#57742) (pingcap#57907)
* statistics: do not record historical stats meta if the table is locked…
* metrics: remove the filled colors (pingcap#57838) (pingcap#57866)
* planner: use TableInfo.DBID to locate schema (pingcap#57785) (pingcap#57870)
* *: support cancel query like 'select * from information_schema.tables'…
* session: make `TxnInfo()` return even if process info is empty (pingcap#57044) (pingcap#57161)
* ddl: Fixed partitioning a non-partitioned table with placement rules (…
* *: Reorg partition fix delete ranges and handling non-clustered tables…
* executor: fix query infoschema.tables table_schema/table_name with fil…
* ddl: check context done in isReorgRunnable function (pingcap#57813) (pingcap#57820)
* ddl: fix ExistsTableRow and add tests for skip reorg checks (pingcap#57778) (pingcap#57801)
* *: Fix for TRUNCATE PARTITION and Global Index (pingcap#57724)
* br: prompt k8s.io/api version (pingcap#57791) (pingcap#57802)
* statistics: fix some problem related to stats async load (pingcap#57723) (pingcap#57775)
* expression: fix wrong calculation order of `radians` (pingcap#57672) (pingcap#57688)
* statistics: rightly deal with timout when to send sync load (pingcap#57712) (pingcap#57751)
* ddl: `tidb_scatter_region` variable supports setting value in both upp…
* planner: fix that vector index output empty result when pk is non-int …
* ddl: dynamically adjusting the max write speed of reorganization job (…
* executor: fix hang in hash agg when exceeding memory limit leads to pa…
* statistics: use infoschema api to get table info (pingcap#57574) (pingcap#57614)
* planner: Use realtimeRowCount when all topN collected (pingcap#56848) (pingcap#57689)
* statistics: handle deleted tables correctly in the PQ (pingcap#57649) (pingcap#57674)
* backup: reset timeout on store level (pingcap#55526) (pingcap#57667)
* planner/core: fix a wrong privilege check for CTE & UPDATE statement (…
1 parent: dc812f1 · commit: 485c646

File tree: 212 files changed (+5908, -1921 lines)


.bazelversion

Lines changed: 1 addition & 0 deletions
```diff
@@ -0,0 +1 @@
+7.4.1
```

DEPS.bzl

Lines changed: 24 additions & 24 deletions
```diff
@@ -5477,26 +5477,26 @@ def go_deps():
         name = "com_github_onsi_ginkgo_v2",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/onsi/ginkgo/v2",
-        sha256 = "f41e92baa52ec53d482603e4585c0906ca0c02e05004dca78a62bf1de88833ad",
-        strip_prefix = "github.com/onsi/ginkgo/v2@v2.9.4",
+        sha256 = "4865aab6c56b0d29a93cfe56206b586f1c9f36fde5a66e85650576344861b7cc",
+        strip_prefix = "github.com/onsi/ginkgo/v2@v2.13.0",
         urls = [
-            "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
-            "http://ats.apps.svc/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
-            "https://cache.hawkingrei.com/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
-            "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip",
+            "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
+            "http://ats.apps.svc/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
+            "https://cache.hawkingrei.com/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
+            "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip",
         ],
     )
     go_repository(
         name = "com_github_onsi_gomega",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/onsi/gomega",
-        sha256 = "ea2b22782cc15569645dfdfc066a651e1335626677ad92d7ba4358a0885bf369",
-        strip_prefix = "github.com/onsi/gomega@v1.20.1",
+        sha256 = "923e8d0a1f95b3989f31c45142dee0b80a0aaa00cfa210bbd4d059f7046d12a8",
+        strip_prefix = "github.com/onsi/gomega@v1.29.0",
         urls = [
-            "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
-            "http://ats.apps.svc/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
-            "https://cache.hawkingrei.com/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
-            "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip",
+            "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
+            "http://ats.apps.svc/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
+            "https://cache.hawkingrei.com/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
+            "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip",
         ],
     )
     go_repository(
@@ -9520,26 +9520,26 @@ def go_deps():
         name = "io_k8s_api",
         build_file_proto_mode = "disable_global",
         importpath = "k8s.io/api",
-        sha256 = "2255428d2347df0b3a9cf6ac2791f5be6653b3c642359736e46733584d917335",
-        strip_prefix = "k8s.io/api@v0.28.6",
+        sha256 = "ae7b519f36431bc55fa56c47a51c1c37cf9e0df7e9d23741b3e839426d2627ff",
+        strip_prefix = "k8s.io/api@v0.29.11",
         urls = [
-            "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
-            "http://ats.apps.svc/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
-            "https://cache.hawkingrei.com/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
-            "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip",
+            "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
+            "http://ats.apps.svc/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
+            "https://cache.hawkingrei.com/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
+            "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip",
         ],
     )
     go_repository(
         name = "io_k8s_apimachinery",
         build_file_proto_mode = "disable_global",
         importpath = "k8s.io/apimachinery",
-        sha256 = "efc7e38cb4662d0b6c5648772e1ae92040a4d03af0a3a7731aedf17f8eab7359",
-        strip_prefix = "k8s.io/apimachinery@v0.28.6",
+        sha256 = "8dd5f53bf72f7bd6323bcc8f9f45823b30fc350daee4ab2d9e27cf1960d06b25",
+        strip_prefix = "k8s.io/apimachinery@v0.29.11",
         urls = [
-            "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
-            "http://ats.apps.svc/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
-            "https://cache.hawkingrei.com/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
-            "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip",
+            "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
+            "http://ats.apps.svc/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
+            "https://cache.hawkingrei.com/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
+            "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip",
         ],
     )
     go_repository(
```

br/pkg/backup/store.go

Lines changed: 24 additions & 15 deletions
```diff
@@ -60,6 +60,7 @@ func (r ResponseAndStore) GetStoreID() uint64 {
 
 // timeoutRecv cancel the context if `Refresh()` is not called within the specified time `timeout`.
 type timeoutRecv struct {
+    storeID   uint64
     wg        sync.WaitGroup
     parentCtx context.Context
     cancel    context.CancelCauseFunc
@@ -98,15 +99,17 @@ func (trecv *timeoutRecv) loop(timeout time.Duration) {
                 return
             }
         case <-ticker.C:
-            log.Warn("receive a backup response timeout")
+            log.Warn("wait backup response timeout, cancel the backup",
+                zap.Duration("timeout", timeout), zap.Uint64("storeID", trecv.storeID))
             trecv.cancel(errors.Errorf("receive a backup response timeout"))
         }
     }
 }
 
-func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Context, *timeoutRecv) {
+func StartTimeoutRecv(ctx context.Context, timeout time.Duration, storeID uint64) (context.Context, *timeoutRecv) {
     cctx, cancel := context.WithCancelCause(ctx)
     trecv := &timeoutRecv{
+        storeID:   storeID,
         parentCtx: ctx,
         cancel:    cancel,
         refresh:   make(chan struct{}),
@@ -117,15 +120,11 @@ func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Conte
 }
 
 func doSendBackup(
-    pctx context.Context,
+    ctx context.Context,
     client backuppb.BackupClient,
     req backuppb.BackupRequest,
     respFn func(*backuppb.BackupResponse) error,
 ) error {
-    // Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
-    // terminate the backup if it does not receive any new response for a long time.
-    ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse)
-    defer timerecv.Stop()
     failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
         logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
             "process will notify the shell.")
@@ -170,7 +169,6 @@ func doSendBackup(
 
     for {
         resp, err := bCli.Recv()
-        timerecv.Refresh()
         if err != nil {
             if errors.Cause(err) == io.EOF { // nolint:errorlint
                 logutil.CL(ctx).Debug("backup streaming finish",
@@ -193,7 +191,7 @@ func doSendBackup(
 }
 
 func startBackup(
-    ctx context.Context,
+    pctx context.Context,
     storeID uint64,
     backupReq backuppb.BackupRequest,
     backupCli backuppb.BackupClient,
@@ -202,14 +200,21 @@ func startBackup(
 ) error {
     // this goroutine handle the response from a single store
     select {
-    case <-ctx.Done():
-        return ctx.Err()
+    case <-pctx.Done():
+        return pctx.Err()
     default:
-        logutil.CL(ctx).Info("try backup", zap.Uint64("storeID", storeID))
         // Send backup request to the store.
         // handle the backup response or internal error here.
         // handle the store error(reboot or network partition) outside.
         reqs := SplitBackupReqRanges(backupReq, concurrency)
+        logutil.CL(pctx).Info("starting backup to the corresponding store", zap.Uint64("storeID", storeID),
+            zap.Int("requestCount", len(reqs)), zap.Uint("concurrency", concurrency))
+
+        // Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
+        // terminate the backup if it does not receive any new response for a long time.
+        ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse, storeID)
+        defer timerecv.Stop()
+
         pool := tidbutil.NewWorkerPool(concurrency, "store_backup")
         eg, ectx := errgroup.WithContext(ctx)
         for i, req := range reqs {
@@ -219,8 +224,10 @@ func startBackup(
                 retry := -1
                 return utils.WithRetry(ectx, func() error {
                     retry += 1
-                    logutil.CL(ectx).Info("backup to store", zap.Uint64("storeID", storeID),
-                        zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
+                    if retry > 1 {
+                        logutil.CL(ectx).Info("retry backup to store", zap.Uint64("storeID", storeID),
+                            zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
+                    }
                     return doSendBackup(ectx, backupCli, bkReq, func(resp *backuppb.BackupResponse) error {
                         // Forward all responses (including error).
                         failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
@@ -263,6 +270,8 @@ func startBackup(
                             Resp:    resp,
                             StoreID: storeID,
                         }:
+                            // reset timeout when receive a response
+                            timerecv.Refresh()
                         }
                         return nil
                     })
@@ -326,7 +335,7 @@ func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetr
             // reset the state
             sendAll = false
             clear(newJoinStoresMap)
-            logutil.CL(ctx).Info("check store changes every tick")
+            logutil.CL(ctx).Info("check store changes every 30s")
             err := watcher.Step(ctx)
             if err != nil {
                 logutil.CL(ctx).Warn("failed to watch store changes, ignore it", zap.Error(err))
```

br/pkg/backup/store_test.go

Lines changed: 11 additions & 11 deletions
```diff
@@ -16,7 +16,6 @@ package backup
 
 import (
     "context"
-    "io"
     "testing"
     "time"
 
@@ -58,41 +57,42 @@ func TestTimeoutRecv(t *testing.T) {
     TimeoutOneResponse = time.Millisecond * 800
     // Just Timeout Once
     {
-        err := doSendBackup(ctx, &MockBackupClient{
+        err := startBackup(ctx, 0, backuppb.BackupRequest{}, &MockBackupClient{
             recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
                 time.Sleep(time.Second)
                 require.Error(t, ctx.Err())
-                return nil, io.EOF
+                return nil, ctx.Err()
             },
-        }, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
-        require.NoError(t, err)
+        }, 1, nil)
+        require.Error(t, err)
     }
 
     // Timeout Not At First
     {
         count := 0
-        err := doSendBackup(ctx, &MockBackupClient{
+        err := startBackup(ctx, 0, backuppb.BackupRequest{}, &MockBackupClient{
             recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
                 require.NoError(t, ctx.Err())
                 if count == 15 {
                     time.Sleep(time.Second)
                     require.Error(t, ctx.Err())
-                    return nil, io.EOF
+                    return nil, ctx.Err()
                 }
                 count += 1
                 time.Sleep(time.Millisecond * 80)
                 return &backuppb.BackupResponse{}, nil
             },
-        }, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
-        require.NoError(t, err)
+        }, 1, make(chan *ResponseAndStore, 15))
+        require.Error(t, err)
+        require.Equal(t, count, 15)
     }
 }
 
 func TestTimeoutRecvCancel(t *testing.T) {
     ctx := context.Background()
     cctx, cancel := context.WithCancel(ctx)
 
-    _, trecv := StartTimeoutRecv(cctx, time.Hour)
+    _, trecv := StartTimeoutRecv(cctx, time.Hour, 0)
     cancel()
     trecv.wg.Wait()
 }
@@ -102,7 +102,7 @@ func TestTimeoutRecvCanceled(t *testing.T) {
     cctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
-    tctx, trecv := StartTimeoutRecv(cctx, time.Hour)
+    tctx, trecv := StartTimeoutRecv(cctx, time.Hour, 0)
     trecv.Stop()
     require.Equal(t, "context canceled", tctx.Err().Error())
 }
```
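These tests exercise the timeout through startBackup with a mock client whose recvFunc deliberately outlives the watchdog window, then assert that the stream context was cancelled. A stripped-down sketch of that mock-stream shape, with hypothetical names (fakeStream) and a plain context.WithTimeout standing in for the watchdog:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fakeStream mimics the shape of MockBackupClient's recvFunc in the test
// above: each Recv call observes the (possibly cancelled) stream context.
// All names here are illustrative, not the BR test harness.
type fakeStream struct {
	ctx      context.Context
	recvFunc func(ctx context.Context) (string, error)
}

func (s *fakeStream) Recv() (string, error) { return s.recvFunc(s.ctx) }

func main() {
	// Stand-in for the watchdog window (TimeoutOneResponse in the tests).
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	s := &fakeStream{
		ctx: ctx,
		recvFunc: func(ctx context.Context) (string, error) {
			time.Sleep(100 * time.Millisecond) // outlive the window
			return "", ctx.Err()               // report the cancellation, not io.EOF
		},
	}
	_, err := s.Recv()
	fmt.Println(err) // context deadline exceeded
}
```

Returning ctx.Err() instead of io.EOF from the mock is also why the assertions flipped from require.NoError to require.Error: a timed-out stream now surfaces as an error from startBackup rather than masquerading as a clean end-of-stream.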

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
```diff
@@ -69,7 +69,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 34,
+    shard_count = 35,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
```

br/pkg/streamhelper/advancer.go

Lines changed: 14 additions & 5 deletions
```diff
@@ -425,8 +425,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
         c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
         globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
         if err != nil {
-            log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
-            return err
+            // ignore the error, just log it
+            log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
         }
         if globalCheckpointTs < c.task.StartTs {
             globalCheckpointTs = c.task.StartTs
@@ -567,13 +567,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
     if c.cfg.CheckPointLagLimit <= 0 {
         return false, nil
     }
+    globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
+    if err != nil {
+        return false, err
+    }
+    if globalTs < c.task.StartTs {
+        // unreachable.
+        return false, nil
+    }
 
     now, err := c.env.FetchCurrentTS(ctx)
     if err != nil {
         return false, err
     }
 
-    lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
+    lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
     if lagDuration > c.cfg.CheckPointLagLimit {
         log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
             zap.Stringer("lag", lagDuration))
@@ -591,7 +599,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
     }
     isLagged, err := c.isCheckpointLagged(ctx)
     if err != nil {
-        return errors.Annotate(err, "failed to check timestamp")
+        // ignore the error, just log it
+        log.Warn("failed to check timestamp", logutil.ShortError(err))
     }
     if isLagged {
         err := c.env.PauseTask(ctx, c.task.Name)
@@ -656,7 +665,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
     c.taskMu.Lock()
     defer c.taskMu.Unlock()
     if c.task == nil || c.isPaused.Load() {
-        log.Debug("No tasks yet, skipping advancing.")
+        log.Info("No tasks yet, skipping advancing.")
         return nil
     }
 
```
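isCheckpointLagged now measures lag against the task's persisted global checkpoint rather than the advancer's in-memory lastCheckpoint, matching the commit title "log backup: use global checkpoint ts as source of truth". Below is a minimal sketch of the lag arithmetic, assuming TiDB's usual TSO layout (18 logical bits below a physical millisecond timestamp), which is what oracle.GetTimeFromTS decodes; the helper names here are illustrative, not the advancer's API.

```go
package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // low bits of a TSO hold the logical counter

// physicalTime extracts the wall-clock part of a TSO timestamp,
// mirroring what oracle.GetTimeFromTS does.
func physicalTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> logicalBits))
}

// checkpointLagged reports whether the global checkpoint trails "now"
// by more than limit, as isCheckpointLagged computes from globalTs.
func checkpointLagged(nowTs, globalTs uint64, limit time.Duration) bool {
	lag := physicalTime(nowTs).Sub(physicalTime(globalTs))
	return lag > limit
}

func main() {
	base := uint64(time.Now().UnixMilli())
	now := base << logicalBits
	global := (base - 90*60*1000) << logicalBits          // checkpoint 90 minutes behind
	fmt.Println(checkpointLagged(now, global, time.Hour)) // true: task would be paused
}
```

Using the persisted global checkpoint keeps the pause decision stable across advancer restarts and owner transfers, where the cached lastCheckpoint would start out empty.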
