Skip to content

Commit c594ef6

Browse files
authored
sync_diff_inspector: fix flaky test TestMysqlRouter (#12159)
close #12141
1 parent 1080da9 commit c594ef6

File tree

5 files changed

+45
-33
lines changed

5 files changed

+45
-33
lines changed

sync_diff_inspector/source/chunks_iter.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package source
1515

1616
import (
1717
"context"
18+
"sync"
1819

1920
"github.com/pingcap/errors"
2021
"github.com/pingcap/log"
@@ -36,6 +37,7 @@ type ChunksIterator struct {
3637
errCh chan error
3738
splitThreadCount int
3839

40+
wg sync.WaitGroup
3941
cancel context.CancelFunc
4042
pool *utils.WorkerPool
4143
}
@@ -60,12 +62,19 @@ func NewChunksIterator(
6062
cancel: cancel,
6163
pool: utils.NewWorkerPool(uint(splitThreadCount), "chunks producer"),
6264
}
65+
66+
iter.wg.Add(1)
6367
go iter.produceChunks(ctxx, startRange)
6468
return iter, nil
6569
}
6670

6771
func (t *ChunksIterator) produceChunks(ctx context.Context, startRange *splitter.RangeInfo) {
68-
defer close(t.chunksCh)
72+
defer func() {
73+
t.pool.WaitFinished()
74+
close(t.chunksCh)
75+
t.wg.Done()
76+
}()
77+
6978
nextTableIndex := 0
7079

7180
// If chunkRange
@@ -166,7 +175,6 @@ func (t *ChunksIterator) produceChunks(ctx context.Context, startRange *splitter
166175
}
167176
})
168177
}
169-
t.pool.WaitFinished()
170178
}
171179

172180
// Next returns the next chunk
@@ -187,7 +195,7 @@ func (t *ChunksIterator) Next(ctx context.Context) (*splitter.RangeInfo, error)
187195
// Close closes the iterator
188196
func (t *ChunksIterator) Close() {
189197
t.cancel()
190-
t.pool.WaitFinished()
198+
t.wg.Wait()
191199
}
192200

193201
// TODO: getCurTableIndexID only used for binary search, should be optimized later.

sync_diff_inspector/source/source_test.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ import (
2222
"regexp"
2323
"strconv"
2424
"testing"
25-
"time"
2625

2726
"github.com/DATA-DOG/go-sqlmock"
2827
_ "github.com/go-sql-driver/mysql"
2928
"github.com/pingcap/tidb/pkg/parser"
29+
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
3030
"github.com/pingcap/tidb/pkg/util/dbutil"
3131
filter "github.com/pingcap/tidb/pkg/util/table-filter"
3232
router "github.com/pingcap/tidb/pkg/util/table-router"
@@ -221,6 +221,8 @@ func TestTiDBSource(t *testing.T) {
221221
rowIter, err := tidb.GetRowsIterator(ctx, tableCase.rangeInfo)
222222
require.NoError(t, err)
223223

224+
testfailpoint.Enable(t, "github.com/pingcap/tiflow/sync_diff_inspector/splitter/getRowCount", "return(0)")
225+
224226
row := 0
225227
var firstRow, secondRow map[string]*dbutil.ColumnData
226228
for {
@@ -259,8 +261,6 @@ func TestTiDBSource(t *testing.T) {
259261
rowIter.Close()
260262

261263
analyze := tidb.GetTableAnalyzer()
262-
countRows := sqlmock.NewRows([]string{"Cnt"}).AddRow(0)
263-
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
264264
chunkIter, err := analyze.AnalyzeSplitter(ctx, tableDiffs[0], tableCase.rangeInfo)
265265
require.NoError(t, err)
266266
chunkIter.Close()
@@ -438,7 +438,7 @@ func TestMysqlShardSources(t *testing.T) {
438438
shard.Close()
439439
}
440440

441-
func TestMysqlRouter(t *testing.T) {
441+
func TestMySQLRouter(t *testing.T) {
442442
ctx := context.Background()
443443

444444
conn, mock, err := sqlmock.New()
@@ -502,12 +502,7 @@ func TestMysqlRouter(t *testing.T) {
502502
require.NoError(t, err)
503503

504504
// random splitter
505-
// query 1: SELECT COUNT(1) cnt FROM `source_test`.`test2`
506-
countRows := sqlmock.NewRows([]string{"Cnt"}).AddRow(0)
507-
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
508-
// query 2: SELECT COUNT(1) cnt FROM `source_test_t`.`test_t`
509-
countRows = sqlmock.NewRows([]string{"Cnt"}).AddRow(0)
510-
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
505+
testfailpoint.Enable(t, "github.com/pingcap/tiflow/sync_diff_inspector/splitter/getRowCount", "return(0)")
511506
rangeIter, err := mysql.GetRangeIterator(ctx, nil, mysql.GetTableAnalyzer(), 3)
512507
require.NoError(t, err)
513508
_, err = rangeIter.Next(ctx)
@@ -518,12 +513,9 @@ func TestMysqlRouter(t *testing.T) {
518513
require.NoError(t, err)
519514
rangeIter.Close()
520515

521-
// Wait goroutine quits to avoid data race
522-
time.Sleep(time.Second)
523-
524516
// row Iterator
525517
dataRows := sqlmock.NewRows(tableCases[0].rowColumns)
526-
for k := 0; k < 2; k++ {
518+
for k := range 2 {
527519
dataRows.AddRow(tableCases[0].rows[k]...)
528520
}
529521
mock.ExpectQuery(tableCases[0].rowQuery).WillReturnRows(dataRows)

sync_diff_inspector/splitter/limit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func NewLimitIteratorWithCheckpoint(
123123

124124
chunkSize := table.ChunkSize
125125
if chunkSize <= 0 {
126-
cnt, err := dbutil.GetRowCount(ctx, dbConn, table.Schema, table.Table, "", nil)
126+
cnt, err := getRowCount(ctx, dbConn, table.Schema, table.Table, "", nil)
127127
if err != nil {
128128
return nil, errors.Trace(err)
129129
}

sync_diff_inspector/splitter/random.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ type RandomIterator struct {
4141
dbConn *sql.DB
4242
}
4343

44+
// a wrapper for get row count to integrate failpoint.
45+
func getRowCount(ctx context.Context, db dbutil.QueryExecutor, schemaName string, tableName string, where string, args []any) (int64, error) {
46+
failpoint.Inject("getRowCount", func(val failpoint.Value) {
47+
if count, ok := val.(int); ok {
48+
failpoint.Return(int64(count), nil)
49+
}
50+
})
51+
return dbutil.GetRowCount(ctx, db, schemaName, tableName, where, args)
52+
}
53+
4454
// NewRandomIterator return a new iterator
4555
func NewRandomIterator(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB) (*RandomIterator, error) {
4656
return NewRandomIteratorWithCheckpoint(ctx, progressID, table, dbConn, nil)
@@ -96,7 +106,7 @@ func NewRandomIteratorWithCheckpoint(
96106
// For chunk splitted by random splitter, the checkpoint chunk records the tableCnt.
97107
chunkCnt = bucketChunkCnt - beginIndex
98108
} else {
99-
cnt, err := dbutil.GetRowCount(ctx, dbConn, table.Schema, table.Table, table.Range, nil)
109+
cnt, err := getRowCount(ctx, dbConn, table.Schema, table.Table, table.Range, nil)
100110
if err != nil {
101111
return nil, errors.Trace(err)
102112
}

sync_diff_inspector/splitter/splitter_test.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
sqlmock "github.com/DATA-DOG/go-sqlmock"
2525
"github.com/pingcap/tidb/pkg/parser"
26+
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
2627
"github.com/pingcap/tiflow/sync_diff_inspector/chunk"
2728
"github.com/pingcap/tiflow/sync_diff_inspector/source/common"
2829
"github.com/pingcap/tiflow/sync_diff_inspector/utils"
@@ -146,7 +147,7 @@ func TestSplitRangeByRandom(t *testing.T) {
146147

147148
splitCols, err := GetSplitFields(tableInfo, nil)
148149
require.NoError(t, err)
149-
createFakeResultForRandomSplit(mock, 0, testCase.randomValues)
150+
createFakeResultForRandomSplit(t, mock, 0, testCase.randomValues)
150151
chunks, err := splitRangeByRandom(context.Background(), db, testCase.originChunk, testCase.splitCount, "test", "test", splitCols, "", "")
151152
require.NoError(t, err)
152153
for j, chunk := range chunks {
@@ -335,7 +336,7 @@ func TestRandomSpliter(t *testing.T) {
335336
ChunkSize: 5,
336337
}
337338

338-
createFakeResultForRandomSplit(mock, testCase.count, testCase.randomValues)
339+
createFakeResultForRandomSplit(t, mock, testCase.count, testCase.randomValues)
339340

340341
iter, err := NewRandomIterator(ctx, "", tableDiff, db)
341342
require.NoError(t, err)
@@ -368,7 +369,7 @@ func TestRandomSpliter(t *testing.T) {
368369
ChunkSize: 5,
369370
}
370371

371-
createFakeResultForRandomSplit(mock, testCases[0].count, testCases[0].randomValues)
372+
createFakeResultForRandomSplit(t, mock, testCases[0].count, testCases[0].randomValues)
372373

373374
iter, err := NewRandomIterator(ctx, "", tableDiff, db)
374375
require.NoError(t, err)
@@ -386,7 +387,7 @@ func TestRandomSpliter(t *testing.T) {
386387
ChunkRange: chunk,
387388
}
388389

389-
createFakeResultForRandomSplit(mock, testCases[0].count, testCases[0].randomValues)
390+
createFakeResultForRandomSplit(t, mock, testCases[0].count, testCases[0].randomValues)
390391

391392
iter, err = NewRandomIteratorWithCheckpoint(ctx, "", tableDiff, db, rangeInfo)
392393
require.NoError(t, err)
@@ -402,8 +403,8 @@ func TestRandomSpliter(t *testing.T) {
402403
require.Equal(t, chunk.Index.ChunkIndex, chunkID1.ChunkIndex+1)
403404
}
404405

405-
func createFakeResultForRandomSplit(mock sqlmock.Sqlmock, count int, randomValues [][]string) {
406-
createFakeResultForCount(mock, count)
406+
func createFakeResultForRandomSplit(t *testing.T, mock sqlmock.Sqlmock, count int, randomValues [][]string) {
407+
createFakeResultForCount(t, count)
407408
if randomValues == nil {
408409
return
409410
}
@@ -688,7 +689,7 @@ func TestBucketSpliter(t *testing.T) {
688689
db, mock, err = sqlmock.New()
689690
require.NoError(t, err)
690691
createFakeResultForBucketSplit(mock, nil, nil)
691-
createFakeResultForCount(mock, 64)
692+
testfailpoint.Enable(t, "github.com/pingcap/tiflow/sync_diff_inspector/splitter/getRowCount", "return(64)")
692693
createFakeResultForRandom(mock, testCases[0].aRandomValues[stopJ:], testCases[0].bRandomValues[stopJ:])
693694
iter, err = NewBucketIteratorWithCheckpoint(ctx, "", tableDiff, db, rangeInfo, utils.NewWorkerPool(1, "bucketIter"))
694695
require.NoError(t, err)
@@ -722,11 +723,13 @@ func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandom
722723
createFakeResultForRandom(mock, aRandomValues, bRandomValues)
723724
}
724725

725-
func createFakeResultForCount(mock sqlmock.Sqlmock, count int) {
726+
func createFakeResultForCount(t *testing.T, count int) {
726727
if count > 0 {
727728
// generate fake result for get the row count of this table
728-
countRows := sqlmock.NewRows([]string{"cnt"}).AddRow(count)
729-
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
729+
testfailpoint.Enable(t,
730+
"github.com/pingcap/tiflow/sync_diff_inspector/splitter/getRowCount",
731+
fmt.Sprintf("return(%d)", count),
732+
)
730733
}
731734
}
732735

@@ -910,12 +913,12 @@ func TestChunkSize(t *testing.T) {
910913

911914
// test random splitter chunksize
912915
// chunkNum is only 1, so don't need randomValues
913-
createFakeResultForRandomSplit(mock, 1000, nil)
916+
createFakeResultForRandomSplit(t, mock, 1000, nil)
914917
randomIter, err := NewRandomIterator(ctx, "", tableDiff, db)
915918
require.NoError(t, err)
916919
require.Equal(t, randomIter.chunkSize, int64(50000))
917920

918-
createFakeResultForRandomSplit(mock, 1000000000, [][]string{
921+
createFakeResultForRandomSplit(t, mock, 1000000000, [][]string{
919922
{"1", "2", "3", "4", "5"},
920923
{"a", "b", "c", "d", "e"},
921924
})
@@ -934,13 +937,12 @@ func TestChunkSize(t *testing.T) {
934937
ChunkSize: 0,
935938
}
936939
// no index
937-
createFakeResultForRandomSplit(mock, 1000, nil)
940+
createFakeResultForRandomSplit(t, mock, 1000, nil)
938941
randomIter, err = NewRandomIterator(ctx, "", tableDiffNoIndex, db)
939942
require.NoError(t, err)
940943
require.Equal(t, randomIter.chunkSize, int64(1001))
941944

942945
// test limit splitter chunksize
943-
createFakeResultForCount(mock, 1000)
944946
mock.ExpectQuery("SELECT `a`,.*limit 50000.*").WillReturnRows(sqlmock.NewRows([]string{"a", "b"}))
945947
_, err = NewLimitIterator(ctx, "", tableDiff, db)
946948
require.NoError(t, err)

0 commit comments

Comments
 (0)