Skip to content

Commit d2cfbde

Browse files
authored
ddl,lightning: fix ingest data unexpectedly using zero commit ts (#48797)
close #48804
1 parent 3ed7732 commit d2cfbde

File tree

7 files changed

+104
-8
lines changed

7 files changed

+104
-8
lines changed

br/pkg/lightning/backend/local/local.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1761,6 +1761,12 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
17611761
if err = local.allocateTSIfNotExists(ctx, localEngine); err != nil {
17621762
return errors.Trace(err)
17631763
}
1764+
failpoint.Inject("mockAllocateTSErr", func() {
1765+
// mock generate timestamp error when reset engine.
1766+
localEngine.TS = 0
1767+
mockGRPCErr, _ := status.FromError(errors.Errorf("mock generate timestamp error"))
1768+
failpoint.Return(errors.Trace(mockGRPCErr.Err()))
1769+
})
17641770
}
17651771
localEngine.pendingFileSize.Store(0)
17661772

br/pkg/lightning/backend/local/region_job.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,10 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
318318
clients = append(clients, wstream)
319319
allPeers = append(allPeers, peer)
320320
}
321+
dataCommitTS := j.ingestData.GetTS()
321322
req.Chunk = &sst.WriteRequest_Batch{
322323
Batch: &sst.WriteBatch{
323-
CommitTs: j.ingestData.GetTS(),
324+
CommitTs: dataCommitTS,
324325
},
325326
}
326327

@@ -407,7 +408,8 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
407408
logutil.Key("endKey", j.keyRange.End),
408409
logutil.Key("remainStart", remainingStartKey),
409410
logutil.Region(region),
410-
logutil.Leader(j.region.Leader))
411+
logutil.Leader(j.region.Leader),
412+
zap.Uint64("commitTS", dataCommitTS))
411413
}
412414
break
413415
}

pkg/ddl/ingest/backend.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"time"
2121

2222
"github.com/pingcap/failpoint"
23+
"github.com/pingcap/tidb/br/pkg/lightning/backend"
2324
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
2425
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
2526
lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
@@ -214,18 +215,45 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
214215
}
215216
}()
216217
}
217-
218-
logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
219-
zap.String("usage info", bc.diskRoot.UsageInfo()))
220-
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
218+
err = bc.unsafeImportAndReset(ei)
221219
if err != nil {
222-
logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", indexID),
223-
zap.String("usage info", bc.diskRoot.UsageInfo()))
224220
return true, false, err
225221
}
226222
return true, true, nil
227223
}
228224

225+
func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
226+
logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", ei.indexID),
227+
zap.String("usage info", bc.diskRoot.UsageInfo()))
228+
logger := log.FromContext(bc.ctx).With(
229+
zap.Stringer("engineUUID", ei.uuid),
230+
)
231+
232+
ei.closedEngine = backend.NewClosedEngine(bc.backend, logger, ei.uuid, 0)
233+
234+
regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio)
235+
regionSplitKeys := int64(lightning.SplitRegionKeys)
236+
if err := ei.closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil {
237+
logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID),
238+
zap.String("usage info", bc.diskRoot.UsageInfo()))
239+
return err
240+
}
241+
242+
err := bc.backend.ResetEngine(bc.ctx, ei.uuid)
243+
if err != nil {
244+
logutil.Logger(bc.ctx).Error(LitErrResetEngineFail, zap.Int64("index ID", ei.indexID))
245+
err1 := ei.closedEngine.Cleanup(bc.ctx)
246+
if err1 != nil {
247+
logutil.Logger(ei.ctx).Error(LitErrCleanEngineErr, zap.Error(err1),
248+
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
249+
}
250+
ei.openedEngine = nil
251+
ei.closedEngine = nil
252+
return err
253+
}
254+
return nil
255+
}
256+
229257
// ForceSyncFlagForTest is a flag to force sync only for test.
230258
var ForceSyncFlagForTest = false
231259

pkg/ddl/ingest/message.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
LitErrCloseWriterErr string = "close writer error"
4343
LitErrReadSortPath string = "cannot read sort path"
4444
LitErrCleanSortPath string = "cannot cleanup sort path"
45+
LitErrResetEngineFail string = "reset engine failed"
4546
LitWarnEnvInitFail string = "initialize environment failed"
4647
LitWarnConfigError string = "build config for backend failed"
4748
LitInfoEnvInitSucc string = "init global ingest backend environment finished"

tests/realtikvtest/addindextest1/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@ go_test(
1515
"//pkg/disttask/framework/dispatcher",
1616
"//pkg/disttask/framework/proto",
1717
"//pkg/errno",
18+
"//pkg/kv",
1819
"//pkg/parser/model",
20+
"//pkg/store/helper",
21+
"//pkg/tablecodec",
1922
"//pkg/testkit",
23+
"//pkg/types",
2024
"//tests/realtikvtest",
2125
"@com_github_pingcap_failpoint//:failpoint",
2226
"@com_github_stretchr_testify//require",

tests/realtikvtest/addindextest1/disttask_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ import (
2424
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
2525
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
2626
"github.com/pingcap/tidb/pkg/errno"
27+
"github.com/pingcap/tidb/pkg/kv"
2728
"github.com/pingcap/tidb/pkg/parser/model"
29+
"github.com/pingcap/tidb/pkg/store/helper"
30+
"github.com/pingcap/tidb/pkg/tablecodec"
2831
"github.com/pingcap/tidb/pkg/testkit"
32+
"github.com/pingcap/tidb/pkg/types"
2933
"github.com/pingcap/tidb/tests/realtikvtest"
3034
"github.com/stretchr/testify/require"
3135
)
@@ -229,3 +233,53 @@ func TestAddIndexForCurrentTimestampColumn(t *testing.T) {
229233
tk.MustExec("alter table t add index idx(a);")
230234
tk.MustExec("admin check table t;")
231235
}
236+
237+
func TestAddIndexTSErrorWhenResetImportEngine(t *testing.T) {
238+
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
239+
var tblInfo *model.TableInfo
240+
var idxInfo *model.IndexInfo
241+
cb := &callback.TestDDLCallback{}
242+
interceptFn := func(job *model.Job) {
243+
if idxInfo == nil {
244+
tbl, _ := dom.InfoSchema().TableByID(job.TableID)
245+
tblInfo = tbl.Meta()
246+
if len(tblInfo.Indices) == 0 {
247+
return
248+
}
249+
idxInfo = tblInfo.Indices[0]
250+
}
251+
}
252+
cb.OnJobUpdatedExported.Store(&interceptFn)
253+
tk := testkit.NewTestKit(t, store)
254+
tk.MustExec("drop database if exists addindexlit;")
255+
tk.MustExec("create database addindexlit;")
256+
tk.MustExec("use addindexlit;")
257+
t.Cleanup(func() {
258+
tk.MustExec("set global tidb_enable_dist_task = off;")
259+
})
260+
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
261+
tk.MustExec("set global tidb_enable_dist_task = on;")
262+
263+
err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockAllocateTSErr", `1*return`)
264+
require.NoError(t, err)
265+
tk.MustExec("create table t (a int);")
266+
tk.MustExec("insert into t values (1), (2), (3);")
267+
dom.DDL().SetHook(cb)
268+
tk.MustExec("alter table t add index idx(a);")
269+
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockAllocateTSErr")
270+
require.NoError(t, err)
271+
272+
dts := []types.Datum{types.NewIntDatum(1)}
273+
sctx := tk.Session().GetSessionVars().StmtCtx
274+
idxKey, _, err := tablecodec.GenIndexKey(sctx, tblInfo, idxInfo, tblInfo.ID, dts, kv.IntHandle(1), nil)
275+
require.NoError(t, err)
276+
277+
tikvStore := dom.Store().(helper.Storage)
278+
newHelper := helper.NewHelper(tikvStore)
279+
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 0)
280+
require.NoError(t, err)
281+
require.NotNil(t, mvccResp)
282+
require.NotNil(t, mvccResp.Info)
283+
require.Greater(t, len(mvccResp.Info.Writes), 0)
284+
require.Greater(t, mvccResp.Info.Writes[0].CommitTs, uint64(0))
285+
}

tests/realtikvtest/addindextest4/ingest_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
399399
tk.MustExec("use addindexlit;")
400400
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
401401
tk.MustExec("set global tidb_ddl_reorg_worker_cnt=1;")
402+
tk.MustExec("set global tidb_enable_dist_task = 0;")
402403

403404
tk.MustExec("create table t(id int primary key, b int, k int);")
404405
tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))

0 commit comments

Comments
 (0)