1 change: 1 addition & 0 deletions solr/cross-dc-manager/build.gradle
@@ -35,6 +35,7 @@ dependencies {
implementation libs.opentelemetry.sdk.metrics
implementation libs.eclipse.jetty.server
implementation libs.eclipse.jetty.ee10.servlet
implementation libs.google.guava
implementation libs.jakarta.servlet.api
implementation libs.slf4j.api
runtimeOnly libs.google.protobuf.javautils
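Note: the new guava dependency above appears to be needed only for the `@VisibleForTesting` annotation that PartitionManager picks up below. A minimal sketch of the pattern, with a hypothetical enclosing class:

```java
import com.google.common.annotations.VisibleForTesting;

class Outer { // hypothetical class, for illustration only
  @VisibleForTesting // widened visibility exists only so tests can reach the type
  public static class Inner {}
}
```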
KafkaCrossDcConsumer.java (org.apache.solr.crossdc.manager.consumer)
@@ -75,6 +75,8 @@
public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String PROP_TOPIC_DEBUG = "solr.crossdc.consumer.topic.debug";

private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
private final AdminClient adminClient;
private final CountDownLatch startLatch;
@@ -101,6 +103,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {

private volatile boolean running = false;

private boolean topicDebug = Boolean.parseBoolean(System.getProperty(PROP_TOPIC_DEBUG, "false"));

/**
* Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
* the CloudSolrClient instance doesn't try to use its {@link
@@ -175,6 +179,7 @@ public KafkaCrossDcConsumer(
conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL);
this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
this.startLatch = startLatch;

final Properties kafkaConsumerProps = new Properties();

kafkaConsumerProps.put(
@@ -375,6 +380,9 @@ boolean pollAndProcessRequests() {
ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord = null;

for (TopicPartition partition : records.partitions()) {
if (log.isTraceEnabled()) {
log.trace("Checking partition {}", partition.partition());
}
List<ConsumerRecord<String, MirroredSolrRequest<?>>> partitionRecords =
records.records(partition);

@@ -396,19 +404,31 @@ boolean pollAndProcessRequests() {

metrics.incrementInputMsgCounter();
lastRecord = requestRecord;
MirroredSolrRequest<?> req = requestRecord.value();
SolrRequest<?> solrReq = req.getSolrRequest();
MirroredSolrRequest.Type type = req.getType();
final MirroredSolrRequest<?> req = requestRecord.value();
final SolrRequest<?> solrReq = req.getSolrRequest();
final MirroredSolrRequest.Type type = req.getType();

if (type != MirroredSolrRequest.Type.UPDATE) {
String action = solrReq.getParams().get("action", "unknown");
metrics.incrementInputReqCounter(type.name(), action);
}

ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
final ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
if (log.isTraceEnabled()) {
log.trace("-- picked type={}, params={}", req.getType(), params);
}
if (topicDebug) {
solrReq.addHeader("topic.debug", "true");
solrReq.addHeader("record.topic", requestRecord.topic());
solrReq.addHeader("record.partition", String.valueOf(requestRecord.partition()));
solrReq.addHeader("record.offset", String.valueOf(requestRecord.offset()));
solrReq.addHeader("record.timestamp", String.valueOf(requestRecord.timestamp()));
solrReq.addHeader("record.key", requestRecord.key());
solrReq.addHeader("workUnit.nextOffset", String.valueOf(workUnit.nextOffset));
solrReq.addHeader("workUnit.partition", String.valueOf(workUnit.partition));
solrReq.addHeader("workUnit.topic", workUnit.topic);
solrReq.addHeader("workUnit.items", String.valueOf(workUnit.workItems.size()));
}

// determine if it's an UPDATE with deletes, or if the existing batch has deletes
boolean hasDeletes = false;
@@ -450,6 +470,7 @@ boolean pollAndProcessRequests() {
if (updateReqBatch == null) {
// just initialize
updateReqBatch = new UpdateRequest();
updateReqBatch.addHeaders(solrReq.getHeaders());
} else {
if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
throw new RuntimeException("Can't collapse requests.");
@@ -490,6 +511,7 @@

if (updateReqBatch != null) {
sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit);
updateReqBatch = null;
}
try {
partitionManager.checkForOffsetUpdates(partition);
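Note: the topic-debug flag is read once, in the `topicDebug` field initializer, so it must be set before the consumer is constructed; the integration test below does exactly that. A minimal sketch (not part of the PR) of enabling the flag and reading the Kafka coordinates back off a request that passed through the consumer; `TopicDebugSketch` and its methods are hypothetical, while the property and header names come from this change:

```java
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;

class TopicDebugSketch {
  static void enable() {
    // Must run before KafkaCrossDcConsumer is constructed: the topicDebug
    // field initializer reads the system property exactly once.
    System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
  }

  // Reads the coordinates back off a request that went through the consumer
  // with the flag on; the header names are the ones added in this PR.
  static String describe(SolrRequest<?> captured) {
    Map<String, String> h = captured.getHeaders();
    return h.get("record.topic") + "-" + h.get("record.partition") + "@" + h.get("record.offset");
  }
}
```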
PartitionManager.java (org.apache.solr.crossdc.manager.consumer)
@@ -16,6 +16,7 @@
*/
package org.apache.solr.crossdc.manager.consumer;

import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.util.ArrayDeque;
import java.util.HashSet;
@@ -44,13 +45,16 @@ static class PartitionWork {
final Queue<WorkUnit> partitionQueue = new ArrayDeque<>();
}

static class WorkUnit {
final TopicPartition partition;
Set<Future<?>> workItems = new HashSet<>();
@VisibleForTesting
public static class WorkUnit {
final int partition;
final String topic;
final Set<Future<?>> workItems = new HashSet<>();
long nextOffset;

public WorkUnit(TopicPartition partition) {
this.partition = partition;
WorkUnit(TopicPartition partition) {
this.partition = partition.partition();
this.topic = partition.topic();
}
}

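Note: WorkUnit now copies the topic and partition into plain fields instead of holding the TopicPartition, which is what lets the consumer stamp `workUnit.topic` and `workUnit.partition` into the debug headers above, and the class is public (flagged `@VisibleForTesting`) so the test's `sendBatch` override can name `PartitionManager.WorkUnit` in its signature. A sketch of the copy, assuming package-local code since the constructor stays package-private:

```java
package org.apache.solr.crossdc.manager.consumer; // the constructor is package-private

import org.apache.kafka.common.TopicPartition;

class WorkUnitSketch { // hypothetical helper, for illustration only
  static PartitionManager.WorkUnit sample() {
    PartitionManager.WorkUnit unit =
        new PartitionManager.WorkUnit(new TopicPartition("topic1", 0));
    // The Kafka coordinates are now plain fields on the unit itself:
    assert "topic1".equals(unit.topic) && unit.partition == 0;
    return unit;
  }
}
```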
SolrAndKafkaIntegrationTest.java (org.apache.solr.crossdc.manager)
@@ -16,8 +16,12 @@
*/
package org.apache.solr.crossdc.manager;

import static org.apache.solr.crossdc.common.CrossDcConf.COLLAPSE_UPDATES;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BATCH_SIZE_BYTES;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BOOTSTRAP_SERVERS;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
@@ -28,14 +32,18 @@
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +53,7 @@
import org.apache.solr.SolrIgnoredThreadsFilter;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -56,14 +65,19 @@
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.Utils;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.manager.consumer.Consumer;
import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
import org.apache.solr.crossdc.manager.consumer.PartitionManager;
import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
import org.junit.After;
import org.junit.Before;
@@ -90,11 +104,64 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
private static final int NUM_BROKERS = 1;
public EmbeddedKafkaCluster kafkaCluster;

private static class ConsumerBatch {
final String kafkaTopic;
final int partitionId;
final MirroredSolrRequest.Type type;
final String collection;
final Map<String, String> headers;
final Set<String> addIds = new HashSet<>();
final String json;

public ConsumerBatch(final MirroredSolrRequest.Type type, final SolrRequest<?> solrRequest) {
this.kafkaTopic = solrRequest.getHeaders().get("record.topic");
this.partitionId = Integer.parseInt(solrRequest.getHeaders().get("record.partition"));
this.type = type;
this.collection = solrRequest.getCollection();
this.headers = solrRequest.getHeaders();
if (solrRequest instanceof UpdateRequest) {
UpdateRequest updateReq = (UpdateRequest) solrRequest;
json =
Utils.toJSONString(
Map.of("params", updateReq.getParams(), "add", updateReq.getDocuments()));
updateReq.getDocuments().forEach(doc -> addIds.add(doc.getFieldValue("id").toString()));
} else {
json =
Utils.toJSONString(
Map.of("params", solrRequest.getParams(), "class", solrRequest.getClass()));
}
}

@Override
public String toString() {
return "ConsumerBatch{"
+ "kafkaTopic='"
+ kafkaTopic
+ '\''
+ ", partitionId="
+ partitionId
+ ", type="
+ type
+ ", collection='"
+ collection
+ '\''
+ ", headers="
+ headers
+ ", json='"
+ json
+ '\''
+ '}';
}
}

protected volatile MiniSolrCloudCluster solrCluster1;
protected volatile MiniSolrCloudCluster solrCluster2;

protected volatile Consumer consumer;

private List<ConsumerBatch> consumerBatches;

private static final String TOPIC = "topic1";

private static final String COLLECTION = "collection1";
@@ -112,7 +179,28 @@ public void beforeSolrAndKafkaIntegrationTest() throws Exception {
Thread.setDefaultUncaughtExceptionHandler(
(t, e) -> log.error("Uncaught exception in thread {}", t, e));
System.setProperty("otel.metrics.exporter", "prometheus");
consumer = new Consumer();
System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
consumerBatches = Collections.synchronizedList(new ArrayList<>());
consumer =
new Consumer() {
@Override
protected CrossDcConsumer getCrossDcConsumer(
final KafkaCrossDcConf conf,
final ConsumerMetrics metrics,
final CountDownLatch startLatch) {
return new KafkaCrossDcConsumer(conf, metrics, startLatch) {
@Override
public void sendBatch(
final SolrRequest<? extends SolrResponse> solrReqBatch,
final MirroredSolrRequest.Type type,
final ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord,
final PartitionManager.WorkUnit workUnit) {
consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
}
};
}
};
Properties config = new Properties();

kafkaCluster =
@@ -124,13 +212,15 @@ public String bootstrapServers() {
};
kafkaCluster.start();

kafkaCluster.createTopic(TOPIC, 10, 1);
// use multiple partitions to test for re-ordered reads
kafkaCluster.createTopic(TOPIC, 3, 1);

// ensure small batches to test multi-partition ordering
System.setProperty("batchSizeBytes", "128");
System.setProperty("solr.crossdc.topicName", TOPIC);
System.setProperty("solr.crossdc.bootstrapServers", kafkaCluster.bootstrapServers());
System.setProperty(BATCH_SIZE_BYTES, "100");
System.setProperty(TOPIC_NAME, TOPIC);
System.setProperty(BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
System.setProperty(COLLAPSE_UPDATES, "none");

solrCluster1 =
configureCluster(1).addConfig("conf", getFile("configs/cloud-minimal/conf")).configure();
@@ -238,10 +328,62 @@ public void testProducerToCloud() throws Exception {
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";

@Test
@Ignore("SOLR-18077")
public void testPartitioning() throws Exception {
CollectionAdminRequest.Create create =
CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
create.process(solrCluster1.getSolrClient());
create.process(solrCluster2.getSolrClient());
solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);

CloudSolrClient client = solrCluster1.getSolrClient();
int NUM_DOCS = 200;
for (int i = 0; i < NUM_DOCS; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "id-" + i);
doc.addField("id_i", i);
doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
doc.addField("collection_t", COLLECTION);

client.add(COLLECTION, doc);

doc = new SolrInputDocument();
doc.addField("id", "id-" + i);
doc.addField("id_i", i);
doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
doc.addField("collection_t", ALT_COLLECTION);

client.add(ALT_COLLECTION, doc);
}
client.commit(COLLECTION);
client.commit(ALT_COLLECTION);
// check that updates to different collections were always sent to the same partition
Map<Integer, String> partitionsPerCol = new HashMap<>();
Map<String, Set<String>> docsPerCol = new HashMap<>();
for (ConsumerBatch batch : consumerBatches) {
String collection =
partitionsPerCol.computeIfAbsent(batch.partitionId, k -> batch.collection);
docsPerCol.computeIfAbsent(collection, col -> new HashSet<>()).addAll(batch.addIds);
assertEquals(
"request in partition "
+ batch.partitionId
+ " has wrong collection "
+ batch.collection
+ ": "
+ batch
+ "\npartitions: "
+ partitionsPerCol,
collection,
batch.collection);
}
docsPerCol.forEach(
(col, ids) -> assertEquals("incorrect count in collection " + col, NUM_DOCS, ids.size()));
}
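
The test above relies on the producer routing every update for a given collection to a single partition; Kafka's default partitioner guarantees that whenever records share a key, so the test implicitly assumes the mirrored requests are keyed by collection (the producer side is not part of this diff). A hedged illustration of that routing rule, using Kafka's own hashing utilities:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

class KeyRoutingSketch { // hypothetical helper, for illustration only
  // Kafka's default partitioner maps a non-null record key to a partition
  // like this, so equal keys always land on the same partition.
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}
```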

@Test
public void testStrictOrdering() throws Exception {
CloudSolrClient client = solrCluster1.getSolrClient();
int NUM_DOCS = 5000;
int NUM_DOCS = 1000;
// delay deletes by this many docs
int DELTA = 100;
for (int i = 0; i < NUM_DOCS; i++) {
@@ -454,11 +596,12 @@ private void assertClusterEventuallyHasDocs(
boolean foundUpdates = false;
for (int i = 0; i < 100; i++) {
client.commit(collection);
results = client.query(collection, new SolrQuery(query));
results =
client.query(collection, new SolrQuery(CommonParams.Q, query, CommonParams.FL, "*"));
if (results.getResults().getNumFound() == expectedNumDocs) {
foundUpdates = true;
} else {
Thread.sleep(200);
Thread.sleep(300);
}
}
