Skip to content

Commit ffcafda

Browse files
sangrealrkolesnev
andauthored
fix for message loss for issue 826 (#827)
* use offsetHighestSucceeded instead of offsetHighestSeen to fix issue #826 * update readme * add ut * add comments * remove ut --------- Co-authored-by: Roman Kolesnev <[email protected]>
1 parent 734d54b commit ffcafda

File tree

2 files changed

+4
-2
lines changed

2 files changed

+4
-2
lines changed

CHANGELOG.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ endif::[]
1818

1919
=== Fixes
2020

21+
* fix: message loss on closing or partitions revoked (#826)
2122
* fix: ConcurrentModificationException Happened while high load and draining (#822) fixes (#821)
2223
* fix: safely completing doClose() (#818) partially fixes (#809)
2324
* Improved offset commit retry. Add support for SaslAuthenticationException retry timeout (#819), partially fixes (#809) in Commit_Sync mode

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public boolean isRecordPreviouslyCompleted(final ConsumerRecord<K, V> rec) {
240240
return false;
241241
} else {
242242
// if within the range of tracked offsets, must have been previously completed, as it's not in the incomplete set
243-
return recOffset <= offsetHighestSeen;
243+
return recOffset <= offsetHighestSucceeded;
244244
}
245245
}
246246

@@ -459,7 +459,8 @@ public long getOffsetHighestSequentialSucceeded() {
459459
*
460460
* See #200 for the complete correct solution.
461461
*/
462-
long currentOffsetHighestSeen = offsetHighestSeen;
462+
// use offsetHighestSucceeded instead of offsetHighestSeen to fix issue #826
463+
long currentOffsetHighestSeen = offsetHighestSucceeded;
463464
Long firstIncompleteOffset = incompleteOffsets.keySet().ceiling(KAFKA_OFFSET_ABSENCE);
464465
boolean incompleteOffsetsWasEmpty = firstIncompleteOffset == null;
465466

0 commit comments

Comments
 (0)