From 07712799727011119b0b149a1e743148d3345796 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 27 May 2024 12:05:49 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #53445 Signed-off-by: ti-chi-bot --- domain/domain.go | 2 + executor/kvtest/BUILD.bazel | 4 + infoschema/cache.go | 75 +++++++++++++++++ infoschema/cache_test.go | 163 ++++++++++++++++++++++++++++++++++++ 4 files changed, 244 insertions(+) diff --git a/domain/domain.go b/domain/domain.go index 4d41e1254d37e..bfb1230e172a1 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -370,6 +370,8 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 if diff == nil { // Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail. // It is safe to skip the empty diff because the infoschema is new enough and consistent. + logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion)) + do.infoCache.InsertEmptySchemaVersion(usedVersion) continue } diffs = append(diffs, diff) diff --git a/executor/kvtest/BUILD.bazel b/executor/kvtest/BUILD.bazel index c746c6013029f..7b6f3b0d6bacc 100644 --- a/executor/kvtest/BUILD.bazel +++ b/executor/kvtest/BUILD.bazel @@ -8,7 +8,11 @@ go_test( "main_test.go", ], flaky = True, +<<<<<<< HEAD:executor/kvtest/BUILD.bazel race = "on", +======= + shard_count = 8, +>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/test/cachetest/BUILD.bazel deps = [ "//config", "//meta/autoid", diff --git a/infoschema/cache.go b/infoschema/cache.go index eb9fbc6c4857b..8f939c3941c70 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -40,6 +40,15 @@ type InfoCache struct { mu sync.RWMutex // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order cache []schemaAndTimestamp +<<<<<<< HEAD:infoschema/cache.go +======= + + // emptySchemaVersions stores schema version which has no schema_diff. + emptySchemaVersions map[int64]struct{} + + r autoid.Requirement + Data *Data +>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go } type schemaAndTimestamp struct { @@ -50,7 +59,14 @@ type schemaAndTimestamp struct { // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { return &InfoCache{ +<<<<<<< HEAD:infoschema/cache.go cache: make([]schemaAndTimestamp, 0, capacity), +======= + cache: make([]schemaAndTimestamp, 0, capacity), + emptySchemaVersions: make(map[int64]struct{}), + r: r, + Data: infoData, +>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go } } @@ -78,6 +94,11 @@ func (h *InfoCache) Len() int { return len(h.cache) } +// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing. +func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} { + return h.emptySchemaVersions +} + func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts)) // search one by one instead of binary search, because the timestamp of a schema could be 0 @@ -94,6 +115,38 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { // found the largest version before the given ts return is.infoschema, true } +<<<<<<< HEAD:infoschema/cache.go +======= + + if uint64(h.cache[i-1].timestamp) > ts { + // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. + lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion() + currentVersion := is.infoschema.SchemaMetaVersion() + if lastVersion == currentVersion+1 { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, + // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts. + return is.infoschema, true + } + if lastVersion > currentVersion { + found := true + for ver := currentVersion + 1; ver < lastVersion; ver++ { + _, ok := h.emptySchemaVersions[ver] + if !ok { + found = false + break + } + } + if found { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and + // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions + // which means those gap versions don't have schema info, then current schema is also suitable for ts. + return is.infoschema, true + } + } + } + // current schema is not suitable for ts, then break the loop to avoid the unnecessary search. + break +>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go } logutil.BgLogger().Debug("SCHEMA CACHE no schema found") @@ -200,3 +253,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool { return true } + +// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version. +func (h *InfoCache) InsertEmptySchemaVersion(version int64) { + h.mu.Lock() + defer h.mu.Unlock() + + h.emptySchemaVersions[version] = struct{}{} + if len(h.emptySchemaVersions) > cap(h.cache) { + // remove oldest version. + versions := make([]int64, 0, len(h.emptySchemaVersions)) + for ver := range h.emptySchemaVersions { + versions = append(versions, ver) + } + sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] }) + for _, ver := range versions { + delete(h.emptySchemaVersions, ver) + if len(h.emptySchemaVersions) <= cap(h.cache) { + break + } + } + } +} diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 60aad8bbdd985..4b6ad5a7decad 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -177,3 +177,166 @@ func TestGetByTimestamp(t *testing.T) { require.Equal(t, is3, ic.GetBySnapshotTS(3)) require.Equal(t, 3, ic.Len()) } +<<<<<<< HEAD:infoschema/cache_test.go +======= + +func TestReSize(t *testing.T) { + ic := infoschema.NewCache(nil, 2) + require.NotNil(t, ic) + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1, 1) + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2, 2) + + ic.ReSize(3) + require.Equal(t, 2, ic.Size()) + require.Equal(t, is1, ic.GetByVersion(1)) + require.Equal(t, is2, ic.GetByVersion(2)) + is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) + require.True(t, ic.Insert(is3, 3)) + require.Equal(t, is1, ic.GetByVersion(1)) + require.Equal(t, is2, ic.GetByVersion(2)) + require.Equal(t, is3, ic.GetByVersion(3)) + + ic.ReSize(1) + require.Equal(t, 1, ic.Size()) + require.Nil(t, ic.GetByVersion(1)) + require.Nil(t, ic.GetByVersion(2)) + require.Equal(t, is3, ic.GetByVersion(3)) + require.False(t, ic.Insert(is2, 2)) + require.Equal(t, 1, ic.Size()) + is4 := infoschema.MockInfoSchemaWithSchemaVer(nil, 4) + require.True(t, ic.Insert(is4, 4)) + require.Equal(t, 1, ic.Size()) + require.Nil(t, ic.GetByVersion(1)) + require.Nil(t, ic.GetByVersion(2)) + require.Nil(t, ic.GetByVersion(3)) + require.Equal(t, is4, ic.GetByVersion(4)) +} + +func TestCacheWithSchemaTsZero(t *testing.T) { + ic := infoschema.NewCache(nil, 16) + require.NotNil(t, ic) + + for i := 1; i <= 8; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + + checkFn := func(start, end int64, exist bool) { + require.True(t, start <= end) + latestSchemaVersion := ic.GetLatest().SchemaMetaVersion() + for ts := start; ts <= end; ts++ { + is := ic.GetBySnapshotTS(uint64(ts)) + if exist { + require.NotNil(t, is, fmt.Sprintf("ts %d", ts)) + if ts > latestSchemaVersion { + require.Equal(t, latestSchemaVersion, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts)) + } else { + require.Equal(t, ts, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts)) + } + } else { + require.Nil(t, is, fmt.Sprintf("ts %d", ts)) + } + } + } + checkFn(1, 8, true) + checkFn(8, 10, true) + + // mock for meet error There is no Write MVCC info for the schema version + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 0) + checkFn(1, 7, true) + checkFn(8, 9, false) + checkFn(9, 10, false) + + for i := 10; i <= 16; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + checkFn(1, 7, true) + checkFn(8, 9, false) + checkFn(10, 16, true) + } + require.Equal(t, 16, ic.Size()) + + // refill the cache + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 9) + checkFn(1, 16, true) + require.Equal(t, 16, ic.Size()) + + // Test more than capacity + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 17), 17) + checkFn(1, 1, false) + checkFn(2, 17, true) + checkFn(2, 20, true) + require.Equal(t, 16, ic.Size()) + + // Test for there is a hole in the middle. + ic = infoschema.NewCache(nil, 16) + + // mock for restart with full load the latest version schema. + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 100), 100) + checkFn(1, 99, false) + checkFn(100, 100, true) + + for i := 1; i <= 16; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 1, false) + checkFn(2, 15, true) + checkFn(16, 16, false) + checkFn(100, 100, true) + require.Equal(t, 16, ic.Size()) + + for i := 85; i < 100; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 84, false) + checkFn(85, 100, true) + require.Equal(t, 16, ic.Size()) + + // Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff. + ic = infoschema.NewCache(nil, 16) + require.NotNil(t, ic) + for i := 1; i <= 8; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 10, true) + // mock for schema version hole, schema-version 9 is missing. + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10) + checkFn(1, 7, true) + // without empty schema version map, get snapshot by ts 8, 9 will both failed. + checkFn(8, 9, false) + checkFn(10, 10, true) + // add empty schema version 9. + ic.InsertEmptySchemaVersion(9) + // after set empty schema version, get snapshot by ts 8, 9 will both success. + checkFn(1, 8, true) + checkFn(10, 10, true) + is := ic.GetBySnapshotTS(uint64(9)) + require.NotNil(t, is) + // since schema version 9 is empty, so get by ts 9 will get schema which version is 8. + require.Equal(t, int64(8), is.SchemaMetaVersion()) +} + +func TestCacheEmptySchemaVersion(t *testing.T) { + ic := infoschema.NewCache(nil, 16) + require.NotNil(t, ic) + require.Equal(t, 0, len(ic.GetEmptySchemaVersions())) + for i := 0; i < 16; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions := ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 0; i < 16; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } + for i := 16; i < 20; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions = ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 4; i < 20; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } +} +>>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/test/cachetest/cache_test.go From 6daf8fe7715806480b7efda916f820d5cb3664dc Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 27 May 2024 17:28:34 +0800 Subject: [PATCH 2/4] Revert "This is an automated cherry-pick of #53445" This reverts commit 07712799727011119b0b149a1e743148d3345796. --- domain/domain.go | 2 - executor/kvtest/BUILD.bazel | 4 - infoschema/cache.go | 75 ----------------- infoschema/cache_test.go | 163 ------------------------------------ 4 files changed, 244 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index bfb1230e172a1..4d41e1254d37e 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -370,8 +370,6 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 if diff == nil { // Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail. // It is safe to skip the empty diff because the infoschema is new enough and consistent. - logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion)) - do.infoCache.InsertEmptySchemaVersion(usedVersion) continue } diffs = append(diffs, diff) diff --git a/executor/kvtest/BUILD.bazel b/executor/kvtest/BUILD.bazel index 7b6f3b0d6bacc..c746c6013029f 100644 --- a/executor/kvtest/BUILD.bazel +++ b/executor/kvtest/BUILD.bazel @@ -8,11 +8,7 @@ go_test( "main_test.go", ], flaky = True, -<<<<<<< HEAD:executor/kvtest/BUILD.bazel race = "on", -======= - shard_count = 8, ->>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/test/cachetest/BUILD.bazel deps = [ "//config", "//meta/autoid", diff --git a/infoschema/cache.go b/infoschema/cache.go index 8f939c3941c70..eb9fbc6c4857b 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -40,15 +40,6 @@ type InfoCache struct { mu sync.RWMutex // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order cache []schemaAndTimestamp -<<<<<<< HEAD:infoschema/cache.go -======= - - // emptySchemaVersions stores schema version which has no schema_diff. - emptySchemaVersions map[int64]struct{} - - r autoid.Requirement - Data *Data ->>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go } type schemaAndTimestamp struct { @@ -59,14 +50,7 @@ type schemaAndTimestamp struct { // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { return &InfoCache{ -<<<<<<< HEAD:infoschema/cache.go cache: make([]schemaAndTimestamp, 0, capacity), -======= - cache: make([]schemaAndTimestamp, 0, capacity), - emptySchemaVersions: make(map[int64]struct{}), - r: r, - Data: infoData, ->>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go } } @@ -94,11 +78,6 @@ func (h *InfoCache) Len() int { return len(h.cache) } -// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing. -func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} { - return h.emptySchemaVersions -} - func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts)) // search one by one instead of binary search, because the timestamp of a schema could be 0 @@ -115,38 +94,6 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { // found the largest version before the given ts return is.infoschema, true } -<<<<<<< HEAD:infoschema/cache.go -======= - - if uint64(h.cache[i-1].timestamp) > ts { - // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. - lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion() - currentVersion := is.infoschema.SchemaMetaVersion() - if lastVersion == currentVersion+1 { - // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, - // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts. - return is.infoschema, true - } - if lastVersion > currentVersion { - found := true - for ver := currentVersion + 1; ver < lastVersion; ver++ { - _, ok := h.emptySchemaVersions[ver] - if !ok { - found = false - break - } - } - if found { - // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and - // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions - // which means those gap versions don't have schema info, then current schema is also suitable for ts. - return is.infoschema, true - } - } - } - // current schema is not suitable for ts, then break the loop to avoid the unnecessary search. - break ->>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/cache.go } logutil.BgLogger().Debug("SCHEMA CACHE no schema found") @@ -253,25 +200,3 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool { return true } - -// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version. -func (h *InfoCache) InsertEmptySchemaVersion(version int64) { - h.mu.Lock() - defer h.mu.Unlock() - - h.emptySchemaVersions[version] = struct{}{} - if len(h.emptySchemaVersions) > cap(h.cache) { - // remove oldest version. - versions := make([]int64, 0, len(h.emptySchemaVersions)) - for ver := range h.emptySchemaVersions { - versions = append(versions, ver) - } - sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] }) - for _, ver := range versions { - delete(h.emptySchemaVersions, ver) - if len(h.emptySchemaVersions) <= cap(h.cache) { - break - } - } - } -} diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 4b6ad5a7decad..60aad8bbdd985 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -177,166 +177,3 @@ func TestGetByTimestamp(t *testing.T) { require.Equal(t, is3, ic.GetBySnapshotTS(3)) require.Equal(t, 3, ic.Len()) } -<<<<<<< HEAD:infoschema/cache_test.go -======= - -func TestReSize(t *testing.T) { - ic := infoschema.NewCache(nil, 2) - require.NotNil(t, ic) - is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) - ic.Insert(is1, 1) - is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) - ic.Insert(is2, 2) - - ic.ReSize(3) - require.Equal(t, 2, ic.Size()) - require.Equal(t, is1, ic.GetByVersion(1)) - require.Equal(t, is2, ic.GetByVersion(2)) - is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) - require.True(t, ic.Insert(is3, 3)) - require.Equal(t, is1, ic.GetByVersion(1)) - require.Equal(t, is2, ic.GetByVersion(2)) - require.Equal(t, is3, ic.GetByVersion(3)) - - ic.ReSize(1) - require.Equal(t, 1, ic.Size()) - require.Nil(t, ic.GetByVersion(1)) - require.Nil(t, ic.GetByVersion(2)) - require.Equal(t, is3, ic.GetByVersion(3)) - require.False(t, ic.Insert(is2, 2)) - require.Equal(t, 1, ic.Size()) - is4 := infoschema.MockInfoSchemaWithSchemaVer(nil, 4) - require.True(t, ic.Insert(is4, 4)) - require.Equal(t, 1, ic.Size()) - require.Nil(t, ic.GetByVersion(1)) - require.Nil(t, ic.GetByVersion(2)) - require.Nil(t, ic.GetByVersion(3)) - require.Equal(t, is4, ic.GetByVersion(4)) -} - -func TestCacheWithSchemaTsZero(t *testing.T) { - ic := infoschema.NewCache(nil, 16) - require.NotNil(t, ic) - - for i := 1; i <= 8; i++ { - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) - } - - checkFn := func(start, end int64, exist bool) { - require.True(t, start <= end) - latestSchemaVersion := ic.GetLatest().SchemaMetaVersion() - for ts := start; ts <= end; ts++ { - is := ic.GetBySnapshotTS(uint64(ts)) - if exist { - require.NotNil(t, is, fmt.Sprintf("ts %d", ts)) - if ts > latestSchemaVersion { - require.Equal(t, latestSchemaVersion, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts)) - } else { - require.Equal(t, ts, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts)) - } - } else { - require.Nil(t, is, fmt.Sprintf("ts %d", ts)) - } - } - } - checkFn(1, 8, true) - checkFn(8, 10, true) - - // mock for meet error There is no Write MVCC info for the schema version - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 0) - checkFn(1, 7, true) - checkFn(8, 9, false) - checkFn(9, 10, false) - - for i := 10; i <= 16; i++ { - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) - checkFn(1, 7, true) - checkFn(8, 9, false) - checkFn(10, 16, true) - } - require.Equal(t, 16, ic.Size()) - - // refill the cache - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 9) - checkFn(1, 16, true) - require.Equal(t, 16, ic.Size()) - - // Test more than capacity - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 17), 17) - checkFn(1, 1, false) - checkFn(2, 17, true) - checkFn(2, 20, true) - require.Equal(t, 16, ic.Size()) - - // Test for there is a hole in the middle. - ic = infoschema.NewCache(nil, 16) - - // mock for restart with full load the latest version schema. - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 100), 100) - checkFn(1, 99, false) - checkFn(100, 100, true) - - for i := 1; i <= 16; i++ { - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) - } - checkFn(1, 1, false) - checkFn(2, 15, true) - checkFn(16, 16, false) - checkFn(100, 100, true) - require.Equal(t, 16, ic.Size()) - - for i := 85; i < 100; i++ { - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) - } - checkFn(1, 84, false) - checkFn(85, 100, true) - require.Equal(t, 16, ic.Size()) - - // Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff. - ic = infoschema.NewCache(nil, 16) - require.NotNil(t, ic) - for i := 1; i <= 8; i++ { - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) - } - checkFn(1, 10, true) - // mock for schema version hole, schema-version 9 is missing. - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10) - checkFn(1, 7, true) - // without empty schema version map, get snapshot by ts 8, 9 will both failed. - checkFn(8, 9, false) - checkFn(10, 10, true) - // add empty schema version 9. - ic.InsertEmptySchemaVersion(9) - // after set empty schema version, get snapshot by ts 8, 9 will both success. - checkFn(1, 8, true) - checkFn(10, 10, true) - is := ic.GetBySnapshotTS(uint64(9)) - require.NotNil(t, is) - // since schema version 9 is empty, so get by ts 9 will get schema which version is 8. - require.Equal(t, int64(8), is.SchemaMetaVersion()) -} - -func TestCacheEmptySchemaVersion(t *testing.T) { - ic := infoschema.NewCache(nil, 16) - require.NotNil(t, ic) - require.Equal(t, 0, len(ic.GetEmptySchemaVersions())) - for i := 0; i < 16; i++ { - ic.InsertEmptySchemaVersion(int64(i)) - } - emptyVersions := ic.GetEmptySchemaVersions() - require.Equal(t, 16, len(emptyVersions)) - for i := 0; i < 16; i++ { - _, ok := emptyVersions[int64(i)] - require.True(t, ok) - } - for i := 16; i < 20; i++ { - ic.InsertEmptySchemaVersion(int64(i)) - } - emptyVersions = ic.GetEmptySchemaVersions() - require.Equal(t, 16, len(emptyVersions)) - for i := 4; i < 20; i++ { - _, ok := emptyVersions[int64(i)] - require.True(t, ok) - } -} ->>>>>>> 0ac2ad0252b (infoschema: fix issue of information schema cache miss cause by schema version gap (#53445)):pkg/infoschema/test/cachetest/cache_test.go From add7db2292f8e8c919002a647d64d644b629e6a2 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 6 Nov 2023 23:53:11 +0800 Subject: [PATCH 3/4] infoschema: refine info cache logic to reduce the impact of DDL on information schema cache (#48284) close pingcap/tidb#48285 Signed-off-by: crazycs520 --- infoschema/cache.go | 28 +++++++++++--- infoschema/cache_test.go | 80 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 6 deletions(-) diff --git a/infoschema/cache.go b/infoschema/cache.go index eb9fbc6c4857b..14a512d691670 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -54,6 +54,13 @@ func NewCache(capacity int) *InfoCache { } } +// Size returns the size of the cache, export for test. +func (h *InfoCache) Size() int { + h.mu.Lock() + defer h.mu.Unlock() + return len(h.cache) +} + // Reset resets the cache. func (h *InfoCache) Reset(capacity int) { h.mu.Lock() @@ -85,15 +92,24 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { // moreover, the most likely hit element in the array is the first one in steady mode // thus it may have better performance than binary search for i, is := range h.cache { - if is.timestamp == 0 || (i > 0 && h.cache[i-1].infoschema.SchemaMetaVersion() != is.infoschema.SchemaMetaVersion()+1) { - // the schema version doesn't have a timestamp or there is a gap in the schema cache - // ignore all the schema cache equals or less than this version in search by timestamp - break + if is.timestamp == 0 || ts < uint64(is.timestamp) { + // is.timestamp == 0 means the schema ts is unknown, so we can't use it, then just skip it. + // ts < is.timestamp means the schema is newer than ts, so we can't use it too, just skip it to find the older one. + continue + } + // ts >= is.timestamp must be true after the above condition. + if i == 0 { + // the first element is the latest schema, so we can return it directly. + return is.infoschema, true } - if ts >= uint64(is.timestamp) { - // found the largest version before the given ts + if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts { + // This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, + // but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts. + // The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. return is.infoschema, true } + // current schema is not suitable for ts, then break the loop to avoid the unnecessary search. + break } logutil.BgLogger().Debug("SCHEMA CACHE no schema found") diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 60aad8bbdd985..0af4b8ca98c84 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -15,6 +15,7 @@ package infoschema_test import ( + "fmt" "testing" "github.com/pingcap/tidb/infoschema" @@ -177,3 +178,82 @@ func TestGetByTimestamp(t *testing.T) { require.Equal(t, is3, ic.GetBySnapshotTS(3)) require.Equal(t, 3, ic.Len()) } + +func TestCacheWithSchemaTsZero(t *testing.T) { + ic := infoschema.NewCache(16) + require.NotNil(t, ic) + + for i := 1; i <= 8; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + + checkFn := func(start, end int64, exist bool) { + require.True(t, start <= end) + latestSchemaVersion := ic.GetLatest().SchemaMetaVersion() + for ts := start; ts <= end; ts++ { + is := ic.GetBySnapshotTS(uint64(ts)) + if exist { + require.NotNil(t, is, fmt.Sprintf("ts %d", ts)) + if ts > latestSchemaVersion { + require.Equal(t, latestSchemaVersion, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts)) + } else { + require.Equal(t, ts, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts)) + } + } else { + require.Nil(t, is, fmt.Sprintf("ts %d", ts)) + } + } + } + checkFn(1, 8, true) + checkFn(8, 10, true) + + // mock for meet error There is no Write MVCC info for the schema version + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 0) + checkFn(1, 7, true) + checkFn(8, 9, false) + checkFn(9, 10, false) + + for i := 10; i <= 16; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + checkFn(1, 7, true) + checkFn(8, 9, false) + checkFn(10, 16, true) + } + require.Equal(t, 16, ic.Size()) + + // refill the cache + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 9) + checkFn(1, 16, true) + require.Equal(t, 16, ic.Size()) + + // Test more than capacity + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 17), 17) + checkFn(1, 1, false) + checkFn(2, 17, true) + checkFn(2, 20, true) + require.Equal(t, 16, ic.Size()) + + // Test for there is a hole in the middle. + ic = infoschema.NewCache(16) + + // mock for restart with full load the latest version schema. + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 100), 100) + checkFn(1, 99, false) + checkFn(100, 100, true) + + for i := 1; i <= 16; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 1, false) + checkFn(2, 15, true) + checkFn(16, 16, false) + checkFn(100, 100, true) + require.Equal(t, 16, ic.Size()) + + for i := 85; i < 100; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 84, false) + checkFn(85, 100, true) + require.Equal(t, 16, ic.Size()) +} From 07fbf656ec80968a75581f0cda6c799365ac4825 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 27 May 2024 12:05:49 +0800 Subject: [PATCH 4/4] infoschema: fix issue of information schema cache miss cause by schema version gap (#53445) close pingcap/tidb#53428 Signed-off-by: crazycs520 --- domain/domain.go | 2 ++ infoschema/cache.go | 64 ++++++++++++++++++++++++++++++++++++---- infoschema/cache_test.go | 47 +++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 4d41e1254d37e..bfb1230e172a1 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -370,6 +370,8 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 if diff == nil { // Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail. // It is safe to skip the empty diff because the infoschema is new enough and consistent. + logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion)) + do.infoCache.InsertEmptySchemaVersion(usedVersion) continue } diffs = append(diffs, diff) diff --git a/infoschema/cache.go b/infoschema/cache.go index 14a512d691670..fd72906a9e5a9 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -40,6 +40,9 @@ type InfoCache struct { mu sync.RWMutex // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order cache []schemaAndTimestamp + + // emptySchemaVersions stores schema version which has no schema_diff. + emptySchemaVersions map[int64]struct{} } type schemaAndTimestamp struct { @@ -50,7 +53,8 @@ type schemaAndTimestamp struct { // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { return &InfoCache{ - cache: make([]schemaAndTimestamp, 0, capacity), + cache: make([]schemaAndTimestamp, 0, capacity), + emptySchemaVersions: make(map[int64]struct{}), } } @@ -85,6 +89,11 @@ func (h *InfoCache) Len() int { return len(h.cache) } +// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing. +func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} { + return h.emptySchemaVersions +} + func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts)) // search one by one instead of binary search, because the timestamp of a schema could be 0 @@ -102,11 +111,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) { // the first element is the latest schema, so we can return it directly. return is.infoschema, true } - if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts { - // This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, - // but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts. - // The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. - return is.infoschema, true + + if uint64(h.cache[i-1].timestamp) > ts { + // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts. + lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion() + currentVersion := is.infoschema.SchemaMetaVersion() + if lastVersion == currentVersion+1 { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, + // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts. + return is.infoschema, true + } + if lastVersion > currentVersion { + found := true + for ver := currentVersion + 1; ver < lastVersion; ver++ { + _, ok := h.emptySchemaVersions[ver] + if !ok { + found = false + break + } + } + if found { + // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and + // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions + // which means those gap versions don't have schema info, then current schema is also suitable for ts. + return is.infoschema, true + } + } } // current schema is not suitable for ts, then break the loop to avoid the unnecessary search. break @@ -216,3 +246,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool { return true } + +// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version. +func (h *InfoCache) InsertEmptySchemaVersion(version int64) { + h.mu.Lock() + defer h.mu.Unlock() + + h.emptySchemaVersions[version] = struct{}{} + if len(h.emptySchemaVersions) > cap(h.cache) { + // remove oldest version. + versions := make([]int64, 0, len(h.emptySchemaVersions)) + for ver := range h.emptySchemaVersions { + versions = append(versions, ver) + } + sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] }) + for _, ver := range versions { + delete(h.emptySchemaVersions, ver) + if len(h.emptySchemaVersions) <= cap(h.cache) { + break + } + } + } +} diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 0af4b8ca98c84..25305daa7e09d 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -256,4 +256,51 @@ func TestCacheWithSchemaTsZero(t *testing.T) { checkFn(1, 84, false) checkFn(85, 100, true) require.Equal(t, 16, ic.Size()) + + // Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff. + ic = infoschema.NewCache(16) + require.NotNil(t, ic) + for i := 1; i <= 8; i++ { + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i)) + } + checkFn(1, 10, true) + // mock for schema version hole, schema-version 9 is missing. + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10) + checkFn(1, 7, true) + // without empty schema version map, get snapshot by ts 8, 9 will both failed. + checkFn(8, 9, false) + checkFn(10, 10, true) + // add empty schema version 9. + ic.InsertEmptySchemaVersion(9) + // after set empty schema version, get snapshot by ts 8, 9 will both success. + checkFn(1, 8, true) + checkFn(10, 10, true) + is := ic.GetBySnapshotTS(uint64(9)) + require.NotNil(t, is) + // since schema version 9 is empty, so get by ts 9 will get schema which version is 8. + require.Equal(t, int64(8), is.SchemaMetaVersion()) +} + +func TestCacheEmptySchemaVersion(t *testing.T) { + ic := infoschema.NewCache(16) + require.NotNil(t, ic) + require.Equal(t, 0, len(ic.GetEmptySchemaVersions())) + for i := 0; i < 16; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions := ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 0; i < 16; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } + for i := 16; i < 20; i++ { + ic.InsertEmptySchemaVersion(int64(i)) + } + emptyVersions = ic.GetEmptySchemaVersions() + require.Equal(t, 16, len(emptyVersions)) + for i := 4; i < 20; i++ { + _, ok := emptyVersions[int64(i)] + require.True(t, ok) + } }