Skip to content

Commit 87922da

Browse files
authored
fix: improve structured metadata label normalization performance (#17332)
1 parent 2c27a58 commit 87922da

File tree

4 files changed

+223
-36
lines changed

4 files changed

+223
-36
lines changed

pkg/chunkenc/symbols.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111

1212
"github.com/pkg/errors"
13+
"github.com/prometheus/otlptranslator"
1314
"github.com/prometheus/prometheus/model/labels"
1415

1516
"github.com/grafana/loki/v3/pkg/compression"
@@ -37,11 +38,14 @@ type symbolizer struct {
3738
labels []string
3839
size int
3940
compressedSize int
41+
// Runtime-only map to track which symbols are label names and have been normalized
42+
normalizedNames map[uint32]string
4043
}
4144

4245
func newSymbolizer() *symbolizer {
4346
return &symbolizer{
44-
symbolsMap: map[string]uint32{},
47+
symbolsMap: map[string]uint32{},
48+
normalizedNames: map[uint32]string{},
4549
}
4650
}
4751

@@ -54,6 +58,7 @@ func (s *symbolizer) Reset() {
5458
s.labels = s.labels[:0]
5559
s.size = 0
5660
s.compressedSize = 0
61+
s.normalizedNames = map[uint32]string{}
5762
}
5863

5964
// Add adds new label pairs to the collection and returns a symbol for each existing and new label pair.
@@ -107,7 +112,25 @@ func (s *symbolizer) Lookup(syms symbols, buf labels.Labels) labels.Labels {
107112
buf = buf[:0]
108113

109114
for _, symbol := range syms {
110-
buf = append(buf, labels.Label{Name: s.lookup(symbol.Name), Value: s.lookup(symbol.Value)})
115+
// First check if we have a normalized name for this symbol
116+
s.mtx.RLock()
117+
normalized, exists := s.normalizedNames[symbol.Name]
118+
s.mtx.RUnlock()
119+
120+
var name string
121+
if exists {
122+
name = normalized
123+
} else {
124+
// If we haven't seen this name before, look it up and normalize it
125+
name = s.lookup(symbol.Name)
126+
normalized := otlptranslator.NormalizeLabel(name)
127+
s.mtx.Lock()
128+
s.normalizedNames[symbol.Name] = normalized
129+
s.mtx.Unlock()
130+
name = normalized
131+
}
132+
133+
buf = append(buf, labels.Label{Name: name, Value: s.lookup(symbol.Value)})
111134
}
112135

113136
return buf
@@ -312,6 +335,9 @@ func symbolizerFromCheckpoint(b []byte) *symbolizer {
312335
s := symbolizer{
313336
symbolsMap: make(map[string]uint32, numLabels),
314337
labels: make([]string, 0, numLabels),
338+
// Labels are key-value pairs, so preallocate for half of numLabels to hold just the keys;
339+
// this is likely less memory than the exponential growth Go would otherwise do.
340+
normalizedNames: make(map[uint32]string, numLabels/2),
315341
}
316342

317343
for i := 0; i < numLabels; i++ {
@@ -338,8 +364,11 @@ func symbolizerFromEnc(b []byte, pool compression.ReaderPool) (*symbolizer, erro
338364
defer pool.PutReader(reader)
339365

340366
s := symbolizer{
341-
labels: make([]string, 0, numLabels),
342-
compressedSize: len(b),
367+
labels: make([]string, 0, numLabels),
368+
// Same as symbolizerFromCheckpoint
369+
normalizedNames: make(map[uint32]string, numLabels/2),
370+
symbolsMap: make(map[string]uint32, numLabels),
371+
compressedSize: len(b),
343372
}
344373

345374
var (
@@ -400,7 +429,9 @@ func symbolizerFromEnc(b []byte, pool compression.ReaderPool) (*symbolizer, erro
400429
return nil, err
401430
}
402431
}
403-
s.labels = append(s.labels, string(buf))
432+
label := string(buf)
433+
s.symbolsMap[label] = uint32(len(s.labels))
434+
s.labels = append(s.labels, label)
404435
s.size += len(buf)
405436
}
406437

pkg/chunkenc/symbols_test.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"fmt"
77
"testing"
8+
"unsafe"
89

910
"github.com/prometheus/prometheus/model/labels"
1011
"github.com/stretchr/testify/require"
@@ -175,3 +176,189 @@ func TestSymbolizer(t *testing.T) {
175176
}
176177
}
177178
}
179+
180+
func TestSymbolizerLabelNormalization(t *testing.T) {
181+
for _, tc := range []struct {
182+
name string
183+
labelsToAdd []labels.Labels
184+
expectedLabels []labels.Labels
185+
description string
186+
}{
187+
{
188+
name: "basic label normalization",
189+
labelsToAdd: []labels.Labels{
190+
{
191+
{Name: "foo-bar", Value: "value1"},
192+
{Name: "fizz_buzz", Value: "value2"},
193+
},
194+
},
195+
expectedLabels: []labels.Labels{
196+
{
197+
{Name: "foo_bar", Value: "value1"},
198+
{Name: "fizz_buzz", Value: "value2"},
199+
},
200+
},
201+
description: "hyphens should be converted to underscores in label names",
202+
},
203+
{
204+
name: "same string as name and value",
205+
labelsToAdd: []labels.Labels{
206+
{
207+
{Name: "foo-bar", Value: "foo-bar"},
208+
{Name: "fizz-buzz", Value: "fizz-buzz"},
209+
},
210+
},
211+
expectedLabels: []labels.Labels{
212+
{
213+
{Name: "foo_bar", Value: "foo-bar"},
214+
{Name: "fizz_buzz", Value: "fizz-buzz"},
215+
},
216+
},
217+
description: "only normalize when string is used as a name, not as a value",
218+
},
219+
} {
220+
t.Run(tc.name, func(t *testing.T) {
221+
// Test direct addition
222+
s := newSymbolizer()
223+
for i, labels := range tc.labelsToAdd {
224+
symbols := s.Add(labels)
225+
result := s.Lookup(symbols, nil)
226+
require.Equal(t, tc.expectedLabels[i], result, "direct addition: %s", tc.description)
227+
}
228+
229+
// Test serialization/deserialization via checkpoint
230+
buf := bytes.NewBuffer(nil)
231+
_, _, err := s.CheckpointTo(buf)
232+
require.NoError(t, err)
233+
234+
loaded := symbolizerFromCheckpoint(buf.Bytes())
235+
for i, labels := range tc.labelsToAdd {
236+
symbols := loaded.Add(labels)
237+
result := loaded.Lookup(symbols, nil)
238+
require.Equal(t, tc.expectedLabels[i], result, "after checkpoint: %s", tc.description)
239+
}
240+
241+
// Test serialization/deserialization via compression
242+
buf.Reset()
243+
_, _, err = s.SerializeTo(buf, compression.GetWriterPool(compression.Snappy))
244+
require.NoError(t, err)
245+
246+
loaded, err = symbolizerFromEnc(buf.Bytes(), compression.GetReaderPool(compression.Snappy))
247+
require.NoError(t, err)
248+
for i, labels := range tc.labelsToAdd {
249+
symbols := loaded.Add(labels)
250+
result := loaded.Lookup(symbols, nil)
251+
require.Equal(t, tc.expectedLabels[i], result, "after compression: %s", tc.description)
252+
}
253+
})
254+
}
255+
}
256+
257+
func TestSymbolizerNormalizationCache(t *testing.T) {
258+
s := newSymbolizer()
259+
260+
// Add a label with a name that needs normalization
261+
labels1 := labels.Labels{{Name: "foo-bar", Value: "value1"}}
262+
symbols1 := s.Add(labels1)
263+
264+
// Look up the label multiple times
265+
for i := 0; i < 3; i++ {
266+
result := s.Lookup(symbols1, nil)
267+
require.Equal(t, "foo_bar", result[0].Name, "normalized name should be consistent")
268+
require.Equal(t, "value1", result[0].Value, "value should remain unchanged")
269+
}
270+
271+
// Add the same label name with a different value
272+
labels2 := labels.Labels{{Name: "foo-bar", Value: "value2"}}
273+
symbols2 := s.Add(labels2)
274+
275+
// The normalized name should be reused
276+
result := s.Lookup(symbols1, nil)
277+
firstPtr := unsafe.StringData(result[0].Name)
278+
result = s.Lookup(symbols2, nil)
279+
secondPtr := unsafe.StringData(result[0].Name)
280+
require.Equal(t, firstPtr, secondPtr, "normalized name string data pointers should be identical")
281+
require.Equal(t, "value2", result[0].Value, "new value should be used")
282+
283+
// Check that we have only one entry in normalizedNames for this label name
284+
require.Equal(t, 1, len(s.normalizedNames), "should have only one normalized name entry")
285+
}
286+
287+
func TestSymbolizerLabelNormalizationAfterDeserialization(t *testing.T) {
288+
s := newSymbolizer()
289+
290+
// Add some labels and serialize them
291+
originalLabels := labels.Labels{
292+
{Name: "foo-bar", Value: "value1"},
293+
{Name: "fizz-buzz", Value: "value2"},
294+
}
295+
s.Add(originalLabels)
296+
297+
buf := bytes.NewBuffer(nil)
298+
_, _, err := s.SerializeTo(buf, compression.GetWriterPool(compression.Snappy))
299+
require.NoError(t, err)
300+
301+
// Load the serialized data
302+
loaded, err := symbolizerFromEnc(buf.Bytes(), compression.GetReaderPool(compression.Snappy))
303+
require.NoError(t, err)
304+
305+
// Add new labels with the same names but different values
306+
newLabels := labels.Labels{
307+
{Name: "foo-bar", Value: "new-value1"},
308+
{Name: "fizz-buzz", Value: "new-value2"},
309+
}
310+
symbols := loaded.Add(newLabels)
311+
312+
// Check that the normalization is consistent
313+
result := loaded.Lookup(symbols, nil)
314+
require.Equal(t, "foo_bar", result[0].Name, "first label should be normalized")
315+
require.Equal(t, "new-value1", result[0].Value, "first value should be unchanged")
316+
require.Equal(t, "fizz_buzz", result[1].Name, "second label should be normalized")
317+
require.Equal(t, "new-value2", result[1].Value, "second value should be unchanged")
318+
}
319+
320+
func TestSymbolizerLabelNormalizationSameNameValue(t *testing.T) {
321+
s := newSymbolizer()
322+
323+
// Add labels where the name and value are the same string
324+
originalLabels := labels.Labels{
325+
{Name: "foo-bar", Value: "foo-bar"},
326+
{Name: "test-label", Value: "test-label"},
327+
}
328+
originalSymbols := s.Add(originalLabels)
329+
330+
// Verify initial state
331+
result := s.Lookup(originalSymbols, nil)
332+
require.Equal(t, "foo_bar", result[0].Name, "name should be normalized")
333+
require.Equal(t, "foo-bar", result[0].Value, "value should remain unchanged")
334+
require.Equal(t, "test_label", result[1].Name, "name should be normalized")
335+
require.Equal(t, "test-label", result[1].Value, "value should remain unchanged")
336+
337+
// Serialize the symbolizer
338+
buf := bytes.NewBuffer(nil)
339+
_, _, err := s.SerializeTo(buf, compression.GetWriterPool(compression.Snappy))
340+
require.NoError(t, err)
341+
342+
// Load the serialized data
343+
loaded, err := symbolizerFromEnc(buf.Bytes(), compression.GetReaderPool(compression.Snappy))
344+
require.NoError(t, err)
345+
346+
// Look up using the original symbols without re-adding the labels
347+
result = loaded.Lookup(originalSymbols, nil)
348+
require.Equal(t, "foo_bar", result[0].Name, "name should be normalized after deserialization")
349+
require.Equal(t, "foo-bar", result[0].Value, "value should remain unchanged after deserialization")
350+
require.Equal(t, "test_label", result[1].Name, "name should be normalized after deserialization")
351+
require.Equal(t, "test-label", result[1].Value, "value should remain unchanged after deserialization")
352+
353+
// Also test with checkpoint serialization
354+
buf.Reset()
355+
_, _, err = s.CheckpointTo(buf)
356+
require.NoError(t, err)
357+
358+
loadedFromCheckpoint := symbolizerFromCheckpoint(buf.Bytes())
359+
result = loadedFromCheckpoint.Lookup(originalSymbols, nil)
360+
require.Equal(t, "foo_bar", result[0].Name, "name should be normalized after checkpoint")
361+
require.Equal(t, "foo-bar", result[0].Value, "value should remain unchanged after checkpoint")
362+
require.Equal(t, "test_label", result[1].Name, "name should be normalized after checkpoint")
363+
require.Equal(t, "test-label", result[1].Value, "value should remain unchanged after checkpoint")
364+
}

pkg/logql/log/pipeline.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package log
33
import (
44
"context"
55

6-
"github.com/prometheus/otlptranslator"
76
"github.com/prometheus/prometheus/model/labels"
87

98
"sync"
@@ -102,9 +101,6 @@ func (n noopStreamPipeline) ReferencedStructuredMetadata() bool {
102101

103102
func (n noopStreamPipeline) Process(_ int64, line []byte, structuredMetadata labels.Labels) ([]byte, LabelsResult, bool) {
104103
n.builder.Reset()
105-
for i, lb := range structuredMetadata {
106-
structuredMetadata[i].Name = otlptranslator.NormalizeLabel(lb.Name)
107-
}
108104
n.builder.Add(StructuredMetadataLabel, structuredMetadata)
109105
return line, n.builder.LabelsResult(), true
110106
}
@@ -225,10 +221,6 @@ func (p *streamPipeline) Process(ts int64, line []byte, structuredMetadata label
225221
var ok bool
226222
p.builder.Reset()
227223

228-
for i, lb := range structuredMetadata {
229-
structuredMetadata[i].Name = otlptranslator.NormalizeLabel(lb.Name)
230-
}
231-
232224
p.builder.Add(StructuredMetadataLabel, structuredMetadata)
233225

234226
for _, s := range p.stages {

pkg/logql/log/pipeline_test.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,6 @@ func TestNoopPipeline(t *testing.T) {
5454
require.Equal(t, expectedLabelsResults.String(), lbr.String())
5555
require.Equal(t, true, matches)
5656

57-
// test structured metadata with disallowed label names
58-
structuredMetadata = append(labels.FromStrings("y", "1", "z", "2"), labels.Label{Name: "zsomething-bad", Value: "foo"})
59-
expectedStructuredMetadata := append(labels.FromStrings("y", "1", "z", "2"), labels.Label{Name: "zsomething_bad", Value: "foo"})
60-
expectedLabelsResults = append(lbs, expectedStructuredMetadata...)
61-
62-
l, lbr, matches = pipeline.ForStream(lbs).Process(0, []byte(""), structuredMetadata)
63-
require.Equal(t, []byte(""), l)
64-
require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedStructuredMetadata, labels.EmptyLabels()), lbr)
65-
require.Equal(t, expectedLabelsResults.Hash(), lbr.Hash())
66-
require.Equal(t, expectedLabelsResults.String(), lbr.String())
67-
require.Equal(t, true, matches)
68-
6957
pipeline.Reset()
7058
require.Len(t, pipeline.cache, 0)
7159
}
@@ -177,17 +165,6 @@ func TestPipelineWithStructuredMetadata(t *testing.T) {
177165
require.Equal(t, nil, lbr)
178166
require.Equal(t, false, matches)
179167

180-
// test structured metadata with disallowed label names
181-
withBadLabel := append(structuredMetadata, labels.Label{Name: "zsomething-bad", Value: "foo"})
182-
expectedStructuredMetadata := append(structuredMetadata, labels.Label{Name: "zsomething_bad", Value: "foo"})
183-
expectedLabelsResults = append(lbs, expectedStructuredMetadata...)
184-
185-
_, lbr, matches = p.ForStream(lbs).Process(0, []byte(""), withBadLabel)
186-
require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedStructuredMetadata, labels.EmptyLabels()), lbr)
187-
require.Equal(t, expectedLabelsResults.Hash(), lbr.Hash())
188-
require.Equal(t, expectedLabelsResults.String(), lbr.String())
189-
require.Equal(t, true, matches)
190-
191168
// Reset caches
192169
p.baseBuilder.del = []string{"foo", "bar"}
193170
p.baseBuilder.add = [numValidCategories]labels.Labels{

0 commit comments

Comments
 (0)