Skip to content

Commit e83dbdd

Browse files
authored
binlog: fix show pump/drainer status (pingcap#44764) (pingcap#44993)
ref pingcap#42643
1 parent 2675d9f commit e83dbdd

File tree

3 files changed

+23
-11
lines changed

3 files changed

+23
-11
lines changed

executor/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ go_library(
136136
"//privilege/privileges",
137137
"//session/txninfo",
138138
"//sessionctx",
139+
"//sessionctx/binloginfo",
139140
"//sessionctx/sessionstates",
140141
"//sessionctx/stmtctx",
141142
"//sessionctx/variable",

executor/change.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,15 @@ type ChangeExec struct {
3535
func (e *ChangeExec) Next(ctx context.Context, req *chunk.Chunk) error {
3636
kind := strings.ToLower(e.NodeType)
3737
urls := config.GetGlobalConfig().Path
38-
registry, err := createRegistry(urls)
38+
registry, needToClose, err := getOrCreateBinlogRegistry(urls)
3939
if err != nil {
4040
return err
4141
}
42+
if needToClose {
43+
defer func() {
44+
_ = registry.Close()
45+
}()
46+
}
4247
nodes, _, err := registry.Nodes(ctx, node.NodePrefix[kind])
4348
if err != nil {
4449
return err

executor/show.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/pingcap/tidb/privilege"
5151
"github.com/pingcap/tidb/privilege/privileges"
5252
"github.com/pingcap/tidb/sessionctx"
53+
"github.com/pingcap/tidb/sessionctx/binloginfo"
5354
"github.com/pingcap/tidb/sessionctx/sessionstates"
5455
"github.com/pingcap/tidb/sessionctx/stmtctx"
5556
"github.com/pingcap/tidb/sessionctx/variable"
@@ -1775,16 +1776,18 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error {
17751776

17761777
// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows.
17771778
func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
1778-
registry, err := createRegistry(config.GetGlobalConfig().Path)
1779+
registry, needToClose, err := getOrCreateBinlogRegistry(config.GetGlobalConfig().Path)
17791780
if err != nil {
17801781
return errors.Trace(err)
17811782
}
17821783

1783-
nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
1784-
if err != nil {
1785-
return errors.Trace(err)
1784+
if needToClose {
1785+
defer func() {
1786+
_ = registry.Close()
1787+
}()
17861788
}
1787-
err = registry.Close()
1789+
1790+
nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
17881791
if err != nil {
17891792
return errors.Trace(err)
17901793
}
@@ -1799,18 +1802,21 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
17991802
return nil
18001803
}
18011804

1802-
// createRegistry returns an ectd registry
1803-
func createRegistry(urls string) (*node.EtcdRegistry, error) {
1805+
// getOrCreateBinlogRegistry returns an etcd registry for binlog, need to close, and error
1806+
func getOrCreateBinlogRegistry(urls string) (*node.EtcdRegistry, bool, error) {
1807+
if pumpClient := binloginfo.GetPumpsClient(); pumpClient != nil && pumpClient.EtcdRegistry != nil {
1808+
return pumpClient.EtcdRegistry, false, nil
1809+
}
18041810
ectdEndpoints, err := util.ParseHostPortAddr(urls)
18051811
if err != nil {
1806-
return nil, errors.Trace(err)
1812+
return nil, false, errors.Trace(err)
18071813
}
18081814
cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
18091815
if err != nil {
1810-
return nil, errors.Trace(err)
1816+
return nil, false, errors.Trace(err)
18111817
}
18121818

1813-
return node.NewEtcdRegistry(cli, etcdDialTimeout), nil
1819+
return node.NewEtcdRegistry(cli, etcdDialTimeout), true, nil
18141820
}
18151821

18161822
func (e *ShowExec) getTable() (table.Table, error) {

0 commit comments

Comments
 (0)