Skip to content

Commit d8d11a4

Browse files
chenzl25Li0k
authored andcommitted
feat(iceberg): support dql for nimtable (#18408)
1 parent 823908d commit d8d11a4

File tree

3 files changed

+72
-8
lines changed

3 files changed

+72
-8
lines changed

src/frontend/src/binder/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,6 @@ impl Binder {
363363
matches!(self.bind_for, BindFor::Stream)
364364
}
365365

366-
#[expect(dead_code)]
367366
fn is_for_batch(&self) -> bool {
368367
matches!(self.bind_for, BindFor::Batch)
369368
}

src/frontend/src/binder/relation/table_or_source.rs

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use std::sync::Arc;
1717
use either::Either;
1818
use itertools::Itertools;
1919
use risingwave_common::bail_not_implemented;
20-
use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field};
20+
use risingwave_common::catalog::{
21+
debug_assert_column_ids_distinct, is_system_schema, Engine, Field,
22+
};
2123
use risingwave_common::session_config::USER_NAME_WILD_CARD;
2224
use risingwave_connector::WithPropertiesExt;
2325
use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
@@ -129,7 +131,31 @@ impl Binder {
129131
.catalog
130132
.get_created_table_by_name(&self.db_name, schema_path, table_name)
131133
{
132-
self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
134+
match table_catalog.engine() {
135+
Engine::Iceberg => {
136+
if self.is_for_batch()
137+
&& let Ok((source_catalog, _)) =
138+
self.catalog.get_source_by_name(
139+
&self.db_name,
140+
schema_path,
141+
&table_catalog.iceberg_source_name().unwrap(),
142+
)
143+
{
144+
self.resolve_source_relation(&source_catalog.clone(), as_of)
145+
} else {
146+
self.resolve_table_relation(
147+
table_catalog.clone(),
148+
schema_name,
149+
as_of,
150+
)?
151+
}
152+
}
153+
Engine::Hummock => self.resolve_table_relation(
154+
table_catalog.clone(),
155+
schema_name,
156+
as_of,
157+
)?,
158+
}
133159
} else if let Ok((source_catalog, _)) =
134160
self.catalog
135161
.get_source_by_name(&self.db_name, schema_path, table_name)
@@ -177,11 +203,36 @@ impl Binder {
177203
} else if let Some(table_catalog) =
178204
schema.get_created_table_by_name(table_name)
179205
{
180-
return self.resolve_table_relation(
181-
table_catalog.clone(),
182-
&schema_name.clone(),
183-
as_of,
184-
);
206+
match table_catalog.engine {
207+
Engine::Iceberg => {
208+
if self.is_for_batch()
209+
&& let Some(source_catalog) = schema
210+
.get_source_by_name(
211+
&table_catalog
212+
.iceberg_source_name()
213+
.unwrap(),
214+
)
215+
{
216+
return Ok(self.resolve_source_relation(
217+
&source_catalog.clone(),
218+
as_of,
219+
));
220+
} else {
221+
return self.resolve_table_relation(
222+
table_catalog.clone(),
223+
&schema_name.clone(),
224+
as_of,
225+
);
226+
}
227+
}
228+
Engine::Hummock => {
229+
return self.resolve_table_relation(
230+
table_catalog.clone(),
231+
&schema_name.clone(),
232+
as_of,
233+
);
234+
}
235+
}
185236
} else if let Some(source_catalog) =
186237
schema.get_source_by_name(table_name)
187238
{

src/frontend/src/catalog/table_catalog.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,20 @@ impl TableCatalog {
296296
self.engine
297297
}
298298

299+
pub fn iceberg_source_name(&self) -> Option<String> {
300+
match self.engine {
301+
Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
302+
Engine::Hummock => None,
303+
}
304+
}
305+
306+
pub fn iceberg_sink_name(&self) -> Option<String> {
307+
match self.engine {
308+
Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
309+
Engine::Hummock => None,
310+
}
311+
}
312+
299313
pub fn is_table(&self) -> bool {
300314
self.table_type == TableType::Table
301315
}

0 commit comments

Comments
 (0)