5 changes: 3 additions & 2 deletions maintainer/replica/split_span_checker.go
@@ -49,6 +49,7 @@ var (
maxMoveSpansCountForMerge = 16
minTrafficPercentage = 0.9
maxTrafficPercentage = 1.1
maxLagThreshold = float64(30) // 30s
)

type BalanceCause string
@@ -352,8 +353,7 @@ func (s *SplitSpanChecker) Check(batch int) replica.GroupCheckResult {
lag := float64(oracle.GetPhysical(pdTime)-phyCkpTs) / 1e3

// only when the lag is less than 30s can we consider merging spans.
// TODO: set a better threshold
if lag > 30 {
if lag > maxLagThreshold {
log.Debug("the lag for the group is too large, skip merge",
zap.String("changefeed", s.changefeedID.Name()),
zap.Int64("groupID", s.groupID),
@@ -1220,4 +1220,5 @@ func SetEasyThresholdForTest() {
balanceScoreThreshold = 1
minTrafficPercentage = 0.8
maxTrafficPercentage = 1.2
maxLagThreshold = 120
}
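
For context, here is a minimal, illustrative Go sketch of the lag gate introduced above, written against the client-go oracle helpers that the computation in Check() appears to use; the wrapper function and example values are assumptions for illustration, not the project's actual code:

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

// mirrors the new package-level threshold: merges are only considered when the
// group's replication lag is at most 30 seconds
var maxLagThreshold = float64(30)

// canConsiderMerge reports whether the checkpoint lags the PD clock by no more
// than maxLagThreshold seconds (hypothetical helper, for illustration only).
func canConsiderMerge(checkpointTs uint64, pdTime time.Time) bool {
	phyCkpTs := oracle.ExtractPhysical(checkpointTs)          // physical part of the TSO, in milliseconds
	lag := float64(oracle.GetPhysical(pdTime)-phyCkpTs) / 1e3 // lag in seconds
	return lag <= maxLagThreshold
}

func main() {
	fresh := oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
	stale := oracle.GoTimeToTS(time.Now().Add(-60 * time.Second))
	fmt.Println(canConsiderMerge(fresh, time.Now()), canConsiderMerge(stale, time.Now())) // true false
}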
6 changes: 5 additions & 1 deletion maintainer/scheduler/balance_splits.go
@@ -67,7 +67,11 @@ func (s *balanceSplitsScheduler) Name() string {
}

func (s *balanceSplitsScheduler) Execute() time.Time {
availableSize := s.batchSize - s.operatorController.OperatorSize()
if s.operatorController.OperatorSize() > 0 || s.spanController.GetAbsentSize() > 0 {
// not in stable schedule state, skip balance split
return time.Now().Add(time.Second * 5)
}
availableSize := s.batchSize
// We check the state of each group as follows. Since each step has dependencies before and after,
// at most one operation step can be performed in each group.
// For the main logic, please refer to check() in split_span_checker.go
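
A brief note on why the budget calculation changed together with the new gate: once Execute() returns early whenever any operator is still in flight or any span is absent, the old batchSize - OperatorSize() subtraction could only ever run with OperatorSize() == 0, so the available size is simply the batch size. A minimal sketch of the pattern, using hypothetical stand-ins for the controller types rather than the project's real interfaces:

package main

import (
	"fmt"
	"time"
)

// schedulerState is a hypothetical stand-in for operatorController / spanController.
type schedulerState struct {
	operatorSize int // operators still in flight
	absentSize   int // spans not yet assigned to any node
	batchSize    int
}

// execute mirrors the gate: skip balancing unless the schedule is stable,
// and come back later either way (the 5s retry here is illustrative).
func execute(s schedulerState, balance func(availableSize int)) time.Time {
	if s.operatorSize > 0 || s.absentSize > 0 {
		// not in a stable schedule state, skip this balance-split round
		return time.Now().Add(5 * time.Second)
	}
	// stable state: nothing is in flight, so the whole batch is available
	balance(s.batchSize)
	return time.Now().Add(5 * time.Second)
}

func main() {
	// with operators pending, balance is never called and we just get a retry time
	next := execute(schedulerState{operatorSize: 3, batchSize: 128}, func(n int) { fmt.Println("budget:", n) })
	fmt.Println("next run:", next.Format(time.RFC3339))
}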
2 changes: 1 addition & 1 deletion pkg/scheduler/replica/replication.go
@@ -376,7 +376,7 @@ func (db *replicationDB[T, R]) getOrCreateGroup(task R) *replicationGroup[T, R]
db.taskGroups[groupID] = g
log.Info("scheduler: add new task group", zap.String("schedulerID", db.id),
zap.String("group", GetGroupName(groupID)),
zap.Stringer("groupType", GroupType(groupID)))
zap.Int64("groupID", int64(groupID)))
}
return g
}
12 changes: 10 additions & 2 deletions tests/integration_tests/_utils/query_dispatcher_count
@@ -3,13 +3,15 @@
# parameter 2: changefeed id
# parameter 3: target count, if target count is -1, means the target count is not important, just not to be null
# parameter 4: retry count
# parameter 5: comparison mode, if set to "le" (less than or equal), compare value <= target; otherwise compare value == target
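# example usage (illustrative; the values mirror how this helper is invoked elsewhere in this PR):
#   query_dispatcher_count 127.0.0.1:8300 test 36 100 le   # succeed once the count is <= 36
#   query_dispatcher_count 127.0.0.1:8300 test -1 100      # succeed once the count is non-null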

set -ex

ipAddr=${1}
changefeedID=${2}
target=${3}
retryCount=${4}
comparisonMode=${5}

echo "query dispatcher count"
count=0
@@ -24,8 +26,14 @@ while [[ $count -lt $retryCount ]]; do
exit 0
fi
else
if [ "$value" == "$target" ]; then
exit 0
if [ "$comparisonMode" == "le" ]; then
if [ "$value" -le "$target" ]; then

Review comment (severity: high):

The arithmetic comparison [ "$value" -le "$target" ] will cause the script to fail if $value is not an integer (e.g., if it's null from the jq command). The previous string comparison == was safer in this regard. To make this more robust, you should check if $value is an integer before performing the arithmetic comparison.

Suggested change
if [ "$value" -le "$target" ]; then
if [[ "$value" =~ ^[0-9]+$ ]] && [ "$value" -le "$target" ]; then

exit 0
fi
else
if [ "$value" == "$target" ]; then
exit 0
fi
fi
fi

@@ -0,0 +1,4 @@
[scheduler]
enable-table-across-nodes = true
region-threshold = 80
region-count-per-span = 10
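# presumably (not verified against the scheduler docs): tables whose region count
# exceeds region-threshold are split into spans of roughly region-count-per-span
# regions each, and those spans may then be scheduled across different TiCDC nodes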
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_for_split_tables_random_schedule/sync_diff/output"

source-instances = ["tidb0"]

target-instance = "mysql1"

target-check-tables = ["test.*"]

[data-sources]
[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
@@ -0,0 +1,19 @@
drop database if exists `test`;
create database `test`;
use `test`;
create table table_1 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_2 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_3 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_4 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_5 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_6 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_7 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_8 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_9 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));
create table table_10 (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(255));

split table test.table_1 between (1) and (200000) regions 100;
split table test.table_2 between (1) and (200000) regions 100;
split table test.table_3 between (1) and (200000) regions 100;
split table test.table_4 between (1) and (200000) regions 100;
split table test.table_5 between (1) and (200000) regions 100;
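-- note (assumption based on the changefeed config in this PR): pre-splitting table_1..table_5
-- into 100 regions pushes them over the region-threshold of 80, so only these five tables are
-- expected to be split into multiple spans; table_6..table_10 keep a single dispatcher each.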
@@ -0,0 +1,95 @@
#!/bin/bash
# This test verifies the schedule process for split tables and non-split tables under DML load:
# 1. we start three TiCDC servers, and create 5 split tables and 5 non-split tables.
# 2. we enable the split-table param and start a changefeed.
# 3. we execute DMLs against these tables.
# finally, we check the data consistency between the upstream and downstream, and the final dispatcher count of these tables,
# to make sure the schedule process is correct.

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
source $CUR/../_utils/execute_mixed_dml
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
check_time=60

function prepare() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "2" --addr "127.0.0.1:8302"

run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

TOPIC_NAME="ticdc-ddl-split-table-with-random-schedule-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://root:@127.0.0.1:3306/" ;;
esac
do_retry 5 3 run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c "test" --config="$CUR/conf/changefeed.toml"
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac
}

function execute_dml() {
table_name="table_$1"
execute_mixed_dml "$table_name" "${UP_TIDB_HOST}" "${UP_TIDB_PORT}"
}

main() {
prepare

declare -a pids=()

for i in {1..10}; do
execute_dml $i &
pids+=("$!")
done

sleep 200

kill -9 ${pids[@]}

sleep 60

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100

query_dispatcher_count "127.0.0.1:8300" "test" 36 100 le # 6 * 5 + 5 + 1
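	# the bound of 36 above is presumably at most 6 dispatchers for each of the 5 split tables,
	# plus one for each of the 5 non-split tables, plus the table trigger event dispatcher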

cdc_pid_1=$(pgrep -f "$CDC_BINARY.*--addr 127.0.0.1:8300")
if [ -z "$cdc_pid_1" ]; then
echo "ERROR: cdc server 1 is not running"
exit 1
fi
kill_cdc_pid $cdc_pid_1

sleep 60

query_dispatcher_count "127.0.0.1:8301" "test" 26 100 le # 4 * 5 + 5 + 1

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
main
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
@@ -27,7 +27,6 @@ function prepare() {
# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopSplitScheduler=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"

@@ -23,7 +23,7 @@ function prepare() {

cd $WORK_DIR

export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/maintainer/scheduler/StopSplitScheduler=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"

run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -27,7 +27,7 @@ function prepare() {
# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/maintainer/scheduler/StopSplitScheduler=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"

@@ -27,7 +27,7 @@ function prepare() {
# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/maintainer/scheduler/StopSplitScheduler=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "2" --addr "127.0.0.1:8302"
2 changes: 1 addition & 1 deletion tests/integration_tests/merge_table/run.sh
@@ -18,7 +18,7 @@ function prepare() {

cd $WORK_DIR

export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true);github.com/pingcap/ticdc/maintainer/scheduler/StopSplitScheduler=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"

run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
8 changes: 4 additions & 4 deletions tests/integration_tests/run_heavy_it_in_ci.sh
@@ -46,7 +46,7 @@ mysql_groups=(
# G07
'resolve_lock merge_table'
# G08
'bank'
'bank ddl_for_split_tables_random_schedule'
# G09
'drop_many_tables'
# G10
@@ -95,7 +95,7 @@ kafka_groups=(
# G11
'fail_over region_merge'
# G12
''
'ddl_for_split_tables_random_schedule'
# G13
'debezium01'
# G14
@@ -129,7 +129,7 @@ pulsar_groups=(
'mq_sink_error_resume'
# G10
# fail_over_ddl_mix_with_syncpoint
''
'ddl_for_split_tables_random_schedule'
# G11
'ddl_with_random_move_table'
# G12
@@ -169,7 +169,7 @@ storage_groups=(
'ddl_for_split_tables_with_random_merge_and_split'
# G11
# fail_over_ddl_mix_with_syncpoint
''
'ddl_for_split_tables_random_schedule'
# G12
'ddl_with_random_move_table'
# G13