Skip to content

Commit

Permalink
failover2 on connect (#1133)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiyvamz authored Oct 3, 2024
1 parent 249194d commit 31436c5
Showing 1 changed file with 84 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsHelper;
import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.PropertyUtils;
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
import software.amazon.jdbc.util.SqlState;
Expand All @@ -65,6 +66,8 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin {
private static final String TELEMETRY_WRITER_FAILOVER = "failover to writer node";
private static final String TELEMETRY_READER_FAILOVER = "failover to replica";

private static final String INTERNAL_CONNECT_PROPERTY_NAME = "76c06979-49c4-4c86-9600-a63605b83f50";

public static final AwsWrapperProperty FAILOVER_TIMEOUT_MS =
new AwsWrapperProperty(
"failoverTimeoutMs",
Expand All @@ -87,10 +90,18 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin {
"random",
"The strategy that should be used to select a new reader host while opening a new connection.");

public static final AwsWrapperProperty ENABLE_CONNECT_FAILOVER =
new AwsWrapperProperty(
"enableConnectFailover", "false",
"Enable/disable cluster-aware failover if the initial connection to the database fails due to a "
+ "network exception. Note that this may result in a connection to a different instance in the cluster "
+ "than was specified by the URL.");

private static final Set<String> subscribedMethods =
Collections.unmodifiableSet(new HashSet<String>() {
{
addAll(SubscribedMethodHelper.NETWORK_BOUND_METHODS);
add("connect");
add("initHostProvider");
}
});
Expand Down Expand Up @@ -317,7 +328,6 @@ protected <E extends Exception> void dealWithIllegalStateException(
* @throws SQLException if an error occurs
*/
protected void failover(final HostSpec failedHost) throws SQLException {
this.pluginService.setAvailability(failedHost.asAliases(), HostAvailability.NOT_AVAILABLE);

if (this.failoverMode == FailoverMode.STRICT_WRITER) {
failoverWriter();
Expand Down Expand Up @@ -364,6 +374,9 @@ protected void failoverReader() throws SQLException {
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader"));
}

final Properties copyProp = PropertyUtils.copyProperties(this.properties);
copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true");

final List<HostSpec> hosts = this.pluginService.getHosts();
Connection readerCandidateConn = null;
HostSpec readerCandidate = null;
Expand All @@ -389,7 +402,7 @@ protected void failoverReader() throws SQLException {
}

try {
readerCandidateConn = this.pluginService.connect(readerCandidate, this.properties);
readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp);
if (this.pluginService.getHostRole(readerCandidateConn) != HostRole.READER) {
readerCandidateConn.close();
readerCandidateConn = null;
Expand All @@ -412,7 +425,7 @@ protected void failoverReader() throws SQLException {
this.failoverReaderHostSelectorStrategySetting);
if (readerCandidate != null) {
try {
readerCandidateConn = this.pluginService.connect(readerCandidate, this.properties);
readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp);
} catch (SQLException ex) {
readerCandidate = null;
}
Expand Down Expand Up @@ -480,6 +493,8 @@ protected void failoverWriter() throws SQLException {
}

final List<HostSpec> updatedHosts = this.pluginService.getHosts();
final Properties copyProp = PropertyUtils.copyProperties(this.properties);
copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true");

Connection writerCandidateConn = null;
final HostSpec writerCandidate = updatedHosts.stream()
Expand All @@ -489,7 +504,7 @@ protected void failoverWriter() throws SQLException {

if (writerCandidate != null) {
try {
writerCandidateConn = this.pluginService.connect(writerCandidate, this.properties);
writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp);
} catch (SQLException ex) {
// do nothing
}
Expand Down Expand Up @@ -606,4 +621,69 @@ protected boolean canDirectExecute(final String methodName) {
|| methodName.equals(METHOD_IS_CLOSED)
|| methodName.equals(METHOD_ABORT));
}

@Override
public Connection connect(
final String driverProtocol,
final HostSpec hostSpec,
final Properties props,
final boolean isInitialConnection,
final JdbcCallable<Connection, SQLException> connectFunc)
throws SQLException {

if (!ENABLE_CONNECT_FAILOVER.getBoolean(props)) {
return connectFunc.call();
}

// This call was initiated by this failover2 plugin and doesn't require any additional processing.
if (props.containsKey(INTERNAL_CONNECT_PROPERTY_NAME)) {
return connectFunc.call();
}

Connection conn = null;

final HostSpec hostSpecWithAvailability = this.pluginService.getHosts().stream()
.filter(x -> x.getHostAndPort().equals(hostSpec.getHostAndPort()))
.findFirst()
.orElse(null);

if (hostSpecWithAvailability == null
|| hostSpecWithAvailability.getAvailability() != HostAvailability.NOT_AVAILABLE) {

try {
conn = this.staleDnsHelper.getVerifiedConnection(isInitialConnection, this.hostListProviderService,
driverProtocol, hostSpec, props, connectFunc);
} catch (final SQLException e) {
if (!this.shouldExceptionTriggerConnectionSwitch(e)) {
throw e;
}

this.pluginService.setAvailability(hostSpec.asAliases(), HostAvailability.NOT_AVAILABLE);

try {
this.failover(hostSpec);
} catch (FailoverSuccessSQLException failoverSuccessException) {
conn = this.pluginService.getCurrentConnection();
}
}
} else {
try {
this.pluginService.refreshHostList();
this.failover(hostSpec);
} catch (FailoverSuccessSQLException failoverSuccessException) {
conn = this.pluginService.getCurrentConnection();
}
}

if (conn == null) {
// This should be unreachable, the above logic will either get a connection successfully or throw an exception.
throw new SQLException(Messages.get("Failover.unableToConnect"));
}

if (isInitialConnection) {
this.pluginService.refreshHostList(conn);
}

return conn;
}
}

0 comments on commit 31436c5

Please sign in to comment.