Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//br/pkg/utils",
"//br/pkg/version",
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/kv",
"@com_github_docker_go_units//:go-units",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -194,6 +195,9 @@ func NewMgr(
return nil, errors.Trace(err)
}

if config.GetGlobalConfig().Store != "tikv" {
config.GetGlobalConfig().Store = "tikv"
}
// Disable GC because TiDB enables GC already.
path := fmt.Sprintf(
"tikv://%s?disableGC=true&keyspaceName=%s",
Expand Down Expand Up @@ -292,6 +296,7 @@ func (mgr *Mgr) Close() {
if mgr.dom != nil {
mgr.dom.Close()
}
ddl.CloseOwnerManager()
tikv.StoreShuttingDown(1)
_ = mgr.storage.Close()
}
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error {
if existDom != nil {
return nil
}
if err := ddl.StartOwnerManager(context.Background(), store); err != nil {
return errors.Trace(err)
}
dom, err := session.GetDomain(store)
if err != nil {
return err
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,11 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
env := streamhelper.CliEnv(mgr.StoreManager, mgr.GetStore(), etcdCLI)
advancer := streamhelper.NewCheckpointAdvancer(env)
advancer.UpdateConfig(cfg.AdvancerCfg)
advancerd := daemon.New(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI), cfg.AdvancerCfg.TickDuration)
ownerMgr := streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI)
defer func() {
ownerMgr.Close()
}()
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration)
loop, err := advancerd.Begin(ctx)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions cmd/benchdb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ go_library(
importpath = "github.com/pingcap/tidb/cmd/benchdb",
visibility = ["//visibility:private"],
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/parser/terror",
"//pkg/session",
"//pkg/session/types",
Expand Down
9 changes: 8 additions & 1 deletion cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/session"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
Expand Down Expand Up @@ -96,11 +98,16 @@ func newBenchDB() *benchDB {
// Create TiKV store and disable GC as we will trigger GC manually.
store, err := store.New("tikv://" + *addr + "?disableGC=true")
terror.MustNil(err)
// maybe close below components, but it's for test anyway.
ctx := context.Background()
config.GetGlobalConfig().Store = "tikv"
err = ddl.StartOwnerManager(ctx, store)
terror.MustNil(err)
_, err = session.BootstrapSession(store)
terror.MustNil(err)
se, err := session.CreateSession(store)
terror.MustNil(err)
_, err = se.ExecuteInternal(context.Background(), "use test")
_, err = se.ExecuteInternal(ctx, "use test")
terror.MustNil(err)

return &benchDB{
Expand Down
2 changes: 2 additions & 0 deletions cmd/ddltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ go_test(
race = "on",
shard_count = 6,
deps = [
"//dumpling/context",
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/kv",
"//pkg/parser/model",
Expand Down
6 changes: 5 additions & 1 deletion cmd/ddltest/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -95,12 +97,13 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) {

s.quit = make(chan struct{})

config.GetGlobalConfig().Store = "tikv"
s.store, err = store.New(fmt.Sprintf("tikv://%s%s", *etcd, *tikvPath))
require.NoError(t, err)

// Make sure the schema lease of this session is equal to other TiDB servers'.
session.SetSchemaLease(time.Duration(*lease) * time.Second)

require.NoError(t, ddl.StartOwnerManager(context.Background(), s.store))
s.dom, err = session.BootstrapSession(s.store)
require.NoError(t, err)

Expand All @@ -118,6 +121,7 @@ func createDDLSuite(t *testing.T) (s *ddlSuite) {
err = domain.GetDomain(s.ctx).DDL().Stop()
require.NoError(t, err)
config.GetGlobalConfig().Instance.TiDBEnableDDL.Store(false)
ddl.CloseOwnerManager()
session.ResetStoreForWithTiKVTest(s.store)
s.dom.Close()
require.NoError(t, s.store.Close())
Expand Down
13 changes: 8 additions & 5 deletions cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func main() {
keyspaceName := keyspace.GetKeyspaceNameBySettings()
executor.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreAndDomain(keyspaceName)
storage, dom := createStoreDDLOwnerMgrAndDomain(keyspaceName)
svr := createServer(storage, dom)

exited := make(chan struct{})
Expand Down Expand Up @@ -397,7 +397,7 @@ func registerStores() {
terror.MustNil(err)
}

func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
cfg := config.GetGlobalConfig()
var fullPath string
if keyspaceName == "" {
Expand All @@ -411,6 +411,8 @@ func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
copr.GlobalMPPFailedStoreProber.Run()
mppcoordmanager.InstanceMPPCoordinatorManager.Run()
// Bootstrap a session to load information schema.
err = ddl.StartOwnerManager(context.Background(), storage)
terror.MustNil(err)
dom, err := session.BootstrapSession(storage)
terror.MustNil(err)
return storage, dom
Expand Down Expand Up @@ -859,7 +861,7 @@ func createServer(storage kv.Storage, dom *domain.Domain) *server.Server {
svr, err := server.NewServer(cfg, driver)
// Both domain and storage have started, so we have to clean them before exiting.
if err != nil {
closeDomainAndStorage(storage, dom)
closeDDLOwnerMgrDomainAndStorage(storage, dom)
log.Fatal("failed to create the server", zap.Error(err), zap.Stack("stack"))
}
svr.SetDomain(dom)
Expand Down Expand Up @@ -893,9 +895,10 @@ func setupTracing() {
opentracing.SetGlobalTracer(tracer)
}

func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
func closeDDLOwnerMgrDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
tikv.StoreShuttingDown(1)
dom.Close()
ddl.CloseOwnerManager()
copr.GlobalMPPFailedStoreProber.Stop()
mppcoordmanager.InstanceMPPCoordinatorManager.Stop()
err := storage.Close()
Expand All @@ -918,7 +921,7 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain) {
// See https://github.com/pingcap/tidb/issues/40038 for details.
svr.KillSysProcesses()
plugin.Shutdown(context.Background())
closeDomainAndStorage(storage, dom)
closeDDLOwnerMgrDomainAndStorage(storage, dom)
disk.CleanUp()
closeStmtSummary()
topsql.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {

// Close closes the Service and clean up resource.
func (s *Service) Close() {
if s.leaderShip != nil && s.leaderShip.IsOwner() {
s.leaderShip.Cancel()
if s.leaderShip != nil {
s.leaderShip.Close()
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"modify_column.go",
"multi_schema_change.go",
"options.go",
"owner_mgr.go",
"partition.go",
"placement_policy.go",
"reorg.go",
Expand Down Expand Up @@ -133,6 +134,7 @@ go_library(
"//pkg/sessiontxn",
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/store",
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/table",
Expand Down Expand Up @@ -251,6 +253,7 @@ go_test(
"multi_schema_change_test.go",
"mv_index_test.go",
"options_test.go",
"owner_mgr_test.go",
"partition_test.go",
"placement_policy_ddl_test.go",
"placement_policy_test.go",
Expand Down Expand Up @@ -356,6 +359,7 @@ go_test(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
Expand Down
14 changes: 10 additions & 4 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ const (
ddlSchemaVersionKeyLock = "/tidb/ddl/schema_version_lock"
// addingDDLJobPrefix is the path prefix used to record the newly added DDL job, and it's saved to etcd.
addingDDLJobPrefix = "/tidb/ddl/add_ddl_job_"
ddlPrompt = "ddl"
// Prompt is the prompt for ddl owner manager.
Prompt = "ddl"

batchAddingJobs = 100

Expand Down Expand Up @@ -638,19 +639,21 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
o(opt)
}

id := uuid.New().String()
var id string
var manager owner.Manager
var schemaVerSyncer schemaver.Syncer
var serverStateSyncer serverstate.Syncer
var deadLockCkr util.DeadTableLockChecker
if etcdCli := opt.EtcdCli; etcdCli == nil {
id = uuid.New().String()
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and memSyncer.
manager = owner.NewMockManager(ctx, id, opt.Store, DDLOwnerKey)
schemaVerSyncer = schemaver.NewMemSyncer()
serverStateSyncer = serverstate.NewMemSyncer()
} else {
manager = owner.NewOwnerManager(ctx, etcdCli, ddlPrompt, id, DDLOwnerKey)
id = globalOwnerManager.ID()
manager = globalOwnerManager.OwnerManager()
schemaVerSyncer = schemaver.NewEtcdSyncer(etcdCli, id)
serverStateSyncer = serverstate.NewEtcdSyncer(etcdCli, util.ServerGlobalState)
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
Expand Down Expand Up @@ -1003,7 +1006,10 @@ func (d *ddl) close() {
startTime := time.Now()
d.cancel()
d.wg.Wait()
d.ownerManager.Cancel()
// when run with real-tikv, the lifecycle of ownerManager is managed by globalOwnerManager,
// when run with uni-store BreakCampaignLoop is same as Close.
// hope we can unify it after refactor to let some components only start once.
d.ownerManager.BreakCampaignLoop()
d.schemaVerSyncer.Close()

// d.delRangeMgr using sessions from d.sessPool.
Expand Down
97 changes: 97 additions & 0 deletions pkg/ddl/owner_mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/owner"
storepkg "github.com/pingcap/tidb/pkg/store"
"github.com/pingcap/tidb/pkg/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

var globalOwnerManager = &ownerManager{}

// StartOwnerManager starts a global DDL owner manager.
func StartOwnerManager(ctx context.Context, store kv.Storage) error {
return globalOwnerManager.Start(ctx, store)
}

// CloseOwnerManager closes the global DDL owner manager.
func CloseOwnerManager() {
globalOwnerManager.Close()
}

// ownerManager is used to manage lifecycle of a global DDL owner manager which
// we only want it to init session once, to avoid DDL owner change after upgrade.
type ownerManager struct {
etcdCli *clientv3.Client
id string
ownerMgr owner.Manager
started bool
}

// Start starts the TiDBInstance.
func (om *ownerManager) Start(ctx context.Context, store kv.Storage) error {
// BR might start domain multiple times, we need to avoid it. when BR have refactored
// this part, we can remove this.
if om.started {
return nil
}
if config.GetGlobalConfig().Store != "tikv" {
return nil
}
cli, err := storepkg.NewEtcdCli(store)
if err != nil {
return errors.Trace(err)
}
failpoint.InjectCall("injectEtcdClient", &cli)
if cli == nil {
return errors.New("etcd client is nil, maybe the server is not started with PD")
}
om.id = uuid.New().String()
om.etcdCli = cli
om.ownerMgr = owner.NewOwnerManager(ctx, om.etcdCli, Prompt, om.id, DDLOwnerKey)
om.started = true
return nil
}

// Close closes the TiDBInstance.
func (om *ownerManager) Close() {
if om.ownerMgr != nil {
om.ownerMgr.Close()
}
if om.etcdCli != nil {
if err := om.etcdCli.Close(); err != nil {
logutil.BgLogger().Error("close etcd client failed", zap.Error(err))
}
}
om.started = false
}

func (om *ownerManager) ID() string {
return om.id
}

func (om *ownerManager) OwnerManager() owner.Manager {
return om.ownerMgr
}
Loading