Skip to content

Commit d1bf544

Browse files
committed
add microservice into table cluster_info
Signed-off-by: Ryan Leung <[email protected]>
1 parent 27ce02a commit d1bf544

File tree

2 files changed

+113
-2
lines changed

2 files changed

+113
-2
lines changed

pkg/executor/infoschema_reader_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ func TestInspectionTables(t *testing.T) {
4646
"tikv,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash,0",
4747
"tiproxy,127.0.0.1:6000,127.0.0.1:3380,mock-version,mock-githash,0",
4848
"ticdc,127.0.0.1:8300,127.0.0.1:8301,mock-version,mock-githash,0",
49+
"tso,127.0.0.1:3379,127.0.0.1:3379,mock-version,mock-githash,0",
50+
"scheduling,127.0.0.1:4379,127.0.0.1:4379,mock-version,mock-githash,0",
4951
}
5052
fpName := "github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo"
5153
fpExpr := `return("` + strings.Join(instances, ";") + `")`
@@ -58,6 +60,8 @@ func TestInspectionTables(t *testing.T) {
5860
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0",
5961
"tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0",
6062
"ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0",
63+
"tso 127.0.0.1:3379 127.0.0.1:3379 mock-version mock-githash 0",
64+
"scheduling 127.0.0.1:4379 127.0.0.1:4379 mock-version mock-githash 0",
6165
))
6266

6367
// enable inspection mode
@@ -69,9 +73,11 @@ func TestInspectionTables(t *testing.T) {
6973
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0",
7074
"tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0",
7175
"ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0",
76+
"tso 127.0.0.1:3379 127.0.0.1:3379 mock-version mock-githash 0",
77+
"scheduling 127.0.0.1:4379 127.0.0.1:4379 mock-version mock-githash 0",
7278
))
7379
require.NoError(t, inspectionTableCache["cluster_info"].Err)
74-
require.Len(t, inspectionTableCache["cluster_info"].Rows, 5)
80+
require.Len(t, inspectionTableCache["cluster_info"].Rows, 7)
7581

7682
// check whether is obtain data from cache at the next time
7783
inspectionTableCache["cluster_info"].Rows[0][0].SetString("modified-pd", mysql.DefaultCollationName)
@@ -81,6 +87,8 @@ func TestInspectionTables(t *testing.T) {
8187
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0",
8288
"tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0",
8389
"ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0",
90+
"tso 127.0.0.1:3379 127.0.0.1:3379 mock-version mock-githash 0",
91+
"scheduling 127.0.0.1:4379 127.0.0.1:4379 mock-version mock-githash 0",
8492
))
8593
tk.Session().GetSessionVars().InspectionTableCache = nil
8694
}

pkg/infoschema/tables.go

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,13 @@ const (
232232
DataLockWaitsColumnSQLDigestText = "SQL_DIGEST_TEXT"
233233
)
234234

235+
const (
236+
// TSOServiceName is the name of TSO service.
237+
TSOServiceName = "tso"
238+
// SchedulingServiceName is the name of scheduling service.
239+
SchedulingServiceName = "scheduling"
240+
)
241+
235242
var tableIDMap = map[string]int64{
236243
TableSchemata: autoid.InformationSchemaDBID + 1,
237244
TableTables: autoid.InformationSchemaDBID + 2,
@@ -1806,7 +1813,7 @@ func GetClusterServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
18061813
type retriever func(ctx sessionctx.Context) ([]ServerInfo, error)
18071814
retrievers := []retriever{GetTiDBServerInfo, GetPDServerInfo, func(ctx sessionctx.Context) ([]ServerInfo, error) {
18081815
return GetStoreServerInfo(ctx.GetStore())
1809-
}, GetTiProxyServerInfo, GetTiCDCServerInfo}
1816+
}, GetTiProxyServerInfo, GetTiCDCServerInfo, GetTSOServerInfo, GetSchedulingServerInfo}
18101817
//nolint: prealloc
18111818
var servers []ServerInfo
18121819
for _, r := range retrievers {
@@ -1952,6 +1959,102 @@ func GetPDServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
19521959
return servers, nil
19531960
}
19541961

1962+
// GetTSOServerInfo returns all TSO nodes information of cluster
1963+
func GetTSOServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
1964+
return getMicroServiceServerInfo(ctx, TSOServiceName)
1965+
}
1966+
1967+
// GetSchedulingServerInfo returns all scheduling nodes information of cluster
1968+
func GetSchedulingServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
1969+
return getMicroServiceServerInfo(ctx, SchedulingServiceName)
1970+
}
1971+
1972+
func getMicroServiceServerInfo(ctx sessionctx.Context, serviceName string) ([]ServerInfo, error) {
1973+
// Get servers info.
1974+
store := ctx.GetStore()
1975+
etcd, ok := store.(kv.EtcdBackend)
1976+
if !ok {
1977+
return nil, errors.Errorf("%T not an etcd backend", store)
1978+
}
1979+
members, err := etcd.EtcdAddrs()
1980+
if err != nil {
1981+
return nil, errors.Trace(err)
1982+
}
1983+
// TODO: maybe we should unify the PD API request interface.
1984+
var (
1985+
memberNum = len(members)
1986+
servers []ServerInfo
1987+
errs = make([]error, 0, memberNum)
1988+
)
1989+
if memberNum == 0 {
1990+
return servers, nil
1991+
}
1992+
// Try on each member until one succeeds or all fail.
1993+
for _, addr := range members {
1994+
// Get members
1995+
url := fmt.Sprintf("%s://%s%s/%s", util.InternalHTTPSchema(), addr, "/pd/api/v2/ms/members", serviceName)
1996+
req, err := http.NewRequest(http.MethodGet, url, nil)
1997+
if err != nil {
1998+
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
1999+
logutil.BgLogger().Warn("create server info request error", zap.String("service", serviceName), zap.String("url", url), zap.Error(err))
2000+
errs = append(errs, err)
2001+
continue
2002+
}
2003+
req.Header.Add("PD-Allow-follower-handle", "true")
2004+
resp, err := util.InternalHTTPClient().Do(req)
2005+
if err != nil {
2006+
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
2007+
logutil.BgLogger().Warn("request server info error", zap.String("service", serviceName), zap.String("url", url), zap.Error(err))
2008+
errs = append(errs, err)
2009+
continue
2010+
}
2011+
var content = []struct {
2012+
ServiceAddr string `json:"service-addr"`
2013+
Version string `json:"version"`
2014+
GitHash string `json:"git-hash"`
2015+
DeployPath string `json:"deploy-path"`
2016+
StartTimestamp int64 `json:"start-timestamp"`
2017+
}{}
2018+
err = json.NewDecoder(resp.Body).Decode(&content)
2019+
terror.Log(resp.Body.Close())
2020+
if err != nil {
2021+
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
2022+
logutil.BgLogger().Warn("close server info request error", zap.String("service", serviceName), zap.String("url", url), zap.Error(err))
2023+
errs = append(errs, err)
2024+
continue
2025+
}
2026+
2027+
for _, c := range content {
2028+
addr := strings.TrimPrefix(c.ServiceAddr, "http://")
2029+
addr = strings.TrimPrefix(addr, "https://")
2030+
if len(c.Version) > 0 && c.Version[0] == 'v' {
2031+
c.Version = c.Version[1:]
2032+
}
2033+
servers = append(servers, ServerInfo{
2034+
ServerType: serviceName,
2035+
Address: addr,
2036+
StatusAddr: addr,
2037+
Version: c.Version,
2038+
GitHash: c.GitHash,
2039+
StartTimestamp: c.StartTimestamp,
2040+
})
2041+
}
2042+
break
2043+
}
2044+
// Return the errors if all members' requests fail.
2045+
if len(errs) == memberNum {
2046+
errorMsg := ""
2047+
for idx, err := range errs {
2048+
errorMsg += err.Error()
2049+
if idx < memberNum-1 {
2050+
errorMsg += "; "
2051+
}
2052+
}
2053+
return nil, errors.Trace(fmt.Errorf("%s", errorMsg))
2054+
}
2055+
return servers, nil
2056+
}
2057+
19552058
func isTiFlashStore(store *metapb.Store) bool {
19562059
for _, label := range store.Labels {
19572060
if label.GetKey() == placement.EngineLabelKey && label.GetValue() == placement.EngineLabelTiFlash {

0 commit comments

Comments
 (0)