@@ -113,14 +113,17 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
}
}
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);
LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, isUrgent {}",
numOfLowPaths, medium, clusterStat.getTag(), isUrgent);

List<String> alternativeTabletInfos = Lists.newArrayList();
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
List<Set<Long>> lowBETablets = lowBEs.stream()
.map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());

+boolean hasCandidateTablet = false;
+
// choose tablets from high load backends.
// BackendLoadStatistic is sorted by load score in ascending order,
// so we need to traverse it from last to first
@@ -222,6 +225,8 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
continue;
}

+hasCandidateTablet = true;
+
// for urgent disk, pick tablets ordered by size,
// then it may always pick tablets that were on the low backends.
if (!lowBETablets.isEmpty()
@@ -270,6 +275,9 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
if (!alternativeTablets.isEmpty()) {
LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}",
medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos);
+} else if (isUrgent && !hasCandidateTablet) {
+LOG.info("urgent balance can't find candidate tablets. medium: {}, tag: {}",
+medium, clusterStat.getTag());
}
return alternativeTablets;
}
@@ -170,22 +170,19 @@ private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize)
return historyPartitionsSize.get(0);
}

-int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();
-
boolean isAscending = true;
-for (int i = 1; i < size; i++) {
-if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
+ArrayList<Long> ascendingDeltaSize = new ArrayList<Long>();
+for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) {
+long delta = historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1);
+if (delta < 0) {
isAscending = false;
break;
}
+ascendingDeltaSize.add(delta);
}

if (isAscending) {
-ArrayList<Long> historyDeltaSize = Lists.newArrayList();
-for (int i = 1; i < size; i++) {
-historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
-}
-return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
+return historyPartitionsSize.get(historyPartitionsSize.size() - 1) + ema(ascendingDeltaSize, 7);
} else {
return ema(historyPartitionsSize, 7);
}
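The rewritten branch above windows the last seven sizes and collects their deltas in a single pass. For intuition, here is a minimal, self-contained sketch of the resulting prediction; the ema() helper below is a hypothetical stand-in (a plain exponential moving average with alpha = 2/(n+1)), since the real ema() used by this file is outside the diff:

import java.util.ArrayList;
import java.util.List;

class NextPartitionSizeSketch {
    // Hypothetical EMA stand-in, alpha = 2 / (n + 1); the real ema() may weight differently.
    static long ema(List<Long> values, int n) {
        double alpha = 2.0 / (n + 1);
        double acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = alpha * values.get(i) + (1 - alpha) * acc;
        }
        return (long) acc;
    }

    // Mirrors the patched logic: window the last 7 sizes and collect deltas;
    // if no size ever shrinks, predict lastSize + EMA(deltas); otherwise
    // fall back to the EMA over the raw sizes.
    static long predict(ArrayList<Long> sizes) {
        if (sizes.size() == 1) {
            return sizes.get(0);
        }
        ArrayList<Long> deltas = new ArrayList<>();
        boolean ascending = true;
        for (int i = Math.max(1, sizes.size() - 7); i < sizes.size(); i++) {
            long delta = sizes.get(i) - sizes.get(i - 1);
            if (delta < 0) {
                ascending = false;
                break;
            }
            deltas.add(delta);
        }
        return ascending ? sizes.get(sizes.size() - 1) + ema(deltas, 7) : ema(sizes, 7);
    }
}

Compared with the old code, the window is now taken from the tail of the history (size() - 7) rather than the head, and the deltas are collected during the ascending check instead of in a second loop.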
@@ -1736,6 +1736,8 @@ public void testAutoBuckets() throws Exception {
+ " PROPERTIES (\n"
+ " \"dynamic_partition.enable\" = \"true\",\n"
+ " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+ " \"dynamic_partition.start\" = \"-50\",\n"
+ " \"dynamic_partition.create_history_partition\" = \"true\",\n"
+ " \"dynamic_partition.end\" = \"1\",\n"
+ " \"dynamic_partition.prefix\" = \"p\",\n"
+ " \"replication_allocation\" = \"tag.location.default: 1\"\n"
@@ -1744,22 +1746,59 @@
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
OlapTable table = (OlapTable) db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
List<Partition> partitions = Lists.newArrayList(table.getAllPartitions());
-Assert.assertEquals(2, partitions.size());
+Assert.assertEquals(52, partitions.size());
for (Partition partition : partitions) {
Assert.assertEquals(FeConstants.default_bucket_num, partition.getDistributionInfo().getBucketNum());
partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
}
RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);

-String alterStmt =
+String alterStmt1 =
"alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')";
-ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1));
List<Pair<Long, Long>> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(db.getId(), table.getId()));
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false);

partitions = Lists.newArrayList(table.getAllPartitions());
partitions.sort(Comparator.comparing(Partition::getId));
-Assert.assertEquals(3, partitions.size());
-Assert.assertEquals(1, partitions.get(2).getDistributionInfo().getBucketNum());
+Assert.assertEquals(53, partitions.size());
+Assert.assertEquals(1, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum());
+
+table.readLock();
+try {
+// first 40 partitions with size 0, then 12 partitions with size 100GB (10GB * 10 buckets)
+for (int i = 0; i < 52; i++) {
+Partition partition = partitions.get(i);
+partition.updateVisibleVersion(2L);
+for (MaterializedIndex idx : partition.getMaterializedIndices(
+MaterializedIndex.IndexExtState.VISIBLE)) {
+Assert.assertEquals(10, idx.getTablets().size());
+for (Tablet tablet : idx.getTablets()) {
+for (Replica replica : tablet.getReplicas()) {
+replica.updateVersion(2L);
+replica.setDataSize(i < 40 ? 0L : 10L << 30);
+replica.setRowCount(1000L);
+}
+}
+}
+if (i >= 40) {
+// first 52 partitions have 10 buckets (FeConstants.default_bucket_num)
+Assert.assertEquals(10 * (10L << 30), partition.getAllDataSize(true));
+}
+}
+} finally {
+table.readUnlock();
+}
+
+String alterStmt2 =
+"alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '3')";
+ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2));
+Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false);
+
+partitions = Lists.newArrayList(table.getAllPartitions());
+partitions.sort(Comparator.comparing(Partition::getId));
+Assert.assertEquals(54, partitions.size());
+// 100GB total, 1GB per bucket, so there should be 100 buckets.
+Assert.assertEquals(100, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum());
}
}
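The final 100-bucket assertion follows from simple arithmetic, assuming (as the test comment implies) that the estimator targets roughly 1GB per bucket; the variable names below are illustrative, not Doris API:

// Worked check of the 100-bucket expectation (illustrative names, not Doris API).
long replicaSize = 10L << 30;                 // 10GB per replica, as set by the test
long tabletsPerPartition = 10;                // FeConstants.default_bucket_num
long partitionSize = replicaSize * tabletsPerPartition; // 100GB per recent partition
// Equal-sized recent partitions mean zero deltas, so the EMA prediction for
// the next partition is simply the last size: 100GB.
long sizePerBucket = 1L << 30;                // assumed ~1GB-per-bucket target
long buckets = partitionSize / sizePerBucket; // == 100, matching the assertion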
@@ -517,8 +517,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort,
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setPathHash(be.getId());
-diskInfo1.setTotalCapacityB(10L << 30);
-diskInfo1.setAvailableCapacityB(5L << 30);
+diskInfo1.setTotalCapacityB(10L << 40);
+diskInfo1.setAvailableCapacityB(5L << 40);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
Map<String, DiskInfo> disks = Maps.newHashMap();
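The only change in this file and the next is the shift width: 10L << 30 is 10GiB, while 10L << 40 is 10TiB, so each mocked disk becomes 1024 times larger. A quick check:

// 1GiB = 1L << 30 and 1TiB = 1L << 40, so the patched mock disks grow 1024x.
assert (10L << 30) == 10L * 1024 * 1024 * 1024; // 10GiB
assert (10L << 40) == (10L << 30) * 1024;       // 10TiB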
@@ -317,8 +317,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort,
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
-diskInfo1.setTotalCapacityB(10L << 30);
-diskInfo1.setAvailableCapacityB(5L << 30);
+diskInfo1.setTotalCapacityB(10L << 40);
+diskInfo1.setAvailableCapacityB(5L << 40);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);