
Commit e828c48

chore(engine): Implement execution pipeline for the limit operator (#17264)

Signed-off-by: Christian Haudum <[email protected]>
1 parent cff0df6

File tree: 5 files changed, +191 −9 lines

pkg/engine/executor/executor.go

Lines changed: 48 additions & 2 deletions
```diff
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/apache/arrow-go/v18/arrow"
+
 	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
 )
@@ -68,7 +70,7 @@ func (c *Context) executeSortMerge(_ context.Context, _ *physical.SortMerge, inp
 	return errorPipeline(errNotImplemented)
 }
 
-func (c *Context) executeLimit(_ context.Context, _ *physical.Limit, inputs []Pipeline) Pipeline {
+func (c *Context) executeLimit(_ context.Context, limit *physical.Limit, inputs []Pipeline) Pipeline {
 	if len(inputs) == 0 {
 		return emptyPipeline()
 	}
@@ -77,7 +79,51 @@ func (c *Context) executeLimit(_ context.Context, _ *physical.Limit, inputs []Pi
 		return errorPipeline(fmt.Errorf("limit expects exactly one input, got %d", len(inputs)))
 	}
 
-	return errorPipeline(errNotImplemented)
+	// We gradually reduce offsetRemaining and limitRemaining as we process
+	// more records, as the offset and limit may cross record boundaries.
+	var (
+		offsetRemaining = int64(limit.Skip)
+		limitRemaining  = int64(limit.Fetch)
+	)
+
+	return newGenericPipeline(Local, func(inputs []Pipeline) state {
+		var length int64
+		var start, end int64
+		var batch arrow.Record
+
+		// We skip yielding zero-length batches while offsetRemaining > 0.
+		for length == 0 {
+			// Stop once we have reached the limit.
+			if limitRemaining <= 0 {
+				return Exhausted
+			}
+
+			// Pull the next item from the input pipeline.
+			input := inputs[0]
+			err := input.Read()
+			if err != nil {
+				return newState(input.Value())
+			}
+			batch, _ = input.Value()
+
+			// Slice the batch so it only contains the rows we are looking for,
+			// accounting for both the limit and the offset.
+			// Start and end are constrained to the bounds of the record.
+			start = min(offsetRemaining, batch.NumRows())
+			end = min(start+limitRemaining, batch.NumRows())
+			length = end - start
+
+			offsetRemaining -= start
+			limitRemaining -= length
+		}
+
+		if length <= 0 && offsetRemaining <= 0 {
+			return Exhausted
+		}
+
+		rec := batch.NewSlice(start, end)
+		return successState(rec)
+	}, inputs...)
 }
 
 func (c *Context) executeFilter(_ context.Context, _ *physical.Filter, inputs []Pipeline) Pipeline {
```
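The heart of the change is the start/end arithmetic: each pulled record is sliced to the rows that survive the remaining offset and limit, and zero-length slices are swallowed by the `for length == 0` loop. Below is a minimal standalone sketch of that arithmetic (assuming Go 1.21+ for the built-in `min`; `sliceBounds` is an illustrative name, not part of the Loki codebase), traced against the "with offset" case from `limit_test.go` further down:

```go
package main

import "fmt"

// sliceBounds mirrors the arithmetic in executeLimit: given the rows still to
// skip (offset) and still to emit (limit), it returns the half-open
// [start, end) range to keep from a batch of numRows rows.
// Hypothetical helper, for illustration only.
func sliceBounds(offset, limit, numRows int64) (start, end int64) {
	start = min(offset, numRows)    // skip at most the whole batch
	end = min(start+limit, numRows) // then keep at most limit more rows
	return start, end
}

func main() {
	// Trace the "with offset" test case: Skip=3, Fetch=5, batchSize=4.
	offsetRemaining, limitRemaining := int64(3), int64(5)
	for batch := 1; limitRemaining > 0; batch++ {
		start, end := sliceBounds(offsetRemaining, limitRemaining, 4)
		fmt.Printf("batch %d: slice [%d, %d) -> %d rows\n", batch, start, end, end-start)
		offsetRemaining -= start
		limitRemaining -= end - start
	}
	// Output:
	// batch 1: slice [3, 4) -> 1 rows
	// batch 2: slice [0, 4) -> 4 rows
}
```

With `Skip: 3`, `Fetch: 5`, and 4-row input batches, the first batch contributes one row and the second four, matching the `expectedBatches: 2, expectedRows: 5` case in the table-driven test.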

pkg/engine/executor/executor_test.go

Lines changed: 0 additions & 7 deletions
```diff
@@ -57,13 +57,6 @@ func TestExecutor_Limit(t *testing.T) {
 		require.ErrorContains(t, err, EOF.Error())
 	})
 
-	t.Run("is not implemented", func(t *testing.T) {
-		c := &Context{}
-		pipeline := c.executeLimit(context.TODO(), &physical.Limit{}, []Pipeline{emptyPipeline()})
-		err := pipeline.Read()
-		require.ErrorContains(t, err, errNotImplemented.Error())
-	})
-
 	t.Run("multiple inputs result in error", func(t *testing.T) {
 		c := &Context{}
 		pipeline := c.executeLimit(context.TODO(), &physical.Limit{}, []Pipeline{emptyPipeline(), emptyPipeline()})
```

pkg/engine/executor/limit.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -0,0 +1 @@
+package executor
```

pkg/engine/executor/limit_test.go

Lines changed: 65 additions & 0 deletions
```diff
@@ -0,0 +1,65 @@
+package executor
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
+)
+
+func TestLimit(t *testing.T) {
+	for _, tt := range []struct {
+		name            string
+		offset          uint32
+		limit           uint32
+		batchSize       int64
+		expectedBatches int64
+		expectedRows    int64
+	}{
+		{
+			name:            "without offset",
+			offset:          0,
+			limit:           5,
+			batchSize:       3,
+			expectedBatches: 2,
+			expectedRows:    5,
+		},
+		{
+			name:            "with offset",
+			offset:          3,
+			limit:           5,
+			batchSize:       4,
+			expectedBatches: 2,
+			expectedRows:    5,
+		},
+		{
+			name:            "with offset greater than batch size",
+			offset:          5,
+			limit:           6,
+			batchSize:       2,
+			expectedBatches: 4,
+			expectedRows:    6,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			c := &Context{
+				batchSize: tt.batchSize,
+			}
+			limit := &physical.Limit{
+				Skip:  tt.offset,
+				Fetch: tt.limit,
+			}
+			inputs := []Pipeline{
+				incrementingIntPipeline.Pipeline(tt.batchSize, 1000),
+			}
+
+			pipeline := c.executeLimit(context.Background(), limit, inputs)
+			batches, rows := collect(t, pipeline)
+
+			require.Equal(t, tt.expectedBatches, batches)
+			require.Equal(t, tt.expectedRows, rows)
+		})
+	}
+}
```
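The third case is the interesting one: replaying the sketch after the executor.go diff with offset 5, limit 6, and 2-row batches, the first two input batches are consumed entirely by the offset and yield zero-length slices, which the operator's `for length == 0` loop suppresses rather than emits; the slices that do get yielded are 1, 2, 2, and 1 rows, giving the expected 4 batches and 6 rows.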

pkg/engine/executor/util_test.go

Lines changed: 77 additions & 0 deletions
```diff
@@ -0,0 +1,77 @@
+package executor
+
+import (
+	"testing"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+)
+
+var (
+	incrementingIntPipeline = newRecordGenerator(
+		arrow.NewSchema([]arrow.Field{
+			{Name: "id", Type: arrow.PrimitiveTypes.Int64},
+		}, nil),
+		func(offset, sz int64, schema *arrow.Schema) arrow.Record {
+			builder := array.NewInt64Builder(memory.DefaultAllocator)
+			defer builder.Release()
+
+			for i := int64(0); i < sz; i++ {
+				builder.Append(offset + i)
+			}
+
+			data := builder.NewArray()
+			defer data.Release()
+
+			columns := []arrow.Array{data}
+			return array.NewRecord(schema, columns, sz)
+		},
+	)
+)
+
+type recordGenerator struct {
+	schema *arrow.Schema
+	batch  func(offset, sz int64, schema *arrow.Schema) arrow.Record
+}
+
+func newRecordGenerator(schema *arrow.Schema, batch func(offset, sz int64, schema *arrow.Schema) arrow.Record) *recordGenerator {
+	return &recordGenerator{
+		schema: schema,
+		batch:  batch,
+	}
+}
+
+func (p *recordGenerator) Pipeline(batchSize int64, rows int64) Pipeline {
+	var pos int64
+	return newGenericPipeline(
+		Local,
+		func(_ []Pipeline) state {
+			if pos >= rows {
+				return Exhausted
+			}
+			batch := p.batch(pos, batchSize, p.schema)
+			pos += batch.NumRows()
+			return successState(batch)
+		},
+		nil,
+	)
+}
+
+// collect reads all data from the pipeline until it is exhausted or returns an error.
+func collect(t *testing.T, pipeline Pipeline) (batches int64, rows int64) {
+	for {
+		err := pipeline.Read()
+		if err == EOF {
+			break
+		}
+		if err != nil {
+			t.Fatalf("did not expect error, got %s", err.Error())
+		}
+		batch, _ := pipeline.Value()
+		t.Log("batch", batch, "err", err)
+		batches++
+		rows += batch.NumRows()
+	}
+	return batches, rows
+}
```
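Note that `recordGenerator.Pipeline` always emits a full `batchSize`-row batch and only checks `pos` afterwards, so the total row count comes out exact only when `rows` is a multiple of `batchSize` or, as in `TestLimit` with its 1000-row input, when the limit stops reading first. A hypothetical test, not part of this commit, showing how the two helpers compose:

```go
package executor

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// Hypothetical test, for illustration only: drive the generator to
// exhaustion and count what collect sees.
func TestGeneratorProducesAllRows(t *testing.T) {
	// 10 rows in batches of 5: the generator emits two full batches and
	// then reports Exhausted once pos >= rows.
	pipeline := incrementingIntPipeline.Pipeline(5, 10)
	batches, rows := collect(t, pipeline)

	require.Equal(t, int64(2), batches)
	require.Equal(t, int64(10), rows)
}
```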
