
Commit 3081c90

[Enhancement](group commit)Optimize be select for group commit (apache#35558)
1. Stream load and INSERT INTO that are group committed and sent to the master FE should use the same BE selection strategy (previously INSERT INTO reused the first selected BE, while stream load used round robin). A map <table id, be id> pins a fixed BE for each table: the first time a table is loaded, a BE is chosen at random and the pair is recorded in the map permanently, and every later load into that table is sent to the BE recorded for it, so batching is concentrated on a single BE as much as possible. To avoid overloading a single BE, a bvar-window-like variable tracks the total bytes sent to a given BE for a given table during the batch interval (default 10 seconds), kept in a second map <be id, window variable>. If a new load finds that its pinned BE's window value is below a threshold (e.g. 1 GB), it keeps going to that BE per map1; if the value exceeds the threshold, the load is redirected to the BE with the smallest window value and map1 is updated; if every BE exceeds the threshold, the one with the smallest value is still chosen. This relieves excessive pressure on any single BE (a minimal sketch of this policy appears below).
2. For stream load that is group committed and sent to a BE, the data is batched directly on that BE and the transaction is committed at the end of the load. At that point a request is sent to the FE, which records the size of the load and adds it to the window variable.
3. Stream load and INSERT INTO sent to an observer FE follow the logic in point 1 via RPC: the table id is passed to the master FE, which returns the selected BE id.
1 parent 9eac4f2 commit 3081c90
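
For reference, the selection policy in point 1 can be sketched as follows. This is a minimal illustration, not the committed implementation: the class and member names (GroupCommitBeSelector, tableToBeId, beToWindow, MAX_WINDOW_BYTES, selectBackend) are hypothetical, while SlidingWindowCounter is the utility added by this commit and the real selection lives in GroupCommitManager.selectBackendForGroupCommit.

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.doris.common.util.SlidingWindowCounter;

public class GroupCommitBeSelector {
    // Threshold per BE per batch interval, e.g. 1 GB (hypothetical constant).
    private static final long MAX_WINDOW_BYTES = 1L << 30;

    // map1: table id -> pinned BE id.
    private final Map<Long, Long> tableToBeId = new ConcurrentHashMap<>();
    // map2: BE id -> bytes received within the batch interval (default 10s).
    private final Map<Long, SlidingWindowCounter> beToWindow = new ConcurrentHashMap<>();
    private final Random random = new Random();

    public synchronized long selectBackend(long tableId, List<Long> aliveBackendIds) {
        // First load for this table: pick a BE at random and pin it.
        long beId = tableToBeId.computeIfAbsent(tableId,
                k -> aliveBackendIds.get(random.nextInt(aliveBackendIds.size())));

        // Below the threshold: keep batching to the pinned BE.
        if (windowOf(beId).get() < MAX_WINDOW_BYTES) {
            return beId;
        }

        // Above the threshold: move the table to the BE with the smallest
        // window value (chosen even if every BE is above the threshold).
        long bestBe = beId;
        long bestBytes = Long.MAX_VALUE;
        for (long candidate : aliveBackendIds) {
            long bytes = windowOf(candidate).get();
            if (bytes < bestBytes) {
                bestBytes = bytes;
                bestBe = candidate;
            }
        }
        tableToBeId.put(tableId, bestBe);
        return bestBe;
    }

    private SlidingWindowCounter windowOf(long beId) {
        // The windows are fed when group commit transactions are committed
        // (see the BE-side change and the FE-side sketch further down).
        return beToWindow.computeIfAbsent(beId, k -> new SlidingWindowCounter(10));
    }
}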

File tree: 9 files changed (+505, −17 lines)


be/src/runtime/group_commit_mgr.cpp

Lines changed: 7 additions & 0 deletions
@@ -397,6 +397,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
     request.__set_db_id(db_id);
     request.__set_table_id(table_id);
     request.__set_txnId(txn_id);
+    request.__set_groupCommit(true);
+    request.__set_receiveBytes(state->num_bytes_load_total());
+    if (_exec_env->master_info()->__isset.backend_id) {
+        request.__set_backendId(_exec_env->master_info()->backend_id);
+    } else {
+        LOG(WARNING) << "_exec_env->master_info not set backend_id";
+    }
     if (state) {
         request.__set_commitInfos(state->tablet_commit_infos());
     }
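
The fields set above (groupCommit, receiveBytes, backendId) are what point 2 of the commit message refers to: when a group commit transaction is committed, the BE reports how many bytes it received so the FE can add them to that backend's window. A hedged sketch of that FE-side bookkeeping; the class and method names (GroupCommitLoadStats, record, beToWindow) are illustrative, only SlidingWindowCounter comes from this commit.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.doris.common.util.SlidingWindowCounter;

// Illustrative FE-side bookkeeping: when a group commit txn is committed, the
// BE reports how many bytes it received, and the FE adds that amount to the
// sliding window tracked for that backend.
public class GroupCommitLoadStats {
    private final Map<Long, SlidingWindowCounter> beToWindow = new ConcurrentHashMap<>();

    public void record(boolean groupCommit, long backendId, long receiveBytes) {
        if (!groupCommit) {
            return; // only group commit loads count toward the window
        }
        beToWindow.computeIfAbsent(backendId, k -> new SlidingWindowCounter(10)) // default 10s interval
                .add(receiveBytes);
    }
}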
fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import java.util.concurrent.atomic.AtomicLongArray;

public class SlidingWindowCounter {
    private final int windowSizeInSeconds;
    private final int numberOfBuckets;
    private final AtomicLongArray buckets;
    private final AtomicLongArray bucketTimestamps;

    public SlidingWindowCounter(int windowSizeInSeconds) {
        this.windowSizeInSeconds = windowSizeInSeconds;
        this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second
        this.buckets = new AtomicLongArray(numberOfBuckets);
        this.bucketTimestamps = new AtomicLongArray(numberOfBuckets);
    }

    private int getCurrentBucketIndex() {
        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
        return (int) (currentTime % numberOfBuckets);
    }

    private void updateCurrentBucket() {
        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
        int currentBucketIndex = getCurrentBucketIndex();
        long bucketTimestamp = bucketTimestamps.get(currentBucketIndex);

        if (currentTime - bucketTimestamp >= 1) {
            buckets.set(currentBucketIndex, 0);
            bucketTimestamps.set(currentBucketIndex, currentTime);
        }
    }

    public void add(long value) {
        updateCurrentBucket();
        int bucketIndex = getCurrentBucketIndex();
        buckets.addAndGet(bucketIndex, value);
    }

    public long get() {
        updateCurrentBucket();
        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
        long count = 0;

        for (int i = 0; i < numberOfBuckets; i++) {
            if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) {
                count += buckets.get(i);
            }
        }
        return count;
    }

    public String toString() {
        return String.valueOf(get());
    }
}
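
A short usage sketch of the counter above; the demo wrapper is illustrative, only SlidingWindowCounter and its add/get methods come from the file in this commit.

import org.apache.doris.common.util.SlidingWindowCounter;

public class SlidingWindowCounterDemo {
    public static void main(String[] args) {
        // Track bytes sent to one backend over a 10-second window.
        SlidingWindowCounter window = new SlidingWindowCounter(10);
        window.add(64L * 1024 * 1024);    // a 64 MB group commit load lands on this BE
        window.add(128L * 1024 * 1024);   // another 128 MB within the same batch interval
        System.out.println(window.get()); // 201326592 until those buckets expire
    }
}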

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java

Lines changed: 53 additions & 13 deletions
@@ -19,6 +19,7 @@

 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -128,11 +129,16 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
         String sql = request.getHeader("sql");
         LOG.info("streaming load sql={}", sql);
         boolean groupCommit = false;
+        long tableId = -1;
         String groupCommitStr = request.getHeader("group_commit");
         if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
             groupCommit = true;
             try {
                 String[] pair = parseDbAndTb(sql);
+                Database db = Env.getCurrentInternalCatalog()
+                        .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
+                Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
+                tableId = tbl.getId();
                 if (isGroupCommitBlock(pair[0], pair[1])) {
                     String msg = "insert table " + pair[1] + " is blocked on schema change";
                     return new RestBaseResult(msg);
@@ -150,8 +156,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
         }

         String label = request.getHeader(LABEL_KEY);
-        TNetworkAddress redirectAddr;
-        redirectAddr = selectRedirectBackend(groupCommit);
+        TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId);

         LOG.info("redirect load action to destination={}, label: {}",
                 redirectAddr.toString(), label);
@@ -274,7 +279,9 @@ private Object executeWithoutPassword(HttpServletRequest request,
                 return new RestBaseResult(e.getMessage());
             }
         } else {
-            redirectAddr = selectRedirectBackend(groupCommit);
+            long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName)
+                    .get()).getTable(tableName).get()).getId();
+            redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
         }

         LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
@@ -305,7 +312,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) {
             return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
         }

-        TNetworkAddress redirectAddr = selectRedirectBackend(false);
+        TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);
         LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
                 redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);

@@ -323,12 +330,40 @@ private final synchronized int getLastSelectedBackendIndexAndUpdate() {
         return index;
     }

-    private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException {
+    private String getCloudClusterName(HttpServletRequest request) {
+        String cloudClusterName = request.getHeader(SessionVariable.CLOUD_CLUSTER);
+        if (!Strings.isNullOrEmpty(cloudClusterName)) {
+            return cloudClusterName;
+        }
+
+        cloudClusterName = ConnectContext.get().getCloudCluster();
+        if (!Strings.isNullOrEmpty(cloudClusterName)) {
+            return cloudClusterName;
+        }
+
+        return "";
+    }
+
+    private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId)
+            throws LoadException {
         long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
         if (debugBackendId != -1L) {
             Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId);
             return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
         }
+        if (Config.isCloudMode()) {
+            String cloudClusterName = getCloudClusterName(request);
+            if (Strings.isNullOrEmpty(cloudClusterName)) {
+                throw new LoadException("No cloud cluster name selected.");
+            }
+            return selectCloudRedirectBackend(cloudClusterName, request, groupCommit);
+        } else {
+            return selectLocalRedirectBackend(groupCommit, request, tableId);
+        }
+    }
+
+    private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId)
+            throws LoadException {
         Backend backend = null;
         BeSelectionPolicy policy = null;
         String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -348,12 +383,17 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx
             throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
         }
         if (groupCommit) {
-            for (Long backendId : backendIds) {
-                Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId);
-                if (!candidateBe.isDecommissioned()) {
-                    backend = candidateBe;
-                    break;
-                }
+            ConnectContext ctx = new ConnectContext();
+            ctx.setEnv(Env.getCurrentEnv());
+            ctx.setThreadLocalInfo();
+            ctx.setRemoteIP(request.getRemoteAddr());
+            ctx.setThreadLocalInfo();
+
+            try {
+                backend = Env.getCurrentEnv().getGroupCommitManager()
+                        .selectBackendForGroupCommit(tableId, ctx, false);
+            } catch (DdlException e) {
+                throw new RuntimeException(e);
             }
         } else {
             backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
@@ -416,10 +456,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
             return new RestBaseResult("No label selected.");
         }

-        TNetworkAddress redirectAddr = selectRedirectBackend(false);
+        TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);

         LOG.info("Redirect load action with auth token to destination={},"
-                        + "stream: {}, db: {}, tbl: {}, label: {}",
+                + "stream: {}, db: {}, tbl: {}, label: {}",
                 redirectAddr.toString(), isStreamLoad, dbName, tableName, label);

         URI urlObj = null;
