Skip to content

Commit d16dd9d

Browse files
authored
lightning, ddl: set TS to engineMeta after ResetEngineSkipAllocTS (#57998) (#58034)
close #57980
1 parent 2f9739a commit d16dd9d

File tree

9 files changed

+87
-14
lines changed

9 files changed

+87
-14
lines changed

pkg/ddl/ingest/backend.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,10 @@ func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, im
229229
newTS, err = mgr.refreshTSAndUpdateCP()
230230
if err == nil {
231231
for _, ei := range bc.engines {
232-
ei.openedEngine.SetTS(newTS)
232+
err = bc.backend.SetTSAfterResetEngine(ei.uuid, newTS)
233+
if err != nil {
234+
return false, false, err
235+
}
233236
}
234237
}
235238
}

pkg/lightning/backend/backend.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ type EngineConfig struct {
9898
// when opening the engine, instead of removing it.
9999
KeepSortDir bool
100100
// TS is the preset timestamp of data in the engine. When it's 0, the used TS
101-
// will be set lazily.
101+
// will be set lazily. This is used by local backend. This field will be written
102+
// to engineMeta.TS and take effect in below cases:
103+
// - engineManager.openEngine
104+
// - engineManager.closeEngine only for an external engine
102105
TS uint64
103106
}
104107

@@ -315,13 +318,6 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
315318
return engine.backend.LocalWriter(ctx, cfg, engine.uuid)
316319
}
317320

318-
// SetTS sets the TS of the engine. In most cases if the caller wants to specify
319-
// TS it should use the TS field in EngineConfig. This method is only used after
320-
// a ResetEngine.
321-
func (engine *OpenedEngine) SetTS(ts uint64) {
322-
engine.config.TS = ts
323-
}
324-
325321
// UnsafeCloseEngine closes the engine without first opening it.
326322
// This method is "unsafe" as it does not follow the normal operation sequence
327323
// (Open -> Write -> Close -> Import). This method should only be used when one

pkg/lightning/backend/local/engine_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"path/filepath"
2424
"sync"
2525
"testing"
26+
"time"
2627

2728
"github.com/cockroachdb/pebble"
2829
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
@@ -34,6 +35,7 @@ import (
3435
"github.com/pingcap/tidb/pkg/lightning/common"
3536
"github.com/pingcap/tidb/pkg/lightning/log"
3637
"github.com/stretchr/testify/require"
38+
"github.com/tikv/client-go/v2/oracle"
3739
)
3840

3941
func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
@@ -68,6 +70,7 @@ func TestGetEngineSizeWhenImport(t *testing.T) {
6870
keyAdapter: common.NoopKeyAdapter{},
6971
logger: log.L(),
7072
}
73+
f.TS = oracle.GoTimeToTS(time.Now())
7174
f.db.Store(db)
7275
// simulate import
7376
f.lock(importMutexStateImport)
@@ -106,6 +109,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
106109
keyAdapter: common.NoopKeyAdapter{},
107110
logger: log.L(),
108111
}
112+
f.TS = oracle.GoTimeToTS(time.Now())
109113
f.db.Store(db)
110114
f.sstIngester = dbSSTIngester{e: f}
111115
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
@@ -142,6 +146,7 @@ func TestGetFirstAndLastKey(t *testing.T) {
142146
f := &Engine{
143147
sstDir: tmpPath,
144148
}
149+
f.TS = oracle.GoTimeToTS(time.Now())
145150
f.db.Store(db)
146151
err := db.Set([]byte("a"), []byte("a"), nil)
147152
require.NoError(t, err)
@@ -184,6 +189,7 @@ func TestIterOutputHasUniqueMemorySpace(t *testing.T) {
184189
f := &Engine{
185190
sstDir: tmpPath,
186191
}
192+
f.TS = oracle.GoTimeToTS(time.Now())
187193
f.db.Store(db)
188194
err := db.Set([]byte("a"), []byte("a"), nil)
189195
require.NoError(t, err)

pkg/lightning/backend/local/local.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1583,12 +1583,25 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
15831583
}
15841584

15851585
// ResetEngineSkipAllocTS is like ResetEngine but the inner TS of the engine is
1586-
// invalid. Caller must use OpenedEngine.SetTS to set a valid TS before import
1586+
// invalid. Caller must use SetTSAfterResetEngine to set a valid TS before import
15871587
// the engine.
15881588
func (local *Backend) ResetEngineSkipAllocTS(ctx context.Context, engineUUID uuid.UUID) error {
15891589
return local.engineMgr.resetEngine(ctx, engineUUID, true)
15901590
}
15911591

1592+
// SetTSAfterResetEngine allocates a new TS for the engine after it's reset.
1593+
// This is typically called after persisting the chosen TS of the engine to make
1594+
// sure TS is not changed after task failover.
1595+
func (local *Backend) SetTSAfterResetEngine(engineUUID uuid.UUID, ts uint64) error {
1596+
e := local.engineMgr.lockEngine(engineUUID, importMutexStateClose)
1597+
if e == nil {
1598+
return errors.Errorf("engine %s not found in SetTSAfterResetEngine", engineUUID.String())
1599+
}
1600+
defer e.unlock()
1601+
e.engineMeta.TS = ts
1602+
return e.saveEngineMeta()
1603+
}
1604+
15921605
// CleanupEngine cleanup the engine and reclaim the space.
15931606
func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error {
15941607
return local.engineMgr.cleanupEngine(ctx, engineUUID)

pkg/lightning/backend/local/local_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
"github.com/pingcap/tidb/pkg/util/hack"
6161
"github.com/pingcap/tidb/pkg/util/mathutil"
6262
"github.com/stretchr/testify/require"
63+
"github.com/tikv/client-go/v2/oracle"
6364
"github.com/tikv/client-go/v2/tikv"
6465
pd "github.com/tikv/pd/client"
6566
"github.com/tikv/pd/client/http"
@@ -353,6 +354,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
353354
keyAdapter: common.NoopKeyAdapter{},
354355
logger: log.L(),
355356
}
357+
f.TS = oracle.GoTimeToTS(time.Now())
356358
f.db.Store(db)
357359
f.sstIngester = dbSSTIngester{e: f}
358360
f.wg.Add(1)
@@ -587,6 +589,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
587589
},
588590
logger: log.L(),
589591
}
592+
f.TS = oracle.GoTimeToTS(time.Now())
590593
f.db.Store(db)
591594

592595
createSSTWriter := func() (*sstWriter, error) {
@@ -1176,7 +1179,7 @@ func (m mockIngestData) NewIter(_ context.Context, lowerBound, upperBound []byte
11761179
return &mockIngestIter{data: m, startIdx: i, endIdx: j, curIdx: i}
11771180
}
11781181

1179-
func (m mockIngestData) GetTS() uint64 { return 0 }
1182+
func (m mockIngestData) GetTS() uint64 { return oracle.GoTimeToTS(time.Now()) }
11801183

11811184
func (m mockIngestData) IncRef() {}
11821185

@@ -1565,6 +1568,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
15651568
keyAdapter: common.NoopKeyAdapter{},
15661569
logger: log.L(),
15671570
}
1571+
f.TS = oracle.GoTimeToTS(time.Now())
15681572
f.db.Store(db)
15691573
err = db.Set([]byte("a"), []byte("a"), nil)
15701574
require.NoError(t, err)
@@ -1708,6 +1712,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
17081712
regionSplitKeysCache: [][]byte{{1}, {11}},
17091713
regionSplitSize: 1 << 30,
17101714
}
1715+
f.TS = oracle.GoTimeToTS(time.Now())
17111716
f.db.Store(db)
17121717
// keys starts with 0 is meta keys, so we start with 1.
17131718
for i := byte(1); i <= 10; i++ {

pkg/lightning/backend/local/region_job.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
util2 "github.com/pingcap/tidb/pkg/util"
4444
"github.com/pingcap/tidb/pkg/util/codec"
4545
"github.com/pingcap/tidb/pkg/util/intest"
46+
"github.com/tikv/client-go/v2/oracle"
4647
"github.com/tikv/client-go/v2/util"
4748
"go.uber.org/zap"
4849
"google.golang.org/grpc"
@@ -431,6 +432,20 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
431432
allPeers = append(allPeers, peer)
432433
}
433434
dataCommitTS := j.ingestData.GetTS()
435+
intest.AssertFunc(func() bool {
436+
timeOfTS := oracle.GetTimeFromTS(dataCommitTS)
437+
now := time.Now()
438+
if timeOfTS.After(now) {
439+
return false
440+
}
441+
if now.Sub(timeOfTS) > 24*time.Hour {
442+
return false
443+
}
444+
return true
445+
}, "TS used in import should in [now-1d, now], but got %d", dataCommitTS)
446+
if dataCommitTS == 0 {
447+
return errors.New("data commitTS is 0")
448+
}
434449
req.Chunk = &sst.WriteRequest_Batch{
435450
Batch: &sst.WriteBatch{
436451
CommitTs: dataCommitTS,

tests/realtikvtest/addindextest2/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ go_test(
2626
"@com_github_phayes_freeport//:freeport",
2727
"@com_github_pingcap_failpoint//:failpoint",
2828
"@com_github_stretchr_testify//require",
29+
"@com_github_tikv_client_go_v2//oracle",
2930
],
3031
)

tests/realtikvtest/addindextest2/global_sort_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strconv"
2121
"strings"
2222
"testing"
23+
"time"
2324

2425
"github.com/fsouza/fake-gcs-server/fakestorage"
2526
"github.com/phayes/freeport"
@@ -38,6 +39,7 @@ import (
3839
"github.com/pingcap/tidb/pkg/types"
3940
"github.com/pingcap/tidb/tests/realtikvtest"
4041
"github.com/stretchr/testify/require"
42+
"github.com/tikv/client-go/v2/oracle"
4143
)
4244

4345
func init() {
@@ -318,7 +320,11 @@ func TestIngestUseGivenTS(t *testing.T) {
318320
t.Cleanup(func() {
319321
tk.MustExec("set @@global.tidb_cloud_storage_uri = '';")
320322
})
321-
err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", `return(123456789)`)
323+
324+
presetTS := oracle.GoTimeToTS(time.Now())
325+
failpointTerm := fmt.Sprintf(`return(%d)`, presetTS)
326+
327+
err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", failpointTerm)
322328
require.NoError(t, err)
323329

324330
tk.MustExec("create table t (a int);")
@@ -334,10 +340,10 @@ func TestIngestUseGivenTS(t *testing.T) {
334340
require.NoError(t, err)
335341
tikvStore := dom.Store().(helper.Storage)
336342
newHelper := helper.NewHelper(tikvStore)
337-
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 123456789)
343+
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, presetTS)
338344
require.NoError(t, err)
339345
require.NotNil(t, mvccResp)
340346
require.NotNil(t, mvccResp.Info)
341347
require.Greater(t, len(mvccResp.Info.Writes), 0)
342-
require.Equal(t, uint64(123456789), mvccResp.Info.Writes[0].CommitTs)
348+
require.Equal(t, presetTS, mvccResp.Info.Writes[0].CommitTs)
343349
}

tests/realtikvtest/addindextest3/ingest_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,34 @@ func TestAddIndexMockFlushError(t *testing.T) {
448448
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
449449
}
450450

451+
func TestAddIndexDiskQuotaTS(t *testing.T) {
452+
store := realtikvtest.CreateMockStoreAndSetup(t)
453+
tk := testkit.NewTestKit(t, store)
454+
455+
tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
456+
testAddIndexDiskQuotaTS(t, tk)
457+
tk.MustExec("set @@global.tidb_enable_dist_task = 1;")
458+
testAddIndexDiskQuotaTS(t, tk)
459+
}
460+
461+
func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) {
462+
tk.MustExec("drop database if exists addindexlit;")
463+
tk.MustExec("create database addindexlit;")
464+
tk.MustExec("use addindexlit;")
465+
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
466+
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;")
467+
468+
tk.MustExec("create table t(id int primary key, b int, k int);")
469+
tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))
470+
tk.MustExec("insert into t values(1, 1, 1);")
471+
tk.MustExec("insert into t values(100000, 1, 1);")
472+
473+
ingest.ForceSyncFlagForTest = true
474+
tk.MustExec("alter table t add index idx_test(b);")
475+
ingest.ForceSyncFlagForTest = false
476+
tk.MustExec("update t set b = b + 1;")
477+
}
478+
451479
func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
452480
store := realtikvtest.CreateMockStoreAndSetup(t)
453481
tk := testkit.NewTestKit(t, store)

0 commit comments

Comments
 (0)