diff --git a/build.gradle b/build.gradle index 56cb21cd07..13b16adf35 100644 --- a/build.gradle +++ b/build.gradle @@ -986,6 +986,7 @@ project(':core') { } implementation 'redis.clients:jedis:4.3.1' implementation libs.slf4jlog4j + implementation libs.s3Client compileOnly libs.log4j diff --git a/core/src/main/scala/kafka/log/s3/S3Client.java b/core/src/main/scala/kafka/log/s3/S3Client.java new file mode 100644 index 0000000000..35dbacc5cc --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/S3Client.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import com.automq.elasticstream.client.api.Client; +import com.automq.elasticstream.client.api.KVClient; +import com.automq.elasticstream.client.api.StreamClient; + +public class S3Client implements Client { + @Override + public StreamClient streamClient() { + return null; + } + + @Override + public KVClient kvClient() { + return null; + } +} diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java new file mode 100644 index 0000000000..8884c89c79 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/S3Stream.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import com.automq.elasticstream.client.api.AppendResult; +import com.automq.elasticstream.client.api.FetchResult; +import com.automq.elasticstream.client.api.RecordBatch; +import com.automq.elasticstream.client.api.Stream; +import kafka.log.s3.cache.S3BlockCache; +import kafka.log.s3.model.StreamMetadata; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.streams.StreamManager; + +import java.util.concurrent.CompletableFuture; + +public class S3Stream implements Stream { + private final StreamMetadata metadata; + private final Wal wal; + private final S3BlockCache blockCache; + private final StreamManager streamManager; + private final ObjectManager objectManager; + + public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager, ObjectManager objectManager) { + this.metadata = metadata; + this.wal = wal; + this.blockCache = blockCache; + this.streamManager = streamManager; + this.objectManager = objectManager; + } + + @Override + public long streamId() { + return metadata.getStreamId(); + } + + @Override + public long startOffset() { + return metadata.getStartOffset(); + } + + @Override + public long nextOffset() { + return 0; + } + + @Override + public CompletableFuture append(RecordBatch recordBatch) { + return null; + } + + @Override + public CompletableFuture fetch(long startOffset, long endOffset, int maxBytes) { + return null; + } + + @Override + public CompletableFuture trim(long newStartOffset) { + return streamManager.trimStream(metadata.getStreamId(), metadata.getEpoch(), newStartOffset); + } + + @Override + public CompletableFuture close() { + return null; + } + + @Override + public CompletableFuture destroy() { + return null; + } +} diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java b/core/src/main/scala/kafka/log/s3/S3StreamClient.java new file mode 100644 index 0000000000..4eca190c6e --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import com.automq.elasticstream.client.api.CreateStreamOptions; +import com.automq.elasticstream.client.api.OpenStreamOptions; +import com.automq.elasticstream.client.api.Stream; +import com.automq.elasticstream.client.api.StreamClient; +import kafka.log.s3.cache.S3BlockCache; +import kafka.log.s3.objects.ObjectManager; +import kafka.log.s3.streams.StreamManager; + +import java.util.concurrent.CompletableFuture; + +public class S3StreamClient implements StreamClient { + private final StreamManager streamController; + private final Wal wal; + private final S3BlockCache blockCache; + private final ObjectManager objectManager; + + public S3StreamClient(StreamManager streamController, Wal wal, S3BlockCache blockCache, ObjectManager objectManager) { + this.streamController = streamController; + this.wal = wal; + this.blockCache = blockCache; + this.objectManager = objectManager; + } + + @Override + public CompletableFuture createAndOpenStream(CreateStreamOptions options) { + return streamController.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch())); + } + + @Override + public CompletableFuture openStream(long streamId, OpenStreamOptions openStreamOptions) { + return openStream0(streamId, openStreamOptions.epoch()); + } + + private CompletableFuture openStream0(long streamId, long epoch) { + return streamController.openStream(streamId, epoch).thenApply(metadata -> new S3Stream(metadata, wal, blockCache, streamController, objectManager)); + } +} diff --git a/core/src/main/scala/kafka/log/s3/Wal.java b/core/src/main/scala/kafka/log/s3/Wal.java new file mode 100644 index 0000000000..14088e0106 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/Wal.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import kafka.log.s3.model.StreamRecordBatch; + +import java.util.concurrent.CompletableFuture; + +/** + * Write ahead log for server. + */ +public interface Wal { + + /** + * Append stream record to wal. + * + * @param streamRecord {@link StreamRecordBatch} + */ + CompletableFuture append(StreamRecordBatch streamRecord); + +} diff --git a/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java new file mode 100644 index 0000000000..8a117dc7f0 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.cache; + +import com.automq.elasticstream.client.api.RecordBatch; + +import java.util.concurrent.CompletableFuture; + +/** + * Like linux page cache, S3BlockCache is responsible for: + * 1. read from S3 when the data block is not in cache. + * 2. caching the data blocks of S3 objects. + */ +public interface S3BlockCache { + + CompletableFuture read(long objectId, long streamId, long startOffset, long endOffset, long maxBytes); + +} diff --git a/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java b/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java new file mode 100644 index 0000000000..74dd963a0f --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.model; + +import java.util.OptionalLong; + +public class RangeMetadata { + private static final long NOOP_OFFSET = -1; + private int index; + private long startOffset; + private long endOffset; + private long serverId; + + public int getIndex() { + return index; + } + + public void setIndex(int index) { + this.index = index; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public OptionalLong getEndOffset() { + if (endOffset == NOOP_OFFSET) { + return OptionalLong.empty(); + } else { + return OptionalLong.of(endOffset); + } + } + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } + + public long getServerId() { + return serverId; + } + + public void setServerId(long serverId) { + this.serverId = serverId; + } +} diff --git a/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java b/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java new file mode 100644 index 0000000000..7e43c037d4 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.model; + +import java.util.List; + +public class StreamMetadata { + private long streamId; + private long epoch; + private long startOffset; + + private List ranges; + + public long getStreamId() { + return streamId; + } + + public void setStreamId(long streamId) { + this.streamId = streamId; + } + + public long getEpoch() { + return epoch; + } + + public void setEpoch(long epoch) { + this.epoch = epoch; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public List getRanges() { + return ranges; + } + + public void setRanges(List ranges) { + this.ranges = ranges; + } +} diff --git a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java new file mode 100644 index 0000000000..d08f1ca536 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.model; + +import com.automq.elasticstream.client.api.RecordBatch; + +public class StreamRecordBatch { + private long streamId; + + private RecordBatch recordBatch; + private long startOffset; +} diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java new file mode 100644 index 0000000000..be886e636e --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +import java.util.List; + +public class CommitCompactObjectRequest { + private long objectId; + private long objectSize; + private List streamObjects; + private List compactedObjectIds; + + public long getObjectId() { + return objectId; + } + + public void setObjectId(long objectId) { + this.objectId = objectId; + } + + public long getObjectSize() { + return objectSize; + } + + public void setObjectSize(long objectSize) { + this.objectSize = objectSize; + } + + public List getStreamObjects() { + return streamObjects; + } + + public void setStreamObjects(List streamObjects) { + this.streamObjects = streamObjects; + } + + public List getCompactedObjectIds() { + return compactedObjectIds; + } + + public void setCompactedObjectIds(List compactedObjectIds) { + this.compactedObjectIds = compactedObjectIds; + } +} diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitStreamObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitStreamObjectRequest.java new file mode 100644 index 0000000000..84570ad26e --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/CommitStreamObjectRequest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +public class CommitStreamObjectRequest { + private long objectId; + private long objectSize; + private long streamId; + private long startOffset; + private long endOffset; + private long sourceObjectId; + + public long getObjectId() { + return objectId; + } + + public void setObjectId(long objectId) { + this.objectId = objectId; + } + + public long getObjectSize() { + return objectSize; + } + + public void setObjectSize(long objectSize) { + this.objectSize = objectSize; + } + + public long getStreamId() { + return streamId; + } + + public void setStreamId(long streamId) { + this.streamId = streamId; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public long getEndOffset() { + return endOffset; + } + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } + + public long getSourceObjectId() { + return sourceObjectId; + } + + public void setSourceObjectId(long sourceObjectId) { + this.sourceObjectId = sourceObjectId; + } +} diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java new file mode 100644 index 0000000000..d4d12ee66c --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +import java.util.List; + +public class CommitWalObjectRequest { + private long objectId; + private long objectSize; + private List streams; + + public long getObjectId() { + return objectId; + } + + public void setObjectId(long objectId) { + this.objectId = objectId; + } + + public long getObjectSize() { + return objectSize; + } + + public void setObjectSize(long objectSize) { + this.objectSize = objectSize; + } + + public List getStreams() { + return streams; + } + + public void setStreams(List streams) { + this.streams = streams; + } +} diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java new file mode 100644 index 0000000000..f7a285df81 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectResponse.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +import java.util.List; + +/** + * Commit wal loose object response. + * When stream is fenced, the stream id will be added to failedStreamIds. + */ +public class CommitWalObjectResponse { + private List failedStreamIds; + + public List getFailedStreamIds() { + return failedStreamIds; + } + + public void setFailedStreamIds(List failedStreamIds) { + this.failedStreamIds = failedStreamIds; + } +} diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java new file mode 100644 index 0000000000..cbd9ebf3aa --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Object metadata registry. + */ +public interface ObjectManager { + + /** + * Prepare object id for write, if the objects is not committed in ttl, then delete it. + * + * @param count object id count. + * @param ttl ttl in milliseconds. + * @return object id range start. + */ + CompletableFuture prepareObject(int count, long ttl); + + /** + * Commit wal object. + * + * @param request {@link CommitWalObjectRequest} + * @return {@link CommitWalObjectResponse} + */ + CompletableFuture commitWalObject(CommitWalObjectRequest request); + + /** + * Commit minor compact object. Use minor compact object and stream objects to substitute wal object. + * + * @param request {@link CommitCompactObjectRequest} + */ + CompletableFuture commitMinorCompactObject(CommitCompactObjectRequest request); + + /** + * Commit major compact object. Use major compact object and stream objects to substitute minor compact object. + * + * @param request {@link CommitCompactObjectRequest} + */ + CompletableFuture commitMajorCompactObject(CommitCompactObjectRequest request); + + /** + * Commit stream object. When the source object has no reference, then delete it. + * + * @param request {@link CommitStreamObjectRequest} + */ + CompletableFuture commitStreamObject(CommitStreamObjectRequest request); + + /** + * Get objects by stream range. + */ + List getObjects(long streamId, long startOffset, long endOffset, int maxBytes); + +} + diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectRange.java b/core/src/main/scala/kafka/log/s3/objects/ObjectRange.java new file mode 100644 index 0000000000..cd60e0a74b --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectRange.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +public class ObjectRange { + private long objectId; + private long startPosition; + private long endPosition; + + public ObjectRange(long objectId, long startPosition, long endPosition) { + this.objectId = objectId; + this.startPosition = startPosition; + this.endPosition = endPosition; + } + + public long getObjectId() { + return objectId; + } + + public void setObjectId(long objectId) { + this.objectId = objectId; + } + + public long getStartPosition() { + return startPosition; + } + + public void setStartPosition(long startPosition) { + this.startPosition = startPosition; + } + + public long getEndPosition() { + return endPosition; + } + + public void setEndPosition(long endPosition) { + this.endPosition = endPosition; + } +} diff --git a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java new file mode 100644 index 0000000000..6de0400d90 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +public class StreamObject { + private long objectId; + private long objectSize; + private long streamId; + private long startOffset; + private long endOffset; + + public long getObjectId() { + return objectId; + } + + public void setObjectId(long objectId) { + this.objectId = objectId; + } + + public long getObjectSize() { + return objectSize; + } + + public void setObjectSize(long objectSize) { + this.objectSize = objectSize; + } + + public long getStreamId() { + return streamId; + } + + public void setStreamId(long streamId) { + this.streamId = streamId; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public long getEndOffset() { + return endOffset; + } + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } +} diff --git a/core/src/main/scala/kafka/log/s3/objects/WalObjectStreamIndex.java b/core/src/main/scala/kafka/log/s3/objects/WalObjectStreamIndex.java new file mode 100644 index 0000000000..5c149eda03 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/objects/WalObjectStreamIndex.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.objects; + +public class WalObjectStreamIndex { + private long streamId; + private long startOffset; + private long endOffset; + + public long getStreamId() { + return streamId; + } + + public void setStreamId(long streamId) { + this.streamId = streamId; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public long getEndOffset() { + return endOffset; + } + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } +} diff --git a/core/src/main/scala/kafka/log/s3/operator/S3Operator.java b/core/src/main/scala/kafka/log/s3/operator/S3Operator.java new file mode 100644 index 0000000000..9a9a6a1b1e --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/operator/S3Operator.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.operator; + +import io.netty.buffer.ByteBuf; + +import java.util.concurrent.CompletableFuture; + +public interface S3Operator { + + /** + * Read data from object. + * + * @param path object path. + * @return data. + */ + CompletableFuture read(String path); + + /** + * Range read from object. + * + * @param path object path. + * @param start range start. + * @param end range end. + * @return data. + */ + CompletableFuture rangeRead(String path, long start, long end); + + /** + * Write data to object. + * + * @param path object path. + * @param data data. + */ + CompletableFuture write(String path, ByteBuf data); + + /** + * New multi-part object writer. + * + * @param path object path + * @return {@link Writer} + */ + Writer writer(String path); + + CompletableFuture delete(String path); +} diff --git a/core/src/main/scala/kafka/log/s3/operator/Writer.java b/core/src/main/scala/kafka/log/s3/operator/Writer.java new file mode 100644 index 0000000000..bcefec5f0f --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/operator/Writer.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.operator; + +import io.netty.buffer.ByteBuf; + +import java.util.concurrent.CompletableFuture; + +/** + * Multi-part object writer. + */ +public interface Writer { + /** + * Write a part of the object. The parts will parallel upload to S3. + * + * @param part object part. + */ + void write(ByteBuf part); + + /** + * Copy a part of the object. + * + * @param sourcePath source object path. + * @param start start position of the source object. + * @param end end position of the source object. + */ + void copyWrite(String sourcePath, long start, long end); + + /** + * Complete the object. + */ + CompletableFuture close(); + +} diff --git a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java new file mode 100644 index 0000000000..483efc7a01 --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3.streams; + +import kafka.log.s3.model.StreamMetadata; + +import java.util.concurrent.CompletableFuture; + +public interface StreamManager { + + /** + * Create a new stream. + * + * @return stream id. + */ + CompletableFuture createStream(); + + /** + * Open stream with newer epoch. The controller will: + * 1. update stream epoch to fence old stream writer to commit object. + * 2. calculate the last range endOffset. + * 2. create a new range with serverId = current serverId, startOffset = last range endOffset. + * + * @param streamId stream id. + * @param epoch stream epoch. + * @return {@link StreamMetadata} + */ + CompletableFuture openStream(long streamId, long epoch); + + /** + * Trim stream to new start offset. + * + * @param streamId stream id. + * @param epoch stream epoch. + * @param newStartOffset new start offset. + */ + CompletableFuture trimStream(long streamId, long epoch, long newStartOffset); +} + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 679f11ad9f..318a50dab9 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -127,7 +127,8 @@ versions += [ zinc: "1.7.2", zookeeper: "3.6.3", zstd: "1.5.2-1", - elasticstream: "1.0-SNAPSHOT" + elasticstream: "1.0-SNAPSHOT", + s3Client: "2.20.127", ] libs += [ activation: "javax.activation:activation:$versions.activation", @@ -219,5 +220,6 @@ libs += [ mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", zstd: "com.github.luben:zstd-jni:$versions.zstd", httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient", - esClient: "com.automq.elasticstream:client:$versions.elasticstream" + esClient: "com.automq.elasticstream:client:$versions.elasticstream", + s3Client: "software.amazon.awssdk:s3:$versions.s3Client", ]