|
14 | 14 | package main
|
15 | 15 |
|
16 | 16 | import (
|
| 17 | + "bufio" |
17 | 18 | "context"
|
18 | 19 | "database/sql"
|
19 | 20 | "encoding/json"
|
20 | 21 | "fmt"
|
21 | 22 | "io"
|
22 | 23 | "math/rand"
|
23 | 24 | "net/http"
|
| 25 | + "os" |
| 26 | + "path/filepath" |
| 27 | + "regexp" |
| 28 | + "strconv" |
24 | 29 | "strings"
|
25 | 30 | "sync/atomic"
|
26 | 31 | "time"
|
@@ -623,7 +628,8 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tidbAPIEndpoint,
|
623 | 628 | log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err()))
|
624 | 629 | return 0, ctx.Err()
|
625 | 630 | case <-time.After(2 * time.Second):
|
626 |
| - result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName) |
| 631 | + // result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName) |
| 632 | + result, ok := tryGetEndTsFromLog(db, tableName) |
627 | 633 | if ok {
|
628 | 634 | return result, nil
|
629 | 635 | }
|
@@ -675,3 +681,65 @@ func tryGetEndTs(db *sql.DB, tidbAPIEndpoint, tableName string) (result uint64,
|
675 | 681 | zap.Uint64("ts", ddlJob[0].Binlog.FinishedTS))
|
676 | 682 | return ddlJob[0].Binlog.FinishedTS, true
|
677 | 683 | }
|
| 684 | + |
| 685 | +func tryGetEndTsFromLog(_ *sql.DB, tableName string) (result uint64, ok bool) { |
| 686 | + log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName)) |
| 687 | + |
| 688 | + logFilePath := "/tmp/tidb_cdc_test/bank" |
| 689 | + cdcLogFiles := make([]string, 0) |
| 690 | + // walk all file with cdc prefix |
| 691 | + err := filepath.WalkDir(logFilePath, func(path string, d os.DirEntry, err error) error { |
| 692 | + if err != nil { |
| 693 | + return err |
| 694 | + } |
| 695 | + if !d.IsDir() { |
| 696 | + if strings.Contains(d.Name(), "down") && strings.Contains(d.Name(), "cdc") && strings.Contains(d.Name(), "log") { |
| 697 | + cdcLogFiles = append(cdcLogFiles, path) |
| 698 | + fmt.Println(path) |
| 699 | + } |
| 700 | + } |
| 701 | + return nil |
| 702 | + }) |
| 703 | + if err != nil { |
| 704 | + log.Error("Failed to walk dir: %v", zap.Error(err)) |
| 705 | + } |
| 706 | + log.Info("total files", zap.Any("file", cdcLogFiles)) |
| 707 | + |
| 708 | + logRegex := regexp.MustCompile(`handle a ddl job`) |
| 709 | + tableNameRegex := regexp.MustCompile(tableName + "`") |
| 710 | + timeStampRegex := regexp.MustCompile(`finishedTs=([0-9]+)`) |
| 711 | + for _, f := range cdcLogFiles { |
| 712 | + file, err := os.Open(f) |
| 713 | + if err != nil { |
| 714 | + log.Error("Failed to open file: %v", zap.Error(err)) |
| 715 | + } |
| 716 | + defer file.Close() |
| 717 | + |
| 718 | + reader := bufio.NewReader(file) |
| 719 | + for { |
| 720 | + bs, _, err := reader.ReadLine() |
| 721 | + if err != nil { |
| 722 | + if err != io.EOF { |
| 723 | + fmt.Printf("Error reading file: %v\n", err) |
| 724 | + } |
| 725 | + return 0, false |
| 726 | + } |
| 727 | + line := string(bs) |
| 728 | + if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) { |
| 729 | + continue |
| 730 | + } |
| 731 | + |
| 732 | + matches := timeStampRegex.FindStringSubmatch(line) |
| 733 | + if len(matches) > 1 { |
| 734 | + fmt.Println("found first match line, Match Result: ", matches[1], ", line: ", line) |
| 735 | + // convert to uint64 |
| 736 | + result, err := strconv.ParseUint(matches[1], 10, 64) |
| 737 | + if err != nil { |
| 738 | + log.Error("Failed to parse uint64: %v", zap.Error(err)) |
| 739 | + } |
| 740 | + return result, true |
| 741 | + } |
| 742 | + } |
| 743 | + } |
| 744 | + return 0, false |
| 745 | +} |
0 commit comments