Skip to content

Commit 48da854

Browse files
ARTEMIS-5982 Fix start Ordering between Acceptor and Mirroring
1 parent 461d977 commit 48da854

3 files changed

Lines changed: 264 additions & 6 deletions

File tree

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration
9393
if (configuration.getLockCoordinator() != null) {
9494
LockCoordinator lockCoordinator = server.getLockCoordinator(configuration.getLockCoordinator());
9595
if (lockCoordinator == null) {
96-
throw new IllegalStateException("lock coordinator " + configuration.getName() + " not found");
96+
throw new IllegalStateException("lock coordinator " + configuration.getLockCoordinator() + " not found");
9797
}
9898
amqpBrokerConnection.setLockCoordinator(lockCoordinator);
9999
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -468,14 +468,16 @@ public synchronized void start() throws Exception {
468468
if (lockCoordinator == null) {
469469
internalStart();
470470
} else {
471-
// The Acceptor needs to start before anything else, so low priority to start
472-
lockCoordinator.onLockAcquired(this::internalStart, LockCoordinator.LOW_PRIORITY);
473-
// And the Acceptor needs to stop after everything else, so high priority to stop
474-
lockCoordinator.onLockReleased(this::internalStop, LockCoordinator.HIGH_PRIORITY);
471+
// The acceptor needs to be the last to start
472+
// Mirroring and other components that may be capturing events need to be started before the acceptor
473+
lockCoordinator.onLockAcquired(this::internalStart, LockCoordinator.HIGH_PRIORITY);
474+
// And the Acceptor needs to stop before everything else, so low priority to stop
475+
// This is because we need to stop capturing events to avoid missing events
476+
lockCoordinator.onLockReleased(this::internalStop, LockCoordinator.LOW_PRIORITY);
475477
}
476478
}
477479

478-
private void internalStart() throws Exception {
480+
protected void internalStart() throws Exception {
479481
if (channelClazz != null) {
480482
// Already started
481483
return;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.artemis.tests.integration.amqp.connect;
18+
19+
import javax.jms.Connection;
20+
import javax.jms.ConnectionFactory;
21+
import javax.jms.MessageProducer;
22+
import javax.jms.Session;
23+
import java.lang.invoke.MethodHandles;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.Executor;
28+
import java.util.concurrent.ScheduledExecutorService;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.apache.activemq.artemis.api.core.QueueConfiguration;
32+
import org.apache.activemq.artemis.api.core.RoutingType;
33+
import org.apache.activemq.artemis.api.core.TransportConfiguration;
34+
import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration;
35+
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
36+
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
37+
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
38+
import org.apache.activemq.artemis.core.server.ActiveMQServer;
39+
import org.apache.activemq.artemis.core.server.Queue;
40+
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
41+
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
42+
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
43+
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
44+
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
45+
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
46+
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
47+
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
48+
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
49+
import org.apache.activemq.artemis.tests.util.CFUtil;
50+
import org.apache.activemq.artemis.utils.RandomUtil;
51+
import org.apache.activemq.artemis.utils.Wait;
52+
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
53+
import org.junit.jupiter.api.AfterEach;
54+
import org.junit.jupiter.api.BeforeEach;
55+
import org.junit.jupiter.api.Test;
56+
import org.slf4j.Logger;
57+
import org.slf4j.LoggerFactory;
58+
59+
import static org.junit.jupiter.api.Assertions.assertFalse;
60+
import static org.junit.jupiter.api.Assertions.assertNotNull;
61+
import static org.junit.jupiter.api.Assertions.assertTrue;
62+
63+
public class LockCoordinatorStartOrderTest extends AmqpClientTestSupport {
64+
65+
protected static final int AMQP_PORT_2 = 5673;
66+
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
67+
ActiveMQServer server_2;
68+
private AssertionLoggerHandler loggerHandler;
69+
70+
@AfterEach
71+
public void stopServer1() throws Exception {
72+
if (server != null) {
73+
server.stop();
74+
}
75+
}
76+
77+
@AfterEach
78+
public void stopServer2() throws Exception {
79+
if (server_2 != null) {
80+
server_2.stop();
81+
}
82+
}
83+
84+
@BeforeEach
85+
public void startLogging() {
86+
loggerHandler = new AssertionLoggerHandler();
87+
88+
}
89+
90+
@AfterEach
91+
public void stopLogging() throws Exception {
92+
try {
93+
assertFalse(loggerHandler.findText("AMQ222214"));
94+
} finally {
95+
loggerHandler.close();
96+
}
97+
}
98+
99+
@Override
100+
protected ActiveMQServer createServer() throws Exception {
101+
return createServer(AMQP_PORT, false);
102+
}
103+
104+
@Test
105+
public void testValidateAcceptorStartOrder() throws Exception {
106+
String queueName = getQueueName() + RandomUtil.randomUUIDString();
107+
108+
server.setIdentity("Server1");
109+
{
110+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(300).setRetryInterval(100);
111+
amqpConnection.setLockCoordinator("theLock");
112+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setQueueCreation(true));
113+
server.getConfiguration().addAMQPConnection(amqpConnection);
114+
115+
HashMap<String, Object> params = new HashMap();
116+
params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 60_000);
117+
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
118+
transportConfiguration.setLockCoordinator("theLock");
119+
transportConfiguration.setName("locked");
120+
server.getConfiguration().addAcceptorConfiguration(transportConfiguration);
121+
HashMap<String, String> properties = new HashMap<>();
122+
123+
properties.put("locks-folder", getTemporaryDir());
124+
LockCoordinatorConfiguration lockCoordinatorConfiguration = new LockCoordinatorConfiguration(properties);
125+
lockCoordinatorConfiguration.setName("theLock").setClassName("org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager").setCheckPeriod(100).setLockId("theLock");
126+
server.getConfiguration().addLockCoordinatorConfiguration(lockCoordinatorConfiguration);
127+
server.getConfiguration().addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
128+
}
129+
server.start();
130+
131+
server_2 = createServer(AMQP_PORT_2, false);
132+
server_2.setIdentity("Server2");
133+
134+
{
135+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(300).setRetryInterval(100);
136+
amqpConnection.setLockCoordinator("theLock");
137+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
138+
server_2.getConfiguration().addAMQPConnection(amqpConnection);
139+
140+
HashMap<String, Object> params = new HashMap();
141+
params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 60_000);
142+
TransportConfiguration transportConfiguration = new TransportConfiguration(LockCoordinatorNettyFactory.class.getName(), params);
143+
transportConfiguration.setLockCoordinator("theLock");
144+
transportConfiguration.setName("locked");
145+
server_2.getConfiguration().addAcceptorConfiguration(transportConfiguration);
146+
147+
HashMap<String, String> properties = new HashMap<>();
148+
properties.put("locks-folder", getTemporaryDir());
149+
LockCoordinatorConfiguration lockCoordinatorConfiguration = new LockCoordinatorConfiguration(properties);
150+
lockCoordinatorConfiguration.setName("theLock").setClassName("org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager").setCheckPeriod(100).setLockId("theLock");
151+
server_2.getConfiguration().addLockCoordinatorConfiguration(lockCoordinatorConfiguration);
152+
server_2.getConfiguration().addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
153+
}
154+
155+
server_2.start();
156+
157+
Wait.assertNotNull(() -> server_2.locateQueue(queueName), 5000, 100);
158+
159+
CountDownLatch started = new CountDownLatch(1);
160+
CountDownLatch waitSend = new CountDownLatch(1);
161+
162+
ReplacedNettyAcceptor replacedNettyAcceptor = (ReplacedNettyAcceptor) server_2.getRemotingService().getAcceptor("locked");
163+
assertNotNull(replacedNettyAcceptor);
164+
replacedNettyAcceptor.setAfterStartCallback(() -> {
165+
try {
166+
started.countDown();
167+
waitSend.await(10, TimeUnit.SECONDS);
168+
} catch (Throwable ignored) {
169+
}
170+
});
171+
172+
173+
server.stop();
174+
175+
assertTrue(started.await(10, TimeUnit.SECONDS));
176+
177+
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:60000");
178+
try (Connection connection = factory.createConnection()) {
179+
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
180+
MessageProducer producer = session.createProducer(session.createQueue(queueName));
181+
producer.send(session.createTextMessage("RacedMessage"));
182+
session.commit();
183+
}
184+
185+
waitSend.countDown();
186+
187+
server.start();
188+
189+
Queue queueOnServer1 = server.locateQueue(queueName);
190+
191+
assertNotNull(queueOnServer1);
192+
193+
Wait.assertEquals(1L, queueOnServer1::getMessageCount, 5000, 100);
194+
195+
server_2.stop();
196+
197+
}
198+
199+
200+
public static class LockCoordinatorNettyFactory implements AcceptorFactory {
201+
202+
@Override
203+
public Acceptor createAcceptor(String name,
204+
ClusterConnection connection,
205+
Map<String, Object> configuration,
206+
BufferHandler handler,
207+
ServerConnectionLifeCycleListener listener,
208+
Executor threadPool,
209+
ScheduledExecutorService scheduledThreadPool,
210+
Map<String, ProtocolManager> protocolMap,
211+
String threadFactoryGroupName,
212+
MetricsManager metricsManager) {
213+
Executor failureExecutor = new OrderedExecutor(threadPool);
214+
return new ReplacedNettyAcceptor(name, connection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap, threadFactoryGroupName, metricsManager);
215+
}
216+
}
217+
218+
219+
public static class ReplacedNettyAcceptor extends NettyAcceptor {
220+
221+
private Runnable afterStartCallback;
222+
223+
public ReplacedNettyAcceptor(String name,
224+
ClusterConnection clusterConnection,
225+
Map<String, Object> configuration,
226+
BufferHandler handler,
227+
ServerConnectionLifeCycleListener listener,
228+
ScheduledExecutorService scheduledThreadPool,
229+
Executor failureExecutor,
230+
Map<String, ProtocolManager> protocolMap,
231+
String threadFactoryGroupName,
232+
MetricsManager metricsManager) {
233+
super(name, clusterConnection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap, threadFactoryGroupName, metricsManager);
234+
}
235+
236+
237+
@Override
238+
protected void internalStart() throws Exception {
239+
super.internalStart();
240+
if (afterStartCallback != null) {
241+
afterStartCallback.run();
242+
}
243+
}
244+
245+
public Runnable getAfterStartCallback() {
246+
return afterStartCallback;
247+
}
248+
249+
public ReplacedNettyAcceptor setAfterStartCallback(Runnable afterStartCallback) {
250+
this.afterStartCallback = afterStartCallback;
251+
return this;
252+
}
253+
}
254+
255+
256+
}

0 commit comments

Comments
 (0)