diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 53e8ecf9119a1b..e1460c269c1fe4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -113,7 +113,8 @@ protected List selectAlternativeTabletsForCluster( numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum(); } } - LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); + LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, isUrgent {}", + numOfLowPaths, medium, clusterStat.getTag(), isUrgent); List alternativeTabletInfos = Lists.newArrayList(); int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); @@ -121,6 +122,8 @@ protected List selectAlternativeTabletsForCluster( .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId()))) .collect(Collectors.toList()); + boolean hasCandidateTablet = false; + // choose tablets from high load backends. // BackendLoadStatistic is sorted by load score in ascend order, // so we need to traverse it from last to first @@ -222,6 +225,8 @@ protected List selectAlternativeTabletsForCluster( continue; } + hasCandidateTablet = true; + // for urgent disk, pick tablets order by size, // then it may always pick tablets that was on the low backends. if (!lowBETablets.isEmpty() @@ -270,6 +275,9 @@ protected List selectAlternativeTabletsForCluster( if (!alternativeTablets.isEmpty()) { LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}", medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos); + } else if (isUrgent && !hasCandidateTablet) { + LOG.info("urgent balance cann't found candidate tablets. medium: {}, tag: {}", + medium, clusterStat.getTag()); } return alternativeTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index c0a65966fd95f3..37a4b922023bfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -170,22 +170,19 @@ private static long getNextPartitionSize(ArrayList historyPartitionsSize) return historyPartitionsSize.get(0); } - int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size(); - boolean isAscending = true; - for (int i = 1; i < size; i++) { - if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) { + ArrayList ascendingDeltaSize = new ArrayList(); + for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) { + long delta = historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1); + if (delta < 0) { isAscending = false; break; } + ascendingDeltaSize.add(delta); } if (isAscending) { - ArrayList historyDeltaSize = Lists.newArrayList(); - for (int i = 1; i < size; i++) { - historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1)); - } - return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7); + return historyPartitionsSize.get(historyPartitionsSize.size() - 1) + ema(ascendingDeltaSize, 7); } else { return ema(historyPartitionsSize, 7); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 79093d6ed4b9ad..2ae051e4f2518e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -1736,6 +1736,8 @@ public void testAutoBuckets() throws Exception { + " PROPERTIES (\n" + " \"dynamic_partition.enable\" = \"true\",\n" + " \"dynamic_partition.time_unit\" = \"YEAR\",\n" + + " \"dynamic_partition.start\" = \"-50\",\n" + + " \"dynamic_partition.create_history_partition\" = \"true\",\n" + " \"dynamic_partition.end\" = \"1\",\n" + " \"dynamic_partition.prefix\" = \"p\",\n" + " \"replication_allocation\" = \"tag.location.default: 1\"\n" @@ -1744,22 +1746,59 @@ public void testAutoBuckets() throws Exception { Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("test"); OlapTable table = (OlapTable) db.getTableOrAnalysisException("test_autobucket_dynamic_partition"); List partitions = Lists.newArrayList(table.getAllPartitions()); - Assert.assertEquals(2, partitions.size()); + Assert.assertEquals(52, partitions.size()); for (Partition partition : partitions) { Assert.assertEquals(FeConstants.default_bucket_num, partition.getDistributionInfo().getBucketNum()); partition.setVisibleVersionAndTime(2L, System.currentTimeMillis()); } RebalancerTestUtil.updateReplicaDataSize(1, 1, 1); - String alterStmt = + String alterStmt1 = "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')"; - ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt)); + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1)); List> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(db.getId(), table.getId())); Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); partitions = Lists.newArrayList(table.getAllPartitions()); partitions.sort(Comparator.comparing(Partition::getId)); - Assert.assertEquals(3, partitions.size()); - Assert.assertEquals(1, partitions.get(2).getDistributionInfo().getBucketNum()); + Assert.assertEquals(53, partitions.size()); + Assert.assertEquals(1, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); + + table.readLock(); + try { + // first 40 partitions with size 0, then 13 partitions with size 100GB(10GB * 10 buckets) + for (int i = 0; i < 52; i++) { + Partition partition = partitions.get(i); + partition.updateVisibleVersion(2L); + for (MaterializedIndex idx : partition.getMaterializedIndices( + MaterializedIndex.IndexExtState.VISIBLE)) { + Assert.assertEquals(10, idx.getTablets().size()); + for (Tablet tablet : idx.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + replica.updateVersion(2L); + replica.setDataSize(i < 40 ? 0L : 10L << 30); + replica.setRowCount(1000L); + } + } + } + if (i >= 40) { + // first 52 partitions are 10 buckets(FeConstants.default_bucket_num) + Assert.assertEquals(10 * (10L << 30), partition.getAllDataSize(true)); + } + } + } finally { + table.readUnlock(); + } + + String alterStmt2 = + "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '3')"; + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2)); + Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); + + partitions = Lists.newArrayList(table.getAllPartitions()); + partitions.sort(Comparator.comparing(Partition::getId)); + Assert.assertEquals(54, partitions.size()); + // 100GB total, 1GB per bucket, should 100 buckets. + Assert.assertEquals(100, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 407c1544a4b00b..8e25efdfada439 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -517,8 +517,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); diskInfo1.setPathHash(be.getId()); - diskInfo1.setTotalCapacityB(10L << 30); - diskInfo1.setAvailableCapacityB(5L << 30); + diskInfo1.setTotalCapacityB(10L << 40); + diskInfo1.setAvailableCapacityB(5L << 40); diskInfo1.setDataUsedCapacityB(480000); diskInfo1.setPathHash(be.getId()); Map disks = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index d09860351bf93b..22fa581391f738 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -317,8 +317,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); Map disks = Maps.newHashMap(); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); - diskInfo1.setTotalCapacityB(10L << 30); - diskInfo1.setAvailableCapacityB(5L << 30); + diskInfo1.setTotalCapacityB(10L << 40); + diskInfo1.setAvailableCapacityB(5L << 40); diskInfo1.setDataUsedCapacityB(480000); diskInfo1.setPathHash(be.getId()); disks.put(diskInfo1.getRootPath(), diskInfo1);