Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ rand = "0.8"
maplit = "1"
sqlparser = { version = "0.56.0" }
humantime = { version = "2.1.0" }
validator = { version = "0.19", features = ["derive"] }

[dev-dependencies]
criterion = "0.5"
Expand Down
7 changes: 7 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use self::filesystem_check::FileSystemCheckBuilder;
use self::optimize::OptimizeBuilder;
use self::restore::RestoreBuilder;
use self::set_tbl_properties::SetTablePropertiesBuilder;
use self::update_table_metadata::UpdateTableMetadataBuilder;
use self::vacuum::VacuumBuilder;
#[cfg(feature = "datafusion")]
use self::{
Expand All @@ -46,6 +47,7 @@ pub mod drop_constraints;
pub mod filesystem_check;
pub mod restore;
pub mod update_field_metadata;
pub mod update_table_metadata;
pub mod vacuum;

#[cfg(feature = "datafusion")]
Expand Down Expand Up @@ -303,6 +305,11 @@ impl DeltaOps {
pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Update table metadata
pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.unwrap())
}
}

impl From<DeltaTable> for DeltaOps {
Expand Down
153 changes: 153 additions & 0 deletions crates/core/src/operations/update_table_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//! Update table metadata operation

use std::sync::Arc;

use futures::future::BoxFuture;
use validator::Validate;

use super::{CustomExecuteHandler, Operation};
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::Action;
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
use crate::{DeltaResult, DeltaTableError};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Validate)]
#[validate(schema(
function = "validate_at_least_one_field",
message = "No metadata update specified"
))]
pub struct TableMetadataUpdate {
#[validate(length(
min = 1,
max = 255,
message = "Table name cannot be empty and cannot exceed 255 characters"
))]
pub name: Option<String>,

#[validate(length(
max = 4000,
message = "Table description cannot exceed 4000 characters"
))]
pub description: Option<String>,
}

fn validate_at_least_one_field(
update: &TableMetadataUpdate,
) -> Result<(), validator::ValidationError> {
if update.name.is_none() && update.description.is_none() {
return Err(validator::ValidationError::new("no_fields_specified"));
}
Ok(())
}

/// Update table metadata operation
pub struct UpdateTableMetadataBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// The metadata update to apply
update: Option<TableMetadataUpdate>,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Additional information to add to the commit
commit_properties: CommitProperties,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl super::Operation<()> for UpdateTableMetadataBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}

impl UpdateTableMetadataBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
Self {
update: None,
snapshot,
log_store,
commit_properties: CommitProperties::default(),
custom_execute_handler: None,
}
}

/// Specify the complete metadata update
pub fn with_update(mut self, update: TableMetadataUpdate) -> Self {
self.update = Some(update);
self
}

/// Additional metadata to be added to commit info
pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
self.commit_properties = commit_properties;
self
}

/// Set a custom execute handler, for pre and post execution
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
self.custom_execute_handler = Some(handler);
self
}
}

impl std::future::IntoFuture for UpdateTableMetadataBuilder {
type Output = DeltaResult<DeltaTable>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let operation_id = this.get_operation_id();
this.pre_execute(operation_id).await?;

let update = this.update.ok_or_else(|| {
DeltaTableError::MetadataError("No metadata update specified".to_string())
})?;
update
.validate()
.map_err(|e| DeltaTableError::MetadataError(format!("{}", e)))?;

let mut metadata = this.snapshot.metadata().clone();

if let Some(name) = &update.name {
metadata.name = Some(name.clone());
}
if let Some(description) = &update.description {
metadata.description = Some(description.clone());
}

let operation = DeltaOperation::UpdateTableMetadata {
metadata_update: update,
};

let actions = vec![Action::Metadata(metadata)];

let commit = CommitBuilder::from(this.commit_properties.clone())
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.custom_execute_handler.clone())
.build(
Some(&this.snapshot),
this.log_store.clone(),
operation.clone(),
)
.await?;

if let Some(handler) = this.custom_execute_handler {
handler.post_execute(&this.log_store, operation_id).await?;
}
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
8 changes: 8 additions & 0 deletions crates/core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ pub enum DeltaOperation {
/// Fields added to existing schema
fields: Vec<StructField>,
},
/// Update table metadata operations
#[serde(rename_all = "camelCase")]
UpdateTableMetadata {
/// The metadata update to apply
metadata_update: crate::operations::update_table_metadata::TableMetadataUpdate,
},
}

impl DeltaOperation {
Expand Down Expand Up @@ -392,6 +398,7 @@ impl DeltaOperation {
DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT",
DeltaOperation::AddFeature { .. } => "ADD FEATURE",
DeltaOperation::UpdateFieldMetadata { .. } => "UPDATE FIELD METADATA",
DeltaOperation::UpdateTableMetadata { .. } => "UPDATE TABLE METADATA",
}
}

Expand Down Expand Up @@ -427,6 +434,7 @@ impl DeltaOperation {
match self {
Self::Optimize { .. }
| Self::UpdateFieldMetadata { .. }
| Self::UpdateTableMetadata { .. }
| Self::SetTableProperties { .. }
| Self::AddColumn { .. }
| Self::AddFeature { .. }
Expand Down
Loading
Loading