Skip to content

Commit 2589dd5

Browse files
yujun777dataroaring
authored andcommitted
[improvement](balance) partition rebalance chose disk by rr (#36826)
Partition rebalance will call function takeAnAvailBalanceSlotFrom to chose disk. But this function prefer to choose the first disk. This PR make this function chose disk by RR.
1 parent 49df269 commit 2589dd5

File tree

3 files changed

+93
-24
lines changed

3 files changed

+93
-24
lines changed

fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import java.util.Map;
4242
import java.util.NavigableSet;
4343
import java.util.Random;
44-
import java.util.Set;
4544
import java.util.concurrent.atomic.AtomicLong;
4645
import java.util.stream.Collectors;
4746

@@ -280,9 +279,9 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx)
280279
Preconditions.checkNotNull(slot, "unable to get slot of toBe " + move.toBe);
281280

282281
List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
283-
Set<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
282+
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
284283
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
285-
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
284+
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
286285
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
287286
if (pathHash == -1) {
288287
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,

fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2005,9 +2005,12 @@ public static class PathSlot {
20052005
// path hash -> slot num
20062006
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
20072007
private long beId;
2008+
// only use in takeAnAvailBalanceSlotFrom, make pick RR
2009+
private long lastPickPathHash;
20082010

20092011
public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
20102012
this.beId = beId;
2013+
this.lastPickPathHash = -1;
20112014
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
20122015
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
20132016
}
@@ -2122,19 +2125,6 @@ public synchronized int getTotalAvailBalanceSlotNum() {
21222125
return num;
21232126
}
21242127

2125-
/**
2126-
* get path whose balance slot num is larger than 0
2127-
*/
2128-
public synchronized Set<Long> getAvailPathsForBalance() {
2129-
Set<Long> pathHashs = Sets.newHashSet();
2130-
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
2131-
if (entry.getValue().getAvailableBalance() > 0) {
2132-
pathHashs.add(entry.getKey());
2133-
}
2134-
}
2135-
return pathHashs;
2136-
}
2137-
21382128
public synchronized List<List<String>> getSlotInfo(long beId) {
21392129
List<List<String>> results = Lists.newArrayList();
21402130
pathSlots.forEach((key, value) -> {
@@ -2167,15 +2157,31 @@ public synchronized long takeBalanceSlot(long pathHash) {
21672157
return -1;
21682158
}
21692159

2170-
public synchronized long takeAnAvailBalanceSlotFrom(Set<Long> pathHashs) {
2171-
for (Long pathHash : pathHashs) {
2172-
Slot slot = pathSlots.get(pathHash);
2173-
if (slot == null) {
2174-
continue;
2160+
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
2161+
if (pathHashs.isEmpty()) {
2162+
return -1;
2163+
}
2164+
2165+
Collections.sort(pathHashs);
2166+
synchronized (this) {
2167+
int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
2168+
if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) {
2169+
preferSlotIndex = 0;
21752170
}
2176-
if (slot.balanceUsed < slot.getBalanceTotal()) {
2177-
slot.balanceUsed++;
2178-
return pathHash;
2171+
2172+
for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
2173+
long pathHash = pathHashs.get(i);
2174+
if (takeBalanceSlot(pathHash) != -1) {
2175+
lastPickPathHash = pathHash;
2176+
return pathHash;
2177+
}
2178+
}
2179+
for (int i = 0; i < preferSlotIndex; i++) {
2180+
long pathHash = pathHashs.get(i);
2181+
if (takeBalanceSlot(pathHash) != -1) {
2182+
lastPickPathHash = pathHash;
2183+
return pathHash;
2184+
}
21792185
}
21802186
}
21812187
return -1;
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.clone;
19+
20+
import org.apache.doris.clone.TabletScheduler.PathSlot;
21+
import org.apache.doris.common.Config;
22+
import org.apache.doris.thrift.TStorageMedium;
23+
24+
import com.google.common.collect.Lists;
25+
import com.google.common.collect.Maps;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
class PathSlotTest {
34+
35+
@Test
36+
public void test() {
37+
Config.balance_slot_num_per_path = 2;
38+
Map<Long, TStorageMedium> paths = Maps.newHashMap();
39+
List<Long> availPathHashs = Lists.newArrayList();
40+
List<Long> expectPathHashs = Lists.newArrayList();
41+
List<Long> gotPathHashs = Lists.newArrayList();
42+
long startPath = 10001L;
43+
long endPath = 10006L;
44+
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
45+
paths.put(pathHash, TStorageMedium.HDD);
46+
availPathHashs.add(pathHash);
47+
expectPathHashs.add(pathHash);
48+
}
49+
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
50+
expectPathHashs.add(pathHash);
51+
}
52+
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
53+
expectPathHashs.add(-1L);
54+
}
55+
56+
PathSlot ps = new PathSlot(paths, 1L);
57+
for (int i = 0; i < expectPathHashs.size(); i++) {
58+
Collections.shuffle(availPathHashs);
59+
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs));
60+
}
61+
Assert.assertEquals(expectPathHashs, gotPathHashs);
62+
}
63+
64+
}

0 commit comments

Comments
 (0)