Skip to content

Commit 385e317

Browse files
feat(compactor HS): use streaming RPCs for interacting with job queue over grpc (#18125)
1 parent 0990ecd commit 385e317

File tree

13 files changed

+1338
-1865
lines changed

13 files changed

+1338
-1865
lines changed

pkg/compactor/client/client.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package client
2+
3+
import (
4+
"context"
5+
6+
"github.com/grafana/loki/v3/pkg/compactor/client/grpc"
7+
"github.com/grafana/loki/v3/pkg/compactor/deletion"
8+
)
9+
10+
type CompactorClient interface {
11+
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
12+
GetCacheGenerationNumber(ctx context.Context, userID string) (string, error)
13+
14+
JobQueueClient() grpc.JobQueueClient
15+
16+
Name() string
17+
Stop()
18+
}

pkg/compactor/client/grpc.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/prometheus/common/model"
1414
"google.golang.org/grpc"
1515

16-
deletion_grpc "github.com/grafana/loki/v3/pkg/compactor/client/grpc"
16+
compactor_grpc "github.com/grafana/loki/v3/pkg/compactor/client/grpc"
1717
"github.com/grafana/loki/v3/pkg/compactor/deletion"
1818
)
1919

@@ -29,25 +29,26 @@ func (cfg *GRPCConfig) RegisterFlags(f *flag.FlagSet) {
2929
type compactorGRPCClient struct {
3030
cfg GRPCConfig
3131

32-
GRPCClientRequestDuration *prometheus.HistogramVec
32+
grpcClientRequestDuration *prometheus.HistogramVec
3333
conn *grpc.ClientConn
34-
grpcClient deletion_grpc.CompactorClient
34+
grpcClient compactor_grpc.CompactorClient
35+
jobQueueClient compactor_grpc.JobQueueClient
3536
}
3637

3738
// NewGRPCClient supports only methods which are used for internal communication of Loki like
38-
// loading delete requests and cache gen numbers for query time filtering.
39-
func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (deletion.CompactorClient, error) {
39+
// loading delete requests, cache gen numbers for query time filtering and interacting with job queue for horizontal scaling of compactor.
40+
func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (CompactorClient, error) {
4041
client := &compactorGRPCClient{
4142
cfg: cfg,
42-
GRPCClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
43+
grpcClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
4344
Namespace: "loki_compactor",
4445
Name: "grpc_request_duration_seconds",
4546
Help: "Time (in seconds) spent serving requests when using compactor GRPC client",
4647
Buckets: instrument.DefBuckets,
4748
}, []string{"operation", "status_code"}),
4849
}
4950

50-
unaryInterceptors, streamInterceptors := grpcclient.Instrument(client.GRPCClientRequestDuration)
51+
unaryInterceptors, streamInterceptors := grpcclient.Instrument(client.grpcClientRequestDuration)
5152
dialOpts, err := cfg.GRPCClientConfig.DialOption(unaryInterceptors, streamInterceptors, middleware.NoOpInvalidClusterValidationReporter)
5253
if err != nil {
5354
return nil, err
@@ -59,7 +60,8 @@ func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (deleti
5960
return nil, err
6061
}
6162

62-
client.grpcClient = deletion_grpc.NewCompactorClient(client.conn)
63+
client.grpcClient = compactor_grpc.NewCompactorClient(client.conn)
64+
client.jobQueueClient = compactor_grpc.NewJobQueueClient(client.conn)
6365
return client, nil
6466
}
6567

@@ -69,7 +71,7 @@ func (s *compactorGRPCClient) Stop() {
6971

7072
func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) {
7173
ctx = user.InjectOrgID(ctx, userID)
72-
grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{ForQuerytimeFiltering: true})
74+
grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &compactor_grpc.GetDeleteRequestsRequest{ForQuerytimeFiltering: true})
7375
if err != nil {
7476
return nil, err
7577
}
@@ -91,14 +93,18 @@ func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, u
9193

9294
func (s *compactorGRPCClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) {
9395
ctx = user.InjectOrgID(ctx, userID)
94-
grpcResp, err := s.grpcClient.GetCacheGenNumbers(ctx, &deletion_grpc.GetCacheGenNumbersRequest{})
96+
grpcResp, err := s.grpcClient.GetCacheGenNumbers(ctx, &compactor_grpc.GetCacheGenNumbersRequest{})
9597
if err != nil {
9698
return "", err
9799
}
98100

99101
return grpcResp.ResultsCacheGen, nil
100102
}
101103

104+
func (s *compactorGRPCClient) JobQueueClient() compactor_grpc.JobQueueClient {
105+
return compactor_grpc.NewJobQueueClient(s.conn)
106+
}
107+
102108
func (s *compactorGRPCClient) Name() string {
103109
return "grpc_client"
104110
}

0 commit comments

Comments
 (0)