
Commit 1d68047

feat: support disabling the partition consumers cache (#17318)
1 parent 0df6f2d commit 1d68047

21 files changed: +298, -1611 lines

docs/sources/shared/configuration.md

Lines changed: 3 additions & 3 deletions
@@ -1218,9 +1218,9 @@ ingest_limits_frontend:
   # CLI flag: -ingest-limits-frontend.num-partitions
   [num_partitions: <int> | default = 64]
 
-  # The TTL for the stream usage cache.
-  # CLI flag: -ingest-limits-frontend.partition-id-cache-ttl
-  [partition_id_cache_ttl: <duration> | default = 1m]
+  # The TTL for the assigned partitions cache. 0 disables the cache.
+  # CLI flag: -ingest-limits-frontend.assigned-partitions-cache-ttl
+  [assigned_partitions_cache_ttl: <duration> | default = 1m]
 
 ingest_limits_frontend_client:
   # Configures client gRPC connections to limits service.
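For illustration, a minimal sketch of how this option might be set in a Loki configuration file; the surrounding ingest_limits_frontend block is taken from the documentation above, and the zero value shown is just one possible choice:

    ingest_limits_frontend:
      # A zero duration disables the assigned partitions cache entirely.
      assigned_partitions_cache_ttl: 0
      # Any non-zero duration keeps the cache with that TTL, e.g.:
      # assigned_partitions_cache_ttl: 5m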

go.mod

Lines changed: 0 additions & 1 deletion
@@ -133,7 +133,6 @@ require (
 	github.com/grafana/loki/pkg/push v0.0.0-20240924133635-758364c7775f
 	github.com/heroku/x v0.4.3
 	github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
-	github.com/jellydator/ttlcache/v3 v3.3.0
 	github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
 	github.com/ncw/swift/v2 v2.0.3
 	github.com/parquet-go/parquet-go v0.25.0

go.sum

Lines changed: 0 additions & 2 deletions
@@ -786,8 +786,6 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
 github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
 github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
-github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
-github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
 github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=

pkg/limits/frontend/cache.go

Lines changed: 105 additions & 8 deletions
@@ -1,18 +1,115 @@
 package frontend
 
 import (
+	"sync"
 	"time"
 
-	"github.com/jellydator/ttlcache/v3"
-
-	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/coder/quartz"
 )
 
-type PartitionConsumersCache = ttlcache.Cache[string, *logproto.GetAssignedPartitionsResponse]
+type Cache[K comparable, V any] interface {
+	// Get returns the value for the key. It returns true if the key exists,
+	// otherwise false.
+	Get(K) (V, bool)
+	// Set stores the value for the key.
+	Set(K, V)
+	// Delete removes the key. If the key does not exist, the operation is a
+	// no-op.
+	Delete(K)
+	// Reset removes all keys.
+	Reset()
+}
+
+// item contains the value and expiration time for a key.
+type item[V any] struct {
+	value     V
+	expiresAt time.Time
+}
+
+func (i *item[V]) hasExpired(now time.Time) bool {
+	return i.expiresAt.Before(now) || i.expiresAt.Equal(now)
+}
+
+// TTLCache is a simple, thread-safe cache with a single per-cache TTL.
+type TTLCache[K comparable, V any] struct {
+	items map[K]item[V]
+	ttl   time.Duration
+	mu    sync.RWMutex
+
+	// Used for tests.
+	clock quartz.Clock
+}
+
+func NewTTLCache[K comparable, V any](ttl time.Duration) *TTLCache[K, V] {
+	return &TTLCache[K, V]{
+		items: make(map[K]item[V]),
+		ttl:   ttl,
+		clock: quartz.NewReal(),
+	}
+}
 
-func NewPartitionConsumerCache(ttl time.Duration) *PartitionConsumersCache {
-	return ttlcache.New(
-		ttlcache.WithTTL[string, *logproto.GetAssignedPartitionsResponse](ttl),
-		ttlcache.WithDisableTouchOnHit[string, *logproto.GetAssignedPartitionsResponse](),
+// Get implements Cache.Get.
+func (c *TTLCache[K, V]) Get(key K) (V, bool) {
+	var (
+		value  V
+		exists bool
+		now    = c.clock.Now()
 	)
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if item, ok := c.items[key]; ok && !item.hasExpired(now) {
+		value = item.value
+		exists = true
+	}
+	return value, exists
+}
+
+// Set implements Cache.Set.
+func (c *TTLCache[K, V]) Set(key K, value V) {
+	now := c.clock.Now()
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.items[key] = item[V]{
+		value:     value,
+		expiresAt: now.Add(c.ttl),
+	}
+	c.removeExpiredItems(now)
+}
+
+// Delete implements Cache.Delete.
+func (c *TTLCache[K, V]) Delete(key K) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.items, key)
+}
+
+// Reset implements Cache.Reset.
+func (c *TTLCache[K, V]) Reset() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.items = make(map[K]item[V])
+}
+
+// removeExpiredItems removes expired items.
+func (c *TTLCache[K, V]) removeExpiredItems(now time.Time) {
+	for key, item := range c.items {
+		if item.hasExpired(now) {
+			delete(c.items, key)
+		}
+	}
+}
+
+// NopCache is a no-op cache. It does not store any keys. It is used in tests
+// and as a stub for disabled caches.
+type NopCache[K comparable, V any] struct{}
+
+func NewNopCache[K comparable, V any]() *NopCache[K, V] {
+	return &NopCache[K, V]{}
+}
+func (c *NopCache[K, V]) Get(_ K) (V, bool) {
+	var value V
+	return value, false
 }
+func (c *NopCache[K, V]) Set(_ K, _ V) {}
+func (c *NopCache[K, V]) Delete(_ K)   {}
+func (c *NopCache[K, V]) Reset()       {}
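As an aside, here is a minimal usage sketch of the new cache API (illustrative only, not part of the commit). It is written as if it lived inside the frontend package; selectCache and the string/int type parameters are hypothetical:

// selectCache is a hypothetical helper mirroring how New() in frontend.go
// below chooses between the two implementations behind the Cache interface.
func selectCache[K comparable, V any](ttl time.Duration) Cache[K, V] {
	if ttl == 0 {
		// A zero TTL disables caching: Set is a no-op and Get always misses.
		return NewNopCache[K, V]()
	}
	// Otherwise entries expire ttl after the most recent Set.
	return NewTTLCache[K, V](ttl)
}

// Hypothetical use:
//   c := selectCache[string, int](time.Minute)
//   c.Set("ingester-0", 1)
//   v, ok := c.Get("ingester-0") // hit until the TTL elapses, then a miss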

pkg/limits/frontend/cache_test.go

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
+package frontend
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coder/quartz"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTTLCache_Get(t *testing.T) {
+	c := NewTTLCache[string, string](time.Minute)
+	clock := quartz.NewMock(t)
+	c.clock = clock
+	// The value should be absent.
+	value, ok := c.Get("foo")
+	require.Equal(t, "", value)
+	require.False(t, ok)
+	// Set the value and it should be present.
+	c.Set("foo", "bar")
+	value, ok = c.Get("foo")
+	require.Equal(t, "bar", value)
+	require.True(t, ok)
+	// Advance the time to be 1 second before the expiration time.
+	clock.Advance(59 * time.Second)
+	value, ok = c.Get("foo")
+	require.Equal(t, "bar", value)
+	require.True(t, ok)
+	// Advance the time to be equal to the expiration time, the value should
+	// be absent.
+	clock.Advance(time.Second)
+	value, ok = c.Get("foo")
+	require.Equal(t, "", value)
+	require.False(t, ok)
+	// Advance the time past the expiration time, the value should still be
+	// absent.
+	clock.Advance(time.Second)
+	value, ok = c.Get("foo")
+	require.Equal(t, "", value)
+	require.False(t, ok)
+}
+
+func TestTTLCache_Set(t *testing.T) {
+	c := NewTTLCache[string, string](time.Minute)
+	clock := quartz.NewMock(t)
+	c.clock = clock
+	c.Set("foo", "bar")
+	item1, ok := c.items["foo"]
+	require.True(t, ok)
+	require.Equal(t, c.clock.Now().Add(time.Minute), item1.expiresAt)
+	// Set should refresh the expiration time.
+	clock.Advance(time.Second)
+	c.Set("foo", "bar")
+	item2, ok := c.items["foo"]
+	require.True(t, ok)
+	require.Greater(t, item2.expiresAt, item1.expiresAt)
+	require.Equal(t, item2.expiresAt, item1.expiresAt.Add(time.Second))
+	// Set should replace the value.
+	c.Set("foo", "baz")
+	value, ok := c.Get("foo")
+	require.True(t, ok)
+	require.Equal(t, "baz", value)
+}
+
+func TestTTLCache_Delete(t *testing.T) {
+	c := NewTTLCache[string, string](time.Minute)
+	clock := quartz.NewMock(t)
+	c.clock = clock
+	// Set the value and it should be present.
+	c.Set("foo", "bar")
+	value, ok := c.Get("foo")
+	require.True(t, ok)
+	require.Equal(t, "bar", value)
+	// Delete the value, it should be absent.
+	c.Delete("foo")
+	value, ok = c.Get("foo")
+	require.False(t, ok)
+	require.Equal(t, "", value)
+}
+
+func TestTTLCache_Reset(t *testing.T) {
+	c := NewTTLCache[string, string](time.Minute)
+	clock := quartz.NewMock(t)
+	c.clock = clock
+	// Set two values, both should be present.
+	c.Set("foo", "bar")
+	value, ok := c.Get("foo")
+	require.True(t, ok)
+	require.Equal(t, "bar", value)
+	c.Set("bar", "baz")
+	value, ok = c.Get("bar")
+	require.True(t, ok)
+	require.Equal(t, "baz", value)
+	// Reset the cache, all should be absent.
+	c.Reset()
+	value, ok = c.Get("foo")
+	require.False(t, ok)
+	require.Equal(t, "", value)
+	value, ok = c.Get("bar")
+	require.False(t, ok)
+	require.Equal(t, "", value)
+	// Should be able to set values following a reset.
+	c.Set("baz", "qux")
+	value, ok = c.Get("baz")
+	require.True(t, ok)
+	require.Equal(t, "qux", value)
+}
+
+func TestTTLCache_RemoveExpiredItems(t *testing.T) {
+	c := NewTTLCache[string, string](time.Minute)
+	clock := quartz.NewMock(t)
+	c.clock = clock
+	c.Set("foo", "bar")
+	_, ok := c.items["foo"]
+	require.True(t, ok)
+	// Advance the clock and update foo, it should not be removed.
+	clock.Advance(time.Minute)
+	c.Set("foo", "bar")
+	_, ok = c.items["foo"]
+	require.True(t, ok)
+	// Advance the clock again but this time set bar, foo should be removed.
+	clock.Advance(time.Minute)
+	c.Set("bar", "baz")
+	_, ok = c.items["foo"]
+	require.False(t, ok)
+	_, ok = c.items["bar"]
+	require.True(t, ok)
+}
+
+func TestNopCache(t *testing.T) {
+	c := NewNopCache[string, string]()
+	// The value should be absent.
+	value, ok := c.Get("foo")
+	require.Equal(t, "", value)
+	require.False(t, ok)
+	// Despite setting the value, it should still be absent.
+	c.Set("foo", "bar")
+	value, ok = c.Get("foo")
+	require.Equal(t, "", value)
+	require.False(t, ok)
+}

pkg/limits/frontend/config.go

Lines changed: 6 additions & 6 deletions
@@ -13,19 +13,19 @@ import (
 
 // Config contains the config for an ingest-limits-frontend.
 type Config struct {
-	ClientConfig        limits_client.Config  `yaml:"client_config"`
-	LifecyclerConfig    ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
-	RecheckPeriod       time.Duration         `yaml:"recheck_period"`
-	NumPartitions       int                   `yaml:"num_partitions"`
-	PartitionIDCacheTTL time.Duration         `yaml:"partition_id_cache_ttl"`
+	ClientConfig               limits_client.Config  `yaml:"client_config"`
+	LifecyclerConfig           ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
+	RecheckPeriod              time.Duration         `yaml:"recheck_period"`
+	NumPartitions              int                   `yaml:"num_partitions"`
+	AssignedPartitionsCacheTTL time.Duration         `yaml:"assigned_partitions_cache_ttl"`
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.ClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend", f)
 	cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingest-limits-frontend.", f, util_log.Logger)
 	f.DurationVar(&cfg.RecheckPeriod, "ingest-limits-frontend.recheck-period", 10*time.Second, "The period to recheck per tenant ingestion rate limit configuration.")
 	f.IntVar(&cfg.NumPartitions, "ingest-limits-frontend.num-partitions", 64, "The number of partitions to use for the ring.")
-	f.DurationVar(&cfg.PartitionIDCacheTTL, "ingest-limits-frontend.partition-id-cache-ttl", 1*time.Minute, "The TTL for the stream usage cache.")
+	f.DurationVar(&cfg.AssignedPartitionsCacheTTL, "ingest-limits-frontend.assigned-partitions-cache-ttl", 1*time.Minute, "The TTL for the assigned partitions cache. 0 disables the cache.")
 }
 
 func (cfg *Config) Validate() error {
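Operationally, the cache could presumably be disabled at startup by setting the new flag to zero. The invocation below is illustrative; only the flag name and its 1m default come from this change:

    loki -config.file=loki.yaml \
      -ingest-limits-frontend.assigned-partitions-cache-ttl=0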

pkg/limits/frontend/frontend.go

Lines changed: 21 additions & 17 deletions
@@ -58,11 +58,11 @@ type Frontend struct {
 	cfg    Config
 	logger log.Logger
 
-	limits           Limits
-	rateLimiter      *limiter.RateLimiter
-	streamUsage      StreamUsageGatherer
-	partitionIDCache *PartitionConsumersCache
-	metrics          *metrics
+	limits                  Limits
+	rateLimiter             *limiter.RateLimiter
+	streamUsage             StreamUsageGatherer
+	assignedPartitionsCache Cache[string, *logproto.GetAssignedPartitionsResponse]
+	metrics                 *metrics
 
 	subservices        *services.Manager
 	subservicesWatcher *services.FailureWatcher
@@ -84,18 +84,25 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, limits Limits, l
 		logger,
 	)
 
+	var assignedPartitionsCache Cache[string, *logproto.GetAssignedPartitionsResponse]
+	if cfg.AssignedPartitionsCacheTTL == 0 {
+		// When the TTL is 0, the cache is disabled.
+		assignedPartitionsCache = NewNopCache[string, *logproto.GetAssignedPartitionsResponse]()
+	} else {
+		assignedPartitionsCache = NewTTLCache[string, *logproto.GetAssignedPartitionsResponse](cfg.AssignedPartitionsCacheTTL)
+	}
+
 	rateLimiter := limiter.NewRateLimiter(newRateLimitsAdapter(limits), cfg.RecheckPeriod)
-	partitionIDCache := NewPartitionConsumerCache(cfg.PartitionIDCacheTTL)
-	streamUsage := NewRingStreamUsageGatherer(limitsRing, clientPool, logger, partitionIDCache, cfg.NumPartitions)
+	streamUsage := NewRingStreamUsageGatherer(limitsRing, clientPool, cfg.NumPartitions, assignedPartitionsCache, logger)
 
 	f := &Frontend{
-		cfg:              cfg,
-		logger:           logger,
-		limits:           limits,
-		rateLimiter:      rateLimiter,
-		streamUsage:      streamUsage,
-		partitionIDCache: partitionIDCache,
-		metrics:          newMetrics(reg),
+		cfg:                     cfg,
+		logger:                  logger,
+		limits:                  limits,
+		rateLimiter:             rateLimiter,
+		streamUsage:             streamUsage,
+		assignedPartitionsCache: assignedPartitionsCache,
+		metrics:                 newMetrics(reg),
 	}
 
 	lifecycler, err := ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
@@ -138,8 +145,6 @@ func (f *Frontend) starting(ctx context.Context) (err error) {
 		return fmt.Errorf("failed to start subservices: %w", err)
 	}
 
-	go f.partitionIDCache.Start()
-
 	return nil
 }
 
@@ -155,7 +160,6 @@ func (f *Frontend) running(ctx context.Context) error {
 
 // stopping implements services.Service.
 func (f *Frontend) stopping(_ error) error {
-	f.partitionIDCache.Stop()
 	return services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
 }
