Skip to content

Commit 95aa0ca

Browse files
authored
statistics: improve memory for mergeGlobalStatsTopNByConcurrency (#45993) (#46057)
close #45727
1 parent a0f1d7e commit 95aa0ca

File tree

3 files changed

+13
-30
lines changed

3 files changed

+13
-30
lines changed

statistics/cmsketch_bench_test.go

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -123,10 +123,7 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
123123
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
124124
hists = append(hists, h)
125125
}
126-
wrapper := &statistics.StatsWrapper{
127-
AllTopN: topNs,
128-
AllHg: hists,
129-
}
126+
wrapper := statistics.NewStatsWrapper(hists, topNs)
130127
const mergeConcurrency = 4
131128
batchSize := len(wrapper.AllTopN) / mergeConcurrency
132129
if batchSize < 1 {

statistics/handle/handle.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package handle
1616

1717
import (
18-
"bytes"
1918
"context"
2019
"encoding/json"
2120
"fmt"
@@ -921,19 +920,15 @@ func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wra
921920

922921
// handle Error
923922
hasErr := false
923+
errMsg := make([]string, 0)
924924
for resp := range respCh {
925925
if resp.Err != nil {
926926
hasErr = true
927+
errMsg = append(errMsg, resp.Err.Error())
927928
}
928929
resps = append(resps, resp)
929930
}
930931
if hasErr {
931-
errMsg := make([]string, 0)
932-
for _, resp := range resps {
933-
if resp.Err != nil {
934-
errMsg = append(errMsg, resp.Err.Error())
935-
}
936-
}
937932
return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
938933
}
939934

@@ -945,17 +940,6 @@ func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wra
945940
sorted = append(sorted, resp.TopN.TopN...)
946941
}
947942
leftTopn = append(leftTopn, resp.PopedTopn...)
948-
for i, removeTopn := range resp.RemoveVals {
949-
// Remove the value from the Hists.
950-
if len(removeTopn) > 0 {
951-
tmp := removeTopn
952-
slices.SortFunc(tmp, func(i, j statistics.TopNMeta) bool {
953-
cmpResult := bytes.Compare(i.Encoded, j.Encoded)
954-
return cmpResult < 0
955-
})
956-
wrapper.AllHg[i].RemoveVals(tmp)
957-
}
958-
}
959943
}
960944

961945
globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)

statistics/merge_worker.go

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
package statistics
1616

1717
import (
18+
"sync"
1819
"sync/atomic"
1920
"time"
2021

@@ -44,6 +45,8 @@ type topnStatsMergeWorker struct {
4445
respCh chan<- *TopnStatsMergeResponse
4546
// the stats in the wrapper are shared with the worker; `AllHg` entries are modified in Run while holding shardMutex
4647
statsWrapper *StatsWrapper
48+
// shardMutex is used to protect `statsWrapper.AllHg`
49+
shardMutex []sync.Mutex
4750
}
4851

4952
// NewTopnStatsMergeWorker returns topn merge worker
@@ -57,6 +60,7 @@ func NewTopnStatsMergeWorker(
5760
respCh: respCh,
5861
}
5962
worker.statsWrapper = wrapper
63+
worker.shardMutex = make([]sync.Mutex, len(wrapper.AllHg))
6064
worker.killed = killed
6165
return worker
6266
}
@@ -77,10 +81,9 @@ func NewTopnStatsMergeTask(start, end int) *TopnStatsMergeTask {
7781

7882
// TopnStatsMergeResponse indicates topn merge worker response
7983
type TopnStatsMergeResponse struct {
80-
TopN *TopN
81-
PopedTopn []TopNMeta
82-
RemoveVals [][]TopNMeta
83-
Err error
84+
Err error
85+
TopN *TopN
86+
PopedTopn []TopNMeta
8487
}
8588

8689
// Run runs topn merge like statistics.MergePartTopN2GlobalTopN
@@ -99,7 +102,6 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool,
99102
return
100103
}
101104
partNum := len(allTopNs)
102-
removeVals := make([][]TopNMeta, partNum)
103105
// Different TopN structures may hold the same value, we have to merge them.
104106
counter := make(map[hack.MutableString]float64)
105107
// datumMap is used to store the mapping from the string type to datum type.
@@ -168,13 +170,13 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool,
168170
if count != 0 {
169171
counter[encodedVal] += count
170172
// Remove the value corresponding to encodedVal from the histogram.
171-
removeVals[j] = append(removeVals[j], TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
173+
worker.shardMutex[j].Lock()
174+
worker.statsWrapper.AllHg[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
175+
worker.shardMutex[j].Unlock()
172176
}
173177
}
174178
}
175179
}
176-
// record remove values
177-
resp.RemoveVals = removeVals
178180

179181
numTop := len(counter)
180182
if numTop == 0 {

0 commit comments

Comments
 (0)