-
Notifications
You must be signed in to change notification settings - Fork 684
feat: support partition by for iceberg table engine #21594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support partition by for iceberg table engine #21594
Conversation
…on_by_for_iceberg_table_engine
Hi, there. 📝 Telemetry Reminder:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances the Iceberg table engine support by allowing users to specify partition-by columns and ensuring that these columns are prefixes of the primary key. Key changes include:
- Parsing and validating the partition_by option using a regex to support various formats (e.g., column, transform(column), transform(n,column)).
- Dropping the partially created table when either sink or source creation fails to handle DDL atomicity limitations.
- Adding end-to-end tests for various partition_by scenarios.
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
File | Description |
---|---|
src/frontend/src/handler/create_table.rs | Implements partition_by option parsing, validation (including prefix check on primary key), and error handling via table drop on failure. |
e2e_test/iceberg/test_case/pure_slt/iceberg_engine.slt | Updates and extends tests to validate partition_by behavior and error cases. |
Comments suppressed due to low confidence (1)
src/frontend/src/handler/create_table.rs:1724
- [nitpick] The regex pattern used for parsing partition fields is quite complex; consider adding inline comments or refactoring it for improved clarity and maintainability.
let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
if let Some(partition_by) = &partition_by { | ||
// captures column, transform(column), transform(n,column), transform(n, column) | ||
let re = | ||
Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap(); | ||
if !re.is_match(partition_by) { | ||
bail!(format!( | ||
"Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)", | ||
partition_by | ||
)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These code are copy-pasted from sink? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure whether we need keep both of them. If needed, maybe better to have a common fn(String) -> Vec<(String, Transform)>
instead
statement ok | ||
create table t_partition2(c1 int, c2 int, c3 int, primary key(c1, c2, c3)) with(commit_checkpoint_interval = 1, partition_by='c1,c2') engine = iceberg; | ||
|
||
statement ok | ||
create table t_partition3(c1 int, c2 int, c3 int, primary key(c1, c2, c3)) with(commit_checkpoint_interval = 1, partition_by='bucket(4, c1),c2') engine = iceberg; | ||
|
||
statement ok | ||
create table t_partition4(c1 int, c2 int, c3 int, primary key(c1, c2, c3)) with(commit_checkpoint_interval = 1, partition_by='c1,truncate(8, c2)') engine = iceberg; | ||
|
||
# the partition key should be the prefix of the primary key | ||
statement error | ||
create table t_partition5(c1 int, c2 int, c3 int, primary key(c1, c2, c3)) with(commit_checkpoint_interval = 1, partition_by='c2,c3') engine = iceberg; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally validating partitions here. But maybe after support sth like https://iceberg.apache.org/docs/nightly/spark-queries/#partitions (apache/iceberg-rust#823)
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
partition_by
options as how it used by the iceberg sink, so users can define their partition by columns.Checklist
Documentation
Release note