Skip to content

Commit 32c5088

Browse files
fix: concurrent map access for processedSeries in delete requests manager (#17469)
1 parent bfa2a1d commit 32c5088

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

pkg/compactor/deletion/delete_requests_manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func NewDeleteRequestsManager(workingDir string, store DeleteRequestsStore, dele
7070
return nil, err
7171
}
7272

73+
dm.wg.Add(1)
7374
go dm.loop()
7475

7576
if err := dm.deleteRequestsStore.MergeShardedRequests(context.Background()); err != nil {
@@ -83,7 +84,6 @@ func (d *DeleteRequestsManager) loop() {
8384
ticker := time.NewTicker(5 * time.Minute)
8485
defer ticker.Stop()
8586

86-
d.wg.Add(1)
8787
defer d.wg.Done()
8888

8989
for {
@@ -111,6 +111,9 @@ func (d *DeleteRequestsManager) Stop() {
111111
}
112112

113113
func (d *DeleteRequestsManager) storeSeriesProgress() error {
114+
d.deleteRequestsToProcessMtx.Lock()
115+
defer d.deleteRequestsToProcessMtx.Unlock()
116+
114117
if len(d.processedSeries) == 0 {
115118
return nil
116119
}
@@ -507,6 +510,9 @@ func (d *DeleteRequestsManager) DropFromIndex(_ []byte, _ retention.Chunk, _ lab
507510
}
508511

509512
func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error {
513+
d.deleteRequestsToProcessMtx.Lock()
514+
defer d.deleteRequestsToProcessMtx.Unlock()
515+
510516
userIDStr := unsafeGetString(userID)
511517
if d.deleteRequestsToProcess[userIDStr] == nil {
512518
return nil

0 commit comments

Comments
 (0)