Skip to content

Commit 865bc62

Browse files
chore: clean up code
1 parent a6c2ffa commit 865bc62

File tree

2 files changed

+30
-17
lines changed

2 files changed

+30
-17
lines changed

pkg/limits/partition_manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ func (m *partitionManager) Assign(partitions []int32) {
8484
}
8585
}
8686

87+
// Count returns the number of assigned partitions.
88+
func (m *partitionManager) Count() int {
89+
m.mtx.Lock()
90+
defer m.mtx.Unlock()
91+
return len(m.partitions)
92+
}
93+
8794
// GetState returns the current state of the partition. It returns false
8895
// if the partition does not exist.
8996
func (m *partitionManager) GetState(partition int32) (partitionState, bool) {

pkg/limits/service.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78
"time"
89

910
"github.com/coder/quartz"
@@ -27,8 +28,9 @@ const (
2728
RingKey = "ingest-limits"
2829
RingName = "ingest-limits"
2930

30-
// Readiness check
31-
maxPartitionReadinessAttempts int32 = 10
31+
// The maximum number of checks to fail while waiting to be assigned
32+
// some partitions before giving up and going ready.
33+
maxPartitionReadinessWaitAssignChecks = 10
3234
)
3335

3436
// Service is a service that manages stream metadata limits.
@@ -52,9 +54,9 @@ type Service struct {
5254
streamEvictionsTotal *prometheus.CounterVec
5355

5456
// Readiness check
55-
partitionReadinessAttempts int
56-
partitionReadinessPassed bool
57-
partitionReadinessMtx sync.Mutex
57+
partitionReadinessWaitAssignChecks int
58+
partitionReadinessPassed bool
59+
partitionReadinessMtx sync.Mutex
5860

5961
// Used for tests.
6062
clock quartz.Clock
@@ -192,27 +194,31 @@ func (s *Service) CheckReady(ctx context.Context) error {
192194
if err := s.lifecycler.CheckReady(ctx); err != nil {
193195
return fmt.Errorf("lifecycler not ready: %w", err)
194196
}
195-
// Check if the partitions assignment and replay
196-
// are complete on the service startup only.
197197
s.partitionReadinessMtx.Lock()
198198
defer s.partitionReadinessMtx.Unlock()
199+
// We are ready when all of our assigned partitions have replayed the
200+
// last active window of data. This is referred to as partition readiness.
201+
// Once we have passed partition readiness we never check it again as
202+
// otherwise the service could become unready during a partition rebalance.
199203
if !s.partitionReadinessPassed {
200-
if len(s.partitionManager.List()) == 0 {
201-
if s.partitionReadinessAttempts >= maxPartitionReadinessAttempts {
202-
// If no partition assignment on startup,
203-
// declare the service initialized.
204+
if s.partitionManager.Count() == 0 {
205+
// Since partition readiness, once passed, is never checked again,
206+
// we can assume that the service has recently started and is
207+
// trying to become ready for the first time. If we do not have
208+
// any assigned partitions we should wait some time in case we
209+
// eventually get assigned some partitions, and if not, we give
210+
// up and become ready to guarantee liveness.
211+
s.partitionReadinessWaitAssignChecks++
212+
if s.partitionReadinessWaitAssignChecks > maxPartitionReadinessWaitAssignChecks {
213+
level.Warn(s.logger).Log("msg", "no partitions assigned, going ready")
204214
s.partitionReadinessPassed = true
205-
level.Warn(s.logger).Log("msg", "no partitions assigned after max retries, going ready")
206215
return nil
207216
}
208-
s.partitionReadinessAttempts++
209-
return fmt.Errorf("no partitions assigned, retrying")
217+
return fmt.Errorf("waiting initial period to be assigned some partitions")
210218
}
211219
if !s.partitionManager.CheckReady() {
212-
return fmt.Errorf("partitions not ready")
220+
return fmt.Errorf("partitions are not ready")
213221
}
214-
// If the partitions are assigned, and the replay is complete,
215-
// declare the service initialized.
216222
s.partitionReadinessPassed = true
217223
}
218224
return nil

0 commit comments

Comments
 (0)