Skip to content

Commit cec7ab8

Browse files
authored
Fix read from range column partition table error (#2639) (#2658)
1 parent 88a12aa commit cec7ab8

File tree

4 files changed

+101
-3
lines changed

4 files changed

+101
-3
lines changed

core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,12 @@ trait LeafColumnarExecRDD extends LeafExecNode {
5757
}
5858
b.append("]")
5959
b.toString
60-
} else {
60+
} else if (tiRDDs.lengthCompare(1) == 0) {
6161
s"${dagRequest.getStoreType.name()} $nodeName{$dagRequest}" +
6262
s"${TiUtil.getReqEstCountStr(dagRequest)}"
63+
} else {
64+
// return empty TiRDD when there is no tiRDDs
65+
"Empty TiRDD"
6366
}
6467

6568
def dagRequest: TiDAGRequest = tiRDDs.head.dagRequest

core/src/test/scala/org/apache/spark/sql/PartitionTableSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@ class PartitionTableSuite extends BasePlanTest {
3333
super.afterAll()
3434
}
3535

36+
test("reading from range column partition") {
37+
tidbStmt.execute("drop table if exists range_column_test")
38+
tidbStmt.execute(
39+
"create table range_column_test (id varchar(10)) partition by RANGE COLUMNS(`id`) (PARTITION `p1` VALUES LESS THAN ('''CN001'''),PARTITION `p2` VALUES LESS THAN ('CN002'))")
40+
tidbStmt.execute("insert into `range_column_test` values('CN001')")
41+
tidbStmt.execute("insert into `range_column_test` values('''CN001''')")
42+
43+
judge("select * from range_column_test where id = 'CN001'")
44+
judge("select * from range_column_test where id = '\\'CN001\\''")
45+
judge("select * from range_column_test where id = 'CN002'")
46+
}
47+
3648
test("reading from hash partition") {
3749
enablePartitionForTiDB()
3850
tidbStmt.execute("drop table if exists t")

docs/features/partition_table.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# TiSpark partition table
2+
3+
## Read from partition table
4+
5+
TiSpark supports reading range, hash, and list partition tables from TiDB.
6+
7+
TiSpark doesn't support the MySQL/TiDB partition selection syntax `select col_name from table_name partition(partition_name)`, but you can still use a `where` condition to filter the partitions.
8+
9+
## Partition pruning in Reading
10+
11+
TiSpark decides whether to apply partition pruning according to the partition type and the partition expression associated with the table. If partition pruning is not applied, TiSpark's reading is equivalent to doing a table scan over all partitions.
12+
13+
TiSpark only supports partition pruning with the following partition expressions in **range** partitions:
14+
15+
+ column expression
16+
+ `YEAR(col)` where the column type is datetime, or a string/date literal that can be parsed as datetime.
17+
+ `TO_DAYS(col)` where the column type is datetime, or a string/date literal that can be parsed as datetime.
18+
19+
### Limitations
20+
21+
- TiSpark does not support partition pruning in hash and list partitions.
22+
- TiSpark cannot apply partition pruning when the partition definition contains certain special characters. For example, a partition definition containing `"` cannot be pruned, such as `partition p0 values less than ('"string"')`.
23+
24+
## Write into partition table
25+
26+
Currently, TiSpark only supports writing into range and hash partition tables under the following conditions:
27+
+ the partition expression is column expression
28+
+ the partition expression is `YEAR($argument)` where the argument is a column whose type is datetime, or a string literal
29+
that can be parsed as datetime.
30+
31+
There are two ways to write into a partition table:
32+
1. Use the datasource API to write into a partition table, which supports replace and append semantics.
33+
2. Use a delete statement with Spark SQL.
34+
35+
> [!NOTE]
36+
> Because different character sets and collations have different sort orders, the character sets and
37+
> collations in use may affect which partition of a table partitioned by RANGE COLUMNS a given row
38+
> is stored in when using string columns as partitioning columns.
39+
> For supported character sets and collations, see [Limitations](../README.md#limitations)

tikv-client/src/main/java/com/pingcap/tikv/expression/PartitionPruner.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ public static List<TiPartitionDef> prune(TiTableInfo tableInfo, List<Expression>
7070
return tableInfo.getPartitionInfo().getDefs();
7171
}
7272

73+
// prune can not handle \" now.
74+
for (int i = 0; i < tableInfo.getPartitionInfo().getDefs().size(); i++) {
75+
TiPartitionDef pDef = tableInfo.getPartitionInfo().getDefs().get(i);
76+
if (pDef.getLessThan().get(0).contains("\"")) {
77+
return tableInfo.getPartitionInfo().getDefs();
78+
}
79+
}
7380
RangeColumnPartitionPruner pruner = new RangeColumnPartitionPruner(tableInfo);
7481
return pruner.prune(filters);
7582
}
@@ -104,7 +111,7 @@ public static void generateRangeExprs(
104111
// partExprColRefs.addAll(PredicateUtils.extractColumnRefFromExpression(partExpr));
105112
for (int i = 0; i < partInfo.getDefs().size(); i++) {
106113
TiPartitionDef pDef = partInfo.getDefs().get(i);
107-
String current = pDef.getLessThan().get(lessThanIdx);
114+
String current = wrapValue(pDef.getLessThan().get(lessThanIdx));
108115
String leftHand;
109116
if (current.equals("MAXVALUE")) {
110117
leftHand = "true";
@@ -114,7 +121,7 @@ public static void generateRangeExprs(
114121
if (i == 0) {
115122
partExprs.add(parser.parseExpression(leftHand));
116123
} else {
117-
String previous = partInfo.getDefs().get(i - 1).getLessThan().get(lessThanIdx);
124+
String previous = wrapValue(partInfo.getDefs().get(i - 1).getLessThan().get(lessThanIdx));
118125
String and =
119126
String.format("%s >= %s and %s", wrapColumnName(partExprStr), previous, leftHand);
120127
partExprs.add(parser.parseExpression(and));
@@ -132,4 +139,41 @@ private static String wrapColumnName(String columnName) {
132139
return String.format("`%s`", columnName);
133140
}
134141
}
142+
143+
/**
144+
* Spark SQL will parse string literal without escape, So we need to parse partition definition
145+
* without escape too.
146+
*
147+
* <p>wrapValue will replace the first '' to "", so that antlr will not regard the first '' as a
148+
* part of string literal.
149+
*
150+
* <p>wrapValue will also delete the escape character in string literal.
151+
*
152+
* <p>e.g. 'string' -> "string" '''string''' -> "'string'" 'string''' -> "string'"
153+
*
154+
* <p>Can't handle '""'. e.g. '"string"' -> ""string"". parseExpression will parse ""string"" to
155+
* empty string, parse '"string"' to 'string'
156+
*
157+
* @param value
158+
* @return
159+
*/
160+
private static String wrapValue(String value) {
161+
if (value.startsWith("'") && value.endsWith("'")) {
162+
String newValue = String.format("\"%s\"", value.substring(1, value.length() - 1));
163+
StringBuilder valueWithoutEscape = new StringBuilder();
164+
for (int i = 0; i < newValue.length(); i++) {
165+
if (newValue.charAt(i) != '\'') {
166+
valueWithoutEscape.append(newValue.charAt(i));
167+
} else {
168+
if (i + 1 < newValue.length()) {
169+
valueWithoutEscape.append(newValue.charAt(i + 1));
170+
}
171+
i++;
172+
}
173+
}
174+
return valueWithoutEscape.toString();
175+
} else {
176+
return value;
177+
}
178+
}
135179
}

0 commit comments

Comments
 (0)