diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 9d6aeae416d..33b04a62258 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1455,6 +1455,12 @@ public boolean handleFailover(final RemotingConnection backupConnection, ActiveM } } + for (ClientProducerInternal producer : cloneProducers()) { + synchronized (producer) { + sessionContext.recreateProducerOnServer(producer); + } + } + if ((!autoCommitAcks || !autoCommitSends) && workDone) { // this is protected by a lock, so we can guarantee nothing will sneak here // while we do our work here diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 38233a7f270..85d75f5d874 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; @@ -130,11 +131,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; +import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; +import org.apache.activemq.artemis.utils.VersionLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; @@ -144,7 +147,7 @@ public class ActiveMQSessionContext extends SessionContext { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Channel sessionChannel; - private final int serverVersion; + private int serverVersion; private int confirmationWindow; private String name; private boolean killed; @@ -428,6 +431,10 @@ public int getServerVersion() { return serverVersion; } + private void setServerVersion(int version) { + serverVersion = version; + } + @Override public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5, getServerVersion())) { @@ -920,6 +927,33 @@ public void recreateSession(final String username, Thread.currentThread().interrupt(); throw e; } + } else if (e.getType() == ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) { + CreateSessionResponseMessage response; + + for (Version version : VersionLoader.getClientVersions()) { + if (version.getIncrementingVersion() == getServerVersion()) { + //We already know this version is incompatible + continue; + } + try { + createRequest = new CreateSessionMessage(name, sessionChannel.getID(), version.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null); + response = (CreateSessionResponseMessage) getCreateChannel().sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP); + + setServerVersion(response.getServerVersion()); + + if (getCoreConnection().getChannelVersion() != serverVersion) { + getCoreConnection().setChannelVersion(serverVersion); + } + + return; + } catch (ActiveMQException ex) { + if (ex.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) { + throw ex; + } + } + } + throw e; + } else { throw e; } @@ -982,6 +1016,13 @@ public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, } } + @Override + public void recreateProducerOnServer(ClientProducerInternal producer) { + if (!sessionChannel.getConnection().isBeforeProducerMetricsChanged()) { + sendPacketWithoutLock(sessionChannel, new CreateProducerMessage(producer.getID(), producer.getAddress())); + } + } + @Override public void xaFailed(Xid xid) throws ActiveMQException { sendPacketWithoutLock(sessionChannel, new SessionXAAfterFailedMessage(xid)); @@ -1219,6 +1260,11 @@ private void sendPacketWithoutLock(final Channel parameterChannel, final Packet ActiveMQBuffer buffer = packet.encode(this.getCoreConnection()); conn.write(buffer, false, false); + + if (packet.isRequiresConfirmations()) { + ((ChannelImpl) parameterChannel).addResendPacket(packet); + } + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index c91075b8e17..87b001896e2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -844,12 +844,13 @@ private void doWrite(final Packet packet) { } - private void addResendPacket(Packet packet) { - resendCache.add(packet); + protected void addResendPacket(Packet packet) { + if (resendCache != null) { + resendCache.add(packet); - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID={} ChannelImpl::addResendPacket adding packet {} stored commandID={} possible commandIDr={}", - (connection == null ? "NULL" : connection.getID()), packet, firstStoredCommandID, (firstStoredCommandID + resendCache.size())); + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID={} ChannelImpl::addResendPacket adding packet {} stored commandID={} possible commandIDr={}", (connection == null ? "NULL" : connection.getID()), packet, firstStoredCommandID, (firstStoredCommandID + resendCache.size())); + } } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 7c33fff2901..d68073e999f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -361,6 +361,8 @@ public abstract void recreateSession(String username, public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException; + public abstract void recreateProducerOnServer(ClientProducerInternal producer); + public abstract void xaFailed(Xid xid) throws ActiveMQException; public abstract void restartSession() throws ActiveMQException; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ClientReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ClientReconnectTest.java new file mode 100644 index 00000000000..17acf382c33 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ClientReconnectTest.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Properties; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerProducer; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.version.impl.VersionImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.SpawnedVMCheck; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.apache.activemq.artemis.utils.VersionLoader; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + + +public class ClientReconnectTest extends ActiveMQTestBase { + + @Rule + public SpawnedVMCheck spawnedVMCheck = new SpawnedVMCheck(); + + private ActiveMQServer server; + + private ServerLocator locator; + + private Process serverProcess; + + private final SimpleString QUEUE = new SimpleString("TestQueue"); + + private boolean isNetty() { + return true; + } + + private boolean isDurable() { + return true; + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + server = createServer(isDurable(), isNetty()); + server.start(); + + locator = createFactory(isNetty()); + locator.setReconnectAttempts(10); + locator.setConfirmationWindowSize(1024); + } + + @After + public void cleanup() { + SpawnedVMSupport.forceKill(); + } + + @Test + public void testProducersRecreatedOnReconnect() throws Exception { + ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(true, true); + session.createQueue(new QueueConfiguration(QUEUE).setDurable(isDurable())); + + ClientProducer producer = session.createProducer(QUEUE); + producer.send(session.createMessage(true)); + + Queue serverQueue = server.locateQueue(QUEUE); + Collection serverProducers = server.getSessions().iterator().next().getServerProducers(); + + Assert.assertEquals(1, serverQueue.getMessageCount()); + Assert.assertEquals(1, serverProducers.size()); + + restartServer(server); + + producer.send(session.createMessage(true)); + + serverQueue = server.locateQueue(QUEUE); + serverProducers = server.getSessions().iterator().next().getServerProducers(); + Assert.assertEquals(2, serverQueue.getMessageCount()); + Assert.assertEquals(1, serverProducers.size()); + } + + @Test + public void testNoStrayConfirmationsAfterReconnect() throws Exception { + AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(); + runAfter(() -> loggerHandler.close()); + + ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(true, true); + session.createQueue(new QueueConfiguration(QUEUE).setDurable(isDurable())); + session.start(); + + ClientProducer producer = session.createProducer(QUEUE); + ClientConsumer consumer = session.createConsumer(QUEUE); + producer.send(session.createMessage(isDurable())); + + Assert.assertNotNull(consumer.receive(1000)); + + restartServer(server); + + producer.send(session.createMessage(isDurable())); + Assert.assertNotNull(consumer.receive(1000)); + + //Force flush to find "missing" packets faster + sf.getConnection().flush(); + Assert.assertFalse(loggerHandler.findText("AMQ212036")); + } + + @Test(timeout = 20000) + public void testIncompatibleVersionAfterReconnect() throws Exception { + String propertiesFileName = "reconnect-activemq-version.properties"; + int clientVersion = VersionLoader.getVersion().getIncrementingVersion(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); + cf.setReconnectAttempts(-1); + cf.setConfirmationWindowSize(102400); + + Connection connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(QUEUE.toString()); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + Message message = session.createTextMessage("Message"); + + producer.send(message); + Assert.assertNotNull(consumer.receive(1000)); + + server.stop(true); + waitForServerToStop(server); + + setServerVersionProperties(propertiesFileName, clientVersion - 1); + serverProcess = SpawnedVMSupport.spawnVM("org.apache.activemq.artemis.tests.integration.client.ClientReconnectTest", new String[]{"-D" + VersionLoader.VERSION_PROP_FILE_KEY + "=" + propertiesFileName}, new String[]{}); + + producer.send(message); + Assert.assertNotNull(consumer.receive(1000)); + + session.close(); + connection.close(); + cf.close(); + + Assert.assertEquals(0, serverProcess.waitFor()); + } + + private void runPreviousVersionServer() throws Exception { + Configuration config = createDefaultConfig(isNetty()); + config.setPersistenceEnabled(false); + + ActiveMQServer previousVersionServer = new ActiveMQServerImpl(config); + previousVersionServer.start(); + + Wait.assertTrue(() -> previousVersionServer.locateQueue(QUEUE) != null); + + Queue queue = previousVersionServer.locateQueue(QUEUE); + Wait.assertEquals(1, () -> queue.getMessagesAcknowledged()); + Wait.assertEquals(0, () -> queue.getConsumerCount()); + + previousVersionServer.stop(true); + } + + private void restartServer(ActiveMQServer server) throws Exception { + server.stop(true); + waitForServerToStop(server); + server.start(); + waitForServerToStart(server); + } + + private void setServerVersionProperties(String fileName, int version) throws IOException { + Properties versionProperties = new Properties(); + + InputStream in = VersionImpl.class.getClassLoader().getResourceAsStream("activemq-version.properties"); + versionProperties.load(in); + + versionProperties.setProperty("activemq.version.compatibleVersionList", Integer.toString(version)); + versionProperties.setProperty("activemq.version.incrementingVersion", Integer.toString(version)); + versionProperties.store(new FileOutputStream("target/test-classes/" + fileName), null); + } + + public static void main(String[] args) throws Exception { + ClientReconnectTest clientReconnectTest = new ClientReconnectTest(); + clientReconnectTest.runPreviousVersionServer(); + } + +}