Skip to content

Commit 57464d9

Browse files
authored
coordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624) (#11646)
close #11561
1 parent c6bfe85 commit 57464d9

File tree

4 files changed

+13
-11
lines changed

4 files changed

+13
-11
lines changed

cdc/processor/processor.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,11 +387,9 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(
387387
span tablepb.Span, sinkStats sinkmanager.TableStats,
388388
) tablepb.Stats {
389389
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
390-
now := p.upstream.PDClock.CurrentTime()
391390

392391
stats := tablepb.Stats{
393392
RegionCount: pullerStats.RegionCount,
394-
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
395393
BarrierTs: sinkStats.BarrierTs,
396394
StageCheckpoints: map[string]tablepb.Checkpoint{
397395
"puller-ingress": {

cdc/processor/tablepb/table.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ message Stats {
6565
// Number of captured regions.
6666
uint64 region_count = 1;
6767
// The current timestamp from the table's point of view.
68-
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts"];
68+
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
6969
// Checkpoints at each stage.
7070
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
7171
// The barrier timestamp of the table.

cdc/scheduler/internal/v3/coordinator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,13 @@ func (c *coordinator) maybeCollectMetrics() {
449449
}
450450
c.lastCollectTime = now
451451

452+
pdTime := now
453+
// only nil in unit test
454+
if c.pdClock != nil {
455+
pdTime = c.pdClock.CurrentTime()
456+
}
457+
452458
c.schedulerM.CollectMetrics()
453-
c.replicationM.CollectMetrics()
459+
c.replicationM.CollectMetrics(pdTime)
454460
c.captureM.CollectMetrics()
455461
}

cdc/scheduler/internal/v3/replication/replication_manager.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) {
773773
}
774774

775775
// CollectMetrics collects metrics.
776-
func (r *Manager) CollectMetrics() {
776+
func (r *Manager) CollectMetrics(currentPDTime time.Time) {
777777
cf := r.changefeedID
778778
tableGauge.
779779
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
@@ -790,13 +790,12 @@ func (r *Manager) CollectMetrics() {
790790
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))
791791

792792
// Slow table latency metrics.
793-
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
794793
for stage, checkpoint := range table.Stats.StageCheckpoints {
795794
// Checkpoint ts
796795
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
797796
slowestTableStageCheckpointTsGaugeVec.
798797
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
799-
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
798+
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()
800799
slowestTableStageCheckpointTsLagGaugeVec.
801800
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
802801
slowestTableStageCheckpointTsLagHistogramVec.
@@ -805,7 +804,7 @@ func (r *Manager) CollectMetrics() {
805804
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
806805
slowestTableStageResolvedTsGaugeVec.
807806
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
808-
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
807+
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()
809808
slowestTableStageResolvedTsLagGaugeVec.
810809
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
811810
slowestTableStageResolvedTsLagHistogramVec.
@@ -816,7 +815,7 @@ func (r *Manager) CollectMetrics() {
816815
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
817816
slowestTableStageResolvedTsGaugeVec.
818817
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
819-
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
818+
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()
820819
slowestTableStageResolvedTsLagGaugeVec.
821820
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
822821
slowestTableStageResolvedTsLagHistogramVec.
@@ -867,8 +866,7 @@ func (r *Manager) CollectMetrics() {
867866
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
868867
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))
869868

870-
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
871-
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
869+
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()
872870
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
873871
}
874872
}

0 commit comments

Comments
 (0)