Skip to content
96 changes: 86 additions & 10 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4350,6 +4350,7 @@ func buildCheckSQLConditionForRangeExprPartition(pi *model.PartitionInfo, index
// Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParams(...)
// So we write it to the origin sql string here.
if index == 0 {
// TODO: Handle MAXVALUE in first partition
buf.WriteString(pi.Expr)
buf.WriteString(" >= %?")
paramList = append(paramList, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0]))
Expand All @@ -4372,21 +4373,91 @@ func buildCheckSQLConditionForRangeExprPartition(pi *model.PartitionInfo, index
}

func buildCheckSQLConditionForRangeColumnsPartition(pi *model.PartitionInfo, index int) (string, []any) {
paramList := make([]any, 0, 2)
colName := pi.Columns[0].L
if index == 0 {
paramList = append(paramList, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0]))
return "%n >= %?", paramList
} else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) {
paramList = append(paramList, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]))
return "%n < %?", paramList
var buf strings.Builder
paramList := make([]any, 0, len(pi.Columns)*2)

hasLowerBound := index > 0
needOR := false

// Lower bound check (for all partitions except first)
if hasLowerBound {
currVals := pi.Definitions[index-1].LessThan
for i := 0; i < len(pi.Columns); i++ {
nextIsMax := false
if i < (len(pi.Columns)-1) && strings.EqualFold(currVals[i+1], partitionMaxValue) {
nextIsMax = true
}
if needOR {
buf.WriteString(" OR ")
}
if i > 0 {
buf.WriteString("(")
// All previous columns must be equal and non-NULL
for j := 0; j < i; j++ {
if j > 0 {
buf.WriteString(" AND ")
}
buf.WriteString("(%n = %?)")
paramList = append(paramList, pi.Columns[j].L, driver.UnwrapFromSingleQuotes(currVals[j]))
}
buf.WriteString(" AND ")
}
paramList = append(paramList, pi.Columns[i].L, driver.UnwrapFromSingleQuotes(currVals[i]), pi.Columns[i].L)
if nextIsMax {
buf.WriteString("(%n <= %? OR %n IS NULL)")
} else {
buf.WriteString("(%n < %? OR %n IS NULL)")
}
if i > 0 {
buf.WriteString(")")
}
needOR = true
if nextIsMax {
break
}
}
}
paramList = append(paramList, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]), colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0]))
return "%n < %? or %n >= %?", paramList

currVals := pi.Definitions[index].LessThan
// Upper bound check (for all partitions)
for i := 0; i < len(pi.Columns); i++ {
if strings.EqualFold(currVals[i], partitionMaxValue) {
break
}
if needOR {
buf.WriteString(" OR ")
}
if i > 0 {
buf.WriteString("(")
// All previous columns must be equal
for j := 0; j < i; j++ {
if j > 0 {
buf.WriteString(" AND ")
}
paramList = append(paramList, pi.Columns[j].L, driver.UnwrapFromSingleQuotes(currVals[j]))
buf.WriteString("(%n = %?)")
}
buf.WriteString(" AND ")
}
isLast := i == len(pi.Columns)-1
if isLast {
buf.WriteString("(%n >= %?)")
} else {
buf.WriteString("(%n > %?)")
}
paramList = append(paramList, pi.Columns[i].L, driver.UnwrapFromSingleQuotes(currVals[i]))
if i > 0 {
buf.WriteString(")")
}
needOR = true
}

return buf.String(), paramList
}

func buildCheckSQLConditionForListPartition(pi *model.PartitionInfo, index int) string {
var buf strings.Builder
// TODO: Handle DEFAULT partition
buf.WriteString("not (")
for i, inValue := range pi.Definitions[index].InValues {
if i != 0 {
Expand All @@ -4408,14 +4479,19 @@ func buildCheckSQLConditionForListPartition(pi *model.PartitionInfo, index int)

func buildCheckSQLConditionForListColumnsPartition(pi *model.PartitionInfo, index int) string {
var buf strings.Builder
// TODO: Verify if this is correct!!!
// TODO: Handle DEFAULT partition!
// TODO: use paramList with column names, instead of quoting.
// How to find a match?
// (row <=> vals1) OR (row <=> vals2)
// How to find a non-matching row:
// NOT ( (row <=> vals1) OR (row <=> vals2) ... )
buf.WriteString("not (")
colNames := make([]string, 0, len(pi.Columns))
for i := range pi.Columns {
// TODO: Add test for this!
// TODO: check if there are no proper quoting function for this?
// TODO: Maybe Sprintf("%#q", str) ?
n := "`" + strings.ReplaceAll(pi.Columns[i].O, "`", "``") + "`"
colNames = append(colNames, n)
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/ddl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,14 @@ func TestUpdateDuringAddColumn(t *testing.T) {

tk.MustQuery("select * from t1").Sort().Check(testkit.Rows("8 1 9", "8 2 9"))
}

func TestExchangePartitionMultiColumn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (a1 int(11) not null,a2 int(11) not null,a3 date default null, primary key (`a1`,`a2`)) partition by range columns(`a1`,`a2`)(partition `p10` values less than (10,10),partition `p20` values less than (20,20),partition `pmax` values less than (maxvalue,maxvalue))")
tk.MustExec(`insert into t values(5,10,null),(10,4,null)`)
tk.MustExec("CREATE TABLE t_np (a1 int(11) not null,a2 int(11) not null,a3 date default null, primary key (`a1`,`a2`))")
tk.MustExec(`insert into t_np values(10,4,null),(4,10,null)`)
tk.MustExec(`alter table t exchange partition p10 with table t_np`)
}
1 change: 1 addition & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_test(
srcs = [
"db_partition_test.go",
"error_injection_test.go",
"exchange_partition_test.go",
"main_test.go",
"multi_domain_test.go",
"placement_test.go",
Expand Down
143 changes: 143 additions & 0 deletions pkg/ddl/tests/partition/exchange_partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package partition

import (
"fmt"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/testkit"
)

func TestExchangeRangeColumnsPartition(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_enable_exchange_partition=1")

// Create a table partitioned by range columns with multiple column types
tk.MustExec(`CREATE TABLE t1 (
id INT NOT NULL,
age INT,
name VARCHAR(50)
) PARTITION BY RANGE COLUMNS(age, name) (
PARTITION p0 VALUES LESS THAN (20, 'm'),
PARTITION p1 VALUES LESS THAN (30, 'm'),
PARTITION p2 VALUES LESS THAN (30, MAXVALUE),
PARTITION p3 VALUES LESS THAN (40, 'm'),
PARTITION p4 VALUES LESS THAN (MAXVALUE, MAXVALUE)
)`)

// Define test values for each column type
ageValues := []any{
nil, // NULL
-2147483648, // min int
2147483647, // max int
0,
19, // boundary-1
20, // boundary 1
29, // boundary-1
30, // boundary 2
39, // boundary-1
40, // boundary 3
}

nameValues := []any{
nil, // NULL
"", // empty string
"l", // boundary-1
"m", // boundary
"n", // boundary+1
}

// Generate all combinations
id := 0
addComma := false
query := "INSERT INTO t1 VALUES "
for _, age := range ageValues {
for _, name := range nameValues {
id++
// if id != 26 {
// continue
// }
if addComma {
query += ","
}
if age == nil && name == nil {
query += fmt.Sprintf("(%d, NULL, NULL)", id)
} else if age == nil {
query += fmt.Sprintf("(%d, NULL, %q)", id, name)
} else if name == nil {
query += fmt.Sprintf("(%d, %d, NULL)", id, age)
} else {
query += fmt.Sprintf("(%d, %d, %q)", id, age, name)
}
addComma = true
}
}
tk.MustExec(query)

// Save initial counts per partition
initialResults := make(map[string]*testkit.Result)
for _, p := range []string{"p0", "p1", "p2", "p3", "p4"} {
result := tk.MustQuery(fmt.Sprintf("SELECT * FROM t1 PARTITION(%s)", p)).Sort()
initialResults[p] = result
}

// Create empty exchange table
tk.MustExec(`CREATE TABLE t2 (
id INT NOT NULL,
age INT,
name VARCHAR(50)
)`)
// Test each partition
partitionNames := []string{"p0", "p1", "p2", "p3", "p4"}
for i, p := range partitionNames {
// Exchange partition out
tk.MustExec(fmt.Sprintf("ALTER TABLE t1 EXCHANGE PARTITION %s WITH TABLE t2", p))

// Verify partition is now empty
tk.MustQuery(fmt.Sprintf("SELECT COUNT(*) FROM t1 PARTITION(%s)", p)).Check(testkit.Rows("0"))

// Verify all rows moved to t2
tk.MustQuery("SELECT * FROM t2").Sort().Check(initialResults[p].Rows())

// Exchange partition back
tk.MustExec(fmt.Sprintf("ALTER TABLE t1 EXCHANGE PARTITION %s WITH TABLE t2", p))

// Verify results are back to initial state
tk.MustQuery(fmt.Sprintf("SELECT * FROM t1 PARTITION(%s)", p)).Sort().Check(initialResults[p].Rows())

// Check that no non-matching rows will be allowed to be exchanged
otherPartitions := strings.Join(append(append([]string{}, partitionNames[:i]...), partitionNames[i+1:]...), ",")
for j := 1; j <= id; j++ {
res := tk.MustQuery(fmt.Sprintf("select * from t1 partition (%s) where id = %d", p, j))
if len(res.Rows()) > 0 {
// Skip rows from current partition, since already tested above.
continue
}
tk.MustExec(fmt.Sprintf("insert into t2 select * from t1 partition (%s) where id = %d", otherPartitions, j))
tk.MustContainErrMsg(fmt.Sprintf("ALTER TABLE t1 EXCHANGE PARTITION %s WITH TABLE t2 /* j = %d */", p, j), "[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`truncate table t2`)
}
}
// Cleanup exchange table
tk.MustExec("DROP TABLE t2")

// Clean up
tk.MustExec("DROP TABLE t1")
}
12 changes: 6 additions & 6 deletions pkg/ddl/tests/partition/multi_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ func TestMultiSchemaPartitionByGlobalIndex(t *testing.T) {
// Global Index, to replace the existing one.
tkO.MustContainErrMsg(`insert into t values (1,2,3)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b")
tkNO.MustContainErrMsg(`insert into t values (1,2,3)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b")
tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b")
tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b")
tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101")
tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101")
tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2"))
tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2"))
tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2"))
Expand All @@ -474,10 +474,10 @@ func TestMultiSchemaPartitionByGlobalIndex(t *testing.T) {
// Both tkO and tkNO uses the original table/partitions,
// but tkO should also update the newly created
// Global Index, and tkNO should only delete from it.
tkO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b")
tkNO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b")
tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b")
tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b")
tkO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1")
tkNO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1")
tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101")
tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101")
tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2", "3 3 3", "4 4 4"))
tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2", "3 3 3", "4 4 4"))
logutil.BgLogger().Info("insert into t values (5,5,5)")
Expand Down