diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java index ab6a900f7a..340ffcd5f1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java @@ -34,6 +34,7 @@ import com.automq.stream.s3.objects.ObjectManager; import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.LogSuppressor; +import com.automq.stream.utils.Time; import com.automq.stream.utils.threads.EventLoop; import com.google.common.annotations.VisibleForTesting; @@ -80,19 +81,27 @@ private final ObjectManager objectManager; private final ObjectReaderFactory objectReaderFactory; private final DataBlockCache dataBlockCache; + private final Time time; long nextReadOffset; private CompletableFuture inflightLoadIndexCf; private volatile CompletableFuture afterReadTryReadaheadCf; - private long lastAccessTimestamp = System.currentTimeMillis(); + private long lastAccessTimestamp; private boolean reading = false; private boolean closed = false; public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager, ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) { + this(streamId, nextReadOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, Time.SYSTEM); + } + + public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager, + ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache, Time time) { this.streamId = streamId; this.nextReadOffset = nextReadOffset; this.readahead = new Readahead(); + this.time = time; + this.lastAccessTimestamp = time.milliseconds(); this.eventLoop = eventLoop; this.objectManager = objectManager; @@ -117,7 +126,7 @@ public CompletableFuture read(long startOffset, long endOffset, i } CompletableFuture read(long startOffset, long endOffset, int maxBytes, int leftRetries) { - lastAccessTimestamp = System.currentTimeMillis(); + lastAccessTimestamp = time.milliseconds(); ReadContext readContext = new ReadContext(); read0(readContext, startOffset, endOffset, maxBytes); CompletableFuture retCf = new CompletableFuture<>(); @@ -617,7 +626,7 @@ class Readahead { private int cacheMissCount; public void tryReadahead(boolean cacheMiss) { - if (System.currentTimeMillis() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) { + if (time.milliseconds() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) { // skip readahead when readahead is in cold down return; } @@ -660,7 +669,7 @@ public void tryReadahead(boolean cacheMiss) { public void reset() { requireReset = true; - resetTimestamp = System.currentTimeMillis(); + resetTimestamp = time.milliseconds(); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java index 4a9e27daee..f1028675a6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java @@ -26,7 +26,10 @@ import com.automq.stream.s3.trace.context.TraceContext; import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.Systems; +import com.automq.stream.utils.Threads; +import com.automq.stream.utils.Time; import com.automq.stream.utils.threads.EventLoop; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +46,7 @@ public class StreamReaders implements S3BlockCache { private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(1); private static final long STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1); private final Cache[] caches; + private final Time time; private final DataBlockCache dataBlockCache; private final ObjectReaderFactory objectReaderFactory; @@ -51,11 +55,17 @@ public class StreamReaders implements S3BlockCache { public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage, ObjectReaderFactory objectReaderFactory) { - this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES); + this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES, Time.SYSTEM); } public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage, ObjectReaderFactory objectReaderFactory, int concurrency) { + this(size, objectManager, objectStorage, objectReaderFactory, concurrency, Time.SYSTEM); + } + + public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage, + ObjectReaderFactory objectReaderFactory, int concurrency, Time time) { + this.time = time; EventLoop[] eventLoops = new EventLoop[concurrency]; for (int i = 0; i < concurrency; i++) { eventLoops[i] = new EventLoop("stream-reader-" + i); @@ -69,6 +79,11 @@ public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objec this.objectReaderFactory = objectReaderFactory; this.objectManager = objectManager; this.objectStorage = objectStorage; + + Threads.COMMON_SCHEDULER.scheduleAtFixedRate(this::triggerExpiredStreamReaderCleanup, + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS, + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS, + TimeUnit.MILLISECONDS); } @Override @@ -78,6 +93,26 @@ public CompletableFuture read(TraceContext context, long streamId return cache.read(streamId, startOffset, endOffset, maxBytes); } + /** + * Get the total number of active StreamReaders across all caches. + * This method is intended for testing purposes only. + */ + @VisibleForTesting + int getActiveStreamReaderCount() { + int total = 0; + for (Cache cache : caches) { + total += cache.getStreamReaderCount(); + } + return total; + } + + @VisibleForTesting + void triggerExpiredStreamReaderCleanup() { + for (Cache cache : caches) { + cache.submitCleanupExpiredStreamReader(); + } + } + static class StreamReaderKey { final long streamId; final long startOffset; @@ -114,10 +149,11 @@ public String toString() { class Cache { private final EventLoop eventLoop; private final Map streamReaders = new HashMap<>(); - private long lastStreamReaderExpiredCheckTime = System.currentTimeMillis(); + private long lastStreamReaderExpiredCheckTime; public Cache(EventLoop eventLoop) { this.eventLoop = eventLoop; + this.lastStreamReaderExpiredCheckTime = time.milliseconds(); } public CompletableFuture read(long streamId, long startOffset, @@ -129,7 +165,7 @@ public CompletableFuture read(long streamId, long startOffset, StreamReaderKey key = new StreamReaderKey(streamId, startOffset); StreamReader streamReader = streamReaders.remove(key); if (streamReader == null) { - streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache); + streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, time); } StreamReader finalStreamReader = streamReader; CompletableFuture streamReadCf = streamReader.read(startOffset, endOffset, maxBytes) @@ -150,8 +186,21 @@ public CompletableFuture read(long streamId, long startOffset, return cf; } + private void submitCleanupExpiredStreamReader() { + eventLoop.execute(this::cleanupExpiredStreamReader); + } + + /** + * Get the number of StreamReaders in this cache. + * This method is intended for testing purposes only. + */ + @VisibleForTesting + int getStreamReaderCount() { + return streamReaders.size(); + } + private void cleanupExpiredStreamReader() { - long now = System.currentTimeMillis(); + long now = time.milliseconds(); if (now > lastStreamReaderExpiredCheckTime + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS) { lastStreamReaderExpiredCheckTime = now; Iterator> it = streamReaders.entrySet().iterator(); diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReadersTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReadersTest.java new file mode 100644 index 0000000000..bf15e4ce9b --- /dev/null +++ b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReadersTest.java @@ -0,0 +1,190 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.cache.blockcache; + +import com.automq.stream.s3.ObjectReader; +import com.automq.stream.s3.TestUtils; +import com.automq.stream.s3.cache.ReadDataBlock; +import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.model.StreamRecordBatch; +import com.automq.stream.s3.objects.ObjectManager; +import com.automq.stream.s3.operator.MemoryObjectStorage; +import com.automq.stream.s3.operator.ObjectStorage; +import com.automq.stream.s3.trace.context.TraceContext; +import com.automq.stream.utils.MockTime; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Tag("S3Unit") +public class StreamReadersTest { + private static final long STREAM_ID_1 = 100L; + private static final long STREAM_ID_2 = 200L; + private static final int BLOCK_SIZE_THRESHOLD = 1024; + + private Map objects; + private ObjectManager objectManager; + private ObjectStorage objectStorage; + private ObjectReaderFactory objectReaderFactory; + private StreamReaders streamReaders; + private MockTime mockTime; + + @BeforeEach + void setup() { + objects = new HashMap<>(); + + // Create mock objects for testing with different offset ranges + // Object 1: STREAM_ID_1 offset 0-2 + objects.put(1L, MockObject.builder(1L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_1, List.of( + new StreamRecordBatch(STREAM_ID_1, 0, 0, 2, TestUtils.random(100)) + )).build()); + // Object 2: STREAM_ID_2 offset 0-1 + objects.put(2L, MockObject.builder(2L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_2, List.of( + new StreamRecordBatch(STREAM_ID_2, 0, 0, 1, TestUtils.random(100)) + )).build()); + + objectManager = mock(ObjectManager.class); + + when(objectManager.isObjectExist(anyLong())).thenReturn(true); + // Mock getObjects method to return appropriate objects based on offset ranges + // For STREAM_ID_1, use the combined object that covers 0-2 range + when(objectManager.getObjects(eq(STREAM_ID_1), anyLong(), anyLong(), anyInt())) + .thenReturn(CompletableFuture.completedFuture(List.of(objects.get(1L).metadata))); + // STREAM_ID_2 offset 0-1 -> object 3 + when(objectManager.getObjects(eq(STREAM_ID_2), anyLong(), anyLong(), anyInt())) + .thenReturn(CompletableFuture.completedFuture(List.of(objects.get(2L).metadata))); + + objectStorage = new MemoryObjectStorage(); + + objectReaderFactory = new ObjectReaderFactory() { + @Override + public ObjectReader get(S3ObjectMetadata metadata) { + return objects.get(metadata.objectId()).objectReader(); + } + + @Override + public ObjectStorage getObjectStorage() { + return objectStorage; + } + }; + + mockTime = new MockTime(); + streamReaders = new StreamReaders(Long.MAX_VALUE, objectManager, objectStorage, objectReaderFactory, 2, mockTime); + } + + @AfterEach + void tearDown() { + if (streamReaders != null) { + // Clean up resources + streamReaders = null; + } + } + + @Test + public void testStreamReaderCreationAndReuse() throws Exception { + TraceContext context = TraceContext.DEFAULT; + + // Initially no StreamReaders + assertEquals(0, streamReaders.getActiveStreamReaderCount()); + + // Create first StreamReader + CompletableFuture readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE); + ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS); + result1.getRecords().forEach(StreamRecordBatch::release); + + assertEquals(1, streamReaders.getActiveStreamReaderCount()); + + // Read from same stream again - should reuse existing StreamReader + CompletableFuture readFuture2 = streamReaders.read(context, STREAM_ID_1, 1, 2, Integer.MAX_VALUE); + ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS); + result2.getRecords().forEach(StreamRecordBatch::release); + + // Should still have 1 StreamReader (reused) + assertEquals(1, streamReaders.getActiveStreamReaderCount()); + } + + @Test + public void testCleanupTrigger() throws Exception { + TraceContext context = TraceContext.DEFAULT; + + // Create some StreamReaders + CompletableFuture readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE); + ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS); + result1.getRecords().forEach(StreamRecordBatch::release); + + CompletableFuture readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE); + ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS); + result2.getRecords().forEach(StreamRecordBatch::release); + + assertEquals(2, streamReaders.getActiveStreamReaderCount()); + + // Trigger cleanup - should not affect non-expired readers + streamReaders.triggerExpiredStreamReaderCleanup(); + + // Wait for async cleanup to complete + await().atMost(1, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> streamReaders.getActiveStreamReaderCount() == 2); + + // StreamReaders should still be there (not expired yet) + assertEquals(2, streamReaders.getActiveStreamReaderCount()); + } + + @Test + public void testExpiredStreamReaderCleanupExecution() throws Exception { + TraceContext context = TraceContext.DEFAULT; + + // Create a StreamReader + CompletableFuture readFuture = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE); + ReadDataBlock result = readFuture.get(5, TimeUnit.SECONDS); + result.getRecords().forEach(StreamRecordBatch::release); + + assertEquals(1, streamReaders.getActiveStreamReaderCount()); + + // Advance mock time to simulate expiration (advance by 2 minutes, expiration is 1 minute) + mockTime.sleep(TimeUnit.MINUTES.toMillis(2)); + + // Trigger cleanup - should now clean up expired StreamReaders + streamReaders.triggerExpiredStreamReaderCleanup(); + + // Wait for async cleanup to complete + await().atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> streamReaders.getActiveStreamReaderCount() == 0); + + // Verify system still works after cleanup + CompletableFuture readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE); + ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS); + result2.getRecords().forEach(StreamRecordBatch::release); + + assertEquals(1, streamReaders.getActiveStreamReaderCount()); + } + + + +}