Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions pkg/limits/partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s partitionState) String() string {
// each partition a timestamp of when it was assigned.
type partitionManager struct {
partitions map[int32]partitionEntry
mtx sync.Mutex
mtx sync.RWMutex

// Used for tests.
clock quartz.Clock
Expand Down Expand Up @@ -84,20 +84,39 @@ func (m *partitionManager) Assign(partitions []int32) {
}
}

// CheckReady reports whether every assigned partition is in the
// partitionReady state. With no assigned partitions there is nothing to
// disprove readiness, so it reports true.
func (m *partitionManager) CheckReady() bool {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	allReady := true
	for _, e := range m.partitions {
		if e.state != partitionReady {
			// One partition that is not ready is enough to decide.
			allReady = false
			break
		}
	}
	return allReady
}

// Count returns the number of assigned partitions.
//
// It only reads the partitions map, so it takes the read lock and does not
// block other concurrent readers.
func (m *partitionManager) Count() int {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	return len(m.partitions)
}

// GetState returns the current state of the partition. The boolean result is
// false if the partition does not exist, in which case the returned state is
// the zero value of partitionState.
//
// The lookup does not modify the map, so only the read lock is taken. The
// exclusive lock must not be acquired here as well: taking the read lock
// while already holding the write lock on the same RWMutex would block
// forever.
func (m *partitionManager) GetState(partition int32) (partitionState, bool) {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	entry, ok := m.partitions[partition]
	return entry.state, ok
}

// TargetOffsetReached returns true if the partition is replaying and the
// target offset has been reached.
func (m *partitionManager) TargetOffsetReached(partition int32, offset int64) bool {
m.mtx.Lock()
defer m.mtx.Unlock()
m.mtx.RLock()
defer m.mtx.RUnlock()
entry, ok := m.partitions[partition]
if ok {
return entry.state == partitionReplaying && entry.targetOffset <= offset
Expand All @@ -107,17 +126,17 @@ func (m *partitionManager) TargetOffsetReached(partition int32, offset int64) bo

// Has returns true if the partition is assigned, otherwise false.
//
// Membership is a read-only lookup, so only the read lock is taken. The
// exclusive lock must not be acquired here as well: taking the read lock
// while already holding the write lock on the same RWMutex would block
// forever.
func (m *partitionManager) Has(partition int32) bool {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	_, ok := m.partitions[partition]
	return ok
}

// List returns a map of all assigned partitions and the timestamp of when
// each partition was assigned.
func (m *partitionManager) List() map[int32]int64 {
m.mtx.Lock()
defer m.mtx.Unlock()
m.mtx.RLock()
defer m.mtx.RUnlock()
result := make(map[int32]int64)
for partition, entry := range m.partitions {
result[partition] = entry.assignedAt
Expand All @@ -128,8 +147,8 @@ func (m *partitionManager) List() map[int32]int64 {
// ListByState returns all partitions with the specified state and their last
// updated timestamps.
func (m *partitionManager) ListByState(state partitionState) map[int32]int64 {
m.mtx.Lock()
defer m.mtx.Unlock()
m.mtx.RLock()
defer m.mtx.RUnlock()
result := make(map[int32]int64)
for partition, entry := range m.partitions {
if entry.state == state {
Expand Down Expand Up @@ -184,8 +203,8 @@ func (m *partitionManager) Describe(descs chan<- *prometheus.Desc) {

// Collect implements [prometheus.Collector].
func (m *partitionManager) Collect(metrics chan<- prometheus.Metric) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.mtx.RLock()
defer m.mtx.RUnlock()
for partition, entry := range m.partitions {
metrics <- prometheus.MustNewConstMetric(
partitionsDesc,
Expand Down
18 changes: 18 additions & 0 deletions pkg/limits/partition_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ func TestPartitionManager_Assign(t *testing.T) {
}, m.partitions)
}

// TestPartitionManager_CheckReady walks two partitions through their state
// transitions and asserts that CheckReady only reports true after the last
// of them has been marked ready.
func TestPartitionManager_CheckReady(t *testing.T) {
	manager, err := newPartitionManager(prometheus.NewRegistry())
	require.NoError(t, err)
	clock := quartz.NewMock(t)
	manager.clock = clock
	clock.Advance(1)

	// Each step applies one transition, then states the readiness expected
	// immediately afterwards.
	steps := []struct {
		apply     func()
		wantReady bool
	}{
		// Freshly assigned partitions have not been replayed yet.
		{apply: func() { manager.Assign([]int32{1, 2}) }, wantReady: false},
		{apply: func() { manager.SetReplaying(1, 10) }, wantReady: false},
		// Partition 1 is ready, but partition 2 is still outstanding.
		{apply: func() { manager.SetReady(1) }, wantReady: false},
		{apply: func() { manager.SetReplaying(2, 10) }, wantReady: false},
		// Only now are all partitions ready.
		{apply: func() { manager.SetReady(2) }, wantReady: true},
	}
	for _, step := range steps {
		step.apply()
		if step.wantReady {
			require.True(t, manager.CheckReady())
		} else {
			require.False(t, manager.CheckReady())
		}
	}
}

func TestPartitionManager_GetState(t *testing.T) {
m, err := newPartitionManager(prometheus.NewRegistry())
require.NoError(t, err)
Expand Down
61 changes: 59 additions & 2 deletions pkg/limits/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/coder/quartz"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -25,6 +27,11 @@ const (
// Ring
RingKey = "ingest-limits"
RingName = "ingest-limits"

// The maximum amount of time to wait to join the consumer group and be
// assigned some partitions before giving up and going ready in
// [Service.CheckReady].
partitionReadinessWaitAssignPeriod = 30 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this so now the time that we wait is independent of:

  1. The readiness check interval
  2. The number of clients calling /ready

)

// Service is a service that manages stream metadata limits.
Expand All @@ -47,6 +54,11 @@ type Service struct {
// Metrics.
streamEvictionsTotal *prometheus.CounterVec

// Readiness check, see [Service.CheckReady].
partitionReadinessPassed bool
partitionReadinessMtx sync.Mutex
partitionReadinessWaitAssignSince time.Time

// Used for tests.
clock quartz.Clock
}
Expand Down Expand Up @@ -180,10 +192,55 @@ func (s *Service) CheckReady(ctx context.Context) error {
if s.State() != services.Running {
return fmt.Errorf("service is not running: %v", s.State())
}
err := s.lifecycler.CheckReady(ctx)
if err != nil {
if err := s.lifecycler.CheckReady(ctx); err != nil {
return fmt.Errorf("lifecycler not ready: %w", err)
}
s.partitionReadinessMtx.Lock()
defer s.partitionReadinessMtx.Unlock()
// We are ready when all of our assigned partitions have replayed the
// last active window records. This is referred to as partition readiness.
// Once we have passed partition readiness we never check it again as
// otherwise the service could become unready during a partition rebalance.
if !s.partitionReadinessPassed {
if s.partitionManager.Count() == 0 {
// If partition readiness, once passed, is never checked again,
// we can assume that the service has recently started and is
// trying to become ready for the first time. If we do not have
// any assigned partitions we should wait some time in case we
// eventually get assigned some partitions, and if not, we give
// up and become ready to guarantee liveness.
return s.checkPartitionsAssigned(ctx)
}
return s.checkPartitionsReady(ctx)
}
return nil
}

// checkPartitionsAssigned handles the readiness check for the case where no
// partitions are assigned. It must not be called without a lock on
// partitionReadinessMtx.
//
// The first call records the current clock time as the start of the wait.
// Until partitionReadinessWaitAssignPeriod has elapsed since that time it
// returns an error. Once the period has elapsed it logs a warning, marks
// partition readiness as passed and returns nil.
func (s *Service) checkPartitionsAssigned(_ context.Context) error {
	// A zero timestamp means the wait has not started (or has been reset).
	if s.partitionReadinessWaitAssignSince.IsZero() {
		s.partitionReadinessWaitAssignSince = s.clock.Now()
	}
	if s.clock.Since(s.partitionReadinessWaitAssignSince) < partitionReadinessWaitAssignPeriod {
		return errors.New("waiting to be assigned some partitions")
	}
	level.Warn(s.logger).Log("msg", "no partitions assigned, going ready")
	s.partitionReadinessPassed = true
	return nil
}

// checkPartitionsReady reports whether all assigned partitions are ready,
// returning an error while any of them is not. It must not be called without
// a lock on partitionReadinessMtx.
func (s *Service) checkPartitionsReady(_ context.Context) error {
	// Clear the wait-assign timestamp so that, should the assigned
	// partitions be lost while replaying, a complete wait assign period is
	// observed again.
	s.partitionReadinessWaitAssignSince = time.Time{}
	if s.partitionManager.CheckReady() {
		s.partitionReadinessPassed = true
		return nil
	}
	return errors.New("partitions are not ready")
}

Expand Down
Loading