Skip to content

Commit 351d458

Browse files
mymeiyidataroaring
authored andcommitted
[fix](publish) Catch exception in genPublishTask to make one failed txn does not block the other txns (#37724)
## Proposed changes If any exception(such as NullPointerException) is thrown in `genPublishTask` when publish, the publish for all txns will fail. This pr catch the exception to make the failed txn does not block other txns.
1 parent 9ff3ef7 commit 351d458

File tree

3 files changed

+59
-2
lines changed

3 files changed

+59
-2
lines changed

fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState
109109
if (transactionState.hasSendTask()) {
110110
continue;
111111
}
112-
genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, beIdToBaseTabletIds, batchTask);
112+
try {
113+
genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, beIdToBaseTabletIds,
114+
batchTask);
115+
} catch (Throwable t) {
116+
LOG.error("errors while generate publish task for transaction: {}", transactionState, t);
117+
}
113118
}
114119
if (!batchTask.getAllTasks().isEmpty()) {
115120
AgentTaskExecutor.submit(batchTask);
@@ -127,6 +132,10 @@ private void genPublishTask(List<Long> allBackends, TransactionState transaction
127132
publishBackends = Sets.newHashSet();
128133
publishBackends.addAll(allBackends);
129134
}
135+
if (transactionState.getTransactionId() == DebugPointUtil.getDebugParamOrDefault(
136+
"PublishVersionDaemon.genPublishTask.failed", "txnId", -1L)) {
137+
throw new NullPointerException("genPublishTask failed for txnId: " + transactionState.getTransactionId());
138+
}
130139

131140
if (transactionState.getSubTxnIds() != null) {
132141
for (Entry<Long, TableCommitInfo> entry : transactionState.getSubTxnIdToTableCommitInfo().entrySet()) {

regression-test/data/insert_p0/txn_insert_inject_case.out

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,17 @@
77
2 3.3 xyz [1] [1, 0]
88
2 3.3 xyz [1] [1, 0]
99

10+
-- !select2 --
11+
12+
-- !select3 --
13+
\N \N \N [null] [null, 0]
14+
1 2.2 abc [] []
15+
101 2.2 abc [] []
16+
2 3.3 xyz [1] [1, 0]
17+
18+
-- !select4 --
19+
\N \N \N [null] [null, 0]
20+
102 2.2 abc [] []
21+
3 2.2 abc [] []
22+
4 3.3 xyz [1] [1, 0]
23+

regression-test/suites/insert_p0/txn_insert_inject_case.groovy

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
151151
}
152152

153153
// 2. commit failed
154+
sql """ truncate table ${table}_0 """
154155
def dbName = "regression_test_insert_p0"
155156
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
156157
logger.info("url: ${url}")
@@ -174,7 +175,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
174175
statement.execute("commit")
175176
assertTrue(false, "commit should fail")
176177
} catch (Exception e) {
177-
logger.error("commit failed", e);
178+
logger.info("commit failed " + e.getMessage())
178179
}
179180
} finally {
180181
GetDebugPoint().disableDebugPointForAllFEs('DatabaseTransactionMgr.commitTransaction.failed')
@@ -185,4 +186,37 @@ suite("txn_insert_inject_case", "nonConcurrent") {
185186
assertEquals(1, txn_info.size())
186187
assertEquals("ABORTED", txn_info[0].get("TransactionStatus"))
187188
assertTrue(txn_info[0].get("Reason").contains("DebugPoint: DatabaseTransactionMgr.commitTransaction.failed"))
189+
190+
// 3. one txn publish failed
191+
sql """ truncate table ${table}_0 """
192+
txn_id = 0
193+
try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
194+
Statement statement = conn.createStatement()) {
195+
statement.execute("begin")
196+
statement.execute("insert into ${table}_0 select * from ${table}_1;")
197+
txn_id = get_txn_id_from_server_info((((StatementImpl) statement).results).getServerInfo())
198+
GetDebugPoint().enableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed', [txnId:txn_id])
199+
statement.execute("insert into ${table}_0 select * from ${table}_2;")
200+
statement.execute("commit")
201+
202+
sql """insert into ${table}_0 values(100, 2.2, "abc", [], [])"""
203+
sql """insert into ${table}_1 values(101, 2.2, "abc", [], [])"""
204+
sql """insert into ${table}_2 values(102, 2.2, "abc", [], [])"""
205+
order_qt_select2 """select * from ${table}_0"""
206+
order_qt_select3 """select * from ${table}_1"""
207+
order_qt_select4 """select * from ${table}_2"""
208+
} finally {
209+
GetDebugPoint().disableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed')
210+
def rowCount = 0
211+
for (int i = 0; i < 20; i++) {
212+
def result = sql "select count(*) from ${table}_0"
213+
logger.info("rowCount: " + result + ", retry: " + i)
214+
rowCount = result[0][0]
215+
if (rowCount >= 7) {
216+
break
217+
}
218+
sleep(1000)
219+
}
220+
assertEquals(7, rowCount)
221+
}
188222
}

0 commit comments

Comments
 (0)