From 23086ce8ce5aa6c5d243bb83cc6064cbdc9c9b9d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 19 Dec 2022 17:49:13 +0100 Subject: [PATCH 1/9] Recover from temporary errors on PulsarTopicProducerStateManagerSnapshotBuffer.java (cherry picked from commit 3a61fdb8338d31d69846ad84584c28181e469469) --- ...picProducerStateManagerSnapshotBuffer.java | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index 98cf532803..03353dd328 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -53,26 +53,53 @@ public class PulsarTopicProducerStateManagerSnapshotBuffer implements ProducerSt private synchronized CompletableFuture> ensureReaderHandle() { if (reader == null) { - reader = pulsarClient.newReaderBuilder() + CompletableFuture> newReader = pulsarClient.newReaderBuilder() .topic(topic) .startMessageId(MessageId.earliest) .readCompacted(true) .createAsync(); + reader = newReader; + + newReader.whenComplete((r, error) -> { + if (error != null) { + discardReader(newReader); + } + }); } return reader; } + private synchronized void discardReader(CompletableFuture> oldReader) { + if (reader == oldReader) { + reader = null; + } + } + private synchronized CompletableFuture> ensureProducerHandle() { if (producer == null) { - producer = pulsarClient.newProducerBuilder() + CompletableFuture> newProducer = pulsarClient.newProducerBuilder() .enableBatching(false) .topic(topic) .blockIfQueueFull(true) .createAsync(); + + producer = newProducer; + + newProducer.whenComplete((r, error) -> { + if (error != null) { + discardProducer(newProducer); + } + }); } return producer; } + private synchronized void discardProducer(CompletableFuture> oldProducer) { + if (producer == oldProducer) { + producer = null; + } + } + private CompletableFuture readNextMessageIfAvailable(Reader reader) { return reader .hasMessageAvailableAsync() From 0b475cb8a8a5f8fac375cf4d14572ad9d220cb01 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 30 Dec 2022 14:31:20 +0100 Subject: [PATCH 2/9] [transactions] PulsarTopicProducerStateManagerSnapshotBuffer - discard failred Readers (cherry picked from commit 3333ceff3bf7a108418349792614eba11d0cf16c) --- ...opicProducerStateManagerSnapshotBuffer.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index 03353dd328..06d8e8fb58 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -75,6 +75,14 @@ private synchronized void discardReader(CompletableFuture> ol } } + private synchronized void discardReader(Reader oldReader) { + if (reader.isDone() + && !reader.isCompletedExceptionally() + && reader.getNow(null) == oldReader) { + reader = null; + } + } + private synchronized CompletableFuture> ensureProducerHandle() { if (producer == null) { CompletableFuture> newProducer = pulsarClient.newProducerBuilder() @@ -101,7 +109,7 @@ private synchronized void discardProducer(CompletableFuture } private CompletableFuture readNextMessageIfAvailable(Reader reader) { - return reader + CompletableFuture result = reader .hasMessageAvailableAsync() .thenCompose(hasMessageAvailable -> { if (hasMessageAvailable == null @@ -115,6 +123,14 @@ private CompletableFuture readNextMessageIfAvailable(Reader re }); } }); + + result.whenComplete((r, error) -> { + if (error != null) { + discardReader(reader); + } + }); + + return result; } From 59202c93919360ebcf479843248d8f046b985894 Mon Sep 17 00:00:00 2001 From: rangao Date: Wed, 12 Jul 2023 11:17:38 +0800 Subject: [PATCH 3/9] resolve the spotbugs problem --- ...picProducerStateManagerSnapshotBuffer.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index 06d8e8fb58..aeca13fc9a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -31,6 +31,10 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.pulsar.client.api.Message; @@ -76,10 +80,17 @@ private synchronized void discardReader(CompletableFuture> ol } private synchronized void discardReader(Reader oldReader) { - if (reader.isDone() - && !reader.isCompletedExceptionally() - && reader.getNow(null) == oldReader) { - reader = null; + if (reader.isDone() && !reader.isCompletedExceptionally()) { + Reader newReader = null; + try { + newReader = reader.get(0, TimeUnit.MILLISECONDS); + if (newReader == oldReader) { + reader = null; + } + } catch (Exception exception) { + log.warn("Failed to get reader handle for topic {}, discard the reader.", topic, exception); + reader = null; + } } } From 9f241c07bf431cf7b40fe72f3564d6762b351402 Mon Sep 17 00:00:00 2001 From: rangao Date: Wed, 12 Jul 2023 11:30:34 +0800 Subject: [PATCH 4/9] fix checkstyle --- .../storage/PulsarTopicProducerStateManagerSnapshotBuffer.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index aeca13fc9a..f94081645b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -31,10 +31,7 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.pulsar.client.api.Message; From 5e6e6e4c338cc35cb5eb829f9c699bd293c16a8a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 30 Dec 2022 15:22:07 +0100 Subject: [PATCH 5/9] [transactions] More handling of errors during reads (cherry picked from commit 8f65791e61d5bc5a9dbe4c7f309f6d4308c10075) --- ...lsarTopicProducerStateManagerSnapshotBuffer.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index f94081645b..cee0e257cf 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -71,17 +71,22 @@ private synchronized CompletableFuture> ensureReaderHandle() } private synchronized void discardReader(CompletableFuture> oldReader) { - if (reader == oldReader) { + if (reader == oldReader || (reader != null && reader.isCompletedExceptionally())) { reader = null; + log.info("discard broken reader for {}", topic); } } private synchronized void discardReader(Reader oldReader) { + if (reader == null) { + return; + } if (reader.isDone() && !reader.isCompletedExceptionally()) { Reader newReader = null; try { newReader = reader.get(0, TimeUnit.MILLISECONDS); if (newReader == oldReader) { + log.info("discard broken reader for {}", topic); reader = null; } } catch (Exception exception) { @@ -164,6 +169,12 @@ private synchronized CompletableFuture ensureLatestData(boolean beforeWrit final CompletableFuture newReadHandle = readerHandle.thenCompose(this::readNextMessageIfAvailable); currentReadHandle = newReadHandle; + + newReadHandle.exceptionally(___ -> { + endReadLoop(newReadHandle); + return null; + }); + return newReadHandle.thenApply((__) -> { endReadLoop(newReadHandle); return null; From 234ce4dffa0c7de30d493f58facd49e66bb59b9c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 30 Dec 2022 15:37:38 +0100 Subject: [PATCH 6/9] Spotbugs (cherry picked from commit 3efa5970bf932aac15ab626f0f2f39704024b36d) --- .../PulsarTopicProducerStateManagerSnapshotBuffer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index cee0e257cf..d7b8f2fa42 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; @@ -77,6 +78,7 @@ private synchronized void discardReader(CompletableFuture> ol } } + @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") private synchronized void discardReader(Reader oldReader) { if (reader == null) { return; @@ -148,7 +150,7 @@ private CompletableFuture readNextMessageIfAvailable(Reader re private synchronized CompletableFuture ensureLatestData(boolean beforeWrite) { - if (currentReadHandle != null) { + if (currentReadHandle != null && !currentReadHandle.isCompletedExceptionally()) { if (beforeWrite) { // we are inside a write loop, so // we must ensure that we start to read now From 11f3393f06059157c150318367fb15d0a4e4e707 Mon Sep 17 00:00:00 2001 From: rangao Date: Wed, 12 Jul 2023 11:36:00 +0800 Subject: [PATCH 7/9] revert spotbugs problem change --- ...TopicProducerStateManagerSnapshotBuffer.java | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index d7b8f2fa42..887d4502ca 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -83,18 +83,11 @@ private synchronized void discardReader(Reader oldReader) { if (reader == null) { return; } - if (reader.isDone() && !reader.isCompletedExceptionally()) { - Reader newReader = null; - try { - newReader = reader.get(0, TimeUnit.MILLISECONDS); - if (newReader == oldReader) { - log.info("discard broken reader for {}", topic); - reader = null; - } - } catch (Exception exception) { - log.warn("Failed to get reader handle for topic {}, discard the reader.", topic, exception); - reader = null; - } + if (reader.isCompletedExceptionally() || (reader.isDone() + && !reader.isCompletedExceptionally() + && reader.getNow(null) == oldReader)) { + log.info("discard broken reader for {}", topic); + reader = null; } } From 73a9a9cf7c5f13ce073e96b5f21c6db8d93d8c84 Mon Sep 17 00:00:00 2001 From: rangao Date: Mon, 17 Jul 2023 10:29:09 +0800 Subject: [PATCH 8/9] fix checkstyle --- .../storage/PulsarTopicProducerStateManagerSnapshotBuffer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index 887d4502ca..226ef7551b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -32,7 +32,6 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.pulsar.client.api.Message; From 5183d18e6fe2bd8d9fc581ddb8239874741a44bd Mon Sep 17 00:00:00 2001 From: rangao Date: Mon, 24 Jul 2023 21:18:08 +0800 Subject: [PATCH 9/9] add NPE check and unit test --- ...picProducerStateManagerSnapshotBuffer.java | 19 +++++-- ...roducerStateManagerSnapshotBufferTest.java | 53 +++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java index 226ef7551b..3563f01ad6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBuffer.java @@ -13,12 +13,14 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.streamnative.pulsar.handlers.kop.SystemTopicClient; +import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -52,7 +54,8 @@ public class PulsarTopicProducerStateManagerSnapshotBuffer implements ProducerSt private CompletableFuture currentReadHandle; - private synchronized CompletableFuture> ensureReaderHandle() { + @VisibleForTesting + public synchronized CompletableFuture> ensureReaderHandle() { if (reader == null) { CompletableFuture> newReader = pulsarClient.newReaderBuilder() .topic(topic) @@ -90,7 +93,8 @@ private synchronized void discardReader(Reader oldReader) { } } - private synchronized CompletableFuture> ensureProducerHandle() { + @VisibleForTesting + public synchronized CompletableFuture> ensureProducerHandle() { if (producer == null) { CompletableFuture> newProducer = pulsarClient.newProducerBuilder() .enableBatching(false) @@ -160,6 +164,10 @@ private synchronized CompletableFuture ensureLatestData(boolean beforeWrit // please note that the read operation is async, // and it is not execute inside this synchronized block CompletableFuture> readerHandle = ensureReaderHandle(); + if (readerHandle == null) { + return CompletableFuture.failedFuture( + new KoPTopicException("Failed to create reader handle for " + topic)); + } final CompletableFuture newReadHandle = readerHandle.thenCompose(this::readNextMessageIfAvailable); currentReadHandle = newReadHandle; @@ -188,7 +196,12 @@ public CompletableFuture write(ProducerStateManagerSnapshot snapshot) { // cannot serialise, skip return CompletableFuture.completedFuture(null); } - return ensureProducerHandle().thenCompose(opProducer -> { + CompletableFuture> producerFuture = ensureProducerHandle(); + if (producerFuture == null) { + return CompletableFuture.failedFuture( + new KoPTopicException("Failed to create producer handle for " + topic)); + } + return producerFuture.thenCompose(opProducer -> { // nobody can write now to the topic // wait for local cache to be up-to-date return ensureLatestData(true) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java index 082c5dc060..2deadce55d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PulsarTopicProducerStateManagerSnapshotBufferTest.java @@ -13,17 +13,26 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.streamnative.pulsar.handlers.kop.SystemTopicClient; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; import org.testng.annotations.Test; /** @@ -69,4 +78,48 @@ public void testSerializeAndDeserialize() { } } + @Test(timeOut = 5_000) + public void ensureReaderHandleCaughtExceptionTest() { + SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf)); + ReaderBuilder readerBuilder = spy(sysTopicClient.newReaderBuilder()); + when(readerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject"))); + when(sysTopicClient.newReaderBuilder()).thenReturn(readerBuilder); + + PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer = + new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient); + CompletableFuture> readerFuture = snapshotBuffer.ensureReaderHandle(); + if (readerFuture != null) { + try { + readerFuture.get(); + fail("should fail"); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "inject"); + } + } else { + log.info("This is expected behavior."); + } + } + + @Test(timeOut = 5_000) + public void ensureProducerCaughtExceptionTest() { + SystemTopicClient sysTopicClient = spy(new SystemTopicClient(pulsar, conf)); + ProducerBuilder producerBuilder = spy(sysTopicClient.newProducerBuilder()); + when(producerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("inject"))); + when(sysTopicClient.newProducerBuilder()).thenReturn(producerBuilder); + + PulsarTopicProducerStateManagerSnapshotBuffer snapshotBuffer = + new PulsarTopicProducerStateManagerSnapshotBuffer("snapshot-test-topic", sysTopicClient); + CompletableFuture> producerFuture = snapshotBuffer.ensureProducerHandle(); + if (producerFuture != null) { + try { + producerFuture.get(); + fail("should fail"); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "inject"); + } + } else { + log.info("This is expected behavior."); + } + } + }