Skip to content

Commit 47961f8

Browse files
feat: Compactor deletion manifest builder (#17474)
1 parent 3ad1a64 commit 47961f8

File tree

13 files changed

+1871
-191
lines changed

13 files changed

+1871
-191
lines changed

pkg/compactor/deletion/delete_request.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,10 @@ func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool {
106106
return true
107107
}
108108

109-
// IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest.
110-
// It returns a filter.Func if the chunk is supposed to be deleted partially or the delete request contains line filters.
111-
// If the filter.Func is nil, the whole chunk is supposed to be deleted.
112-
func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) {
109+
// IsDeleted checks if the given chunk entry would have data requested for deletion.
110+
func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) bool {
113111
if d.UserID != unsafeGetString(userID) {
114-
return false, nil
112+
return false
115113
}
116114

117115
if !intervalsOverlap(model.Interval{
@@ -121,7 +119,7 @@ func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk reten
121119
Start: d.StartTime,
122120
End: d.EndTime,
123121
}) {
124-
return false, nil
122+
return false
125123
}
126124

127125
if d.logSelectorExpr == nil {
@@ -133,11 +131,21 @@ func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk reten
133131
"user", d.UserID,
134132
"err", err,
135133
)
136-
return false, nil
134+
return false
137135
}
138136
}
139137

140138
if !labels.Selector(d.matchers).Matches(lbls) {
139+
return false
140+
}
141+
142+
return true
143+
}
144+
145+
// GetChunkFilter tells whether the chunk is covered by the DeleteRequest and
146+
// optionally returns a filter.Func if the chunk is supposed to be deleted partially or the delete request has line filters.
147+
func (d *DeleteRequest) GetChunkFilter(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) {
148+
if !d.IsDeleted(userID, lbls, chunk) {
141149
return false, nil
142150
}
143151

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package deletion
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-kit/log/level"
7+
"github.com/prometheus/common/model"
8+
"github.com/prometheus/prometheus/model/labels"
9+
10+
"github.com/grafana/loki/v3/pkg/compactor/retention"
11+
"github.com/grafana/loki/v3/pkg/util/filter"
12+
util_log "github.com/grafana/loki/v3/pkg/util/log"
13+
)
14+
15+
// deleteRequestBatch holds a batch of requests loaded for processing
16+
type deleteRequestBatch struct {
17+
deleteRequestsToProcess map[string]*userDeleteRequests
18+
duplicateRequests []DeleteRequest
19+
count int
20+
metrics *deleteRequestsManagerMetrics
21+
}
22+
23+
func newDeleteRequestBatch(metrics *deleteRequestsManagerMetrics) *deleteRequestBatch {
24+
return &deleteRequestBatch{
25+
deleteRequestsToProcess: map[string]*userDeleteRequests{},
26+
metrics: metrics,
27+
}
28+
}
29+
30+
func (b *deleteRequestBatch) reset() {
31+
b.deleteRequestsToProcess = map[string]*userDeleteRequests{}
32+
b.duplicateRequests = []DeleteRequest{}
33+
b.count = 0
34+
}
35+
36+
func (b *deleteRequestBatch) requestCount() int {
37+
return b.count
38+
}
39+
40+
// addDeleteRequest add a requests to the batch
41+
func (b *deleteRequestBatch) addDeleteRequest(dr *DeleteRequest) {
42+
dr.Metrics = b.metrics
43+
ur, ok := b.deleteRequestsToProcess[dr.UserID]
44+
if !ok {
45+
ur = &userDeleteRequests{
46+
requestsInterval: model.Interval{
47+
Start: dr.StartTime,
48+
End: dr.EndTime,
49+
},
50+
}
51+
b.deleteRequestsToProcess[dr.UserID] = ur
52+
}
53+
54+
ur.requests = append(ur.requests, dr)
55+
if dr.StartTime < ur.requestsInterval.Start {
56+
ur.requestsInterval.Start = dr.StartTime
57+
}
58+
if dr.EndTime > ur.requestsInterval.End {
59+
ur.requestsInterval.End = dr.EndTime
60+
}
61+
b.count++
62+
}
63+
64+
func (b *deleteRequestBatch) checkDuplicate(deleteRequest DeleteRequest) error {
65+
ur, ok := b.deleteRequestsToProcess[deleteRequest.UserID]
66+
if !ok {
67+
return nil
68+
}
69+
for _, requestLoadedForProcessing := range ur.requests {
70+
isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest)
71+
if err != nil {
72+
return err
73+
}
74+
if isDuplicate {
75+
level.Info(util_log.Logger).Log(
76+
"msg", "found duplicate request of one of the requests loaded for processing",
77+
"loaded_request_id", requestLoadedForProcessing.RequestID,
78+
"duplicate_request_id", deleteRequest.RequestID,
79+
"user", deleteRequest.UserID,
80+
)
81+
b.duplicateRequests = append(b.duplicateRequests, deleteRequest)
82+
}
83+
}
84+
85+
return nil
86+
}
87+
88+
func (b *deleteRequestBatch) expired(userID []byte, chk retention.Chunk, lbls labels.Labels, skipRequest func(*DeleteRequest) bool) (bool, filter.Func) {
89+
userIDStr := unsafeGetString(userID)
90+
if b.deleteRequestsToProcess[userIDStr] == nil || !intervalsOverlap(b.deleteRequestsToProcess[userIDStr].requestsInterval, model.Interval{
91+
Start: chk.From,
92+
End: chk.Through,
93+
}) {
94+
return false, nil
95+
}
96+
97+
var filterFuncs []filter.Func
98+
99+
for _, deleteRequest := range b.deleteRequestsToProcess[userIDStr].requests {
100+
if skipRequest(deleteRequest) {
101+
continue
102+
}
103+
isDeleted, ff := deleteRequest.GetChunkFilter(userID, lbls, chk)
104+
if !isDeleted {
105+
continue
106+
}
107+
108+
if ff == nil {
109+
level.Info(util_log.Logger).Log(
110+
"msg", "no chunks to retain: the whole chunk is deleted",
111+
"delete_request_id", deleteRequest.RequestID,
112+
"sequence_num", deleteRequest.SequenceNum,
113+
"user", deleteRequest.UserID,
114+
"chunkID", string(chk.ChunkID),
115+
)
116+
b.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
117+
return true, nil
118+
}
119+
filterFuncs = append(filterFuncs, ff)
120+
}
121+
122+
if len(filterFuncs) == 0 {
123+
return false, nil
124+
}
125+
126+
b.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
127+
return true, func(ts time.Time, s string, structuredMetadata labels.Labels) bool {
128+
for _, ff := range filterFuncs {
129+
if ff(ts, s, structuredMetadata) {
130+
return true
131+
}
132+
}
133+
134+
return false
135+
}
136+
}
137+
138+
func (b *deleteRequestBatch) intervalMayHaveExpiredChunks(userID string) bool {
139+
// We can't do the overlap check between the passed interval and delete requests interval from a user because
140+
// if a request is issued just for today and there are chunks spanning today and yesterday then
141+
// the overlap check would skip processing yesterday's index which would result in the index pointing to deleted chunks.
142+
if userID != "" {
143+
return b.deleteRequestsToProcess[userID] != nil
144+
}
145+
146+
return len(b.deleteRequestsToProcess) != 0
147+
}
148+
149+
func (b *deleteRequestBatch) getAllRequestsForUser(userID string) []*DeleteRequest {
150+
userRequests, ok := b.deleteRequestsToProcess[userID]
151+
if !ok {
152+
return nil
153+
}
154+
155+
return userRequests.requests
156+
}

0 commit comments

Comments
 (0)