Skip to content

Commit 2b1fafa

Browse files
authored
plugin: fix bug that watch loop will refresh frequently when channel closed (#49275) (#49289)
close #49273
1 parent de7060b commit 2b1fafa

File tree

3 files changed

+69
-5
lines changed

3 files changed

+69
-5
lines changed

plugin/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ go_test(
3838
],
3939
embed = [":plugin"],
4040
flaky = True,
41-
shard_count = 10,
41+
shard_count = 11,
4242
deps = [
4343
"//kv",
4444
"//parser/mysql",
@@ -48,6 +48,7 @@ go_test(
4848
"//testkit",
4949
"//testkit/testsetup",
5050
"@com_github_stretchr_testify//require",
51+
"@io_etcd_go_etcd_client_v3//:client",
5152
"@org_uber_go_goleak//:goleak",
5253
],
5354
)

plugin/plugin.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
gplugin "plugin"
2121
"strconv"
2222
"sync/atomic"
23+
"time"
2324
"unsafe"
2425

2526
"github.com/pingcap/errors"
@@ -273,14 +274,33 @@ func (w *flushWatcher) refreshPluginState() error {
273274
}
274275
return nil
275276
}
276-
277277
func (w *flushWatcher) watchLoop() {
278-
watchChan := w.etcd.Watch(w.ctx, w.path)
278+
const reWatchInterval = time.Second * 5
279+
logutil.BgLogger().Info("plugin flushWatcher loop started", zap.String("plugin", w.manifest.Name))
280+
for w.ctx.Err() == nil {
281+
ch := w.etcd.Watch(w.ctx, w.path)
282+
if exit := w.watchLoopWithChan(ch); exit {
283+
break
284+
}
285+
286+
logutil.BgLogger().Info(
287+
"plugin flushWatcher old chan closed, restart loop later",
288+
zap.String("plugin", w.manifest.Name),
289+
zap.Duration("after", reWatchInterval))
290+
time.Sleep(reWatchInterval)
291+
}
292+
}
293+
294+
func (w *flushWatcher) watchLoopWithChan(ch clientv3.WatchChan) (exit bool) {
279295
for {
280296
select {
281297
case <-w.ctx.Done():
282-
return
283-
case <-watchChan:
298+
return true
299+
case _, ok := <-ch:
300+
if !ok {
301+
return false
302+
}
303+
logutil.BgLogger().Info("plugin flushWatcher detected event to reload plugin config", zap.String("plugin", w.manifest.Name))
284304
_ = w.refreshPluginState()
285305
}
286306
}

plugin/plugin_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ import (
1818
"context"
1919
"io"
2020
"strconv"
21+
"sync/atomic"
2122
"testing"
23+
"time"
2224

2325
"github.com/pingcap/tidb/sessionctx/variable"
2426
"github.com/stretchr/testify/require"
27+
clientv3 "go.etcd.io/etcd/client/v3"
2528
)
2629

2730
func TestLoadPluginSuccess(t *testing.T) {
@@ -242,3 +245,43 @@ func TestPluginsClone(t *testing.T) {
242245
require.Equal(t, uint16(1), cps.versions["whitelist"])
243246
require.Len(t, cps.dyingPlugins, 1)
244247
}
248+
249+
func TestPluginWatcherLoop(t *testing.T) {
250+
// exit when ctx done
251+
ctx, cancel := context.WithCancel(context.Background())
252+
watcher := &flushWatcher{
253+
ctx: ctx,
254+
manifest: &Manifest{
255+
Name: "test",
256+
},
257+
}
258+
ch := make(chan clientv3.WatchResponse)
259+
var cancelled atomic.Bool
260+
go func() {
261+
time.Sleep(10 * time.Millisecond)
262+
cancelled.Store(true)
263+
cancel()
264+
}()
265+
exit := watcher.watchLoopWithChan(ch)
266+
require.True(t, exit)
267+
require.True(t, cancelled.Load())
268+
269+
// exit when ch closed
270+
watcher = &flushWatcher{
271+
ctx: context.Background(),
272+
manifest: &Manifest{
273+
Name: "test",
274+
},
275+
}
276+
277+
var closed atomic.Bool
278+
ch = make(chan clientv3.WatchResponse)
279+
go func() {
280+
time.Sleep(10 * time.Millisecond)
281+
closed.Store(true)
282+
close(ch)
283+
}()
284+
exit = watcher.watchLoopWithChan(ch)
285+
require.False(t, exit)
286+
require.True(t, cancelled.Load())
287+
}

0 commit comments

Comments
 (0)