Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"math"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -87,7 +89,7 @@ type SchemaSyncer interface {
type schemaVersionSyncer struct {
selfSchemaVerPath string
etcdCli *clientv3.Client
session *concurrency.Session
session unsafe.Pointer
mu struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
Expand Down Expand Up @@ -138,23 +140,33 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
return errors.Trace(err)
}
logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
s.session, err = owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
s.storeSession(session)

s.mu.Lock()
s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
s.mu.Unlock()

err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

return errors.Trace(err)
}

func (s *schemaVersionSyncer) loadSession() *concurrency.Session {
return (*concurrency.Session)(atomic.LoadPointer(&s.session))
}

func (s *schemaVersionSyncer) storeSession(session *concurrency.Session) {
atomic.StorePointer(&s.session, (unsafe.Pointer)(session))
}

// Done implements SchemaSyncer.Done interface.
func (s *schemaVersionSyncer) Done() <-chan struct{} {
return s.session.Done()
return s.loadSession().Done()
}

// Restart implements SchemaSyncer.Restart interface.
Expand All @@ -171,12 +183,12 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.session = session
s.storeSession(session)

childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
defer cancel()
err = s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

return errors.Trace(err)
}
Expand Down Expand Up @@ -214,7 +226,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int
startTime := time.Now()
ver := strconv.FormatInt(version, 10)
err := s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
Expand Down
10 changes: 8 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

// Domain represents a storage space. Different domains can use the same database name.
Expand Down Expand Up @@ -351,13 +352,12 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
case <-syncer.Done():
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
log.Info("[ddl] reload schema in loop, schema syncer need restart")
do.SchemaValidator.Stop()
err := do.mustRestartSyncer()
if err != nil {
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
break
}
do.SchemaValidator.Restart()
log.Info("[ddl] schema syncer restarted.")
case <-do.exit:
return
}
Expand Down Expand Up @@ -471,6 +471,12 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(10) * time.Second,
Timeout: time.Duration(3) * time.Second,
PermitWithoutStream: true,
}),
},
TLS: ebd.TLSConfig(),
})
Expand Down