Skip to content

Commit 64c99c0

Browse files
committed
Add queuePurge(count) test to PurgeTest
1 parent 27ac699 commit 64c99c0

2 files changed

Lines changed: 35 additions & 2 deletions

File tree

  • activemq-broker/src/main/java/org/apache/activemq/broker/region
  • activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx

activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,7 +1341,7 @@ public void purge(long numberOfMessages) throws Exception {
13411341
pagedInMessagesLock.readLock().lock();
13421342
try {
13431343
list = new ArrayList<MessageReference>(pagedInMessages.values());
1344-
}finally {
1344+
} finally {
13451345
pagedInMessagesLock.readLock().unlock();
13461346
}
13471347

@@ -1365,7 +1365,7 @@ public void purge(long numberOfMessages) throws Exception {
13651365
this.destinationStatistics.getMessages().getCount() > 0 &&
13661366
purgeCount < numberOfMessages);
13671367

1368-
if (this.destinationStatistics.getMessages().getCount() > 0) {
1368+
if (numberOfMessages == originalMessageCount && this.destinationStatistics.getMessages().getCount() > 0) {
13691369
LOG.warn("{} after purge {} of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), numberOfMessages, originalMessageCount, this.destinationStatistics.getMessages().getCount());
13701370
}
13711371
} finally {

activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,39 @@ public void testPurge() throws Exception {
112112
producer.close();
113113
}
114114

115+
public void testPurgeCount() throws Exception {
116+
// Send some messages
117+
int messagesSent = 1_000;
118+
int messagesPurge = 200;
119+
120+
connection = connectionFactory.createConnection();
121+
connection.setClientID(clientID);
122+
connection.start();
123+
Session session = connection.createSession(transacted, authMode);
124+
destination = createDestination();
125+
MessageProducer producer = session.createProducer(destination);
126+
for (int i = 0; i < messagesSent; i++) {
127+
Message message = session.createTextMessage("Message: " + i);
128+
producer.send(message);
129+
}
130+
131+
// Now get the QueueViewMBean and purge
132+
String objectNameStr = broker.getBrokerObjectName().toString();
133+
objectNameStr += ",destinationType=Queue,destinationName="+getDestinationString();
134+
ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr);
135+
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
136+
137+
long count = proxy.getQueueSize();
138+
assertEquals("Queue size", count, messagesSent);
139+
140+
for (int i = 1; i <= 5; i++) {
141+
proxy.purge(messagesPurge);
142+
count = proxy.getQueueSize();
143+
assertEquals("Queue size", count, messagesSent - (messagesPurge * i));
144+
}
145+
producer.close();
146+
}
147+
115148
public void initCombosForTestDelete() {
116149
addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new KahaDBPersistenceAdapter()});
117150
}

0 commit comments

Comments
 (0)