Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions pkg/planner/core/memtable_infoschema_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/set"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -110,24 +113,37 @@ func (e *InfoSchemaBaseExtractor) GetBase() *InfoSchemaBaseExtractor {
return e
}

// ListSchemas lists related schemas from predicate.
// ListSchemas lists all schemas from predicate. If no schema is specified, it lists
// all schemas in the storage.
func (e *InfoSchemaBaseExtractor) ListSchemas(is infoschema.InfoSchema) []pmodel.CIStr {
ec := e.extractableColumns
schemas := e.getSchemaObjectNames(ec.schema)
if len(schemas) == 0 {
extractedSchemas, unspecified := e.listPredicateSchemas(is)
if unspecified {
ret := is.AllSchemaNames()
slices.SortFunc(ret, func(a, b pmodel.CIStr) int {
return strings.Compare(a.L, b.L)
})
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr)
return filterSchemaObjectByRegexp(e, e.extractableColumns.schema, ret, extractStrCIStr)
}
return extractedSchemas
}

// listPredicateSchemas lists all schemas specified in predicates.
// If no schema is specified in predicates, return `unspecified` as true.
func (e *InfoSchemaBaseExtractor) listPredicateSchemas(
is infoschema.InfoSchema,
) (schemas []pmodel.CIStr, unspecified bool) {
ec := e.extractableColumns
schemas = e.getSchemaObjectNames(ec.schema)
if len(schemas) == 0 {
return nil, true
}
ret := schemas[:0]
for _, s := range schemas {
if n, ok := is.SchemaByName(s); ok {
ret = append(ret, n.Name)
}
}
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr)
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr), false
}

// ListSchemasAndTables lists related tables and their corresponding schemas from predicate.
Expand All @@ -137,7 +153,6 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
is infoschema.InfoSchema,
) ([]pmodel.CIStr, []*model.TableInfo, error) {
ec := e.extractableColumns
schemas := e.ListSchemas(is)
var tableNames []pmodel.CIStr
if ec.table != "" {
tableNames = e.getSchemaObjectNames(ec.table)
Expand All @@ -150,7 +165,7 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
findTablesByID(is, tableIDs, tableNames, tableMap)
tableSlice := maps.Values(tableMap)
tableSlice = filterSchemaObjectByRegexp(e, ec.table, tableSlice, extractStrTableInfo)
return findSchemasForTables(ctx, is, schemas, tableSlice)
return findSchemasForTables(e, is, tableSlice)
}
}
if ec.partitionID != "" {
Expand All @@ -160,13 +175,15 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
findTablesByPartID(is, partIDs, tableNames, tableMap)
tableSlice := maps.Values(tableMap)
tableSlice = filterSchemaObjectByRegexp(e, ec.table, tableSlice, extractStrTableInfo)
return findSchemasForTables(ctx, is, schemas, tableSlice)
return findSchemasForTables(e, is, tableSlice)
}
}
if len(tableNames) > 0 {
tableNames = filterSchemaObjectByRegexp(e, ec.table, tableNames, extractStrCIStr)
schemas := e.ListSchemas(is)
return findTableAndSchemaByName(ctx, is, schemas, tableNames)
}
schemas := e.ListSchemas(is)
return listTablesForEachSchema(ctx, e, is, schemas)
Comment on lines 181 to 187
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(tableNames) > 0 {
tableNames = filterSchemaObjectByRegexp(e, ec.table, tableNames, extractStrCIStr)
schemas := e.ListSchemas(is)
return findTableAndSchemaByName(ctx, is, schemas, tableNames)
}
schemas := e.ListSchemas(is)
return listTablesForEachSchema(ctx, e, is, schemas)
schemas := e.ListSchemas(is)
if len(tableNames) > 0 {
tableNames = filterSchemaObjectByRegexp(e, ec.table, tableNames, extractStrCIStr)
return findTableAndSchemaByName(ctx, is, schemas, tableNames)
}
return listTablesForEachSchema(ctx, e, is, schemas)

}

Expand Down Expand Up @@ -723,23 +740,28 @@ func listTablesForEachSchema(
// returns a schema slice and a table slice that has the same length.
// Note that input arg "tableSlice" will be changed in place.
func findSchemasForTables(
ctx context.Context,
e *InfoSchemaBaseExtractor,
is infoschema.InfoSchema,
schemas []pmodel.CIStr,
tableSlice []*model.TableInfo,
) ([]pmodel.CIStr, []*model.TableInfo, error) {
schemas, unspecified := e.listPredicateSchemas(is)
schemaSlice := make([]pmodel.CIStr, 0, len(tableSlice))
for i, tbl := range tableSlice {
dbInfo, ok := is.SchemaByID(tbl.DBID)
Copy link
Contributor

@joechenrh joechenrh Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seem like DBID field is not persisted, can we use this field to do search?

Sorry, I found it's set in GetTable

intest.Assert(ok)
if !ok {
logutil.BgLogger().Warn("schema not found for table info",
zap.Int64("tableID", tbl.ID), zap.Int64("dbID", tbl.DBID))
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is wrong? how about add some log in this situation?

}
if unspecified { // all schemas should be included.
schemaSlice = append(schemaSlice, dbInfo.Name)
continue
}

found := false
for _, s := range schemas {
isTbl, err := is.TableByName(ctx, s, tbl.Name)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrTableNotExists) {
continue
}
return nil, nil, errors.Trace(err)
}
if isTbl.Meta().ID == tbl.ID {
if s.L == dbInfo.Name.L {
schemaSlice = append(schemaSlice, s)
found = true
break
Expand Down