Skip to content

Commit 2503d50

Browse files
authored
global sort: add metrics for merge sort stage (#60971)
close #61025
1 parent f3f85fb commit 2503d50

File tree

4 files changed

+28
-0
lines changed

4 files changed

+28
-0
lines changed

pkg/lightning/backend/external/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ go_library(
4545
"@com_github_jfcg_sorty_v2//:sorty",
4646
"@com_github_pingcap_errors//:errors",
4747
"@com_github_pingcap_failpoint//:failpoint",
48+
"@com_github_prometheus_client_golang//prometheus",
4849
"@com_github_stretchr_testify//require",
4950
"@com_github_tikv_client_go_v2//tikv",
5051
"@org_golang_x_sync//errgroup",

pkg/lightning/backend/external/byte_reader.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
goerrors "errors"
2020
"fmt"
2121
"io"
22+
"time"
2223

2324
"github.com/aws/aws-sdk-go/aws"
2425
"github.com/pingcap/errors"
@@ -27,6 +28,7 @@ import (
2728
"github.com/pingcap/tidb/pkg/lightning/membuf"
2829
"github.com/pingcap/tidb/pkg/util/logutil"
2930
"github.com/pingcap/tidb/pkg/util/size"
31+
"github.com/prometheus/client_golang/prometheus"
3032
"go.uber.org/zap"
3133
)
3234

@@ -67,6 +69,9 @@ type byteReader struct {
6769
}
6870

6971
logger *zap.Logger
72+
// monitor the speed of reading from external storage
73+
readDurHist prometheus.Observer
74+
readRateHist prometheus.Observer
7075
}
7176

7277
func openStoreReaderAndSeek(
@@ -273,6 +278,18 @@ func (r *byteReader) next(n int) (int, [][]byte) {
273278
}
274279

275280
func (r *byteReader) reload() error {
281+
if r.readDurHist != nil && r.readRateHist != nil {
282+
startTime := time.Now()
283+
defer func() {
284+
readSecond := time.Since(startTime).Seconds()
285+
size := 0
286+
for _, b := range r.curBuf {
287+
size += len(b)
288+
}
289+
r.readDurHist.Observe(readSecond)
290+
r.readRateHist.Observe(float64(size) / 1024.0 / 1024.0 / readSecond)
291+
}()
292+
}
276293
to := r.concurrentReader.expected
277294
now := r.concurrentReader.now
278295
// in read only false -> true is possible

pkg/lightning/backend/external/iter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pingcap/errors"
2828
"github.com/pingcap/tidb/br/pkg/storage"
2929
"github.com/pingcap/tidb/pkg/lightning/membuf"
30+
"github.com/pingcap/tidb/pkg/metrics"
3031
"github.com/pingcap/tidb/pkg/util/logutil"
3132
"github.com/pingcap/tidb/pkg/util/size"
3233
"go.uber.org/zap"
@@ -534,6 +535,8 @@ func NewMergeKVIter(
534535
if err != nil {
535536
return nil, err
536537
}
538+
rd.byteReader.readDurHist = metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("merge_sort_read")
539+
rd.byteReader.readRateHist = metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("merge_sort_read")
537540
rd.byteReader.enableConcurrentRead(
538541
exStorage,
539542
paths[i],

pkg/lightning/backend/external/onefile_writer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ import (
1919
"encoding/binary"
2020
"path/filepath"
2121
"slices"
22+
"time"
2223

2324
"github.com/docker/go-units"
2425
"github.com/pingcap/errors"
2526
"github.com/pingcap/tidb/br/pkg/storage"
2627
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
2728
tidbkv "github.com/pingcap/tidb/pkg/kv"
2829
"github.com/pingcap/tidb/pkg/lightning/membuf"
30+
"github.com/pingcap/tidb/pkg/metrics"
2931
"github.com/pingcap/tidb/pkg/util/intest"
3032
"github.com/pingcap/tidb/pkg/util/logutil"
3133
"go.uber.org/zap"
@@ -212,6 +214,7 @@ func (w *OneFileWriter) doWriteRow(ctx context.Context, idxKey, idxVal []byte) e
212214
return err
213215
}
214216
// 1. encode data and write to kvStore.
217+
writeStartTime := time.Now()
215218
keyLen := len(idxKey)
216219
length := len(idxKey) + len(idxVal) + lengthBytes*2
217220
buf, _ := w.kvBuffer.AllocBytesWithSliceLocation(length)
@@ -241,6 +244,10 @@ func (w *OneFileWriter) doWriteRow(ctx context.Context, idxKey, idxVal []byte) e
241244
}
242245
w.totalCnt += 1
243246
w.totalSize += uint64(keyLen + len(idxVal))
247+
writeDuration := time.Since(writeStartTime)
248+
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("merge_sort_write").Observe(writeDuration.Seconds())
249+
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("merge_sort_write").
250+
Observe(float64(length) / 1024.0 / 1024.0 / writeDuration.Seconds())
244251
return nil
245252
}
246253

0 commit comments

Comments
 (0)