Skip to content

Commit eb94c8a

Browse files
tests(ticdc): fix bank test (#11407)
close #11806
1 parent 325d767 commit eb94c8a

File tree

3 files changed

+82
-2
lines changed

3 files changed

+82
-2
lines changed

cdc/owner/ddl_manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ func (m *ddlManager) tick(
293293
continue
294294
}
295295

296+
// Note: do not change the key words in the log, it is used to search the
297+
// FinishTS of the DDL job. Some integration tests and users depend on it.
296298
log.Info("handle a ddl job",
297299
zap.String("namespace", m.changfeedID.Namespace),
298300
zap.String("changefeed", m.changfeedID.ID),

tests/integration_tests/bank/case.go

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,18 @@
1414
package main
1515

1616
import (
17+
"bufio"
1718
"context"
1819
"database/sql"
1920
"encoding/json"
2021
"fmt"
2122
"io"
2223
"math/rand"
2324
"net/http"
25+
"os"
26+
"path/filepath"
27+
"regexp"
28+
"strconv"
2429
"strings"
2530
"sync/atomic"
2631
"time"
@@ -623,7 +628,8 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tidbAPIEndpoint,
623628
log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err()))
624629
return 0, ctx.Err()
625630
case <-time.After(2 * time.Second):
626-
result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName)
631+
// result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName)
632+
result, ok := tryGetEndTsFromLog(db, tableName)
627633
if ok {
628634
return result, nil
629635
}
@@ -675,3 +681,73 @@ func tryGetEndTs(db *sql.DB, tidbAPIEndpoint, tableName string) (result uint64,
675681
zap.Uint64("ts", ddlJob[0].Binlog.FinishedTS))
676682
return ddlJob[0].Binlog.FinishedTS, true
677683
}
684+
685+
func tryGetEndTsFromLog(db *sql.DB, tableName string) (result uint64, ok bool) {
686+
query := "SELECT JOB_ID FROM information_schema.ddl_jobs WHERE table_name = ?"
687+
log.Info("try get end ts", zap.String("query", query), zap.String("tableName", tableName))
688+
var jobID uint64
689+
row := db.QueryRow(query, tableName)
690+
if err := row.Scan(&jobID); err != nil {
691+
if err != sql.ErrNoRows {
692+
log.Info("rows scan failed", zap.Error(err))
693+
}
694+
return 0, false
695+
}
696+
697+
log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName))
698+
699+
logFilePath := "/tmp/tidb_cdc_test/bank"
700+
cdcLogFiles := make([]string, 0)
701+
// walk all file with cdc prefix
702+
err := filepath.WalkDir(logFilePath, func(path string, d os.DirEntry, err error) error {
703+
if err != nil {
704+
return err
705+
}
706+
if !d.IsDir() {
707+
if strings.Contains(d.Name(), "down") && strings.Contains(d.Name(), "cdc") && strings.Contains(d.Name(), "log") {
708+
cdcLogFiles = append(cdcLogFiles, path)
709+
fmt.Println(path)
710+
}
711+
}
712+
return nil
713+
})
714+
if err != nil {
715+
log.Error("Failed to walk dir: %v", zap.Error(err))
716+
}
717+
log.Info("total files", zap.Any("file", cdcLogFiles))
718+
719+
logRegex := regexp.MustCompile(`handle a ddl job`)
720+
tableNameRegex := regexp.MustCompile(tableName + "`")
721+
timeStampRegex := regexp.MustCompile(`finishedTs=([0-9]+)`)
722+
for _, f := range cdcLogFiles {
723+
file, err := os.Open(f)
724+
if err != nil {
725+
log.Error("Failed to open file: %v", zap.Error(err))
726+
}
727+
defer file.Close()
728+
729+
scanner := bufio.NewScanner(file)
730+
for scanner.Scan() {
731+
line := scanner.Text()
732+
if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) {
733+
continue
734+
}
735+
736+
matches := timeStampRegex.FindStringSubmatch(line)
737+
if len(matches) > 1 {
738+
fmt.Println("found first match line: ", matches[1], ": ", line)
739+
// convert to uint64
740+
result, err := strconv.ParseUint(matches[1], 10, 64)
741+
if err != nil {
742+
log.Error("Failed to parse uint64: %v", zap.Error(err))
743+
}
744+
return result, true
745+
}
746+
}
747+
748+
if err := scanner.Err(); err != nil {
749+
log.Error("Error scanning file: %v", zap.Error(err))
750+
}
751+
}
752+
return 0, false
753+
}

tests/integration_tests/bank/run.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ function prepare() {
2020
run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
2121

2222
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
23-
2423
run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/"
24+
25+
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8400" --pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" --logsuffix "down"
26+
run_cdc_cli changefeed create --sink-uri="blackhole://" -c "changefeed-for-find-finished-ts" --server "http://127.0.0.1:8400"
2527
}
2628

2729
trap stop_tidb_cluster EXIT

0 commit comments

Comments
 (0)