Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
b4dcf82
logpuller: add region task priority queue for subscription client
asddongmen Aug 18, 2025
8b79ff9
adjust priority
asddongmen Aug 18, 2025
f3d9c1c
update
hongyunyan Aug 18, 2025
1fd5241
Merge remote-tracking branch 'hyy/0818' into impl-puller-prior-queue
asddongmen Aug 18, 2025
e25972c
update
hongyunyan Aug 19, 2025
b7f18e5
Merge remote-tracking branch 'hyy/0818' into impl-puller-prior-queue
asddongmen Aug 19, 2025
d9fa3c9
puller: add region req cache
asddongmen Aug 19, 2025
5b3a257
add config
asddongmen Aug 19, 2025
ffe838a
add metrics
asddongmen Aug 19, 2025
533683a
puller: add ticker
asddongmen Aug 19, 2025
d15e8c0
puller: add denug log
asddongmen Aug 19, 2025
eb91e00
puller: force add high prior task
asddongmen Aug 19, 2025
86d0694
adjust some paremeter
asddongmen Aug 25, 2025
726fb15
Merge remote-tracking branch 'upstream/master' into impl-puller-prior…
asddongmen Aug 25, 2025
a2918a6
add more debug log
asddongmen Aug 27, 2025
b987418
add metrics
asddongmen Aug 27, 2025
95926bf
adjust
asddongmen Aug 27, 2025
bcf95a8
add debug log
asddongmen Aug 27, 2025
e091010
fix bug
asddongmen Aug 27, 2025
7aa97fa
fix bug 2
asddongmen Aug 27, 2025
4f9c797
puller: add unit test
asddongmen Sep 1, 2025
d885810
puller: fix a bug
asddongmen Sep 1, 2025
4faae24
puller: fix a bug 2
asddongmen Sep 1, 2025
8df69ac
Merge remote-tracking branch 'upstream/master' into impl-puller-prior…
asddongmen Sep 2, 2025
6b0bc1d
puller: fix a bug 3
asddongmen Sep 3, 2025
86062b2
puller: add debug log
asddongmen Sep 3, 2025
59274e7
puller: add debug log
asddongmen Sep 3, 2025
45efe07
puller: add debug log 2
asddongmen Sep 3, 2025
ff4a5fc
puller: fix bug 2
asddongmen Sep 3, 2025
bd55041
puller: fix bug 3
asddongmen Sep 3, 2025
1b75be6
puller: fix bug 4
asddongmen Sep 3, 2025
a357b7f
puller: fix bug 5
asddongmen Sep 3, 2025
18a0f15
puller: adjust
asddongmen Sep 3, 2025
f02a947
puller: adjust 2
asddongmen Sep 3, 2025
8a6d2e3
puller: adjust 3
asddongmen Sep 3, 2025
8788d5c
puller: adjust 4
asddongmen Sep 3, 2025
732ea58
adjust log
asddongmen Sep 3, 2025
ca806dc
adjust 3
asddongmen Sep 3, 2025
799e770
adjust 4
asddongmen Sep 3, 2025
4d286a6
adjust 5
asddongmen Sep 3, 2025
7afd299
fix data race
asddongmen Sep 4, 2025
46aed04
make fmt
asddongmen Sep 4, 2025
9cebbc3
update grafana
asddongmen Sep 4, 2025
c3efdd7
fix ut
asddongmen Sep 4, 2025
d37ce1d
add ut
asddongmen Sep 4, 2025
9ce395d
add ut
asddongmen Sep 4, 2025
4213039
merge upstream
asddongmen Sep 4, 2025
ddb278f
fix metrics
asddongmen Sep 4, 2025
f8d6a0b
merge upstream master
asddongmen Sep 5, 2025
7793669
adjust metrics
asddongmen Sep 5, 2025
5057695
add subscribe region count metrics
asddongmen Sep 5, 2025
ce6b2e6
refine metrics
asddongmen Sep 5, 2025
a3d6112
fix some typo
asddongmen Sep 5, 2025
fe5548d
fix metrics
asddongmen Sep 5, 2025
4f8f383
update
hongyunyan Sep 5, 2025
ca50036
update
hongyunyan Sep 5, 2025
e9b2e2d
puller: filter out negative metrics
asddongmen Sep 5, 2025
346f8ab
update
hongyunyan Sep 5, 2025
42532a3
Merge remote-tracking branch 'hyy/0905-2' into impl-puller-prior-queue
asddongmen Sep 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions logservice/logpuller/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package logpuller

import (
"context"
"sync"

"github.com/pingcap/ticdc/utils/heap"
)

// PriorityQueue is a thread-safe priority queue for region tasks
// It integrates a signal channel to support blocking operations
type PriorityQueue struct {
mu sync.Mutex
heap *heap.Heap[PriorityTask]

// signal channel for blocking operations
signal chan struct{}
}

// NewPriorityQueue creates a new priority queue
func NewPriorityQueue() *PriorityQueue {
return &PriorityQueue{
heap: heap.NewHeap[PriorityTask](),
signal: make(chan struct{}, 1024),
}
}

// Push adds a task to the priority queue and sends a signal
// This is a non-blocking operation
func (pq *PriorityQueue) Push(task PriorityTask) {
pq.mu.Lock()
pq.heap.AddOrUpdate(task)
pq.mu.Unlock()

// Send signal to notify waiting consumers
select {
case pq.signal <- struct{}{}:
default:
// Signal channel is full, ignore
}
}

// Pop removes and returns the highest priority task
// This is a blocking operation that waits for a signal
// Returns nil if the context is cancelled
func (pq *PriorityQueue) Pop(ctx context.Context) PriorityTask {
for {
// First try to pop without waiting
pq.mu.Lock()
task, ok := pq.heap.PopTop()
pq.mu.Unlock()

if ok {
return task
}

// Queue is empty, wait for signal
select {
case <-ctx.Done():
return nil
case <-pq.signal:
// Got signal, try to pop again
continue
}
}
}

// TryPop attempts to pop a task without blocking
// Returns nil if the queue is empty
func (pq *PriorityQueue) TryPop() PriorityTask {
pq.mu.Lock()
defer pq.mu.Unlock()

task, ok := pq.heap.PopTop()
if !ok {
return nil
}
return task
}

// Peek returns the highest priority task without removing it
// Returns nil if the queue is empty
func (pq *PriorityQueue) Peek() PriorityTask {
pq.mu.Lock()
defer pq.mu.Unlock()

task, ok := pq.heap.PeekTop()
if !ok {
return nil
}
return task
}

// Len returns the number of tasks in the queue
func (pq *PriorityQueue) Len() int {
pq.mu.Lock()
defer pq.mu.Unlock()

return pq.heap.Len()
}

// Close closes the signal channel
func (pq *PriorityQueue) Close() {
// pop all tasks
for pq.Len() > 0 {
pq.TryPop()
}
close(pq.signal)
}
Loading
Loading