Skip to content

Commit 7ca7c3b

Browse files
authored
fix: Implement approx-topk function on querier (#17816)
1 parent 8f58df6 commit 7ca7c3b

File tree

5 files changed

+79
-1
lines changed

5 files changed

+79
-1
lines changed

pkg/logql/count_min_sketch.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package logql
22

33
import (
44
"container/heap"
5+
"context"
56
"fmt"
67
"slices"
78
"strings"
@@ -322,7 +323,7 @@ type CountMinSketchVectorStepEvaluator struct {
322323
buffer []byte
323324
}
324325

325-
var _ StepEvaluator = NewQuantileSketchVectorStepEvaluator(nil, 0)
326+
var _ StepEvaluator = NewCountMinSketchVectorStepEvaluator(nil)
326327

327328
func NewCountMinSketchVectorStepEvaluator(vec *CountMinSketchVector) *CountMinSketchVectorStepEvaluator {
328329
return &CountMinSketchVectorStepEvaluator{
@@ -357,3 +358,45 @@ func (e *CountMinSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {
357358
func (*CountMinSketchVectorStepEvaluator) Close() error { return nil }
358359

359360
func (*CountMinSketchVectorStepEvaluator) Error() error { return nil }
361+
362+
var _ StepEvaluator = (*CountMinSketchEvalStepEvaluator)(nil)
363+
364+
// CountMinSketchEvalStepEvaluator transforms a CountMinSketchEvalExpr into a CountMinSketchVector.
365+
type CountMinSketchEvalStepEvaluator struct {
366+
ctx context.Context
367+
nextEvFactory SampleEvaluatorFactory
368+
expr *CountMinSketchEvalExpr
369+
params Params
370+
}
371+
372+
func NewCountMinSketchEvalStepEvaluator(ctx context.Context, nextEvFactory SampleEvaluatorFactory, expr *CountMinSketchEvalExpr, params Params) (*CountMinSketchEvalStepEvaluator, error) {
373+
return &CountMinSketchEvalStepEvaluator{
374+
ctx: ctx,
375+
nextEvFactory: nextEvFactory,
376+
expr: expr,
377+
params: params,
378+
}, nil
379+
}
380+
381+
func (e *CountMinSketchEvalStepEvaluator) Next() (bool, int64, StepResult) {
382+
nextEv, err := e.nextEvFactory.NewStepEvaluator(e.ctx, e.nextEvFactory, e.expr.SampleExpr, e.params)
383+
if err != nil {
384+
return false, 0, CountMinSketchVector{}
385+
}
386+
387+
ok, _, results := nextEv.Next()
388+
if !ok {
389+
return false, 0, CountMinSketchVector{}
390+
}
391+
392+
data := results.CountMinSketchVec()
393+
handler := NewCountMinSketchVectorStepEvaluator(&data)
394+
395+
return handler.Next()
396+
}
397+
398+
func (*CountMinSketchEvalStepEvaluator) Close() error { return nil }
399+
400+
func (*CountMinSketchEvalStepEvaluator) Error() error { return nil }
401+
402+
func (e *CountMinSketchEvalStepEvaluator) Explain(_ Node) {}

pkg/logql/downstream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,9 @@ func (e CountMinSketchEvalExpr) String() string {
414414

415415
sb.WriteString(d.String())
416416
}
417+
if len(e.downstreams) == 0 {
418+
sb.WriteString(e.SampleExpr.String())
419+
}
417420
return fmt.Sprintf("CountMinSketchEval<%s>", sb.String())
418421
}
419422

pkg/logql/evaluator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,8 @@ func (ev *DefaultEvaluator) NewStepEvaluator(
359359
})
360360
}
361361
return newVectorAggEvaluator(ctx, nextEvFactory, e, q, ev.maxCountMinSketchHeapSize)
362+
case *CountMinSketchEvalExpr:
363+
return NewCountMinSketchEvalStepEvaluator(ctx, nextEvFactory, e, q)
362364
case *syntax.RangeAggregationExpr:
363365
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
364366
&logproto.SampleQueryRequest{

pkg/logql/optimize.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,36 @@ func optimizeSampleExpr(expr syntax.SampleExpr) (syntax.SampleExpr, error) {
2020
if err != nil {
2121
return nil, err
2222
}
23+
replaceApproxTopK(expr)
2324
removeLineformat(expr)
2425
return expr, nil
2526
}
2627

28+
// replaceApproxTopKWithTopk replaces all ApproxTopKExpr with TopKExpr.
29+
// ApproxTopKExpr is not supported by the querier, so we replace it with the implementation if this function reaches the querier.
30+
func replaceApproxTopK(expr syntax.SampleExpr) {
31+
expr.Walk(func(e syntax.Expr) bool {
32+
vectorExpr, ok := e.(*syntax.VectorAggregationExpr)
33+
if !ok {
34+
return true
35+
}
36+
if vectorExpr.Operation != syntax.OpTypeApproxTopK {
37+
return true
38+
}
39+
40+
vectorExpr.Operation = syntax.OpTypeTopK
41+
vectorExpr.Left = &CountMinSketchEvalExpr{
42+
SampleExpr: &syntax.VectorAggregationExpr{
43+
Operation: syntax.OpTypeCountMinSketch,
44+
Params: 0,
45+
Grouping: vectorExpr.Grouping,
46+
Left: vectorExpr.Left,
47+
},
48+
}
49+
return true
50+
})
51+
}
52+
2753
// removeLineformat removes unnecessary line_format within a SampleExpr.
2854
func removeLineformat(expr syntax.SampleExpr) {
2955
expr.Walk(func(e syntax.Expr) bool {

pkg/logql/optimize_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ func Test_optimizeSampleExpr(t *testing.T) {
2727
{`sum by(name)(rate({region="us-east1"} | json | line_format "something else" | unwrap foo[5m]))`, `sum by (name)(rate({region="us-east1"} | json | unwrap foo[5m]))`},
2828
{`quantile_over_time(1,{region="us-east1"} | json | line_format "something else" | unwrap foo[5m])`, `quantile_over_time(1,{region="us-east1"} | json | unwrap foo[5m])`},
2929
{`sum by(name)(count_over_time({region="us-east1"} | json | line_format "something else" | label_format foo=bar | line_format "boo"[5m]))`, `sum by (name)(count_over_time({region="us-east1"} | json | label_format foo=bar[5m]))`},
30+
31+
// Replace approx_topk with __count_min_sketch__ variant
32+
{`approx_topk(3, 1+1)`, `topk(3,CountMinSketchEval<__count_min_sketch__(2)>)`},
33+
{`approx_topk(3, sum by(name)(rate({region="us-east1"}[5m])))`, `topk(3,CountMinSketchEval<__count_min_sketch__(sum by (name)(rate({region="us-east1"}[5m])))>)`},
3034
}
3135
for _, tt := range tests {
3236
t.Run(tt.in, func(t *testing.T) {

0 commit comments

Comments
 (0)