Skip to content

Commit 3f07a1d

Browse files
wlwilliamxti-chi-bot
authored andcommitted
cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (pingcap#11624)
close pingcap#11561
1 parent e2b6bb1 commit 3f07a1d

File tree

4 files changed

+13
-11
lines changed

4 files changed

+13
-11
lines changed

cdc/processor/processor.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,11 +378,9 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(
378378
span tablepb.Span, sinkStats sinkmanager.TableStats,
379379
) tablepb.Stats {
380380
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
381-
now := p.upstream.PDClock.CurrentTime()
382381

383382
stats := tablepb.Stats{
384383
RegionCount: pullerStats.RegionCount,
385-
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
386384
BarrierTs: sinkStats.BarrierTs,
387385
StageCheckpoints: map[string]tablepb.Checkpoint{
388386
"puller-ingress": {

cdc/processor/tablepb/table.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ message Stats {
6565
// Number of captured regions.
6666
uint64 region_count = 1;
6767
// The current timestamp from the table's point of view.
68-
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts"];
68+
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
6969
// Checkponits at each stage.
7070
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
7171
// The barrier timestamp of the table.

cdc/scheduler/internal/v3/coordinator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,13 @@ func (c *coordinator) maybeCollectMetrics() {
446446
}
447447
c.lastCollectTime = now
448448

449+
pdTime := now
450+
// only nil in unit test
451+
if c.pdClock != nil {
452+
pdTime = c.pdClock.CurrentTime()
453+
}
454+
449455
c.schedulerM.CollectMetrics()
450-
c.replicationM.CollectMetrics()
456+
c.replicationM.CollectMetrics(pdTime)
451457
c.captureM.CollectMetrics()
452458
}

cdc/scheduler/internal/v3/replication/replication_manager.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) {
769769
}
770770

771771
// CollectMetrics collects metrics.
772-
func (r *Manager) CollectMetrics() {
772+
func (r *Manager) CollectMetrics(currentPDTime time.Time) {
773773
cf := r.changefeedID
774774
tableGauge.
775775
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
@@ -786,13 +786,12 @@ func (r *Manager) CollectMetrics() {
786786
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))
787787

788788
// Slow table latency metrics.
789-
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
790789
for stage, checkpoint := range table.Stats.StageCheckpoints {
791790
// Checkpoint ts
792791
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
793792
slowestTableStageCheckpointTsGaugeVec.
794793
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
795-
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
794+
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()
796795
slowestTableStageCheckpointTsLagGaugeVec.
797796
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
798797
slowestTableStageCheckpointTsLagHistogramVec.
@@ -801,7 +800,7 @@ func (r *Manager) CollectMetrics() {
801800
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
802801
slowestTableStageResolvedTsGaugeVec.
803802
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
804-
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
803+
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()
805804
slowestTableStageResolvedTsLagGaugeVec.
806805
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
807806
slowestTableStageResolvedTsLagHistogramVec.
@@ -812,7 +811,7 @@ func (r *Manager) CollectMetrics() {
812811
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
813812
slowestTableStageResolvedTsGaugeVec.
814813
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
815-
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
814+
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()
816815
slowestTableStageResolvedTsLagGaugeVec.
817816
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
818817
slowestTableStageResolvedTsLagHistogramVec.
@@ -863,8 +862,7 @@ func (r *Manager) CollectMetrics() {
863862
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
864863
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))
865864

866-
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
867-
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
865+
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()
868866
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
869867
}
870868
}

0 commit comments

Comments
 (0)