From b8bc0f5ee4c1a68d177a282f8a4f6175de4152c3 Mon Sep 17 00:00:00 2001 From: Mirro Mutth Date: Tue, 12 Mar 2024 17:42:59 +0900 Subject: [PATCH] Add support for ignore lock wait timeout --- .../r2dbc/mysql/ConnectionContext.java | 18 ++++ .../r2dbc/mysql/MySqlSimpleConnection.java | 82 +++++++++++------ .../io/asyncer/r2dbc/mysql/QueryFlow.java | 90 ++++++++++--------- 3 files changed, 121 insertions(+), 69 deletions(-) diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java index 47ac71076..1b43dcdbc 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java @@ -53,6 +53,8 @@ public final class ConnectionContext implements CodecContext { @Nullable private ZoneId timeZone; + private boolean lockWaitTimeoutSupported = false; + /** * Assume that the auto commit is always turned on, it will be set after handshake V10 request message, or * OK message which means handshake V9 completed. @@ -162,6 +164,22 @@ public int getLocalInfileBufferSize() { return localInfileBufferSize; } + /** + * Checks if the server supports lock wait timeout. + * + * @return if the server supports lock wait timeout. + */ + public boolean isLockWaitTimeoutSupported() { + return lockWaitTimeoutSupported; + } + + /** + * Enables lock wait timeout supported when loading session variables. + */ + public void enableLockWaitTimeoutSupported() { + this.lockWaitTimeoutSupported = true; + } + /** * Get the bitmap of server statuses. * diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSimpleConnection.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSimpleConnection.java index f4a2b3746..6e609073c 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSimpleConnection.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSimpleConnection.java @@ -205,7 +205,7 @@ public Mono beginTransaction() { @Override public Mono beginTransaction(TransactionDefinition definition) { - return Mono.defer(() -> QueryFlow.beginTransaction(client, this, batchSupported, definition)); + return Mono.defer(() -> QueryFlow.beginTransaction(client, this, batchSupported, definition, context)); } @Override @@ -306,8 +306,8 @@ public MySqlConnectionMetadata getMetadata() { } /** - * MySQL does not have any way to query the isolation level of the current transaction, only inferred from - * past statements, so driver can not make sure the result is right. + * MySQL does not have any way to query the isolation level of the current transaction, only inferred from past + * statements, so driver can not make sure the result is right. *

* See MySQL Bug 53341 *

@@ -424,6 +424,11 @@ public boolean isInTransaction() { public Mono setLockWaitTimeout(Duration timeout) { requireNonNull(timeout, "timeout must not be null"); + if (!context.isLockWaitTimeoutSupported()) { + logger.warn("Lock wait timeout is not supported by server, setLockWaitTimeout operation is ignored"); + return Mono.empty(); + } + long timeoutSeconds = timeout.getSeconds(); return QueryFlow.executeVoid(client, "SET innodb_lock_wait_timeout=" + timeoutSeconds) .doOnSuccess(ignored -> this.lockWaitTimeout = this.currentLockWaitTimeout = timeoutSeconds); @@ -484,6 +489,7 @@ static Mono init( ) { Mono connection = initSessionVariables(client, sessionVariables) .then(loadSessionVariables(client, codecs, context)) + .flatMap(data -> loadInnoDbEngineStatus(data, client, codecs, context)) .map(data -> { ZoneId timeZone = data.timeZone; if (timeZone != null) { @@ -491,6 +497,12 @@ static Mono init( context.setTimeZone(timeZone); } + if (data.lockWaitTimeoutSupported) { + context.enableLockWaitTimeoutSupported(); + } else { + logger.info("Lock wait timeout is not supported by server, all related operations will be ignored"); + } + return new MySqlSimpleConnection(client, context, codecs, data.level, data.lockWaitTimeout, queryCache, prepareCache, data.product, prepare); }); @@ -534,10 +546,10 @@ private static Mono initSessionVariables(Client client, List sessi private static Mono loadSessionVariables( Client client, Codecs codecs, ConnectionContext context ) { - StringBuilder query = new StringBuilder(160) + StringBuilder query = new StringBuilder(128) .append("SELECT ") .append(transactionIsolationColumn(context)) - .append(",@@innodb_lock_wait_timeout AS l,@@version_comment AS v"); + .append(",@@version_comment AS v"); Function> handler; @@ -554,6 +566,24 @@ private static Mono loadSessionVariables( .last(); } + private static Mono loadInnoDbEngineStatus( + SessionData data, Client client, Codecs codecs, ConnectionContext context + ) { + return new TextSimpleStatement(client, codecs, context, + "SHOW VARIABLES LIKE 'innodb\\\\_lock\\\\_wait\\\\_timeout'") + .execute() + .flatMap(r -> r.map(readable -> { + String value = readable.get(1, String.class); + + if (value == null || value.isEmpty()) { + return data; + } else { + return data.lockWaitTimeout(Long.parseLong(value)); + } + })) + .single(data); + } + private static Mono initDatabase(Client client, String database) { return client.exchange(new InitDbMessage(database), INIT_DB) .last() @@ -572,16 +602,15 @@ private static Mono initDatabase(Client client, String database) { private static Flux convertSessionData(MySqlResult r, boolean timeZone) { return r.map(readable -> { IsolationLevel level = convertIsolationLevel(readable.get(0, String.class)); - long lockWaitTimeout = convertLockWaitTimeout(readable.get(1, Long.class)); - String product = readable.get(2, String.class); + String product = readable.get(1, String.class); - return new SessionData(level, lockWaitTimeout, product, timeZone ? readZoneId(readable) : null); + return new SessionData(level, product, timeZone ? readZoneId(readable) : null); }); } private static ZoneId readZoneId(Readable readable) { - String systemTimeZone = readable.get(3, String.class); - String timeZone = readable.get(4, String.class); + String systemTimeZone = readable.get(2, String.class); + String timeZone = readable.get(3, String.class); if (timeZone == null || timeZone.isEmpty() || "SYSTEM".equalsIgnoreCase(timeZone)) { if (systemTimeZone == null || systemTimeZone.isEmpty()) { @@ -628,24 +657,13 @@ private static IsolationLevel convertIsolationLevel(@Nullable String name) { return IsolationLevel.REPEATABLE_READ; } - private static long convertLockWaitTimeout(@Nullable Long timeout) { - if (timeout == null) { - logger.error("Lock wait timeout is null, fallback to " + DEFAULT_LOCK_WAIT_TIMEOUT + " seconds"); - - return DEFAULT_LOCK_WAIT_TIMEOUT; - } - - return timeout; - } - /** - * Resolves the column of session isolation level, the {@literal @@tx_isolation} has been marked as - * deprecated. + * Resolves the column of session isolation level, the {@literal @@tx_isolation} has been marked as deprecated. *

* If server is MariaDB, {@literal @@transaction_isolation} is used starting from {@literal 11.1.1}. *

- * If the server is MySQL, use {@literal @@transaction_isolation} starting from {@literal 8.0.3}, or - * between {@literal 5.7.20} and {@literal 8.0.0} (exclusive). + * If the server is MySQL, use {@literal @@transaction_isolation} starting from {@literal 8.0.3}, or between + * {@literal 5.7.20} and {@literal 8.0.0} (exclusive). */ private static String transactionIsolationColumn(ConnectionContext context) { ServerVersion version = context.getServerVersion(); @@ -664,20 +682,26 @@ private static final class SessionData { private final IsolationLevel level; - private final long lockWaitTimeout; - @Nullable private final String product; @Nullable private final ZoneId timeZone; - private SessionData(IsolationLevel level, long lockWaitTimeout, @Nullable String product, - @Nullable ZoneId timeZone) { + private long lockWaitTimeout = -1; + + private boolean lockWaitTimeoutSupported; + + private SessionData(IsolationLevel level, @Nullable String product, @Nullable ZoneId timeZone) { this.level = level; - this.lockWaitTimeout = lockWaitTimeout; this.product = product; this.timeZone = timeZone; } + + SessionData lockWaitTimeout(long timeout) { + this.lockWaitTimeoutSupported = true; + this.lockWaitTimeout = timeout; + return this; + } } } diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java index 7b100cd24..f5911dcb8 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java @@ -107,10 +107,10 @@ final class QueryFlow { }; /** - * Execute multiple bindings of a server-preparing statement with one-by-one binary execution. The - * execution terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. If client - * receives a {@link ErrorMessage} will cancel subsequent {@link Binding}s. The exchange will be completed - * by {@link CompleteMessage} after receive the last result for the last binding. + * Execute multiple bindings of a server-preparing statement with one-by-one binary execution. The execution + * terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. If client receives a + * {@link ErrorMessage} will cancel subsequent {@link Binding}s. The exchange will be completed by + * {@link CompleteMessage} after receive the last result for the last binding. * * @param client the {@link Client} to exchange messages with. * @param sql the statement for exception tracing. @@ -133,10 +133,10 @@ static Flux> execute(Client client, String sql, List> execute( /** * Execute a simple compound query. Query execution terminates with the last {@link CompleteMessage} or a - * {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception. The exchange will be completed - * by {@link CompleteMessage} after receive the last result for the last binding. + * {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception. The exchange will be completed by + * {@link CompleteMessage} after receive the last result for the last binding. * * @param client the {@link Client} to exchange messages with. * @param sql the query to execute, can be contains multi-statements. @@ -172,9 +172,9 @@ static Flux> execute(Client client, String sql) { /** * Execute multiple simple compound queries with one-by-one. Query execution terminates with the last - * {@link CompleteMessage} or a {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception and - * cancel subsequent statements' execution. The exchange will be completed by {@link CompleteMessage} - * after receive the last result for the last binding. + * {@link CompleteMessage} or a {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception and cancel + * subsequent statements' execution. The exchange will be completed by {@link CompleteMessage} after receive the + * last result for the last binding. * * @param client the {@link Client} to exchange messages with. * @param statements bundled sql for execute. @@ -195,8 +195,8 @@ static Flux> execute(Client client, List statements) } /** - * Login a {@link Client} and receive the {@code client} after logon. It will emit an exception when - * client receives a {@link ErrorMessage}. + * Login a {@link Client} and receive the {@code client} after logon. It will emit an exception when client receives + * a {@link ErrorMessage}. * * @param client the {@link Client} to exchange messages with. * @param sslMode the {@link SslMode} defines SSL capability and behavior. @@ -219,10 +219,9 @@ static Mono login(Client client, SslMode sslMode, String database, Strin } /** - * Execute a simple query and return a {@link Mono} for the complete signal or error. Query execution - * terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. The {@link ErrorMessage} - * will emit an exception. The exchange will be completed by {@link CompleteMessage} after receive the - * last result for the last binding. + * Execute a simple query and return a {@link Mono} for the complete signal or error. Query execution terminates + * with the last {@link CompleteMessage} or a {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception. + * The exchange will be completed by {@link CompleteMessage} after receive the last result for the last binding. *

* Note: this method does not support {@code LOCAL INFILE} due to it should be used for excepted queries. * @@ -246,18 +245,19 @@ static Mono executeVoid(Client client, String sql) { } /** - * Begins a new transaction with a {@link TransactionDefinition}. It will change current transaction - * statuses of the {@link ConnectionState}. + * Begins a new transaction with a {@link TransactionDefinition}. It will change current transaction statuses of + * the {@link ConnectionState}. * * @param client the {@link Client} to exchange messages with. * @param state the connection state for checks and sets transaction statuses. * @param batchSupported if connection supports batch query. * @param definition the {@link TransactionDefinition}. + * @param context the {@link ConnectionContext} for initialization. * @return receives complete signal. */ static Mono beginTransaction(Client client, ConnectionState state, boolean batchSupported, - TransactionDefinition definition) { - final StartTransactionState startState = new StartTransactionState(state, definition); + TransactionDefinition definition, ConnectionContext context) { + final StartTransactionState startState = new StartTransactionState(state, definition, context); if (batchSupported) { return client.exchange(new TransactionBatchExchangeable(startState)).then(); @@ -267,8 +267,8 @@ static Mono beginTransaction(Client client, ConnectionState state, boolean } /** - * Commits or rollbacks current transaction. It will recover statuses of the {@link ConnectionState} in - * the initial connection state. + * Commits or rollbacks current transaction. It will recover statuses of the {@link ConnectionState} in the initial + * connection state. * * @param client the {@link Client} to exchange messages with. * @param state the connection state for checks and resets transaction statuses. @@ -298,9 +298,9 @@ static Mono createSavepoint(Client client, ConnectionState state, String n /** * Execute a simple query statement. Query execution terminates with the last {@link CompleteMessage} or a - * {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception. The exchange will be completed - * by {@link CompleteMessage} after receive the last result for the last binding. The exchange will be - * completed by {@link CompleteMessage} after receive the last result for the last binding. + * {@link ErrorMessage}. The {@link ErrorMessage} will emit an exception. The exchange will be completed by + * {@link CompleteMessage} after receive the last result for the last binding. The exchange will be completed by + * {@link CompleteMessage} after receive the last result for the last binding. * * @param client the {@link Client} to exchange messages with. * @param sql the query to execute, can be contains multi-statements. @@ -310,7 +310,8 @@ private static Flux execute0(Client client, String sql) { return client.exchange(new SimpleQueryExchangeable(sql)); } - private QueryFlow() { } + private QueryFlow() { + } } /** @@ -523,12 +524,12 @@ protected String offendingSql() { } /** - * An implementation of {@link FluxExchangeable} that considers server-preparing queries. Which contains a - * built-in state machine. + * An implementation of {@link FluxExchangeable} that considers server-preparing queries. Which contains a built-in + * state machine. *

- * It will reset a prepared statement if cache has matched it, otherwise it will prepare statement to a new - * statement ID and put the ID into the cache. If the statement ID does not exist in the cache after the last - * row sent, the ID will be closed. + * It will reset a prepared statement if cache has matched it, otherwise it will prepare statement to a new statement ID + * and put the ID into the cache. If the statement ID does not exist in the cache after the last row sent, the ID will + * be closed. */ final class PrepareExchangeable extends FluxExchangeable { @@ -813,8 +814,8 @@ private void onCompleteMessage(CompleteMessage message, SynchronousSink - * Not like other {@link FluxExchangeable}s, it is started by a server-side message, which should be an - * implementation of {@link HandshakeRequest}. + * Not like other {@link FluxExchangeable}s, it is started by a server-side message, which should be an implementation + * of {@link HandshakeRequest}. */ final class LoginExchangeable extends FluxExchangeable { @@ -1181,6 +1182,7 @@ boolean cancelTasks() { } if (state.isLockWaitTimeoutChanged()) { + // If server does not support lock wait timeout, the state will not be changed, so it is safe. tasks |= LOCK_WAIT_TIMEOUT; statements.add("SET innodb_lock_wait_timeout=" + state.getSessionLockWaitTimeout()); } @@ -1224,9 +1226,12 @@ final class StartTransactionState extends AbstractTransactionState { private final TransactionDefinition definition; - StartTransactionState(ConnectionState state, TransactionDefinition definition) { + private final ConnectionContext context; + + StartTransactionState(ConnectionState state, TransactionDefinition definition, ConnectionContext context) { super(state); this.definition = definition; + this.context = context; } @Override @@ -1237,9 +1242,14 @@ boolean cancelTasks() { } final Duration timeout = definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT); if (timeout != null) { - final long lockWaitTimeout = timeout.getSeconds(); - tasks |= LOCK_WAIT_TIMEOUT; - statements.add("SET innodb_lock_wait_timeout=" + lockWaitTimeout); + if (context.isLockWaitTimeoutSupported()) { + long lockWaitTimeout = timeout.getSeconds(); + tasks |= LOCK_WAIT_TIMEOUT; + statements.add("SET innodb_lock_wait_timeout=" + lockWaitTimeout); + } else { + QueryFlow.logger.warn( + "Lock wait timeout is not supported by server, transaction definition lockWaitTimeout is ignored"); + } } final IsolationLevel isolationLevel = definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL);