diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java index c3b5bdb3a707..053f316a3d28 100644 --- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java @@ -51,6 +51,7 @@ import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; +import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.crossdc.common.CrossDcConf; import org.apache.solr.crossdc.common.IQueueHandler; import org.apache.solr.crossdc.common.KafkaCrossDcConf; @@ -94,7 +95,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { private final ThreadPoolExecutor executor; private final ExecutorService offsetCheckExecutor = - ExecutorUtil.newMDCAwareCachedThreadPool(r -> new Thread(r, "offset-check-thread")); + ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("offset-check-thread")); private final PartitionManager partitionManager; private final BlockingQueue queue = new BlockingQueue<>(10); diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index e6e1cd449417..48ac61f55d87 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -162,8 +162,9 @@ static void awaitTermination(ExecutorService pool, long timeout, TimeUnit unit) pool.shutdownNow(); // Wait again for forced threads to stop. if (!pool.awaitTermination(timeout, unit)) { - log.error("Threads from pool {} did not forcefully stop.", pool); - throw new RuntimeException("Timeout waiting for pool " + pool + " to shutdown."); + String executorDetails = describeExecutorForLogging(pool); + log.error("Threads from pool did not forcefully stop. {}", executorDetails); + throw new RuntimeException("Timeout waiting for pool to shutdown. " + executorDetails); } } } catch (InterruptedException ie) { @@ -174,6 +175,20 @@ static void awaitTermination(ExecutorService pool, long timeout, TimeUnit unit) } } + /** Executor logging details which include pool name when executor fails to terminate. */ + public static String describeExecutorForLogging(ExecutorService pool) { + if (pool == null) return ""; + if (pool instanceof ThreadPoolExecutor poolExecutor) { + ThreadFactory threadFactory = poolExecutor.getThreadFactory(); + String poolName = + threadFactory instanceof SolrNamedThreadFactory solrNamedThreadFactory + ? solrNamedThreadFactory.getPoolName() + : ""; + return "[" + "poolName=" + poolName + "]" + poolExecutor; + } + return ""; + } + /** * Await the termination of an {@link ExecutorService} until all threads are complete, or until we * are interrupted, at which point the {@link ExecutorService} will be interrupted as well. @@ -322,6 +337,18 @@ public MDCAwareThreadPoolExecutor( this.enableSubmitterStackTrace = true; } + /** When the thread factory is a {@link SolrNamedThreadFactory}, prefixes the pool name. */ + @Override + public String toString() { + ThreadFactory threadFactory = getThreadFactory(); + String base = super.toString(); + if (threadFactory instanceof SolrNamedThreadFactory solrNamedThreadFactory) { + String poolName = solrNamedThreadFactory.getPoolName(); + return "[" + "poolName=" + poolName + "] " + base; + } + return base; + } + @Override public void execute(final Runnable command) { final Map submitterContext = MDC.getCopyOfContextMap(); diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrNamedThreadFactory.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrNamedThreadFactory.java index 32950971cc79..e36bb54db5a7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/SolrNamedThreadFactory.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrNamedThreadFactory.java @@ -25,10 +25,17 @@ public class SolrNamedThreadFactory implements ThreadFactory { private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String prefix; + private final String poolName; public SolrNamedThreadFactory(String namePrefix) { group = getThreadGroup(); prefix = namePrefix + "-" + poolNumber.getAndIncrement() + "-thread-"; + poolName = namePrefix; + } + + /** Returns the name prefix passed to the constructor as a pool name. */ + public String getPoolName() { + return poolName; } @SuppressWarnings("removal") diff --git a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java index f9d6026edd0c..ed739632e84b 100644 --- a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java +++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java @@ -275,4 +275,17 @@ public void submitAllWithExceptionsTest() { ExecutorUtil.shutdownNowAndAwaitTermination(service); } } + + @Test + public void mdcAwarePoolToStringIncludesPoolName() { + ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool("test-async-task"); + try { + assertTrue(service.toString(), service.toString().contains("[poolName=test-async-task]")); + + String executorForLogging = ExecutorUtil.describeExecutorForLogging(service); + assertTrue(executorForLogging, executorForLogging.contains("[poolName=test-async-task]")); + } finally { + ExecutorUtil.shutdownNowAndAwaitTermination(service); + } + } }