Skip to content

Commit fdcc12d

Browse files
feat: query persisted patterns (#17980)
Signed-off-by: Trevor Whitney <[email protected]>
1 parent fd7321c commit fdcc12d

File tree

16 files changed

+989
-121
lines changed

16 files changed

+989
-121
lines changed

docs/sources/shared/configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,10 @@ pattern_ingester:
863863
# CLI flag: -pattern-ingester.max-allowed-line-length
864864
[max_allowed_line_length: <int> | default = 3000]
865865

866+
# How long to retain patterns in the pattern ingester after they are pushed.
867+
# CLI flag: -pattern-ingester.retain-for
868+
[retain_for: <duration> | default = 3h]
869+
866870
# The index_gateway block configures the Loki index gateway server, responsible
867871
# for serving index queries without the need to constantly interact with the
868872
# object store.

pkg/logproto/extensions.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package logproto
22

33
import (
4+
"fmt"
5+
"slices"
46
"sort"
57
"strings"
68
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.
9+
"time"
710

811
"github.com/cespare/xxhash/v2"
912
"github.com/dustin/go-humanize"
1013
jsoniter "github.com/json-iterator/go"
1114
"github.com/prometheus/common/model"
1215
"github.com/prometheus/prometheus/model/labels"
1316

17+
"github.com/grafana/loki/v3/pkg/logql/syntax"
1418
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
19+
"github.com/grafana/loki/v3/pkg/util/constants"
1520
)
1621

1722
// This is the separator defined in the Prometheus Labels.Hash function.
@@ -189,3 +194,83 @@ func (m *ShardsResponse) Merge(other *ShardsResponse) {
189194
m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...)
190195
m.Statistics.Merge(other.Statistics)
191196
}
197+
198+
func (m *QueryPatternsRequest) GetSampleQuery() (string, error) {
199+
expr, err := syntax.ParseExpr(m.Query)
200+
if err != nil {
201+
return "", err
202+
}
203+
204+
// Extract matchers from the expression
205+
var matchers []*labels.Matcher
206+
switch e := expr.(type) {
207+
case *syntax.MatchersExpr:
208+
matchers = e.Mts
209+
case syntax.LogSelectorExpr:
210+
matchers = e.Matchers()
211+
default:
212+
// Cannot extract matchers
213+
return "", nil
214+
}
215+
216+
// Find service_name from matchers
217+
var serviceName string
218+
var serviceMatcher labels.MatchType
219+
for i, m := range matchers {
220+
if m.Name == "service_name" {
221+
matchers = slices.Delete(matchers, i, i+1)
222+
serviceName = m.Value
223+
serviceMatcher = m.Type
224+
break
225+
}
226+
}
227+
228+
if serviceName == "" {
229+
serviceName = ".+"
230+
serviceMatcher = labels.MatchRegexp
231+
}
232+
233+
// Build LogQL query for persisted patterns
234+
logqlQuery := buildPatternLogQLQuery(serviceName, serviceMatcher, matchers, m.Step)
235+
236+
return logqlQuery, nil
237+
}
238+
239+
func buildPatternLogQLQuery(serviceName string, serviceMatcher labels.MatchType, matchers []*labels.Matcher, step int64) string {
240+
// Use step duration for sum_over_time window
241+
stepDuration := max(time.Duration(step)*time.Millisecond, 10*time.Second)
242+
243+
if len(matchers) == 0 {
244+
return buildPatterLogQLQueryString(serviceName, serviceMatcher.String(), "", stepDuration.String())
245+
}
246+
247+
stringBuilder := strings.Builder{}
248+
for i, matcher := range matchers {
249+
stringBuilder.WriteString(matcher.String())
250+
if i < len(matchers)-1 {
251+
stringBuilder.WriteString(" | ")
252+
}
253+
}
254+
255+
return buildPatterLogQLQueryString(serviceName, serviceMatcher.String(), stringBuilder.String(), stepDuration.String())
256+
}
257+
258+
func buildPatterLogQLQueryString(serviceName, serviceMatcher, matchers, step string) string {
259+
decodePatternTransform := `label_format decoded_pattern=` + "`{{urldecode .detected_pattern}}`"
260+
261+
matchAndTransform := ""
262+
if matchers == "" {
263+
matchAndTransform = decodePatternTransform
264+
} else {
265+
matchAndTransform = fmt.Sprintf(`%s | %s`, matchers, decodePatternTransform)
266+
267+
}
268+
return fmt.Sprintf(
269+
`sum by (decoded_pattern, %s) (sum_over_time({__pattern__%s"%s"} | logfmt | %s | unwrap count [%s]))`,
270+
constants.LevelLabel,
271+
serviceMatcher,
272+
serviceName,
273+
matchAndTransform,
274+
step,
275+
)
276+
}

pkg/logproto/extensions_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,127 @@ func TestShard_SpaceFor(t *testing.T) {
4040
})
4141
}
4242
}
43+
44+
func TestQueryPatternsRequest_GetSampleQuery(t *testing.T) {
45+
tests := []struct {
46+
name string
47+
query string
48+
step int64
49+
expected string
50+
expectedError bool
51+
}{
52+
{
53+
name: "simple selector with service_name",
54+
query: `{service_name="test-service"}`,
55+
step: 60000, // 1 minute in milliseconds
56+
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
57+
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
58+
`| unwrap count [1m0s]))`,
59+
},
60+
{
61+
name: "selector with service_name and additional labels",
62+
query: `{service_name="test-service", env="prod", cluster="us-east-1"}`,
63+
step: 300000, // 5 minutes in milliseconds
64+
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
65+
"env=\"prod\" | cluster=\"us-east-1\" | " +
66+
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
67+
`| unwrap count [5m0s]))`,
68+
},
69+
{
70+
name: "selector with service_name and additional labels and match types",
71+
query: `{service_name="test-service", env=~"prod", cluster!="us-east-1", foo!~"bar"}`,
72+
step: 300000, // 5 minutes in milliseconds
73+
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
74+
"env=~\"prod\" | cluster!=\"us-east-1\" | foo!~\"bar\" | " +
75+
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
76+
`| unwrap count [5m0s]))`,
77+
},
78+
{
79+
name: "small step gets minimum 10s window",
80+
query: `{service_name="test-service"}`,
81+
step: 5000, // 5 seconds in milliseconds
82+
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
83+
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
84+
`| unwrap count [10s]))`,
85+
},
86+
{
87+
name: "simple regex selector with service_name",
88+
query: `{service_name=~"test-service"}`,
89+
step: 10000, // 10 seconds in milliseconds
90+
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~"test-service"} | logfmt | ` +
91+
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
92+
`| unwrap count [10s]))`,
93+
},
94+
}
95+
96+
for _, tc := range tests {
97+
t.Run(tc.name, func(t *testing.T) {
98+
req := &QueryPatternsRequest{
99+
Query: tc.query,
100+
Step: tc.step,
101+
}
102+
103+
result, err := req.GetSampleQuery()
104+
105+
if tc.expectedError {
106+
require.Error(t, err)
107+
} else {
108+
require.NoError(t, err)
109+
require.Equal(t, tc.expected, result)
110+
}
111+
})
112+
}
113+
}
114+
115+
func TestQueryPatternsRequest_GetSampleQuery_NoServiceName(t *testing.T) {
116+
tests := []struct {
117+
name string
118+
query string
119+
expected string
120+
expectedError bool
121+
}{
122+
{
123+
name: "no service_name label",
124+
query: `{env="prod", cluster="us-east-1"}`,
125+
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~".+"} | logfmt | ` +
126+
"env=\"prod\" | cluster=\"us-east-1\" | " +
127+
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
128+
`| unwrap count [1m0s]))`,
129+
},
130+
{
131+
name: "no service_name label, mixed match types",
132+
query: `{env!="prod", cluster=~"us-east-1", app!~"foo"}`,
133+
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~".+"} | logfmt | ` +
134+
"env!=\"prod\" | cluster=~\"us-east-1\" | app!~\"foo\" | " +
135+
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
136+
`| unwrap count [1m0s]))`,
137+
},
138+
}
139+
140+
for _, tc := range tests {
141+
t.Run(tc.name, func(t *testing.T) {
142+
req := &QueryPatternsRequest{
143+
Query: tc.query,
144+
Step: 60000,
145+
}
146+
147+
result, err := req.GetSampleQuery()
148+
if tc.expectedError {
149+
require.Error(t, err)
150+
} else {
151+
require.NoError(t, err)
152+
require.Equal(t, tc.expected, result)
153+
}
154+
})
155+
}
156+
}
157+
158+
func TestQueryPatternsRequest_GetSampleQuery_InvalidQuery(t *testing.T) {
159+
req := &QueryPatternsRequest{
160+
Query: `{invalid query syntax`,
161+
Step: 60000,
162+
}
163+
164+
_, err := req.GetSampleQuery()
165+
require.Error(t, err)
166+
}

pkg/loki/modules.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ func (t *Loki) initQuerier() (services.Service, error) {
563563
if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
564564
t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
565565
}
566+
567+
// Use Pattern ingester RetainFor value to determine when to query pattern ingesters
568+
t.Cfg.Querier.QueryPatternIngestersWithin = t.Cfg.Pattern.RetainFor
569+
566570
// Querier worker's max concurrent must be the same as the querier setting
567571
t.Cfg.Worker.MaxConcurrent = t.Cfg.Querier.MaxConcurrent
568572
deleteStore, err := t.deleteRequestsClient("querier", t.Overrides)

pkg/pattern/flush.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,13 @@ package pattern
22

33
import (
44
"fmt"
5-
"time"
65

76
"github.com/go-kit/log/level"
87
"github.com/prometheus/common/model"
98

109
"github.com/grafana/loki/v3/pkg/util"
1110
)
1211

13-
const retainSampleFor = 3 * time.Hour
14-
1512
func (i *Ingester) initFlushQueues() {
1613
// i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
1714
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
@@ -66,7 +63,7 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
6663
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
6764
if mayRemoveStreams {
6865
instance.streams.WithLock(func() {
69-
if s.prune(retainSampleFor) {
66+
if s.prune(i.cfg.RetainFor) {
7067
instance.removeStream(s)
7168
}
7269
})

pkg/pattern/ingester.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Config struct {
4545
TeeConfig TeeConfig `yaml:"tee_config,omitempty" doc:"description=Configures the pattern tee which forwards requests to the pattern ingester."`
4646
ConnectionTimeout time.Duration `yaml:"connection_timeout"`
4747
MaxAllowedLineLength int `yaml:"max_allowed_line_length,omitempty" doc:"description=The maximum length of log lines that can be used for pattern detection."`
48+
RetainFor time.Duration `yaml:"retain_for,omitempty" doc:"description=How long to retain patterns in the pattern ingester after they are pushed."`
4849

4950
// For testing.
5051
factory ring_client.PoolFactory `yaml:"-"`
@@ -100,6 +101,12 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
100101
drain.DefaultConfig().MaxAllowedLineLength,
101102
"The maximum length of log lines that can be used for pattern detection.",
102103
)
104+
fs.DurationVar(
105+
&cfg.RetainFor,
106+
"pattern-ingester.retain-for",
107+
3*time.Hour,
108+
"How long to retain patterns in the pattern ingester after they are pushed.",
109+
)
103110
}
104111

105112
type TeeConfig struct {

0 commit comments

Comments
 (0)