Skip to content

Commit f0ba482

Browse files
Tristan1900ti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#61610
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 240cdc5 commit f0ba482

File tree

6 files changed

+621
-22
lines changed

6 files changed

+621
-22
lines changed

br/pkg/registry/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"//br/pkg/utils",
1616
"//pkg/domain",
1717
"//pkg/kv",
18+
"//pkg/util/chunk",
1819
"//pkg/util/sqlexec",
1920
"//pkg/util/table-filter",
2021
"@com_github_pingcap_errors//:errors",

br/pkg/registry/registration.go

Lines changed: 309 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/pingcap/tidb/br/pkg/utils"
2929
"github.com/pingcap/tidb/pkg/domain"
3030
"github.com/pingcap/tidb/pkg/kv"
31+
"github.com/pingcap/tidb/pkg/util/chunk"
3132
"github.com/pingcap/tidb/pkg/util/sqlexec"
3233
filter "github.com/pingcap/tidb/pkg/util/table-filter"
3334
"go.uber.org/zap"
@@ -43,6 +44,9 @@ const (
4344
// Using ASCII Unit Separator (US) character which never appears in SQL identifiers or expressions.
4445
FilterSeparator = "\x1F"
4546

47+
// StaleTaskThresholdMinutes is the threshold in minutes to consider a running task as potentially stale
48+
StaleTaskThresholdMinutes = 5
49+
4650
// lookupRegistrationSQLTemplate is the SQL template for looking up a registration by its parameters
4751
lookupRegistrationSQLTemplate = `
4852
SELECT id, status FROM %s.%s
@@ -83,6 +87,40 @@ const (
8387
(filter_strings, filter_hash, start_ts, restored_ts, upstream_cluster_id,
8488
with_sys_table, status, cmd, task_start_time, last_heartbeat_time)
8589
VALUES (%%?, MD5(%%?), %%?, %%?, %%?, %%?, 'running', %%?, %%?, %%?)`
90+
91+
// selectStaleRunningTasksSQLTemplate is the SQL template for finding potentially stale running tasks
92+
selectStaleRunningTasksSQLTemplate = `
93+
SELECT id, last_heartbeat_time
94+
FROM %s.%s
95+
WHERE status = 'running'
96+
AND last_heartbeat_time < DATE_SUB(NOW(), INTERVAL %%? MINUTE)
97+
ORDER BY id ASC`
98+
99+
// selectTaskHeartbeatSQLTemplate is the SQL template for getting a specific task's heartbeat time
100+
selectTaskHeartbeatSQLTemplate = `
101+
SELECT last_heartbeat_time
102+
FROM %s.%s
103+
WHERE id = %%?`
104+
105+
// selectConflictingTaskSQLTemplate is the SQL template for finding tasks with same parameters except restoredTS
106+
selectConflictingTaskSQLTemplate = `
107+
SELECT id, restored_ts, status FROM %s.%s
108+
WHERE filter_hash = MD5(%%?)
109+
AND start_ts = %%?
110+
AND upstream_cluster_id = %%?
111+
AND with_sys_table = %%?
112+
AND cmd = %%?
113+
AND restored_ts != %%?
114+
AND status IN ('running', 'paused')
115+
ORDER BY id DESC
116+
LIMIT 1`
117+
118+
// transitionStaleTaskToPausedSQLTemplate is the SQL template for atomically transitioning a
119+
// stale running task to paused
120+
transitionStaleTaskToPausedSQLTemplate = `
121+
UPDATE %s.%s
122+
SET status = 'paused'
123+
WHERE id = %%? AND status = 'running' AND last_heartbeat_time = %%?`
86124
)
87125

88126
// TaskStatus represents the current state of a restore task
@@ -194,7 +232,23 @@ func (r *Registry) executeInTransaction(ctx context.Context, fn func(context.Con
194232

195233
// ResumeOrCreateRegistration first looks for an existing registration with the given parameters.
196234
// If found and paused, it tries to resume it. Otherwise, it creates a new registration.
197-
func (r *Registry) ResumeOrCreateRegistration(ctx context.Context, info RegistrationInfo) (uint64, error) {
235+
// Returns: (taskID, resolvedRestoreTS, error)
236+
func (r *Registry) ResumeOrCreateRegistration(ctx context.Context, info RegistrationInfo,
237+
isRestoredTSUserSpecified bool) (uint64, uint64, error) {
238+
// resolve which restoredTS to use, handling auto-detection conflicts
239+
resolvedRestoreTS, err := r.resolveRestoreTS(ctx, info, isRestoredTSUserSpecified)
240+
if err != nil {
241+
return 0, 0, err
242+
}
243+
244+
// update info with resolved restoredTS if different
245+
if resolvedRestoreTS != info.RestoredTS {
246+
log.Info("using resolved restoredTS from existing task",
247+
zap.Uint64("original_restored_ts", info.RestoredTS),
248+
zap.Uint64("resolved_restored_ts", resolvedRestoreTS))
249+
info.RestoredTS = resolvedRestoreTS
250+
}
251+
198252
filterStrings := strings.Join(info.FilterStrings, FilterSeparator)
199253

200254
log.Info("attempting to resume or create registration",
@@ -203,11 +257,12 @@ func (r *Registry) ResumeOrCreateRegistration(ctx context.Context, info Registra
203257
zap.Uint64("restored_ts", info.RestoredTS),
204258
zap.Uint64("upstream_cluster_id", info.UpstreamClusterID),
205259
zap.Bool("with_sys_table", info.WithSysTable),
206-
zap.String("cmd", info.Cmd))
260+
zap.String("cmd", info.Cmd),
261+
zap.Bool("is_restored_ts_user_specified", isRestoredTSUserSpecified))
207262

208263
var taskID uint64
209264

210-
err := r.executeInTransaction(ctx, func(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor,
265+
err = r.executeInTransaction(ctx, func(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor,
211266
sessionOpts []sqlexec.OptionFuncAlias) error {
212267
// first look for an existing task with the same parameters
213268
lookupSQL := fmt.Sprintf(lookupRegistrationSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
@@ -292,10 +347,10 @@ func (r *Registry) ResumeOrCreateRegistration(ctx context.Context, info Registra
292347
})
293348

294349
if err != nil {
295-
return 0, err
350+
return 0, 0, err
296351
}
297352

298-
return taskID, nil
353+
return taskID, resolvedRestoreTS, nil
299354
}
300355

301356
// updateTaskStatusConditional updates a task's status only if its current status matches the expected status
@@ -498,3 +553,252 @@ func (r *Registry) StopHeartbeatManager() {
498553
log.Info("stopped heartbeat manager for restore task")
499554
}
500555
}
556+
557+
// resolveRestoreTS determines which restoredTS to use, handling conflicts with existing tasks
558+
// when restoredTS is not user-specified. Returns: (resolvedRestoreTS, error)
559+
func (r *Registry) resolveRestoreTS(ctx context.Context,
560+
info RegistrationInfo, isRestoredTSUserSpecified bool) (uint64, error) {
561+
// if restoredTS is user-specified, use it directly without any conflict resolution
562+
if isRestoredTSUserSpecified {
563+
log.Info("restoredTS is user-specified, using it directly",
564+
zap.Uint64("restored_ts", info.RestoredTS))
565+
return info.RestoredTS, nil
566+
}
567+
568+
filterStrings := strings.Join(info.FilterStrings, FilterSeparator)
569+
570+
// look for tasks with same filter, startTS, cluster, sysTable, cmd but different restoredTS
571+
execCtx := r.se.GetSessionCtx().GetRestrictedSQLExecutor()
572+
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
573+
574+
checkSQL := fmt.Sprintf(selectConflictingTaskSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
575+
rows, _, err := execCtx.ExecRestrictedSQL(ctx, nil, checkSQL,
576+
filterStrings, info.StartTS, info.UpstreamClusterID, info.WithSysTable, info.Cmd, info.RestoredTS)
577+
if err != nil {
578+
return 0, errors.Annotate(err, "failed to check for existing tasks with different restoredTS")
579+
}
580+
581+
// no conflicting task found, use the current restoredTS
582+
if len(rows) == 0 {
583+
log.Info("no existing tasks found with different restoredTS",
584+
zap.Uint64("restored_ts", info.RestoredTS))
585+
return info.RestoredTS, nil
586+
}
587+
588+
conflictingTaskID := rows[0].GetUint64(0)
589+
existingRestoredTS := rows[0].GetUint64(1)
590+
existingStatus := rows[0].GetString(2)
591+
592+
log.Info("found existing task with different restoredTS",
593+
zap.Uint64("existing_task_id", conflictingTaskID),
594+
zap.Uint64("existing_restored_ts", existingRestoredTS),
595+
zap.String("existing_status", existingStatus),
596+
zap.Uint64("current_restored_ts", info.RestoredTS),
597+
zap.Strings("filters", info.FilterStrings),
598+
zap.Uint64("start_ts", info.StartTS))
599+
600+
// if existing task is paused, reuse its restoredTS
601+
if existingStatus == string(TaskStatusPaused) {
602+
log.Info("existing task is paused, reusing its restoredTS",
603+
zap.Uint64("existing_task_id", conflictingTaskID),
604+
zap.Uint64("existing_restored_ts", existingRestoredTS))
605+
return existingRestoredTS, nil
606+
}
607+
608+
// if existing task is running, check if it's stale
609+
if existingStatus == string(TaskStatusRunning) {
610+
log.Info("existing task is running, checking if it's stale",
611+
zap.Uint64("existing_task_id", conflictingTaskID))
612+
613+
// First, get the current heartbeat time for atomic transition
614+
selectHeartbeatSQL := fmt.Sprintf(selectTaskHeartbeatSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
615+
heartbeatRows, _, err := execCtx.ExecRestrictedSQL(ctx, nil, selectHeartbeatSQL, conflictingTaskID)
616+
if err != nil {
617+
log.Warn("failed to get task heartbeat, using current restoredTS",
618+
zap.Uint64("task_id", conflictingTaskID),
619+
zap.Error(err))
620+
return info.RestoredTS, nil
621+
}
622+
623+
if len(heartbeatRows) == 0 {
624+
log.Info("task not found during heartbeat check, using current restoredTS",
625+
zap.Uint64("task_id", conflictingTaskID))
626+
return info.RestoredTS, nil
627+
}
628+
629+
currentHeartbeatTime := heartbeatRows[0].GetTime(0).String()
630+
631+
isStale, err := r.isTaskStale(ctx, conflictingTaskID)
632+
if err != nil {
633+
log.Warn("failed to check if task is stale, using current restoredTS",
634+
zap.Uint64("task_id", conflictingTaskID),
635+
zap.Error(err))
636+
return info.RestoredTS, nil
637+
}
638+
639+
if isStale {
640+
log.Info("existing running task is stale, attempting to transition to paused",
641+
zap.Uint64("existing_task_id", conflictingTaskID),
642+
zap.Uint64("existing_restored_ts", existingRestoredTS))
643+
644+
// atomically transition the stale task to paused state
645+
transitioned, transitionErr := r.transitionStaleTaskToPaused(ctx, conflictingTaskID, currentHeartbeatTime)
646+
if transitionErr != nil {
647+
log.Warn("failed to transition stale task to paused, using current restoredTS",
648+
zap.Uint64("task_id", conflictingTaskID),
649+
zap.Error(transitionErr))
650+
return info.RestoredTS, nil
651+
}
652+
653+
if transitioned {
654+
log.Info("successfully transitioned stale task to paused, will reuse its restoredTS",
655+
zap.Uint64("existing_task_id", conflictingTaskID),
656+
zap.Uint64("existing_restored_ts", existingRestoredTS))
657+
return existingRestoredTS, nil
658+
}
659+
log.Info("task was not transitioned (concurrent update), using current restoredTS",
660+
zap.Uint64("existing_task_id", conflictingTaskID))
661+
return info.RestoredTS, nil
662+
}
663+
664+
log.Info("existing running task is active, using current restoredTS",
665+
zap.Uint64("existing_task_id", conflictingTaskID))
666+
return info.RestoredTS, nil
667+
}
668+
669+
// existing task is in unexpected state, use current restoredTS
670+
log.Warn("existing task is in unexpected state, using current restoredTS",
671+
zap.Uint64("existing_task_id", conflictingTaskID),
672+
zap.String("status", existingStatus))
673+
return info.RestoredTS, nil
674+
}
675+
676+
// isTaskStale checks if a running task is stale by waiting up to 5 minutes and checking if heartbeat updates
677+
func (r *Registry) isTaskStale(ctx context.Context, taskID uint64) (bool, error) {
678+
execCtx := r.se.GetSessionCtx().GetRestrictedSQLExecutor()
679+
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
680+
681+
// get initial heartbeat time
682+
selectHeartbeatSQL := fmt.Sprintf(selectTaskHeartbeatSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
683+
initialRows, _, err := execCtx.ExecRestrictedSQL(ctx, nil, selectHeartbeatSQL, taskID)
684+
if err != nil {
685+
return false, errors.Annotate(err, "failed to get initial heartbeat time")
686+
}
687+
688+
if len(initialRows) == 0 {
689+
return false, nil // task not found (might have been deleted), proceed with user's restoredTS
690+
}
691+
692+
initialHeartbeatTime := initialRows[0].GetTime(0).String()
693+
694+
log.Info("checking if task is stale, will check heartbeat every minute up to 5 minutes",
695+
zap.Uint64("task_id", taskID),
696+
zap.String("initial_heartbeat", initialHeartbeatTime))
697+
698+
// check heartbeat every minute for up to 5 minutes
699+
ticker := time.NewTicker(time.Minute)
700+
defer ticker.Stop()
701+
702+
remainingMinutes := StaleTaskThresholdMinutes
703+
for remainingMinutes > 0 {
704+
select {
705+
case <-ctx.Done():
706+
return false, ctx.Err()
707+
case <-ticker.C:
708+
remainingMinutes--
709+
710+
// check heartbeat time at each tick
711+
currentRows, _, err := execCtx.ExecRestrictedSQL(ctx, nil, selectHeartbeatSQL, taskID)
712+
if err != nil {
713+
log.Warn("failed to check heartbeat during stale check, assuming task is active",
714+
zap.Uint64("task_id", taskID),
715+
zap.Error(err))
716+
return false, nil
717+
}
718+
719+
if len(currentRows) == 0 {
720+
return false, nil // task not found (might have been deleted), proceed with user's restoredTS
721+
}
722+
723+
currentHeartbeatTime := currentRows[0].GetTime(0).String()
724+
725+
// if heartbeat changed, task is active - exit early
726+
if currentHeartbeatTime != initialHeartbeatTime {
727+
log.Info("task heartbeat updated, task is active",
728+
zap.Uint64("task_id", taskID),
729+
zap.String("initial_heartbeat", initialHeartbeatTime),
730+
zap.String("current_heartbeat", currentHeartbeatTime),
731+
zap.Int("minutes_waited", StaleTaskThresholdMinutes-remainingMinutes))
732+
return false, nil
733+
}
734+
735+
if remainingMinutes > 0 {
736+
log.Info("task heartbeat unchanged, continuing to wait",
737+
zap.Int("remaining_minutes", remainingMinutes),
738+
zap.Uint64("task_id", taskID))
739+
}
740+
}
741+
}
742+
743+
// if we get here, heartbeat hasn't changed for 5 minutes - task is stale
744+
log.Info("task heartbeat unchanged for 5 minutes, task is stale",
745+
zap.Uint64("task_id", taskID),
746+
zap.String("initial_heartbeat", initialHeartbeatTime))
747+
748+
return true, nil
749+
}
750+
751+
// transitionStaleTaskToPaused atomically transitions a stale running task to paused state
752+
// if the heartbeat timestamp hasn't changed. Returns whether the transition was successful.
753+
func (r *Registry) transitionStaleTaskToPaused(ctx context.Context, taskID uint64,
754+
expectedHeartbeatTime string) (bool, error) {
755+
log.Info("attempting to transition stale task to paused state",
756+
zap.Uint64("task_id", taskID),
757+
zap.String("expected_heartbeat", expectedHeartbeatTime))
758+
759+
var transitioned bool
760+
err := r.executeInTransaction(ctx, func(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor,
761+
sessionOpts []sqlexec.OptionFuncAlias) error {
762+
// atomically update task to paused only if it's still running with the same heartbeat time
763+
updateSQL := fmt.Sprintf(transitionStaleTaskToPausedSQLTemplate, RestoreRegistryDBName, RestoreRegistryTableName)
764+
765+
// We need to parse the heartbeat time string back to time.Time for the SQL query
766+
// The expectedHeartbeatTime comes from MySQL's time format
767+
expectedTime, parseErr := time.Parse("2006-01-02 15:04:05", expectedHeartbeatTime)
768+
if parseErr != nil {
769+
return errors.Annotatef(parseErr, "failed to parse expected heartbeat time: %s", expectedHeartbeatTime)
770+
}
771+
772+
_, _, updateErr := execCtx.ExecRestrictedSQL(ctx, sessionOpts, updateSQL, taskID, expectedTime)
773+
if updateErr != nil {
774+
return errors.Annotate(updateErr, "failed to transition stale task to paused")
775+
}
776+
777+
// Check if the task was actually transitioned by querying its current status
778+
checkTaskSQL := fmt.Sprintf(
779+
"SELECT status FROM %s.%s WHERE id = %%?", RestoreRegistryDBName, RestoreRegistryTableName)
780+
var statusRows []chunk.Row
781+
var checkErr error
782+
statusRows, _, checkErr = execCtx.ExecRestrictedSQL(ctx, sessionOpts, checkTaskSQL, taskID)
783+
if checkErr != nil {
784+
return errors.Annotate(checkErr, "failed to check task status after transition attempt")
785+
}
786+
787+
if len(statusRows) > 0 && statusRows[0].GetString(0) == string(TaskStatusPaused) {
788+
transitioned = true
789+
log.Info("successfully transitioned stale task to paused state",
790+
zap.Uint64("task_id", taskID))
791+
} else {
792+
log.Info("task was not transitioned (either already changed state or heartbeat was updated)",
793+
zap.Uint64("task_id", taskID))
794+
}
795+
796+
return nil
797+
})
798+
799+
if err != nil {
800+
return false, err
801+
}
802+
803+
return transitioned, nil
804+
}

0 commit comments

Comments
 (0)