Skip to content

Commit dc2548a

Browse files
authored
ddl/ingest: record merge temp index rate and refine metrics (#62586) (#62711)
ref #61433
1 parent 6622eb0 commit dc2548a

File tree

6 files changed

+70
-101
lines changed

6 files changed

+70
-101
lines changed

pkg/ddl/index_merge_tmp.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,7 @@ func (w *mergeIndexWorker) BackfillData(ctx context.Context, taskRange reorgBack
268268
break
269269
}
270270

271-
metrics.DDLSetTempIndexScan(w.table.Meta().ID, uint64(taskCtx.scanCount))
272-
metrics.DDLSetTempIndexMerge(w.table.Meta().ID, uint64(taskCtx.addedCount))
271+
metrics.DDLSetTempIndexScanAndMerge(w.table.Meta().ID, uint64(taskCtx.scanCount), uint64(taskCtx.addedCount))
273272
failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
274273
//nolint:forcetypeassert
275274
if val.(bool) && MockDMLExecutionMerging != nil {
@@ -280,7 +279,8 @@ func (w *mergeIndexWorker) BackfillData(ctx context.Context, taskRange reorgBack
280279
return
281280
}
282281

283-
func (*mergeIndexWorker) AddMetricInfo(float64) {
282+
func (w *mergeIndexWorker) AddMetricInfo(cnt float64) {
283+
w.metricCounter.Add(cnt)
284284
}
285285

286286
func (*mergeIndexWorker) String() string {

pkg/ddl/ingest/collector.go

Lines changed: 50 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,18 @@ func init() {
4242
return true
4343
})
4444
}
45-
metrics.DDLSetTempIndexWrite = func(connID uint64, tableID int64, opCount uint64, doubleWrite bool) {
45+
metrics.DDLAddOneTempIndexWrite = func(connID uint64, tableID int64, doubleWrite bool) {
4646
c, _ := coll.write.LoadOrStore(connID, &connIDCollector{
4747
tblID2Count: sync.Map{},
4848
})
4949
//nolint:forcetypeassert
50-
tc, _ := c.(*connIDCollector).tblID2Count.LoadOrStore(tableID, &tableCollector{
51-
singleWriteCnt: &atomic.Uint64{},
52-
doubleWriteCnt: &atomic.Uint64{},
53-
totalSingleWriteCnt: &atomic.Uint64{},
54-
totalDoubleWriteCnt: &atomic.Uint64{},
55-
})
50+
tc, _ := c.(*connIDCollector).tblID2Count.LoadOrStore(tableID, &tableCollector{})
5651
if doubleWrite {
5752
//nolint:forcetypeassert
58-
tc.(*tableCollector).doubleWriteCnt.Add(opCount)
53+
tc.(*tableCollector).doubleWriteCnt.Add(1)
5954
} else {
6055
//nolint:forcetypeassert
61-
tc.(*tableCollector).singleWriteCnt.Add(opCount)
56+
tc.(*tableCollector).singleWriteCnt.Add(1)
6257
}
6358
}
6459
metrics.DDLRollbackTempIndexWrite = func(connID uint64) {
@@ -83,77 +78,65 @@ func init() {
8378
connIDCollector.tblID2Count.Delete(tblID)
8479
return true
8580
})
86-
coll.merge.Delete(tblID)
87-
coll.scan.Delete(tblID)
81+
coll.read.Delete(tblID)
8882
}
89-
metrics.DDLSetTempIndexScan = func(tableID int64, opCount uint64) {
90-
c, _ := coll.scan.LoadOrStore(tableID, &atomic.Uint64{})
91-
//nolint:forcetypeassert
92-
c.(*atomic.Uint64).Add(opCount)
83+
metrics.DDLClearTempIndexWrite = func(connID uint64) {
84+
coll.write.Delete(connID)
9385
}
94-
metrics.DDLSetTempIndexMerge = func(tableID int64, opCount uint64) {
95-
c, _ := coll.merge.LoadOrStore(tableID, &atomic.Uint64{})
86+
87+
metrics.DDLSetTempIndexScanAndMerge = func(tableID int64, scanCnt, mergeCnt uint64) {
88+
c, _ := coll.read.LoadOrStore(tableID, &mergeAndScan{})
9689
//nolint:forcetypeassert
97-
c.(*atomic.Uint64).Add(opCount)
90+
c.(*mergeAndScan).scan.Add(scanCnt)
91+
//nolint:forcetypeassert
92+
c.(*mergeAndScan).merge.Add(mergeCnt)
9893
}
9994
}
10095

96+
const (
97+
labelSingleWrite = "single_write"
98+
labelDoubleWrite = "double_write"
99+
labelMerge = "merge"
100+
labelScan = "scan"
101+
)
102+
101103
type collector struct {
102104
write sync.Map // connectionID => connIDCollector
103-
merge sync.Map // tableID => atomic.Uint64
104-
scan sync.Map // tableID => atomic.Uint64
105+
read sync.Map // tableID => mergeAndScan
105106

106-
singleWriteDesc *prometheus.Desc
107-
doubleWriteDesc *prometheus.Desc
108-
mergeDesc *prometheus.Desc
109-
scanDesc *prometheus.Desc
107+
desc *prometheus.Desc
108+
}
109+
110+
type mergeAndScan struct {
111+
merge atomic.Uint64
112+
scan atomic.Uint64
110113
}
111114

112115
type connIDCollector struct {
113116
tblID2Count sync.Map // tableID => tableCollector
114117
}
115-
116118
type tableCollector struct {
117-
singleWriteCnt *atomic.Uint64
118-
doubleWriteCnt *atomic.Uint64
119+
singleWriteCnt atomic.Uint64
120+
doubleWriteCnt atomic.Uint64
119121

120-
totalSingleWriteCnt *atomic.Uint64
121-
totalDoubleWriteCnt *atomic.Uint64
122+
totalSingleWriteCnt atomic.Uint64
123+
totalDoubleWriteCnt atomic.Uint64
122124
}
123125

124126
func newCollector() *collector {
125127
return &collector{
126128
write: sync.Map{},
127-
merge: sync.Map{},
128-
scan: sync.Map{},
129-
singleWriteDesc: prometheus.NewDesc(
130-
"tidb_ddl_temp_index_write",
131-
"Gauge of temp index write times",
132-
[]string{"table_id"}, nil,
133-
),
134-
doubleWriteDesc: prometheus.NewDesc(
135-
"tidb_ddl_temp_index_double_write",
136-
"Gauge of temp index double write times",
137-
[]string{"table_id"}, nil,
138-
),
139-
mergeDesc: prometheus.NewDesc(
140-
"tidb_ddl_temp_index_merge",
141-
"Gauge of temp index merge times.",
142-
[]string{"table_id"}, nil,
143-
),
144-
scanDesc: prometheus.NewDesc(
145-
"tidb_ddl_temp_index_scan",
146-
"Gauge of temp index scan times.",
147-
[]string{"table_id"}, nil,
129+
read: sync.Map{},
130+
desc: prometheus.NewDesc(
131+
"tidb_ddl_temp_index_op_count",
132+
"Gauge of temp index operation count",
133+
[]string{"type", "table_id"}, nil,
148134
),
149135
}
150136
}
151137

152138
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
153-
ch <- c.singleWriteDesc
154-
ch <- c.doubleWriteDesc
155-
ch <- c.mergeDesc
156-
ch <- c.scanDesc
139+
ch <- c.desc
157140
}
158141

159142
func (c *collector) Collect(ch chan<- prometheus.Metric) {
@@ -167,71 +150,56 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
167150
tableID := tableKey.(int64)
168151
//nolint:forcetypeassert
169152
tblColl := tableValue.(*tableCollector)
170-
if _, exists := singleMap[tableID]; !exists {
171-
singleMap[tableID] = 0
172-
}
173153
singleMap[tableID] += tblColl.totalSingleWriteCnt.Load()
174-
if _, exists := doubleMap[tableID]; !exists {
175-
doubleMap[tableID] = 0
176-
}
177154
doubleMap[tableID] += tblColl.totalDoubleWriteCnt.Load()
178155
return true
179156
})
180157
return true
181158
})
182159
for tableID, cnt := range singleMap {
183160
ch <- prometheus.MustNewConstMetric(
184-
c.singleWriteDesc,
161+
c.desc,
185162
prometheus.GaugeValue,
186163
float64(cnt),
164+
labelSingleWrite,
187165
strconv.FormatInt(tableID, 10),
188166
)
189167
}
190168
for tableID, cnt := range doubleMap {
191169
ch <- prometheus.MustNewConstMetric(
192-
c.doubleWriteDesc,
170+
c.desc,
193171
prometheus.GaugeValue,
194172
float64(cnt),
173+
labelDoubleWrite,
195174
strconv.FormatInt(tableID, 10),
196175
)
197176
}
198177
mergeMap := make(map[int64]uint64)
199-
c.merge.Range(func(key, value any) bool {
178+
scanMap := make(map[int64]uint64)
179+
c.read.Range(func(key, value any) bool {
200180
//nolint:forcetypeassert
201181
tableID := key.(int64)
202182
//nolint:forcetypeassert
203-
opCount := value.(*atomic.Uint64).Load()
204-
if _, exists := mergeMap[tableID]; !exists {
205-
mergeMap[tableID] = 0
206-
}
207-
mergeMap[tableID] += opCount
183+
ms := value.(*mergeAndScan)
184+
mergeMap[tableID] += ms.merge.Load()
185+
scanMap[tableID] += ms.scan.Load()
208186
return true
209187
})
210188
for tableID, cnt := range mergeMap {
211189
ch <- prometheus.MustNewConstMetric(
212-
c.mergeDesc,
190+
c.desc,
213191
prometheus.GaugeValue,
214192
float64(cnt),
193+
labelMerge,
215194
strconv.FormatInt(tableID, 10),
216195
)
217196
}
218-
scanMap := make(map[int64]uint64)
219-
c.scan.Range(func(key, value any) bool {
220-
//nolint:forcetypeassert
221-
tableID := key.(int64)
222-
//nolint:forcetypeassert
223-
opCount := value.(*atomic.Uint64).Load()
224-
if _, exists := scanMap[tableID]; !exists {
225-
scanMap[tableID] = 0
226-
}
227-
scanMap[tableID] += opCount
228-
return true
229-
})
230197
for tableID, cnt := range scanMap {
231198
ch <- prometheus.MustNewConstMetric(
232-
c.scanDesc,
199+
c.desc,
233200
prometheus.GaugeValue,
234201
float64(cnt),
202+
labelScan,
235203
strconv.FormatInt(tableID, 10),
236204
)
237205
}

pkg/metrics/ddl.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,18 +187,18 @@ func InitDDLMetrics() {
187187
}
188188

189189
var (
190-
// DDLSetTempIndexWrite records the number of writes to a temporary index.
191-
DDLSetTempIndexWrite = func(connID uint64, tableID int64, opCount uint64, doubleWrite bool) {}
190+
// DDLAddOneTempIndexWrite records the number of writes to a temporary index.
191+
DDLAddOneTempIndexWrite = func(connID uint64, tableID int64, doubleWrite bool) {}
192192
// DDLCommitTempIndexWrite commits the writes to a temporary index.
193193
DDLCommitTempIndexWrite = func(connID uint64) {}
194194
// DDLRollbackTempIndexWrite rolls back the writes to a temporary index.
195195
DDLRollbackTempIndexWrite = func(connID uint64) {}
196196
// DDLResetTempIndexWrite resets the write count for a temporary index.
197197
DDLResetTempIndexWrite = func(tblID int64) {}
198-
// DDLSetTempIndexScan sets the scan count for a temporary index.
199-
DDLSetTempIndexScan = func(tableID int64, opCount uint64) {}
200-
// DDLSetTempIndexMerge sets the merge count for a temporary index.
201-
DDLSetTempIndexMerge = func(tableID int64, opCount uint64) {}
198+
// DDLClearTempIndexWrite clears the write count for a temporary index.
199+
DDLClearTempIndexWrite = func(connID uint64) {}
200+
// DDLSetTempIndexScanAndMerge sets the scan count and merge count for a temporary index.
201+
DDLSetTempIndexScanAndMerge = func(tableID int64, scanCnt, mergeCnt uint64) {}
202202
)
203203

204204
// Label constants.

pkg/metrics/grafana/tidb.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15382,26 +15382,26 @@
1538215382
"refId": "B"
1538315383
},
1538415384
{
15385-
"expr": "sum by (table_id) (tidb_ddl_temp_index_write{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"})",
15385+
"expr": "sum by (table_id) (tidb_ddl_temp_index_op_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"single_write\"})",
1538615386
"instant": false,
1538715387
"legendFormat": "write-table_id_{{table_id}}",
1538815388
"range": true,
1538915389
"refId": "C"
1539015390
},
1539115391
{
15392-
"expr": "tidb_ddl_temp_index_merge{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
15392+
"expr": "tidb_ddl_temp_index_op_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"merge\"}",
1539315393
"legendFormat": "{{instance}}-merged-table_id_{{table_id}}",
1539415394
"range": true,
1539515395
"refId": "D"
1539615396
},
1539715397
{
15398-
"expr": "tidb_ddl_temp_index_scan{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
15398+
"expr": "tidb_ddl_temp_index_op_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"scan\"}",
1539915399
"legendFormat": "{{instance}}-scanned-table_id_{{table_id}}",
1540015400
"range": true,
1540115401
"refId": "E"
1540215402
},
1540315403
{
15404-
"expr": "sum by (table_id) (tidb_ddl_temp_index_double_write{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"})",
15404+
"expr": "sum by (table_id) (tidb_ddl_temp_index_op_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"double_write\"})",
1540515405
"legendFormat": "double-write-table_id_{{table_id}}",
1540615406
"range": true,
1540715407
"refId": "F"

pkg/server/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ func (cc *clientConn) Close() error {
378378
cc.server.rwlock.Lock()
379379
delete(cc.server.clients, cc.connectionID)
380380
cc.server.rwlock.Unlock()
381+
metrics.DDLClearTempIndexWrite(cc.connectionID)
381382
return closeConn(cc)
382383
}
383384

pkg/table/tables/index.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
281281
return nil, err
282282
}
283283
if keyIsTempIdxKey {
284-
metrics.DDLSetTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, 1, false)
284+
metrics.DDLAddOneTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, false)
285285
}
286286
if len(tempKey) > 0 {
287287
tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct}
@@ -290,7 +290,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
290290
if err != nil {
291291
return nil, err
292292
}
293-
metrics.DDLSetTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, 1, true)
293+
metrics.DDLAddOneTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, true)
294294
}
295295
if !ignoreAssertion && !untouched {
296296
if opt.DupKeyCheck() == table.DupKeyCheckLazy && !txn.IsPessimistic() {
@@ -379,7 +379,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
379379
return nil, err
380380
}
381381
if keyIsTempIdxKey {
382-
metrics.DDLSetTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, 1, false)
382+
metrics.DDLAddOneTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, false)
383383
}
384384
if len(tempKey) > 0 {
385385
tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true}
@@ -388,7 +388,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
388388
if err != nil {
389389
return nil, err
390390
}
391-
metrics.DDLSetTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, 1, true)
391+
metrics.DDLAddOneTempIndexWrite(sctx.ConnectionID(), c.tblInfo.ID, true)
392392
}
393393
} else if lazyCheck {
394394
flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists}
@@ -490,7 +490,7 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue
490490
if err != nil {
491491
return err
492492
}
493-
metrics.DDLSetTempIndexWrite(ctx.ConnectionID(), c.tblInfo.ID, 1, doubleWrite)
493+
metrics.DDLAddOneTempIndexWrite(ctx.ConnectionID(), c.tblInfo.ID, doubleWrite)
494494
}
495495
} else {
496496
if len(key) > 0 {
@@ -513,7 +513,7 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue
513513
if err != nil {
514514
return err
515515
}
516-
metrics.DDLSetTempIndexWrite(ctx.ConnectionID(), c.tblInfo.ID, 1, doubleWrite)
516+
metrics.DDLAddOneTempIndexWrite(ctx.ConnectionID(), c.tblInfo.ID, doubleWrite)
517517
}
518518
}
519519
if c.idxInfo.State == model.StatePublic {

0 commit comments

Comments
 (0)