Skip to content

Commit b427e33

Browse files
authored
meta: separate reader and mutator (#56376)
ref #54436
1 parent c56694c commit b427e33

File tree

86 files changed

+652
-623
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+652
-623
lines changed

br/pkg/backup/client.go

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ func BuildBackupRangeAndInitSchema(
725725
buildRange bool,
726726
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
727727
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
728-
m := meta.NewSnapshotMeta(snapshot)
728+
m := meta.NewReader(snapshot)
729729

730730
var policies []*backuppb.PlacementPolicy
731731
if isFullBackup {
@@ -821,7 +821,7 @@ func BuildBackupSchemas(
821821
fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo),
822822
) error {
823823
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
824-
m := meta.NewSnapshotMeta(snapshot)
824+
m := meta.NewReader(snapshot)
825825

826826
dbs, err := m.ListDatabases()
827827
if err != nil {
@@ -936,37 +936,6 @@ func BuildBackupSchemas(
936936
return nil
937937
}
938938

939-
// BuildFullSchema builds a full backup schemas for databases and tables.
940-
func BuildFullSchema(storage kv.Storage, backupTS uint64, fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo)) error {
941-
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
942-
m := meta.NewSnapshotMeta(snapshot)
943-
944-
dbs, err := m.ListDatabases()
945-
if err != nil {
946-
return errors.Trace(err)
947-
}
948-
949-
for _, db := range dbs {
950-
hasTable := false
951-
err = m.IterTables(db.ID, func(table *model.TableInfo) error {
952-
// add table
953-
fn(db, table)
954-
hasTable = true
955-
return nil
956-
})
957-
if err != nil {
958-
return errors.Trace(err)
959-
}
960-
961-
// backup this empty db if this schema is empty.
962-
if !hasTable {
963-
fn(db, nil)
964-
}
965-
}
966-
967-
return nil
968-
}
969-
970939
func skipUnsupportedDDLJob(job *model.Job) bool {
971940
switch job.Type {
972941
// TiDB V5.3.0 supports TableAttributes and TablePartitionAttributes.
@@ -988,9 +957,9 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
988957
// WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter.
989958
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64, needDomain bool) error {
990959
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
991-
snapMeta := meta.NewSnapshotMeta(snapshot)
960+
snapMeta := meta.NewReader(snapshot)
992961
lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS))
993-
lastSnapMeta := meta.NewSnapshotMeta(lastSnapshot)
962+
lastSnapMeta := meta.NewReader(lastSnapshot)
994963
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersionWithNonEmptyDiff()
995964
if err != nil {
996965
return errors.Trace(err)
@@ -1033,7 +1002,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
10331002
return appendJobs, false
10341003
}
10351004

1036-
newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
1005+
newestMeta := meta.NewReader(store.GetSnapshot(kv.NewVersion(version.Ver)))
10371006
var allJobs []*model.Job
10381007
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
10391008
allJobs, err = ddl.GetAllDDLJobs(context.Background(), se.GetSessionCtx())

br/pkg/restore/ingestrec/ingest_recorder_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,11 @@ func hasOneItem(idxID int64, columnList string, columnArgs []any) (iterateFunc,
100100
}, &count
101101
}
102102

103-
func createMeta(t *testing.T, store kv.Storage, fn func(m *meta.Meta)) {
103+
func createMeta(t *testing.T, store kv.Storage, fn func(m *meta.Mutator)) {
104104
txn, err := store.Begin()
105105
require.NoError(t, err)
106106

107-
fn(meta.NewMeta(txn))
107+
fn(meta.NewMutator(txn))
108108

109109
err = txn.Commit(context.Background())
110110
require.NoError(t, err)
@@ -117,7 +117,7 @@ func TestAddIngestRecorder(t *testing.T) {
117117
require.NoError(t, store.Close())
118118
}()
119119

120-
createMeta(t, store, func(m *meta.Meta) {
120+
createMeta(t, store, func(m *meta.Mutator) {
121121
dbInfo := &model.DBInfo{
122122
ID: 1,
123123
Name: pmodel.NewCIStr(SchemaName),
@@ -300,7 +300,7 @@ func TestIndexesKind(t *testing.T) {
300300
require.NoError(t, err)
301301
_, err := se.ExecuteInternal(ctx)
302302
*/
303-
createMeta(t, store, func(m *meta.Meta) {
303+
createMeta(t, store, func(m *meta.Mutator) {
304304
dbInfo := &model.DBInfo{
305305
ID: 1,
306306
Name: pmodel.NewCIStr(SchemaName),
@@ -409,7 +409,7 @@ func TestRewriteTableID(t *testing.T) {
409409
require.NoError(t, store.Close())
410410
}()
411411

412-
createMeta(t, store, func(m *meta.Meta) {
412+
createMeta(t, store, func(m *meta.Mutator) {
413413
dbInfo := &model.DBInfo{
414414
ID: 1,
415415
Name: pmodel.NewCIStr(SchemaName),

br/pkg/restore/internal/prealloc_db/db_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ func cloneTableInfos(
211211
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
212212
tableInfos = make([]*metautil.Table, 0, len(originTableInfos))
213213
err := kv.RunInNewTxn(ctx, dom.Store(), true, func(_ context.Context, txn kv.Transaction) error {
214-
allocater := meta.NewMeta(txn)
214+
allocater := meta.NewMutator(txn)
215215
id, e := allocater.GetGlobalID()
216216
if e != nil {
217217
return e

br/pkg/restore/log_client/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,7 +1167,7 @@ func (rc *LogClient) GenGlobalID(ctx context.Context) (int64, error) {
11671167
true,
11681168
func(ctx context.Context, txn kv.Transaction) error {
11691169
var e error
1170-
t := meta.NewMeta(txn)
1170+
t := meta.NewMutator(txn)
11711171
id, e = t.GenGlobalID()
11721172
return e
11731173
})
@@ -1187,7 +1187,7 @@ func (rc *LogClient) GenGlobalIDs(ctx context.Context, n int) ([]int64, error) {
11871187
true,
11881188
func(ctx context.Context, txn kv.Transaction) error {
11891189
var e error
1190-
t := meta.NewMeta(txn)
1190+
t := meta.NewMutator(txn)
11911191
ids, e = t.GenGlobalIDs(n)
11921192
return e
11931193
})
@@ -1206,7 +1206,7 @@ func (rc *LogClient) UpdateSchemaVersion(ctx context.Context) error {
12061206
storage,
12071207
true,
12081208
func(ctx context.Context, txn kv.Transaction) error {
1209-
t := meta.NewMeta(txn)
1209+
t := meta.NewMutator(txn)
12101210
var e error
12111211
// To trigger full-reload instead of diff-reload, we need to increase the schema version
12121212
// by at least `domain.LoadSchemaDiffVersionGapThreshold`.

br/pkg/restore/misc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ const maxUserTablesNum = 10
7575
// AssertUserDBsEmpty check whether user dbs exist in the cluster
7676
func AssertUserDBsEmpty(dom *domain.Domain) error {
7777
databases := dom.InfoSchema().AllSchemas()
78-
m := meta.NewSnapshotMeta(dom.Store().GetSnapshot(kv.MaxVersion))
78+
m := meta.NewReader(dom.Store().GetSnapshot(kv.MaxVersion))
7979
userTables := make([]string, 0, maxUserTablesNum+1)
8080
appendTables := func(dbName, tableName string) bool {
8181
if len(userTables) >= maxUserTablesNum {

br/pkg/restore/snap_client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (rc *SnapClient) AllocTableIDs(ctx context.Context, tables []*metautil.Tabl
278278
preallocedTableIDs := tidalloc.New(tables)
279279
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
280280
err := kv.RunInNewTxn(ctx, rc.GetDomain().Store(), true, func(_ context.Context, txn kv.Transaction) error {
281-
return preallocedTableIDs.Alloc(meta.NewMeta(txn))
281+
return preallocedTableIDs.Alloc(meta.NewMutator(txn))
282282
})
283283
if err != nil {
284284
return err

br/pkg/stream/stream_mgr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func buildObserveTableRanges(
8585
backupTS uint64,
8686
) ([]kv.KeyRange, error) {
8787
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
88-
m := meta.NewSnapshotMeta(snapshot)
88+
m := meta.NewReader(snapshot)
8989

9090
dbs, err := m.ListDatabases()
9191
if err != nil {

lightning/pkg/importer/meta_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func newTableRestore(t *testing.T,
7777

7878
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnLightning)
7979
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
80-
m := meta.NewMeta(txn)
80+
m := meta.NewMutator(txn)
8181
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
8282
return err
8383
}

pkg/autoid_service/autoid.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (alloc *autoIDValue) alloc4Unsigned(ctx context.Context, store kv.Storage,
8181

8282
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
8383
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
84-
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
84+
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
8585
var err1 error
8686
newBase, err1 = idAcc.Get()
8787
if err1 != nil {
@@ -156,7 +156,7 @@ func (alloc *autoIDValue) alloc4Signed(ctx context.Context,
156156

157157
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
158158
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
159-
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
159+
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
160160
var err1 error
161161
newBase, err1 = idAcc.Get()
162162
if err1 != nil {
@@ -222,7 +222,7 @@ func (alloc *autoIDValue) rebase4Unsigned(ctx context.Context,
222222
startTime := time.Now()
223223
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
224224
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
225-
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
225+
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
226226
currentEnd, err1 := idAcc.Get()
227227
if err1 != nil {
228228
return err1
@@ -264,7 +264,7 @@ func (alloc *autoIDValue) rebase4Signed(ctx context.Context, store kv.Storage, d
264264
startTime := time.Now()
265265
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
266266
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
267-
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
267+
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
268268
currentEnd, err1 := idAcc.Get()
269269
if err1 != nil {
270270
return err1
@@ -483,7 +483,7 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
483483
var currentEnd int64
484484
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
485485
err := kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
486-
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(req.DbID, req.TblID).IncrementID(model.TableInfoVersion5)
486+
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(req.DbID, req.TblID).IncrementID(model.TableInfoVersion5)
487487
var err1 error
488488
currentEnd, err1 = idAcc.Get()
489489
if err1 != nil {
@@ -523,7 +523,7 @@ func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbI
523523
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
524524
var oldValue int64
525525
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
526-
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
526+
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
527527
currentEnd, err1 := idAcc.Get()
528528
if err1 != nil {
529529
return err1

pkg/ddl/add_column.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import (
5252
"go.uber.org/zap"
5353
)
5454

55-
func onAddColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) {
55+
func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
5656
// Handle the rolling back job.
5757
if job.IsRollingback() {
5858
ver, err = onDropColumn(jobCtx, t, job)

0 commit comments

Comments
 (0)