
Commit 233b851

scheduler: fix bugs to make scheduler more robust (#1941)
1 parent 86d3e6d commit 233b851

14 files changed (+173, -15 lines)

maintainer/replica/split_span_checker.go

Lines changed: 3 additions & 2 deletions
@@ -49,6 +49,7 @@ var (
 	maxMoveSpansCountForMerge = 16
 	minTrafficPercentage = 0.9
 	maxTrafficPercentage = 1.1
+	maxLagThreshold = float64(30) // 30s
 )

 type BalanceCause string
@@ -352,8 +353,7 @@ func (s *SplitSpanChecker) Check(batch int) replica.GroupCheckResult {
 	lag := float64(oracle.GetPhysical(pdTime)-phyCkpTs) / 1e3

 	// only when the lag is less than 30s, we can consider to merge spans.
-	// TODO: set a better threshold
-	if lag > 30 {
+	if lag > maxLagThreshold {
 		log.Debug("the lag for the group is too large, skip merge",
 			zap.String("changefeed", s.changefeedID.Name()),
 			zap.Int64("groupID", s.groupID),
@@ -1220,4 +1220,5 @@ func SetEasyThresholdForTest() {
 	balanceScoreThreshold = 1
 	minTrafficPercentage = 0.8
 	maxTrafficPercentage = 1.2
+	maxLagThreshold = 120
 }
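The threshold value itself is unchanged (30s); the fix is that it now lives in a named, test-overridable variable instead of a magic number. Below is a minimal, self-contained sketch of the gating logic using the same formula as the checker; the helper name shouldConsiderMerge, the phyCkpMs parameter, and the use of UnixMilli in place of the PD oracle helpers are illustrative stand-ins, not code from the commit.

package main

import (
	"fmt"
	"time"
)

// maxLagThreshold mirrors the new package-level variable: span merges are only
// considered while the group's replication lag stays at or below this many seconds.
var maxLagThreshold = float64(30) // 30s

// shouldConsiderMerge reports whether a group has caught up enough to merge spans.
// phyCkpMs is the physical part of the group's checkpoint ts in milliseconds;
// pdTime stands in for the current TSO physical time fetched from PD.
func shouldConsiderMerge(pdTime time.Time, phyCkpMs int64) bool {
	// Same shape as the checker: lag in seconds = (physical(now) - physical(checkpoint)) / 1e3.
	lag := float64(pdTime.UnixMilli()-phyCkpMs) / 1e3
	return lag <= maxLagThreshold
}

func main() {
	now := time.Now()
	fmt.Println(shouldConsiderMerge(now, now.Add(-10*time.Second).UnixMilli())) // true: 10s lag
	fmt.Println(shouldConsiderMerge(now, now.Add(-2*time.Minute).UnixMilli()))  // false: 120s lag
}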

maintainer/scheduler/balance_splits.go

Lines changed: 5 additions & 1 deletion
@@ -67,7 +67,11 @@ func (s *balanceSplitsScheduler) Name() string {
 }

 func (s *balanceSplitsScheduler) Execute() time.Time {
-	availableSize := s.batchSize - s.operatorController.OperatorSize()
+	if s.operatorController.OperatorSize() > 0 || s.spanController.GetAbsentSize() > 0 {
+		// not in stable schedule state, skip balance split
+		return time.Now().Add(time.Second * 5)
+	}
+	availableSize := s.batchSize
 	// We check the state of each group as following. Since each step has dependencies before and after,
 	// at most one operation step can be performed in each group.
 	// The main function please refer to check() in split_span_checker.go
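Previously Execute only shrank the batch by the number of in-flight operators; now it skips balancing entirely unless the schedule is stable (no pending operators and no absent spans) and retries five seconds later. The following is a compilable sketch of that guard under assumed, simplified interfaces; the controller types and fakes are hypothetical stand-ins for the real controllers.

package main

import (
	"fmt"
	"time"
)

// Stand-ins for the real controllers; only the two methods the guard reads are modeled.
type operatorController interface{ OperatorSize() int }
type spanController interface{ GetAbsentSize() int }

type balanceSplitsScheduler struct {
	batchSize          int
	operatorController operatorController
	spanController     spanController
}

// Execute returns the time at which the scheduler should run next. When operators are
// still in flight or spans are still unassigned, it backs off instead of balancing.
func (s *balanceSplitsScheduler) Execute() time.Time {
	if s.operatorController.OperatorSize() > 0 || s.spanController.GetAbsentSize() > 0 {
		// not in stable schedule state, skip balance split
		return time.Now().Add(time.Second * 5)
	}
	availableSize := s.batchSize // full batch: nothing is in flight at this point
	_ = availableSize            // ... per-group checks would emit at most one step each here
	return time.Now().Add(time.Second * 5)
}

type fakeOps struct{ n int }

func (f fakeOps) OperatorSize() int { return f.n }

type fakeSpans struct{ n int }

func (f fakeSpans) GetAbsentSize() int { return f.n }

func main() {
	s := &balanceSplitsScheduler{batchSize: 16, operatorController: fakeOps{n: 2}, spanController: fakeSpans{n: 0}}
	fmt.Println("next run:", s.Execute()) // deferred: two operators are still in flight
}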

pkg/scheduler/replica/replication.go

Lines changed: 1 addition & 1 deletion
@@ -376,7 +376,7 @@ func (db *replicationDB[T, R]) getOrCreateGroup(task R) *replicationGroup[T, R]
 		db.taskGroups[groupID] = g
 		log.Info("scheduler: add new task group", zap.String("schedulerID", db.id),
 			zap.String("group", GetGroupName(groupID)),
-			zap.Stringer("groupType", GroupType(groupID)))
+			zap.Int64("groupID", int64(groupID)))
 	}
 	return g
 }

tests/integration_tests/_utils/query_dispatcher_count

Lines changed: 10 additions & 2 deletions
@@ -3,13 +3,15 @@
 # parameter 2: changefeed id
 # parameter 3: target count, if target count is -1, means the target count is not important, just not to be null
 # parameter 4: retry count
+# parameter 5: comparison mode, if set to "le" (less than or equal), compare value <= target; otherwise compare value == target

 set -ex

 ipAddr=${1}
 changefeedID=${2}
 target=${3}
 retryCount=${4}
+comparisonMode=${5}

 echo "query dispatcher count"
 count=0
@@ -24,8 +26,14 @@ while [[ $count -lt $retryCount ]]; do
 			exit 0
 		fi
 	else
-		if [ "$value" == "$target" ]; then
-			exit 0
+		if [ "$comparisonMode" == "le" ]; then
+			if [ "$value" -le "$target" ]; then
+				exit 0
+			fi
+		else
+			if [ "$value" == "$target" ]; then
+				exit 0
+			fi
 		fi
 	fi
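With the new fifth argument, the check can assert an upper bound rather than an exact count. The new test's run.sh below, for example, calls query_dispatcher_count "127.0.0.1:8300" "test" 36 100 le, which succeeds once the reported dispatcher count is 36 or lower, presumably so the assertion still holds after some spans have been merged.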

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+[scheduler]
+enable-table-across-nodes = true
+region-threshold = 80
+region-count-per-span = 10
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+# diff Configuration.
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+output-dir = "/tmp/tidb_cdc_test/ddl_for_split_tables_random_schedule/sync_diff/output"
+
+source-instances = ["tidb0"]
+
+target-instance = "mysql1"
+
+target-check-tables = ["test.*"]
+
+[data-sources]
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "root"
+password = ""
+
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = ""
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+drop database if exists `test`;
+create database `test`;
+use `test`;
+create table table_1 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_2 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_3 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_4 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_5 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_6 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_7 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_8 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_9 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+create table table_10 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
+
+split table test.table_1 between (1) and (200000) regions 100;
+split table test.table_2 between (1) and (200000) regions 100;
+split table test.table_3 between (1) and (200000) regions 100;
+split table test.table_4 between (1) and (200000) regions 100;
+split table test.table_5 between (1) and (200000) regions 100;
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
+#!/bin/bash
+# This test is aimed to test the schedule process for split tables and non-split tables with dml
+# 1. we start three TiCDC servers, and create 5 split tables and 5 non-split tables.
+# 2. we enable the split table param, and start a changefeed.
+# 3. we execute dmls to these table.
+# finally, we check the data consistency between the upstream and downstream, and final dispatchers count of these tables,
+# to make sure the schedule process is correct.
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+source $CUR/../_utils/execute_mixed_dml
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+check_time=60
+
+function prepare() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	# record tso before we create tables to skip the system table DDLs
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "2" --addr "127.0.0.1:8302"
+
+	run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	TOPIC_NAME="ticdc-ddl-split-table-with-random-schedule-$RANDOM"
+	case $SINK_TYPE in
+	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
+	pulsar)
+		run_pulsar_cluster $WORK_DIR normal
+		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
+		;;
+	*) SINK_URI="mysql://root:@127.0.0.1:3306/" ;;
+	esac
+	do_retry 5 3 run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c "test" --config="$CUR/conf/changefeed.toml"
+	case $SINK_TYPE in
+	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
+	esac
+}
+
+function execute_dml() {
+	table_name="table_$1"
+	execute_mixed_dml "$table_name" "${UP_TIDB_HOST}" "${UP_TIDB_PORT}"
+}
+
+main() {
+	prepare
+
+	declare -a pids=()
+
+	for i in {1..10}; do
+		execute_dml $i &
+		pids+=("$!")
+	done
+
+	sleep 200
+
+	kill -9 ${pids[@]}
+
+	sleep 60
+
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100
+
+	query_dispatcher_count "127.0.0.1:8300" "test" 36 100 le # 6 * 5 + 5 + 1
+
+	cdc_pid_1=$(pgrep -f "$CDC_BINARY.*--addr 127.0.0.1:8300")
+	if [ -z "$cdc_pid_1" ]; then
+		echo "ERROR: cdc server 1 is not running"
+		exit 1
+	fi
+	kill_cdc_pid $cdc_pid_1
+
+	sleep 60
+
+	query_dispatcher_count "127.0.0.1:8301" "test" 26 100 le # 4 * 5 + 5 + 1
+
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+main
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

tests/integration_tests/ddl_for_split_tables_with_failover/run.sh

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@ function prepare() {
 	# record tso before we create tables to skip the system table DDLs
 	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

-	export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopSplitScheduler=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"
tests/integration_tests/ddl_for_split_tables_with_merge_and_split/run.sh

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ function prepare() {

 	cd $WORK_DIR

-	export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/maintainer/scheduler/StopSplitScheduler=return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"

 	run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
