Commit fc5e335

feat(zerozone2): S3WAL supports sequential append and distributed read (#2764)
- S3WAL supports sequential append & callback
- Simplify the S3Storage logic via S3WAL sequential append
- S3WAL supports distributed read from other nodes
- S3Storage supports linked record

Signed-off-by: Robin Han <[email protected]>
1 parent 0798e2e commit fc5e335

61 files changed: +3285 additions, -2829 deletions


core/src/main/java/kafka/automq/zerozone/SnapshotReadPartitionsManager.java

Lines changed: 3 additions & 2 deletions

@@ -51,7 +51,7 @@
 import org.apache.kafka.metadata.stream.S3StreamSetObject;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;

-import com.automq.stream.s3.cache.SnapshotReadCache;
+import com.automq.stream.Context;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.utils.Threads;
 import com.automq.stream.utils.threads.EventLoop;

@@ -103,7 +103,8 @@ public SnapshotReadPartitionsManager(KafkaConfig config, Metrics metrics, Time t
         this.time = time;
         this.replicaManager = replicaManager;
         this.metadataCache = metadataCache;
-        this.dataLoader = objects -> SnapshotReadCache.instance().load(objects);
+        // FIXME: tmp code in merging
+        this.dataLoader = objects -> Context.instance().snapshotReadCache().replay(objects);
         this.asyncSender = new AsyncSender.BrokersAsyncSender(config, metrics, "snapshot_read", Time.SYSTEM, "AUTOMQ_SNAPSHOT_READ", new LogContext());
     }

core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java

Lines changed: 25 additions & 6 deletions

@@ -25,8 +25,11 @@
 import org.apache.kafka.controller.stream.NodeMetadata;

 import com.automq.stream.s3.exceptions.AutoMQException;
+import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.trace.context.TraceContext;
 import com.automq.stream.s3.wal.AppendResult;
+import com.automq.stream.s3.wal.OpenMode;
+import com.automq.stream.s3.wal.RecordOffset;
 import com.automq.stream.s3.wal.RecoverResult;
 import com.automq.stream.s3.wal.WalFactory;
 import com.automq.stream.s3.wal.WalFactory.BuildOptions;

@@ -44,11 +47,12 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;

-import io.netty.buffer.ByteBuf;
+

 public class BootstrapWalV1 implements WriteAheadLog {
     private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapWalV1.class);

@@ -114,8 +118,23 @@ public WALMetadata metadata() {
     }

     @Override
-    public AppendResult append(TraceContext context, ByteBuf data, int crc) throws OverCapacityException {
-        return wal.append(context, data, crc);
+    public CompletableFuture<AppendResult> append(TraceContext context, StreamRecordBatch streamRecordBatch) throws OverCapacityException {
+        return wal.append(context, streamRecordBatch);
+    }
+
+    @Override
+    public CompletableFuture<StreamRecordBatch> get(RecordOffset recordOffset) {
+        return wal.get(recordOffset);
+    }
+
+    @Override
+    public CompletableFuture<List<StreamRecordBatch>> get(RecordOffset startOffset, RecordOffset endOffset) {
+        return wal.get(startOffset, endOffset);
+    }
+
+    @Override
+    public RecordOffset confirmOffset() {
+        return wal.confirmOffset();
     }

     @Override

@@ -157,15 +176,15 @@ public CompletableFuture<Void> reset() {
     }

     @Override
-    public CompletableFuture<Void> trim(long offset) {
+    public CompletableFuture<Void> trim(RecordOffset offset) {
         return wal.trim(offset);
     }

     private CompletableFuture<? extends WriteAheadLog> buildRecoverWal(String kraftWalConfigs, long oldNodeEpoch) {
         IdURI uri = IdURI.parse(kraftWalConfigs);
         CompletableFuture<Void> cf = walHandle
             .acquirePermission(nodeId, oldNodeEpoch, uri, new WalHandle.AcquirePermissionOptions().failoverMode(failoverMode));
-        return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(oldNodeEpoch).failoverMode(failoverMode).build()), executor);
+        return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(oldNodeEpoch).openMode(failoverMode ? OpenMode.FAILOVER : OpenMode.READ_WRITE).build()), executor);
     }

     private CompletableFuture<? extends WriteAheadLog> buildWal(String kraftWalConfigs) {

@@ -175,7 +194,7 @@ private CompletableFuture<? extends WriteAheadLog> buildWal(String kraftWalConfi
             .failoverMode(false);
         CompletableFuture<Void> cf = walHandle
             .acquirePermission(nodeId, nodeEpoch, uri, options);
-        return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(nodeEpoch).failoverMode(false).build()), executor);
+        return cf.thenApplyAsync(nil -> factory.build(uri, BuildOptions.builder().nodeEpoch(nodeEpoch).openMode(OpenMode.READ_WRITE).build()), executor);
     }

     private CompletableFuture<Void> releasePermission(String kraftWalConfigs) {
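
The BootstrapWalV1 change above is the heart of the new WAL surface: append now takes a StreamRecordBatch and returns a CompletableFuture<AppendResult>, and records can be read back by RecordOffset from another node. Below is a minimal sketch of how a writer and a remote reader might use these methods; the class name, the use of TraceContext.DEFAULT, and the way the confirm offset is read back are illustrative assumptions, not code from this commit.

// Sketch only: exercises the WriteAheadLog methods shown in BootstrapWalV1 above.
// TraceContext.DEFAULT and the AppendResult handling are assumptions for illustration.
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.s3.wal.AppendResult;
import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.s3.wal.WriteAheadLog;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public final class S3WalUsageSketch {

    // Writer path ("sequential append & callback"): appends complete asynchronously and
    // in order; the returned future fires once the record is durable in the S3WAL.
    static CompletableFuture<RecordOffset> appendAndConfirm(WriteAheadLog wal, StreamRecordBatch batch) throws Exception {
        CompletableFuture<AppendResult> appended = wal.append(TraceContext.DEFAULT, batch); // TraceContext.DEFAULT assumed
        return appended.thenApply(result -> wal.confirmOffset()); // read the confirm offset back after durability
    }

    // Reader path ("distributed read"): another node fetches the records between two
    // offsets that the writer has already confirmed.
    static CompletableFuture<List<StreamRecordBatch>> readRange(WriteAheadLog wal, RecordOffset start, RecordOffset end) {
        return wal.get(start, end);
    }

    // After the data has been uploaded to S3 objects, the WAL can be trimmed up to an offset.
    static CompletableFuture<Void> trimTo(WriteAheadLog wal, RecordOffset offset) {
        return wal.trim(offset);
    }
}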

core/src/main/scala/kafka/log/stream/s3/wal/DefaultWalFactory.java

Lines changed: 5 additions & 2 deletions

@@ -24,8 +24,10 @@
 import com.automq.stream.s3.operator.BucketURI;
 import com.automq.stream.s3.operator.ObjectStorage;
 import com.automq.stream.s3.operator.ObjectStorageFactory;
+import com.automq.stream.s3.wal.ReservationService;
 import com.automq.stream.s3.wal.WalFactory;
 import com.automq.stream.s3.wal.WriteAheadLog;
+import com.automq.stream.s3.wal.impl.object.ObjectReservationService;
 import com.automq.stream.s3.wal.impl.object.ObjectWALConfig;
 import com.automq.stream.s3.wal.impl.object.ObjectWALService;
 import com.automq.stream.utils.IdURI;

@@ -65,8 +67,9 @@ public WriteAheadLog build(IdURI uri, BuildOptions options) {
                 .withClusterId(AutoMQApplication.getClusterId())
                 .withNodeId(nodeId)
                 .withEpoch(options.nodeEpoch())
-                .withFailover(options.failoverMode());
-
+                .withOpenMode(options.openMode());
+            ReservationService reservationService = new ObjectReservationService(AutoMQApplication.getClusterId(), walObjectStorage, walObjectStorage.bucketId());
+            configBuilder.withReservationService(reservationService);
             return new ObjectWALService(Time.SYSTEM, walObjectStorage, configBuilder.build());
         default:
             throw new IllegalArgumentException("Unsupported WAL protocol: " + uri.protocol());

core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java

Lines changed: 5 additions & 0 deletions

@@ -231,6 +231,11 @@ public long confirmOffset() {
         return stream.confirmOffset();
     }

+    @Override
+    public void confirmOffset(long offset) {
+        stream.confirmOffset(offset);
+    }
+
     @Override
     public long nextOffset() {
         return stream.nextOffset();

core/src/main/scala/kafka/log/streamaspect/LazyStream.java

Lines changed: 9 additions & 0 deletions

@@ -114,6 +114,11 @@ public long confirmOffset() {
         return inner.confirmOffset();
     }

+    @Override
+    public void confirmOffset(long offset) {
+        inner.confirmOffset(offset);
+    }
+
     @Override
     public long nextOffset() {
         return inner.nextOffset();

@@ -199,6 +204,10 @@ public long confirmOffset() {
         return 0;
     }

+    @Override
+    public void confirmOffset(long offset) {
+    }
+
     @Override
     public long nextOffset() {
         return 0;

core/src/main/scala/kafka/log/streamaspect/MemoryClient.java

Lines changed: 5 additions & 0 deletions

@@ -109,6 +109,11 @@ public long confirmOffset() {
         return nextOffsetAlloc.get();
     }

+    @Override
+    public void confirmOffset(long offset) {
+        nextOffsetAlloc.set(offset);
+    }
+
     @Override
     public long nextOffset() {
         return nextOffsetAlloc.get();

core/src/main/scala/kafka/log/streamaspect/MetaStream.java

Lines changed: 5 additions & 0 deletions

@@ -117,6 +117,11 @@ public long confirmOffset() {
         return innerStream.confirmOffset();
     }

+    @Override
+    public void confirmOffset(long offset) {
+        innerStream.confirmOffset(offset);
+    }
+
     @Override
     public long nextOffset() {
         return innerStream.nextOffset();

s3stream/src/main/java/com/automq/stream/Context.java (new file)

Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream;
+
+import com.automq.stream.api.KVClient;
+import com.automq.stream.s3.ConfirmWAL;
+import com.automq.stream.s3.cache.SnapshotReadCache;
+
+public class Context {
+    private SnapshotReadCache snapshotReadCache;
+    private ConfirmWAL confirmWAL;
+    private KVClient kvClient;
+
+    public static final Context INSTANCE = new Context();
+
+    public static Context instance() {
+        return INSTANCE;
+    }
+
+    public KVClient kvClient() {
+        return kvClient;
+    }
+
+    public void kvClient(KVClient kvClient) {
+        this.kvClient = kvClient;
+    }
+
+    public void snapshotReadCache(SnapshotReadCache snapshotReadCache) {
+        this.snapshotReadCache = snapshotReadCache;
+    }
+
+    public SnapshotReadCache snapshotReadCache() {
+        return snapshotReadCache;
+    }
+
+    public void confirmWAL(ConfirmWAL confirmWAL) {
+        this.confirmWAL = confirmWAL;
+    }
+
+    public ConfirmWAL confirmWAL() {
+        return confirmWAL;
+    }
+}
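
The new Context singleton replaces static lookups such as SnapshotReadCache.instance(); the SnapshotReadPartitionsManager change at the top of this commit now resolves the cache through Context.instance(). Below is a minimal wiring sketch, assuming some bootstrap code registers the components at startup; the sketch class and method names are illustrative.

// Illustrative wiring of the Context singleton; the registration call site and component
// construction are assumptions, only the Context accessors come from this commit.
import com.automq.stream.Context;
import com.automq.stream.api.KVClient;
import com.automq.stream.s3.ConfirmWAL;
import com.automq.stream.s3.cache.SnapshotReadCache;

public final class ContextWiringSketch {
    static void register(SnapshotReadCache cache, ConfirmWAL confirmWAL, KVClient kvClient) {
        Context ctx = Context.instance();
        ctx.snapshotReadCache(cache); // consumed by SnapshotReadPartitionsManager's dataLoader
        ctx.confirmWAL(confirmWAL);
        ctx.kvClient(kvClient);
    }
}

Once registered, consumers can call Context.instance().snapshotReadCache().replay(objects), as the first diff of this commit does.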

s3stream/src/main/java/com/automq/stream/api/Stream.java

Lines changed: 5 additions & 0 deletions

@@ -50,6 +50,11 @@ public interface Stream {
      */
    long confirmOffset();

+    /**
+     * Set confirm offset. Only support in snapshotRead mode
+     */
+    void confirmOffset(long offset);
+
    /**
     * Get stream next append record offset.
     */
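
The new setter lets a snapshot-read node advance a stream's confirm offset while it replays records that were appended on another node, instead of appending them itself. A rough sketch of that pattern follows; it assumes StreamRecordBatch exposes a getLastOffset() accessor, which is not shown in this diff.

// Sketch: a snapshot-read node replaying records from another node's S3WAL and moving
// the local stream's confirm offset forward. getLastOffset() is an assumed accessor name.
import com.automq.stream.api.Stream;
import com.automq.stream.s3.model.StreamRecordBatch;

import java.util.List;

public final class SnapshotReadReplaySketch {
    static void replay(Stream stream, List<StreamRecordBatch> records) {
        long confirm = stream.confirmOffset();
        for (StreamRecordBatch record : records) {
            confirm = Math.max(confirm, record.getLastOffset()); // assumed accessor
        }
        stream.confirmOffset(confirm); // new setter; only supported in snapshotRead mode
    }
}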

s3stream/src/main/java/com/automq/stream/s3/ConfirmWAL.java (new file)

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3;
+
+import com.automq.stream.s3.S3Storage.LazyCommit;
+import com.automq.stream.s3.wal.RecordOffset;
+import com.automq.stream.s3.wal.WriteAheadLog;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+public class ConfirmWAL {
+    private final WriteAheadLog log;
+    private final Function<LazyCommit, CompletableFuture<Void>> commitHandle;
+
+    public ConfirmWAL(WriteAheadLog log, Function<LazyCommit, CompletableFuture<Void>> commitHandle) {
+        this.log = log;
+        this.commitHandle = commitHandle;
+    }
+
+    public RecordOffset confirmOffset() {
+        return log.confirmOffset();
+    }
+
+    /**
+     * Commit with lazy timeout.
+     * If in [0, lazyLingerMs), there is no other commit happened, then trigger a new commit.
+     * @param lazyLingerMs lazy linger milliseconds.
+     */
+    public CompletableFuture<Void> commit(long lazyLingerMs, boolean awaitTrim) {
+        return commitHandle.apply(new LazyCommit(lazyLingerMs, awaitTrim));
+    }
+
+    public CompletableFuture<Void> commit(long lazyLingerMs) {
+        return commit(lazyLingerMs, true);
+    }
+}
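
ConfirmWAL exposes the writer's confirm offset and a "lazy" commit request that only triggers a new commit if no other commit lands within the linger window. A small usage sketch follows; the no-op commitHandle lambda stands in for the real S3Storage upload path and is purely illustrative.

// Sketch: wrap a WriteAheadLog with ConfirmWAL and request a lazy commit.
// In S3Storage the commitHandle would trigger an upload of WAL data to S3 objects;
// here it is a placeholder that completes immediately.
import com.automq.stream.s3.ConfirmWAL;
import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.s3.wal.WriteAheadLog;

import java.util.concurrent.CompletableFuture;

public final class ConfirmWalSketch {
    static void example(WriteAheadLog wal) {
        ConfirmWAL confirmWAL = new ConfirmWAL(wal, lazyCommit -> CompletableFuture.completedFuture(null));
        RecordOffset confirmed = confirmWAL.confirmOffset();         // latest durable WAL offset
        CompletableFuture<Void> committed = confirmWAL.commit(100L); // commit unless another commit happens within 100 ms
        committed.join();
    }
}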
