|
| 1 | +//! Update table metadata operation |
| 2 | +
|
| 3 | +use std::sync::Arc; |
| 4 | + |
| 5 | +use futures::future::BoxFuture; |
| 6 | + |
| 7 | +use super::{CustomExecuteHandler, Operation}; |
| 8 | +use crate::kernel::transaction::{CommitBuilder, CommitProperties}; |
| 9 | +use crate::kernel::Action; |
| 10 | +use crate::logstore::LogStoreRef; |
| 11 | +use crate::protocol::DeltaOperation; |
| 12 | +use crate::table::state::DeltaTableState; |
| 13 | +use crate::DeltaTable; |
| 14 | +use crate::{DeltaResult, DeltaTableError}; |
| 15 | + |
| 16 | +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] |
| 17 | +pub enum TableMetadataUpdate { |
| 18 | + /// Update the table name |
| 19 | + TableName(String), |
| 20 | + /// Update the table description |
| 21 | + TableDescription(String), |
| 22 | +} |
| 23 | + |
| 24 | +impl TableMetadataUpdate { |
| 25 | + /// Create a validated table name update |
| 26 | + pub fn table_name(name: impl Into<String>) -> DeltaResult<Self> { |
| 27 | + let name = name.into(); |
| 28 | + if name.len() > 255 { |
| 29 | + return Err(DeltaTableError::MetadataError(format!( |
| 30 | + "Table name cannot exceed 255 characters. Provided name has {} characters.", |
| 31 | + name.len() |
| 32 | + ))); |
| 33 | + } |
| 34 | + Ok(TableMetadataUpdate::TableName(name)) |
| 35 | + } |
| 36 | + |
| 37 | + /// Create a validated table description update |
| 38 | + pub fn table_description(description: impl Into<String>) -> DeltaResult<Self> { |
| 39 | + let description = description.into(); |
| 40 | + let max_description_length = 4000; |
| 41 | + if description.len() > max_description_length { |
| 42 | + return Err(DeltaTableError::MetadataError(format!( |
| 43 | + "Table description cannot exceed {} characters. Provided description has {} characters.", |
| 44 | + max_description_length, |
| 45 | + description.len() |
| 46 | + ))); |
| 47 | + } |
| 48 | + Ok(TableMetadataUpdate::TableDescription(description)) |
| 49 | + } |
| 50 | +} |
| 51 | + |
| 52 | +/// Update table metadata operation |
| 53 | +pub struct UpdateTableMetadataBuilder { |
| 54 | + /// A snapshot of the table's state |
| 55 | + snapshot: DeltaTableState, |
| 56 | + /// The metadata update to apply |
| 57 | + update: Option<TableMetadataUpdate>, |
| 58 | + /// Delta object store for handling data files |
| 59 | + log_store: LogStoreRef, |
| 60 | + /// Additional information to add to the commit |
| 61 | + commit_properties: CommitProperties, |
| 62 | + custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>, |
| 63 | +} |
| 64 | + |
| 65 | +impl super::Operation<()> for UpdateTableMetadataBuilder { |
| 66 | + fn log_store(&self) -> &LogStoreRef { |
| 67 | + &self.log_store |
| 68 | + } |
| 69 | + fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> { |
| 70 | + self.custom_execute_handler.clone() |
| 71 | + } |
| 72 | +} |
| 73 | + |
| 74 | +impl UpdateTableMetadataBuilder { |
| 75 | + /// Create a new builder |
| 76 | + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { |
| 77 | + Self { |
| 78 | + update: None, |
| 79 | + snapshot, |
| 80 | + log_store, |
| 81 | + commit_properties: CommitProperties::default(), |
| 82 | + custom_execute_handler: None, |
| 83 | + } |
| 84 | + } |
| 85 | + |
| 86 | + pub fn with_update(mut self, update: TableMetadataUpdate) -> Self { |
| 87 | + self.update = Some(update); |
| 88 | + self |
| 89 | + } |
| 90 | + |
| 91 | + pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self { |
| 92 | + self.commit_properties = commit_properties; |
| 93 | + self |
| 94 | + } |
| 95 | + |
| 96 | + pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self { |
| 97 | + self.custom_execute_handler = Some(handler); |
| 98 | + self |
| 99 | + } |
| 100 | +} |
| 101 | + |
| 102 | +impl std::future::IntoFuture for UpdateTableMetadataBuilder { |
| 103 | + type Output = DeltaResult<DeltaTable>; |
| 104 | + |
| 105 | + type IntoFuture = BoxFuture<'static, Self::Output>; |
| 106 | + |
| 107 | + fn into_future(self) -> Self::IntoFuture { |
| 108 | + let this = self; |
| 109 | + |
| 110 | + Box::pin(async move { |
| 111 | + let operation_id = this.get_operation_id(); |
| 112 | + this.pre_execute(operation_id).await?; |
| 113 | + |
| 114 | + let update = this.update.ok_or_else(|| { |
| 115 | + DeltaTableError::MetadataError("No metadata update specified".to_string()) |
| 116 | + })?; |
| 117 | + |
| 118 | + let mut metadata = this.snapshot.metadata().clone(); |
| 119 | + |
| 120 | + match update.clone() { |
| 121 | + TableMetadataUpdate::TableName(name) => { |
| 122 | + if name.len() > 255 { |
| 123 | + return Err(DeltaTableError::MetadataError(format!( |
| 124 | + "Table name cannot exceed 255 characters. Provided name has {} characters.", |
| 125 | + name.len() |
| 126 | + ))); |
| 127 | + } |
| 128 | + metadata.name = Some(name.clone()); |
| 129 | + } |
| 130 | + TableMetadataUpdate::TableDescription(description) => { |
| 131 | + if description.len() > 4000 { |
| 132 | + return Err(DeltaTableError::MetadataError(format!( |
| 133 | + "Table description cannot exceed 4,000 characters. Provided description has {} characters.", |
| 134 | + description.len() |
| 135 | + ))); |
| 136 | + } |
| 137 | + metadata.description = Some(description.clone()); |
| 138 | + } |
| 139 | + } |
| 140 | + |
| 141 | + let operation = DeltaOperation::UpdateTableMetadata { |
| 142 | + metadata_update: update, |
| 143 | + }; |
| 144 | + |
| 145 | + let actions = vec![Action::Metadata(metadata)]; |
| 146 | + |
| 147 | + let commit = CommitBuilder::from(this.commit_properties.clone()) |
| 148 | + .with_actions(actions) |
| 149 | + .with_operation_id(operation_id) |
| 150 | + .with_post_commit_hook_handler(this.custom_execute_handler.clone()) |
| 151 | + .build( |
| 152 | + Some(&this.snapshot), |
| 153 | + this.log_store.clone(), |
| 154 | + operation.clone(), |
| 155 | + ) |
| 156 | + .await?; |
| 157 | + |
| 158 | + if let Some(handler) = this.custom_execute_handler { |
| 159 | + handler.post_execute(&this.log_store, operation_id).await?; |
| 160 | + } |
| 161 | + Ok(DeltaTable::new_with_state( |
| 162 | + this.log_store, |
| 163 | + commit.snapshot(), |
| 164 | + )) |
| 165 | + }) |
| 166 | + } |
| 167 | +} |
0 commit comments