Commit e08b997

fix(irc): map conflict exception per iceberg spec (#5960) (#13751)
1 parent 28d58e8 commit e08b997
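
Why this fix: the Iceberg REST catalog spec defines HTTP 409 (Conflict) as the response for a commit that fails because of a concurrent update, carrying a structured error body so clients can retry; before this change, DataHub's Iceberg REST catalog (IRC) let CommitFailedException propagate unmapped. Below is a hedged sketch of the client-visible contract this enables. It is illustrative only: the endpoint, payload, and helper are placeholders, not DataHub's or Iceberg's actual client code, though the error-body shape follows the spec's ErrorModel.

import time

import requests  # illustrative; real engines go through Iceberg's REST client


def commit_with_retry(url, payload, token, retries=10, min_wait_ms=1000):
    # Hypothetical commit loop against an Iceberg REST catalog. The URL would
    # follow the spec's table-update endpoint,
    # POST /v1/{prefix}/namespaces/{namespace}/tables/{table}.
    for _ in range(retries + 1):
        resp = requests.post(
            url, json=payload, headers={"Authorization": f"Bearer {token}"}
        )
        if resp.status_code != 409:
            return resp
        # Per the spec, the 409 body is roughly:
        #   {"error": {"message": "...", "type": "CommitFailedException", "code": 409}}
        # and signals a retryable conflict, not a hard failure.
        time.sleep(min_wait_ms / 1000)
    raise RuntimeError("commit conflict not resolved after retries")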

6 files changed: +132 −20 lines changed

metadata-service/iceberg-catalog/src/integrationTest/integration_test.py

Lines changed: 80 additions & 11 deletions
@@ -10,13 +10,16 @@ def get_gms_url():
     return os.getenv("DATAHUB_GMS_URL") or "http://localhost:8080"
 
 
+def get_user_pass():
+    user = os.getenv("DH_USER", "datahub")
+    password = os.getenv("DH_PASS", "datahub")
+    return user, password
+
+
 @pytest.fixture
 def personal_access_token():
-    username = "datahub"
-    password = "datahub"
-    token_name, token = cli_utils.generate_access_token(
-        username, password, get_gms_url()
-    )
+    user, password = get_user_pass()
+    token_name, token = cli_utils.generate_access_token(user, password, get_gms_url())
 
     # Setting this env var makes get_default_graph use these env vars to create a graphql client.
     os.environ["DATAHUB_GMS_TOKEN"] = token
@@ -98,21 +101,22 @@ def spark_session(personal_access_token, warehouse):
 def warehouse(request, personal_access_token):
     warehouse_name = request.param
     # PAT dependency just to ensure env vars are setup with token
-    give_all_permissions("datahub", "test-policy")
-
-    data_root = os.getenv(
-        "ICEBERG_DATA_ROOT", f"s3://srinath-dev/test/{warehouse_name}"
-    )
+    user = os.getenv("DH_USER", "datahub")
     client_id = os.getenv("ICEBERG_CLIENT_ID")
     client_secret = os.getenv("ICEBERG_CLIENT_SECRET")
     region = os.getenv("ICEBERG_REGION")
     role = os.getenv("ICEBERG_ROLE")
+    data_root = os.getenv(
+        "ICEBERG_DATA_ROOT", f"s3://srinath-dev/test/{warehouse_name}"
+    )
 
     if not all((data_root, client_id, client_secret, region, role)):
         pytest.fail(
-            "Must set ICEBERG_DATA_ROOT, ICEBERG_CLIENT_ID, ICEBERG_CLIENT_SECRET, ICEBERG_REGION, ICEBERG_ROLE"
+            "Must set DH_USER, DH_PASS, ICEBERG_DATA_ROOT, ICEBERG_CLIENT_ID, ICEBERG_CLIENT_SECRET, ICEBERG_REGION, ICEBERG_ROLE"
         )
 
+    give_all_permissions(user, "test-policy")
+
     try:
         iceberg_cli.delete.callback(warehouse_name, dry_run=False, force=True)
         print(
@@ -204,6 +208,69 @@ def _test_rename_ops(spark_session):
     spark_session.sql("drop view test_table_renamed")
 
 
+def run_query(spark, id, table):
+    merge_query = f"""MERGE INTO {table} target
+        USING (
+            SELECT
+                '{id}' AS id,
+                'user_{id}' AS user
+        ) source
+        ON source.id = target.id
+        WHEN MATCHED THEN UPDATE SET *
+        WHEN NOT MATCHED THEN INSERT *
+        """
+    spark.sql(merge_query)
+
+
+def _test_concurrency(spark_session, warehouse):
+    from pyspark.sql.types import StructType, StructField, StringType
+    from pyspark import InheritableThread
+    from uuid import uuid4
+
+    df = spark_session.createDataFrame(
+        [("id1", "user1"), ("id2", "user2")],
+        schema=StructType(
+            [
+                StructField("id", StringType(), True),
+                StructField("user", StringType(), True),
+            ]
+        ),
+    )
+
+    table_name = "test_table_concurrency"
+    spark_session.sql(f"DROP TABLE IF EXISTS {table_name}")
+
+    # Create table using SQL
+    spark_session.sql(f"""
+        CREATE TABLE {table_name} (
+            id string,
+            user string
+        ) USING iceberg
+        TBLPROPERTIES (
+            'commit.retry.num-retries'='10',
+            'commit.retry.min-wait-ms'='1000',
+            'write.merge.isolation-level'='snapshot'
+        )
+    """)
+
+    # Insert data
+    df.writeTo(f"default.{table_name}").using("iceberg").append()
+
+    # Run concurrent merges on the table
+    threads = []
+    n_threads = 4
+    for _ in range(n_threads):
+        id = str(uuid4())[:5]
+        t = InheritableThread(
+            target=run_query, args=(spark_session, id, f"{table_name}")
+        )
+        threads.append(t)
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+
 @pytest.mark.quick
 @pytest.mark.parametrize("warehouse", ["test_wh_0"], indirect=True)
 def test_iceberg_quick(spark_session, warehouse):
@@ -212,6 +279,8 @@ def test_iceberg_quick(spark_session, warehouse):
     _test_basic_view_ops(spark_session)
     _test_rename_ops(spark_session)
 
+    _test_concurrency(spark_session, warehouse)
+
     result = spark_session.sql("show namespaces")
     assert result[result["namespace"] == "default"].count() == 1
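
The concurrency test exercises this mapping end to end: each thread's MERGE commits a new snapshot, and a writer that loses the race now receives a 409 that Iceberg's commit retry loop (tuned above via commit.retry.num-retries and commit.retry.min-wait-ms, with snapshot isolation for merges) resolves by re-applying the operation. A follow-up check one could append to _test_concurrency, shown here as a hedged sketch rather than part of this commit (it reuses the spark_session, table_name, and n_threads names from the diff):

# Each thread merges a fresh uuid-based id, so every MERGE should take the
# NOT MATCHED branch and insert exactly one row; the two seeded rows remain.
row_count = spark_session.sql(
    f"SELECT COUNT(*) AS c FROM {table_name}"
).collect()[0]["c"]
assert row_count == 2 + n_threads

# Iceberg's snapshots metadata table records one commit per successful writer
# (plus the initial append), confirming conflicting commits were retried
# rather than dropped.
snapshot_count = spark_session.sql(
    f"SELECT COUNT(*) AS c FROM default.{table_name}.snapshots"
).collect()[0]["c"]
assert snapshot_count >= 1 + n_threads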

metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/TableOrViewOpsDelegate.java

Lines changed: 5 additions & 1 deletion
@@ -20,6 +20,7 @@
 import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.authorization.PoliciesConfig;
 import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.entity.validation.ValidationException;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.schema.SchemaMetadata;
 import com.linkedin.util.Pair;
@@ -36,7 +37,10 @@
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.*;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.view.SQLViewRepresentation;
 import org.apache.iceberg.view.ViewMetadata;

metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/rest/common/IcebergExceptionHandlerAdvice.java

Lines changed: 7 additions & 2 deletions
@@ -56,10 +56,15 @@ public ResponseEntity<?> handle(BadRequestException e) {
     return err(e, HttpStatus.BAD_REQUEST);
   }
 
+  @ExceptionHandler(CommitFailedException.class)
+  public ResponseEntity<?> handle(CommitFailedException e) {
+    return err(e, HttpStatus.CONFLICT);
+  }
+
   @ExceptionHandler(Exception.class)
-  public ResponseEntity<?> handle(Exception e) throws Exception {
+  public ResponseEntity<?> handle(Exception e) {
     log.error("Server exception", e);
-    throw e;
+    return err(e, HttpStatus.INTERNAL_SERVER_ERROR);
   }
 
   private ResponseEntity<?> err(Exception e, HttpStatus errCode) {
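
Two behaviors change in this advice class: CommitFailedException is now mapped to 409, and the catch-all handler returns a 500 with a structured body instead of rethrowing (a rethrow escaped the advice entirely, so the response carried no Iceberg error payload). A hedged sketch of how a client might classify the resulting payloads; the helper below is hypothetical, and the body shape is an assumption based on the err(...) helper serializing Iceberg's ErrorResponse:

def classify_catalog_error(status_code: int, body: dict) -> str:
    # Hypothetical client-side helper, not part of this commit. Assumes the
    # spec-shaped body err(...) produces, roughly:
    #   {"error": {"message": "...", "type": "CommitFailedException", "code": 409}}
    error = body["error"]
    if status_code == 409 and error["type"] == "CommitFailedException":
        return "retryable-conflict"  # safe to re-attempt the commit after a backoff
    if status_code >= 500:
        return "server-error"  # now carries a structured body instead of a bare rethrow
    return "client-error"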

metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/TableOpsDelegateTest.java

Lines changed: 1 addition & 1 deletion
@@ -23,6 +23,7 @@
 import com.linkedin.entity.EnvelopedAspect;
 import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.entity.validation.ValidationException;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.schema.SchemaMetadata;
@@ -42,7 +43,6 @@
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.types.Types;

metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/ViewOpsDelegateTest.java

Lines changed: 4 additions & 1 deletion
@@ -24,6 +24,7 @@
 import com.linkedin.entity.EnvelopedAspect;
 import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.entity.validation.ValidationException;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.schema.SchemaMetadata;
@@ -40,7 +41,9 @@
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.*;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.types.Types;

metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/rest/common/IcebergExceptionHandlerAdviceTest.java

Lines changed: 35 additions & 4 deletions
@@ -3,7 +3,14 @@
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
-import org.apache.iceberg.exceptions.*;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -128,13 +135,37 @@ public void testHandleBadRequestException() {
         errorResponse, HttpStatus.BAD_REQUEST.value(), TEST_ERROR_MESSAGE, "BadRequestException");
   }
 
-  @Test(expectedExceptions = RuntimeException.class)
+  @Test
+  public void testHandleCommitFailedException() {
+    // Arrange
+    CommitFailedException exception = new CommitFailedException(TEST_ERROR_MESSAGE);
+
+    // Act
+    ResponseEntity<?> response = exceptionHandler.handle(exception);
+    ErrorResponse errorResponse = (ErrorResponse) response.getBody();
+
+    // Assert
+    assertEquals(response.getStatusCode(), HttpStatus.CONFLICT);
+    assertErrorResponse(
+        errorResponse, HttpStatus.CONFLICT.value(), TEST_ERROR_MESSAGE, "CommitFailedException");
+  }
+
+  @Test
   public void testHandleGenericException() throws Exception {
     // Arrange
     RuntimeException exception = new RuntimeException(TEST_ERROR_MESSAGE);
 
-    // Act & Assert
-    exceptionHandler.handle(exception);
+    // Act
+    ResponseEntity<?> response = exceptionHandler.handle(exception);
+    ErrorResponse errorResponse = (ErrorResponse) response.getBody();
+
+    // Assert
+    assertEquals(response.getStatusCode(), HttpStatus.INTERNAL_SERVER_ERROR);
+    assertErrorResponse(
+        errorResponse,
+        HttpStatus.INTERNAL_SERVER_ERROR.value(),
+        TEST_ERROR_MESSAGE,
+        "RuntimeException");
   }
 
   private void assertErrorResponse(
