Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,7 @@ project(':core') {
}
implementation 'redis.clients:jedis:4.3.1'
implementation libs.slf4jlog4j
implementation libs.s3Client

compileOnly libs.log4j

Expand Down
34 changes: 34 additions & 0 deletions core/src/main/scala/kafka/log/s3/S3Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.KVClient;
import com.automq.elasticstream.client.api.StreamClient;

public class S3Client implements Client {
@Override
public StreamClient streamClient() {
return null;
}

@Override
public KVClient kvClient() {
return null;
}
}
85 changes: 85 additions & 0 deletions core/src/main/scala/kafka/log/s3/S3Stream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import com.automq.elasticstream.client.api.AppendResult;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.Stream;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.streams.StreamManager;

import java.util.concurrent.CompletableFuture;

public class S3Stream implements Stream {
private final StreamMetadata metadata;
private final Wal wal;
private final S3BlockCache blockCache;
private final StreamManager streamManager;
private final ObjectManager objectManager;

public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager, ObjectManager objectManager) {
this.metadata = metadata;
this.wal = wal;
this.blockCache = blockCache;
this.streamManager = streamManager;
this.objectManager = objectManager;
}

@Override
public long streamId() {
return metadata.getStreamId();
}

@Override
public long startOffset() {
return metadata.getStartOffset();
}

@Override
public long nextOffset() {
return 0;
}

@Override
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
return null;
}

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
return null;
}

@Override
public CompletableFuture<Void> trim(long newStartOffset) {
return streamManager.trimStream(metadata.getStreamId(), metadata.getEpoch(), newStartOffset);
}

@Override
public CompletableFuture<Void> close() {
return null;
}

@Override
public CompletableFuture<Void> destroy() {
return null;
}
}
56 changes: 56 additions & 0 deletions core/src/main/scala/kafka/log/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import com.automq.elasticstream.client.api.CreateStreamOptions;
import com.automq.elasticstream.client.api.OpenStreamOptions;
import com.automq.elasticstream.client.api.Stream;
import com.automq.elasticstream.client.api.StreamClient;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.streams.StreamManager;

import java.util.concurrent.CompletableFuture;

public class S3StreamClient implements StreamClient {
private final StreamManager streamController;
private final Wal wal;
private final S3BlockCache blockCache;
private final ObjectManager objectManager;

public S3StreamClient(StreamManager streamController, Wal wal, S3BlockCache blockCache, ObjectManager objectManager) {
this.streamController = streamController;
this.wal = wal;
this.blockCache = blockCache;
this.objectManager = objectManager;
}

@Override
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
return streamController.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch()));
}

@Override
public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
return openStream0(streamId, openStreamOptions.epoch());
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
return streamController.openStream(streamId, epoch).thenApply(metadata -> new S3Stream(metadata, wal, blockCache, streamController, objectManager));
}
}
36 changes: 36 additions & 0 deletions core/src/main/scala/kafka/log/s3/Wal.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import kafka.log.s3.model.StreamRecordBatch;

import java.util.concurrent.CompletableFuture;

/**
* Write ahead log for server.
*/
public interface Wal {

/**
* Append stream record to wal.
*
* @param streamRecord {@link StreamRecordBatch}
*/
CompletableFuture<Void> append(StreamRecordBatch streamRecord);

}
33 changes: 33 additions & 0 deletions core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3.cache;

import com.automq.elasticstream.client.api.RecordBatch;

import java.util.concurrent.CompletableFuture;

/**
* Like linux page cache, S3BlockCache is responsible for:
* 1. read from S3 when the data block is not in cache.
* 2. caching the data blocks of S3 objects.
*/
public interface S3BlockCache {

CompletableFuture<RecordBatch> read(long objectId, long streamId, long startOffset, long endOffset, long maxBytes);

}
64 changes: 64 additions & 0 deletions core/src/main/scala/kafka/log/s3/model/RangeMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3.model;

import java.util.OptionalLong;

public class RangeMetadata {
private static final long NOOP_OFFSET = -1;
private int index;
private long startOffset;
private long endOffset;
private long serverId;

public int getIndex() {
return index;
}

public void setIndex(int index) {
this.index = index;
}

public long getStartOffset() {
return startOffset;
}

public void setStartOffset(long startOffset) {
this.startOffset = startOffset;
}

public OptionalLong getEndOffset() {
if (endOffset == NOOP_OFFSET) {
return OptionalLong.empty();
} else {
return OptionalLong.of(endOffset);
}
}

public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

public long getServerId() {
return serverId;
}

public void setServerId(long serverId) {
this.serverId = serverId;
}
}
60 changes: 60 additions & 0 deletions core/src/main/scala/kafka/log/s3/model/StreamMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3.model;

import java.util.List;

public class StreamMetadata {
private long streamId;
private long epoch;
private long startOffset;

private List<RangeMetadata> ranges;

public long getStreamId() {
return streamId;
}

public void setStreamId(long streamId) {
this.streamId = streamId;
}

public long getEpoch() {
return epoch;
}

public void setEpoch(long epoch) {
this.epoch = epoch;
}

public long getStartOffset() {
return startOffset;
}

public void setStartOffset(long startOffset) {
this.startOffset = startOffset;
}

public List<RangeMetadata> getRanges() {
return ranges;
}

public void setRanges(List<RangeMetadata> ranges) {
this.ranges = ranges;
}
}
Loading