Skip to content

Commit 2161c8c

Browse files
fix(otlp): calculate entry metadata size before adding resource/scope attributes (#17629)
Signed-off-by: Jordan Rushing <[email protected]>
1 parent 878abdc commit 2161c8c

File tree

2 files changed

+173
-4
lines changed

2 files changed

+173
-4
lines changed

pkg/loghttp/push/otlp.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
330330
entryLbs = lbs
331331
}
332332

333+
// Calculate the entry's own metadata size BEFORE adding resource and scope attributes
334+
// This preserves the intent of tracking entry-specific metadata separately without requiring subtraction
335+
entryOwnMetadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata))
336+
333337
// if entry.StructuredMetadata doesn't have capacity to add resource and scope attributes, make a new slice with enough capacity
334338
attributesAsStructuredMetadataLen := len(resourceAttributesAsStructuredMetadata) + len(scopeAttributesAsStructuredMetadata)
335339
if cap(entry.StructuredMetadata) < len(entry.StructuredMetadata)+attributesAsStructuredMetadataLen {
@@ -347,19 +351,19 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
347351
entryRetentionPeriod := streamResolver.RetentionPeriodFor(entryLbs)
348352
entryPolicy := streamResolver.PolicyFor(entryLbs)
349353

350-
metadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
351-
352354
if _, ok := stats.StructuredMetadataBytes[entryPolicy]; !ok {
353355
stats.StructuredMetadataBytes[entryPolicy] = make(map[time.Duration]int64)
354356
}
355-
stats.StructuredMetadataBytes[entryPolicy][entryRetentionPeriod] += metadataSize
357+
// Use the entry's own metadata size (calculated before adding resource/scope attributes)
358+
// This keeps the same accounting intention without risk of negative values
359+
stats.StructuredMetadataBytes[entryPolicy][entryRetentionPeriod] += entryOwnMetadataSize
356360

357361
if _, ok := stats.LogLinesBytes[entryPolicy]; !ok {
358362
stats.LogLinesBytes[entryPolicy] = make(map[time.Duration]int64)
359363
}
360364
stats.LogLinesBytes[entryPolicy][entryRetentionPeriod] += int64(len(entry.Line))
361365

362-
totalBytesReceived += metadataSize
366+
totalBytesReceived += entryOwnMetadataSize
363367
totalBytesReceived += int64(len(entry.Line))
364368

365369
stats.PolicyNumLines[entryPolicy]++

pkg/loghttp/push/otlp_test.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,171 @@ func TestOTLPLogAttributesAsIndexLabels(t *testing.T) {
984984
require.Equal(t, int64(3), stats.PolicyNumLines["test-policy"], "Should have counted 3 log lines")
985985
}
986986

987+
func TestOTLPStructuredMetadataCalculation(t *testing.T) {
988+
now := time.Unix(0, time.Now().UnixNano())
989+
990+
generateLogs := func() plog.Logs {
991+
ld := plog.NewLogs()
992+
993+
// Create resource with attributes
994+
rl := ld.ResourceLogs().AppendEmpty()
995+
rl.Resource().Attributes().PutStr("service.name", "test-service")
996+
rl.Resource().Attributes().PutStr("resource.key", "resource.value")
997+
998+
// Create scope with attributes
999+
sl := rl.ScopeLogs().AppendEmpty()
1000+
sl.Scope().SetName("test-scope")
1001+
sl.Scope().Attributes().PutStr("scope.key", "scope.value")
1002+
1003+
// Add a log record with minimal metadata
1004+
logRecord := sl.LogRecords().AppendEmpty()
1005+
logRecord.Body().SetStr("Test entry with minimal metadata")
1006+
logRecord.SetTimestamp(pcommon.Timestamp(now.UnixNano()))
1007+
logRecord.Attributes().PutStr("entry.key", "entry.value")
1008+
1009+
return ld
1010+
}
1011+
1012+
// Run the test
1013+
stats := NewPushStats()
1014+
tracker := NewMockTracker()
1015+
streamResolver := newMockStreamResolver("fake", &fakeLimits{})
1016+
1017+
streamResolver.policyForOverride = func(_ labels.Labels) string {
1018+
return "test-policy"
1019+
}
1020+
1021+
// Convert OTLP logs to Loki push request
1022+
pushReq := otlpToLokiPushRequest(
1023+
context.Background(),
1024+
generateLogs(),
1025+
"test-user",
1026+
DefaultOTLPConfig(defaultGlobalOTLPConfig),
1027+
nil, // tenantConfigs
1028+
[]string{}, // discoverServiceName
1029+
tracker,
1030+
stats,
1031+
log.NewNopLogger(),
1032+
streamResolver,
1033+
)
1034+
1035+
// Verify there is exactly one stream
1036+
require.Equal(t, 1, len(pushReq.Streams))
1037+
1038+
// Verify we have a single entry with all the expected metadata
1039+
stream := pushReq.Streams[0]
1040+
require.Equal(t, 1, len(stream.Entries))
1041+
1042+
// Verify the structured metadata bytes are positive
1043+
require.Greater(t, stats.StructuredMetadataBytes["test-policy"][time.Hour], int64(0),
1044+
"Structured metadata bytes should be positive")
1045+
1046+
// Verify we can find the resource, scope, and entry metadata in the entry
1047+
entry := stream.Entries[0]
1048+
1049+
resourceMetadataFound := false
1050+
scopeMetadataFound := false
1051+
entryMetadataFound := false
1052+
1053+
for _, metadata := range entry.StructuredMetadata {
1054+
if metadata.Name == "resource_key" && metadata.Value == "resource.value" {
1055+
resourceMetadataFound = true
1056+
}
1057+
if metadata.Name == "scope_key" && metadata.Value == "scope.value" {
1058+
scopeMetadataFound = true
1059+
}
1060+
if metadata.Name == "entry_key" && metadata.Value == "entry.value" {
1061+
entryMetadataFound = true
1062+
}
1063+
}
1064+
1065+
require.True(t, resourceMetadataFound, "Resource metadata should be present in the entry")
1066+
require.True(t, scopeMetadataFound, "Scope metadata should be present in the entry")
1067+
require.True(t, entryMetadataFound, "Entry metadata should be present in the entry")
1068+
}
1069+
1070+
func TestNegativeMetadataScenarioExplicit(t *testing.T) {
1071+
// This test explicitly demonstrates how negative structured metadata size values
1072+
// could occur when subtracting resource/scope attributes from total structured metadata size
1073+
1074+
// Setup: Create metadata with a label that would be excluded from size calculation
1075+
resourceMeta := push.LabelsAdapter{
1076+
{Name: "resource_key", Value: "resource_value"}, // 27 bytes
1077+
{Name: "excluded_label", Value: "value"}, // This would be excluded from size calculation
1078+
}
1079+
1080+
scopeMeta := push.LabelsAdapter{
1081+
{Name: "scope_key", Value: "scope_value"}, // 20 bytes
1082+
}
1083+
1084+
entryMeta := push.LabelsAdapter{
1085+
{Name: "entry_key", Value: "entry_value"}, // 20 bytes
1086+
}
1087+
1088+
// ExcludedStructuredMetadataLabels would exclude certain labels
1089+
// from size calculations.
1090+
calculateSize := func(labels push.LabelsAdapter) int {
1091+
size := 0
1092+
for _, label := range labels {
1093+
// Simulate a label being excluded from size calc
1094+
if label.Name != "excluded_label" {
1095+
size += len(label.Name) + len(label.Value)
1096+
}
1097+
}
1098+
return size
1099+
}
1100+
1101+
// Calculate sizes with simulated exclusions
1102+
resourceSize := calculateSize(resourceMeta) // 27 bytes (excluded_label not counted)
1103+
scopeSize := calculateSize(scopeMeta) // 20 bytes
1104+
entrySize := calculateSize(entryMeta) // 20 bytes
1105+
1106+
// The original approach:
1107+
// 1. Add resource and scope attributes to entry metadata
1108+
combined := make(push.LabelsAdapter, 0)
1109+
combined = append(combined, entryMeta...)
1110+
combined = append(combined, resourceMeta...)
1111+
combined = append(combined, scopeMeta...)
1112+
1113+
// 2. Calculate combined size (with certain labels excluded)
1114+
combinedSize := calculateSize(combined) // Should be 27 + 20 + 20 = 67 bytes
1115+
1116+
// 3. Calculate entry-specific metadata by subtraction
1117+
// metadataSize := int64(combinedSize - resourceSize - scopeSize)
1118+
oldCalculation := combinedSize - resourceSize - scopeSize
1119+
1120+
// Should be: 67 - 27 - 20 = 20 bytes, which equals entrySize
1121+
1122+
t.Logf("Resource size: %d bytes", resourceSize)
1123+
t.Logf("Scope size: %d bytes", scopeSize)
1124+
t.Logf("Entry size: %d bytes", entrySize)
1125+
t.Logf("Combined size: %d bytes", combinedSize)
1126+
t.Logf("Old calculation (combined - resource - scope): %d bytes", oldCalculation)
1127+
1128+
// Now, to demonstrate how this could produce negative values:
1129+
// In reality, due to potential inconsistencies in how labels were excluded/combined/normalized,
1130+
// the combined size could be LESS than the sum of parts
1131+
simulatedRealCombinedSize := resourceSize + scopeSize - 5 // 5 bytes less than sum
1132+
1133+
// Using the original calculation method:
1134+
simulatedRealCalculation := simulatedRealCombinedSize - resourceSize - scopeSize
1135+
// This will be: (27 + 20 - 5) - 27 - 20 = 42 - 47 = -5 bytes
1136+
1137+
t.Logf("Simulated real combined size: %d bytes", simulatedRealCombinedSize)
1138+
t.Logf("Simulated real calculation (old method): %d bytes", simulatedRealCalculation)
1139+
1140+
// This would be a negative value!
1141+
require.Less(t, simulatedRealCalculation, 0,
1142+
"This demonstrates how the old calculation could produce negative values")
1143+
1144+
// Directly use entry's size before combining
1145+
t.Logf("New calculation (direct entry size): %d bytes", entrySize)
1146+
require.Equal(t, entrySize, 20,
1147+
"New calculation provides correct entry size")
1148+
require.Greater(t, entrySize, 0,
1149+
"New calculation always produces non-negative values")
1150+
}
1151+
9871152
func TestOTLPSeverityTextAsLabel(t *testing.T) {
9881153
now := time.Unix(0, time.Now().UnixNano())
9891154

0 commit comments

Comments
 (0)