diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 24d0f8374..12a37cafe 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -421,7 +421,7 @@ func (e *eventStore) RegisterDispatcher( // Track the smallest containing span that meets ts requirements // Note: this is still not bestMatch // for example, if we have a dispatcher with span [b, c), - // it is hard to determin whether [a, d) or [b, h) is beshMatch without some statistics. + // it is hard to determine whether [a, d) or [b, h) is bestMatch without some statistics. if bestMatch == nil || (bytes.Compare(subStat.tableSpan.StartKey, bestMatch.tableSpan.StartKey) >= 0 && bytes.Compare(subStat.tableSpan.EndKey, bestMatch.tableSpan.EndKey) <= 0) { diff --git a/logservice/logpuller/priority_queue.go b/logservice/logpuller/priority_queue.go new file mode 100644 index 000000000..b4c3c3e99 --- /dev/null +++ b/logservice/logpuller/priority_queue.go @@ -0,0 +1,127 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "context" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/utils/heap" +) + +// PriorityQueue is a thread-safe priority queue for region tasks +// It integrates a signal channel to support blocking operations +type PriorityQueue struct { + mu sync.Mutex + heap *heap.Heap[PriorityTask] + + // signal channel for blocking operations + signal chan struct{} +} + +// NewPriorityQueue creates a new priority queue +func NewPriorityQueue() *PriorityQueue { + return &PriorityQueue{ + heap: heap.NewHeap[PriorityTask](), + signal: make(chan struct{}, 1024), + } +} + +// Push adds a task to the priority queue and sends a signal +// This is a non-blocking operation +func (pq *PriorityQueue) Push(task PriorityTask) { + pq.mu.Lock() + pq.heap.AddOrUpdate(task) + pq.mu.Unlock() + + // Send signal to notify waiting consumers + select { + case pq.signal <- struct{}{}: + default: + // Signal channel is full, ignore + } +} + +// Pop removes and returns the highest priority task +// This is a blocking operation that waits for a signal +// Returns nil if the context is cancelled +func (pq *PriorityQueue) Pop(ctx context.Context) (PriorityTask, error) { + for { + // First try to pop without waiting + pq.mu.Lock() + task, ok := pq.heap.PopTop() + pq.mu.Unlock() + + if ok { + return task, nil + } + + // Queue is empty, wait for signal + select { + case <-ctx.Done(): + return nil, ctx.Err() + case _, ok := <-pq.signal: + if !ok { + // Signal channel is closed. 
+ return nil, errors.New("signal channel is closed") + } + // Got signal, try to pop again + continue + } + } +} + +// TryPop attempts to pop a task without blocking +// Returns nil if the queue is empty +func (pq *PriorityQueue) TryPop() PriorityTask { + pq.mu.Lock() + defer pq.mu.Unlock() + + task, ok := pq.heap.PopTop() + if !ok { + return nil + } + return task +} + +// Peek returns the highest priority task without removing it +// Returns nil if the queue is empty +func (pq *PriorityQueue) Peek() PriorityTask { + pq.mu.Lock() + defer pq.mu.Unlock() + + task, ok := pq.heap.PeekTop() + if !ok { + return nil + } + return task +} + +// Len returns the number of tasks in the queue +func (pq *PriorityQueue) Len() int { + pq.mu.Lock() + defer pq.mu.Unlock() + + return pq.heap.Len() +} + +// Close closes the signal channel +func (pq *PriorityQueue) Close() { + // pop all tasks + for pq.Len() > 0 { + pq.TryPop() + } + close(pq.signal) +} diff --git a/logservice/logpuller/priority_queue_test.go b/logservice/logpuller/priority_queue_test.go new file mode 100644 index 000000000..ea019a0c0 --- /dev/null +++ b/logservice/logpuller/priority_queue_test.go @@ -0,0 +1,458 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" +) + +// mockPriorityTask is a simple mock implementation of PriorityTask for testing +type mockPriorityTask struct { + priority int + heapIndex int + regionInfo regionInfo + description string +} + +func newMockPriorityTask(priority int, description string) *mockPriorityTask { + // Create a minimal regionInfo for testing + verID := tikv.NewRegionVerID(1, 1, 1) + span := heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("a"), EndKey: []byte("z")} + + // Create a subscribedSpan with atomic resolvedTs + subscribedSpan := &subscribedSpan{ + resolvedTs: atomic.Uint64{}, + } + subscribedSpan.resolvedTs.Store(oracle.GoTimeToTS(time.Now())) + + regionInfo := regionInfo{ + verID: verID, + span: span, + subscribedSpan: subscribedSpan, + } + + return &mockPriorityTask{ + priority: priority, + heapIndex: 0, + regionInfo: regionInfo, + description: description, + } +} + +func (m *mockPriorityTask) Priority() int { + return m.priority +} + +func (m *mockPriorityTask) GetRegionInfo() regionInfo { + return m.regionInfo +} + +func (m *mockPriorityTask) SetHeapIndex(index int) { + m.heapIndex = index +} + +func (m *mockPriorityTask) GetHeapIndex() int { + return m.heapIndex +} + +func (m *mockPriorityTask) LessThan(other PriorityTask) bool { + return m.Priority() < other.Priority() +} + +func TestNewPriorityQueue(t *testing.T) { + pq := NewPriorityQueue() + require.NotNil(t, pq) + require.NotNil(t, pq.heap) + require.NotNil(t, pq.signal) + require.Equal(t, 0, pq.Len()) +} + +func TestPriorityQueue_Push(t *testing.T) { + pq := NewPriorityQueue() + + task1 
:= newMockPriorityTask(10, "task1") + task2 := newMockPriorityTask(5, "task2") + + // Test pushing single task + pq.Push(task1) + require.Equal(t, 1, pq.Len()) + + // Test pushing multiple tasks + pq.Push(task2) + require.Equal(t, 2, pq.Len()) + + // Verify signal channel receives notifications + select { + case <-pq.signal: + // Expected - signal received + case <-time.After(time.Millisecond * 100): + t.Fatal("Expected signal but none received") + } +} + +func TestPriorityQueue_Peek(t *testing.T) { + pq := NewPriorityQueue() + + // Test peek on empty queue + task := pq.Peek() + require.Nil(t, task) + + // Add tasks with different priorities + task1 := newMockPriorityTask(10, "task1") + task2 := newMockPriorityTask(5, "task2") // Higher priority (lower value) + task3 := newMockPriorityTask(15, "task3") + + pq.Push(task1) + pq.Push(task2) + pq.Push(task3) + + // Peek should return highest priority task (lowest value) + topTask := pq.Peek() + require.NotNil(t, topTask) + require.Equal(t, 5, topTask.Priority()) + require.Equal(t, "task2", topTask.(*mockPriorityTask).description) + + // Verify peek doesn't remove the task + require.Equal(t, 3, pq.Len()) + + // Peek again should return the same task + topTaskAgain := pq.Peek() + require.Equal(t, topTask, topTaskAgain) +} + +func TestPriorityQueue_PopBlocking(t *testing.T) { + pq := NewPriorityQueue() + + // Test pop on empty queue with context cancellation + t.Run("PopWithCancellation", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50) + defer cancel() + + start := time.Now() + task, err := pq.Pop(ctx) + require.Error(t, err) + elapsed := time.Since(start) + + require.Nil(t, task) + require.True(t, elapsed >= time.Millisecond*50) + }) + + // Test pop with signal + t.Run("PopWithSignal", func(t *testing.T) { + ctx := context.Background() + + // Add a task in a goroutine after a short delay + go func() { + time.Sleep(time.Millisecond * 50) + task1 := newMockPriorityTask(10, "task1") + pq.Push(task1) + }() + + start := time.Now() + task, err := pq.Pop(ctx) + require.NoError(t, err) + elapsed := time.Since(start) + + require.NotNil(t, task) + require.Equal(t, 10, task.Priority()) + require.True(t, elapsed >= time.Millisecond*50) + require.True(t, elapsed < time.Millisecond*200) // Should not wait too long + }) +} + +func TestPriorityQueue_PopOrder(t *testing.T) { + pq := NewPriorityQueue() + ctx := context.Background() + + // Add tasks with different priorities + tasks := []*mockPriorityTask{ + newMockPriorityTask(10, "task1"), + newMockPriorityTask(5, "task2"), // Highest priority + newMockPriorityTask(15, "task3"), + newMockPriorityTask(7, "task4"), + newMockPriorityTask(12, "task5"), + } + + for _, task := range tasks { + pq.Push(task) + } + + // Pop tasks and verify they come out in priority order + expectedOrder := []string{"task2", "task4", "task1", "task5", "task3"} + expectedPriorities := []int{5, 7, 10, 12, 15} + + for i, expectedDesc := range expectedOrder { + task, err := pq.Pop(ctx) + require.NoError(t, err) + require.NotNil(t, task) + require.Equal(t, expectedPriorities[i], task.Priority()) + require.Equal(t, expectedDesc, task.(*mockPriorityTask).description) + } + + // Verify queue is empty + require.Equal(t, 0, pq.Len()) +} + +func TestPriorityQueue_Len(t *testing.T) { + pq := NewPriorityQueue() + + // Test empty queue + require.Equal(t, 0, pq.Len()) + + // Add tasks and verify length + for i := 0; i < 5; i++ { + task := newMockPriorityTask(i, "task") + pq.Push(task) + 
require.Equal(t, i+1, pq.Len()) + } + + // Remove tasks and verify length + ctx := context.Background() + for i := 4; i >= 0; i-- { + pq.Pop(ctx) + require.Equal(t, i, pq.Len()) + } +} + +func TestPriorityQueue_ConcurrentOperations(t *testing.T) { + pq := NewPriorityQueue() + + numProducers := 3 + numConsumers := 2 + tasksPerProducer := 10 + totalTasks := numProducers * tasksPerProducer + + var wg sync.WaitGroup + var consumedCount int64 + var mu sync.Mutex + consumedTasks := make([]PriorityTask, 0, totalTasks) + ctx, cancel := context.WithCancel(context.Background()) + + // Start consumers + for i := 0; i < numConsumers; i++ { + wg.Add(1) + go func(consumerID int) { + defer wg.Done() + for { + task, err := pq.Pop(ctx) + if err != nil { + return + } + + mu.Lock() + consumedTasks = append(consumedTasks, task) + count := atomic.AddInt64(&consumedCount, 1) + mu.Unlock() + + if count >= int64(totalTasks) { + cancel() // Signal other consumers to stop + return + } + } + }(i) + } + + // Start producers + for i := 0; i < numProducers; i++ { + wg.Add(1) + go func(producerID int) { + defer wg.Done() + for j := 0; j < tasksPerProducer; j++ { + priority := (producerID * tasksPerProducer) + j + task := newMockPriorityTask(priority, "concurrent_task") + pq.Push(task) + time.Sleep(time.Microsecond * 10) // Small delay to simulate real work + } + }(i) + } + + // Wait for all producers to finish + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + // Wait with timeout + select { + case <-done: + // Success + case <-time.After(time.Second * 5): + cancel() // Cancel to stop consumers + t.Fatal("Test timed out") + } + + // Verify all tasks were consumed + require.Equal(t, int64(totalTasks), atomic.LoadInt64(&consumedCount)) + require.Equal(t, totalTasks, len(consumedTasks)) + + // Verify all tasks were processed + for i := 0; i < len(consumedTasks); i++ { + require.NotNil(t, consumedTasks[i]) + } +} + +func TestPriorityQueue_SignalChannelFull(t *testing.T) { + pq := NewPriorityQueue() + + // Fill the signal channel to capacity + for i := 0; i < cap(pq.signal); i++ { + select { + case pq.signal <- struct{}{}: + default: + t.Fatalf("Failed to fill signal channel at iteration %d", i) + } + } + + // Push a task when signal channel is full - should not block + task := newMockPriorityTask(10, "task") + start := time.Now() + pq.Push(task) + elapsed := time.Since(start) + + // Should complete quickly even though signal channel is full + require.True(t, elapsed < time.Millisecond*100) + require.Equal(t, 1, pq.Len()) +} + +func TestPriorityQueue_UpdateExistingTask(t *testing.T) { + pq := NewPriorityQueue() + + // Create a task and add it to queue + task := newMockPriorityTask(10, "task") + pq.Push(task) + require.Equal(t, 1, pq.Len()) + + // Update the task's priority and push again + task.priority = 5 + pq.Push(task) + + // Length should still be 1 (task was updated, not added) + require.Equal(t, 1, pq.Len()) + + // Verify the task has the updated priority + ctx := context.Background() + poppedTask, err := pq.Pop(ctx) + require.NoError(t, err) + require.NotNil(t, poppedTask) + require.Equal(t, 5, poppedTask.Priority()) +} + +func TestPriorityQueue_Close(t *testing.T) { + pq := NewPriorityQueue() + + // Add 3 task before closing + task := newMockPriorityTask(10, "task") + pq.Push(task) + require.Equal(t, 1, pq.Len()) + task2 := newMockPriorityTask(5, "task2") + pq.Push(task2) + require.Equal(t, 2, pq.Len()) + task3 := newMockPriorityTask(15, "task3") + pq.Push(task3) + require.Equal(t, 3, 
pq.Len()) + + // Test that close doesn't panic + require.NotPanics(t, func() { + pq.Close() + }) + + // Test that the tasks are popped + require.Equal(t, 0, pq.Len()) +} + +func TestPriorityQueue_EmptyQueueOperations(t *testing.T) { + pq := NewPriorityQueue() + + // Test peek on empty queue + task := pq.Peek() + require.Nil(t, task) + + // Test len on empty queue + require.Equal(t, 0, pq.Len()) + + // Test pop on empty queue with immediate cancellation + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + task2, err := pq.Pop(ctx) + require.Nil(t, task2) + require.Error(t, err) +} + +func TestPriorityQueue_RealPriorityTaskIntegration(t *testing.T) { + pq := NewPriorityQueue() + ctx := context.Background() + currentTs := oracle.GoTimeToTS(time.Now()) + + // Create real priority tasks with different types + verID := tikv.NewRegionVerID(1, 1, 1) + span := heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("a"), EndKey: []byte("z")} + + subscribedSpan := &subscribedSpan{ + resolvedTs: atomic.Uint64{}, + } + subscribedSpan.resolvedTs.Store(oracle.GoTimeToTS(time.Now().Add(-time.Second))) + + regionInfo := regionInfo{ + verID: verID, + span: span, + subscribedSpan: subscribedSpan, + } + + // Create tasks with different priorities + errorTask := NewRegionPriorityTask(TaskHighPrior, regionInfo, currentTs+1) + highTask := NewRegionPriorityTask(TaskHighPrior, regionInfo, currentTs) + lowTask := NewRegionPriorityTask(TaskLowPrior, regionInfo, currentTs) + + // Add tasks in non-priority order + pq.Push(lowTask) + pq.Push(errorTask) + pq.Push(highTask) + + require.Equal(t, 3, pq.Len()) + + // Pop tasks and verify they come out in priority order + // TaskRegionError should have highest priority (lowest value) + first, err := pq.Pop(ctx) + require.NoError(t, err) + require.NotNil(t, first) + require.Equal(t, TaskHighPrior, first.(*regionPriorityTask).taskType) + + second, err := pq.Pop(ctx) + require.NoError(t, err) + require.NotNil(t, second) + require.Equal(t, TaskHighPrior, second.(*regionPriorityTask).taskType) + + third, err := pq.Pop(ctx) + require.NoError(t, err) + require.NotNil(t, third) + require.Equal(t, TaskLowPrior, third.(*regionPriorityTask).taskType) + + require.Equal(t, 0, pq.Len()) + + pq.Close() + task, err := pq.Pop(ctx) + require.Nil(t, task) + require.Error(t, err) +} diff --git a/logservice/logpuller/priority_task.go b/logservice/logpuller/priority_task.go new file mode 100644 index 000000000..68ef9e885 --- /dev/null +++ b/logservice/logpuller/priority_task.go @@ -0,0 +1,127 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logpuller + +import ( + "fmt" + "time" + + "github.com/tikv/client-go/v2/oracle" +) + +// TaskType represents the type of region task +type TaskType int + +const ( + // TaskHighPrior represents region error or region change + // This type has the highest priority + TaskHighPrior TaskType = iota + // TaskLowPrior represents new subscription + // This type has the lowest priority + TaskLowPrior +) + +const ( + highPriorityBase = 0 + lowPriorityBase = 60 * 60 * 24 // 1 day + forcedPriorityBase = 60 * 60 // 60 minutes +) + +func (t TaskType) String() string { + return fmt.Sprintf("%d", t) +} + +// PriorityTask is the interface for priority-based tasks +// It implements heap.Item interface +type PriorityTask interface { + // Priority returns the priority value, lower value means higher priority + Priority() int + + // GetRegionInfo returns the underlying regionInfo + GetRegionInfo() regionInfo + + // heap.Item interface methods + SetHeapIndex(int) + GetHeapIndex() int + LessThan(PriorityTask) bool +} + +// regionPriorityTask implements PriorityTask interface +type regionPriorityTask struct { + taskType TaskType + createTime time.Time + regionInfo regionInfo + heapIndex int // for heap.Item interface + currentTs uint64 +} + +// NewRegionPriorityTask creates a new priority task for region +func NewRegionPriorityTask(taskType TaskType, regionInfo regionInfo, currentTs uint64) PriorityTask { + return ®ionPriorityTask{ + taskType: taskType, + createTime: time.Now(), + regionInfo: regionInfo, + heapIndex: 0, // 0 means not in heap + currentTs: currentTs, + } +} + +// Priority calculates the priority based on task type and wait time +// Lower value means higher priority +func (pt *regionPriorityTask) Priority() int { + // Base priority based on task type + basePriority := 0 + switch pt.taskType { + case TaskHighPrior: + basePriority = highPriorityBase // Highest priority + case TaskLowPrior: + basePriority = lowPriorityBase // Lowest priority + } + + // Add time-based priority bonus + // Wait time in seconds, longer wait time means higher priority (lower value) + waitTime := time.Since(pt.createTime) + timeBonus := int(waitTime.Seconds()) + + // ResolvedTsLag in seconds, longer lag means lower priority (higher value) + resolvedTsLag := oracle.GetTimeFromTS(pt.currentTs).Sub(oracle.GetTimeFromTS(pt.regionInfo.subscribedSpan.resolvedTs.Load())) + resolvedTsLagPenalty := int(resolvedTsLag.Seconds()) + + priority := basePriority - timeBonus + resolvedTsLagPenalty + if priority < 0 { + priority = 0 + } + return priority +} + +// GetRegionInfo returns the underlying regionInfo +func (pt *regionPriorityTask) GetRegionInfo() regionInfo { + return pt.regionInfo +} + +// SetHeapIndex sets the heap index for heap.Item interface +func (pt *regionPriorityTask) SetHeapIndex(index int) { + pt.heapIndex = index +} + +// GetHeapIndex gets the heap index for heap.Item interface +func (pt *regionPriorityTask) GetHeapIndex() int { + return pt.heapIndex +} + +// LessThan implements heap.Item interface +// Returns true if this task has higher priority (lower priority value) than the other task +func (pt *regionPriorityTask) LessThan(other PriorityTask) bool { + return pt.Priority() < other.Priority() +} diff --git a/logservice/logpuller/priority_task_test.go b/logservice/logpuller/priority_task_test.go new file mode 100644 index 000000000..5b1b373af --- /dev/null +++ b/logservice/logpuller/priority_task_test.go @@ -0,0 +1,200 @@ +// Copyright 2025 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" +) + +// TestPriorityCalculationLogic tests the priority calculation logic in isolation +func TestPriorityCalculationLogic(t *testing.T) { + currentTime := time.Now() + currentTs := oracle.GoTimeToTS(currentTime) + + // Test cases for priority calculation + tests := []struct { + name string + taskType TaskType + resolvedTsOffsetSeconds int64 // Offset relative to currentTs (negative means resolvedTs is older) + waitTimeSeconds int // Task wait time + description string + }{ + { + name: "high_priority_new_resolvedTs", + taskType: TaskHighPrior, + resolvedTsOffsetSeconds: -5, // resolvedTs is 5 seconds earlier than currentTs + waitTimeSeconds: 10, // Waited for 10 seconds + description: "High priority task with newer resolvedTs", + }, + { + name: "high_priority_old_resolvedTs", + taskType: TaskHighPrior, + resolvedTsOffsetSeconds: -30, // resolvedTs is 30 seconds earlier than currentTs + waitTimeSeconds: 10, // Waited for 10 seconds + description: "High priority task with older resolvedTs", + }, + { + name: "low_priority_new_resolvedTs", + taskType: TaskLowPrior, + resolvedTsOffsetSeconds: -5, // resolvedTs is 5 seconds earlier than currentTs + waitTimeSeconds: 10, // Waited for 10 seconds + description: "Low priority task with newer resolvedTs", + }, + { + name: "low_priority_old_resolvedTs", + taskType: TaskLowPrior, + resolvedTsOffsetSeconds: -30, // resolvedTs is 30 seconds earlier than currentTs + waitTimeSeconds: 10, // Waited for 10 seconds + description: "Low priority task with older resolvedTs", + }, + } + + var priorities []int + var taskDescriptions []string + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Calculate resolvedTs: currentTs + offset + resolvedTime := oracle.GetTimeFromTS(currentTs).Add(time.Duration(tt.resolvedTsOffsetSeconds) * time.Second) + resolvedTs := oracle.GoTimeToTS(resolvedTime) + + // Simulate priority calculation logic + priority := calculatePriorityDirectly(tt.taskType, currentTs, resolvedTs, tt.waitTimeSeconds) + + t.Logf("%s: Priority = %d", tt.description, priority) + priorities = append(priorities, priority) + taskDescriptions = append(taskDescriptions, tt.description) + }) + } + + // Verify priority order + t.Run("verify_priority_order", func(t *testing.T) { + require.Equal(t, 4, len(priorities)) + + highPriorNewResolvedTs := priorities[0] // High priority with new resolvedTs + highPriorOldResolvedTs := priorities[1] // High priority with old resolvedTs + lowPriorNewResolvedTs := priorities[2] // Low priority with new resolvedTs + lowPriorOldResolvedTs := priorities[3] // Low priority with old resolvedTs + + t.Logf("Priority comparison:") + for i, desc := range taskDescriptions { + t.Logf(" %s: %d", desc, priorities[i]) + } + + // Core verification: For the same task type, newer resolvedTs (smaller lag) should have higher priority (smaller value) + require.Less(t, highPriorNewResolvedTs, 
highPriorOldResolvedTs, + "For the same task type, tasks with newer resolvedTs should have higher priority") + require.Less(t, lowPriorNewResolvedTs, lowPriorOldResolvedTs, + "For the same task type, tasks with newer resolvedTs should have higher priority") + + // Verify: High priority tasks always have higher priority than low priority tasks + require.Less(t, highPriorNewResolvedTs, lowPriorNewResolvedTs, + "High priority tasks should have higher priority than low priority tasks") + require.Less(t, highPriorOldResolvedTs, lowPriorOldResolvedTs, + "Even with older resolvedTs, high priority tasks should still have higher priority than low priority tasks") + }) +} + +// calculatePriorityDirectly directly calculates priority for testing +// Copies the logic from regionPriorityTask.Priority() +func calculatePriorityDirectly(taskType TaskType, currentTs, resolvedTs uint64, waitTimeSeconds int) int { + // Base priority based on task type + basePriority := 0 + switch taskType { + case TaskHighPrior: + basePriority = highPriorityBase // 1200 + case TaskLowPrior: + basePriority = lowPriorityBase // 3600 + } + + // Add time-based priority bonus + // Wait time in seconds, longer wait time means higher priority (lower value) + timeBonus := waitTimeSeconds + + // Calculate resolvedTs lag + resolvedTsLag := oracle.GetTimeFromTS(currentTs).Sub(oracle.GetTimeFromTS(resolvedTs)) + resolvedTsLagBonus := int(resolvedTsLag.Seconds()) + + priority := basePriority - timeBonus + resolvedTsLagBonus + + if priority < 0 { + priority = 0 + } + return priority +} + +func TestResolvedTsLagLogic(t *testing.T) { + currentTime := time.Now() + currentTs := oracle.GoTimeToTS(currentTime) + + t.Run("test_resolvedTs_lag_calculation_logic", func(t *testing.T) { + // Scenario 1: resolvedTs is 10 seconds earlier than currentTs (resolvedTs is older) + resolvedTs1 := oracle.GoTimeToTS(currentTime.Add(-10 * time.Second)) + lag1 := oracle.GetTimeFromTS(currentTs).Sub(oracle.GetTimeFromTS(resolvedTs1)) + t.Logf("resolvedTs is 10 seconds earlier, lag = %v (%.0f seconds)", lag1, lag1.Seconds()) + + // Scenario 2: resolvedTs is 1 second earlier than currentTs (resolvedTs is newer) + resolvedTs2 := oracle.GoTimeToTS(currentTime.Add(-1 * time.Second)) + lag2 := oracle.GetTimeFromTS(currentTs).Sub(oracle.GetTimeFromTS(resolvedTs2)) + t.Logf("resolvedTs is 1 second earlier, lag = %v (%.0f seconds)", lag2, lag2.Seconds()) + + // Verify: newer resolvedTs should have smaller lag + require.Less(t, lag2, lag1, "newer resolvedTs should have smaller lag") + + // Calculate the impact on priority + priority1 := calculatePriorityDirectly(TaskHighPrior, currentTs, resolvedTs1, 5) + priority2 := calculatePriorityDirectly(TaskHighPrior, currentTs, resolvedTs2, 5) + + t.Logf("Priority with resolvedTs 10 seconds old: %d", priority1) + t.Logf("Priority with resolvedTs 1 second old: %d", priority2) + + // Verify: newer resolvedTs should have higher priority (smaller value) + require.Less(t, priority2, priority1, + "tasks with newer resolvedTs should have higher priority") + }) +} + +func TestEdgeCases(t *testing.T) { + currentTime := time.Now() + currentTs := oracle.GoTimeToTS(currentTime) + + t.Run("resolvedTs in the future", func(t *testing.T) { + resolvedTs := oracle.GoTimeToTS(currentTime.Add(5 * time.Second)) + + lag := oracle.GetTimeFromTS(currentTs).Sub(oracle.GetTimeFromTS(resolvedTs)) + t.Logf("resolvedTs in the future 5 seconds, lag = %v (%.0f seconds)", lag, lag.Seconds()) + + priority := calculatePriorityDirectly(TaskHighPrior, currentTs, 
resolvedTs, 5) + t.Logf("resolvedTs in the future priority: %d", priority) + + require.GreaterOrEqual(t, priority, 0, "priority should not be less than 0") + }) + + t.Run("different wait time impact", func(t *testing.T) { + resolvedTs := oracle.GoTimeToTS(currentTime.Add(-10 * time.Second)) + + priority1 := calculatePriorityDirectly(TaskHighPrior, currentTs, resolvedTs, 2) + priority2 := calculatePriorityDirectly(TaskHighPrior, currentTs, resolvedTs, 10) + + t.Logf("wait 2 seconds priority: %d", priority1) + t.Logf("wait 10 seconds priority: %d", priority2) + + // wait time longer task priority should be higher + require.Less(t, priority2, priority1, "wait time longer task priority should be higher") + }) +} diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index e565c0eff..4ae620802 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -225,7 +225,6 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.Stringer("span", &state.region.span)) - for _, cachedEvent := range state.matcher.matchCachedRow(true) { span.kvEventsCache = append(span.kvEventsCache, assembleRowEvent(regionID, cachedEvent)) } diff --git a/logservice/logpuller/region_event_handler_test.go b/logservice/logpuller/region_event_handler_test.go index 3bc581484..d4099d879 100644 --- a/logservice/logpuller/region_event_handler_test.go +++ b/logservice/logpuller/region_event_handler_test.go @@ -75,6 +75,9 @@ func TestHandleEventEntryEventOutOfOrder(t *testing.T) { } ds.AddPath(subID, subSpan, dynstream.AreaSettings{}) + worker := ®ionRequestWorker{ + requestCache: &requestCache{}, + } region := newRegionInfo( tikv.RegionVerID{}, span, @@ -83,7 +86,7 @@ func TestHandleEventEntryEventOutOfOrder(t *testing.T) { false, ) region.lockedRangeState = ®ionlock.LockedRangeState{} - state := newRegionFeedState(region, 1) + state := newRegionFeedState(region, 1, worker) state.start() // Receive prewrite2 with empty value. @@ -211,8 +214,10 @@ func TestHandleResolvedTs(t *testing.T) { } subID1 := SubscriptionID(1) - - state1 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(1, 1, 1)}, uint64(subID1)) + worker := ®ionRequestWorker{ + requestCache: &requestCache{}, + } + state1 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(1, 1, 1)}, uint64(subID1), worker) state1.start() { span := heartbeatpb.TableSpan{ @@ -236,7 +241,7 @@ func TestHandleResolvedTs(t *testing.T) { } subID2 := SubscriptionID(2) - state2 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(2, 2, 2)}, uint64(subID2)) + state2 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(2, 2, 2)}, uint64(subID2), worker) state2.start() { span := heartbeatpb.TableSpan{ @@ -260,7 +265,7 @@ func TestHandleResolvedTs(t *testing.T) { } subID3 := SubscriptionID(3) - state3 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(3, 3, 3)}, uint64(subID3)) + state3 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(3, 3, 3)}, uint64(subID3), worker) state3.start() { span := heartbeatpb.TableSpan{ diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go new file mode 100644 index 000000000..ef995c223 --- /dev/null +++ b/logservice/logpuller/region_req_cache.go @@ -0,0 +1,301 @@ +// Copyright 2025 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/metrics" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +const ( + checkStaleRequestInterval = time.Second * 5 + requestGCLifeTime = time.Minute * 10 + addReqRetryInterval = time.Millisecond * 1 + addReqRetryLimit = 3 +) + +// regionReq represents a wrapped region request with state +type regionReq struct { + regionInfo regionInfo + createTime time.Time +} + +func newRegionReq(region regionInfo) regionReq { + return regionReq{ + regionInfo: region, + createTime: time.Now(), + } +} + +func (r *regionReq) isStale() bool { + return time.Since(r.createTime) > requestGCLifeTime +} + +// requestCache manages region requests with flow control +type requestCache struct { + // pending requests waiting to be sent + pendingQueue chan regionReq + + // sent requests waiting for initialization (subscriptionID -> regions -> regionReq) + sentRequests struct { + sync.RWMutex + regionReqs map[SubscriptionID]map[uint64]regionReq + } + + // counter for sent but not initialized requests + pendingCount atomic.Int64 + // maximum number of pending requests allowed + maxPendingCount int64 + + // channel to signal when space becomes available + spaceAvailable chan struct{} + + lastCheckStaleRequestTime time.Time +} + +func newRequestCache(maxPendingCount int) *requestCache { + return &requestCache{ + pendingQueue: make(chan regionReq, maxPendingCount), // Large buffer to reduce blocking + sentRequests: struct { + sync.RWMutex + regionReqs map[SubscriptionID]map[uint64]regionReq + }{regionReqs: make(map[SubscriptionID]map[uint64]regionReq)}, + pendingCount: atomic.Int64{}, + maxPendingCount: int64(maxPendingCount), + spaceAvailable: make(chan struct{}, 16), // Buffered to avoid blocking + lastCheckStaleRequestTime: time.Now(), + } +} + +// add adds a new region request to the cache +// It blocks if pendingCount >= maxPendingCount until there's space or ctx is cancelled +func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (bool, error) { + start := time.Now() + ticker := time.NewTicker(addReqRetryInterval) + defer ticker.Stop() + addReqRetryLimit := addReqRetryLimit + c.clearStaleRequest() + + for { + current := c.pendingCount.Load() + if current < c.maxPendingCount || force { + // Try to add the request + req := newRegionReq(region) + select { + case <-ctx.Done(): + return false, ctx.Err() + case c.pendingQueue <- req: + c.incPendingCount() + cost := time.Since(start) + metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) + return true, nil + case <-c.spaceAvailable: + continue + case <-ticker.C: + addReqRetryLimit-- + if addReqRetryLimit <= 0 { + return false, nil + } + continue + } + } + + // Wait for space to become available + select { + case <-ticker.C: + addReqRetryLimit-- + if addReqRetryLimit <= 0 { + return false, nil + } + continue + case <-c.spaceAvailable: + continue + case <-ctx.Done(): + return false, ctx.Err() + } + } +} + +// pop 
gets the next pending request, returns nil if queue is empty +func (c *requestCache) pop(ctx context.Context) (regionReq, error) { + select { + case req := <-c.pendingQueue: + return req, nil + case <-ctx.Done(): + return regionReq{}, ctx.Err() + } +} + +// markSent marks a request as sent and adds it to sent requests +func (c *requestCache) markSent(req regionReq) { + c.sentRequests.Lock() + defer c.sentRequests.Unlock() + + m, ok := c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] + + if !ok { + m = make(map[uint64]regionReq) + c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] = m + } + + m[req.regionInfo.verID.GetID()] = req +} + +// markStopped removes a sent request without changing pending count (for stopped regions) +func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) { + c.sentRequests.Lock() + defer c.sentRequests.Unlock() + + regionReqs, ok := c.sentRequests.regionReqs[subID] + if !ok { + return + } + + _, exists := regionReqs[regionID] + if !exists { + return + } + + delete(regionReqs, regionID) + c.decPendingCount() + // Notify waiting add operations that there's space available + select { + case c.spaceAvailable <- struct{}{}: + default: // If channel is full, skip notification + } +} + +// resolve marks a region as initialized and removes it from sent requests +func (c *requestCache) resolve(subscriptionID SubscriptionID, regionID uint64) bool { + defer c.clearStaleRequest() + + c.sentRequests.Lock() + defer c.sentRequests.Unlock() + regionReqs, ok := c.sentRequests.regionReqs[subscriptionID] + if !ok { + return false + } + + req, exists := regionReqs[regionID] + if !exists { + return false + } + + // Check if the subscription ID matches + if req.regionInfo.subscribedSpan.subID == subscriptionID { + delete(regionReqs, regionID) + c.decPendingCount() + cost := time.Since(req.createTime).Seconds() + if cost > 0 { + log.Debug("cdc resolve region request", zap.Uint64("subID", uint64(subscriptionID)), zap.Uint64("regionID", regionID), zap.Float64("cost", cost), zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("pendingQueueLen", len(c.pendingQueue))) + metrics.RegionRequestFinishScanDuration.Observe(cost) + } + // Notify waiting add operations that there's space available + select { + case c.spaceAvailable <- struct{}{}: + default: // If channel is full, skip notification + } + return true + } + + return false +} + +// clearStaleRequest clears stale requests from the cache +// Note: Sometimes, the CDC sends the same region request to TiKV multiple times. In such cases, this method is needed to reduce the pendingSize. 
+func (c *requestCache) clearStaleRequest() { + if time.Since(c.lastCheckStaleRequestTime) < checkStaleRequestInterval { + return + } + c.sentRequests.Lock() + defer c.sentRequests.Unlock() + reqCount := 0 + for subID, regionReqs := range c.sentRequests.regionReqs { + for regionID, regionReq := range regionReqs { + if regionReq.regionInfo.isStopped() || regionReq.isStale() { + c.decPendingCount() + log.Info("region worker delete stale region request", zap.Uint64("subID", uint64(subID)), zap.Uint64("regionID", regionID), zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("pendingQueueLen", len(c.pendingQueue)), zap.Bool("isStopped", regionReq.regionInfo.isStopped()), zap.Bool("isStale", regionReq.isStale()), zap.Time("createTime", regionReq.createTime)) + delete(regionReqs, regionID) + } else { + reqCount += 1 + } + } + if len(regionReqs) == 0 { + delete(c.sentRequests.regionReqs, subID) + } + } + + if reqCount == 0 && c.pendingCount.Load() != 0 { + log.Info("region worker pending request count is not equal to actual region request count, correct it", zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("actualReqCount", reqCount)) + c.pendingCount.Store(0) + } + + c.lastCheckStaleRequestTime = time.Now() +} + +// clear removes all requests and returns them +func (c *requestCache) clear() []regionInfo { + var regions []regionInfo + + // Drain pending requests from channel +LOOP: + for { + select { + case req := <-c.pendingQueue: + regions = append(regions, req.regionInfo) + c.decPendingCount() + default: + break LOOP + } + } + + c.sentRequests.Lock() + defer c.sentRequests.Unlock() + + for subID, regionReqs := range c.sentRequests.regionReqs { + for regionID := range regionReqs { + regions = append(regions, regionReqs[regionID].regionInfo) + delete(regionReqs, regionID) + c.decPendingCount() + } + delete(c.sentRequests.regionReqs, subID) + } + return regions +} + +// getPendingCount returns the current pending count +func (c *requestCache) getPendingCount() int { + return int(c.pendingCount.Load()) +} + +func (c *requestCache) incPendingCount() { + c.pendingCount.Inc() +} + +func (c *requestCache) decPendingCount() { + // Ensure pendingCount doesn't go below 0 + current := c.pendingCount.Load() + newCount := current - int64(1) + if newCount < 0 { + c.pendingCount.Store(0) + return + } + c.pendingCount.Dec() +} diff --git a/logservice/logpuller/region_req_cache_test.go b/logservice/logpuller/region_req_cache_test.go new file mode 100644 index 000000000..0bcb9d783 --- /dev/null +++ b/logservice/logpuller/region_req_cache_test.go @@ -0,0 +1,281 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logpuller + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" +) + +func createTestRegionInfo(subID SubscriptionID, regionID uint64) regionInfo { + verID := tikv.NewRegionVerID(regionID, 1, 1) + + span := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte("start"), + EndKey: []byte("end"), + } + + subscribedSpan := &subscribedSpan{ + subID: subID, + startTs: 100, + span: span, + } + + return newRegionInfo(verID, span, nil, subscribedSpan, false) +} + +func TestRequestCacheAdd_NormalCase(t *testing.T) { + cache := newRequestCache(10) + ctx := context.Background() + + region := createTestRegionInfo(1, 1) + + ok, err := cache.add(ctx, region, false) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, 1, cache.getPendingCount()) + + // Verify the request was added to the queue + req, err := cache.pop(ctx) + require.NoError(t, err) + require.NotNil(t, req) + require.Equal(t, region.verID.GetID(), req.regionInfo.verID.GetID()) + require.Equal(t, region.subscribedSpan.subID, req.regionInfo.subscribedSpan.subID) +} + +func TestRequestCacheAdd_ForceFlag(t *testing.T) { + cache := newRequestCache(1) + ctx := context.Background() + + // Fill up the cache + region1 := createTestRegionInfo(1, 1) + ok, err := cache.add(ctx, region1, false) + require.True(t, ok) + require.NoError(t, err) + require.Equal(t, 1, cache.getPendingCount()) + + // Try to add another request without force - should fail due to retry limit + region2 := createTestRegionInfo(1, 2) + ok, err = cache.add(ctx, region2, false) + require.False(t, ok) + require.NoError(t, err) + + // With force=true, it should still fail because the channel is full + // The force flag only bypasses the pendingCount check, not the channel capacity + region3 := createTestRegionInfo(1, 3) + ok, err = cache.add(ctx, region3, true) + require.False(t, ok) + require.NoError(t, err) + + // consume the pending queue ann add with force + req, err := cache.pop(ctx) + require.NoError(t, err) + require.NotNil(t, req) + require.Equal(t, region1.verID.GetID(), req.regionInfo.verID.GetID()) + require.Equal(t, region1.subscribedSpan.subID, req.regionInfo.subscribedSpan.subID) + cache.markSent(req) + require.Equal(t, 1, cache.getPendingCount()) + + ok, err = cache.add(ctx, region3, true) + require.True(t, ok) + require.NoError(t, err) + // It is 2 since region1 is unresolved + require.Equal(t, 2, cache.getPendingCount()) + + // resolve region1 + cache.resolve(region1.subscribedSpan.subID, region1.verID.GetID()) + require.Equal(t, 1, cache.getPendingCount()) +} + +func TestRequestCacheAdd_ContextCancellation(t *testing.T) { + cache := newRequestCache(1) + + // Fill up the cache + region1 := createTestRegionInfo(1, 1) + ctx1 := context.Background() + ok, err := cache.add(ctx1, region1, false) + require.True(t, ok) + require.NoError(t, err) + + // Try to add another request with a cancelled context + ctx2, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + region2 := createTestRegionInfo(1, 2) + ok, err = cache.add(ctx2, region2, false) + require.False(t, ok) + require.Error(t, err) + require.Equal(t, context.Canceled, err) +} + +func TestRequestCacheAdd_RetryLimitExceeded(t *testing.T) { + cache := newRequestCache(1) + ctx := context.Background() + + // Fill up the cache + region1 := createTestRegionInfo(1, 1) + ok, err := cache.add(ctx, region1, false) + 
require.True(t, ok) + require.NoError(t, err) + + // Try to add another request - should eventually hit retry limit + region2 := createTestRegionInfo(1, 2) + ok, err = cache.add(ctx, region2, false) + require.False(t, ok) + require.NoError(t, err) +} + +func TestRequestCacheAdd_SpaceAvailableNotification(t *testing.T) { + cache := newRequestCache(2) + ctx := context.Background() + + // Fill up the cache + region1 := createTestRegionInfo(1, 1) + ok, err := cache.add(ctx, region1, false) + require.True(t, ok) + require.NoError(t, err) + require.Equal(t, 1, cache.getPendingCount()) + + region2 := createTestRegionInfo(1, 2) + ok, err = cache.add(ctx, region2, false) + require.True(t, ok) + require.NoError(t, err) + require.Equal(t, 2, cache.getPendingCount()) + + // Pop a request and mark it as sent, then resolve it to free up space + req, err := cache.pop(ctx) + require.NoError(t, err) + require.NotNil(t, req) + require.Equal(t, 2, cache.getPendingCount()) // pop doesn't change pendingCount + // Mark as sent + cache.markSent(req) + require.Equal(t, 2, cache.getPendingCount()) + + // Resolve the request to free up space + success := cache.resolve(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) + require.True(t, success) + require.Equal(t, 1, cache.getPendingCount()) + + // Now we should be able to add another request + region3 := createTestRegionInfo(1, 3) + ok, err = cache.add(ctx, region3, false) + require.True(t, ok) + require.NoError(t, err) + require.Equal(t, 2, cache.getPendingCount()) +} + +func TestRequestCacheAdd_ConcurrentAdds(t *testing.T) { + cache := newRequestCache(10) + ctx := context.Background() + + const numGoroutines = 5 + done := make(chan error, numGoroutines) + + // Start multiple goroutines adding requests concurrently + for i := 0; i < numGoroutines; i++ { + go func(id int) { + region := createTestRegionInfo(SubscriptionID(id%3), uint64(id)) + ok, err := cache.add(ctx, region, false) + require.True(t, ok) + require.NoError(t, err) + done <- err + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < numGoroutines; i++ { + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for concurrent adds to complete") + } + } + + require.Equal(t, numGoroutines, cache.getPendingCount()) +} + +func TestRequestCacheAdd_StaleRequestCleanup(t *testing.T) { + cache := newRequestCache(10) + ctx := context.Background() + + // Add a request and mark it as sent + region := createTestRegionInfo(1, 1) + ok, err := cache.add(ctx, region, false) + require.True(t, ok) + require.NoError(t, err) + + req, err := cache.pop(ctx) + require.NoError(t, err) + require.NotNil(t, req) + + // Mark as sent + cache.markSent(req) + require.Equal(t, 1, cache.getPendingCount()) + + // Manually set the request as stale by modifying createTime + cache.sentRequests.Lock() + regionReqs := cache.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] + regionReqs[req.regionInfo.verID.GetID()] = regionReq{ + regionInfo: req.regionInfo, + createTime: time.Now().Add(-requestGCLifeTime - time.Second), // Make it stale + } + cache.sentRequests.Unlock() + + // Manually set lastCheckStaleRequestTime to bypass the time interval check + cache.lastCheckStaleRequestTime = time.Now().Add(-checkStaleRequestInterval - time.Second) + + // Manually trigger stale cleanup by calling clearStaleRequest + cache.clearStaleRequest() + + // The stale request should be cleaned up + require.Equal(t, 0, cache.getPendingCount()) +} + 
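The tests around this point exercise the intended requestCache lifecycle: add enqueues a request (blocking with bounded retries once pendingCount reaches maxPendingCount, unless force is set), pop hands it to the sender without changing pendingCount, markSent parks it in sentRequests, and resolve or markStopped later frees the slot and signals spaceAvailable. A minimal in-package sketch of that lifecycle, condensing the producer and consumer sides into one hypothetical helper (sendOneRegion is illustrative and not part of this patch):

func sendOneRegion(ctx context.Context, cache *requestCache, region regionInfo) error {
	// add blocks (retrying up to addReqRetryLimit) while the cache is full,
	// unless force is true.
	ok, err := cache.add(ctx, region, false)
	if err != nil || !ok {
		return err // not admitted; the caller may re-queue the task
	}
	// pop dequeues the request for sending; pendingCount stays unchanged here.
	req, err := cache.pop(ctx)
	if err != nil {
		return err
	}
	// markSent keeps the request in sentRequests until the region is initialized.
	cache.markSent(req)
	// Later, setInitialized triggers resolve, which releases the slot and
	// signals spaceAvailable; markStopped does the same for stopped regions.
	cache.resolve(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID())
	return nil
}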
+func TestRequestCacheAdd_WithStoppedRegion(t *testing.T) { + cache := newRequestCache(10) + ctx := context.Background() + + // Create a region info with stopped state (lockedRangeState = nil) + region := createTestRegionInfo(1, 1) + region.lockedRangeState = nil // This makes it stopped + + ok, err := cache.add(ctx, region, false) + require.True(t, ok) + require.NoError(t, err) + require.Equal(t, 1, cache.getPendingCount()) + + req, err := cache.pop(ctx) + require.NoError(t, err) + require.NotNil(t, req) + + // Mark as sent + cache.markSent(req) + require.Equal(t, 1, cache.getPendingCount()) + + // Manually set lastCheckStaleRequestTime to bypass the time interval check + cache.lastCheckStaleRequestTime = time.Now().Add(-checkStaleRequestInterval - time.Second) + + // Manually trigger cleanup of stopped region + cache.clearStaleRequest() + + // The stopped region should be cleaned up + require.Equal(t, 0, cache.getPendingCount()) +} diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 72e68cbf4..20a3c44ce 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" @@ -50,8 +51,8 @@ type regionRequestWorker struct { // only in this way we can avoid to try to connect to an offline store infinitely. preFetchForConnecting *regionInfo - // used to receive region requests from outside. - requestsCh chan regionInfo + // request cache with flow control + requestCache *requestCache // all regions maintained by this worker. requestedRegions struct { @@ -68,11 +69,14 @@ func newRegionRequestWorker( g *errgroup.Group, store *requestedStore, ) *regionRequestWorker { + config := config.GetGlobalServerConfig() + maxPendingCount := config.Debug.Puller.PendingRegionRequestQueueSize + worker := ®ionRequestWorker{ - workerID: workerIDGen.Add(1), - client: client, - store: store, - requestsCh: make(chan regionInfo, 256), // 256 is an arbitrary number. 
+ workerID: workerIDGen.Add(1), + client: client, + store: store, + requestCache: newRequestCache(maxPendingCount), } worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) @@ -83,15 +87,16 @@ func newRegionRequestWorker( zap.String("addr", store.storeAddr)) } for { - select { - case <-ctx.Done(): - return ctx.Err() - case region := <-worker.requestsCh: - if !region.isStopped() { - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = region - return nil - } + region, err := worker.requestCache.pop(ctx) + if err != nil { + return err + } + if !region.regionInfo.isStopped() { + worker.preFetchForConnecting = new(regionInfo) + *worker.preFetchForConnecting = region.regionInfo + return nil + } else { + continue } } } @@ -331,22 +336,24 @@ func (s *regionRequestWorker) processRegionSendTask( return nil } - fetchMoreReq := func() (regionInfo, error) { + fetchMoreReq := func() (regionReq, error) { for { - var region regionInfo - select { - case <-ctx.Done(): - return region, ctx.Err() - case region = <-s.requestsCh: - return region, nil + // Try to get from cache + if req, err := s.requestCache.pop(ctx); err != nil { + return regionReq{}, err + } else { + return req, nil } } } + // Handle pre-fetched region first region := *s.preFetchForConnecting s.preFetchForConnecting = nil + regionReq := newRegionReq(region) + var err error for { - // TODO: can region be nil? + region := regionReq.regionInfo subID := region.subscribedSpan.subID log.Debug("region request worker gets a singleRegionInfo", zap.Uint64("workerID", s.workerID), @@ -376,23 +383,23 @@ func (s *regionRequestWorker) processRegionSendTask( } s.client.pushRegionEventToDS(subID, regionEvent) } + } else if region.subscribedSpan.stopped.Load() { // It can be skipped directly because there must be no pending states from // the stopped subscribedTable, or the special singleRegionInfo for stopping // the table will be handled later. s.client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{})) } else { - state := newRegionFeedState(region, uint64(subID)) + state := newRegionFeedState(region, uint64(subID), s) state.start() s.addRegionState(subID, region.verID.GetID(), state) - if err := doSend(s.createRegionRequest(region)); err != nil { return err } + s.requestCache.markSent(regionReq) } - - var err error - if region, err = fetchMoreReq(); err != nil { + regionReq, err = fetchMoreReq() + if err != nil { return err } } @@ -420,6 +427,7 @@ func (s *regionRequestWorker) addRegionState(subscriptionID SubscriptionID, regi states = make(regionFeedStates) s.requestedRegions.subscriptions[subscriptionID] = states } + states[regionID] = state } @@ -462,16 +470,24 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS return subscriptions } +// add adds a region request to the worker's cache +// It blocks if the cache is full until there's space or ctx is cancelled +func (s *regionRequestWorker) add(ctx context.Context, region regionInfo, force bool) (bool, error) { + return s.requestCache.add(ctx, region, force) +} + func (s *regionRequestWorker) clearPendingRegions() []regionInfo { - regions := make([]regionInfo, 0, len(s.requestsCh)) + var regions []regionInfo + + // Clear pre-fetched region if s.preFetchForConnecting != nil { region := *s.preFetchForConnecting s.preFetchForConnecting = nil regions = append(regions, region) } - // TODO: do we need to start with i := 0(i := len(regions)) if s.preFetchForConnecting is nil? 
- for i := 1; i < cap(regions); i++ { - regions = append(regions, <-s.requestsCh) - } + + // Clear all regions from cache + cacheRegions := s.requestCache.clear() + regions = append(regions, cacheRegions...) return regions } diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index 84e23f131..e9c21a7aa 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -87,7 +87,7 @@ func newRegionErrorInfo(info regionInfo, err error) regionErrorInfo { type regionFeedState struct { region regionInfo - requestID uint64 + requestID uint64 // It is also the subscription ID matcher *matcher // Transform: normal -> stopped -> removed. @@ -102,12 +102,15 @@ type regionFeedState struct { // `err` is used to retrieve errors generated outside. err error } + + worker *regionRequestWorker } -func newRegionFeedState(region regionInfo, requestID uint64) *regionFeedState { +func newRegionFeedState(region regionInfo, requestID uint64, worker *regionRequestWorker) *regionFeedState { return ®ionFeedState{ region: region, requestID: requestID, + worker: worker, } } @@ -123,6 +126,7 @@ func (s *regionFeedState) markStopped(err error) { s.state.v = stateStopped s.state.err = err } + s.worker.requestCache.markStopped(s.region.subscribedSpan.subID, s.region.verID.GetID()) } // mark regionFeedState as removed if possible. @@ -134,6 +138,7 @@ func (s *regionFeedState) markRemoved() (changed bool) { changed = true s.matcher.clear() } + s.worker.requestCache.markStopped(s.region.subscribedSpan.subID, s.region.verID.GetID()) return } @@ -157,6 +162,7 @@ func (s *regionFeedState) isInitialized() bool { func (s *regionFeedState) setInitialized() { s.region.lockedRangeState.Initialized.Store(true) + s.worker.requestCache.resolve(s.region.subscribedSpan.subID, s.region.verID.GetID()) } func (s *regionFeedState) getRegionID() uint64 { diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index c17e79cbb..fd4b21e40 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -69,11 +69,8 @@ var ( metricKvIsBusyCounter = metrics.EventFeedErrorCounter.WithLabelValues("KvIsBusy") metricKvCongestedCounter = metrics.EventFeedErrorCounter.WithLabelValues("KvCongested") - metricsubscriptionClientDSChannelSize = metrics.DynamicStreamEventChanSize.WithLabelValues("event-store", "default") - metricsubscriptionClientDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-store", "default") - metricEventStoreDSAddPathNum = metrics.DynamicStreamAddPathNum.WithLabelValues("event-store") - metricEventStoreDSRemovePathNum = metrics.DynamicStreamRemovePathNum.WithLabelValues("event-store") - // metricEventStoreDSArrageStreamNum = metrics.DynamicStreamArrangeStreamNum.WithLabelValues("event-store") + metricSubscriptionClientDSChannelSize = metrics.DynamicStreamEventChanSize.WithLabelValues("event-store", "default") + metricSubscriptionClientDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-store", "default") ) // To generate an ID for a new subscription. 
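As implemented in priority_task.go above, a task's priority is computed as priority = basePriority - waitTimeSeconds + resolvedTsLagSeconds, clamped at zero, with basePriority = highPriorityBase (0) for TaskHighPrior and lowPriorityBase (60*60*24 = 86400) for TaskLowPrior. A worked example: a TaskHighPrior task that has waited 10 seconds for a region whose resolvedTs lags by 30 seconds gets 0 - 10 + 30 = 20, while a TaskLowPrior task with the same timings gets 86400 - 10 + 30 = 86420, so the high-priority task sorts first; and since 20 <= forcedPriorityBase (60*60 = 3600), handleRegions also hands it to the worker with force=true, bypassing the request cache's pending limit.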
@@ -98,6 +95,7 @@ type rangeTask struct { span heartbeatpb.TableSpan subscribedSpan *subscribedSpan filterLoop bool + priority TaskType } const kvEventsCacheMaxSize = 32 @@ -152,15 +150,7 @@ type SubscriptionClientConfig struct { } type sharedClientMetrics struct { - // regionLockDuration prometheus.Observer - // regionLocateDuration prometheus.Observer - // regionConnectDuration prometheus.Observer batchResolvedSize prometheus.Observer - kvCounter prometheus.Counter - resolvedTsCounter prometheus.Counter - // lockResolveWaitDuration prometheus.Observer - // lockResolveRunDuration prometheus.Observer - // slowInitializeRegion prometheus.Gauge } // subscriptionClient is used to subscribe events of table ranges from TiKV. @@ -193,6 +183,8 @@ type subscriptionClient struct { pdClock pdutil.Clock lockResolver txnutil.LockResolver + stores sync.Map + ds dynstream.DynamicStream[int, SubscriptionID, regionEvent, *subscribedSpan, *regionEventHandler] // the following three fields are used to manage feedback from ds and notify other goroutines mu sync.Mutex @@ -210,9 +202,9 @@ type subscriptionClient struct { // rangeTaskCh is used to receive range tasks. // The tasks will be handled in `handleRangeTask` goroutine. rangeTaskCh chan rangeTask - // regionCh is used to receive region tasks have been locked in rangeLock. + // regionTaskQueue is used to receive region tasks with priority. // The region will be handled in `handleRegions` goroutine. - regionCh chan regionInfo + regionTaskQueue *PriorityQueue // resolveLockTaskCh is used to receive resolve lock tasks. // The tasks will be handled in `handleResolveLockTasks` goroutine. resolveLockTaskCh chan resolveLockTask @@ -231,6 +223,7 @@ func NewSubscriptionClient( subClient := &subscriptionClient{ config: config, + stores: sync.Map{}, pd: pd, regionCache: appcontext.GetService[*tikv.RegionCache](appcontext.RegionCache), pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock), @@ -239,7 +232,7 @@ func NewSubscriptionClient( credential: credential, rangeTaskCh: make(chan rangeTask, 1024), - regionCh: make(chan regionInfo, 1024), + regionTaskQueue: NewPriorityQueue(), resolveLockTaskCh: make(chan resolveLockTask, 1024), errCache: newErrCache(), } @@ -292,8 +285,8 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { metrics.LogPullerResolvedTsLag.Set(resolvedTsLag) } dsMetrics := s.ds.GetMetrics() - metricsubscriptionClientDSChannelSize.Set(float64(dsMetrics.EventChanSize)) - metricsubscriptionClientDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) + metricSubscriptionClientDSChannelSize.Set(float64(dsMetrics.EventChanSize)) + metricSubscriptionClientDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) if len(dsMetrics.MemoryControl.AreaMemoryMetrics) > 1 { log.Panic("subscription client should have only one area") } @@ -312,6 +305,25 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { "default", ).Set(float64(areaMetric.MemoryUsage())) } + + pendingRegionReqCount := 0 + s.stores.Range(func(key, value any) bool { + store := value.(*requestedStore) + for _, worker := range store.requestWorkers { + pendingRegionReqCount += worker.requestCache.getPendingCount() + } + return true + }) + + metrics.SubscriptionClientRequestedRegionCount.WithLabelValues("pending").Set(float64(pendingRegionReqCount)) + + count := 0 + s.totalSpans.RLock() + for _, rt := range s.totalSpans.spanMap { + count += rt.rangeLock.Len() + } + s.totalSpans.RUnlock() + 
metrics.SubscriptionClientSubscribedRegionCount.Set(float64(count)) } } } @@ -351,7 +363,7 @@ func (s *subscriptionClient) Subscribe( areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(1*1024*1024*1024, dynstream.MemoryControlForPuller, "logPuller") // 1GB s.ds.AddPath(rt.subID, rt, areaSetting) - s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt, filterLoop: bdrMode} + s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt, filterLoop: bdrMode, priority: TaskLowPrior} } // Unsubscribe the given table span. All covered regions will be deregistered asynchronously. @@ -441,6 +453,7 @@ func (s *subscriptionClient) Run(ctx context.Context) error { func (s *subscriptionClient) Close(ctx context.Context) error { // FIXME: close and drain all channels s.ds.Close() + s.regionTaskQueue.Close() return nil } @@ -452,7 +465,7 @@ func (s *subscriptionClient) setTableStopped(rt *subscribedSpan) { // Then send a special singleRegionInfo to regionRouter to deregister the table // from all TiKV instances. if rt.stopped.CompareAndSwap(false, true) { - s.regionCh <- regionInfo{subscribedSpan: rt} + s.regionTaskQueue.Push(NewRegionPriorityTask(TaskHighPrior, regionInfo{subscribedSpan: rt}, s.pdClock.CurrentTS())) if rt.rangeLock.Stop() { s.onTableDrained(rt) } @@ -499,66 +512,83 @@ func (rs *requestedStore) getRequestWorker() *regionRequestWorker { return rs.requestWorkers[index] } -// handleRegions receives regionInfo from regionCh and attch rpcCtx to them, +// handleRegions receives regionInfo from regionTaskQueue and attach rpcCtx to them, // then send them to corresponding requestedStore. func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Group) error { - stores := make(map[string]*requestedStore) // storeAddr -> requestedStore getStore := func(storeAddr string) *requestedStore { var rs *requestedStore - if rs = stores[storeAddr]; rs != nil { + if v, ok := s.stores.Load(storeAddr); ok { + rs = v.(*requestedStore) return rs } + rs = &requestedStore{storeAddr: storeAddr} - stores[storeAddr] = rs + s.stores.Store(storeAddr, rs) for i := uint(0); i < s.config.RegionRequestWorkerPerStore; i++ { requestWorker := newRegionRequestWorker(ctx, s, s.credential, eg, rs) rs.requestWorkers = append(rs.requestWorkers, requestWorker) } - return rs } defer func() { - for _, rs := range stores { + s.stores.Range(func(key, value any) bool { + rs := value.(*requestedStore) for _, w := range rs.requestWorkers { - close(w.requestsCh) - for range w.requestsCh { - // TODO: do we need handle it? - } + w.requestCache.clear() } - } + return true + }) }() for { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case region := <-s.regionCh: - if region.isStopped() { - for _, rs := range stores { - for _, worker := range rs.requestWorkers { - worker.requestsCh <- region - } + // Use blocking Pop to wait for tasks + regionTask, err := s.regionTaskQueue.Pop(ctx) + if err != nil { + return err + } + + region := regionTask.GetRegionInfo() + if region.isStopped() { + s.stores.Range(func(key, value any) bool { + rs := value.(*requestedStore) + for _, worker := range rs.requestWorkers { + worker.add(ctx, region, true) } - continue - } + return true + }) + continue + } - region, ok := s.attachRPCContextForRegion(ctx, region) - // If attachRPCContextForRegion fails, the region will be re-scheduled. - if !ok { - continue - } + region, ok := s.attachRPCContextForRegion(ctx, region) + // If attachRPCContextForRegion fails, the region will be re-scheduled. 
+ if !ok { + continue + } - store := getStore(region.rpcCtx.Addr) - worker := store.getRequestWorker() - worker.requestsCh <- region + store := getStore(region.rpcCtx.Addr) + worker := store.getRequestWorker() + force := regionTask.Priority() <= forcedPriorityBase - log.Debug("subscription client will request a region", - zap.Uint64("workID", worker.workerID), + ok, err = worker.add(ctx, region, force) + if err != nil { + log.Warn("subscription client add region request failed", zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), zap.Uint64("regionID", region.verID.GetID()), - zap.String("addr", store.storeAddr)) + zap.Error(err)) + return err } + + if !ok { + s.regionTaskQueue.Push(regionTask) + continue + } + + log.Debug("subscription client will request a region", + zap.Uint64("workID", worker.workerID), + zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), + zap.Uint64("regionID", region.verID.GetID()), + zap.String("addr", store.storeAddr)) } } @@ -589,7 +619,7 @@ func (s *subscriptionClient) handleRangeTasks(ctx context.Context) error { return ctx.Err() case task := <-s.rangeTaskCh: g.Go(func() error { - return s.divideSpanAndScheduleRegionRequests(ctx, task.span, task.subscribedSpan, task.filterLoop) + return s.divideSpanAndScheduleRegionRequests(ctx, task.span, task.subscribedSpan, task.filterLoop, task.priority) }) } } @@ -605,6 +635,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( span heartbeatpb.TableSpan, subscribedSpan *subscribedSpan, filterLoop bool, + taskType TaskType, ) error { // Limit the number of regions loaded at a time to make the load more stable. limit := 1024 @@ -631,7 +662,6 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( backoffBeforeLoad = true continue } - regionMetas := make([]*metapb.Region, 0, len(regions)) for _, region := range regions { if meta := region.GetMeta(); meta != nil { @@ -668,7 +698,7 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( regionInfo := newRegionInfo(verID, intersectSpan, nil, subscribedSpan, filterLoop) // Schedule a region request to subscribe the region. - s.scheduleRegionRequest(ctx, regionInfo) + s.scheduleRegionRequest(ctx, regionInfo, taskType) nextSpan.StartKey = regionMeta.EndKey // If the nextSpan.StartKey is larger than the subscribedSpan.span.EndKey, @@ -680,9 +710,9 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( } } -// scheduleRegionRequest locks the region's range and send the region to regionCh, +// scheduleRegionRequest locks the region's range and send the region to regionTaskQueue, // which will be handled by handleRegions. 
-func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo) { +func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) { lockRangeResult := region.subscribedSpan.rangeLock.LockRange( ctx, region.span.StartKey, region.span.EndKey, region.verID.GetID(), region.verID.GetVer()) @@ -693,13 +723,10 @@ func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region r switch lockRangeResult.Status { case regionlock.LockRangeStatusSuccess: region.lockedRangeState = lockRangeResult.LockedRangeState - select { - case s.regionCh <- region: - case <-ctx.Done(): - } + s.regionTaskQueue.Push(NewRegionPriorityTask(priority, region, s.pdClock.CurrentTS())) case regionlock.LockRangeStatusStale: for _, r := range lockRangeResult.RetryRanges { - s.scheduleRangeRequest(ctx, r, region.subscribedSpan, region.filterLoop) + s.scheduleRangeRequest(ctx, r, region.subscribedSpan, region.filterLoop, priority) } default: return @@ -710,10 +737,11 @@ func (s *subscriptionClient) scheduleRangeRequest( ctx context.Context, span heartbeatpb.TableSpan, subscribedSpan *subscribedSpan, filterLoop bool, + priority TaskType, ) { select { case <-ctx.Done(): - case s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: subscribedSpan, filterLoop: filterLoop}: + case s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: subscribedSpan, filterLoop: filterLoop, priority: priority}: } } @@ -733,38 +761,38 @@ func (s *subscriptionClient) handleErrors(ctx context.Context) error { func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionErrorInfo) error { err := errors.Cause(errInfo.err) + log.Debug("cdc region error", + zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), + zap.Uint64("regionID", errInfo.verID.GetID()), + zap.Error(err)) + switch eerr := err.(type) { case *eventError: innerErr := eerr.err - log.Debug("cdc region error", - zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), - zap.Uint64("regionID", errInfo.verID.GetID()), - zap.Stringer("error", innerErr)) - if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) - s.scheduleRegionRequest(ctx, errInfo.regionInfo) + s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil } if innerErr.GetEpochNotMatch() != nil { metricFeedEpochNotMatchCounter.Inc() - s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil } if innerErr.GetRegionNotFound() != nil { metricFeedRegionNotFoundCounter.Inc() - s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil } if innerErr.GetCongested() != nil { metricKvCongestedCounter.Inc() - s.scheduleRegionRequest(ctx, errInfo.regionInfo) + s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) return nil } if innerErr.GetServerIsBusy() != nil { metricKvIsBusyCounter.Inc() - s.scheduleRegionRequest(ctx, errInfo.regionInfo) + s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) return nil } if duplicated := innerErr.GetDuplicateRequest(); duplicated != nil { @@ -783,24 +811,24 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo 
regionEr zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), zap.Stringer("error", innerErr)) metricFeedUnknownErrorCounter.Inc() - s.scheduleRegionRequest(ctx, errInfo.regionInfo) + s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() - s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil case *getStoreErr: metricGetStoreErr.Inc() bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) // cannot get the store the region belongs to, so we need to reload the region. s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) - s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err) - s.scheduleRegionRequest(ctx, errInfo.regionInfo) + s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil case *requestCancelledErr: // the corresponding subscription has been unsubscribed, just ignore. diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index 09d48e267..f96d3cbb7 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -64,10 +64,13 @@ func TestGenerateResolveLockTask(t *testing.T) { require.True(t, false, "must get a resolve lock task") } + worker := ®ionRequestWorker{ + requestCache: &requestCache{}, + } // Lock another range, no task will be triggered before initialized. 
res = span.rangeLock.LockRange(context.Background(), []byte{'c'}, []byte{'d'}, 2, 100) require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status) - state := newRegionFeedState(regionInfo{lockedRangeState: res.LockedRangeState, subscribedSpan: span}, 1) + state := newRegionFeedState(regionInfo{lockedRangeState: res.LockedRangeState, subscribedSpan: span}, 1, worker) span.resolveStaleLocks(200) select { case task := <-client.resolveLockTaskCh: diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index ce4ce6ae8..1235c1a21 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -6399,86 +6399,94 @@ } }, { - "cards": { - "cardPadding": 0, - "cardRound": 0 - }, - "color": { - "cardColor": "#FF9830", - "colorScale": "linear", - "colorScheme": "interpolateSpectral", - "exponent": 0.5, - "min": 0, - "mode": "spectrum" - }, - "dataFormat": "tsbuckets", + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The size of batch resolved regions count", + "description": "To prevent excessive accumulation of region request tasks on the TiKV side, CDC will implement rate limiting on the number of requests it initiates.", "fieldConfig": { "defaults": {}, "overrides": [] }, + "fill": 1, + "fillGradient": 0, "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, - "heatmap": {}, - "hideZeroBuckets": true, - "highlightCards": true, - "id": 12046, + "hiddenSeries": false, + "id": 22296, "legend": { - "alignAsTable": true, "avg": false, - "current": true, - "max": true, + "current": false, + "max": false, "min": false, - "rightSide": true, "show": true, - "sort": "current", - "sortDesc": true, "total": false, - "values": true + "values": false }, - "links": [], - "reverseYBuckets": false, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, "targets": [ { "exemplar": true, - "expr": "sum(rate(ticdc_kvclient_batch_resolved_event_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (le)", - "format": "heatmap", - "instant": false, + "expr": "ticdc_subscription_client_requested_region_count", + "hide": false, "interval": "", - "intervalFactor": 2, - "legendFormat": "{{le}}", + "legendFormat": "{{instance}}-count", "refId": "A" } ], - "title": "KV client batch resolved region count", + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Unresolved Region Request Count ", "tooltip": { - "show": true, - "showHistogram": true - }, - "tooltipDecimals": 1, - "type": "heatmap", - "xAxis": { - "show": true + "shared": true, + "sort": 0, + "value_type": "individual" }, - "xBucketNumber": null, - "xBucketSize": null, - "yAxis": { - "decimals": 1, - "format": "none", - "logBase": 1, - "max": null, - "min": null, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, "show": true, - "splitFactor": null + "values": [] }, - "yBucketBound": "upper", - "yBucketNumber": null, - "yBucketSize": null + "yaxes": [ + { + "format": "short", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false + } }, { "aliasColors": {}, @@ 
-6486,12 +6494,12 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Log puller memory quota", + "description": "", "fieldConfig": { "defaults": {}, "overrides": [] }, - "fill": 0, + "fill": 1, "fillGradient": 0, "gridPos": { "h": 8, @@ -6500,7 +6508,7 @@ "y": 16 }, "hiddenSeries": false, - "id": 22291, + "id": 22261, "legend": { "alignAsTable": true, "avg": false, @@ -6513,6 +6521,7 @@ }, "lines": true, "linewidth": 1, + "links": [], "nullPointMode": "null", "options": { "alertThreshold": true @@ -6522,6 +6531,7 @@ "pointradius": 2, "points": false, "renderer": "flot", + "repeatDirection": "h", "seriesOverrides": [], "spaceLength": 10, "stack": false, @@ -6529,17 +6539,28 @@ "targets": [ { "exemplar": true, - "expr": "ticdc_dynamic_stream_memory_usage{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\",changefeed=~\"$changefeed\", component=~\"log-puller\", instance=~\"$ticdc_instance\"}", + "expr": "histogram_quantile(0.99, sum(rate(ticdc_subscription_client_region_request_finish_scan_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (le, instance))", + "format": "heatmap", + "instant": false, "interval": "", - "legendFormat": "{{area}}-{{instance}}-memory-{{component}}-{{type}}", + "intervalFactor": 2, + "legendFormat": "{{instance}}-p99", "refId": "A" + }, + { + "exemplar": true, + "expr": "sum(rate(ticdc_subscription_client_region_request_finish_scan_duration_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance)\n/\nsum(rate(ticdc_subscription_client_region_request_finish_scan_duration_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance)", + "hide": false, + "interval": "", + "legendFormat": "{{instance}}-avg", + "refId": "B" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Memory Quota", + "title": "Region request finish scan duration", "tooltip": { "shared": true, "sort": 0, @@ -6555,19 +6576,25 @@ }, "yaxes": [ { - "format": "bytes", + "format": "s", + "label": null, "logBase": 1, - "min": "0", + "max": null, + "min": null, "show": true }, { "format": "short", + "label": null, "logBase": 1, + "max": null, + "min": null, "show": false } ], "yaxis": { - "align": false + "align": false, + "alignLevel": null } }, { @@ -6590,7 +6617,7 @@ "y": 16 }, "hiddenSeries": false, - "id": 22296, + "id": 22299, "legend": { "avg": false, "current": false, @@ -6618,10 +6645,10 @@ "targets": [ { "exemplar": true, - "expr": "ticdc_subscription_client_requested_region_count", + "expr": "ticdc_subscription_client_subscribed_region_count", "hide": false, "interval": "", - "legendFormat": "", + "legendFormat": "{{instance}}-count", "refId": "A" } ], @@ -6629,7 +6656,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Unresolved Region Request Count ", + "title": "Subscribed region count", "tooltip": { "shared": true, "sort": 0, @@ -6666,21 +6693,21 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "", + "description": "Log puller memory quota", "fieldConfig": { "defaults": {}, "overrides": [] }, - "fill": 1, + "fill": 0, "fillGradient": 0, "gridPos": { - "h": 6, + "h": 8, "w": 12, "x": 0, "y": 24 }, "hiddenSeries": false, - "id": 22261, + "id": 22291, "legend": { "alignAsTable": true, "avg": false, @@ -6693,7 +6720,6 @@ }, "lines": 
true, "linewidth": 1, - "links": [], "nullPointMode": "null", "options": { "alertThreshold": true @@ -6703,7 +6729,6 @@ "pointradius": 2, "points": false, "renderer": "flot", - "repeatDirection": "h", "seriesOverrides": [], "spaceLength": 10, "stack": false, @@ -6711,12 +6736,9 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(ticdc_subscription_client_region_request_finish_scan_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (le, instance))", - "format": "heatmap", - "instant": false, + "expr": "ticdc_dynamic_stream_memory_usage{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\",changefeed=~\"$changefeed\", component=~\"log-puller\", instance=~\"$ticdc_instance\"}", "interval": "", - "intervalFactor": 2, - "legendFormat": "{{instance}}-p99", + "legendFormat": "{{area}}-{{instance}}-memory-{{component}}-{{type}}", "refId": "A" } ], @@ -6724,7 +6746,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Region request finish scan duration", + "title": "Memory Quota", "tooltip": { "shared": true, "sort": 0, @@ -6740,135 +6762,102 @@ }, "yaxes": [ { - "format": "s", - "label": null, + "format": "bytes", "logBase": 1, - "max": null, - "min": null, + "min": "0", "show": true }, { "format": "short", - "label": null, "logBase": 1, - "max": null, - "min": null, "show": false } ], "yaxis": { - "align": false, - "alignLevel": null + "align": false } }, { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#FF9830", + "colorScale": "linear", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", "datasource": "${DS_TEST-CLUSTER}", - "description": "", + "description": "The size of batch resolved regions count", "fieldConfig": { "defaults": {}, "overrides": [] }, - "fill": 1, - "fillGradient": 0, "gridPos": { - "h": 6, + "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 24 }, - "hiddenSeries": false, - "id": 22261, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 12046, "legend": { "alignAsTable": true, "avg": false, "current": true, "max": true, "min": false, + "rightSide": true, "show": true, + "sort": "current", + "sortDesc": true, "total": false, "values": true }, - "lines": true, - "linewidth": 1, "links": [], - "nullPointMode": "null", - "options": { - "alertThreshold": true - }, - "percentage": false, - "pluginVersion": "7.5.17", - "pointradius": 2, - "points": false, - "renderer": "flot", - "repeatDirection": "h", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, + "reverseYBuckets": false, "targets": [ { "exemplar": true, - "expr": "sum(rate(ticdc_event_service_scan_duration_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance) / \nsum(rate(ticdc_event_service_scan_duration_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance)", + "expr": "sum(rate(ticdc_kvclient_batch_resolved_event_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (le)", "format": "heatmap", "instant": false, "interval": "", "intervalFactor": 2, - "legendFormat": "{{instance}}-avg", + "legendFormat": "{{le}}", "refId": "A" - }, - { - "exemplar": 
true, - "expr": "histogram_quantile(0.999, sum(rate(ticdc_event_service_scan_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (le, instance))", - "hide": false, - "interval": "", - "legendFormat": "{{instance}}-p999", - "refId": "B" } ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Event service scan duration", + "title": "KV client batch resolved region count", "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" + "show": true, + "showHistogram": true }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "none", + "logBase": 1, + "max": null, + "min": null, "show": true, - "values": [] + "splitFactor": null }, - "yaxes": [ - { - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": false - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null } ], "title": "Log Puller", diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 484732484..f46b688d4 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -59,6 +59,8 @@ type PullerConfig struct { ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"` // LogRegionDetails determines whether logs Region details or not in puller and kv-client. LogRegionDetails bool `toml:"log-region-details" json:"log-region-details"` + + PendingRegionRequestQueueSize int `toml:"pending-region-request-queue-size" json:"pending-region-request-queue-size"` } // NewDefaultPullerConfig return the default puller configuration @@ -67,6 +69,7 @@ func NewDefaultPullerConfig() *PullerConfig { EnableResolvedTsStuckDetection: false, ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), LogRegionDetails: false, + PendingRegionRequestQueueSize: 64, // Base on test result } } diff --git a/pkg/errors/error.go b/pkg/errors/error.go index 0671c0cea..6965b3b75 100644 --- a/pkg/errors/error.go +++ b/pkg/errors/error.go @@ -714,4 +714,10 @@ var ( ErrTCPServerClosed = errors.Normalize("The TCP server has been closed", errors.RFCCodeText("CDC:ErrTCPServerClosed"), ) + + // puller related errors + ErrAddRegionRequestRetryLimitExceeded = errors.Normalize( + "add region request retry limit exceeded", + errors.RFCCodeText("CDC:ErrAddRegionRequestRetryLimitExceeded"), + ) ) diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index 209c72858..d84ccd7e8 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -45,10 +45,45 @@ var ( Name: "resolved_ts_lag", Help: "The resolved ts lag of subscription client.", }) + + SubscriptionClientRequestedRegionCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "subscription_client", + Name: "requested_region_count", + Help: "The number of requested regions", + }, []string{"state"}) + RegionRequestFinishScanDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "subscription_client", + Name: "region_request_finish_scan_duration", + Help: "duration (s) for region request to be finished.", + Buckets: 
prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h
+		})
+	SubscriptionClientAddRegionRequestDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "ticdc",
+			Subsystem: "subscription_client",
+			Name:      "add_region_request_duration",
+			Help:      "The cost of adding a region request",
+			Buckets:   prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h
+		})
+	SubscriptionClientSubscribedRegionCount = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "ticdc",
+			Subsystem: "subscription_client",
+			Name:      "subscribed_region_count",
+			Help:      "The number of subscribed region ranges (locked ranges)",
+		})
 )
 
 func InitLogPullerMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(LogPullerPrewriteCacheRowNum)
 	registry.MustRegister(LogPullerMatcherCount)
 	registry.MustRegister(LogPullerResolvedTsLag)
+	registry.MustRegister(SubscriptionClientRequestedRegionCount)
+	registry.MustRegister(SubscriptionClientAddRegionRequestDuration)
+	registry.MustRegister(RegionRequestFinishScanDuration)
+	registry.MustRegister(SubscriptionClientSubscribedRegionCount)
 }
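
The scheduling changes in this patch rely on a small priority model (TaskType, TaskHighPrior, TaskLowPrior, forcedPriorityBase, NewRegionPriorityTask) whose definitions sit outside this excerpt, presumably next to the PriorityQueue. The sketch below is a speculative reading of that model, kept only to show why `regionTask.Priority() <= forcedPriorityBase` can be read as "bypass the pending-request limit"; the constant value and the FIFO tie-break are assumptions, not the patch's actual encoding.

package logpullersketch

// TaskType classifies region tasks: error retries and deregistrations are
// scheduled as TaskHighPrior in the hunks above, initial subscription scans
// as TaskLowPrior.
type TaskType int

const (
	TaskHighPrior TaskType = iota // smaller value sorts first in the priority queue
	TaskLowPrior
)

// forcedPriorityBase: tasks whose priority is at or below this threshold are
// added to the request worker with force=true, skipping the queue-size check.
// The concrete value here is a placeholder, not taken from the patch.
const forcedPriorityBase = 1 << 20

type regionPriorityTask struct {
	taskType  TaskType
	createTS  uint64 // TSO at creation time (s.pdClock.CurrentTS() in the callers)
	heapIndex int
}

func newRegionPriorityTaskSketch(t TaskType, createTS uint64) *regionPriorityTask {
	return &regionPriorityTask{taskType: t, createTS: createTS}
}

// Priority folds the task class into a comparable rank: every high-priority
// task ranks at or below forcedPriorityBase (and is therefore force-added),
// every low-priority task ranks above it.
func (t *regionPriorityTask) Priority() int {
	if t.taskType == TaskHighPrior {
		return 0
	}
	return forcedPriorityBase + 1
}

func (t *regionPriorityTask) SetHeapIndex(i int) { t.heapIndex = i }
func (t *regionPriorityTask) GetHeapIndex() int  { return t.heapIndex }

// LessThan orders tasks by class first and creation time second, so retries
// and deregistrations jump ahead of the initial scans while tasks of the same
// class keep FIFO order.
func (t *regionPriorityTask) LessThan(other *regionPriorityTask) bool {
	if t.Priority() != other.Priority() {
		return t.Priority() < other.Priority()
	}
	return t.createTS < other.createTS
}
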