Skip to content

Commit 1dcaa01

Browse files
authored
cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624) (#11644)
close #11561
1 parent 8958811 commit 1dcaa01

File tree

4 files changed

+17
-11
lines changed

4 files changed

+17
-11
lines changed

cdc/processor/processor.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,11 +356,9 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(
356356
span tablepb.Span, sinkStats sinkmanager.TableStats,
357357
) tablepb.Stats {
358358
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
359-
now, _ := p.upstream.PDClock.CurrentTime()
360359

361360
stats := tablepb.Stats{
362361
RegionCount: pullerStats.RegionCount,
363-
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
364362
BarrierTs: sinkStats.BarrierTs,
365363
StageCheckpoints: map[string]tablepb.Checkpoint{
366364
"puller-ingress": {

cdc/processor/tablepb/table.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ message Stats {
6565
// Number of captured regions.
6666
uint64 region_count = 1;
6767
// The current timestamp from the table's point of view.
68-
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts"];
68+
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
6969
// Checkponits at each stage.
7070
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
7171
// The barrier timestamp of the table.

cdc/scheduler/internal/v3/coordinator.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,17 @@ func (c *coordinator) maybeCollectMetrics() {
452452
}
453453
c.lastCollectTime = now
454454

455+
pdTime := now
456+
// only nil in unit test
457+
if c.pdClock != nil {
458+
var err error
459+
pdTime, err = c.pdClock.CurrentTime()
460+
if err != nil {
461+
log.Warn("schedulerv3: failed to get pd time", zap.Error(err))
462+
}
463+
}
464+
455465
c.schedulerM.CollectMetrics()
456-
c.replicationM.CollectMetrics()
466+
c.replicationM.CollectMetrics(pdTime)
457467
c.captureM.CollectMetrics()
458468
}

cdc/scheduler/internal/v3/replication/replication_manager.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) {
769769
}
770770

771771
// CollectMetrics collects metrics.
772-
func (r *Manager) CollectMetrics() {
772+
func (r *Manager) CollectMetrics(currentPDTime time.Time) {
773773
cf := r.changefeedID
774774
tableGauge.
775775
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
@@ -786,13 +786,12 @@ func (r *Manager) CollectMetrics() {
786786
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))
787787

788788
// Slow table latency metrics.
789-
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
790789
for stage, checkpoint := range table.Stats.StageCheckpoints {
791790
// Checkpoint ts
792791
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
793792
slowestTableStageCheckpointTsGaugeVec.
794793
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
795-
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
794+
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()
796795
slowestTableStageCheckpointTsLagGaugeVec.
797796
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
798797
slowestTableStageCheckpointTsLagHistogramVec.
@@ -801,7 +800,7 @@ func (r *Manager) CollectMetrics() {
801800
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
802801
slowestTableStageResolvedTsGaugeVec.
803802
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
804-
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
803+
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()
805804
slowestTableStageResolvedTsLagGaugeVec.
806805
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
807806
slowestTableStageResolvedTsLagHistogramVec.
@@ -812,7 +811,7 @@ func (r *Manager) CollectMetrics() {
812811
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
813812
slowestTableStageResolvedTsGaugeVec.
814813
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
815-
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
814+
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()
816815
slowestTableStageResolvedTsLagGaugeVec.
817816
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
818817
slowestTableStageResolvedTsLagHistogramVec.
@@ -863,8 +862,7 @@ func (r *Manager) CollectMetrics() {
863862
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
864863
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))
865864

866-
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
867-
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
865+
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()
868866
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
869867
}
870868
}

0 commit comments

Comments
 (0)