Skip to content

Commit

Permalink
PROTON-2847 Ensure notification executor is shutdown on close
Browse files Browse the repository at this point in the history
When connection fails or is closed, ensure the notification executor is
shutdown.
  • Loading branch information
tabish121 committed Sep 4, 2024
1 parent b45180b commit 6a0c492
Showing 1 changed file with 32 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ ClientConnection connect() throws ClientException {
openFuture.failed(failureCause);
closeFuture.complete(this);
ioContext.shutdown();
notifications.shutdown();

throw failureCause;
}
Expand Down Expand Up @@ -777,23 +778,24 @@ private void handleEngineShutdown(Engine engine) {

ioContext.shutdownAsync();

if (failureCause != null)
{
openFuture.failed(failureCause);
closeFuture.complete(this);
try {
if (failureCause != null) {
openFuture.failed(failureCause);
closeFuture.complete(this);

LOG.warn("Connection {} has failed due to: {}", getId(), failureCause != null ?
failureCause.getClass().getSimpleName() + " -> " + failureCause.getMessage() : "No failure details provided.");
LOG.warn("Connection {} has failed due to: {}", getId(), failureCause != null ?
failureCause.getClass().getSimpleName() + " -> " + failureCause.getMessage() : "No failure details provided.");

submitDisconnectionEvent(options.disconnectedHandler(), transport.getHost(), transport.getPort(), failureCause);
}
else
{
openFuture.complete(this);
closeFuture.complete(this);
}
submitDisconnectionEvent(options.disconnectedHandler(), transport.getHost(), transport.getPort(), failureCause);
} else {
openFuture.complete(this);
closeFuture.complete(this);
}

client.unregisterConnection(this);
client.unregisterConnection(this);
} finally {
submitNotificationShutdownTask();
}
}

private void submitConnectionEvent(BiConsumer<Connection, ConnectionEvent> handler, String host, int port, ClientIOException cause) {
Expand Down Expand Up @@ -828,6 +830,22 @@ private void submitDisconnectionEvent(BiConsumer<Connection, DisconnectionEvent>
}
}

private void submitNotificationShutdownTask() {
try {
if (!notifications.isShutdown()) {
notifications.submit(() -> {
try {
notifications.shutdown();
} catch (Exception ex) {
LOG.trace("Shutdown of notification event handler threw an error: ", ex);
}
});
}
} catch (Exception ex) {
LOG.trace("Error thrown while attempting to submit shutdown of notification executor: ", ex);
}
}

private Engine configureEngineSaslSupport() {
if (options.saslOptions().saslEnabled()) {
SaslMechanismSelector mechSelector =
Expand Down

0 comments on commit 6a0c492

Please sign in to comment.