Skip to content

Commit 56e7093

Browse files
authored
ddl: use a global owner manager instance for DDL, to avoid owner change (#57179)
ref #57185
1 parent 042846e commit 56e7093

File tree

39 files changed

+636
-434
lines changed

39 files changed

+636
-434
lines changed

br/pkg/conn/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"//br/pkg/utils",
1616
"//br/pkg/version",
1717
"//pkg/config",
18+
"//pkg/ddl",
1819
"//pkg/domain",
1920
"//pkg/kv",
2021
"@com_github_docker_go_units//:go-units",

br/pkg/conn/conn.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/pingcap/tidb/br/pkg/utils"
3030
"github.com/pingcap/tidb/br/pkg/version"
3131
"github.com/pingcap/tidb/pkg/config"
32+
"github.com/pingcap/tidb/pkg/ddl"
3233
"github.com/pingcap/tidb/pkg/domain"
3334
"github.com/pingcap/tidb/pkg/kv"
3435
"github.com/tikv/client-go/v2/oracle"
@@ -194,6 +195,9 @@ func NewMgr(
194195
return nil, errors.Trace(err)
195196
}
196197

198+
if config.GetGlobalConfig().Store != "tikv" {
199+
config.GetGlobalConfig().Store = "tikv"
200+
}
197201
// Disable GC because TiDB enables GC already.
198202
path := fmt.Sprintf(
199203
"tikv://%s?disableGC=true&keyspaceName=%s",
@@ -292,6 +296,7 @@ func (mgr *Mgr) Close() {
292296
if mgr.dom != nil {
293297
mgr.dom.Close()
294298
}
299+
ddl.CloseOwnerManager()
295300
tikv.StoreShuttingDown(1)
296301
_ = mgr.storage.Close()
297302
}

br/pkg/gluetidb/glue.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error {
107107
if existDom != nil {
108108
return nil
109109
}
110+
if err := ddl.StartOwnerManager(context.Background(), store); err != nil {
111+
return errors.Trace(err)
112+
}
110113
dom, err := session.GetDomain(store)
111114
if err != nil {
112115
return err

br/pkg/task/stream.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,11 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
920920
env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI)
921921
advancer := streamhelper.NewCheckpointAdvancer(env)
922922
advancer.UpdateConfig(cfg.AdvancerCfg)
923-
advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration)
923+
ownerMgr := streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI)
924+
defer func() {
925+
ownerMgr.Close()
926+
}()
927+
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration)
924928
loop, err := advancerd.Begin(ctx)
925929
if err != nil {
926930
return err

cmd/benchdb/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ go_library(
66
importpath = "github.com/pingcap/tidb/cmd/benchdb",
77
visibility = ["//visibility:private"],
88
deps = [
9+
"//pkg/config",
10+
"//pkg/ddl",
911
"//pkg/parser/terror",
1012
"//pkg/session",
1113
"//pkg/session/types",

cmd/benchdb/main.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"time"
2525

2626
"github.com/pingcap/log"
27+
"github.com/pingcap/tidb/pkg/config"
28+
"github.com/pingcap/tidb/pkg/ddl"
2729
"github.com/pingcap/tidb/pkg/parser/terror"
2830
"github.com/pingcap/tidb/pkg/session"
2931
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
@@ -96,11 +98,16 @@ func newBenchDB() *benchDB {
9698
// Create TiKV store and disable GC as we will trigger GC manually.
9799
store, err := store.New("tikv://" + *addr + "?disableGC=true")
98100
terror.MustNil(err)
101+
// maybe close below components, but it's for test anyway.
102+
ctx := context.Background()
103+
config.GetGlobalConfig().Store = "tikv"
104+
err = ddl.StartOwnerManager(ctx, store)
105+
terror.MustNil(err)
99106
_, err = session.BootstrapSession(store)
100107
terror.MustNil(err)
101108
se, err := session.CreateSession(store)
102109
terror.MustNil(err)
103-
_, err = se.ExecuteInternal(context.Background(), "use test")
110+
_, err = se.ExecuteInternal(ctx, "use test")
104111
terror.MustNil(err)
105112

106113
return &benchDB{

cmd/ddltest/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ go_test(
1414
race = "on",
1515
shard_count = 6,
1616
deps = [
17+
"//dumpling/context",
1718
"//pkg/config",
19+
"//pkg/ddl",
1820
"//pkg/domain",
1921
"//pkg/kv",
2022
"//pkg/parser/model",

cmd/ddltest/ddl_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ import (
3333
_ "github.com/go-sql-driver/mysql"
3434
"github.com/pingcap/errors"
3535
"github.com/pingcap/log"
36+
"github.com/pingcap/tidb/dumpling/context"
3637
"github.com/pingcap/tidb/pkg/config"
38+
"github.com/pingcap/tidb/pkg/ddl"
3739
"github.com/pingcap/tidb/pkg/domain"
3840
"github.com/pingcap/tidb/pkg/kv"
3941
"github.com/pingcap/tidb/pkg/parser/model"
@@ -95,12 +97,13 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) {
9597

9698
s.quit = make(chan struct{})
9799

100+
config.GetGlobalConfig().Store = "tikv"
98101
s.store, err = store.New(fmt.Sprintf("tikv://%s%s", *etcd, *tikvPath))
99102
require.NoError(t, err)
100103

101104
// Make sure the schema lease of this session is equal to other TiDB servers'.
102105
session.SetSchemaLease(time.Duration(*lease) * time.Second)
103-
106+
require.NoError(t, ddl.StartOwnerManager(context.Background(), s.store))
104107
s.dom, err = session.BootstrapSession(s.store)
105108
require.NoError(t, err)
106109

@@ -118,6 +121,7 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) {
118121
err = domain.GetDomain(s.ctx).DDL().Stop()
119122
require.NoError(t, err)
120123
config.GetGlobalConfig().Instance.TiDBEnableDDL.Store(false)
124+
ddl.CloseOwnerManager()
121125
session.ResetStoreForWithTiKVTest(s.store)
122126
s.dom.Close()
123127
require.NoError(t, s.store.Close())

cmd/tidb-server/main.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ func main() {
317317
keyspaceName := keyspace.GetKeyspaceNameBySettings()
318318
executor.Start()
319319
resourcemanager.InstanceResourceManager.Start()
320-
storage, dom := createStoreAndDomain(keyspaceName)
320+
storage, dom := createStoreDDLOwnerMgrAndDomain(keyspaceName)
321321
svr := createServer(storage, dom)
322322

323323
exited := make(chan struct{})
@@ -397,7 +397,7 @@ func registerStores() {
397397
terror.MustNil(err)
398398
}
399399

400-
func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
400+
func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
401401
cfg := config.GetGlobalConfig()
402402
var fullPath string
403403
if keyspaceName == "" {
@@ -411,6 +411,8 @@ func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
411411
copr.GlobalMPPFailedStoreProber.Run()
412412
mppcoordmanager.InstanceMPPCoordinatorManager.Run()
413413
// Bootstrap a session to load information schema.
414+
err = ddl.StartOwnerManager(context.Background(), storage)
415+
terror.MustNil(err)
414416
dom, err := session.BootstrapSession(storage)
415417
terror.MustNil(err)
416418
return storage, dom
@@ -859,7 +861,7 @@ func createServer(storage kv.Storage, dom *domain.Domain) *server.Server {
859861
svr, err := server.NewServer(cfg, driver)
860862
// Both domain and storage have started, so we have to clean them before exiting.
861863
if err != nil {
862-
closeDomainAndStorage(storage, dom)
864+
closeDDLOwnerMgrDomainAndStorage(storage, dom)
863865
log.Fatal("failed to create the server", zap.Error(err), zap.Stack("stack"))
864866
}
865867
svr.SetDomain(dom)
@@ -893,9 +895,10 @@ func setupTracing() {
893895
opentracing.SetGlobalTracer(tracer)
894896
}
895897

896-
func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
898+
func closeDDLOwnerMgrDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
897899
tikv.StoreShuttingDown(1)
898900
dom.Close()
901+
ddl.CloseOwnerManager()
899902
copr.GlobalMPPFailedStoreProber.Stop()
900903
mppcoordmanager.InstanceMPPCoordinatorManager.Stop()
901904
err := storage.Close()
@@ -918,7 +921,7 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain) {
918921
// See https://github.com/pingcap/tidb/issues/40038 for details.
919922
svr.KillSysProcesses()
920923
plugin.Shutdown(context.Background())
921-
closeDomainAndStorage(storage, dom)
924+
closeDDLOwnerMgrDomainAndStorage(storage, dom)
922925
disk.CleanUp()
923926
closeStmtSummary()
924927
topsql.Close()

pkg/autoid_service/autoid.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,8 @@ func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {
379379

380380
// Close closes the Service and clean up resource.
381381
func (s *Service) Close() {
382-
if s.leaderShip != nil && s.leaderShip.IsOwner() {
383-
s.leaderShip.Cancel()
382+
if s.leaderShip != nil {
383+
s.leaderShip.Close()
384384
}
385385
}
386386

0 commit comments

Comments
 (0)