StreamReader.java
@@ -34,6 +34,7 @@
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.LogSuppressor;
import com.automq.stream.utils.Time;
import com.automq.stream.utils.threads.EventLoop;
import com.google.common.annotations.VisibleForTesting;

@@ -80,19 +81,27 @@
private final ObjectManager objectManager;
private final ObjectReaderFactory objectReaderFactory;
private final DataBlockCache dataBlockCache;
private final Time time;
long nextReadOffset;
private CompletableFuture<Void> inflightLoadIndexCf;
private volatile CompletableFuture<Void> afterReadTryReadaheadCf;
private long lastAccessTimestamp = System.currentTimeMillis();
private long lastAccessTimestamp;
private boolean reading = false;

private boolean closed = false;

public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) {
this(streamId, nextReadOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, Time.SYSTEM);
}

public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache, Time time) {
this.streamId = streamId;
this.nextReadOffset = nextReadOffset;
this.readahead = new Readahead();
this.time = time;
this.lastAccessTimestamp = time.milliseconds();

this.eventLoop = eventLoop;
this.objectManager = objectManager;
@@ -117,7 +126,7 @@ public CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, i
}

CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes, int leftRetries) {
lastAccessTimestamp = System.currentTimeMillis();
lastAccessTimestamp = time.milliseconds();
ReadContext readContext = new ReadContext();
read0(readContext, startOffset, endOffset, maxBytes);
CompletableFuture<ReadDataBlock> retCf = new CompletableFuture<>();
@@ -617,7 +626,7 @@ class Readahead {
private int cacheMissCount;

public void tryReadahead(boolean cacheMiss) {
if (System.currentTimeMillis() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) {
if (time.milliseconds() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) {
// skip readahead while readahead is still in its cooldown window
return;
}
@@ -660,7 +669,7 @@ public void tryReadahead(boolean cacheMiss) {

public void reset() {
requireReset = true;
resetTimestamp = System.currentTimeMillis();
resetTimestamp = time.milliseconds();
}
}
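The net effect of the StreamReader.java changes above is clock injection: every `System.currentTimeMillis()` call is replaced with an injected `Time`, and the original constructor delegates to a new overload with `Time.SYSTEM`, so production behavior is unchanged while tests can substitute a controllable clock. A minimal sketch of the pattern, assuming only the members this diff actually exercises (`milliseconds()`, a `SYSTEM` instance, and a test clock with `sleep()`); the real `com.automq.stream.utils.Time` may carry more:

```java
// Sketch only: mirrors the shape implied by the diff, not the actual
// com.automq.stream.utils.Time API.
interface Time {
    Time SYSTEM = System::currentTimeMillis; // production clock

    long milliseconds();
}

// A controllable clock for tests: time advances only when told to.
final class ManualTime implements Time {
    private long nowMs;

    @Override
    public long milliseconds() {
        return nowMs;
    }

    // Same spirit as the MockTime.sleep(ms) used in StreamReadersTest below.
    void sleep(long ms) {
        nowMs += ms;
    }
}
```

Because `lastAccessTimestamp` is now stamped via `time.milliseconds()`, expiry checks become deterministic: a test advances the clock instead of sleeping on the wall clock.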

StreamReaders.java
@@ -26,7 +26,10 @@
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.Systems;
import com.automq.stream.utils.Threads;
import com.automq.stream.utils.Time;
import com.automq.stream.utils.threads.EventLoop;
import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ public class StreamReaders implements S3BlockCache {
private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(1);
private static final long STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1);
private final Cache[] caches;
private final Time time;
private final DataBlockCache dataBlockCache;
private final ObjectReaderFactory objectReaderFactory;

@@ -51,11 +55,17 @@ public class StreamReaders implements S3BlockCache {

public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
ObjectReaderFactory objectReaderFactory) {
this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES);
this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES, Time.SYSTEM);
}

public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
ObjectReaderFactory objectReaderFactory, int concurrency) {
this(size, objectManager, objectStorage, objectReaderFactory, concurrency, Time.SYSTEM);
}

public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
ObjectReaderFactory objectReaderFactory, int concurrency, Time time) {
this.time = time;
EventLoop[] eventLoops = new EventLoop[concurrency];
for (int i = 0; i < concurrency; i++) {
eventLoops[i] = new EventLoop("stream-reader-" + i);
@@ -69,6 +79,11 @@ public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objec
this.objectReaderFactory = objectReaderFactory;
this.objectManager = objectManager;
this.objectStorage = objectStorage;

Threads.COMMON_SCHEDULER.scheduleAtFixedRate(this::triggerExpiredStreamReaderCleanup,
STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
TimeUnit.MILLISECONDS);
}

@Override
@@ -78,6 +93,26 @@ public CompletableFuture<ReadDataBlock> read(TraceContext context, long streamId
return cache.read(streamId, startOffset, endOffset, maxBytes);
}

/**
* Get the total number of active StreamReaders across all caches.
* This method is intended for testing purposes only.
*/
@VisibleForTesting
int getActiveStreamReaderCount() {
int total = 0;
for (Cache cache : caches) {
total += cache.getStreamReaderCount();
}
return total;
}

@VisibleForTesting
void triggerExpiredStreamReaderCleanup() {
for (Cache cache : caches) {
cache.submitCleanupExpiredStreamReader();
}
}

static class StreamReaderKey {
final long streamId;
final long startOffset;
@@ -114,10 +149,11 @@ public String toString() {
class Cache {
private final EventLoop eventLoop;
private final Map<StreamReaderKey, StreamReader> streamReaders = new HashMap<>();
private long lastStreamReaderExpiredCheckTime = System.currentTimeMillis();
private long lastStreamReaderExpiredCheckTime;

public Cache(EventLoop eventLoop) {
this.eventLoop = eventLoop;
this.lastStreamReaderExpiredCheckTime = time.milliseconds();
}

public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
@@ -129,7 +165,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
StreamReaderKey key = new StreamReaderKey(streamId, startOffset);
StreamReader streamReader = streamReaders.remove(key);
if (streamReader == null) {
streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache);
streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, time);
}
StreamReader finalStreamReader = streamReader;
CompletableFuture<ReadDataBlock> streamReadCf = streamReader.read(startOffset, endOffset, maxBytes)
@@ -150,8 +186,21 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
return cf;
}

private void submitCleanupExpiredStreamReader() {
eventLoop.execute(this::cleanupExpiredStreamReader);
}

/**
* Get the number of StreamReaders in this cache.
* This method is intended for testing purposes only.
*/
@VisibleForTesting
int getStreamReaderCount() {
return streamReaders.size();
}

private void cleanupExpiredStreamReader() {
long now = System.currentTimeMillis();
long now = time.milliseconds();
if (now > lastStreamReaderExpiredCheckTime + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS) {
lastStreamReaderExpiredCheckTime = now;
Iterator<Map.Entry<StreamReaderKey, StreamReader>> it = streamReaders.entrySet().iterator();
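A design point worth calling out here: the shared scheduler only ever *submits* cleanup, and each `Cache` mutates its plain `HashMap` exclusively on its own `EventLoop` (`submitCleanupExpiredStreamReader` wraps `cleanupExpiredStreamReader` in `eventLoop.execute(...)`). The map needs no locking because it is confined to a single thread. A minimal sketch of that idiom using only standard `java.util.concurrent`; the class and field names are illustrative, not from the patch:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: a map confined to a single-threaded executor needs no
// synchronization, because every access is funneled through that thread.
final class ConfinedExpiryCache {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final Map<String, Long> lastAccessMs = new HashMap<>(); // deliberately not thread-safe

    // Safe to call from any thread (e.g. a shared scheduler); the mutation
    // itself always runs on the owning event loop.
    void submitCleanup(long nowMs, long expiredMs) {
        eventLoop.execute(() -> lastAccessMs.values().removeIf(ts -> nowMs - ts > expiredMs));
    }

    void touch(String key, long nowMs) {
        eventLoop.execute(() -> lastAccessMs.put(key, nowMs));
    }
}
```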
StreamReadersTest.java
@@ -0,0 +1,190 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.cache.blockcache;

import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.TestUtils;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.MemoryObjectStorage;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.utils.MockTime;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Tag("S3Unit")
public class StreamReadersTest {
private static final long STREAM_ID_1 = 100L;
private static final long STREAM_ID_2 = 200L;
private static final int BLOCK_SIZE_THRESHOLD = 1024;

private Map<Long, MockObject> objects;
private ObjectManager objectManager;
private ObjectStorage objectStorage;
private ObjectReaderFactory objectReaderFactory;
private StreamReaders streamReaders;
private MockTime mockTime;

@BeforeEach
void setup() {
objects = new HashMap<>();

// Create mock objects for testing with different offset ranges
// Object 1: STREAM_ID_1 offset 0-2
objects.put(1L, MockObject.builder(1L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_1, List.of(
new StreamRecordBatch(STREAM_ID_1, 0, 0, 2, TestUtils.random(100))
)).build());
// Object 2: STREAM_ID_2 offset 0-1
objects.put(2L, MockObject.builder(2L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_2, List.of(
new StreamRecordBatch(STREAM_ID_2, 0, 0, 1, TestUtils.random(100))
)).build());

objectManager = mock(ObjectManager.class);

when(objectManager.isObjectExist(anyLong())).thenReturn(true);
// Stub getObjects to return the object covering each stream's offset range
// STREAM_ID_1 offset 0-2 -> object 1
when(objectManager.getObjects(eq(STREAM_ID_1), anyLong(), anyLong(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(List.of(objects.get(1L).metadata)));
// STREAM_ID_2 offset 0-1 -> object 2
when(objectManager.getObjects(eq(STREAM_ID_2), anyLong(), anyLong(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(List.of(objects.get(2L).metadata)));

objectStorage = new MemoryObjectStorage();

objectReaderFactory = new ObjectReaderFactory() {
@Override
public ObjectReader get(S3ObjectMetadata metadata) {
return objects.get(metadata.objectId()).objectReader();
}

@Override
public ObjectStorage getObjectStorage() {
return objectStorage;
}
};

mockTime = new MockTime();
streamReaders = new StreamReaders(Long.MAX_VALUE, objectManager, objectStorage, objectReaderFactory, 2, mockTime);
}

@AfterEach
void tearDown() {
if (streamReaders != null) {
// Drop the reference so each test builds a fresh instance
streamReaders = null;
}
}

@Test
public void testStreamReaderCreationAndReuse() throws Exception {
TraceContext context = TraceContext.DEFAULT;

// Initially no StreamReaders
assertEquals(0, streamReaders.getActiveStreamReaderCount());

// Create first StreamReader
CompletableFuture<ReadDataBlock> readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS);
result1.getRecords().forEach(StreamRecordBatch::release);

assertEquals(1, streamReaders.getActiveStreamReaderCount());

// Read from same stream again - should reuse existing StreamReader
CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_1, 1, 2, Integer.MAX_VALUE);
ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
result2.getRecords().forEach(StreamRecordBatch::release);

// Should still have 1 StreamReader (reused)
assertEquals(1, streamReaders.getActiveStreamReaderCount());
}

@Test
public void testCleanupTrigger() throws Exception {
TraceContext context = TraceContext.DEFAULT;

// Create some StreamReaders
CompletableFuture<ReadDataBlock> readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS);
result1.getRecords().forEach(StreamRecordBatch::release);

CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE);
ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
result2.getRecords().forEach(StreamRecordBatch::release);

assertEquals(2, streamReaders.getActiveStreamReaderCount());

// Trigger cleanup - should not affect non-expired readers
streamReaders.triggerExpiredStreamReaderCleanup();

// Give the async cleanup a chance to run on the event loops
await().atMost(1, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> streamReaders.getActiveStreamReaderCount() == 2);

// StreamReaders should still be there (not expired yet)
assertEquals(2, streamReaders.getActiveStreamReaderCount());
}

@Test
public void testExpiredStreamReaderCleanupExecution() throws Exception {
TraceContext context = TraceContext.DEFAULT;

// Create a StreamReader
CompletableFuture<ReadDataBlock> readFuture = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
ReadDataBlock result = readFuture.get(5, TimeUnit.SECONDS);
result.getRecords().forEach(StreamRecordBatch::release);

assertEquals(1, streamReaders.getActiveStreamReaderCount());

// Advance mock time to simulate expiration (advance by 2 minutes, expiration is 1 minute)
mockTime.sleep(TimeUnit.MINUTES.toMillis(2));

// Trigger cleanup - should now clean up expired StreamReaders
streamReaders.triggerExpiredStreamReaderCleanup();

// Wait for async cleanup to complete
await().atMost(5, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> streamReaders.getActiveStreamReaderCount() == 0);

// Verify system still works after cleanup
CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE);
ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
result2.getRecords().forEach(StreamRecordBatch::release);

assertEquals(1, streamReaders.getActiveStreamReaderCount());
}

}
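A note on the two-minute advance in the last test: the diff defines both `STREAM_READER_EXPIRED_MILLS` and `STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS` as one minute, and `cleanupExpiredStreamReader()` only scans once the check interval has elapsed since the last scan. Advancing the mock clock by two minutes therefore clears both the per-`Cache` interval gate and (presumably, since the tail of that method is collapsed above) the per-reader expiry gate in a single trigger. A small self-contained illustration:

```java
import java.util.concurrent.TimeUnit;

// Sketch: why mockTime.sleep(two minutes) is enough for one cleanup pass.
// The constants mirror the StreamReaders diff above.
public class ExpiryTimingSketch {
    public static void main(String[] args) {
        long expiredMills = TimeUnit.MINUTES.toMillis(1);        // STREAM_READER_EXPIRED_MILLS
        long checkIntervalMills = TimeUnit.MINUTES.toMillis(1);  // STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS
        long advanceMills = TimeUnit.MINUTES.toMillis(2);

        // Gate 1: a Cache scans only when now > lastCheckTime + interval.
        System.out.println("interval gate cleared: " + (advanceMills > checkIntervalMills));
        // Gate 2: a reader is reclaimed once idle longer than the expiry window.
        System.out.println("expiry gate cleared: " + (advanceMills > expiredMills));
    }
}
```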