Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/gluetidb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_test(
deps = [
"//br/pkg/glue",
"//pkg/parser/model",
"//pkg/session",
"//pkg/testkit",
"//pkg/types",
"@com_github_stretchr_testify//require",
Expand Down
43 changes: 34 additions & 9 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package gluetidb

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -40,28 +41,31 @@ func New() Glue {
conf.Log.EnableSlowLog.Store(false)
conf.TiKVClient.CoprReqTimeout = 1800 * time.Second
})
return Glue{}
return Glue{
startDomainMu: &sync.Mutex{},
}
}

// Glue is an implementation of glue.Glue using a new TiDB session.
type Glue struct {
glue.StdIOGlue

tikvGlue gluetikv.Glue
tikvGlue gluetikv.Glue
startDomainMu *sync.Mutex
}

type tidbSession struct {
se sessiontypes.Session
}

// GetDomain implements glue.Glue.
func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
func (g Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
existDom, _ := session.GetDomain(nil)
initStatsSe, err := session.CreateSession(store)
initStatsSe, err := g.createTypesSession(store)
if err != nil {
return nil, errors.Trace(err)
}
se, err := session.CreateSession(store)
se, err := g.createTypesSession(store)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -84,8 +88,8 @@ func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
}

// CreateSession implements glue.Glue.
func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
se, err := session.CreateSession(store)
func (g Glue) CreateSession(store kv.Storage) (glue.Session, error) {
se, err := g.createTypesSession(store)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -95,6 +99,27 @@ func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
return tiSession, nil
}

func (g Glue) startDomainAsNeeded(store kv.Storage) error {
g.startDomainMu.Lock()
defer g.startDomainMu.Unlock()
existDom, _ := session.GetDomain(nil)
if existDom != nil {
return nil
}
dom, err := session.GetDomain(store)
if err != nil {
return err
}
return dom.Start()
}

func (g Glue) createTypesSession(store kv.Storage) (sessiontypes.Session, error) {
if err := g.startDomainAsNeeded(store); err != nil {
return nil, errors.Trace(err)
}
return session.CreateSession(store)
}

// Open implements glue.Glue.
func (g Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
return g.tikvGlue.Open(path, option)
Expand Down Expand Up @@ -122,7 +147,7 @@ func (g Glue) GetVersion() string {

// UseOneShotSession implements glue.Glue.
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
se, err := session.CreateSession(store)
se, err := g.createTypesSession(store)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -133,7 +158,7 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue
se.Close()
log.Info("one shot session closed")
}()
// dom will be created during session.CreateSession.
// dom will be created during create session.
dom, err := session.GetDomain(store)
if err != nil {
return errors.Trace(err)
Expand Down
32 changes: 24 additions & 8 deletions br/pkg/gluetidb/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,37 @@ import (

"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
)

func TestTheSessionIsoation(t *testing.T) {
req := require.New(t)
store := testkit.CreateMockStore(t)
store, dom := session.CreateStoreAndBootstrap(t)
ctx := context.Background()

g := Glue{}
session, err := g.CreateSession(store)
// we want to test glue start domain explicitly, so close it first.
dom.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there case covering the case where dom is opened already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check glue.GetDomain, it will get session twice, domain has started when create the second one

g := New()
glueSe, err := g.CreateSession(store)
req.NoError(err)
t.Cleanup(func() {
existDom, _ := session.GetDomain(nil)
if existDom != nil {
existDom.Close()
}
})

require.NoError(t, glueSe.CreateDatabase(ctx, &model.DBInfo{
Name: model.NewCIStr("test_db"),
}))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test_db")
tk.MustExec("create table t(id int)")

req.NoError(session.ExecuteInternal(ctx, "use test;"))
req.NoError(glueSe.ExecuteInternal(ctx, "use test;"))
infos := []*model.TableInfo{}
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_1"),
Expand Down Expand Up @@ -75,12 +91,12 @@ func TestTheSessionIsoation(t *testing.T) {
},
}
for _, pinfo := range polices {
before := session.(*tidbSession).se.GetInfoSchema().SchemaMetaVersion()
req.NoError(session.CreatePlacementPolicy(ctx, pinfo))
after := session.(*tidbSession).se.GetInfoSchema().SchemaMetaVersion()
before := glueSe.(*tidbSession).se.GetInfoSchema().SchemaMetaVersion()
req.NoError(glueSe.CreatePlacementPolicy(ctx, pinfo))
after := glueSe.(*tidbSession).se.GetInfoSchema().SchemaMetaVersion()
req.Greater(after, before)
}
req.NoError(session.(glue.BatchCreateTableSession).CreateTables(ctx, map[string][]*model.TableInfo{
req.NoError(glueSe.(glue.BatchCreateTableSession).CreateTables(ctx, map[string][]*model.TableInfo{
"test": infos,
}))
}