Skip to content

Commit 6495be0

Browse files
authored
feat: adds distributor lag counter to push.go (#18012)
1 parent 14ef400 commit 6495be0

File tree

1 file changed

+20
-1
lines changed

1 file changed

+20
-1
lines changed

pkg/loghttp/push/push.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ var (
6464
Help: "The total number of streams with exporter=OTLP label",
6565
}, []string{"tenant"})
6666

67+
distributorLagByUserAgent = promauto.NewCounterVec(prometheus.CounterOpts{
68+
Namespace: constants.Loki,
69+
Name: "distributor_most_recent_lag_ms",
70+
Help: "The difference in time (in millis) between when a distributor receives a push request and the most recent log timestamp in that request",
71+
}, []string{"tenant", "userAgent"})
72+
6773
bytesReceivedStats = analytics.NewCounter("distributor_bytes_received")
6874
structuredMetadataBytesReceivedStats = analytics.NewCounter("distributor_structured_metadata_bytes_received")
6975
linesReceivedStats = analytics.NewCounter("distributor_lines_received")
@@ -221,6 +227,7 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.
221227
totalNumLines += numLines
222228
}
223229
linesReceivedStats.Inc(totalNumLines)
230+
mostRecentLagMs := time.Since(pushStats.MostRecentEntryTimestamp).Milliseconds()
224231

225232
logValues := []interface{}{
226233
"msg", "push request parsed",
@@ -234,7 +241,7 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.
234241
"entriesSize", humanize.Bytes(uint64(entriesSize)),
235242
"structuredMetadataSize", humanize.Bytes(uint64(structuredMetadataSize)),
236243
"totalSize", humanize.Bytes(uint64(entriesSize + pushStats.StreamLabelsSize)),
237-
"mostRecentLagMs", time.Since(pushStats.MostRecentEntryTimestamp).Milliseconds(),
244+
"mostRecentLagMs", mostRecentLagMs,
238245
}
239246

240247
if presumedAgentIP != "" {
@@ -245,6 +252,18 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.
245252
if userAgent != "" {
246253
logValues = append(logValues, "userAgent", strings.TrimSpace(userAgent))
247254
}
255+
// Since we're using a counter (so we can do things w/rate, irate, deriv, etc.) on the lag metrics,
256+
// dispatch a warning if we ever get a negative value. This could occur if we start getting logs
257+
// whose timestamps are in the future (e.g. agents sending logs w/missing or invalid NTP configs).
258+
// Negative values can't give us much insight into whether-or-not a customer's ingestion is falling
259+
// behind, so we won't include it in the metrics, and instead will capture the occurrence in the
260+
// distributor logs.
261+
// We capture this metric even when the user agent is empty; we want insight into the tenant's
262+
// ingestion lag no matter what.
263+
if mostRecentLagMs >= 0 && mostRecentLagMs < 1_000_000_000 {
264+
// we're filtering out anything over 1B -- the OTLP endpoints often really mess with this metric...
265+
distributorLagByUserAgent.WithLabelValues(userID, userAgent).Add(float64(mostRecentLagMs))
266+
}
248267

249268
if tenantConfigs != nil && tenantConfigs.LogHashOfLabels(userID) {
250269
resultHash := uint64(0)

0 commit comments

Comments
 (0)