Skip to content

Commit 926a1e5

Browse files
authored
lightning: adapt to the new behaviour that "write" may return an epoch error (#47667)
close #47694
1 parent 99a4f35 commit 926a1e5

File tree

4 files changed

+79
-10
lines changed

4 files changed

+79
-10
lines changed

br/pkg/lightning/backend/local/local.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1434,7 +1434,6 @@ func (local *Backend) executeJob(
14341434
// if it's retryable error, we retry from scanning region
14351435
log.FromContext(ctx).Warn("meet retryable error when writing to TiKV",
14361436
log.ShortError(err), zap.Stringer("job stage", job.stage))
1437-
job.convertStageTo(needRescan)
14381437
job.lastRetryableErr = err
14391438
return nil
14401439
}

br/pkg/lightning/backend/local/region_job.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"container/heap"
1919
"context"
2020
"fmt"
21+
"io"
2122
"strings"
2223
"sync"
2324
"time"
@@ -179,12 +180,29 @@ func (j *regionJob) done(wg *sync.WaitGroup) {
179180
}
180181

181182
// writeToTiKV writes the data to TiKV and mark this job as wrote stage.
182-
// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. TODO: <-check this
183+
// if any write logic has error, writeToTiKV will set job to a proper stage and return nil.
183184
// if any underlying logic has error, writeToTiKV will return an error.
184185
// we don't need to do cleanup for the pairs written to tikv if encounters an error,
185186
// tikv will take the responsibility to do so.
186187
// TODO: let client-go provide a high-level write interface.
187188
func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
189+
err := local.doWrite(ctx, j)
190+
if err == nil {
191+
return nil
192+
}
193+
if !common.IsRetryableError(err) {
194+
return err
195+
}
196+
// currently only one case will restart write
197+
if strings.Contains(err.Error(), "RequestTooNew") {
198+
j.convertStageTo(regionScanned)
199+
return err
200+
}
201+
j.convertStageTo(needRescan)
202+
return err
203+
}
204+
205+
func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
188206
if j.stage != regionScanned {
189207
return nil
190208
}
@@ -238,9 +256,25 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
238256
ApiVersion: apiVersion,
239257
}
240258

241-
annotateErr := func(in error, peer *metapb.Peer) error {
259+
failpoint.Inject("changeEpochVersion", func(val failpoint.Value) {
260+
cloned := *meta.RegionEpoch
261+
meta.RegionEpoch = &cloned
262+
i := val.(int)
263+
if i >= 0 {
264+
meta.RegionEpoch.Version += uint64(i)
265+
} else {
266+
meta.RegionEpoch.ConfVer -= uint64(-i)
267+
}
268+
})
269+
270+
annotateErr := func(in error, peer *metapb.Peer, msg string) error {
242271
// annotate the error with peer/store/region info to help debug.
243-
return errors.Annotatef(in, "peer %d, store %d, region %d, epoch %s", peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String())
272+
return errors.Annotatef(
273+
in,
274+
"peer %d, store %d, region %d, epoch %s, %s",
275+
peer.Id, peer.StoreId, region.Id, region.RegionEpoch.String(),
276+
msg,
277+
)
244278
}
245279

246280
leaderID := j.region.Leader.GetId()
@@ -260,17 +294,17 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
260294
for _, peer := range region.GetPeers() {
261295
cli, err := clientFactory.Create(ctx, peer.StoreId)
262296
if err != nil {
263-
return annotateErr(err, peer)
297+
return annotateErr(err, peer, "when create client")
264298
}
265299

266300
wstream, err := cli.Write(ctx)
267301
if err != nil {
268-
return annotateErr(err, peer)
302+
return annotateErr(err, peer, "when open write stream")
269303
}
270304

271305
// Bind uuid for this write request
272306
if err = wstream.Send(req); err != nil {
273-
return annotateErr(err, peer)
307+
return annotateErr(err, peer, "when send meta")
274308
}
275309
clients = append(clients, wstream)
276310
allPeers = append(allPeers, peer)
@@ -309,7 +343,12 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
309343
return errors.Trace(err)
310344
}
311345
if err := clients[i].SendMsg(preparedMsg); err != nil {
312-
return annotateErr(err, allPeers[i])
346+
if err == io.EOF {
347+
// if it's EOF, need RecvMsg to get the error
348+
dummy := &sst.WriteResponse{}
349+
err = clients[i].RecvMsg(dummy)
350+
}
351+
return annotateErr(err, allPeers[i], "when send data")
313352
}
314353
}
315354
failpoint.Inject("afterFlushKVs", func() {
@@ -383,10 +422,10 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
383422
for i, wStream := range clients {
384423
resp, closeErr := wStream.CloseAndRecv()
385424
if closeErr != nil {
386-
return annotateErr(closeErr, allPeers[i])
425+
return annotateErr(closeErr, allPeers[i], "when close write stream")
387426
}
388427
if resp.Error != nil {
389-
return annotateErr(errors.New(resp.Error.Message), allPeers[i])
428+
return annotateErr(errors.New("resp error: "+resp.Error.Message), allPeers[i], "when close write stream")
390429
}
391430
if leaderID == region.Peers[i].GetId() {
392431
leaderPeerMetas = resp.Metas

br/pkg/lightning/common/retry.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ func isSingleRetryableError(err error) bool {
140140
// 2. in write TiKV: rpc error: code = Unknown desc = EngineTraits(Engine(Status { code: IoError, sub_code:
141141
// None, sev: NoError, state: \"IO error: No such file or directory: while stat a file for size:
142142
// /...../63992d9c-fbc8-4708-b963-32495b299027_32279707_325_5280_write.sst: No such file or directory\"
143+
// 3. in write TiKV: rpc error: code = Unknown desc = Engine("request region 26 is staler than local region,
144+
// local epoch conf_ver: 5 version: 65, request epoch conf_ver: 5 version: 64, please rescan region later")
143145
return true
144146
default:
145147
return false

br/tests/lightning_max_random/run.sh

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ cleanup() {
4444

4545
cleanup
4646

47+
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(-1)"
48+
4749
# auto_random_max = 2^{64-1-10}-1
4850
# db.test contains key auto_random_max - 1
4951
# db.test1 contains key auto_random_max
@@ -63,4 +65,31 @@ check_contains 'ERROR'
6365
run_sql 'INSERT INTO db.test2(b) VALUES(33);'
6466
run_sql 'INSERT INTO db.test2(b) VALUES(44);'
6567
run_sql 'INSERT INTO db.test2(b) VALUES(55);'
68+
69+
grep 'RequestTooOld' "$TEST_DIR/lightning.log" | grep -q 'needRescan'
70+
cleanup
71+
72+
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/changeEpochVersion=1*return(10)"
73+
74+
# auto_random_max = 2^{64-1-10}-1
75+
# db.test contains key auto_random_max - 1
76+
# db.test1 contains key auto_random_max
77+
# db.test2 contains key auto_random_max + 1 (overflow)
78+
run_lightning --sorted-kv-dir "$TEST_DIR/sst" --config "$CUR/config.toml" --log-file "$TEST_DIR/lightning.log"
79+
check_result
80+
# successfully insert: d.test auto_random key has not reached maximum
81+
run_sql 'INSERT INTO db.test(b) VALUES(11);'
82+
# fail for further insertion
83+
run_sql 'INSERT INTO db.test(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
84+
check_contains 'ERROR'
85+
# fail: db.test1 has key auto_random_max
86+
run_sql 'INSERT INTO db.test1(b) VALUES(11);'
87+
run_sql 'INSERT INTO db.test1(b) VALUES(22);' 2>&1 | tee -a "$TEST_DIR/sql_res.$TEST_NAME.txt"
88+
check_contains 'ERROR'
89+
# successfully insert for overflow key
90+
run_sql 'INSERT INTO db.test2(b) VALUES(33);'
91+
run_sql 'INSERT INTO db.test2(b) VALUES(44);'
92+
run_sql 'INSERT INTO db.test2(b) VALUES(55);'
93+
94+
grep 'RequestTooNew' "$TEST_DIR/lightning.log" | grep -q 'regionScanned'
6695
cleanup

0 commit comments

Comments (0)