diff --git a/pkg/ddl/ddl_test.go b/pkg/ddl/ddl_test.go index ed5d14d2f389a..aab9278865bb5 100644 --- a/pkg/ddl/ddl_test.go +++ b/pkg/ddl/ddl_test.go @@ -405,7 +405,9 @@ func TestDetectAndUpdateJobVersion(t *testing.T) { serverInfos := make(map[string]*infosync.ServerInfo, len(versions)) for i, v := range versions { serverInfos[fmt.Sprintf("node%d", i)] = &infosync.ServerInfo{ - ServerVersionInfo: infosync.ServerVersionInfo{Version: v}} + StaticServerInfo: infosync.StaticServerInfo{ + ServerVersionInfo: infosync.ServerVersionInfo{Version: v}, + }} } bytes, err := json.Marshal(serverInfos) require.NoError(t, err) diff --git a/pkg/ddl/schemaver/syncer_nokit_test.go b/pkg/ddl/schemaver/syncer_nokit_test.go index aec80b1c7f6b0..cc767ead716ce 100644 --- a/pkg/ddl/schemaver/syncer_nokit_test.go +++ b/pkg/ddl/schemaver/syncer_nokit_test.go @@ -175,7 +175,7 @@ func TestSyncJobSchemaVerLoop(t *testing.T) { // job 4 is matched using WaitVersionSynced vardef.EnableMDL.Store(true) - serverInfos := map[string]*infosync.ServerInfo{"aa": {ID: "aa", IP: "test", Port: 4000}} + serverInfos := map[string]*infosync.ServerInfo{"aa": {StaticServerInfo: infosync.StaticServerInfo{ID: "aa", IP: "test", Port: 4000}}} bytes, err := json.Marshal(serverInfos) require.NoError(t, err) inTerms := fmt.Sprintf("return(`%s`)", string(bytes)) diff --git a/pkg/disttask/importinto/planner_test.go b/pkg/disttask/importinto/planner_test.go index cc7ad0ba9242d..a9dff12f127fe 100644 --- a/pkg/disttask/importinto/planner_test.go +++ b/pkg/disttask/importinto/planner_test.go @@ -43,7 +43,7 @@ func TestLogicalPlan(t *testing.T) { JobID: 1, Plan: importer.Plan{}, Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`, - EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, + EligibleInstances: []*infosync.ServerInfo{{StaticServerInfo: infosync.StaticServerInfo{ID: "1"}}}, ChunkMap: map[int32][]importer.Chunk{1: {{Path: "gs://test-load/1.csv"}}}, } bs, err := logicalPlan.ToTaskMeta() @@ -64,7 +64,7 @@ func TestToPhysicalPlan(t *testing.T) { }, }, Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`, - EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, + EligibleInstances: []*infosync.ServerInfo{{StaticServerInfo: infosync.StaticServerInfo{ID: "1"}}}, ChunkMap: map[int32][]importer.Chunk{chunkID: {{Path: "gs://test-load/1.csv"}}}, } planCtx := planner.PlanCtx{ diff --git a/pkg/disttask/importinto/scheduler_testkit_test.go b/pkg/disttask/importinto/scheduler_testkit_test.go index 7c69bd6a458cc..0b7805d6fc375 100644 --- a/pkg/disttask/importinto/scheduler_testkit_test.go +++ b/pkg/disttask/importinto/scheduler_testkit_test.go @@ -68,7 +68,7 @@ func TestSchedulerExtLocalSort(t *testing.T) { DisableTiKVImportMode: true, }, Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`, - EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, + EligibleInstances: []*infosync.ServerInfo{{StaticServerInfo: infosync.StaticServerInfo{ID: "1"}}}, ChunkMap: map[int32][]importer.Chunk{1: {{Path: "gs://test-load/1.csv"}}}, } bs, err := logicalPlan.ToTaskMeta() @@ -210,7 +210,7 @@ func TestSchedulerExtGlobalSort(t *testing.T) { InImportInto: true, }, Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`, - EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, + EligibleInstances: []*infosync.ServerInfo{{StaticServerInfo: infosync.StaticServerInfo{ID: "1"}}}, ChunkMap: map[int32][]importer.Chunk{ 1: {{Path: "gs://test-load/1.csv"}}, 2: {{Path: "gs://test-load/2.csv"}}, diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index 23a2910c81e1b..37da5d1ca2b7f 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -274,15 +274,23 @@ func TestClosestReplicaReadChecker(t *testing.T) { mockedAllServerInfos := map[string]*infosync.ServerInfo{ "s1": { - ID: "s1", - Labels: map[string]string{ - "zone": "zone1", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s1", + }, + DynamicServerInfo: infosync.DynamicServerInfo{ + Labels: map[string]string{ + "zone": "zone1", + }, }, }, "s2": { - ID: "s2", - Labels: map[string]string{ - "zone": "zone2", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s2", + }, + DynamicServerInfo: infosync.DynamicServerInfo{ + Labels: map[string]string{ + "zone": "zone2", + }, }, }, } @@ -348,33 +356,53 @@ func TestClosestReplicaReadChecker(t *testing.T) { // partial matches mockedAllServerInfos = map[string]*infosync.ServerInfo{ "s1": { - ID: "s1", - Labels: map[string]string{ - "zone": "zone1", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s1", + }, + DynamicServerInfo: infosync.DynamicServerInfo{ + Labels: map[string]string{ + "zone": "zone1", + }, }, }, "s2": { - ID: "s2", - Labels: map[string]string{ - "zone": "zone2", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s2", + }, + DynamicServerInfo: infosync.DynamicServerInfo{ + Labels: map[string]string{ + "zone": "zone2", + }, }, }, "s22": { - ID: "s22", - Labels: map[string]string{ - "zone": "zone2", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s22", + }, + DynamicServerInfo: infosync.DynamicServerInfo{ + Labels: map[string]string{ + "zone": "zone2", + }, }, }, "s3": { - ID: "s3", - Labels: map[string]string{ - "zone": "zone3", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s3", + }, + DynamicServerInfo: infosync.DynamicServerInfo{ + Labels: map[string]string{ + "zone": "zone3", + }, }, }, "s4": { - ID: "s4", - Labels: map[string]string{ - "zone": "zone4", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s4", + }, + DynamicServerInfo: infosync.DynamicServerInfo{ + Labels: map[string]string{ + "zone": "zone4", + }, }, }, } diff --git a/pkg/domain/infosync/BUILD.bazel b/pkg/domain/infosync/BUILD.bazel index 827cb35eb53e3..bd0be42ba986d 100644 --- a/pkg/domain/infosync/BUILD.bazel +++ b/pkg/domain/infosync/BUILD.bazel @@ -65,7 +65,7 @@ go_test( srcs = ["info_test.go"], embed = [":infosync"], flaky = True, - shard_count = 3, + shard_count = 4, deps = [ "//pkg/ddl/placement", "//pkg/ddl/util", diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 4126762d13e32..5a8ce177b0219 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io" + "maps" "net" "net/http" "os" @@ -109,11 +110,12 @@ type InfoSyncer struct { // See keyspace RFC: https://github.com/pingcap/tidb/pull/39685 unprefixedEtcdCli *clientv3.Client pdHTTPCli pdhttp.Client - info *ServerInfo - serverInfoPath string - minStartTS uint64 - minStartTSPath string - managerMu struct { + info atomic.Pointer[ServerInfo] + + serverInfoPath string + minStartTS uint64 + minStartTSPath string + managerMu struct { mu sync.RWMutex util2.SessionManager } @@ -129,23 +131,21 @@ type InfoSyncer struct { infoCache infoschemaMinTS } -// ServerInfo is server static information. -// It will not be updated when tidb-server running. So please only put static information in ServerInfo struct. +// ServerInfo represents the server's basic information. +// It consists of two sections: static and dynamic. +// The static information is generated during the startup of the TiDB server and should never be modified while the TiDB server is running. +// The dynamic information can be updated while the TiDB server is running and should be synchronized with PD's etcd. type ServerInfo struct { - ServerVersionInfo - ID string `json:"ddl_id"` - IP string `json:"ip"` - Port uint `json:"listening_port"` - StatusPort uint `json:"status_port"` - Lease string `json:"lease"` - StartTimestamp int64 `json:"start_timestamp"` - Labels map[string]string `json:"labels"` - // ServerID is a function, to always retrieve latest serverID from `Domain`, - // which will be changed on occasions such as connection to PD is restored after broken. - ServerIDGetter func() uint64 `json:"-"` + StaticServerInfo + DynamicServerInfo +} - // JSONServerID is `serverID` for json marshal/unmarshal ONLY. - JSONServerID uint64 `json:"server_id"` +// clone the ServerInfo. +func (info *ServerInfo) clone() *ServerInfo { + return &ServerInfo{ + StaticServerInfo: info.StaticServerInfo, + DynamicServerInfo: *info.DynamicServerInfo.clone(), + } } // Marshal `ServerInfo` into bytes. @@ -169,6 +169,40 @@ func (info *ServerInfo) Unmarshal(v []byte) error { return nil } +// StaticServerInfo is server static information. +// It will not be updated when tidb-server running. So please only put static information in ServerInfo struct. +// DO NOT edit it after tidb-server started. +type StaticServerInfo struct { + ServerVersionInfo + ID string `json:"ddl_id"` + IP string `json:"ip"` + Port uint `json:"listening_port"` + StatusPort uint `json:"status_port"` + Lease string `json:"lease"` + StartTimestamp int64 `json:"start_timestamp"` + // ServerID is a function, to always retrieve latest serverID from `Domain`, + // which will be changed on occasions such as connection to PD is restored after broken. + ServerIDGetter func() uint64 `json:"-"` + + // JSONServerID is `serverID` for json marshal/unmarshal ONLY. + JSONServerID uint64 `json:"server_id"` +} + +// DynamicServerInfo represents the dynamic information of the server. +// Please note that it may change when TiDB is running. +// To update the dynamic server information, use `InfoSyncer.cloneDynamicServerInfo` to obtain a copy of the dynamic server info. +// After making modifications, use `InfoSyncer.setDynamicServerInfo` to update the dynamic server information. +type DynamicServerInfo struct { + Labels map[string]string `json:"labels"` +} + +// clone the DynamicServerInfo. +func (d *DynamicServerInfo) clone() *DynamicServerInfo { + return &DynamicServerInfo{ + Labels: maps.Clone(d.Labels), + } +} + // ServerVersionInfo is the server version and git_hash. type ServerVersionInfo struct { Version string `json:"version"` @@ -227,11 +261,11 @@ func GlobalInfoSyncerInit( etcdCli: etcdCli, unprefixedEtcdCli: unprefixedEtcdCli, pdHTTPCli: pdHTTPCli, - info: getServerInfo(id, serverIDGetter), serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), infoCache: infoCache, } + is.info.Store(getServerInfo(id, serverIDGetter)) err := is.init(ctx, skipRegisterToDashBoard) if err != nil { return nil, err @@ -380,7 +414,7 @@ func GetServerInfo() (*ServerInfo, error) { if err != nil { return nil, err } - return is.info, nil + return is.info.Load(), nil } // GetServerInfoByID gets specified server static information from etcd. @@ -393,8 +427,9 @@ func GetServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) { } func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*ServerInfo, error) { - if is.etcdCli == nil || id == is.info.ID { - return is.info, nil + localInfo := is.info.Load() + if is.etcdCli == nil || id == localInfo.ID { + return localInfo, nil } key := fmt.Sprintf("%s/%s", ServerInformationPath, id) infoMap, err := getInfo(ctx, is.etcdCli, key, keyOpDefaultRetryCnt, keyOpDefaultTimeout) @@ -432,27 +467,31 @@ func UpdateServerLabel(ctx context.Context, labels map[string]string) error { if is.etcdCli == nil { return nil } - selfInfo, err := is.getServerInfoByID(ctx, is.info.ID) - if err != nil { - return err - } + dynamicInfo := is.cloneDynamicServerInfo() changed := false for k, v := range labels { - if selfInfo.Labels[k] != v { + if dynamicInfo.Labels[k] != v { changed = true - selfInfo.Labels[k] = v + dynamicInfo.Labels[k] = v } } if !changed { return nil } - infoBuf, err := selfInfo.Marshal() + info := is.getLocalServerInfo().clone() + info.DynamicServerInfo = *dynamicInfo + infoBuf, err := info.Marshal() if err != nil { return errors.Trace(err) } str := string(hack.String(infoBuf)) err = util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.serverInfoPath, str, clientv3.WithLease(is.session.Lease())) - return err + if err != nil { + return err + } + // update the dynamic info in the global info syncer after put etcd success. + is.setDynamicServerInfo(dynamicInfo) + return nil } // DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress. @@ -651,7 +690,8 @@ func PutRuleBundlesWithDefaultRetry(ctx context.Context, bundles []*placement.Bu func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { allInfo := make(map[string]*ServerInfo) if is.etcdCli == nil { - allInfo[is.info.ID] = getServerInfo(is.info.ID, is.info.ServerIDGetter) + info := is.info.Load() + allInfo[info.ID] = getServerInfo(info.ID, info.ServerIDGetter) return allInfo, nil } allInfo, err := getInfo(ctx, is.etcdCli, ServerInformationPath, keyOpDefaultRetryCnt, keyOpDefaultTimeout, clientv3.WithPrefix()) @@ -666,7 +706,8 @@ func (is *InfoSyncer) StoreServerInfo(ctx context.Context) error { if is.etcdCli == nil { return nil } - infoBuf, err := is.info.Marshal() + info := is.info.Load() + infoBuf, err := info.Marshal() if err != nil { return errors.Trace(err) } @@ -696,7 +737,7 @@ type TopologyInfo struct { Labels map[string]string `json:"labels"` } -func (is *InfoSyncer) getTopologyInfo() TopologyInfo { +func (info *ServerInfo) asTopologyInfo() TopologyInfo { s, err := os.Executable() if err != nil { s = "" @@ -705,28 +746,29 @@ func (is *InfoSyncer) getTopologyInfo() TopologyInfo { return TopologyInfo{ ServerVersionInfo: ServerVersionInfo{ Version: mysql.TiDBReleaseVersion, - GitHash: is.info.ServerVersionInfo.GitHash, + GitHash: info.ServerVersionInfo.GitHash, }, - IP: is.info.IP, - StatusPort: is.info.StatusPort, + IP: info.IP, + StatusPort: info.StatusPort, DeployPath: dir, - StartTimestamp: is.info.StartTimestamp, - Labels: is.info.Labels, + StartTimestamp: info.StartTimestamp, + Labels: info.Labels, } } -// StoreTopologyInfo stores the topology of tidb to etcd. +// StoreTopologyInfo stores the topology of tidb to etcd. func (is *InfoSyncer) StoreTopologyInfo(ctx context.Context) error { if is.etcdCli == nil { return nil } - topologyInfo := is.getTopologyInfo() + info := is.info.Load() + topologyInfo := info.asTopologyInfo() infoBuf, err := json.Marshal(topologyInfo) if err != nil { return errors.Trace(err) } str := string(hack.String(infoBuf)) - key := fmt.Sprintf("%s/%s/info", TopologyInformationPath, net.JoinHostPort(is.info.IP, strconv.Itoa(int(is.info.Port)))) + key := fmt.Sprintf("%s/%s/info", TopologyInformationPath, net.JoinHostPort(info.IP, strconv.Itoa(int(info.Port)))) // Note: no lease is required here. err = util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, str) if err != nil { @@ -891,7 +933,8 @@ func (is *InfoSyncer) newTopologySessionAndStoreServerInfo(ctx context.Context, if is.etcdCli == nil { return nil } - logPrefix := fmt.Sprintf("[topology-syncer] %s/%s", TopologyInformationPath, net.JoinHostPort(is.info.IP, strconv.Itoa(int(is.info.Port)))) + info := is.getLocalServerInfo() + logPrefix := fmt.Sprintf("[topology-syncer] %s/%s", TopologyInformationPath, net.JoinHostPort(info.IP, strconv.Itoa(int(info.Port)))) session, err := util2.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, TopologySessionTTL) if err != nil { return err @@ -906,7 +949,8 @@ func (is *InfoSyncer) updateTopologyAliveness(ctx context.Context) error { if is.etcdCli == nil { return nil } - key := fmt.Sprintf("%s/%s/ttl", TopologyInformationPath, net.JoinHostPort(is.info.IP, strconv.Itoa(int(is.info.Port)))) + info := is.getLocalServerInfo() + key := fmt.Sprintf("%s/%s/ttl", TopologyInformationPath, net.JoinHostPort(info.IP, strconv.Itoa(int(info.Port)))) return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, fmt.Sprintf("%v", time.Now().UnixNano()), clientv3.WithLease(is.topologySession.Lease())) @@ -917,10 +961,11 @@ func (is *InfoSyncer) RemoveTopologyInfo() { if is.etcdCli == nil { return } + info := is.info.Load() prefix := fmt.Sprintf( "%s/%s", TopologyInformationPath, - net.JoinHostPort(is.info.IP, strconv.Itoa(int(is.info.Port))), + net.JoinHostPort(info.IP, strconv.Itoa(int(info.Port))), ) err := util.DeleteKeysWithPrefixFromEtcd(prefix, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) if err != nil { @@ -1053,14 +1098,18 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt func getServerInfo(id string, serverIDGetter func() uint64) *ServerInfo { cfg := config.GetGlobalConfig() info := &ServerInfo{ - ID: id, - IP: cfg.AdvertiseAddress, - Port: cfg.Port, - StatusPort: cfg.Status.StatusPort, - Lease: cfg.Lease, - StartTimestamp: time.Now().Unix(), - Labels: cfg.Labels, - ServerIDGetter: serverIDGetter, + StaticServerInfo: StaticServerInfo{ + ID: id, + IP: cfg.AdvertiseAddress, + Port: cfg.Port, + StatusPort: cfg.Status.StatusPort, + Lease: cfg.Lease, + StartTimestamp: time.Now().Unix(), + ServerIDGetter: serverIDGetter, + }, + DynamicServerInfo: DynamicServerInfo{ + Labels: maps.Clone(cfg.Labels), + }, } info.Version = mysql.ServerVersion info.GitHash = versioninfo.TiDBGitHash @@ -1355,6 +1404,9 @@ func ContainsInternalSessionForTest(se any) bool { } // SetEtcdClient is only used for test. +// SetEtcdClient is not thread-safe and may cause data race with the initialization of the domain. +// Because this usage is test-only, we don't need to introduce a lock or atomic variable for it. +// Use it after the domain initialization is done. func SetEtcdClient(etcdCli *clientv3.Client) { is, err := getGlobalInfoSyncer() @@ -1522,3 +1574,23 @@ func (is *InfoSyncer) getTiCDCServerInfo(ctx context.Context) ([]*TiCDCInfo, err } return nil, errors.Trace(err) } + +// getLocalServerInfo returns the local server info. +func (is *InfoSyncer) getLocalServerInfo() *ServerInfo { + return is.info.Load() +} + +// cloneDynamicServerInfo returns a clone of the dynamic server info. +func (is *InfoSyncer) cloneDynamicServerInfo() *DynamicServerInfo { + return is.info.Load().DynamicServerInfo.clone() +} + +// setDynamicServerInfo updates the dynamic server info. +func (is *InfoSyncer) setDynamicServerInfo(ds *DynamicServerInfo) { + staticInfo := is.info.Load() + newInfo := &ServerInfo{ + StaticServerInfo: staticInfo.StaticServerInfo, + DynamicServerInfo: *ds, + } + is.info.Store(newInfo) +} diff --git a/pkg/domain/infosync/info_test.go b/pkg/domain/infosync/info_test.go index 9be9326e2ab3a..01ceed92e5b75 100644 --- a/pkg/domain/infosync/info_test.go +++ b/pkg/domain/infosync/info_test.go @@ -84,10 +84,11 @@ func TestTopology(t *testing.T) { v, ok := topology.Labels["foo"] require.True(t, ok) require.Equal(t, "bar", v) - require.Equal(t, info.getTopologyInfo(), *topology) + selfInfo := info.getLocalServerInfo() + require.Equal(t, selfInfo.asTopologyInfo(), *topology) - nonTTLKey := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, info.info.IP, info.info.Port) - ttlKey := fmt.Sprintf("%s/%s:%v/ttl", TopologyInformationPath, info.info.IP, info.info.Port) + nonTTLKey := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, selfInfo.IP, selfInfo.Port) + ttlKey := fmt.Sprintf("%s/%s:%v/ttl", TopologyInformationPath, selfInfo.IP, selfInfo.Port) err = util.DeleteKeyFromEtcd(nonTTLKey, client, util2.NewSessionDefaultRetryCnt, time.Second) require.NoError(t, err) @@ -105,7 +106,7 @@ func TestTopology(t *testing.T) { dir := path.Dir(s) require.Equal(t, dir, topology.DeployPath) require.Equal(t, int64(1282967700), topology.StartTimestamp) - require.Equal(t, info.getTopologyInfo(), *topology) + require.Equal(t, info.getLocalServerInfo().asTopologyInfo(), *topology) // check ttl key ttlExists, err := info.ttlKeyExists(ctx) @@ -124,7 +125,8 @@ func TestTopology(t *testing.T) { } func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*TopologyInfo, error) { - key := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, is.info.IP, is.info.Port) + info := is.getLocalServerInfo() + key := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, info.IP, info.Port) resp, err := is.etcdCli.Get(ctx, key) if err != nil { return nil, err @@ -144,7 +146,8 @@ func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*TopologyInfo, e } func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) { - key := fmt.Sprintf("%s/%s:%v/ttl", TopologyInformationPath, is.info.IP, is.info.Port) + info := is.getLocalServerInfo() + key := fmt.Sprintf("%s/%s:%v/ttl", TopologyInformationPath, info.IP, info.Port) resp, err := is.etcdCli.Get(ctx, key) if err != nil { return false, err @@ -281,3 +284,44 @@ func TestTiFlashManager(t *testing.T) { CloseTiFlashManager(ctx) } + +func TestInfoSyncerMarshal(t *testing.T) { + info := &ServerInfo{ + StaticServerInfo: StaticServerInfo{ + ServerVersionInfo: ServerVersionInfo{ + Version: "8.8.8", + GitHash: "123456", + }, + ID: "tidb1", + IP: "127.0.0.1", + Port: 4000, + StatusPort: 10080, + Lease: "1s", + StartTimestamp: 10000, + ServerIDGetter: func() uint64 { return 0 }, + JSONServerID: 1, + }, + DynamicServerInfo: DynamicServerInfo{ + Labels: map[string]string{"zone": "ap-northeast-1a"}, + }, + } + data, err := json.Marshal(info) + require.NoError(t, err) + require.Equal(t, data, []byte(`{"version":"8.8.8","git_hash":"123456",`+ + `"ddl_id":"tidb1","ip":"127.0.0.1","listening_port":4000,"status_port":10080,"lease":"1s","start_timestamp":10000,`+ + `"server_id":1,"labels":{"zone":"ap-northeast-1a"}}`)) + var decodeInfo *ServerInfo + err = json.Unmarshal(data, &decodeInfo) + require.NoError(t, err) + require.Nil(t, decodeInfo.ServerIDGetter) + require.Equal(t, info.Version, decodeInfo.Version) + require.Equal(t, info.GitHash, decodeInfo.GitHash) + require.Equal(t, info.ID, decodeInfo.ID) + require.Equal(t, info.IP, decodeInfo.IP) + require.Equal(t, info.Port, decodeInfo.Port) + require.Equal(t, info.StatusPort, decodeInfo.StatusPort) + require.Equal(t, info.Lease, decodeInfo.Lease) + require.Equal(t, info.StartTimestamp, decodeInfo.StartTimestamp) + require.Equal(t, info.JSONServerID, decodeInfo.JSONServerID) + require.Equal(t, info.Labels, decodeInfo.Labels) +} diff --git a/pkg/domain/infosync/mock_info.go b/pkg/domain/infosync/mock_info.go index a3d0b84c98716..9c527d0b24a22 100644 --- a/pkg/domain/infosync/mock_info.go +++ b/pkg/domain/infosync/mock_info.go @@ -85,14 +85,18 @@ func (m *MockGlobalServerInfoManager) getServerInfo(id string, serverIDGetter fu // TODO: each mock server can have different config info := &ServerInfo{ - ID: id, - IP: cfg.AdvertiseAddress, - Port: m.mockServerPort, - StatusPort: cfg.Status.StatusPort, - Lease: cfg.Lease, - StartTimestamp: time.Now().Unix(), - Labels: cfg.Labels, - ServerIDGetter: serverIDGetter, + StaticServerInfo: StaticServerInfo{ + ID: id, + IP: cfg.AdvertiseAddress, + Port: m.mockServerPort, + StatusPort: cfg.Status.StatusPort, + Lease: cfg.Lease, + StartTimestamp: time.Now().Unix(), + ServerIDGetter: serverIDGetter, + }, + DynamicServerInfo: DynamicServerInfo{ + Labels: cfg.Labels, + }, } m.mockServerPort++ diff --git a/pkg/server/handler/tests/BUILD.bazel b/pkg/server/handler/tests/BUILD.bazel index acd94daa68a05..47ebbe3f9841e 100644 --- a/pkg/server/handler/tests/BUILD.bazel +++ b/pkg/server/handler/tests/BUILD.bazel @@ -9,7 +9,8 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 38, + race = "on", + shard_count = 39, deps = [ "//pkg/config", "//pkg/ddl", diff --git a/pkg/server/handler/tests/http_handler_test.go b/pkg/server/handler/tests/http_handler_test.go index be1efd69a3849..53556a9b28a6c 100644 --- a/pkg/server/handler/tests/http_handler_test.go +++ b/pkg/server/handler/tests/http_handler_test.go @@ -34,6 +34,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -1225,6 +1226,7 @@ func TestSetLabelsWithEtcd(t *testing.T) { ts.startServer(t) defer ts.stopServer(t) + time.Sleep(time.Second) integration.BeforeTestExternal(t) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) @@ -1431,33 +1433,39 @@ func testUpgradeShow(t *testing.T, ts *basicHTTPHandlerTestSuite) { // check the result for upgrade show mockedAllServerInfos := map[string]*infosync.ServerInfo{ "s0": { - ID: ddlID, - IP: "127.0.0.1", - Port: 4000, - JSONServerID: 0, - ServerVersionInfo: infosync.ServerVersionInfo{ - Version: "ver", - GitHash: "hash", + StaticServerInfo: infosync.StaticServerInfo{ + ID: ddlID, + IP: "127.0.0.1", + Port: 4000, + JSONServerID: 0, + ServerVersionInfo: infosync.ServerVersionInfo{ + Version: "ver", + GitHash: "hash", + }, }, }, "s2": { - ID: "ID2", - IP: "127.0.0.1", - Port: 4002, - JSONServerID: 2, - ServerVersionInfo: infosync.ServerVersionInfo{ - Version: "ver2", - GitHash: "hash2", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "ID2", + IP: "127.0.0.1", + Port: 4002, + JSONServerID: 2, + ServerVersionInfo: infosync.ServerVersionInfo{ + Version: "ver2", + GitHash: "hash2", + }, }, }, "s1": { - ID: "ID1", - IP: "127.0.0.1", - Port: 4001, - JSONServerID: 1, - ServerVersionInfo: infosync.ServerVersionInfo{ - Version: "ver", - GitHash: "hash", + StaticServerInfo: infosync.StaticServerInfo{ + ID: "ID1", + IP: "127.0.0.1", + Port: 4001, + JSONServerID: 1, + ServerVersionInfo: infosync.ServerVersionInfo{ + Version: "ver", + GitHash: "hash", + }, }, }, } @@ -1524,3 +1532,66 @@ func TestIssue52608(t *testing.T) { require.Equal(t, on, true) require.Equal(t, addr[:10], "127.0.0.1:") } + +func TestSetLabelsConcurrentWithStoreTopology(t *testing.T) { + ts := createBasicHTTPHandlerTestSuite() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts.startServer(t) + defer ts.stopServer(t) + + time.Sleep(time.Second) + integration.BeforeTestExternal(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + client := cluster.RandClient() + infosync.SetEtcdClient(client) + + ts.domain.InfoSyncer().Restart(ctx) + ts.domain.InfoSyncer().RestartTopology(ctx) + + testUpdateLabels := func() { + labels := map[string]string{} + labels["zone"] = fmt.Sprintf("z-%v", rand.Intn(100000)) + buffer := bytes.NewBuffer([]byte{}) + require.Nil(t, json.NewEncoder(buffer).Encode(labels)) + resp, err := ts.PostStatus("/labels", "application/json", buffer) + require.NoError(t, err) + require.NotNil(t, resp) + defer func() { + require.NoError(t, resp.Body.Close()) + }() + require.Equal(t, http.StatusOK, resp.StatusCode) + newLabels := config.GetGlobalConfig().Labels + require.Equal(t, newLabels, labels) + } + testStoreTopology := func() { + require.NoError(t, ts.domain.InfoSyncer().StoreTopologyInfo(context.Background())) + } + + done := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + testStoreTopology() + } + } + }() + for i := 0; i < 100; i++ { + testUpdateLabels() + } + close(done) + wg.Wait() + + // reset the global variable + config.UpdateGlobal(func(conf *config.Config) { + conf.Labels = map[string]string{} + }) +} diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go index 9af3dfad8e383..b9b79d1eadef2 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go @@ -460,14 +460,18 @@ func makeFailpointRes(t *testing.T, v any) string { func getMockedServerInfo() map[string]*infosync.ServerInfo { mockedAllServerInfos := map[string]*infosync.ServerInfo{ "s1": { - ID: "s1", - IP: "127.0.0.1", - Port: 4000, + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s1", + IP: "127.0.0.1", + Port: 4000, + }, }, "s2": { - ID: "s2", - IP: "127.0.0.2", - Port: 4000, + StaticServerInfo: infosync.StaticServerInfo{ + ID: "s2", + IP: "127.0.0.2", + Port: 4000, + }, }, } return mockedAllServerInfos diff --git a/pkg/util/disttask/idservice_test.go b/pkg/util/disttask/idservice_test.go index d991b6b4b80b0..bde16de238ccc 100644 --- a/pkg/util/disttask/idservice_test.go +++ b/pkg/util/disttask/idservice_test.go @@ -24,15 +24,15 @@ import ( // This testCase show GenerateExecID only generate string by input parametas func TestGenServerID(t *testing.T) { var str string - serverIO := GenerateExecID(&infosync.ServerInfo{IP: "", Port: 0}) + serverIO := GenerateExecID(&infosync.ServerInfo{StaticServerInfo: infosync.StaticServerInfo{IP: "", Port: 0}}) require.Equal(t, serverIO, ":0") - serverIO = GenerateExecID(&infosync.ServerInfo{IP: "10.124.122.25", Port: 3456}) + serverIO = GenerateExecID(&infosync.ServerInfo{StaticServerInfo: infosync.StaticServerInfo{IP: "10.124.122.25", Port: 3456}}) require.Equal(t, serverIO, "10.124.122.25:3456") - serverIO = GenerateExecID(&infosync.ServerInfo{IP: "10.124", Port: 3456}) + serverIO = GenerateExecID(&infosync.ServerInfo{StaticServerInfo: infosync.StaticServerInfo{IP: "10.124", Port: 3456}}) require.Equal(t, serverIO, "10.124:3456") - serverIO = GenerateExecID(&infosync.ServerInfo{IP: str, Port: 65537}) + serverIO = GenerateExecID(&infosync.ServerInfo{StaticServerInfo: infosync.StaticServerInfo{IP: str, Port: 65537}}) require.Equal(t, serverIO, ":65537") // IPv6 testcase - serverIO = GenerateExecID(&infosync.ServerInfo{IP: "ABCD:EF01:2345:6789:ABCD:EF01:2345:6789", Port: 65537}) + serverIO = GenerateExecID(&infosync.ServerInfo{StaticServerInfo: infosync.StaticServerInfo{IP: "ABCD:EF01:2345:6789:ABCD:EF01:2345:6789", Port: 65537}}) require.Equal(t, serverIO, "[ABCD:EF01:2345:6789:ABCD:EF01:2345:6789]:65537") } diff --git a/tests/realtikvtest/txntest/stale_read_test.go b/tests/realtikvtest/txntest/stale_read_test.go index 810ed6c555f58..d12ce23c598b6 100644 --- a/tests/realtikvtest/txntest/stale_read_test.go +++ b/tests/realtikvtest/txntest/stale_read_test.go @@ -1522,6 +1522,7 @@ func TestStaleReadAllCombinations(t *testing.T) { time.Sleep(1000 * time.Millisecond) // Insert row #2 tk.MustExec("insert into t values (2, 20)") + row2CreatedTime := time.Now() time.Sleep(1000 * time.Millisecond) secondTime := time.Now().Add(-500 * time.Millisecond) @@ -1535,7 +1536,9 @@ func TestStaleReadAllCombinations(t *testing.T) { { name: "tidb_read_staleness", setup: func() { - tk.MustExec("set @@tidb_read_staleness='-2'") + row2CreatedElapsed := int(time.Since(row2CreatedTime).Seconds()) + staleness := row2CreatedElapsed + 1 // The time `now - staleness(second)` is between row1 and row2. + tk.MustExec(fmt.Sprintf("set @@tidb_read_staleness='-%d'", staleness)) }, query: "select * from t", clean: func() {