Skip to content

Commit 49df269

Browse files
yujun777 authored and dataroaring committed
[improvement](clone) dead be will abort sched task (#36795)
When a BE is down, its related clone tasks need to be aborted. Otherwise these tasks have to wait until they time out.
1 parent e945a1b commit 49df269

File tree

5 files changed

+183
-2
lines changed

5 files changed

+183
-2
lines changed

fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1866,10 +1866,13 @@ private void gatherStatistics(TabletSchedCtx tabletCtx) {
18661866
* If task is timeout, remove the tablet.
18671867
*/
18681868
public void handleRunningTablets() {
1869+
Set<Long> aliveBeIds = Sets.newHashSet(Env.getCurrentSystemInfo().getAllBackendIds(true));
18691870
// 1. remove the tablet ctx if timeout
18701871
List<TabletSchedCtx> cancelTablets = Lists.newArrayList();
18711872
synchronized (this) {
18721873
for (TabletSchedCtx tabletCtx : runningTablets.values()) {
1874+
long srcBeId = tabletCtx.getSrcBackendId();
1875+
long destBeId = tabletCtx.getDestBackendId();
18731876
if (Config.disable_tablet_scheduler) {
18741877
tabletCtx.setErrMsg("tablet scheduler is disabled");
18751878
cancelTablets.add(tabletCtx);
@@ -1880,6 +1883,12 @@ public void handleRunningTablets() {
18801883
tabletCtx.setErrMsg("timeout");
18811884
cancelTablets.add(tabletCtx);
18821885
stat.counterCloneTaskTimeout.incrementAndGet();
1886+
} else if (destBeId > 0 && !aliveBeIds.contains(destBeId)) {
1887+
tabletCtx.setErrMsg("dest be " + destBeId + " is dead");
1888+
cancelTablets.add(tabletCtx);
1889+
} else if (srcBeId > 0 && !aliveBeIds.contains(srcBeId)) {
1890+
tabletCtx.setErrMsg("src be " + srcBeId + " is dead");
1891+
cancelTablets.add(tabletCtx);
18831892
}
18841893
}
18851894
}

fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,18 @@ public static void addDebugPoint(String name) {
134134
addDebugPoint(name, new DebugPoint());
135135
}
136136

137-
public static <E> void addDebugPointWithValue(String name, E value) {
137+
/**
 * Creates a new {@code DebugPoint}, sets its {@code params} to the given map and
 * hands it to {@code addDebugPoint(name, debugPoint)}.
 *
 * NOTE(review): the map is stored by reference (no copy is made), so later changes
 * the caller makes to {@code params} are visible through the debug point, and a
 * {@code null} argument replaces the params map with {@code null} — confirm this is intended.
 *
 * @param name   name of the debug point
 * @param params parameters attached to the debug point
 */
public static void addDebugPointWithParams(String name, Map<String, String> params) {
138138
DebugPoint debugPoint = new DebugPoint();
139-
debugPoint.params.put("value", String.format("%s", value));
139+
debugPoint.params = params;
140140
addDebugPoint(name, debugPoint);
141141
}
142142

143+
/**
 * Adds a debug point carrying a single parameter: the key is {@code "value"} and the
 * content is {@code value} rendered with {@code String.format("%s", value)}.
 * Delegates to {@code addDebugPointWithParams}.
 *
 * @param name  name of the debug point
 * @param value value to store under the {@code "value"} key
 */
public static <E> void addDebugPointWithValue(String name, E value) {
144+
Map<String, String> params = Maps.newHashMap();
145+
params.put("value", String.format("%s", value));
146+
addDebugPointWithParams(name, params);
147+
}
148+
143149
public static void removeDebugPoint(String name) {
144150
DebugPoint debugPoint = debugPoints.remove(name);
145151
LOG.info("remove debug point: name={}, exists={}", name, debugPoint != null);

fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.doris.common.FeConstants;
2525
import org.apache.doris.common.ThreadPoolManager;
2626
import org.apache.doris.common.Version;
27+
import org.apache.doris.common.util.DebugPointUtil;
2728
import org.apache.doris.common.util.MasterDaemon;
2829
import org.apache.doris.persist.HbPackage;
2930
import org.apache.doris.resource.Tag;
@@ -56,6 +57,7 @@
5657
import org.apache.logging.log4j.LogManager;
5758
import org.apache.logging.log4j.Logger;
5859

60+
import java.util.Arrays;
5961
import java.util.List;
6062
import java.util.Map;
6163
import java.util.concurrent.Callable;
@@ -253,6 +255,14 @@ public HeartbeatResponse call() {
253255
result.setBackendInfo(backendInfo);
254256
}
255257

258+
String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault(
259+
"HeartbeatMgr.BackendHeartbeatHandler", "deadBeIds", "");
260+
if (!Strings.isNullOrEmpty(debugDeadBeIds)
261+
&& Arrays.stream(debugDeadBeIds.split(",")).anyMatch(id -> Long.parseLong(id) == backendId)) {
262+
result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
263+
result.getStatus().addToErrorMsgs("debug point HeartbeatMgr.deadBeIds set dead be");
264+
}
265+
256266
ok = true;
257267
if (result.getStatus().getStatusCode() == TStatusCode.OK) {
258268
TBackendInfo tBackendInfo = result.getBackendInfo();
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.clone;
19+
20+
import org.apache.doris.catalog.Database;
21+
import org.apache.doris.catalog.Env;
22+
import org.apache.doris.catalog.MaterializedIndex;
23+
import org.apache.doris.catalog.OlapTable;
24+
import org.apache.doris.catalog.Replica;
25+
import org.apache.doris.catalog.Tablet;
26+
import org.apache.doris.common.Config;
27+
import org.apache.doris.common.FeConstants;
28+
import org.apache.doris.common.util.DebugPointUtil;
29+
import org.apache.doris.system.Backend;
30+
import org.apache.doris.utframe.TestWithFeService;
31+
32+
import com.google.common.collect.Maps;
33+
import org.junit.jupiter.api.Assertions;
34+
import org.junit.jupiter.api.Test;
35+
36+
import java.util.List;
37+
import java.util.Map;
38+
39+
/**
 * Checks that a running clone task is cancelled once its destination backend is
 * reported dead, instead of staying in the running set.
 *
 * Flow of {@link #test()}:
 * 1. create a single-bucket table and assert its tablet has 3 replicas; pick the
 *    backend without a replica as the clone destination;
 * 2. block clone handling with the "MockedBackendFactory.handleCloneTablet.block" debug point
 *    and mark replica[0] as "drop", then assert a CLONE replica on the destination
 *    and exactly one running task;
 * 3. mark the destination backend dead through the "HeartbeatMgr.BackendHeartbeatHandler"
 *    debug point ("deadBeIds" param) and assert the task shows up in the history and
 *    the new running task targets the source replica's backend;
 * 4. remove the blocking debug point and assert the tablet ends with 3 NORMAL,
 *    schedule-available replicas, none of them on the dead backend.
 */
public class BeDownCancelCloneTest extends TestWithFeService {
40+
41+
// Overrides the base-class backend count; presumably TestWithFeService starts this
// many mocked backends — confirm. test() asserts 3 replicas and needs one backend without a replica.
@Override
42+
protected int backendNum() {
43+
return 4;
44+
}
45+
46+
// Tunes global FE settings before the connect context is created: debug points on,
// short checker/scheduler intervals, balancing disabled, and a failure tolerance of
// one heartbeat.
@Override
47+
protected void beforeCreatingConnectContext() throws Exception {
48+
FeConstants.runningUnitTest = true;
49+
FeConstants.default_scheduler_interval_millisecond = 1000;
50+
Config.enable_debug_points = true;
51+
Config.tablet_checker_interval_ms = 100;
52+
Config.tablet_schedule_interval_ms = 100;
53+
Config.tablet_repair_delay_factor_second = 1;
54+
Config.allow_replica_on_same_host = true;
55+
Config.disable_balance = true;
56+
Config.schedule_batch_size = 1000;
57+
Config.schedule_slot_num_per_hdd_path = 1000;
58+
Config.heartbeat_interval_second = 5;
59+
Config.max_backend_heartbeat_failure_tolerance_count = 1;
60+
// NOTE(review): the name ends in _sec, but 20 * 60 * 1000 looks like 20 minutes
// expressed in milliseconds — confirm the intended unit.
Config.min_clone_task_timeout_sec = 20 * 60 * 1000;
61+
}
62+
63+
@Test
64+
public void test() throws Exception {
65+
connectContext = createDefaultCtx();
66+
67+
createDatabase("db1");
68+
System.out.println(Env.getCurrentInternalCatalog().getDbNames());
69+
70+
// create table tbl1
71+
createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1;");
72+
RebalancerTestUtil.updateReplicaPathHash();
73+
74+
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("db1");
75+
OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
76+
Assertions.assertNotNull(tbl);
77+
// take the first tablet of the first index of the first partition
Tablet tablet = tbl.getPartitions().iterator().next()
78+
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
79+
.getTablets().iterator().next();
80+
81+
Assertions.assertEquals(3, tablet.getReplicas().size());
82+
// the clone destination is the first alive backend that holds no replica of the tablet
long destBeId = Env.getCurrentSystemInfo().getAllBackendIds(true).stream()
83+
.filter(beId -> tablet.getReplicaByBackendId(beId) == null)
84+
.findFirst()
85+
.orElse(-1L);
86+
Assertions.assertTrue(destBeId != -1L);
87+
Backend destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
88+
Assertions.assertNotNull(destBe);
89+
Assertions.assertTrue(destBe.isAlive());
90+
91+
// add debug point, make clone wait
92+
DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.block");
93+
94+
// move replica[0] to destBeId
95+
Replica srcReplica = tablet.getReplicas().get(0);
96+
String moveTabletSql = "ADMIN SET REPLICA STATUS PROPERTIES(\"tablet_id\" = \"" + tablet.getId() + "\", "
97+
+ "\"backend_id\" = \"" + srcReplica.getBackendId() + "\", \"status\" = \"drop\")";
98+
Assertions.assertNotNull(getSqlStmtExecutor(moveTabletSql));
99+
Assertions.assertFalse(srcReplica.isScheduleAvailable());
100+
101+
// presumably gives the checker/scheduler (100 ms intervals set above) time to create the clone task
Thread.sleep(3000);
102+
103+
Assertions.assertEquals(0, Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).size());
104+
Assertions.assertEquals(4, tablet.getReplicas().size());
105+
Replica destReplica = tablet.getReplicaByBackendId(destBeId);
106+
Assertions.assertNotNull(destReplica);
107+
Assertions.assertEquals(Replica.ReplicaState.CLONE, destReplica.getState());
108+
109+
// clone a replica on destBe
110+
List<TabletSchedCtx> runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
111+
Assertions.assertEquals(1, runningTablets.size());
112+
Assertions.assertEquals(destBeId, runningTablets.get(0).getDestBackendId());
113+
114+
// report destBe as dead through the heartbeat debug point
Map<String, String> params2 = Maps.newHashMap();
115+
params2.put("deadBeIds", String.valueOf(destBeId));
116+
DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", params2);
117+
118+
// wait for heartbeat interval * failure tolerance plus a 4 second margin,
// presumably long enough for destBe to be marked dead
Thread.sleep((Config.heartbeat_interval_second
119+
* Config.max_backend_heartbeat_failure_tolerance_count + 4) * 1000L);
120+
121+
destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
122+
Assertions.assertNotNull(destBe);
123+
Assertions.assertFalse(destBe.isAlive());
124+
125+
// delete clone dest task
126+
Assertions.assertFalse(Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).isEmpty());
127+
128+
// first drop the dest replica (its backend is dead) and the src replica (it is marked as drop),
129+
// then re-clone a replica to the src be; that clone is still waiting (blocked by the debug point).
130+
runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
131+
Assertions.assertEquals(1, runningTablets.size());
132+
Assertions.assertEquals(srcReplica.getBackendId(), runningTablets.get(0).getDestBackendId());
133+
134+
// unblock clone handling so the pending clone can finish
DebugPointUtil.removeDebugPoint("MockedBackendFactory.handleCloneTablet.block");
135+
Thread.sleep(2000);
136+
137+
// destBe is dead, cancel clone task
138+
runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
139+
Assertions.assertEquals(0, runningTablets.size());
140+
141+
Assertions.assertEquals(3, tablet.getReplicas().size());
142+
for (Replica replica : tablet.getReplicas()) {
143+
Assertions.assertTrue(replica.getBackendId() != destBeId);
144+
Assertions.assertTrue(replica.isScheduleAvailable());
145+
Assertions.assertEquals(Replica.ReplicaState.NORMAL, replica.getState());
146+
}
147+
}
148+
}

fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.doris.catalog.DiskInfo;
2222
import org.apache.doris.catalog.DiskInfo.DiskState;
2323
import org.apache.doris.common.ClientPool;
24+
import org.apache.doris.common.util.DebugPointUtil;
2425
import org.apache.doris.proto.Data;
2526
import org.apache.doris.proto.InternalService;
2627
import org.apache.doris.proto.PBackendServiceGrpc;
@@ -247,6 +248,13 @@ private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest fini
247248
}
248249

249250
private void handleCloneTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
251+
while (DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.block")) {
252+
try {
253+
Thread.sleep(10);
254+
} catch (InterruptedException e) {
255+
// ignore
256+
}
257+
}
250258
TCloneReq req = request.getCloneReq();
251259
long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id));
252260
long pathHash = req.dest_path_hash;

0 commit comments

Comments
 (0)