Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
Expand Down Expand Up @@ -94,7 +95,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final ThreadPoolExecutor executor;

private final ExecutorService offsetCheckExecutor =
ExecutorUtil.newMDCAwareCachedThreadPool(r -> new Thread(r, "offset-check-thread"));
ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("offset-check-thread"));
private final PartitionManager partitionManager;

private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
Expand Down
31 changes: 29 additions & 2 deletions solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ static void awaitTermination(ExecutorService pool, long timeout, TimeUnit unit)
pool.shutdownNow();
// Wait again for forced threads to stop.
if (!pool.awaitTermination(timeout, unit)) {
log.error("Threads from pool {} did not forcefully stop.", pool);
throw new RuntimeException("Timeout waiting for pool " + pool + " to shutdown.");
String executorDetails = describeExecutorForLogging(pool);
log.error("Threads from pool did not forcefully stop. {}", executorDetails);
throw new RuntimeException("Timeout waiting for pool to shutdown. " + executorDetails);
}
}
} catch (InterruptedException ie) {
Expand All @@ -174,6 +175,20 @@ static void awaitTermination(ExecutorService pool, long timeout, TimeUnit unit)
}
}

/** Executor logging details which include pool name when executor fails to terminate. */
public static String describeExecutorForLogging(ExecutorService pool) {
if (pool == null) return "";
if (pool instanceof ThreadPoolExecutor poolExecutor) {
ThreadFactory threadFactory = poolExecutor.getThreadFactory();
String poolName =
threadFactory instanceof SolrNamedThreadFactory solrNamedThreadFactory
? solrNamedThreadFactory.getPoolName()
: "";
return "[" + "poolName=" + poolName + "]" + poolExecutor;
}
return "";
}

/**
* Await the termination of an {@link ExecutorService} until all threads are complete, or until we
* are interrupted, at which point the {@link ExecutorService} will be interrupted as well.
Expand Down Expand Up @@ -322,6 +337,18 @@ public MDCAwareThreadPoolExecutor(
this.enableSubmitterStackTrace = true;
}

/** When the thread factory is a {@link SolrNamedThreadFactory}, prefixes the pool name. */
@Override
public String toString() {
ThreadFactory threadFactory = getThreadFactory();
String base = super.toString();
if (threadFactory instanceof SolrNamedThreadFactory solrNamedThreadFactory) {
String poolName = solrNamedThreadFactory.getPoolName();
return "[" + "poolName=" + poolName + "] " + base;
}
return base;
}

@Override
public void execute(final Runnable command) {
final Map<String, String> submitterContext = MDC.getCopyOfContextMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@ public class SolrNamedThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String prefix;
private final String poolName;

public SolrNamedThreadFactory(String namePrefix) {
group = getThreadGroup();
prefix = namePrefix + "-" + poolNumber.getAndIncrement() + "-thread-";
poolName = namePrefix;
}

/** Returns the name prefix passed to the constructor as a pool name. */
public String getPoolName() {
return poolName;
}

@SuppressWarnings("removal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,17 @@ public void submitAllWithExceptionsTest() {
ExecutorUtil.shutdownNowAndAwaitTermination(service);
}
}

@Test
public void mdcAwarePoolToStringIncludesPoolName() {
ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool("test-async-task");
try {
assertTrue(service.toString(), service.toString().contains("[poolName=test-async-task]"));

String executorForLogging = ExecutorUtil.describeExecutorForLogging(service);
assertTrue(executorForLogging, executorForLogging.contains("[poolName=test-async-task]"));
} finally {
ExecutorUtil.shutdownNowAndAwaitTermination(service);
}
}
}