Skip to content

Commit 98b6c43

Browse files
authored
statistics: move MergePartTopN2GlobalTopN into handle/globalstats (#47901)
ref #46905
1 parent c21a5cf commit 98b6c43

File tree

7 files changed

+203
-177
lines changed

7 files changed

+203
-177
lines changed

pkg/statistics/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ go_test(
7474
data = glob(["testdata/**"]),
7575
embed = [":statistics"],
7676
flaky = True,
77-
shard_count = 34,
77+
shard_count = 32,
7878
deps = [
7979
"//pkg/config",
8080
"//pkg/parser/ast",

pkg/statistics/cmsketch.go

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"slices"
2424
"sort"
2525
"strings"
26-
"sync/atomic"
27-
"time"
2826

2927
"github.com/pingcap/errors"
3028
"github.com/pingcap/failpoint"
@@ -795,84 +793,6 @@ func NewTopN(n int) *TopN {
795793
return &TopN{TopN: make([]TopNMeta, 0, n)}
796794
}
797795

798-
// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
799-
// The input parameters:
800-
// 1. `topNs` are the partition-level topNs to be merged.
801-
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
802-
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
803-
//
804-
// The output parameters:
805-
// 1. `*TopN` is the final global-level topN.
806-
// 2. `[]TopNMeta` holds the remaining topN values from the partition-level TopNs that were not placed in the global-level TopN. We should put them back into the histogram later.
807-
// 3. `[]*Histogram` are the partition-level histograms, from which some values have been removed while merging the global-level topN.
808-
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram,
809-
isIndex bool, killed *uint32) (*TopN, []TopNMeta, []*Histogram, error) {
810-
if CheckEmptyTopNs(topNs) {
811-
return nil, nil, hists, nil
812-
}
813-
partNum := len(topNs)
814-
// Different TopN structures may hold the same value, we have to merge them.
815-
counter := make(map[hack.MutableString]float64)
816-
// datumMap is used to store the mapping from the string type to datum type.
817-
// The datum is used to find the value in the histogram.
818-
datumMap := NewDatumMapCache()
819-
for i, topN := range topNs {
820-
if atomic.LoadUint32(killed) == 1 {
821-
return nil, nil, nil, errors.Trace(ErrQueryInterrupted)
822-
}
823-
if topN.TotalCount() == 0 {
824-
continue
825-
}
826-
for _, val := range topN.TopN {
827-
encodedVal := hack.String(val.Encoded)
828-
_, exists := counter[encodedVal]
829-
counter[encodedVal] += float64(val.Count)
830-
if exists {
831-
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
832-
continue
833-
}
834-
// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
835-
// 1. Check the topN first.
836-
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
837-
for j := 0; j < partNum; j++ {
838-
if atomic.LoadUint32(killed) == 1 {
839-
return nil, nil, nil, errors.Trace(ErrQueryInterrupted)
840-
}
841-
if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
842-
continue
843-
}
844-
// Get the encodedVal from the hists[j]
845-
datum, exists := datumMap.Get(encodedVal)
846-
if !exists {
847-
d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc)
848-
if err != nil {
849-
return nil, nil, nil, err
850-
}
851-
datum = d
852-
}
853-
// Get the number of rows in the histogram whose value equals encodedVal.
854-
count, _ := hists[j].EqualRowCount(nil, datum, isIndex)
855-
if count != 0 {
856-
counter[encodedVal] += count
857-
// Remove the value corresponding to encodedVal from the histogram.
858-
hists[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
859-
}
860-
}
861-
}
862-
}
863-
numTop := len(counter)
864-
if numTop == 0 {
865-
return nil, nil, hists, nil
866-
}
867-
sorted := make([]TopNMeta, 0, numTop)
868-
for value, cnt := range counter {
869-
data := hack.Slice(string(value))
870-
sorted = append(sorted, TopNMeta{Encoded: data, Count: uint64(cnt)})
871-
}
872-
globalTopN, leftTopN := GetMergedTopNFromSortedSlice(sorted, n)
873-
return globalTopN, leftTopN, hists, nil
874-
}
875-
876796
// MergeTopN is used to merge more TopN structures to generate a new TopN struct by the given size.
877797
// The input parameters are multiple TopN structures to be merged and the size of the new TopN that will be generated.
878798
// The output parameters are the newly generated TopN structure and the remaining numbers.

pkg/statistics/cmsketch_test.go

Lines changed: 0 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@ import (
1919
"math"
2020
"math/rand"
2121
"testing"
22-
"time"
2322

2423
"github.com/pingcap/errors"
2524
"github.com/pingcap/tidb/pkg/parser/mysql"
26-
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
2725
"github.com/pingcap/tidb/pkg/types"
2826
"github.com/pingcap/tidb/pkg/util/chunk"
2927
"github.com/pingcap/tidb/pkg/util/codec"
@@ -256,39 +254,6 @@ func TestCMSketchCodingTopN(t *testing.T) {
256254
require.NoError(t, err)
257255
}
258256

259-
func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) {
260-
loc := time.UTC
261-
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
262-
version := 1
263-
isKilled := uint32(0)
264-
265-
// Prepare TopNs.
266-
topNs := make([]*TopN, 0, 10)
267-
for i := 0; i < 10; i++ {
268-
// Construct TopN, should be key(1, 1) -> 2, key(1, 2) -> 2, key(1, 3) -> 3.
269-
topN := NewTopN(3)
270-
{
271-
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(1))
272-
require.NoError(t, err)
273-
topN.AppendTopN(key1, 2)
274-
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(2))
275-
require.NoError(t, err)
276-
topN.AppendTopN(key2, 2)
277-
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(3))
278-
require.NoError(t, err)
279-
topN.AppendTopN(key3, 3)
280-
}
281-
topNs = append(topNs, topN)
282-
}
283-
284-
// Test merge 2 topN with nil hists.
285-
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, nil, false, &isKilled)
286-
require.NoError(t, err)
287-
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
288-
require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows")
289-
require.Len(t, leftTopN, 1, "should have 1 left topN")
290-
}
291-
292257
func TestSortTopnMeta(t *testing.T) {
293258
data := []TopNMeta{{
294259
Encoded: []byte("a"),
@@ -300,54 +265,3 @@ func TestSortTopnMeta(t *testing.T) {
300265
SortTopnMeta(data)
301266
require.Equal(t, uint64(2), data[0].Count)
302267
}
303-
304-
func TestMergePartTopN2GlobalTopNWithHists(t *testing.T) {
305-
loc := time.UTC
306-
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
307-
version := 1
308-
isKilled := uint32(0)
309-
310-
// Prepare TopNs.
311-
topNs := make([]*TopN, 0, 10)
312-
for i := 0; i < 10; i++ {
313-
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
314-
topN := NewTopN(3)
315-
{
316-
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
317-
require.NoError(t, err)
318-
topN.AppendTopN(key1, 2)
319-
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
320-
require.NoError(t, err)
321-
topN.AppendTopN(key2, 2)
322-
if i%2 == 0 {
323-
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
324-
require.NoError(t, err)
325-
topN.AppendTopN(key3, 3)
326-
}
327-
}
328-
topNs = append(topNs, topN)
329-
}
330-
331-
// Prepare Hists.
332-
hists := make([]*Histogram, 0, 10)
333-
for i := 0; i < 10; i++ {
334-
// Construct Hist
335-
h := NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
336-
h.Bounds.AppendInt64(0, 1)
337-
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 20})
338-
h.Bounds.AppendInt64(0, 2)
339-
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
340-
h.Bounds.AppendInt64(0, 3)
341-
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
342-
h.Bounds.AppendInt64(0, 4)
343-
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 40})
344-
hists = append(hists, h)
345-
}
346-
347-
// Test merge 2 topN.
348-
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, hists, false, &isKilled)
349-
require.NoError(t, err)
350-
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
351-
require.Equal(t, uint64(55), globalTopN.TotalCount(), "should have 55")
352-
require.Len(t, leftTopN, 1, "should have 1 left topN")
353-
}

pkg/statistics/handle/globalstats/BUILD.bazel

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ go_test(
3838
"globalstats_test.go",
3939
"main_test.go",
4040
"topn_bench_test.go",
41+
"topn_test.go",
4142
],
43+
embed = [":globalstats"],
4244
flaky = True,
43-
shard_count = 19,
45+
shard_count = 21,
4446
deps = [
45-
":globalstats",
4647
"//pkg/config",
4748
"//pkg/parser/model",
4849
"//pkg/parser/mysql",

pkg/statistics/handle/globalstats/topn.go

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ package globalstats
1717
import (
1818
"strings"
1919
"sync"
20+
"sync/atomic"
2021
"time"
2122

2223
"github.com/pingcap/errors"
2324
"github.com/pingcap/tidb/pkg/sessionctx"
2425
"github.com/pingcap/tidb/pkg/statistics"
26+
"github.com/pingcap/tidb/pkg/util/hack"
2527
"github.com/tiancaiamao/gp"
2628
)
2729

@@ -32,7 +34,7 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap
3234
killed := &sc.GetSessionVars().Killed
3335
// use original method if concurrency equals 1 or for version1
3436
if mergeConcurrency < 2 {
35-
return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed)
37+
return MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed)
3638
}
3739
batchSize := len(wrapper.AllTopN) / mergeConcurrency
3840
if batchSize < 1 {
@@ -113,3 +115,81 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch
113115
statistics.SortTopnMeta(result)
114116
return globalTopN, result, wrapper.AllHg, nil
115117
}
118+
119+
// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
120+
// The input parameters:
121+
// 1. `topNs` are the partition-level topNs to be merged.
122+
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
123+
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
124+
//
125+
// The output parameters:
126+
// 1. `*TopN` is the final global-level topN.
127+
// 2. `[]TopNMeta` holds the remaining topN values from the partition-level TopNs that were not placed in the global-level TopN. We should put them back into the histogram later.
128+
// 3. `[]*Histogram` are the partition-level histograms, from which some values have been removed while merging the global-level topN.
129+
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram,
130+
isIndex bool, killed *uint32) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
131+
if statistics.CheckEmptyTopNs(topNs) {
132+
return nil, nil, hists, nil
133+
}
134+
partNum := len(topNs)
135+
// Different TopN structures may hold the same value, we have to merge them.
136+
counter := make(map[hack.MutableString]float64)
137+
// datumMap is used to store the mapping from the string type to datum type.
138+
// The datum is used to find the value in the histogram.
139+
datumMap := statistics.NewDatumMapCache()
140+
for i, topN := range topNs {
141+
if atomic.LoadUint32(killed) == 1 {
142+
return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted)
143+
}
144+
if topN.TotalCount() == 0 {
145+
continue
146+
}
147+
for _, val := range topN.TopN {
148+
encodedVal := hack.String(val.Encoded)
149+
_, exists := counter[encodedVal]
150+
counter[encodedVal] += float64(val.Count)
151+
if exists {
152+
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
153+
continue
154+
}
155+
// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
156+
// 1. Check the topN first.
157+
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
158+
for j := 0; j < partNum; j++ {
159+
if atomic.LoadUint32(killed) == 1 {
160+
return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted)
161+
}
162+
if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
163+
continue
164+
}
165+
// Get the encodedVal from the hists[j]
166+
datum, exists := datumMap.Get(encodedVal)
167+
if !exists {
168+
d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc)
169+
if err != nil {
170+
return nil, nil, nil, err
171+
}
172+
datum = d
173+
}
174+
// Get the number of rows in the histogram whose value equals encodedVal.
175+
count, _ := hists[j].EqualRowCount(nil, datum, isIndex)
176+
if count != 0 {
177+
counter[encodedVal] += count
178+
// Remove the value corresponding to encodedVal from the histogram.
179+
hists[j].BinarySearchRemoveVal(statistics.TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
180+
}
181+
}
182+
}
183+
}
184+
numTop := len(counter)
185+
if numTop == 0 {
186+
return nil, nil, hists, nil
187+
}
188+
sorted := make([]statistics.TopNMeta, 0, numTop)
189+
for value, cnt := range counter {
190+
data := hack.Slice(string(value))
191+
sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)})
192+
}
193+
globalTopN, leftTopN := statistics.GetMergedTopNFromSortedSlice(sorted, n)
194+
return globalTopN, leftTopN, hists, nil
195+
}

pkg/statistics/handle/globalstats/topn_bench_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package globalstats_test
15+
package globalstats
1616

1717
import (
1818
"fmt"
@@ -22,7 +22,6 @@ import (
2222
"github.com/pingcap/tidb/pkg/parser/mysql"
2323
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
2424
"github.com/pingcap/tidb/pkg/statistics"
25-
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
2625
"github.com/pingcap/tidb/pkg/types"
2726
"github.com/pingcap/tidb/pkg/util/chunk"
2827
"github.com/pingcap/tidb/pkg/util/codec"
@@ -77,7 +76,7 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
7776
b.ResetTimer()
7877
for i := 0; i < b.N; i++ {
7978
// Benchmark merge 10 topN.
80-
_, _, _, _ = statistics.MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
79+
_, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
8180
}
8281
}
8382

@@ -124,20 +123,20 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
124123
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
125124
hists = append(hists, h)
126125
}
127-
wrapper := globalstats.NewStatsWrapper(hists, topNs)
126+
wrapper := NewStatsWrapper(hists, topNs)
128127
const mergeConcurrency = 4
129128
batchSize := len(wrapper.AllTopN) / mergeConcurrency
130129
if batchSize < 1 {
131130
batchSize = 1
132-
} else if batchSize > globalstats.MaxPartitionMergeBatchSize {
133-
batchSize = globalstats.MaxPartitionMergeBatchSize
131+
} else if batchSize > MaxPartitionMergeBatchSize {
132+
batchSize = MaxPartitionMergeBatchSize
134133
}
135134
gpool := gp.New(mergeConcurrency, 5*time.Minute)
136135
defer gpool.Close()
137136
b.ResetTimer()
138137
for i := 0; i < b.N; i++ {
139138
// Benchmark merge 10 topN.
140-
_, _, _, _ = globalstats.MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
139+
_, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
141140
}
142141
}
143142

0 commit comments

Comments
 (0)