Skip to content

Commit b613811

Browse files
committed
fix(streamReader): implement scheduled cleanup for expired stream readers (#2719)
* fix(streamReader): implement scheduled cleanup for expired stream readers * fix(streamReader): implement scheduled cleanup for expired stream readers * fix(streamReader): add missing import statements in StreamReaders and StreamReadersTest * fix(StreamReadersTest): improve test setup and cleanup logic for stream readers * test(StreamReadersTest): update expired stream reader cleanup test for manual trigger and faster execution * style(StreamReadersTest): remove extra blank line in import statements * test(StreamReadersTest): use reflection to simulate expired stream readers for faster cleanup testing Signed-off-by: Gezi-lzq <[email protected]> * refactor(StreamReader, StreamReaders): inject Time for testable time control and remove reflection from tests - Add Time dependency to StreamReader and StreamReaders for time-related operations - Update constructors to accept Time, defaulting to Time.SYSTEM - Replace System.currentTimeMillis() with time.milliseconds() throughout - Refactor StreamReadersTest to use MockTime for simulating time passage - Remove reflection-based time manipulation in tests for cleaner and safer testing --------- Signed-off-by: Gezi-lzq <[email protected]>
1 parent 59693e2 commit b613811

File tree

3 files changed

+256
-8
lines changed

3 files changed

+256
-8
lines changed

s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.automq.stream.s3.objects.ObjectManager;
2727
import com.automq.stream.utils.FutureUtil;
2828
import com.automq.stream.utils.LogSuppressor;
29+
import com.automq.stream.utils.Time;
2930
import com.automq.stream.utils.threads.EventLoop;
3031
import com.google.common.annotations.VisibleForTesting;
3132

@@ -72,19 +73,27 @@
7273
private final ObjectManager objectManager;
7374
private final ObjectReaderFactory objectReaderFactory;
7475
private final DataBlockCache dataBlockCache;
76+
private final Time time;
7577
long nextReadOffset;
7678
private CompletableFuture<Void> inflightLoadIndexCf;
7779
private volatile CompletableFuture<Void> afterReadTryReadaheadCf;
78-
private long lastAccessTimestamp = System.currentTimeMillis();
80+
private long lastAccessTimestamp;
7981
private boolean reading = false;
8082

8183
private boolean closed = false;
8284

8385
public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
8486
ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) {
87+
this(streamId, nextReadOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, Time.SYSTEM);
88+
}
89+
90+
public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
91+
ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache, Time time) {
8592
this.streamId = streamId;
8693
this.nextReadOffset = nextReadOffset;
8794
this.readahead = new Readahead();
95+
this.time = time;
96+
this.lastAccessTimestamp = time.milliseconds();
8897

8998
this.eventLoop = eventLoop;
9099
this.objectManager = objectManager;
@@ -109,7 +118,7 @@ public CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, i
109118
}
110119

111120
CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes, int leftRetries) {
112-
lastAccessTimestamp = System.currentTimeMillis();
121+
lastAccessTimestamp = time.milliseconds();
113122
ReadContext readContext = new ReadContext();
114123
read0(readContext, startOffset, endOffset, maxBytes);
115124
CompletableFuture<ReadDataBlock> retCf = new CompletableFuture<>();
@@ -609,7 +618,7 @@ class Readahead {
609618
private int cacheMissCount;
610619

611620
public void tryReadahead(boolean cacheMiss) {
612-
if (System.currentTimeMillis() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) {
621+
if (time.milliseconds() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) {
613622
// skip readahead when readahead is in cold down
614623
return;
615624
}
@@ -652,7 +661,7 @@ public void tryReadahead(boolean cacheMiss) {
652661

653662
public void reset() {
654663
requireReset = true;
655-
resetTimestamp = System.currentTimeMillis();
664+
resetTimestamp = time.milliseconds();
656665
}
657666
}
658667

s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import com.automq.stream.s3.trace.context.TraceContext;
1919
import com.automq.stream.utils.FutureUtil;
2020
import com.automq.stream.utils.Systems;
21+
import com.automq.stream.utils.Threads;
22+
import com.automq.stream.utils.Time;
2123
import com.automq.stream.utils.threads.EventLoop;
24+
import com.google.common.annotations.VisibleForTesting;
2225

2326
import org.slf4j.Logger;
2427
import org.slf4j.LoggerFactory;
@@ -35,6 +38,7 @@ public class StreamReaders implements S3BlockCache {
3538
private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(1);
3639
private static final long STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1);
3740
private final Cache[] caches;
41+
private final Time time;
3842
private final DataBlockCache dataBlockCache;
3943
private final ObjectReaderFactory objectReaderFactory;
4044

@@ -43,11 +47,17 @@ public class StreamReaders implements S3BlockCache {
4347

4448
public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
4549
ObjectReaderFactory objectReaderFactory) {
46-
this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES);
50+
this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES, Time.SYSTEM);
4751
}
4852

4953
public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
5054
ObjectReaderFactory objectReaderFactory, int concurrency) {
55+
this(size, objectManager, objectStorage, objectReaderFactory, concurrency, Time.SYSTEM);
56+
}
57+
58+
public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
59+
ObjectReaderFactory objectReaderFactory, int concurrency, Time time) {
60+
this.time = time;
5161
EventLoop[] eventLoops = new EventLoop[concurrency];
5262
for (int i = 0; i < concurrency; i++) {
5363
eventLoops[i] = new EventLoop("stream-reader-" + i);
@@ -61,6 +71,11 @@ public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objec
6171
this.objectReaderFactory = objectReaderFactory;
6272
this.objectManager = objectManager;
6373
this.objectStorage = objectStorage;
74+
75+
Threads.COMMON_SCHEDULER.scheduleAtFixedRate(this::triggerExpiredStreamReaderCleanup,
76+
STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
77+
STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
78+
TimeUnit.MILLISECONDS);
6479
}
6580

6681
@Override
@@ -70,6 +85,26 @@ public CompletableFuture<ReadDataBlock> read(TraceContext context, long streamId
7085
return cache.read(streamId, startOffset, endOffset, maxBytes);
7186
}
7287

88+
/**
89+
* Get the total number of active StreamReaders across all caches.
90+
* This method is intended for testing purposes only.
91+
*/
92+
@VisibleForTesting
93+
int getActiveStreamReaderCount() {
94+
int total = 0;
95+
for (Cache cache : caches) {
96+
total += cache.getStreamReaderCount();
97+
}
98+
return total;
99+
}
100+
101+
@VisibleForTesting
102+
void triggerExpiredStreamReaderCleanup() {
103+
for (Cache cache : caches) {
104+
cache.submitCleanupExpiredStreamReader();
105+
}
106+
}
107+
73108
static class StreamReaderKey {
74109
final long streamId;
75110
final long startOffset;
@@ -106,10 +141,11 @@ public String toString() {
106141
class Cache {
107142
private final EventLoop eventLoop;
108143
private final Map<StreamReaderKey, StreamReader> streamReaders = new HashMap<>();
109-
private long lastStreamReaderExpiredCheckTime = System.currentTimeMillis();
144+
private long lastStreamReaderExpiredCheckTime;
110145

111146
public Cache(EventLoop eventLoop) {
112147
this.eventLoop = eventLoop;
148+
this.lastStreamReaderExpiredCheckTime = time.milliseconds();
113149
}
114150

115151
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
@@ -121,7 +157,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
121157
StreamReaderKey key = new StreamReaderKey(streamId, startOffset);
122158
StreamReader streamReader = streamReaders.remove(key);
123159
if (streamReader == null) {
124-
streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache);
160+
streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, time);
125161
}
126162
StreamReader finalStreamReader = streamReader;
127163
CompletableFuture<ReadDataBlock> streamReadCf = streamReader.read(startOffset, endOffset, maxBytes)
@@ -142,8 +178,21 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
142178
return cf;
143179
}
144180

181+
private void submitCleanupExpiredStreamReader() {
182+
eventLoop.execute(this::cleanupExpiredStreamReader);
183+
}
184+
185+
/**
186+
* Get the number of StreamReaders in this cache.
187+
* This method is intended for testing purposes only.
188+
*/
189+
@VisibleForTesting
190+
int getStreamReaderCount() {
191+
return streamReaders.size();
192+
}
193+
145194
private void cleanupExpiredStreamReader() {
146-
long now = System.currentTimeMillis();
195+
long now = time.milliseconds();
147196
if (now > lastStreamReaderExpiredCheckTime + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS) {
148197
lastStreamReaderExpiredCheckTime = now;
149198
Iterator<Map.Entry<StreamReaderKey, StreamReader>> it = streamReaders.entrySet().iterator();
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package com.automq.stream.s3.cache.blockcache;
13+
14+
import com.automq.stream.s3.ObjectReader;
15+
import com.automq.stream.s3.TestUtils;
16+
import com.automq.stream.s3.cache.ReadDataBlock;
17+
import com.automq.stream.s3.metadata.S3ObjectMetadata;
18+
import com.automq.stream.s3.model.StreamRecordBatch;
19+
import com.automq.stream.s3.objects.ObjectManager;
20+
import com.automq.stream.s3.operator.MemoryObjectStorage;
21+
import com.automq.stream.s3.operator.ObjectStorage;
22+
import com.automq.stream.s3.trace.context.TraceContext;
23+
import com.automq.stream.utils.MockTime;
24+
25+
import org.junit.jupiter.api.AfterEach;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Tag;
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static org.awaitility.Awaitility.await;
37+
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.mockito.ArgumentMatchers.anyInt;
39+
import static org.mockito.ArgumentMatchers.anyLong;
40+
import static org.mockito.ArgumentMatchers.eq;
41+
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.when;
43+
44+
@Tag("S3Unit")
45+
public class StreamReadersTest {
46+
private static final long STREAM_ID_1 = 100L;
47+
private static final long STREAM_ID_2 = 200L;
48+
private static final int BLOCK_SIZE_THRESHOLD = 1024;
49+
50+
private Map<Long, MockObject> objects;
51+
private ObjectManager objectManager;
52+
private ObjectStorage objectStorage;
53+
private ObjectReaderFactory objectReaderFactory;
54+
private StreamReaders streamReaders;
55+
private MockTime mockTime;
56+
57+
@BeforeEach
58+
void setup() {
59+
objects = new HashMap<>();
60+
61+
// Create mock objects for testing with different offset ranges
62+
// Object 1: STREAM_ID_1 offset 0-2
63+
objects.put(1L, MockObject.builder(1L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_1, List.of(
64+
new StreamRecordBatch(STREAM_ID_1, 0, 0, 2, TestUtils.random(100))
65+
)).build());
66+
// Object 2: STREAM_ID_2 offset 0-1
67+
objects.put(2L, MockObject.builder(2L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_2, List.of(
68+
new StreamRecordBatch(STREAM_ID_2, 0, 0, 1, TestUtils.random(100))
69+
)).build());
70+
71+
objectManager = mock(ObjectManager.class);
72+
73+
when(objectManager.isObjectExist(anyLong())).thenReturn(true);
74+
// Mock getObjects method to return appropriate objects based on offset ranges
75+
// For STREAM_ID_1, use the combined object that covers 0-2 range
76+
when(objectManager.getObjects(eq(STREAM_ID_1), anyLong(), anyLong(), anyInt()))
77+
.thenReturn(CompletableFuture.completedFuture(List.of(objects.get(1L).metadata)));
78+
// STREAM_ID_2 offset 0-1 -> object 2
79+
when(objectManager.getObjects(eq(STREAM_ID_2), anyLong(), anyLong(), anyInt()))
80+
.thenReturn(CompletableFuture.completedFuture(List.of(objects.get(2L).metadata)));
81+
82+
objectStorage = new MemoryObjectStorage();
83+
84+
objectReaderFactory = new ObjectReaderFactory() {
85+
@Override
86+
public ObjectReader get(S3ObjectMetadata metadata) {
87+
return objects.get(metadata.objectId()).objectReader();
88+
}
89+
90+
@Override
91+
public ObjectStorage getObjectStorage() {
92+
return objectStorage;
93+
}
94+
};
95+
96+
mockTime = new MockTime();
97+
streamReaders = new StreamReaders(Long.MAX_VALUE, objectManager, objectStorage, objectReaderFactory, 2, mockTime);
98+
}
99+
100+
@AfterEach
101+
void tearDown() {
102+
if (streamReaders != null) {
103+
// Clean up resources
104+
streamReaders = null;
105+
}
106+
}
107+
108+
@Test
109+
public void testStreamReaderCreationAndReuse() throws Exception {
110+
TraceContext context = TraceContext.DEFAULT;
111+
112+
// Initially no StreamReaders
113+
assertEquals(0, streamReaders.getActiveStreamReaderCount());
114+
115+
// Create first StreamReader
116+
CompletableFuture<ReadDataBlock> readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
117+
ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS);
118+
result1.getRecords().forEach(StreamRecordBatch::release);
119+
120+
assertEquals(1, streamReaders.getActiveStreamReaderCount());
121+
122+
// Read from same stream again - should reuse existing StreamReader
123+
CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_1, 1, 2, Integer.MAX_VALUE);
124+
ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
125+
result2.getRecords().forEach(StreamRecordBatch::release);
126+
127+
// Should still have 1 StreamReader (reused)
128+
assertEquals(1, streamReaders.getActiveStreamReaderCount());
129+
}
130+
131+
@Test
132+
public void testCleanupTrigger() throws Exception {
133+
TraceContext context = TraceContext.DEFAULT;
134+
135+
// Create some StreamReaders
136+
CompletableFuture<ReadDataBlock> readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
137+
ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS);
138+
result1.getRecords().forEach(StreamRecordBatch::release);
139+
140+
CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE);
141+
ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
142+
result2.getRecords().forEach(StreamRecordBatch::release);
143+
144+
assertEquals(2, streamReaders.getActiveStreamReaderCount());
145+
146+
// Trigger cleanup - should not affect non-expired readers
147+
streamReaders.triggerExpiredStreamReaderCleanup();
148+
149+
// Wait for async cleanup to complete
150+
await().atMost(1, TimeUnit.SECONDS)
151+
.pollInterval(100, TimeUnit.MILLISECONDS)
152+
.until(() -> streamReaders.getActiveStreamReaderCount() == 2);
153+
154+
// StreamReaders should still be there (not expired yet)
155+
assertEquals(2, streamReaders.getActiveStreamReaderCount());
156+
}
157+
158+
@Test
159+
public void testExpiredStreamReaderCleanupExecution() throws Exception {
160+
TraceContext context = TraceContext.DEFAULT;
161+
162+
// Create a StreamReader
163+
CompletableFuture<ReadDataBlock> readFuture = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
164+
ReadDataBlock result = readFuture.get(5, TimeUnit.SECONDS);
165+
result.getRecords().forEach(StreamRecordBatch::release);
166+
167+
assertEquals(1, streamReaders.getActiveStreamReaderCount());
168+
169+
// Advance mock time to simulate expiration (advance by 2 minutes, expiration is 1 minute)
170+
mockTime.sleep(TimeUnit.MINUTES.toMillis(2));
171+
172+
// Trigger cleanup - should now clean up expired StreamReaders
173+
streamReaders.triggerExpiredStreamReaderCleanup();
174+
175+
// Wait for async cleanup to complete
176+
await().atMost(5, TimeUnit.SECONDS)
177+
.pollInterval(100, TimeUnit.MILLISECONDS)
178+
.until(() -> streamReaders.getActiveStreamReaderCount() == 0);
179+
180+
// Verify system still works after cleanup
181+
CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE);
182+
ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
183+
result2.getRecords().forEach(StreamRecordBatch::release);
184+
185+
assertEquals(1, streamReaders.getActiveStreamReaderCount());
186+
}
187+
188+
189+
190+
}

0 commit comments

Comments
 (0)