Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,9 @@ abstract class InMemoryBaseTable(
this
}

def alterTableWithData(data: Array[BufferedRows],
newSchema: StructType): InMemoryBaseTable = {
def alterTableWithData(
data: Array[BufferedRows],
newSchema: StructType): InMemoryBaseTable = {
data.foreach { bufferedRow =>
val oldSchema = bufferedRow.schema
bufferedRow.rows.foreach { row =>
Expand Down Expand Up @@ -757,8 +758,7 @@ object InMemoryBaseTable {
* @param key partition key
* @param schema schema used to write the rows
*/
class BufferedRows(val key: Seq[Any],
val schema: StructType)
class BufferedRows(val key: Seq[Any], val schema: StructType)
extends WriterCommitMessage
with InputPartition with HasPartitionKey with HasPartitionStatistics with Serializable {
val log = new mutable.ArrayBuffer[InternalRow]()
Expand Down Expand Up @@ -892,8 +892,8 @@ private class BufferedRowsReader(
if (arrayData == null) {
null
} else {
extractArrayValue(arrayData, elementType,
readSchema.fields(readIndex).dataType)
val writeType = writeSchema.fields(writeIndex).dataType.asInstanceOf[ArrayType]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: there was a small bug here, so I updated the corresponding test.

extractArrayValue(arrayData, elementType, writeType.elementType)
}

case dt =>
Expand All @@ -906,19 +906,21 @@ private class BufferedRowsReader(
}
}

private def extractArrayValue(arrayData: ArrayData,
readType: DataType,
writeType: DataType): ArrayData = {
/**
 * Rebuilds an array value by passing its elements through `extractCollection`.
 *
 * @param arrayData array whose elements are materialized using `readType`
 * @param readType element type used to unpack `arrayData`; forwarded to `extractCollection`
 * @param writeType element type forwarded to `extractCollection` together with `readType`
 * @return a new `GenericArrayData` wrapping the elements returned by `extractCollection`
 */
private def extractArrayValue(
    arrayData: ArrayData,
    readType: DataType,
    writeType: DataType): ArrayData = {
  // Unpack the elements first, then wrap the converted result directly.
  val rawElements = arrayData.toArray[Any](readType)
  new GenericArrayData(extractCollection(rawElements, readType, writeType))
}

private def extractMapValue(mapData: MapData,
readKeyType: DataType,
readValueType: DataType,
writeKeyType: DataType,
writeValueType: DataType): MapData = {
private def extractMapValue(
mapData: MapData,
readKeyType: DataType,
readValueType: DataType,
writeKeyType: DataType,
writeValueType: DataType): MapData = {
val keys = mapData.keyArray().toArray[Any](readKeyType)
val values = mapData.valueArray().toArray[Any](readValueType)

Expand All @@ -927,9 +929,10 @@ private class BufferedRowsReader(
ArrayBasedMapData(convertedKeys, convertedValues)
}

private def extractCollection(elements: Array[Any],
readType: DataType,
writeType: DataType) = {
private def extractCollection(
elements: Array[Any],
readType: DataType,
writeType: DataType) = {
(readType, writeType) match {
case (readSt: StructType, writeSt: StructType) =>
elements.map { elem =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2452,7 +2452,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}

test("merge into schema evolution add column with nested field and set all columns") {
test("merge into schema evolution add column with nested struct and set all columns") {
Seq(true, false).foreach { withSchemaEvolution =>
withTempView("source") {
createAndInitTable(
Expand Down Expand Up @@ -2511,7 +2511,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}

test("merge into schema evolution replace column with nested field and set explicit columns") {
test("merge into schema evolution replace column with nested struct and set explicit columns") {
Seq(true, false).foreach { withSchemaEvolution =>
withTempView("source") {
createAndInitTable(
Expand Down Expand Up @@ -2572,7 +2572,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}

// TODO- support schema evolution for missing nested types using UPDATE SET * and INSERT *
// currently the source struct needs to be fully compatible with target struct
Copy link
Member Author

@szehon-ho szehon-ho Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is unrelated, but I'm not sure we are going to do the TODO, so I'm just downgrading it to a comment. It can be addressed in #52347, but we are still debating whether it makes sense to do or not.

// i.e. cannot remove a nested field
test("merge into schema evolution replace column with nested field and set all columns") {
Seq(true, false).foreach { withSchemaEvolution =>
withTempView("source") {
Expand Down Expand Up @@ -2664,8 +2665,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
sql(mergeStmt)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
// TODO- InMemoryBaseTable does not return null for nested schema evolution.
Seq(Row(0, Array(Row(1, "a", true), Row(2, "b", true)), "sales"),
Seq(Row(0, Array(Row(1, "a", null), Row(2, "b", null)), "sales"),
Row(1, Array(Row(10, "c", true), Row(20, "d", false)), "hr"),
Row(2, Array(Row(30, "d", false), Row(40, "e", true)), "engineering")))
} else {
Expand Down