11 changes: 6 additions & 5 deletions br/pkg/restore/log_client/client.go
@@ -652,18 +652,19 @@ type LockedMigrations struct {
ReadLock storage.RemoteLock
}

func (rc *LogClient) GetMigrations(ctx context.Context) (*LockedMigrations, error) {
func (rc *LogClient) GetLockedMigrations(ctx context.Context) (*LockedMigrations, error) {
ext := stream.MigrationExtension(rc.storage)
readLock, err := ext.GetReadLock(ctx, "restore stream")
if err != nil {
return nil, err
}

migs, err := ext.Load(ctx)
if err != nil {
return nil, errors.Trace(err)
}

ms := migs.ListAll()
readLock, err := ext.GetReadLock(ctx, "restore stream")
if err != nil {
return nil, err
}

lms := &LockedMigrations{
Migs: ms,
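The rename above is not cosmetic: `GetLockedMigrations` now acquires the read lock *before* loading the migrations, so a concurrent truncation can no longer rewrite them between the load and the lock. A minimal sketch of the lock-then-load ordering, with a local `sync.RWMutex` standing in for the remote read lock (all names here are illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for the external storage: mu plays the role of the
// remote read lock, state the role of the stored migrations.
type store struct {
	mu    sync.RWMutex
	state []string
}

// lockedSnapshot mirrors GetLockedMigrations: take the read lock first,
// then load, so the snapshot stays valid for as long as the caller holds
// the returned unlock function.
func (s *store) lockedSnapshot() (snap []string, unlock func()) {
	s.mu.RLock()                    // lock first...
	snap = append(snap, s.state...) // ...then load under the lock
	return snap, s.mu.RUnlock
}

func main() {
	s := &store{state: []string{"00000001_0000000000000000.mgrt"}}
	snap, unlock := s.lockedSnapshot()
	defer unlock()
	fmt.Println(snap)
}
```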
23 changes: 16 additions & 7 deletions br/pkg/storage/local.go
@@ -131,14 +131,21 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin
// we should return nil to continue.
return nil
}
path, err = filepath.Rel(l.base, path)
relativeToBase, err := filepath.Rel(base, path)
if err != nil {
log.Panic("filepath.Walk returns a path that isn't a subdir of the base dir.",
zap.String("path", path), zap.String("base", l.base), logutil.ShortError(err))
zap.String("path", path), zap.String("base", base), logutil.ShortError(err))
}
if !strings.HasPrefix(path, opt.ObjPrefix) {
if !strings.HasPrefix(relativeToBase, opt.ObjPrefix) {
return nil
}

// Convert to relative path from l.base for consistency with cloud storage
path, err = filepath.Rel(l.base, path)
if err != nil {
log.Panic("filepath.Walk returns a path that isn't a subdir of the base dir.",
zap.String("path", path), zap.String("base", l.base), logutil.ShortError(err))
}
// NOTE: This may cause a tombstone of the dir to be emitted to the caller
// when `Walk` is called on a non-existent dir.
return fn(path, TombstoneSize)
@@ -158,13 +165,15 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin
return nil
}
// in mac osx, the path parameter is absolute path; in linux, the path is relative path to execution base dir,
// so use Rel to convert to relative path to l.base
path, _ = filepath.Rel(l.base, path)

if !strings.HasPrefix(path, opt.ObjPrefix) {
// so use Rel to convert to relative path to the directory being walked (base)
relativeToBase, _ := filepath.Rel(base, path)
if !strings.HasPrefix(relativeToBase, opt.ObjPrefix) {
return nil
}

// Convert to relative path from l.base for consistency with cloud storage
path, _ = filepath.Rel(l.base, path)

size := f.Size()
// if not a regular file, we need to use os.stat to get the real file size
if !f.Mode().IsRegular() {
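Both hunks in this file make the same correction: the `ObjPrefix` filter must be matched against the path relative to the walked directory (`base`), while the key passed to the callback remains relative to `l.base`, so local storage yields the same object names a cloud backend would. A sketch of the two-step computation (the helper name and sample paths are invented for illustration):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// filterAndRebase mirrors the fixed WalkDir logic: the prefix filter uses
// the path relative to the walked dir, while the returned key is relative
// to the storage root, matching cloud-storage semantics.
func filterAndRebase(root, walked, path, objPrefix string) (key string, ok bool, err error) {
	relToWalked, err := filepath.Rel(walked, path)
	if err != nil {
		return "", false, err
	}
	if !strings.HasPrefix(relToWalked, objPrefix) {
		return "", false, nil // filtered out, not an error
	}
	key, err = filepath.Rel(root, path)
	if err != nil {
		return "", false, err
	}
	return key, true, nil
}

func main() {
	key, ok, err := filterAndRebase(
		"/data",               // storage root (l.base)
		"/data/v1/migrations", // directory being walked (base)
		"/data/v1/migrations/00000001.mgrt",
		"0000", // matches "00000001.mgrt", not "v1/migrations/..."
	)
	fmt.Println(key, ok, err) // v1/migrations/00000001.mgrt true <nil>
}
```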
44 changes: 28 additions & 16 deletions br/pkg/storage/locking.go
@@ -8,7 +8,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math"
"math/rand"
"os"
"path"
@@ -65,7 +64,7 @@ func (cx *VerifyWriteContext) IntentFileName() string {
// - There shouldn't be any other intention files.
// - Verify() returns no error. (If there is one.)
func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.UUID, error) {
if _, ok := s.(StrongConsisency); !ok {
if _, ok := s.(StrongConsistency); !ok {
log.Warn("The external storage implementation doesn't provide a strong consistency guarantee. "+
"Please avoid concurrently accessing it if possible.",
zap.String("type", fmt.Sprintf("%T", s)))
@@ -172,7 +171,7 @@ func MakeLockMeta(hint string) LockMeta {
return meta
}

func readLockMeta(ctx context.Context, storage ExternalStorage, path string) (LockMeta, error) {
func getLockMeta(ctx context.Context, storage ExternalStorage, path string) (LockMeta, error) {
file, err := storage.ReadFile(ctx, path)
if err != nil {
return LockMeta{}, errors.Annotatef(err, "failed to read existed lock file %s", path)
@@ -196,8 +195,8 @@ func (l *RemoteLock) String() string {
return fmt.Sprintf("{path=%s,uuid=%s,storage_uri=%s}", l.path, l.txnID, l.storage.URI())
}

func tryFetchRemoteLock(ctx context.Context, storage ExternalStorage, path string) error {
meta, err := readLockMeta(ctx, storage, path)
func tryFetchRemoteLockInfo(ctx context.Context, storage ExternalStorage, path string) error {
meta, err := getLockMeta(ctx, storage, path)
if err != nil {
return err
}
@@ -231,21 +230,22 @@ func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint stri
lock.path = path
lock.txnID, err = writer.CommitTo(ctx, storage)
if err != nil {
err = errors.Annotatef(err, "there is something about the lock: %s", tryFetchRemoteLock(ctx, storage, path))
lockInfo := tryFetchRemoteLockInfo(ctx, storage, path)
err = errors.Annotatef(err, "failed to acquire lock on '%s': %s", path, lockInfo)
}
return
}

// UnlockRemote removes the lock file at the specified path.
// Unlock removes the lock file at the specified path.
// Removing that file will release the lock.
func (l RemoteLock) Unlock(ctx context.Context) error {
meta, err := readLockMeta(ctx, l.storage, l.path)
meta, err := getLockMeta(ctx, l.storage, l.path)
if err != nil {
return err
}
// NOTE: this is for debug usage. For now, there isn't an Compare-And-Swap
// NOTE: this is for debug usage. For now, there isn't a Compare-And-Swap
// operation in our ExternalStorage abstraction.
// So, once our lock has been overwritten or we are overwriting other's lock,
// So, once our lock has been overwritten, or we are overwriting other's lock,
// this information will be useful for troubleshooting.
if !bytes.Equal(l.txnID[:], meta.TxnID) {
return errors.Errorf("Txn ID mismatch: remote is %v, our is %v", meta.TxnID, l.txnID)
@@ -287,22 +287,35 @@ func newReadLockName(path string) string {

type Locker = func(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)

func LockWith(ctx context.Context, locker Locker, storage ExternalStorage, path, hint string) (lock RemoteLock, err error) {
const (
// lockRetryTimes specifies the maximum number of times to retry acquiring a lock.
// This prevents infinite retries while allowing enough attempts for temporary contention to resolve.
lockRetryTimes = 60
)

func LockWithRetry(ctx context.Context, locker Locker, storage ExternalStorage, path, hint string) (
lock RemoteLock, err error) {
const JitterMs = 5000

retry := utils.InitialRetryState(math.MaxInt, 1*time.Second, 60*time.Second)
retry := utils.InitialRetryState(lockRetryTimes, 1*time.Second, 60*time.Second)
jitter := time.Duration(rand.Uint32()%JitterMs+(JitterMs/2)) * time.Millisecond
for {
lock, err = locker(ctx, storage, path, hint)
if err == nil {
return lock, nil
}

if !retry.ShouldRetry() {
return RemoteLock{}, errors.Annotatef(err, "failed to acquire lock after %d retries", lockRetryTimes)
}

retryAfter := retry.ExponentialBackoff() + jitter
log.Info(
"Encountered lock, will retry then.",
"Encountered lock, will retry",
logutil.ShortError(err),
zap.String("path", path),
zap.Duration("retry-after", retryAfter),
zap.Int("remaining-attempts", retry.RemainingAttempts()),
)

select {
@@ -340,7 +353,7 @@ func TryLockRemoteWrite(ctx context.Context, storage ExternalStorage, path, hint
lock.path = target
lock.txnID, err = writer.CommitTo(ctx, storage)
if err != nil {
err = errors.Annotatef(err, "there is something about the lock: %s", tryFetchRemoteLock(ctx, storage, target))
err = errors.Annotatef(err, "something wrong about the lock: %s", tryFetchRemoteLockInfo(ctx, storage, target))
}
return
}
@@ -373,8 +386,7 @@ func TryLockRemoteRead(ctx context.Context, storage ExternalStorage, path, hint
lock.txnID, err = writer.CommitTo(ctx, storage)
if err != nil {
err = errors.Annotatef(err, "failed to commit the lock due to existing lock: "+
"there is something about the lock: %s", tryFetchRemoteLock(ctx, storage, writeLock))
"something wrong about the lock: %s", tryFetchRemoteLockInfo(ctx, storage, writeLock))
}

return
}
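Two behavioral changes hide in this file besides the renames: the retry budget drops from `math.MaxInt` to a named `lockRetryTimes = 60`, and the jitter (drawn once per call, between 2.5 and 7.5 seconds given `JitterMs = 5000`) is added to an exponential backoff capped at 60 seconds, so competing lockers don't wake in lockstep. A self-contained sketch of the loop's shape, with a plain doubling backoff standing in for `utils.InitialRetryState` (an assumed simplification, not the real implementation):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

const (
	lockRetryTimes = 60   // bounded, unlike the old math.MaxInt budget
	jitterMs       = 5000 // jitter is drawn from [jitterMs/2, jitterMs*3/2)
)

// lockWithRetry sketches LockWithRetry: exponential backoff plus a jitter
// drawn once per call, giving up after lockRetryTimes failed attempts.
func lockWithRetry(ctx context.Context, try func() error) error {
	backoff := 100 * time.Millisecond // the real code starts at 1s
	jitter := time.Duration(rand.Uint32()%jitterMs+(jitterMs/2)) * time.Millisecond
	var err error
	for attempt := 1; ; attempt++ {
		if err = try(); err == nil {
			return nil
		}
		if attempt >= lockRetryTimes {
			return fmt.Errorf("failed to acquire lock after %d retries: %w", lockRetryTimes, err)
		}
		select {
		case <-time.After(backoff + jitter):
		case <-ctx.Done():
			return ctx.Err()
		}
		if backoff < time.Minute { // cap the backoff, as the real state does
			backoff *= 2
		}
	}
}

func main() {
	calls := 0
	err := lockWithRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("lock is held by someone else")
		}
		return nil
	})
	fmt.Println(err, "after", calls, "attempts")
}
```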
2 changes: 1 addition & 1 deletion br/pkg/storage/storage.go
@@ -20,7 +20,7 @@ type Permission string

// StrongConsistency is a marker interface that indicates the storage is strong consistent
// over its `Read`, `Write` and `WalkDir` APIs.
type StrongConsisency interface {
type StrongConsistency interface {
MarkStrongConsistency()
}

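Besides fixing the spelling, this hunk is a reminder of how the marker interface is consumed: a backend opts into the guarantee by implementing a no-op method, and callers such as `conditionalPut.CommitTo` probe for it with a type assertion. A minimal sketch (both backend types below are invented for illustration):

```go
package main

import "fmt"

// StrongConsistency is a marker interface: implementing it is a claim
// about the storage; the method body carries no behavior.
type StrongConsistency interface {
	MarkStrongConsistency()
}

type strongBackend struct{}

func (strongBackend) MarkStrongConsistency() {}

type eventualBackend struct{}

// warnIfWeak mirrors the check in conditionalPut.CommitTo: probe the
// dynamic type and warn when the marker is absent.
func warnIfWeak(s any) {
	if _, ok := s.(StrongConsistency); !ok {
		fmt.Printf("%T provides no strong consistency guarantee\n", s)
	}
}

func main() {
	warnIfWeak(strongBackend{})   // silent
	warnIfWeak(eventualBackend{}) // warns
}
```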
60 changes: 40 additions & 20 deletions br/pkg/stream/stream_metas.go
@@ -47,6 +47,7 @@ const (
metaSuffix = ".meta"
migrationPrefix = "v1/migrations"
lockPrefix = "v1/LOCK"
appendLockPrefix = "v1/APPEND_LOCK"

SupportedMigVersion = pb.MigrationVersion_M2
)
@@ -463,17 +464,17 @@ func (m MigrationExt) AddMigrationToTable(ctx context.Context, mig *pb.Migration
// MigrationExt is an extension to the `ExternalStorage` type.
// This added some support methods for the "migration" system of log backup.
//
// Migrations are idempontent batch modifications (adding a compaction, delete a file, etc..) to the backup files.
// Migrations are idempotent batch modifications (adding a compaction, delete a file, etc..) to the backup files.
// You may check the protocol buffer message `Migration` for more details.
// Idempontence is important for migrations, as they may be executed multi times due to retry or racing.
// Idempotence is important for migrations, as they may be executed multiple times due to retry or racing.
//
// The encoded migrations will be put in a folder in the external storage,
// they are ordered by a series number.
//
// Not all migrations can be applied to the storage then removed from the migration.
// Small "additions" will be inlined into the migration, for example, a `Compaction`.
// Small "deletions" will also be delayed, for example, deleting a span in a file.
// Such operations will be save to a special migration, the first migration, named "BASE".
// Such operations will be saved to a special migration, the first migration, named "BASE".
//
// A simple list of migrations (loaded by `Load()`):
/*
@@ -592,7 +593,7 @@ func (NoHooks) StartHandlingMetaEdits([]*pb.MetaEdit)
func (NoHooks) HandledAMetaEdit(*pb.MetaEdit) {}
func (NoHooks) HandingMetaEditDone() {}

// MigrateionExtnsion installs the extension methods to an `ExternalStorage`.
// MigrationExtension installs the extension methods to an `ExternalStorage`.
func MigrationExtension(s storage.ExternalStorage) MigrationExt {
return MigrationExt{
s: s,
@@ -601,7 +602,7 @@ func MigrationExtension(s storage.ExternalStorage) MigrationExt {
}
}

// Merge merges two migrations.
// MergeMigrations merges two migrations.
// The merged migration contains all operations from the two arguments.
func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration {
out := NewMigration()
@@ -650,7 +651,7 @@ type Migrations struct {

// GetReadLock locks the storage and makes sure no one else can modify this backup.
func (m *MigrationExt) GetReadLock(ctx context.Context, hint string) (storage.RemoteLock, error) {
return storage.LockWith(ctx, storage.TryLockRemoteRead, m.s, lockPrefix, hint)
return storage.LockWithRetry(ctx, storage.TryLockRemoteRead, m.s, lockPrefix, hint)
}

// OrderedMigration is a migration with its path and sequence number.
@@ -762,12 +763,37 @@ func (m MigrationExt) DryRun(f func(MigrationExt)) []storage.Effect {
return batchSelf.s.(*storage.Batched).ReadOnlyEffects()
}

// lockForAppend implements two-phase locking for append migration operations:
// 1. Acquire read lock on main path (allows coexistence with restore)
// 2. Acquire write lock on append path (prevents concurrent appends)
func (m MigrationExt) lockForAppend(ctx context.Context, hint string) (
readLock, appendLock storage.RemoteLock, err error) {
// Phase 1: Acquire read lock on main path to coexist with restore but conflict with truncate
readLock, err = storage.LockWithRetry(ctx, storage.TryLockRemoteRead, m.s, lockPrefix, hint+" (read)")
if err != nil {
return storage.RemoteLock{}, storage.RemoteLock{}, errors.Annotate(err,
"failed to acquire read lock for append operation")
}

// Phase 2: Acquire write lock on append path to prevent concurrent appends
appendLock, err = storage.LockWithRetry(ctx, storage.TryLockRemoteWrite, m.s, appendLockPrefix, hint+" (append)")
if err != nil {
// If append lock fails, release the read lock
readLock.UnlockOnCleanUp(ctx)
return storage.RemoteLock{}, storage.RemoteLock{}, errors.Annotate(err, "failed to acquire append lock")
}

return readLock, appendLock, nil
}

func (m MigrationExt) AppendMigration(ctx context.Context, mig *pb.Migration) (int, error) {
lock, err := storage.LockWith(ctx, storage.TryLockRemoteWrite, m.s, lockPrefix, "AppendMigration")
log.Info("appending migration, trying to acquire two-phase lock")
readLock, appendLock, err := m.lockForAppend(ctx, "AppendMigration")
if err != nil {
return 0, err
}
defer lock.UnlockOnCleanUp(ctx)
defer readLock.UnlockOnCleanUp(ctx)
defer appendLock.UnlockOnCleanUp(ctx)

migs, err := m.Load(ctx)
if err != nil {
@@ -856,7 +882,8 @@ func MMOptAppendPhantomMigration(migs ...pb.Migration) MergeAndMigrateToOpt {
}

// MergeAndMigrateTo will merge the migrations from BASE until the specified SN, then migrate to it.
// Finally it writes the new BASE and remove stale migrations from the storage.
// Finally, it writes the new BASE and remove stale migrations from the storage.
// It's used by Stream Truncation.
func (m MigrationExt) MergeAndMigrateTo(
ctx context.Context,
targetSpec int,
@@ -868,7 +895,8 @@ func (m MigrationExt) MergeAndMigrateTo(
}

if !config.skipLockingInTest {
lock, err := storage.LockWith(ctx, storage.TryLockRemoteWrite, m.s, lockPrefix, "AppendMigration")
lock, err := storage.LockWithRetry(ctx, storage.TryLockRemoteWrite, m.s, lockPrefix,
"StreamTruncation: MergeMigration")
if err != nil {
result.MigratedTo = MigratedTo{
Warnings: []error{
@@ -939,7 +967,7 @@
}

for _, mig := range result.Source {
// Perhaps a phanom migration.
// Perhaps a phantom migration.
if len(mig.Path) > 0 {
err = m.s.DeleteFile(ctx, mig.Path)
if err != nil {
@@ -979,7 +1007,7 @@ func MTMaybeSkipTruncateLog(cond bool) migrateToOpt {

// migrateTo migrates to a migration.
// If encountered some error during executing some operation, the operation will be put
// to the new BASE, which can be retryed then.
// to the new BASE, which can be retried then.
func (m MigrationExt) migrateTo(ctx context.Context, mig *pb.Migration, opts ...migrateToOpt) MigratedTo {
opt := migToOpt{}
for _, o := range opts {
@@ -1671,11 +1699,3 @@ func hashMetaEdit(metaEdit *pb.MetaEdit) uint64 {
func nameOf(mig *pb.Migration, sn int) string {
return fmt.Sprintf("%08d_%016X.mgrt", sn, hashMigration(mig))
}

func isEmptyMigration(mig *pb.Migration) bool {
return len(mig.Compactions) == 0 &&
len(mig.EditMeta) == 0 &&
len(mig.IngestedSstPaths) == 0 &&
len(mig.DestructPrefix) == 0 &&
mig.TruncatedTo == 0
}
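The heart of this file's change is `lockForAppend`, which replaces the single write lock on `v1/LOCK` with two locks: a read lock on `v1/LOCK`, so an append can coexist with a restore but conflicts with a truncation, and a write lock on the new `v1/APPEND_LOCK`, so appends exclude each other. The detail worth copying is the rollback: if the second acquisition fails, the first lock must be released. A sketch of that shape with in-process mutexes standing in for the remote lock files:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// twoPhaseAppendLock mirrors lockForAppend: take the shared main lock,
// then the exclusive append lock; if the second step fails, roll back
// the first so no lock leaks.
func twoPhaseAppendLock(mainLk *sync.RWMutex, appendLk *sync.Mutex) (unlock func(), err error) {
	mainLk.RLock()           // phase 1: coexist with restore, conflict with truncate
	if !appendLk.TryLock() { // phase 2: exclude other appenders
		mainLk.RUnlock() // rollback: a failed phase 2 must release phase 1
		return nil, errors.New("another append is in progress")
	}
	return func() {
		appendLk.Unlock()
		mainLk.RUnlock()
	}, nil
}

func main() {
	var mainLk sync.RWMutex
	var appendLk sync.Mutex

	unlock, err := twoPhaseAppendLock(&mainLk, &appendLk)
	if err != nil {
		panic(err)
	}
	// A second appender is rejected while the first holds both locks.
	if _, err := twoPhaseAppendLock(&mainLk, &appendLk); err != nil {
		fmt.Println(err)
	}
	unlock()
}
```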
16 changes: 14 additions & 2 deletions br/pkg/task/stream.go
@@ -1520,12 +1520,24 @@ func restoreStream(
}

client := cfg.logClient
migs, err := client.GetMigrations(ctx)
migs, err := client.GetLockedMigrations(ctx)
if err != nil {
return errors.Trace(err)
}
client.BuildMigrations(migs.Migs)
defer cleanUpWithRetErr(&err, migs.ReadLock.Unlock)

skipCleanup := false
failpoint.Inject("skip-migration-read-lock-cleanup", func(_ failpoint.Value) {
// Skip the cleanup - this keeps the read lock held
// and will cause lock conflicts for other restore operations
log.Info("Skipping migration read lock cleanup due to failpoint")
skipCleanup = true
})

if !skipCleanup {
defer cleanUpWithRetErr(&err, migs.ReadLock.Unlock)
}

defer client.RestoreSSTStatisticFields(&extraFields)

ddlFiles := cfg.ddlFiles
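The failpoint wiring above has one subtlety worth noting: the decision is stored in a `skipCleanup` flag and the `defer` is registered outside `failpoint.Inject`, because a `defer` placed inside the injected closure would run when that closure returns, not at the end of `restoreStream`. A sketch of the flag-then-conditional-defer shape in plain Go, with an environment variable standing in for the failpoint and a closure standing in for `cleanUpWithRetErr`:

```go
package main

import (
	"fmt"
	"os"
)

// run shows the shape used in restoreStream: decide up front whether the
// cleanup should happen, then register the defer conditionally.
func run() (err error) {
	unlock := func() error {
		fmt.Println("read lock released")
		return nil
	}

	skipCleanup := false
	// Stand-in for failpoint.Inject("skip-migration-read-lock-cleanup", ...):
	// an environment variable plays the role of the failpoint here.
	if os.Getenv("SKIP_CLEANUP") != "" {
		fmt.Println("skipping migration read lock cleanup")
		skipCleanup = true
	}

	if !skipCleanup {
		// Like cleanUpWithRetErr: fold the cleanup error into the named
		// return value if nothing else failed.
		defer func() {
			if cerr := unlock(); cerr != nil && err == nil {
				err = cerr
			}
		}()
	}

	fmt.Println("restore work runs here")
	return nil
}

func main() { _ = run() }
```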