Skip to content

Commit ccb65f9

Browse files
owen-d, chaudum, rfratto
authored
feat(engine, dataobj): logql bench wiring for new engine [non-fork] (#17627)
Signed-off-by: Christian Haudum <[email protected]> Co-authored-by: Christian Haudum <[email protected]> Co-authored-by: Robert Fratto <[email protected]>
1 parent c8d2398 commit ccb65f9

File tree

9 files changed

+229
-44
lines changed

9 files changed

+229
-44
lines changed

pkg/engine/engine.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
8282
logicalPlan, err := logical.BuildPlan(params)
8383
if err != nil {
8484
level.Warn(logger).Log("msg", "failed to create logical plan", "err", err)
85-
e.metrics.subqueries.WithLabelValues(status, statusNotImplemented).Inc()
85+
e.metrics.subqueries.WithLabelValues(statusNotImplemented).Inc()
8686
return builder.empty(), ErrNotSupported
8787
}
8888
e.metrics.logicalPlanning.Observe(time.Since(t).Seconds())
@@ -94,13 +94,13 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
9494
plan, err := planner.Build(logicalPlan)
9595
if err != nil {
9696
level.Warn(logger).Log("msg", "failed to create physical plan", "err", err)
97-
e.metrics.subqueries.WithLabelValues(status, statusFailure).Inc()
97+
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
9898
return builder.empty(), ErrNotSupported
9999
}
100100
plan, err = planner.Optimize(plan)
101101
if err != nil {
102102
level.Warn(logger).Log("msg", "failed to optimize physical plan", "err", err)
103-
e.metrics.subqueries.WithLabelValues(status, statusFailure).Inc()
103+
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
104104
return builder.empty(), ErrNotSupported
105105
}
106106
e.metrics.physicalPlanning.Observe(time.Since(t).Seconds())
@@ -117,14 +117,14 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
117117
defer pipeline.Close()
118118

119119
if err := collectResult(ctx, pipeline, builder); err != nil {
120-
e.metrics.subqueries.WithLabelValues(status, statusFailure).Inc()
120+
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
121121
return builder.empty(), err
122122
}
123123

124124
statsCtx := stats.FromContext(ctx)
125125
builder.setStats(statsCtx.Result(time.Since(start), 0, builder.len()))
126126

127-
e.metrics.subqueries.WithLabelValues(status, statusSuccess).Inc()
127+
e.metrics.subqueries.WithLabelValues(statusSuccess).Inc()
128128
e.metrics.execution.Observe(time.Since(t).Seconds())
129129
durExecution := time.Since(t)
130130

@@ -142,7 +142,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
142142
func collectResult(_ context.Context, pipeline executor.Pipeline, result *resultBuilder) error {
143143
for {
144144
if err := pipeline.Read(); err != nil {
145-
if err == executor.EOF {
145+
if errors.Is(err, executor.EOF) {
146146
break
147147
}
148148
return err

pkg/engine/executor/dataobjscan.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"golang.org/x/sync/errgroup"
1818

1919
"github.com/grafana/loki/v3/pkg/dataobj"
20+
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
2021
"github.com/grafana/loki/v3/pkg/engine/internal/types"
2122
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
2223
"github.com/grafana/loki/v3/pkg/util/topk"
@@ -194,7 +195,6 @@ func (s *dataobjScan) read() (arrow.Record, error) {
194195
for _, reader := range s.readers {
195196
g.Go(func() error {
196197
buf := make([]dataobj.Record, 512)
197-
198198
for {
199199
n, err := reader.Read(ctx, buf)
200200
if n == 0 && errors.Is(err, io.EOF) {
@@ -425,9 +425,10 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
425425
})
426426

427427
case types.ColumnTypeBuiltin:
428+
ty, md := builtinColumnType(columnExpr.Ref)
428429
addField(arrow.Field{
429430
Name: columnExpr.Ref.Column,
430-
Type: builtinColumnType(columnExpr.Ref),
431+
Type: ty,
431432
Nullable: true,
432433
Metadata: md,
433434
})
@@ -469,16 +470,16 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
469470
return arrow.NewSchema(fields, nil), nil
470471
}
471472

472-
func builtinColumnType(ref types.ColumnRef) arrow.DataType {
473+
func builtinColumnType(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
473474
if ref.Type != types.ColumnTypeBuiltin {
474475
panic("builtinColumnType called with a non-builtin column")
475476
}
476477

477478
switch ref.Column {
478479
case types.ColumnNameBuiltinTimestamp:
479-
return arrow.FixedWidthTypes.Timestamp_ns
480+
return arrow.FixedWidthTypes.Timestamp_ns, datatype.ColumnMetadataBuiltinTimestamp
480481
case types.ColumnNameBuiltinMessage:
481-
return arrow.BinaryTypes.String
482+
return arrow.BinaryTypes.String, datatype.ColumnMetadataBuiltinMessage
482483
default:
483484
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
484485
}

pkg/engine/executor/dataobjscan_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stretchr/testify/require"
1111

1212
"github.com/grafana/loki/v3/pkg/dataobj"
13+
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
1314
"github.com/grafana/loki/v3/pkg/engine/internal/types"
1415
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
1516
"github.com/grafana/loki/v3/pkg/logproto"
@@ -20,7 +21,6 @@ import (
2021
var (
2122
labelMD = buildMetadata(types.ColumnTypeLabel)
2223
metadataMD = buildMetadata(types.ColumnTypeMetadata)
23-
builtinMD = buildMetadata(types.ColumnTypeBuiltin)
2424
)
2525

2626
func buildMetadata(ty types.ColumnType) arrow.Metadata {
@@ -77,8 +77,8 @@ func Test_dataobjScan(t *testing.T) {
7777
{Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
7878
{Name: "guid", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
7979
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
80-
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true},
81-
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: builtinMD, Nullable: true},
80+
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp, Nullable: true},
81+
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinMessage, Nullable: true},
8282
}
8383

8484
expectCSV := `prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world
@@ -106,7 +106,7 @@ prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
106106
})
107107

108108
expectFields := []arrow.Field{
109-
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true},
109+
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp, Nullable: true},
110110
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
111111
}
112112

@@ -206,8 +206,8 @@ func Test_dataobjScan_DuplicateColumns(t *testing.T) {
206206
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
207207
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
208208

209-
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true},
210-
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: builtinMD, Nullable: true},
209+
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadataBuiltinTimestamp, Nullable: true},
210+
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinMessage, Nullable: true},
211211
}
212212

213213
expectCSV := `prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1

pkg/engine/executor/limit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func NewLimitPipeline(input Pipeline, skip, fetch uint32) *GenericPipeline {
2626
input := inputs[0]
2727
err := input.Read()
2828
if err != nil {
29-
return newState(input.Value())
29+
return failureState(err)
3030
}
3131
batch, _ = input.Value()
3232

pkg/engine/executor/sortmerge.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (p *KWayMerge) read() error {
119119
if p.batches[i] == nil || p.offsets[i] == p.batches[i].NumRows() {
120120
err := p.inputs[i].Read()
121121
if err != nil {
122-
if err == EOF {
122+
if errors.Is(err, EOF) {
123123
p.exhausted[i] = true
124124
continue
125125
}
@@ -136,15 +136,16 @@ func (p *KWayMerge) read() error {
136136
if err != nil {
137137
return err
138138
}
139-
tsCol, ok := col.ToArray().(*array.Int64)
139+
arr := col.ToArray()
140+
tsCol, ok := arr.(*array.Timestamp)
140141
if !ok {
141142
return errors.New("column is not a timestamp column")
142143
}
143144
ts := tsCol.Value(int(p.offsets[i]))
144145

145146
// Populate slices for sorting
146147
batchIndexes = append(batchIndexes, i)
147-
timestamps = append(timestamps, ts)
148+
timestamps = append(timestamps, int64(ts))
148149
}
149150

150151
// Pipeline is exhausted if no more input batches are available
@@ -182,7 +183,7 @@ func (p *KWayMerge) read() error {
182183
return err
183184
}
184185
// We assume the column is a Timestamp array
185-
tsCol, ok := col.ToArray().(*array.Int64)
186+
tsCol, ok := col.ToArray().(*array.Timestamp)
186187
if !ok {
187188
return errors.New("column is not a timestamp column")
188189
}
@@ -193,7 +194,7 @@ func (p *KWayMerge) read() error {
193194
for end < p.batches[j].NumRows() {
194195
ts := tsCol.Value(int(end))
195196
end++
196-
if p.compare(ts, timestamps[1]) {
197+
if p.compare(int64(ts), timestamps[1]) {
197198
break
198199
}
199200
}

pkg/engine/executor/sortmerge_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestSortMerge(t *testing.T) {
4747
merge := &physical.SortMerge{
4848
Column: &physical.ColumnExpr{
4949
Ref: types.ColumnRef{
50-
Column: "timestamp",
50+
Column: types.ColumnNameBuiltinTimestamp,
5151
Type: types.ColumnTypeBuiltin,
5252
},
5353
},
@@ -77,14 +77,14 @@ func TestSortMerge(t *testing.T) {
7777

7878
tsCol, err := c.evaluator.eval(merge.Column, batch)
7979
require.NoError(t, err)
80-
arr := tsCol.ToArray().(*array.Int64)
80+
arr := tsCol.ToArray().(*array.Timestamp)
8181

8282
// Check if ts column is sorted
8383
for i := 0; i < arr.Len()-1; i++ {
8484
require.LessOrEqual(t, arr.Value(i), arr.Value(i+1))
8585
// also check ascending order across batches
8686
require.GreaterOrEqual(t, arr.Value(i), lastTs)
87-
lastTs = arr.Value(i + 1)
87+
lastTs = int64(arr.Value(i + 1))
8888
}
8989
batches++
9090
rows += batch.NumRows()
@@ -99,7 +99,7 @@ func TestSortMerge(t *testing.T) {
9999
merge := &physical.SortMerge{
100100
Column: &physical.ColumnExpr{
101101
Ref: types.ColumnRef{
102-
Column: "timestamp",
102+
Column: types.ColumnNameBuiltinTimestamp,
103103
Type: types.ColumnTypeBuiltin,
104104
},
105105
},
@@ -129,14 +129,14 @@ func TestSortMerge(t *testing.T) {
129129

130130
tsCol, err := c.evaluator.eval(merge.Column, batch)
131131
require.NoError(t, err)
132-
arr := tsCol.ToArray().(*array.Int64)
132+
arr := tsCol.ToArray().(*array.Timestamp)
133133

134134
// Check if ts column is sorted
135135
for i := 0; i < arr.Len()-1; i++ {
136136
require.GreaterOrEqual(t, arr.Value(i), arr.Value(i+1))
137137
// also check descending order across batches
138138
require.LessOrEqual(t, arr.Value(i), lastTs)
139-
lastTs = arr.Value(i + 1)
139+
lastTs = int64(arr.Value(i + 1))
140140
}
141141
batches++
142142
rows += batch.NumRows()

pkg/engine/executor/util_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package executor
22

33
import (
4+
"errors"
45
"testing"
56
"time"
67

@@ -52,19 +53,19 @@ func timestampPipeline(start time.Time, order time.Duration) *recordGenerator {
5253
return newRecordGenerator(
5354
arrow.NewSchema([]arrow.Field{
5455
{Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)},
55-
{Name: "timestamp", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)},
56+
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)},
5657
}, nil),
5758

5859
func(offset, sz int64, schema *arrow.Schema) arrow.Record {
5960
idColBuilder := array.NewInt64Builder(memory.DefaultAllocator)
6061
defer idColBuilder.Release()
6162

62-
tsColBuilder := array.NewInt64Builder(memory.DefaultAllocator)
63+
tsColBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, arrow.FixedWidthTypes.Timestamp_ns.(*arrow.TimestampType))
6364
defer tsColBuilder.Release()
6465

6566
for i := int64(0); i < sz; i++ {
6667
idColBuilder.Append(offset + i)
67-
tsColBuilder.Append(start.Add(order * (time.Duration(offset)*time.Second + time.Duration(i)*time.Millisecond)).UnixNano())
68+
tsColBuilder.Append(arrow.Timestamp(start.Add(order * (time.Duration(offset)*time.Second + time.Duration(i)*time.Millisecond)).UnixNano()))
6869
}
6970

7071
idData := idColBuilder.NewArray()
@@ -111,7 +112,7 @@ func (p *recordGenerator) Pipeline(batchSize int64, rows int64) Pipeline {
111112
func collect(t *testing.T, pipeline Pipeline) (batches int64, rows int64) {
112113
for {
113114
err := pipeline.Read()
114-
if err == EOF {
115+
if errors.Is(err, EOF) {
115116
break
116117
}
117118
if err != nil {

0 commit comments

Comments
 (0)