Skip to content

Commit 9e43d0f

Browse files
committed
fix(zerozone2): fix failover bug caused by #2764
Signed-off-by: Robin Han <[email protected]>
1 parent 37010fa commit 9e43d0f

File tree

2 files changed

+5
-6
lines changed

2 files changed

+5
-6
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ public DefaultWriter(Time time, ObjectStorage objectStorage, ObjectWALConfig con
120120
this.nodePrefix = ObjectUtils.nodePrefix(config.clusterId(), config.nodeId(), config.type());
121121
this.objectPrefix = nodePrefix + config.epoch() + "/wal/";
122122
this.scheduler = Threads.newSingleThreadScheduledExecutor("s3-wal-schedule", true, LOGGER);
123+
if (!(config.openMode() == OpenMode.READ_WRITE || config.openMode() == OpenMode.FAILOVER)) {
124+
throw new IllegalArgumentException("The open mode must be READ_WRITE or FAILOVER, but got " + config.openMode());
125+
}
123126
ObjectWALMetricsManager.setInflightUploadCountSupplier(() -> (long) uploadingBulks.size());
124127
ObjectWALMetricsManager.setBufferedDataInBytesSupplier(bufferedDataBytes::get);
125128
ObjectWALMetricsManager.setObjectDataInBytesSupplier(objectDataBytes::get);
@@ -213,10 +216,6 @@ protected void checkStatus() throws WALFencedException {
213216
}
214217

215218
protected void checkWriteStatus() throws WALFencedException {
216-
if (config.openMode() != OpenMode.READ_WRITE) {
217-
throw new IllegalStateException("WAL is in failover mode.");
218-
}
219-
220219
checkStatus();
221220
}
222221

@@ -420,7 +419,7 @@ private void callback() {
420419
// The inflight uploading bulks count was decreased, then trigger the upload of Bulk in waitingUploadBulks
421420
tryUploadBulkInWaiting();
422421
long commitStartTime = time.nanoseconds();
423-
return reservationService.verify(config.nodeId(), config.epoch(), false)
422+
return reservationService.verify(config.nodeId(), config.epoch(), config.openMode() == OpenMode.FAILOVER)
424423
.whenComplete((rst, ex) -> {
425424
ObjectWALMetricsManager.recordOperationLatency(time.nanoseconds() - commitStartTime, "commit", ex == null);
426425
if (ex != null) {

s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class ObjectWALService implements WriteAheadLog {
5151
public ObjectWALService(Time time, ObjectStorage objectStorage, ObjectWALConfig config) {
5252
this.objectStorage = objectStorage;
5353
this.config = config;
54-
if (config.openMode() == OpenMode.READ_WRITE) {
54+
if (config.openMode() == OpenMode.READ_WRITE || config.openMode() == OpenMode.FAILOVER) {
5555
this.writer = new DefaultWriter(time, objectStorage, config);
5656
} else {
5757
this.writer = new NoopWriter();

0 commit comments

Comments
 (0)