Skip to content

Commit 3f88527

Browse files
authored
Merge pull request #2 from AutoMQ/feat_s3_stream_client_arch
feat(client): s3 stream client v0
2 parents 6460420 + 83761b4 commit 3f88527

21 files changed

+1024
-2
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,7 @@ project(':core') {
986986
}
987987
implementation 'redis.clients:jedis:4.3.1'
988988
implementation libs.slf4jlog4j
989+
implementation libs.s3Client
989990

990991
compileOnly libs.log4j
991992

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.log.s3;
19+
20+
import com.automq.elasticstream.client.api.Client;
21+
import com.automq.elasticstream.client.api.KVClient;
22+
import com.automq.elasticstream.client.api.StreamClient;
23+
24+
public class S3Client implements Client {
25+
@Override
26+
public StreamClient streamClient() {
27+
return null;
28+
}
29+
30+
@Override
31+
public KVClient kvClient() {
32+
return null;
33+
}
34+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.log.s3;
19+
20+
import com.automq.elasticstream.client.api.AppendResult;
21+
import com.automq.elasticstream.client.api.FetchResult;
22+
import com.automq.elasticstream.client.api.RecordBatch;
23+
import com.automq.elasticstream.client.api.Stream;
24+
import kafka.log.s3.cache.S3BlockCache;
25+
import kafka.log.s3.model.StreamMetadata;
26+
import kafka.log.s3.objects.ObjectManager;
27+
import kafka.log.s3.streams.StreamManager;
28+
29+
import java.util.concurrent.CompletableFuture;
30+
31+
public class S3Stream implements Stream {
32+
private final StreamMetadata metadata;
33+
private final Wal wal;
34+
private final S3BlockCache blockCache;
35+
private final StreamManager streamManager;
36+
private final ObjectManager objectManager;
37+
38+
public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager, ObjectManager objectManager) {
39+
this.metadata = metadata;
40+
this.wal = wal;
41+
this.blockCache = blockCache;
42+
this.streamManager = streamManager;
43+
this.objectManager = objectManager;
44+
}
45+
46+
@Override
47+
public long streamId() {
48+
return metadata.getStreamId();
49+
}
50+
51+
@Override
52+
public long startOffset() {
53+
return metadata.getStartOffset();
54+
}
55+
56+
@Override
57+
public long nextOffset() {
58+
return 0;
59+
}
60+
61+
@Override
62+
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
63+
return null;
64+
}
65+
66+
@Override
67+
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
68+
return null;
69+
}
70+
71+
@Override
72+
public CompletableFuture<Void> trim(long newStartOffset) {
73+
return streamManager.trimStream(metadata.getStreamId(), metadata.getEpoch(), newStartOffset);
74+
}
75+
76+
@Override
77+
public CompletableFuture<Void> close() {
78+
return null;
79+
}
80+
81+
@Override
82+
public CompletableFuture<Void> destroy() {
83+
return null;
84+
}
85+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.log.s3;
19+
20+
import com.automq.elasticstream.client.api.CreateStreamOptions;
21+
import com.automq.elasticstream.client.api.OpenStreamOptions;
22+
import com.automq.elasticstream.client.api.Stream;
23+
import com.automq.elasticstream.client.api.StreamClient;
24+
import kafka.log.s3.cache.S3BlockCache;
25+
import kafka.log.s3.objects.ObjectManager;
26+
import kafka.log.s3.streams.StreamManager;
27+
28+
import java.util.concurrent.CompletableFuture;
29+
30+
public class S3StreamClient implements StreamClient {
31+
private final StreamManager streamController;
32+
private final Wal wal;
33+
private final S3BlockCache blockCache;
34+
private final ObjectManager objectManager;
35+
36+
public S3StreamClient(StreamManager streamController, Wal wal, S3BlockCache blockCache, ObjectManager objectManager) {
37+
this.streamController = streamController;
38+
this.wal = wal;
39+
this.blockCache = blockCache;
40+
this.objectManager = objectManager;
41+
}
42+
43+
@Override
44+
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
45+
return streamController.createStream().thenCompose(streamId -> openStream0(streamId, options.epoch()));
46+
}
47+
48+
@Override
49+
public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
50+
return openStream0(streamId, openStreamOptions.epoch());
51+
}
52+
53+
private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
54+
return streamController.openStream(streamId, epoch).thenApply(metadata -> new S3Stream(metadata, wal, blockCache, streamController, objectManager));
55+
}
56+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.log.s3;
19+
20+
import kafka.log.s3.model.StreamRecordBatch;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
24+
/**
25+
* Write ahead log for server.
26+
*/
27+
public interface Wal {
28+
29+
/**
30+
* Append stream record to wal.
31+
*
32+
* @param streamRecord {@link StreamRecordBatch}
33+
*/
34+
CompletableFuture<Void> append(StreamRecordBatch streamRecord);
35+
36+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.log.s3.cache;
19+
20+
import com.automq.elasticstream.client.api.RecordBatch;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
24+
/**
25+
* Like linux page cache, S3BlockCache is responsible for:
26+
* 1. read from S3 when the data block is not in cache.
27+
* 2. caching the data blocks of S3 objects.
28+
*/
29+
public interface S3BlockCache {
30+
31+
CompletableFuture<RecordBatch> read(long objectId, long streamId, long startOffset, long endOffset, long maxBytes);
32+
33+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.log.s3.model;
19+
20+
import java.util.OptionalLong;
21+
22+
public class RangeMetadata {
23+
private static final long NOOP_OFFSET = -1;
24+
private int index;
25+
private long startOffset;
26+
private long endOffset;
27+
private long serverId;
28+
29+
public int getIndex() {
30+
return index;
31+
}
32+
33+
public void setIndex(int index) {
34+
this.index = index;
35+
}
36+
37+
public long getStartOffset() {
38+
return startOffset;
39+
}
40+
41+
public void setStartOffset(long startOffset) {
42+
this.startOffset = startOffset;
43+
}
44+
45+
public OptionalLong getEndOffset() {
46+
if (endOffset == NOOP_OFFSET) {
47+
return OptionalLong.empty();
48+
} else {
49+
return OptionalLong.of(endOffset);
50+
}
51+
}
52+
53+
public void setEndOffset(long endOffset) {
54+
this.endOffset = endOffset;
55+
}
56+
57+
public long getServerId() {
58+
return serverId;
59+
}
60+
61+
public void setServerId(long serverId) {
62+
this.serverId = serverId;
63+
}
64+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.log.s3.model;
19+
20+
import java.util.List;
21+
22+
public class StreamMetadata {
23+
private long streamId;
24+
private long epoch;
25+
private long startOffset;
26+
27+
private List<RangeMetadata> ranges;
28+
29+
public long getStreamId() {
30+
return streamId;
31+
}
32+
33+
public void setStreamId(long streamId) {
34+
this.streamId = streamId;
35+
}
36+
37+
public long getEpoch() {
38+
return epoch;
39+
}
40+
41+
public void setEpoch(long epoch) {
42+
this.epoch = epoch;
43+
}
44+
45+
public long getStartOffset() {
46+
return startOffset;
47+
}
48+
49+
public void setStartOffset(long startOffset) {
50+
this.startOffset = startOffset;
51+
}
52+
53+
public List<RangeMetadata> getRanges() {
54+
return ranges;
55+
}
56+
57+
public void setRanges(List<RangeMetadata> ranges) {
58+
this.ranges = ranges;
59+
}
60+
}

0 commit comments

Comments
 (0)