From 58331b8fcbf0510921854a2e524d5a9bac67a3c8 Mon Sep 17 00:00:00 2001 From: keyboardbobo Date: Wed, 1 Mar 2023 11:27:33 +0800 Subject: [PATCH] Update KafkaRequestHandler.java The commit offset and partition data expire at the same time, and the ListOffset interface returns an incorrect result --- .../pulsar/handlers/kop/KafkaRequestHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 420e76de91..99505f3f22 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -1129,7 +1129,10 @@ private CompletableFuture> fetchOffset(String topicName, Long return; } if (position.compareTo(lac) > 0) { - partitionData.complete(Pair.of(Errors.NONE, latestOffset)); + // If all the data expires, the entry of lac becomes -1, + // and the actual offset will be 1 smaller than LATEST, + // here is special treatment, add 1 back + partitionData.complete(Pair.of(Errors.NONE, latestOffset + 1)); } else { MessageMetadataUtils.getOffsetOfPosition(managedLedger, position, false, timestamp, skipMessagesWithoutIndex)