Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions pkg/planner/core/memtable_infoschema_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/set"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -110,24 +113,37 @@ func (e *InfoSchemaBaseExtractor) GetBase() *InfoSchemaBaseExtractor {
return e
}

// ListSchemas lists related schemas from predicate.
// ListSchemas lists all schemas from predicate. If no schema is specified, it lists
// all schemas in the storage.
func (e *InfoSchemaBaseExtractor) ListSchemas(is infoschema.InfoSchema) []pmodel.CIStr {
ec := e.extractableColumns
schemas := e.getSchemaObjectNames(ec.schema)
if len(schemas) == 0 {
extractedSchemas, unspecified := e.listPredicateSchemas(is)
if unspecified {
ret := is.AllSchemaNames()
slices.SortFunc(ret, func(a, b pmodel.CIStr) int {
return strings.Compare(a.L, b.L)
})
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr)
return filterSchemaObjectByRegexp(e, e.extractableColumns.schema, ret, extractStrCIStr)
}
return extractedSchemas
}

// listPredicateSchemas lists all schemas specified in predicates.
// If no schema is specified in predicates, return `unspecified` as true.
func (e *InfoSchemaBaseExtractor) listPredicateSchemas(
is infoschema.InfoSchema,
) (schemas []pmodel.CIStr, unspecified bool) {
ec := e.extractableColumns
schemas = e.getSchemaObjectNames(ec.schema)
if len(schemas) == 0 {
return nil, true
}
ret := schemas[:0]
for _, s := range schemas {
if n, ok := is.SchemaByName(s); ok {
ret = append(ret, n.Name)
}
}
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr)
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr), false
}

// ListSchemasAndTables lists related tables and their corresponding schemas from predicate.
Expand All @@ -137,7 +153,6 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
is infoschema.InfoSchema,
) ([]pmodel.CIStr, []*model.TableInfo, error) {
ec := e.extractableColumns
schemas := e.ListSchemas(is)
var tableNames []pmodel.CIStr
if ec.table != "" {
tableNames = e.getSchemaObjectNames(ec.table)
Expand All @@ -150,7 +165,7 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
findTablesByID(is, tableIDs, tableNames, tableMap)
tableSlice := maps.Values(tableMap)
tableSlice = filterSchemaObjectByRegexp(e, ec.table, tableSlice, extractStrTableInfo)
return findSchemasForTables(ctx, is, schemas, tableSlice)
return findSchemasForTables(e, is, tableSlice)
}
}
if ec.partitionID != "" {
Expand All @@ -160,13 +175,15 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
findTablesByPartID(is, partIDs, tableNames, tableMap)
tableSlice := maps.Values(tableMap)
tableSlice = filterSchemaObjectByRegexp(e, ec.table, tableSlice, extractStrTableInfo)
return findSchemasForTables(ctx, is, schemas, tableSlice)
return findSchemasForTables(e, is, tableSlice)
}
}
if len(tableNames) > 0 {
tableNames = filterSchemaObjectByRegexp(e, ec.table, tableNames, extractStrCIStr)
schemas := e.ListSchemas(is)
return findTableAndSchemaByName(ctx, is, schemas, tableNames)
}
schemas := e.ListSchemas(is)
return listTablesForEachSchema(ctx, e, is, schemas)
}

Expand Down Expand Up @@ -720,23 +737,28 @@ func listTablesForEachSchema(
// returns a schema slice and a table slice that has the same length.
// Note that input arg "tableSlice" will be changed in place.
func findSchemasForTables(
ctx context.Context,
e *InfoSchemaBaseExtractor,
is infoschema.InfoSchema,
schemas []pmodel.CIStr,
tableSlice []*model.TableInfo,
) ([]pmodel.CIStr, []*model.TableInfo, error) {
schemas, unspecified := e.listPredicateSchemas(is)
schemaSlice := make([]pmodel.CIStr, 0, len(tableSlice))
for i, tbl := range tableSlice {
dbInfo, ok := is.SchemaByID(tbl.DBID)
intest.Assert(ok)
if !ok {
logutil.BgLogger().Warn("schema not found for table info",
zap.Int64("tableID", tbl.ID), zap.Int64("dbID", tbl.DBID))
continue
}
if unspecified { // all schemas should be included.
schemaSlice = append(schemaSlice, dbInfo.Name)
continue
}

found := false
for _, s := range schemas {
isTbl, err := is.TableByName(ctx, s, tbl.Name)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrTableNotExists) {
continue
}
return nil, nil, errors.Trace(err)
}
if isTbl.Meta().ID == tbl.ID {
if s.L == dbInfo.Name.L {
schemaSlice = append(schemaSlice, s)
found = true
break
Expand Down