Skip to content

Commit 507e7ef

Browse files
authored
feat(array): Run-end encoded string arrays (#5550)
* feat(array): Run-end encoded string arrays It is very common for flux to be using arrays of strings containing a single repeated value. This is often the case when processing InfluxDB tag types. Ad support for run-end encoded string arrays to optimize for this case. Such arrays are quick to access as they consist of a single run, but use less memory than a full string array, or even a dictionary which still needs a full length array to hold the dictionary indexes. * chore: update comments
1 parent 9edfdfa commit 507e7ef

File tree

5 files changed

+587
-139
lines changed

5 files changed

+587
-139
lines changed

array/array.go

Lines changed: 111 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package array
33
import (
44
"encoding/json"
55
"fmt"
6+
"sort"
67
"strconv"
78
"strings"
89
"sync/atomic"
@@ -36,6 +37,7 @@ var (
3637
ValueType: arrow.BinaryTypes.String,
3738
Ordered: false,
3839
}
40+
StringREEType = arrow.RunEndEncodedOf(arrow.PrimitiveTypes.Int32, arrow.BinaryTypes.String)
3941
)
4042

4143
// Array represents an immutable sequence of values.
@@ -113,21 +115,24 @@ type Builder interface {
113115
}
114116

115117
// String holds an array of flux string values. The arrow data must be
116-
// either a `utf8` or `dictionary<value=utf8, indices=int32, ordered=false>`.
118+
// either a `utf8`, a `dictionary<value=utf8, indices=int32, ordered=false>`,
119+
// or a `run_end_encoded<run_ends:int32, values:utf8>`.
117120
// Internally the string data is stored in an array.Binary value.
118121
type String struct {
119122
refCount int64
120123
data *array.Data
121124
nullBitmapBytes []byte
122125

123126
indices *array.Int32
127+
runEnds *array.Int32
124128
values *array.Binary
125129
}
126130

127131
// Create a new String array from an arrow.ArrayData that contains
128-
// either a `utf8` or a `dictionary<values=utf8, indices=int32, ordered=false>`
129-
// set of data buffers. NewStringData will panic if the array data is of
130-
// an unsupported type.
132+
// either a `utf8`, a `dictionary<values=utf8, indices=int32, ordered=false>`,
133+
// or a `run_end_encoded<run_ends:int32, values:utf8>` set of data
134+
// buffers. NewStringData will panic if the array data is of an
135+
// unsupported type.
131136
func NewStringData(data arrow.ArrayData) *String {
132137
a := String{
133138
refCount: 1,
@@ -136,19 +141,32 @@ func NewStringData(data arrow.ArrayData) *String {
136141
return &a
137142
}
138143

139-
// validateStringDataType checks that the datatype is supported for
140-
// using to create a String array.
141-
func validateStringDataType(dt arrow.DataType) {
144+
// isStringDataType checks if the given arrow.DataType is a string type
145+
// supported by flux.
146+
func isStringDataType(dt arrow.DataType) bool {
142147
switch dt := dt.(type) {
143148
case *arrow.DictionaryType:
144149
if dt.IndexType.ID() == arrow.INT32 && dt.ValueType.ID() == arrow.STRING {
145-
return
150+
return true
151+
}
152+
case *arrow.RunEndEncodedType:
153+
if dt.RunEnds().ID() == arrow.INT32 && dt.Encoded().ID() == arrow.STRING {
154+
return true
146155
}
147156
default:
148157
if dt.ID() == arrow.STRING {
149-
return
158+
return true
150159
}
151160
}
161+
return false
162+
}
163+
164+
// validateStringDataType checks that the datatype is supported for
165+
// using to create a String array.
166+
func validateStringDataType(dt arrow.DataType) {
167+
if isStringDataType(dt) {
168+
return
169+
}
152170
panic(errors.Newf(codes.Internal, "incorrect data type for String (%s)", dt))
153171
}
154172

@@ -167,45 +185,101 @@ func (a *String) setData(data *array.Data) {
167185
}
168186

169187
var indices *array.Int32
188+
var runEnds *array.Int32
170189
var values *array.Binary
171190

172191
if data.DataType().ID() == arrow.DICTIONARY {
173192
idxData := array.NewData(arrow.PrimitiveTypes.Int32, data.Len(), data.Buffers(), nil, data.NullN(), data.Offset())
174193
indices = array.NewInt32Data(idxData)
175194
idxData.Release()
176195
values = array.NewBinaryData(data.Dictionary())
196+
} else if data.DataType().ID() == arrow.RUN_END_ENCODED {
197+
runEnds = array.NewInt32Data(data.Children()[0])
198+
values = array.NewBinaryData(data.Children()[1])
177199
} else {
178200
values = array.NewBinaryData(data)
179201
}
180202
if a.indices != nil {
181203
a.indices.Release()
182204
}
205+
if a.runEnds != nil {
206+
a.runEnds.Release()
207+
}
183208
if a.values != nil {
184209
a.values.Release()
185210
}
186211
a.indices = indices
212+
a.runEnds = runEnds
187213
a.values = values
188214
a.data = data
189215
}
190216

217+
func (a *String) valuesIndex(i int) (int, bool) {
218+
if a.indices != nil {
219+
if a.indices.IsNull(i) {
220+
return 0, false
221+
}
222+
return int(a.indices.Value(i)), true
223+
} else if a.runEnds != nil {
224+
return sort.Search(a.runEnds.Len(), func(j int) bool {
225+
return a.runEnds.Value(j) > int32(i+a.data.Offset())
226+
}), true
227+
}
228+
return i, true
229+
}
230+
191231
func (a *String) DataType() arrow.DataType {
192232
return a.data.DataType()
193233
}
194234

195235
func (a *String) NullN() int {
236+
if a.runEnds != nil {
237+
nbm := a.NullBitmapBytes()
238+
if nbm == nil {
239+
return 0
240+
}
241+
sz := a.data.Len()
242+
return sz - bitutil.CountSetBits(nbm, 0, sz)
243+
}
196244
return a.data.NullN()
197245
}
198246

199247
func (a *String) NullBitmapBytes() []byte {
248+
if a.runEnds == nil {
249+
return a.nullBitmapBytes
250+
}
251+
if a.values.NullN() == 0 {
252+
return nil
253+
}
254+
if a.nullBitmapBytes == nil {
255+
a.nullBitmapBytes = make([]byte, bitutil.BytesForBits(int64(a.data.Len())))
256+
last := int64(a.data.Offset())
257+
end := last + int64(a.data.Len())
258+
for i, _ := a.valuesIndex(0); i < a.runEnds.Len() && last < end; i++ {
259+
runEnd := int64(a.runEnds.Value(i))
260+
if runEnd > end {
261+
runEnd = end
262+
}
263+
count := runEnd - last
264+
bitutil.SetBitsTo(a.nullBitmapBytes, last, count, a.values.IsValid(i))
265+
last += count
266+
}
267+
}
200268
return a.nullBitmapBytes
201269
}
202270

203271
func (a *String) IsNull(i int) bool {
204-
return len(a.nullBitmapBytes) != 0 && bitutil.BitIsNotSet(a.nullBitmapBytes, a.data.Offset()+i)
272+
if i, ok := a.valuesIndex(i); ok {
273+
return a.values.IsNull(i)
274+
}
275+
return true
205276
}
206277

207278
func (a *String) IsValid(i int) bool {
208-
return len(a.nullBitmapBytes) == 0 || bitutil.BitIsSet(a.nullBitmapBytes, a.data.Offset()+i)
279+
if i, ok := a.valuesIndex(i); ok {
280+
return a.values.IsValid(i)
281+
}
282+
return false
209283
}
210284

211285
func (a *String) ValueStr(i int) string {
@@ -264,6 +338,10 @@ func (a *String) Release() {
264338
a.indices.Release()
265339
a.indices = nil
266340
}
341+
if a.runEnds != nil {
342+
a.runEnds.Release()
343+
a.runEnds = nil
344+
}
267345
if a.values != nil {
268346
a.values.Release()
269347
a.values = nil
@@ -277,28 +355,14 @@ func (a *String) Release() {
277355
func (a *String) String() string {
278356
var sb strings.Builder
279357
sb.WriteByte('[')
280-
if a.indices != nil {
281-
for i := 0; i < a.Len(); i++ {
282-
if i > 0 {
283-
sb.WriteByte(' ')
284-
}
285-
if a.indices.IsValid(i) {
286-
idx := int(a.indices.Value(i))
287-
fmt.Fprintf(&sb, "%q", a.values.ValueString(idx))
288-
} else {
289-
sb.WriteString(array.NullValueStr)
290-
}
358+
for i := 0; i < a.Len(); i++ {
359+
if i > 0 {
360+
sb.WriteByte(' ')
291361
}
292-
} else {
293-
for i := 0; i < a.Len(); i++ {
294-
if i > 0 {
295-
sb.WriteByte(' ')
296-
}
297-
if a.values.IsValid(i) {
298-
fmt.Fprintf(&sb, "%q", a.values.ValueString(i))
299-
} else {
300-
sb.WriteString(array.NullValueStr)
301-
}
362+
if a.IsValid(i) {
363+
fmt.Fprintf(&sb, "%q", a.Value(i))
364+
} else {
365+
sb.WriteString(array.NullValueStr)
302366
}
303367
}
304368
sb.WriteByte(']')
@@ -309,20 +373,20 @@ func (a *String) String() string {
309373
// is only valid for the lifetime of the array. Care should be taken not
310374
// to store this string without also retaining the array.
311375
func (a *String) Value(i int) string {
312-
if a.indices != nil {
313-
if a.indices.IsNull(i) {
314-
// Flux relies on a NULL entry in the String array returning
315-
// the empty string.
316-
return ""
317-
}
318-
i = int(a.indices.Value(i))
376+
i, ok := a.valuesIndex(i)
377+
if !ok {
378+
// Flux relies on a NULL entry in the String array returning
379+
// the empty string.
380+
return ""
319381
}
320382
return a.values.ValueString(i)
321383
}
322384

323385
func (a *String) ValueLen(i int) int {
324-
if a.indices != nil {
325-
i = int(a.indices.Value(i))
386+
i, ok := a.valuesIndex(i)
387+
if !ok {
388+
// Null values are zero length.
389+
return 0
326390
}
327391
return a.values.ValueLen(i)
328392
}
@@ -374,11 +438,14 @@ func MakeFromData(data arrow.ArrayData) Array {
374438
return array.NewInt64Data(data)
375439
case arrow.UINT64:
376440
return array.NewUint64Data(data)
377-
case arrow.STRING, arrow.DICTIONARY:
441+
case arrow.STRING:
378442
return NewStringData(data)
379-
default:
380-
panic(errors.Newf(codes.Internal, "invalid data type for flux array (%s)", data.DataType()))
443+
case arrow.DICTIONARY, arrow.RUN_END_ENCODED:
444+
if isStringDataType(data.DataType()) {
445+
return NewStringData(data)
446+
}
381447
}
448+
panic(errors.Newf(codes.Internal, "invalid data type for flux array (%s)", data.DataType()))
382449
}
383450

384451
func ToFloatConv(mem memory.Allocator, arr Array) (*Float, error) {

0 commit comments

Comments
 (0)