Skip to content

Commit ddc8e3a

Browse files
authored
feat(failover): add wal failover support (#2516) (#2517)
Signed-off-by: Robin Han <[email protected]>
1 parent c9a97eb commit ddc8e3a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2199
-4564
lines changed

config/kraft/broker.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ s3.data.buckets=0@s3://ko3?region=us-east-1
143143
# The ops buckets
144144
s3.ops.buckets=0@s3://ko3?region=us-east-1
145145

146-
# The file path of delta WAL in block device
147-
s3.wal.path=0@file:///tmp/kraft-broker-logs/s3wal?capacity=2147483648
146+
# The WAL storage config
147+
s3.wal.path=0@s3://ko3?region=us-east-1
148148

149149
# The maximum size the WAL cache can use, default 2GB
150150
# s3.wal.cache.size=2147483648

config/kraft/server.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ s3.data.buckets=0@s3://ko3?region=us-east-1
146146
# The ops buckets
147147
s3.ops.buckets=0@s3://ko3?region=us-east-1
148148

149-
# The file path of delta WAL in block device
150-
s3.wal.path=0@file:///tmp/kraft-broker-logs/s3wal?capacity=2147483648
149+
# The WAL storage config
150+
s3.wal.path=0@s3://ko3?region=us-east-1
151151

152152
# The maximum size the WAL cache can use, default 2GB
153153
# s3.wal.cache.size=2147483648

core/src/main/java/kafka/automq/AutoMQConfig.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ public class AutoMQConfig {
5656
public static final String ELASTIC_STREAM_ENABLE_DOC = "Whether to enable AutoMQ, it has to be set to true";
5757

5858
public static final String ELASTIC_STREAM_ENDPOINT_CONFIG = "elasticstream.endpoint";
59-
public static final String ELASTIC_STREAM_ENDPOINT_DOC = "Specifies the Elastic Stream endpoint, ex. <code>es://hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" +
60-
"You could also PoC launch it in memory mode with endpoint <code>memory:://</code> or redis mode with <code>redis://.</code>";
59+
public static final String ELASTIC_STREAM_ENDPOINT_DOC = "Specifies the Elastic Stream endpoint";
6160

6261
public static final String S3_DATA_BUCKETS_CONFIG = "s3.data.buckets";
6362
public static final String S3_DATA_BUCKETS_DOC = "The data buckets url with format 0@s3://$bucket?region=$region. \n" +
@@ -69,8 +68,7 @@ public class AutoMQConfig {
6968
public static final String S3_OPS_BUCKETS_DOC = "With the same format as s3.data.buckets";
7069

7170
public static final String S3_WAL_PATH_CONFIG = "s3.wal.path";
72-
public static final String S3_WAL_PATH_DOC = "The local WAL path for AutoMQ can be set to a block device path such as 0@file:///dev/xxx?iops=3000&iodepth=8&iobandwidth=157286400 or a filesystem file path." +
73-
"It is recommended to use a block device for better write performance.";
71+
public static final String S3_WAL_PATH_DOC = "The WAL path for AutoMQ, The format is '0@s3://$bucket?region=$region[&batchInterval=250][&maxBytesInBatch=8388608]'";
7472

7573
public static final String S3_WAL_CACHE_SIZE_CONFIG = "s3.wal.cache.size";
7674
public static final String S3_WAL_CACHE_SIZE_DOC = "The WAL (Write-Ahead Log) cache is a FIFO (First In, First Out) queue that contains data that has not yet been uploaded to object storage, as well as data that has been uploaded but not yet evicted from the cache." +
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.controller;
21+
22+
import kafka.automq.failover.FailoverControlManager;
23+
24+
import org.apache.kafka.common.metadata.KVRecord;
25+
import org.apache.kafka.common.metadata.MetadataRecordType;
26+
import org.apache.kafka.common.protocol.ApiMessage;
27+
import org.apache.kafka.controller.QuorumController;
28+
import org.apache.kafka.controller.QuorumControllerExtension;
29+
import org.apache.kafka.raft.OffsetAndEpoch;
30+
31+
import java.util.Objects;
32+
import java.util.Optional;
33+
34+
public class DefaultQuorumControllerExtension implements QuorumControllerExtension {
35+
private final FailoverControlManager failoverControlManager;
36+
37+
public DefaultQuorumControllerExtension(QuorumController controller) {
38+
this.failoverControlManager = new FailoverControlManager(
39+
controller.snapshotRegistry(),
40+
controller,
41+
controller.clusterControl(),
42+
controller.nodeControlManager(),
43+
controller.streamControlManager()
44+
);
45+
}
46+
47+
@Override
48+
public boolean replay(MetadataRecordType type, ApiMessage message, Optional<OffsetAndEpoch> snapshotId,
49+
long batchLastOffset) {
50+
if (Objects.requireNonNull(type) == MetadataRecordType.KVRECORD) {
51+
failoverControlManager.replay((KVRecord) message);
52+
} else {
53+
return false;
54+
}
55+
return true;
56+
}
57+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.failover;
21+
22+
import java.util.Objects;
23+
24+
public final class DefaultFailedWal implements FailedWal {
25+
private final NodeRuntimeMetadata nodeMetadata;
26+
27+
public DefaultFailedWal(NodeRuntimeMetadata nodeMetadata) {
28+
this.nodeMetadata = nodeMetadata;
29+
}
30+
31+
public NodeRuntimeMetadata nodeMetadata() {
32+
return nodeMetadata;
33+
}
34+
35+
@Override
36+
public boolean equals(Object obj) {
37+
if (obj == this)
38+
return true;
39+
if (obj == null || obj.getClass() != this.getClass())
40+
return false;
41+
var that = (DefaultFailedWal) obj;
42+
return Objects.equals(this.nodeMetadata, that.nodeMetadata);
43+
}
44+
45+
@Override
46+
public int hashCode() {
47+
return Objects.hash(nodeMetadata);
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return "FailedWalV1[" +
53+
"nodeMetadata=" + nodeMetadata + ']';
54+
}
55+
56+
@Override
57+
public FailoverContext toFailoverContext(int target) {
58+
return new FailoverContext(nodeMetadata.id(), nodeMetadata.epoch(), target, nodeMetadata.walConfigs());
59+
}
60+
61+
@Override
62+
public FailedNode node() {
63+
return FailedNode.from(nodeMetadata);
64+
}
65+
66+
public static FailedWal from(NodeRuntimeMetadata failedNode) {
67+
return new DefaultFailedWal(failedNode);
68+
}
69+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.failover;
21+
22+
public interface FailedNode {
23+
24+
/**
25+
* The node id of the failed node.
26+
*/
27+
int id();
28+
29+
static FailedNode from(NodeRuntimeMetadata node) {
30+
return new K8sFailedNode(node.id());
31+
}
32+
33+
static FailedNode from(FailoverContext context) {
34+
return new K8sFailedNode(context.getNodeId());
35+
}
36+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.failover;
21+
22+
public interface FailedWal {
23+
24+
/**
25+
* Convert to a failover context.
26+
*/
27+
FailoverContext toFailoverContext(int target);
28+
29+
/**
30+
* The node of the failed WAL belongs to.
31+
*/
32+
FailedNode node();
33+
34+
default int nodeId() {
35+
return node().id();
36+
}
37+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.failover;
21+
22+
/**
 * Constants shared by the failover feature.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class FailoverConstants {
    /**
     * The failover key string.
     * NOTE(review): presumably the KV key under which failover state is stored; confirm against usages.
     */
    public static final String FAILOVER_KEY = "__a.failover";

    private FailoverConstants() {
        // Constants holder: no instances.
    }
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package kafka.automq.failover;
21+
22+
import com.fasterxml.jackson.annotation.JsonIgnore;
23+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
24+
import com.fasterxml.jackson.annotation.JsonInclude;
25+
import com.fasterxml.jackson.annotation.JsonProperty;
26+
27+
@JsonIgnoreProperties(ignoreUnknown = true)
28+
public class FailoverContext {
29+
/**
30+
* Failed node id
31+
*/
32+
@JsonProperty("n")
33+
private int nodeId;
34+
35+
/**
36+
* Failover target node id
37+
*
38+
* @since failover v0
39+
*/
40+
@JsonProperty("t")
41+
private int target;
42+
43+
/**
44+
* Failed node epoch
45+
*
46+
* @since failover v1
47+
*/
48+
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
49+
@JsonProperty(value = "e", defaultValue = "0")
50+
private long nodeEpoch;
51+
52+
/**
53+
* WAL configs for failover
54+
*/
55+
@JsonInclude(JsonInclude.Include.NON_NULL)
56+
@JsonProperty("c")
57+
private String kraftWalConfigs;
58+
59+
// for json deserialize
60+
public FailoverContext() {}
61+
62+
public FailoverContext(int nodeId, long nodeEpoch, int target, String kraftWalConfigs) {
63+
this.nodeId = nodeId;
64+
this.nodeEpoch = nodeEpoch;
65+
this.target = target;
66+
this.kraftWalConfigs = kraftWalConfigs;
67+
}
68+
69+
@JsonIgnore
70+
public FailedNode getFailedNode() {
71+
return FailedNode.from(this);
72+
}
73+
74+
public int getNodeId() {
75+
return nodeId;
76+
}
77+
78+
public int getTarget() {
79+
return target;
80+
}
81+
82+
public long getNodeEpoch() {
83+
return nodeEpoch;
84+
}
85+
86+
public String getKraftWalConfigs() {
87+
return kraftWalConfigs;
88+
}
89+
90+
@Override
91+
public String toString() {
92+
return "FailoverContext{" +
93+
"nodeId=" + nodeId +
94+
", target=" + target +
95+
", nodeEpoch=" + nodeEpoch +
96+
", kraftWalConfigs=" + kraftWalConfigs +
97+
'}';
98+
}
99+
}

0 commit comments

Comments
 (0)