Skip to content

Commit 979f761

Browse files
authored
perf: Optimization pass for reading from dataobjs (#16747)
1 parent e0e7133 commit 979f761

18 files changed

+646
-185
lines changed

pkg/dataobj/builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,8 @@ func (b *Builder) Append(stream logproto.Stream) error {
195195
b.logs.Append(logs.Record{
196196
StreamID: streamID,
197197
Timestamp: entry.Timestamp,
198-
Metadata: entry.StructuredMetadata,
199-
Line: entry.Line,
198+
Metadata: convertMetadata(entry.StructuredMetadata),
199+
Line: []byte(entry.Line),
200200
})
201201
}
202202

pkg/dataobj/internal/dataset/page_builder.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@ func valueSize(v Value) int {
175175
// Assuming that strings are PLAIN encoded using their length and bytes.
176176
str := v.String()
177177
return binary.Size(len(str)) + len(str)
178+
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
179+
arr := v.ByteArray()
180+
return binary.Size(len(arr)) + len(arr)
178181
}
179182

180183
return 0

pkg/dataobj/internal/dataset/value.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dataset
22

33
import (
4+
"bytes"
45
"cmp"
56
"encoding/binary"
67
"fmt"
@@ -12,6 +13,7 @@ import (
1213
// Helper types
1314
type (
1415
stringptr *byte
16+
bytearray *byte
1517
)
1618

1719
// A Value represents a single value within a dataset. Unlike [any], Values can
@@ -69,6 +71,14 @@ func StringValue(v string) Value {
6971
}
7072
}
7173

74+
// ByteArrayValue returns a [Value] for a byte slice representing a string.
75+
func ByteArrayValue(v []byte) Value {
76+
return Value{
77+
num: uint64(len(v)),
78+
any: (bytearray)(unsafe.SliceData(v)),
79+
}
80+
}
81+
7282
// IsNil returns whether v is nil.
7383
func (v Value) IsNil() bool {
7484
return v.any == nil
@@ -93,6 +103,8 @@ func (v Value) Type() datasetmd.ValueType {
93103
return v
94104
case stringptr:
95105
return datasetmd.VALUE_TYPE_STRING
106+
case bytearray:
107+
return datasetmd.VALUE_TYPE_BYTE_ARRAY
96108
default:
97109
panic(fmt.Sprintf("dataset.Value has unexpected type %T", v))
98110
}
@@ -126,6 +138,16 @@ func (v Value) String() string {
126138
return v.Type().String()
127139
}
128140

141+
// ByteSlice returns v's value as a byte slice. If v is not a string,
142+
// ByteSlice returns a byte slice of the form "VALUE_TYPE_T", where T is the
143+
// underlying type of v.
144+
func (v Value) ByteArray() []byte {
145+
if ba, ok := v.any.(bytearray); ok {
146+
return unsafe.Slice(ba, v.num)
147+
}
148+
panic(fmt.Sprintf("dataset.Value type is %s, not %s", v.Type(), datasetmd.VALUE_TYPE_BYTE_ARRAY))
149+
}
150+
129151
// MarshalBinary encodes v into a binary representation. Non-NULL values encode
130152
// first with the type (encoded as uvarint), followed by an encoded value,
131153
// where:
@@ -150,6 +172,8 @@ func (v Value) MarshalBinary() (data []byte, err error) {
150172
case datasetmd.VALUE_TYPE_STRING:
151173
str := v.String()
152174
buf = append(buf, unsafe.Slice(unsafe.StringData(str), len(str))...)
175+
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
176+
buf = append(buf, v.ByteArray()...)
153177
default:
154178
return nil, fmt.Errorf("dataset.Value.MarshalBinary: unsupported type %s", v.Type())
155179
}
@@ -186,6 +210,8 @@ func (v *Value) UnmarshalBinary(data []byte) error {
186210
case datasetmd.VALUE_TYPE_STRING:
187211
str := string(data[n:])
188212
*v = StringValue(str)
213+
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
214+
*v = ByteArrayValue(data[n:])
189215
default:
190216
return fmt.Errorf("dataset.Value.UnmarshalBinary: unsupported type %s", vtyp)
191217
}
@@ -221,6 +247,8 @@ func CompareValues(a, b Value) int {
221247
return cmp.Compare(a.Uint64(), b.Uint64())
222248
case datasetmd.VALUE_TYPE_STRING:
223249
return cmp.Compare(a.String(), b.String())
250+
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
251+
return bytes.Compare(a.ByteArray(), b.ByteArray())
224252
default:
225253
panic(fmt.Sprintf("page.CompareValues: unsupported type %s", a.Type()))
226254
}

pkg/dataobj/internal/dataset/value_encoding_plain.go

Lines changed: 147 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,41 @@ func init() {
1616
registerValueEncoding(
1717
datasetmd.VALUE_TYPE_STRING,
1818
datasetmd.ENCODING_TYPE_PLAIN,
19-
func(w streamio.Writer) valueEncoder { return newPlainEncoder(w) },
20-
func(r streamio.Reader) valueDecoder { return newPlainDecoder(r) },
19+
func(w streamio.Writer) valueEncoder { return newPlainStringEncoder(w) },
20+
func(r streamio.Reader) valueDecoder { return newPlainStringDecoder(r) },
21+
)
22+
registerValueEncoding(
23+
datasetmd.VALUE_TYPE_BYTE_ARRAY,
24+
datasetmd.ENCODING_TYPE_PLAIN,
25+
func(w streamio.Writer) valueEncoder { return newPlainBytesEncoder(w) },
26+
func(r streamio.Reader) valueDecoder { return newPlainBytesDecoder(r) },
2127
)
2228
}
2329

24-
// A plainEncoder encodes string values to an [streamio.Writer].
25-
type plainEncoder struct {
30+
// A plainStringEncoder encodes string values to an [streamio.Writer].
31+
type plainStringEncoder struct {
2632
w streamio.Writer
2733
}
2834

29-
var _ valueEncoder = (*plainEncoder)(nil)
35+
var _ valueEncoder = (*plainStringEncoder)(nil)
3036

3137
// newPlainEncoder creates a plainEncoder that writes encoded strings to w.
32-
func newPlainEncoder(w streamio.Writer) *plainEncoder {
33-
return &plainEncoder{w: w}
38+
func newPlainStringEncoder(w streamio.Writer) *plainStringEncoder {
39+
return &plainStringEncoder{w: w}
3440
}
3541

3642
// ValueType returns [datasetmd.VALUE_TYPE_STRING].
37-
func (enc *plainEncoder) ValueType() datasetmd.ValueType {
43+
func (enc *plainStringEncoder) ValueType() datasetmd.ValueType {
3844
return datasetmd.VALUE_TYPE_STRING
3945
}
4046

4147
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
42-
func (enc *plainEncoder) EncodingType() datasetmd.EncodingType {
48+
func (enc *plainStringEncoder) EncodingType() datasetmd.EncodingType {
4349
return datasetmd.ENCODING_TYPE_PLAIN
4450
}
4551

4652
// Encode encodes an individual string value.
47-
func (enc *plainEncoder) Encode(v Value) error {
53+
func (enc *plainStringEncoder) Encode(v Value) error {
4854
if v.Type() != datasetmd.VALUE_TYPE_STRING {
4955
return fmt.Errorf("plain: invalid value type %v", v.Type())
5056
}
@@ -65,41 +71,41 @@ func (enc *plainEncoder) Encode(v Value) error {
6571
}
6672

6773
// Flush implements [valueEncoder]. It is a no-op for plainEncoder.
68-
func (enc *plainEncoder) Flush() error {
74+
func (enc *plainStringEncoder) Flush() error {
6975
return nil
7076
}
7177

7278
// Reset implements [valueEncoder]. It resets the encoder to write to w.
73-
func (enc *plainEncoder) Reset(w streamio.Writer) {
79+
func (enc *plainStringEncoder) Reset(w streamio.Writer) {
7480
enc.w = w
7581
}
7682

77-
// plainDecoder decodes strings from an [streamio.Reader].
78-
type plainDecoder struct {
83+
// plainStringDecoder decodes strings from an [streamio.Reader].
84+
type plainStringDecoder struct {
7985
r streamio.Reader
8086
}
8187

82-
var _ valueDecoder = (*plainDecoder)(nil)
88+
var _ valueDecoder = (*plainStringDecoder)(nil)
8389

84-
// newPlainDecoder creates a plainDecoder that reads encoded strings from r.
85-
func newPlainDecoder(r streamio.Reader) *plainDecoder {
86-
return &plainDecoder{r: r}
90+
// newPlainStringDecoder creates a plainDecoder that reads encoded strings from r.
91+
func newPlainStringDecoder(r streamio.Reader) *plainStringDecoder {
92+
return &plainStringDecoder{r: r}
8793
}
8894

89-
// ValueType returns [datasetmd.VALUE_TYPE_STRING].
90-
func (dec *plainDecoder) ValueType() datasetmd.ValueType {
95+
// ValueType returns [datasetmd.VALUE_TYPE_BYTE_ARRAY].
96+
func (dec *plainStringDecoder) ValueType() datasetmd.ValueType {
9197
return datasetmd.VALUE_TYPE_STRING
9298
}
9399

94100
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
95-
func (dec *plainDecoder) EncodingType() datasetmd.EncodingType {
101+
func (dec *plainStringDecoder) EncodingType() datasetmd.EncodingType {
96102
return datasetmd.ENCODING_TYPE_PLAIN
97103
}
98104

99105
// Decode decodes up to len(s) values, storing the results into s. The
100106
// number of decoded values is returned, followed by an error (if any).
101107
// At the end of the stream, Decode returns 0, [io.EOF].
102-
func (dec *plainDecoder) Decode(s []Value) (int, error) {
108+
func (dec *plainStringDecoder) Decode(s []Value) (int, error) {
103109
if len(s) == 0 {
104110
return 0, nil
105111
}
@@ -123,7 +129,7 @@ func (dec *plainDecoder) Decode(s []Value) (int, error) {
123129
}
124130

125131
// decode decodes a string.
126-
func (dec *plainDecoder) decode() (Value, error) {
132+
func (dec *plainStringDecoder) decode() (Value, error) {
127133
sz, err := binary.ReadUvarint(dec.r)
128134
if err != nil {
129135
return StringValue(""), err
@@ -137,6 +143,123 @@ func (dec *plainDecoder) decode() (Value, error) {
137143
}
138144

139145
// Reset implements [valueDecoder]. It resets the decoder to read from r.
140-
func (dec *plainDecoder) Reset(r streamio.Reader) {
146+
func (dec *plainStringDecoder) Reset(r streamio.Reader) {
147+
dec.r = r
148+
}
149+
150+
// A plainBytesEncoder encodes byte array values to an [streamio.Writer].
151+
type plainBytesEncoder struct {
152+
w streamio.Writer
153+
}
154+
155+
var _ valueEncoder = (*plainStringEncoder)(nil)
156+
157+
// newPlainEncoder creates a plainEncoder that writes encoded strings to w.
158+
func newPlainBytesEncoder(w streamio.Writer) *plainBytesEncoder {
159+
return &plainBytesEncoder{w: w}
160+
}
161+
162+
// ValueType returns [datasetmd.VALUE_TYPE_BYTE_ARRAY].
163+
func (enc *plainBytesEncoder) ValueType() datasetmd.ValueType {
164+
return datasetmd.VALUE_TYPE_BYTE_ARRAY
165+
}
166+
167+
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
168+
func (enc *plainBytesEncoder) EncodingType() datasetmd.EncodingType {
169+
return datasetmd.ENCODING_TYPE_PLAIN
170+
}
171+
172+
// Encode encodes an individual string value.
173+
func (enc *plainBytesEncoder) Encode(v Value) error {
174+
if v.Type() != datasetmd.VALUE_TYPE_BYTE_ARRAY {
175+
return fmt.Errorf("plain: invalid value type %v", v.Type())
176+
}
177+
sv := v.ByteArray()
178+
179+
if err := streamio.WriteUvarint(enc.w, uint64(len(sv))); err != nil {
180+
return err
181+
}
182+
183+
n, err := enc.w.Write(sv)
184+
if n != len(sv) {
185+
return fmt.Errorf("short write; expected %d bytes, wrote %d", len(sv), n)
186+
}
187+
return err
188+
}
189+
190+
// Flush implements [valueEncoder]. It is a no-op for plainEncoder.
191+
func (enc *plainBytesEncoder) Flush() error {
192+
return nil
193+
}
194+
195+
// Reset implements [valueEncoder]. It resets the encoder to write to w.
196+
func (enc *plainBytesEncoder) Reset(w streamio.Writer) {
197+
enc.w = w
198+
}
199+
200+
// plainBytesDecoder decodes byte arrays from an [streamio.Reader].
201+
type plainBytesDecoder struct {
202+
r streamio.Reader
203+
}
204+
205+
var _ valueDecoder = (*plainBytesDecoder)(nil)
206+
207+
// newPlainBytesDecoder creates a plainDecoder that reads encoded strings from r.
208+
func newPlainBytesDecoder(r streamio.Reader) *plainBytesDecoder {
209+
return &plainBytesDecoder{r: r}
210+
}
211+
212+
// ValueType returns [datasetmd.VALUE_TYPE_BYTE_ARRAY].
213+
func (dec *plainBytesDecoder) ValueType() datasetmd.ValueType {
214+
return datasetmd.VALUE_TYPE_BYTE_ARRAY
215+
}
216+
217+
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
218+
func (dec *plainBytesDecoder) EncodingType() datasetmd.EncodingType {
219+
return datasetmd.ENCODING_TYPE_PLAIN
220+
}
221+
222+
// Decode decodes up to len(s) values, storing the results into s. The
223+
// number of decoded values is returned, followed by an error (if any).
224+
// At the end of the stream, Decode returns 0, [io.EOF].
225+
func (dec *plainBytesDecoder) Decode(s []Value) (int, error) {
226+
if len(s) == 0 {
227+
return 0, nil
228+
}
229+
230+
var err error
231+
var v Value
232+
233+
for i := range s {
234+
v, err = dec.decode()
235+
if errors.Is(err, io.EOF) {
236+
if i == 0 {
237+
return 0, io.EOF
238+
}
239+
return i, nil
240+
} else if err != nil {
241+
return i, err
242+
}
243+
s[i] = v
244+
}
245+
return len(s), nil
246+
}
247+
248+
// decode decodes a string.
249+
func (dec *plainBytesDecoder) decode() (Value, error) {
250+
sz, err := binary.ReadUvarint(dec.r)
251+
if err != nil {
252+
return ByteArrayValue([]byte{}), err
253+
}
254+
255+
dst := make([]byte, int(sz))
256+
if _, err := io.ReadFull(dec.r, dst); err != nil {
257+
return ByteArrayValue([]byte{}), err
258+
}
259+
return ByteArrayValue(dst), nil
260+
}
261+
262+
// Reset implements [valueDecoder]. It resets the decoder to read from r.
263+
func (dec *plainBytesDecoder) Reset(r streamio.Reader) {
141264
dec.r = r
142265
}

0 commit comments

Comments
 (0)