Skip to content

Commit 6a1e94f

Browse files
authored
domain: quickly create a new etcd session when the session is stale in the schemaVersionSyncer (#7774)
1 parent 0d979a2 commit 6a1e94f

File tree

2 files changed

+29
-9
lines changed

2 files changed

+29
-9
lines changed

ddl/syncer.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import (
1818
"math"
1919
"strconv"
2020
"sync"
21+
"sync/atomic"
2122
"time"
23+
"unsafe"
2224

2325
"github.com/coreos/etcd/clientv3"
2426
"github.com/coreos/etcd/clientv3/concurrency"
@@ -88,7 +90,7 @@ type SchemaSyncer interface {
8890
type schemaVersionSyncer struct {
8991
selfSchemaVerPath string
9092
etcdCli *clientv3.Client
91-
session *concurrency.Session
93+
session unsafe.Pointer
9294
mu struct {
9395
sync.RWMutex
9496
globalVerCh clientv3.WatchChan
@@ -143,23 +145,32 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
143145
return errors.Trace(err)
144146
}
145147
logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
146-
s.session, err = owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
148+
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
147149
if err != nil {
148150
return errors.Trace(err)
149151
}
152+
s.storeSession(session)
150153

151154
s.mu.Lock()
152155
s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
153156
s.mu.Unlock()
154157

155158
err = PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
156-
clientv3.WithLease(s.session.Lease()))
159+
clientv3.WithLease(s.loadSession().Lease()))
157160
return errors.Trace(err)
158161
}
159162

163+
func (s *schemaVersionSyncer) loadSession() *concurrency.Session {
164+
return (*concurrency.Session)(atomic.LoadPointer(&s.session))
165+
}
166+
167+
func (s *schemaVersionSyncer) storeSession(session *concurrency.Session) {
168+
atomic.StorePointer(&s.session, (unsafe.Pointer)(session))
169+
}
170+
160171
// Done implements SchemaSyncer.Done interface.
161172
func (s *schemaVersionSyncer) Done() <-chan struct{} {
162-
return s.session.Done()
173+
return s.loadSession().Done()
163174
}
164175

165176
// Restart implements SchemaSyncer.Restart interface.
@@ -176,12 +187,12 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {
176187
if err != nil {
177188
return errors.Trace(err)
178189
}
179-
s.session = session
190+
s.storeSession(session)
180191

181192
childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
182193
defer cancel()
183194
err = PutKVToEtcd(childCtx, s.etcdCli, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
184-
clientv3.WithLease(s.session.Lease()))
195+
clientv3.WithLease(s.loadSession().Lease()))
185196

186197
return errors.Trace(err)
187198
}
@@ -219,7 +230,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int
219230
startTime := time.Now()
220231
ver := strconv.FormatInt(version, 10)
221232
err := PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, s.selfSchemaVerPath, ver,
222-
clientv3.WithLease(s.session.Lease()))
233+
clientv3.WithLease(s.loadSession().Lease()))
223234

224235
metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
225236
return errors.Trace(err)

domain/domain.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/grpc-ecosystem/go-grpc-prometheus"
2626
"github.com/ngaut/pools"
2727
"github.com/pingcap/tidb/ast"
28+
"github.com/pingcap/tidb/config"
2829
"github.com/pingcap/tidb/ddl"
2930
"github.com/pingcap/tidb/infoschema"
3031
"github.com/pingcap/tidb/kv"
@@ -42,6 +43,7 @@ import (
4243
log "github.com/sirupsen/logrus"
4344
"golang.org/x/net/context"
4445
"google.golang.org/grpc"
46+
"google.golang.org/grpc/keepalive"
4547
)
4648

4749
// Domain represents a storage space. Different domains can use the same database name.
@@ -405,16 +407,16 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
405407
case <-syncer.Done():
406408
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
407409
log.Info("[ddl] reload schema in loop, schema syncer need restart")
408-
do.SchemaValidator.Stop()
409410
err := do.mustRestartSyncer()
410411
if err != nil {
411412
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
412413
break
413414
}
414-
do.SchemaValidator.Restart()
415+
log.Info("[ddl] schema syncer restarted.")
415416
case <-do.info.Done():
416417
log.Info("[ddl] reload schema in loop, server info syncer need restart")
417418
do.info.Restart(context.Background())
419+
log.Info("[ddl] server info syncer restarted.")
418420
case <-do.exit:
419421
return
420422
}
@@ -527,12 +529,19 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
527529
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
528530
if ebd, ok := do.store.(EtcdBackend); ok {
529531
if addrs := ebd.EtcdAddrs(); addrs != nil {
532+
cfg := config.GetGlobalConfig()
530533
cli, err := clientv3.New(clientv3.Config{
531534
Endpoints: addrs,
532535
DialTimeout: 5 * time.Second,
533536
DialOptions: []grpc.DialOption{
534537
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
535538
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
539+
grpc.WithBackoffMaxDelay(time.Second * 3),
540+
grpc.WithKeepaliveParams(keepalive.ClientParameters{
541+
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
542+
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
543+
PermitWithoutStream: true,
544+
}),
536545
},
537546
TLS: ebd.TLSConfig(),
538547
})

0 commit comments

Comments
 (0)