Commit a07bee5

fix(limits): Set ready when all partitions ready (#18092)
Co-authored-by: George Robinson <[email protected]>
1 parent 4fb5688

3 files changed: 109 additions, 15 deletions

pkg/limits/partition_manager.go

Lines changed: 32 additions & 13 deletions
@@ -47,7 +47,7 @@ func (s partitionState) String() string {
 // each partition a timestamp of when it was assigned.
 type partitionManager struct {
     partitions map[int32]partitionEntry
-    mtx        sync.Mutex
+    mtx        sync.RWMutex
 
     // Used for tests.
     clock quartz.Clock
@@ -84,20 +84,39 @@ func (m *partitionManager) Assign(partitions []int32) {
     }
 }
 
+// CheckReady returns true if all partitions are ready.
+func (m *partitionManager) CheckReady() bool {
+    m.mtx.RLock()
+    defer m.mtx.RUnlock()
+    for _, entry := range m.partitions {
+        if entry.state != partitionReady {
+            return false
+        }
+    }
+    return true
+}
+
+// Count returns the number of assigned partitions.
+func (m *partitionManager) Count() int {
+    m.mtx.Lock()
+    defer m.mtx.Unlock()
+    return len(m.partitions)
+}
+
 // GetState returns the current state of the partition. It returns false
 // if the partition does not exist.
 func (m *partitionManager) GetState(partition int32) (partitionState, bool) {
-    m.mtx.Lock()
-    defer m.mtx.Unlock()
+    m.mtx.RLock()
+    defer m.mtx.RUnlock()
     entry, ok := m.partitions[partition]
     return entry.state, ok
 }
 
 // TargetOffsetReached returns true if the partition is replaying and the
 // target offset has been reached.
 func (m *partitionManager) TargetOffsetReached(partition int32, offset int64) bool {
-    m.mtx.Lock()
-    defer m.mtx.Unlock()
+    m.mtx.RLock()
+    defer m.mtx.RUnlock()
     entry, ok := m.partitions[partition]
     if ok {
         return entry.state == partitionReplaying && entry.targetOffset <= offset
@@ -107,17 +126,17 @@ func (m *partitionManager) TargetOffsetReached(partition int32, offset int64) bo
 
 // Has returns true if the partition is assigned, otherwise false.
 func (m *partitionManager) Has(partition int32) bool {
-    m.mtx.Lock()
-    defer m.mtx.Unlock()
+    m.mtx.RLock()
+    defer m.mtx.RUnlock()
     _, ok := m.partitions[partition]
     return ok
 }
 
 // List returns a map of all assigned partitions and the timestamp of when
 // each partition was assigned.
 func (m *partitionManager) List() map[int32]int64 {
-    m.mtx.Lock()
-    defer m.mtx.Unlock()
+    m.mtx.RLock()
+    defer m.mtx.RUnlock()
     result := make(map[int32]int64)
     for partition, entry := range m.partitions {
         result[partition] = entry.assignedAt
@@ -128,8 +147,8 @@ func (m *partitionManager) List() map[int32]int64 {
 // ListByState returns all partitions with the specified state and their last
 // updated timestamps.
 func (m *partitionManager) ListByState(state partitionState) map[int32]int64 {
-    m.mtx.Lock()
-    defer m.mtx.Unlock()
+    m.mtx.RLock()
+    defer m.mtx.RUnlock()
     result := make(map[int32]int64)
     for partition, entry := range m.partitions {
         if entry.state == state {
@@ -184,8 +203,8 @@ func (m *partitionManager) Describe(descs chan<- *prometheus.Desc) {
 
 // Collect implements [prometheus.Collector].
 func (m *partitionManager) Collect(metrics chan<- prometheus.Metric) {
-    m.mtx.Lock()
-    defer m.mtx.Unlock()
+    m.mtx.RLock()
+    defer m.mtx.RUnlock()
     for partition, entry := range m.partitions {
         metrics <- prometheus.MustNewConstMetric(
             partitionsDesc,
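
For context, the switch from sync.Mutex to sync.RWMutex lets the read-only accessors (CheckReady, GetState, Has, List, ListByState, Collect) run concurrently, while writers such as Assign keep taking the exclusive lock. Below is a minimal, self-contained sketch of the same pattern with hypothetical names (tracker, state, SetReady); it is not the actual Loki code, only an illustration of how a readiness probe might poll CheckReady.

// Sketch only: a stripped-down partition tracker guarded by an RWMutex,
// assuming a hypothetical state type with a "ready" value.
package main

import (
    "fmt"
    "sync"
)

type state int

const (
    statePending state = iota
    stateReplaying
    stateReady
)

type tracker struct {
    mtx        sync.RWMutex
    partitions map[int32]state
}

// Assign registers partitions; writers take the exclusive lock.
func (t *tracker) Assign(ids []int32) {
    t.mtx.Lock()
    defer t.mtx.Unlock()
    for _, id := range ids {
        t.partitions[id] = statePending
    }
}

// SetReady marks a single partition as ready.
func (t *tracker) SetReady(id int32) {
    t.mtx.Lock()
    defer t.mtx.Unlock()
    t.partitions[id] = stateReady
}

// CheckReady returns true only when every assigned partition is ready;
// readers share the lock, so concurrent probes do not block each other.
func (t *tracker) CheckReady() bool {
    t.mtx.RLock()
    defer t.mtx.RUnlock()
    for _, s := range t.partitions {
        if s != stateReady {
            return false
        }
    }
    return true
}

func main() {
    t := &tracker{partitions: make(map[int32]state)}
    t.Assign([]int32{1, 2})
    fmt.Println(t.CheckReady()) // false: nothing ready yet
    t.SetReady(1)
    t.SetReady(2)
    fmt.Println(t.CheckReady()) // true: all partitions ready
}

Note that, as in the diff, CheckReady is vacuously true for an empty map, which is why Service.CheckReady handles the Count() == 0 case separately.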

pkg/limits/partition_manager_test.go

Lines changed: 18 additions & 0 deletions
@@ -61,6 +61,24 @@ func TestPartitionManager_Assign(t *testing.T) {
     }, m.partitions)
 }
 
+func TestPartitionManager_CheckReady(t *testing.T) {
+    m, err := newPartitionManager(prometheus.NewRegistry())
+    require.NoError(t, err)
+    c := quartz.NewMock(t)
+    m.clock = c
+    c.Advance(1)
+    m.Assign([]int32{1, 2})
+    require.False(t, m.CheckReady())
+    m.SetReplaying(1, 10)
+    require.False(t, m.CheckReady())
+    m.SetReady(1)
+    require.False(t, m.CheckReady())
+    m.SetReplaying(2, 10)
+    require.False(t, m.CheckReady())
+    m.SetReady(2)
+    require.True(t, m.CheckReady())
+}
+
 func TestPartitionManager_GetState(t *testing.T) {
     m, err := newPartitionManager(prometheus.NewRegistry())
     require.NoError(t, err)

pkg/limits/service.go

Lines changed: 59 additions & 2 deletions
@@ -4,10 +4,12 @@ import (
     "context"
     "errors"
     "fmt"
+    "sync"
     "time"
 
     "github.com/coder/quartz"
     "github.com/go-kit/log"
+    "github.com/go-kit/log/level"
     "github.com/grafana/dskit/ring"
     "github.com/grafana/dskit/services"
     "github.com/prometheus/client_golang/prometheus"
@@ -25,6 +27,11 @@ const (
     // Ring
     RingKey  = "ingest-limits"
     RingName = "ingest-limits"
+
+    // The maximum amount of time to wait to join the consumer group and be
+    // assigned some partitions before giving up and going ready in
+    // [Service.CheckReady].
+    partitionReadinessWaitAssignPeriod = 30 * time.Second
 )
 
 // Service is a service that manages stream metadata limits.
@@ -47,6 +54,11 @@ type Service struct {
     // Metrics.
     streamEvictionsTotal *prometheus.CounterVec
 
+    // Readiness check, see [Service.CheckReady].
+    partitionReadinessPassed          bool
+    partitionReadinessMtx             sync.Mutex
+    partitionReadinessWaitAssignSince time.Time
+
     // Used for tests.
     clock quartz.Clock
 }
@@ -180,10 +192,55 @@ func (s *Service) CheckReady(ctx context.Context) error {
     if s.State() != services.Running {
         return fmt.Errorf("service is not running: %v", s.State())
     }
-    err := s.lifecycler.CheckReady(ctx)
-    if err != nil {
+    if err := s.lifecycler.CheckReady(ctx); err != nil {
         return fmt.Errorf("lifecycler not ready: %w", err)
     }
+    s.partitionReadinessMtx.Lock()
+    defer s.partitionReadinessMtx.Unlock()
+    // We are ready when all of our assigned partitions have replayed the
+    // last active window records. This is referred to as partition readiness.
+    // Once we have passed partition readiness we never check it again, as
+    // otherwise the service could become unready during a partition rebalance.
+    if !s.partitionReadinessPassed {
+        if s.partitionManager.Count() == 0 {
+            // Because partition readiness, once passed, is never checked
+            // again, we can assume that the service has recently started and
+            // is trying to become ready for the first time. If we do not have
+            // any assigned partitions we should wait some time in case we
+            // eventually get assigned some partitions, and if not, we give
+            // up and become ready to guarantee liveness.
+            return s.checkPartitionsAssigned(ctx)
+        }
+        return s.checkPartitionsReady(ctx)
+    }
+    return nil
+}
+
+// checkPartitionsAssigned checks if we either have been assigned some
+// partitions or the wait assign period has elapsed. It must not be called
+// without a lock on partitionReadinessMtx.
+func (s *Service) checkPartitionsAssigned(_ context.Context) error {
+    if s.partitionReadinessWaitAssignSince == (time.Time{}) {
+        s.partitionReadinessWaitAssignSince = s.clock.Now()
+    }
+    if s.clock.Since(s.partitionReadinessWaitAssignSince) < partitionReadinessWaitAssignPeriod {
+        return errors.New("waiting to be assigned some partitions")
+    }
+    level.Warn(s.logger).Log("msg", "no partitions assigned, going ready")
+    s.partitionReadinessPassed = true
+    return nil
+}
+
+// checkPartitionsReady checks if all our assigned partitions are ready.
+// It must not be called without a lock on partitionReadinessMtx.
+func (s *Service) checkPartitionsReady(_ context.Context) error {
+    // If we lose our assigned partitions while replaying them we want to
+    // wait another complete wait assign period.
+    s.partitionReadinessWaitAssignSince = time.Time{}
+    if !s.partitionManager.CheckReady() {
+        return errors.New("partitions are not ready")
+    }
+    s.partitionReadinessPassed = true
     return nil
 }
 
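
The readiness flow added to Service.CheckReady behaves like a small state machine: it is evaluated only until it passes once; with no partitions assigned it waits up to partitionReadinessWaitAssignPeriod before going ready anyway; with partitions assigned it requires every one of them to have finished replaying. The sketch below models those transitions under assumed, hypothetical names (readiness, waitAssignPeriod) and the standard library clock instead of quartz; it is an illustration of the logic, not the committed implementation.

// Sketch of the readiness state machine with hypothetical names; the real
// code uses the quartz clock and the partitionManager shown above.
package main

import (
    "errors"
    "fmt"
    "time"
)

const waitAssignPeriod = 30 * time.Second

type readiness struct {
    passed          bool
    waitAssignSince time.Time
    now             func() time.Time
    count           func() int  // number of assigned partitions
    allReady        func() bool // true when every assigned partition is ready
}

func (r *readiness) check() error {
    if r.passed {
        // Once readiness has passed we never re-check, so a later partition
        // rebalance cannot flip the service back to unready.
        return nil
    }
    if r.count() == 0 {
        // No partitions yet: wait up to waitAssignPeriod, then go ready
        // anyway to guarantee liveness.
        if r.waitAssignSince.IsZero() {
            r.waitAssignSince = r.now()
        }
        if r.now().Sub(r.waitAssignSince) < waitAssignPeriod {
            return errors.New("waiting to be assigned some partitions")
        }
        r.passed = true
        return nil
    }
    // Partitions assigned: reset the wait timer and require all of them to
    // have finished replaying.
    r.waitAssignSince = time.Time{}
    if !r.allReady() {
        return errors.New("partitions are not ready")
    }
    r.passed = true
    return nil
}

func main() {
    ready := false
    r := &readiness{
        now:      time.Now,
        count:    func() int { return 2 },
        allReady: func() bool { return ready },
    }
    fmt.Println(r.check()) // partitions are not ready
    ready = true
    fmt.Println(r.check()) // <nil>: readiness passed
    ready = false
    fmt.Println(r.check()) // <nil>: stays ready after passing once
}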
