Skip to content

Commit a238816

Browse files
authored
perf: Use map for InPredicate when reading dataobj (#18325)
1 parent 152c9c6 commit a238816

File tree

11 files changed

+233
-44
lines changed

11 files changed

+233
-44
lines changed

pkg/dataobj/internal/dataset/predicate.go

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package dataset
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
"iter"
6+
"unsafe"
7+
)
48

59
// Predicate is an expression used to filter rows in a [Reader].
610
type Predicate interface{ isPredicate() }
@@ -32,8 +36,8 @@ type (
3236
// An InPredicate is a [Predicate] which asserts that a row may only be
3337
// included if the Value of the Column is present in the provided Values.
3438
InPredicate struct {
35-
Column Column // Column to check.
36-
Values []Value // Values to check for inclusion.
39+
Column Column // Column to check.
40+
Values ValueSet // Set of values to check.
3741
}
3842

3943
// A GreaterThanPredicate is a [Predicate] which asserts that a row may only
@@ -111,3 +115,115 @@ func WalkPredicate(p Predicate, fn func(p Predicate) bool) {
111115

112116
fn(nil)
113117
}
118+
119+
type ValueSet interface {
120+
Contains(value Value) bool
121+
Iter() iter.Seq[Value]
122+
Size() int
123+
}
124+
125+
type Int64Set struct {
126+
values map[int64]Value
127+
}
128+
129+
func NewInt64ValueSet(values []Value) Int64Set {
130+
valuesMap := make(map[int64]Value, len(values))
131+
for _, v := range values {
132+
valuesMap[v.Int64()] = v
133+
}
134+
return Int64Set{
135+
values: valuesMap,
136+
}
137+
}
138+
139+
func (s Int64Set) Contains(value Value) bool {
140+
_, ok := s.values[value.Int64()]
141+
return ok
142+
}
143+
144+
func (s Int64Set) Iter() iter.Seq[Value] {
145+
return func(yield func(v Value) bool) {
146+
for _, v := range s.values {
147+
ok := yield(v)
148+
if !ok {
149+
return
150+
}
151+
}
152+
}
153+
}
154+
155+
func (s Int64Set) Size() int {
156+
return len(s.values)
157+
}
158+
159+
type Uint64ValueSet struct {
160+
values map[uint64]Value
161+
}
162+
163+
func NewUint64ValueSet(values []Value) Uint64ValueSet {
164+
valuesMap := make(map[uint64]Value, len(values))
165+
for _, v := range values {
166+
valuesMap[v.Uint64()] = v
167+
}
168+
return Uint64ValueSet{
169+
values: valuesMap,
170+
}
171+
}
172+
173+
func (s Uint64ValueSet) Contains(value Value) bool {
174+
_, ok := s.values[value.Uint64()]
175+
return ok
176+
}
177+
178+
func (s Uint64ValueSet) Iter() iter.Seq[Value] {
179+
return func(yield func(v Value) bool) {
180+
for _, v := range s.values {
181+
ok := yield(v)
182+
if !ok {
183+
return
184+
}
185+
}
186+
}
187+
}
188+
189+
func (s Uint64ValueSet) Size() int {
190+
return len(s.values)
191+
}
192+
193+
type ByteArrayValueSet struct {
194+
values map[string]Value
195+
}
196+
197+
func NewByteArrayValueSet(values []Value) ByteArrayValueSet {
198+
valuesMap := make(map[string]Value, len(values))
199+
for _, v := range values {
200+
valuesMap[unsafeString(v.ByteArray())] = v
201+
}
202+
return ByteArrayValueSet{
203+
values: valuesMap,
204+
}
205+
}
206+
207+
func (s ByteArrayValueSet) Contains(value Value) bool {
208+
_, ok := s.values[unsafeString(value.ByteArray())]
209+
return ok
210+
}
211+
212+
func (s ByteArrayValueSet) Iter() iter.Seq[Value] {
213+
return func(yield func(v Value) bool) {
214+
for _, v := range s.values {
215+
ok := yield(v)
216+
if !ok {
217+
return
218+
}
219+
}
220+
}
221+
}
222+
223+
func (s ByteArrayValueSet) Size() int {
224+
return len(s.values)
225+
}
226+
227+
func unsafeString(in []byte) string {
228+
return unsafe.String(unsafe.SliceData(in), len(in))
229+
}

pkg/dataobj/internal/dataset/reader.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -319,15 +319,11 @@ func checkPredicate(p Predicate, lookup map[Column]int, row Row) bool {
319319
panic("checkPredicate: column not found")
320320
}
321321

322-
found := false
323-
for _, v := range p.Values {
324-
if CompareValues(row.Values[columnIndex], v) == 0 {
325-
found = true
326-
break
327-
}
322+
value := row.Values[columnIndex]
323+
if value.IsNil() || value.Type() != p.Column.ColumnInfo().Type {
324+
return false
328325
}
329-
330-
return found
326+
return p.Values.Contains(value)
331327

332328
case GreaterThanPredicate:
333329
columnIndex, ok := lookup[p.Column]
@@ -796,7 +792,7 @@ func (r *Reader) buildColumnPredicateRanges(ctx context.Context, c Column, p Pre
796792
include = CompareValues(minValue, p.Value) < 0
797793
case InPredicate:
798794
// Check if any value falls within the page's range
799-
for _, v := range p.Values {
795+
for v := range p.Values.Iter() {
800796
if CompareValues(v, minValue) >= 0 && CompareValues(v, maxValue) <= 0 {
801797
include = true
802798
break

pkg/dataobj/internal/dataset/reader_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -343,12 +343,12 @@ func Test_BuildPredicateRanges(t *testing.T) {
343343
name: "InPredicate with values inside and outside page ranges",
344344
predicate: InPredicate{
345345
Column: cols[1], // timestamp column
346-
Values: []Value{
347-
Int64Value(50), // Inside page 1 (0-100)
348-
Int64Value(300), // Inside page 2 (200-500)
349-
Int64Value(150), // Outside all pages
350-
Int64Value(600), // Outside all pages
351-
},
346+
Values: NewInt64ValueSet([]Value{
347+
Int64Value(50),
348+
Int64Value(300),
349+
Int64Value(150),
350+
Int64Value(600),
351+
}), // 2 values in range. ~200 matching rows
352352
},
353353
want: rowRanges{
354354
{Start: 0, End: 249}, // Page 1: contains 50
@@ -359,10 +359,10 @@ func Test_BuildPredicateRanges(t *testing.T) {
359359
name: "InPredicate with values all outside page ranges",
360360
predicate: InPredicate{
361361
Column: cols[1], // timestamp column
362-
Values: []Value{
362+
Values: NewInt64ValueSet([]Value{
363363
Int64Value(150), // Outside all pages
364364
Int64Value(600), // Outside all pages
365-
},
365+
}),
366366
},
367367
want: nil, // No pages should be included
368368
},

pkg/dataobj/sections/logs/reader.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,21 @@ func mapPredicate(p Predicate, columnLookup map[*Column]dataset.Column) dataset.
323323
vals[i] = arrowconv.FromScalar(p.Values[i], mustConvertType(p.Values[i].DataType()))
324324
}
325325

326+
var valueSet dataset.ValueSet
327+
switch col.ColumnInfo().Type {
328+
case datasetmd.VALUE_TYPE_INT64:
329+
valueSet = dataset.NewInt64ValueSet(vals)
330+
case datasetmd.VALUE_TYPE_UINT64:
331+
valueSet = dataset.NewUint64ValueSet(vals)
332+
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
333+
valueSet = dataset.NewByteArrayValueSet(vals)
334+
default:
335+
panic("InPredicate not implemented for datatype")
336+
}
337+
326338
return dataset.InPredicate{
327339
Column: col,
328-
Values: vals,
340+
Values: valueSet,
329341
}
330342

331343
case GreaterThanPredicate:

pkg/dataobj/sections/logs/reader_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,20 @@ func TestReader(t *testing.T) {
3333
})
3434

3535
var (
36-
traceID = sec.Columns()[2]
37-
message = sec.Columns()[3]
36+
streamID = sec.Columns()[0]
37+
traceID = sec.Columns()[2]
38+
message = sec.Columns()[3]
3839
)
3940

41+
require.Equal(t, "", streamID.Name)
42+
require.Equal(t, logs.ColumnTypeStreamID, streamID.Type)
4043
require.Equal(t, "trace_id", traceID.Name)
4144
require.Equal(t, logs.ColumnTypeMetadata, traceID.Type)
4245
require.Equal(t, "", message.Name)
4346
require.Equal(t, logs.ColumnTypeMessage, message.Type)
4447

4548
r := logs.NewReader(logs.ReaderOptions{
46-
Columns: []*logs.Column{traceID, message},
49+
Columns: []*logs.Column{streamID, traceID, message},
4750
Allocator: alloc,
4851
Predicates: []logs.Predicate{
4952
logs.FuncPredicate{
@@ -57,12 +60,19 @@ func TestReader(t *testing.T) {
5760
return bytes.Equal(bb, []byte("abcdef")) || bytes.Equal(bb, []byte("123456"))
5861
},
5962
},
63+
logs.InPredicate{
64+
Column: streamID,
65+
Values: []scalar.Scalar{
66+
scalar.NewInt64Scalar(1),
67+
scalar.NewInt64Scalar(2),
68+
},
69+
},
6070
},
6171
})
6272

6373
expect := arrowtest.Rows{
64-
{"trace_id.metadata.binary": []byte("abcdef"), "message.binary": []byte("goodbye, world!")},
65-
{"trace_id.metadata.binary": []byte("123456"), "message.binary": []byte("foo bar")},
74+
{"stream_id.int64": int64(1), "trace_id.metadata.binary": []byte("abcdef"), "message.binary": []byte("goodbye, world!")},
75+
{"stream_id.int64": int64(2), "trace_id.metadata.binary": []byte("123456"), "message.binary": []byte("foo bar")},
6676
}
6777

6878
actualTable, err := readTable(context.Background(), r)

pkg/dataobj/sections/logs/row_predicate_order.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ func getPredicateSelectivity(p dataset.Predicate) selectivityScore {
8080
return getBaseSelectivity(p)
8181
}
8282

83-
valuesInRange := len(p.Values)
83+
valuesInRange := p.Values.Size()
8484
if info.Statistics.MinValue != nil && info.Statistics.MaxValue != nil {
8585
var minValue, maxValue dataset.Value
8686
if e1, e2 := minValue.UnmarshalBinary(info.Statistics.MinValue), maxValue.UnmarshalBinary(info.Statistics.MaxValue); e1 == nil && e2 == nil {
8787
valuesInRange = 0
8888

89-
for _, v := range p.Values {
89+
for v := range p.Values.Iter() {
9090
if dataset.CompareValues(v, minValue) >= 0 && dataset.CompareValues(v, maxValue) <= 0 {
9191
valuesInRange++
9292
}

pkg/dataobj/sections/logs/row_predicate_order_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,12 @@ func TestGetPredicateSelectivity(t *testing.T) {
155155
min: 25,
156156
max: 75,
157157
}).ToMemColumn(t),
158-
Values: []dataset.Value{dataset.Int64Value(20), dataset.Int64Value(50), dataset.Int64Value(60), dataset.Int64Value(80)}, // 2 values in range. ~200 matching rows
158+
Values: dataset.NewInt64ValueSet([]dataset.Value{
159+
dataset.Int64Value(20),
160+
dataset.Int64Value(50),
161+
dataset.Int64Value(60),
162+
dataset.Int64Value(80),
163+
}), // 2 values in range. ~200 matching rows
159164
},
160165
want: selectivityScore(0.2), // 0.1 + 0.1
161166
},

pkg/dataobj/sections/logs/row_reader.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,17 +218,13 @@ func streamIDPredicate(ids iter.Seq[int64], columns []dataset.Column, columnDesc
218218
}
219219

220220
var values []dataset.Value
221-
for id := range ids {
222-
values = append(values, dataset.Int64Value(id))
223-
}
224-
225-
if len(values) == 0 {
226-
return nil
221+
for i := range ids {
222+
values = append(values, dataset.Int64Value(i))
227223
}
228224

229225
return dataset.InPredicate{
230226
Column: streamIDColumn,
231-
Values: values,
227+
Values: dataset.NewInt64ValueSet(values),
232228
}
233229
}
234230

pkg/dataobj/sections/pointers/row_reader.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -205,17 +205,13 @@ func streamIDPredicate(ids iter.Seq[int64], columns []dataset.Column, columnDesc
205205
}
206206

207207
var values []dataset.Value
208-
for id := range ids {
209-
values = append(values, dataset.Int64Value(id))
210-
}
211-
212-
if len(values) == 0 {
213-
return nil
208+
for i := range ids {
209+
values = append(values, dataset.Int64Value(i))
214210
}
215211

216212
return dataset.InPredicate{
217213
Column: streamIDColumn,
218-
Values: values,
214+
Values: dataset.NewInt64ValueSet(values),
219215
}
220216
}
221217

pkg/dataobj/sections/streams/reader.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,21 @@ func mapPredicate(p Predicate, columnLookup map[*Column]dataset.Column) dataset.
327327
vals[i] = arrowconv.FromScalar(p.Values[i], mustConvertType(p.Values[i].DataType()))
328328
}
329329

330+
var valueSet dataset.ValueSet
331+
switch col.ColumnInfo().Type {
332+
case datasetmd.VALUE_TYPE_INT64:
333+
valueSet = dataset.NewInt64ValueSet(vals)
334+
case datasetmd.VALUE_TYPE_UINT64:
335+
valueSet = dataset.NewUint64ValueSet(vals)
336+
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
337+
valueSet = dataset.NewByteArrayValueSet(vals)
338+
default:
339+
panic("InPredicate not implemented for datatype")
340+
}
341+
330342
return dataset.InPredicate{
331343
Column: col,
332-
Values: vals,
344+
Values: valueSet,
333345
}
334346

335347
case GreaterThanPredicate:

0 commit comments

Comments
 (0)