11 changes: 10 additions & 1 deletion pkg/ttl/ttlworker/job_manager_test.go
@@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
)
@@ -673,10 +674,18 @@ func TestLocalJobs(t *testing.T) {
}

func TestSplitCnt(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

require.Equal(t, 64, getScanSplitCnt(nil))
require.Equal(t, 64, getScanSplitCnt(&mockKVStore{}))

s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)}
s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)}
for i := uint64(1); i <= 128; i++ {
s.GetRegionCache().SetRegionCacheStore(i, "", "", tikvrpc.TiKV, 1, nil)
if i <= 64 {
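The same pattern — backing the test RegionCache with a mock PD client instead of passing nil, and closing both clients afterwards — is repeated in task_manager_test.go below. Factored out, it would look roughly like the sketch that follows; the helper name and the use of t.Cleanup are illustrative, but the testutils.NewMockTiKV and tikv.NewRegionCache calls mirror the hunks in this PR (the snippet relies on the testing, require, testutils, and tikv imports these files already have):

// newMockRegionCache is a hypothetical test helper: it creates an in-memory
// mock TiKV/PD pair, returns a RegionCache backed by the mock PD client, and
// closes both clients when the test finishes.
func newMockRegionCache(t *testing.T) *tikv.RegionCache {
	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
	require.NoError(t, err)
	t.Cleanup(func() {
		pdClient.Close()
		require.NoError(t, mockClient.Close())
	})
	return tikv.NewRegionCache(pdClient)
}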
9 changes: 9 additions & 0 deletions pkg/ttl/ttlworker/task_manager.go
@@ -309,6 +309,15 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
return
}

if len(tasks) == 0 {
return
}

err = m.infoSchemaCache.Update(se)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update infoSchemaCache", zap.Error(err))
return
}
loop:
for _, t := range tasks {
logger := logutil.Logger(m.ctx).With(
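Taken on its own, the shape this hunk gives rescheduleTasks is: bail out early when the round has no tasks, refresh the info schema cache exactly once, and only then iterate over the tasks. A minimal, self-contained sketch of that ordering (hypothetical names and types, not TiDB code):

// scheduleRound illustrates the guard-then-refresh ordering added above: skip
// the cache refresh entirely when there is nothing to schedule, and never
// schedule against a cache that failed to refresh.
func scheduleRound[T any](tasks []T, refresh func() error, schedule func(T)) error {
	if len(tasks) == 0 {
		return nil // nothing to do this round
	}
	if err := refresh(); err != nil {
		return err // scheduling against a stale cache could target dropped tables
	}
	for _, t := range tasks {
		schedule(t)
	}
	return nil
}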
45 changes: 13 additions & 32 deletions pkg/ttl/ttlworker/task_manager_integration_test.go
@@ -130,8 +130,6 @@ func TestParallelSchedule(t *testing.T) {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
tk.MustExec(sql)
}
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(sessionFactory()))
scheduleWg := sync.WaitGroup{}
finishTasks := make([]func(), 0, 4)
for i := 0; i < 4; i++ {
@@ -143,7 +141,7 @@
}

managerID := fmt.Sprintf("task-manager-%d", i)
m := ttlworker.NewTaskManager(context.Background(), nil, isc, managerID, store)
m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), managerID, store)
m.SetScanWorkers4Test(workers)
scheduleWg.Add(1)
go func() {
@@ -187,14 +185,10 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1)
tk.MustExec(sql)

// update the infoschema cache
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(sessionFactory()))

// schedule in a task manager
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-1", store)
m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
se := sessionFactory()
now := se.Now()
@@ -204,7 +198,7 @@
// another task manager should fetch this task after heartbeat expire
scanWorker2 := ttlworker.NewMockScanWorker(t)
scanWorker2.Start()
m2 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-2", store)
m2 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-2", store)
m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2})
m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2"))
@@ -215,7 +209,7 @@
m2.CheckFinishedTask(sessionFactory(), now)
scanWorker3 := ttlworker.NewMockScanWorker(t)
scanWorker3.Start()
m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3", store)
m3 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-3", store)
m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3})
m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2"))
@@ -235,14 +229,10 @@ func TestTaskMetrics(t *testing.T) {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1)
tk.MustExec(sql)

// update the infoschema cache
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(sessionFactory()))

// schedule in a task manager
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
se := sessionFactory()
now := se.Now()
@@ -268,13 +258,11 @@ func TestRescheduleWithError(t *testing.T) {

se := sessionFactory()
now := se.Now()
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(se))

// schedule in a task manager
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
notify := make(chan struct{})
go func() {
@@ -307,8 +295,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
tk.MustExec(sql)
}
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(sessionFactory()))

scheduleWg := sync.WaitGroup{}
for i := 0; i < 16; i++ {
workers := []ttlworker.Worker{}
@@ -319,7 +306,7 @@
}

ctx := logutil.WithKeyValue(context.Background(), "ttl-worker-test", fmt.Sprintf("task-manager-%d", i))
m := ttlworker.NewTaskManager(ctx, nil, isc, fmt.Sprintf("task-manager-%d", i), store)
m := ttlworker.NewTaskManager(ctx, nil, cache.NewInfoSchemaCache(time.Minute), fmt.Sprintf("task-manager-%d", i), store)
m.SetScanWorkers4Test(workers)
scheduleWg.Add(1)
go func() {
@@ -384,9 +371,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
se := sessionFactory()
now := se.Now()

isc := cache.NewInfoSchemaCache(time.Minute)
require.NoError(t, isc.Update(se))
m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store)
m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-1", store)

startBlockNotifyCh := make(chan struct{})
blockCancelCh := make(chan struct{})
@@ -522,7 +507,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
))

// A resigned task can be obtained by other task managers
m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-2", store)
m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-2", store)
worker2 := ttlworker.NewMockScanWorker(t)
worker2.Start()
defer func() {
@@ -562,8 +547,6 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
tk.MustExec(sql)
}
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(se))

workers := []ttlworker.Worker{}
for j := 0; j < 8; j++ {
@@ -573,10 +556,10 @@
}

now := se.Now()
m1 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
m1 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
m1.SetScanWorkers4Test(workers[0:4])
m1.RescheduleTasks(se, now)
m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-2", store)
m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-2", store)
m2.SetScanWorkers4Test(workers[4:])

// All tasks should be scheduled to m1 and running
Expand Down Expand Up @@ -665,9 +648,7 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
se := sessionFactory()
now := se.Now()

isc := cache.NewInfoSchemaCache(time.Minute)
require.NoError(t, isc.Update(se))
m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
workers := []ttlworker.Worker{}
for j := 0; j < 4; j++ {
scanWorker := ttlworker.NewMockScanWorker(t)
11 changes: 10 additions & 1 deletion pkg/ttl/ttlworker/task_manager_test.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
)
@@ -283,6 +284,14 @@ func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache {
}

func TestGetMaxRunningTasksLimit(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

variable.TTLRunningTasks.Store(1)
require.Equal(t, 1, getMaxRunningTasksLimit(&mockTiKVStore{}))

@@ -294,7 +303,7 @@ func TestGetMaxRunningTasksLimit(t *testing.T) {
require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockKVStore{}))
require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockTiKVStore{}))

s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)}
s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)}
s.GetRegionCache().SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 1, nil)
s.GetRegionCache().SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 1, nil)
s.GetRegionCache().SetRegionCacheStore(3, "", "", tikvrpc.TiFlash, 1, nil)