From 13df518e2eadf6e42414527998b53111a68b0f45 Mon Sep 17 00:00:00 2001 From: Mirro Mutth Date: Fri, 15 Mar 2024 12:56:57 +0900 Subject: [PATCH] Add lockWaitTimeout and statementTimeout options --- .../mysql/MySqlConnectionConfiguration.java | 73 +++++++++++++++++-- .../r2dbc/mysql/MySqlConnectionFactory.java | 14 +++- .../mysql/MySqlConnectionFactoryProvider.java | 6 ++ .../MySqlConnectionConfigurationTest.java | 3 + .../MySqlConnectionFactoryProviderTest.java | 6 +- .../mysql/SessionStateIntegrationTest.java | 54 ++++++++++++++ 6 files changed, 146 insertions(+), 10 deletions(-) diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java index 5953495ce..c165cf4b7 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java @@ -101,6 +101,12 @@ public final class MySqlConnectionConfiguration { private final List sessionVariables; + @Nullable + private final Duration lockWaitTimeout; + + @Nullable + private final Duration statementTimeout; + @Nullable private final Path loadLocalInfilePath; @@ -130,7 +136,7 @@ private MySqlConnectionConfiguration( boolean forceConnectionTimeZoneToSession, String user, @Nullable CharSequence password, @Nullable String database, boolean createDatabaseIfNotExist, @Nullable Predicate preferPrepareStatement, - List sessionVariables, + List sessionVariables, @Nullable Duration lockWaitTimeout, @Nullable Duration statementTimeout, @Nullable Path loadLocalInfilePath, int localInfileBufferSize, int queryCacheSize, int prepareCacheSize, Set compressionAlgorithms, int zstdCompressionLevel, @@ -154,6 +160,8 @@ private MySqlConnectionConfiguration( this.createDatabaseIfNotExist = createDatabaseIfNotExist; this.preferPrepareStatement = preferPrepareStatement; this.sessionVariables = sessionVariables; + this.lockWaitTimeout = lockWaitTimeout; + this.statementTimeout = statementTimeout; this.loadLocalInfilePath = loadLocalInfilePath; this.localInfileBufferSize = localInfileBufferSize; this.queryCacheSize = queryCacheSize; @@ -245,6 +253,16 @@ List getSessionVariables() { return sessionVariables; } + @Nullable + Duration getLockWaitTimeout() { + return lockWaitTimeout; + } + + @Nullable + Duration getStatementTimeout() { + return statementTimeout; + } + @Nullable Path getLoadLocalInfilePath() { return loadLocalInfilePath; @@ -309,6 +327,8 @@ public boolean equals(Object o) { createDatabaseIfNotExist == that.createDatabaseIfNotExist && Objects.equals(preferPrepareStatement, that.preferPrepareStatement) && sessionVariables.equals(that.sessionVariables) && + Objects.equals(lockWaitTimeout, that.lockWaitTimeout) && + Objects.equals(statementTimeout, that.statementTimeout) && Objects.equals(loadLocalInfilePath, that.loadLocalInfilePath) && localInfileBufferSize == that.localInfileBufferSize && queryCacheSize == that.queryCacheSize && @@ -325,9 +345,14 @@ public int hashCode() { return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout, preserveInstants, connectionTimeZone, forceConnectionTimeZoneToSession, zeroDateOption, user, password, database, createDatabaseIfNotExist, - preferPrepareStatement, sessionVariables, loadLocalInfilePath, - localInfileBufferSize, queryCacheSize, prepareCacheSize, compressionAlgorithms, - zstdCompressionLevel, loopResources, extensions, passwordPublisher); + preferPrepareStatement, + sessionVariables, + lockWaitTimeout, + statementTimeout, + loadLocalInfilePath, localInfileBufferSize, + queryCacheSize, prepareCacheSize, + compressionAlgorithms, zstdCompressionLevel, + loopResources, extensions, passwordPublisher); } @Override @@ -343,6 +368,8 @@ public String toString() { ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist + ", preferPrepareStatement=" + preferPrepareStatement + ", sessionVariables=" + sessionVariables + + ", lockWaitTimeout=" + lockWaitTimeout + + ", statementTimeout=" + statementTimeout + ", loadLocalInfilePath=" + loadLocalInfilePath + ", localInfileBufferSize=" + localInfileBufferSize + ", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize + @@ -361,6 +388,8 @@ public String toString() { ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist + ", preferPrepareStatement=" + preferPrepareStatement + ", sessionVariables=" + sessionVariables + + ", lockWaitTimeout=" + lockWaitTimeout + + ", statementTimeout=" + statementTimeout + ", loadLocalInfilePath=" + loadLocalInfilePath + ", localInfileBufferSize=" + localInfileBufferSize + ", queryCacheSize=" + queryCacheSize + @@ -433,6 +462,12 @@ public static final class Builder { @Nullable private Predicate preferPrepareStatement; + @Nullable + private Duration lockWaitTimeout; + + @Nullable + private Duration statementTimeout; + private List sessionVariables = Collections.emptyList(); @Nullable @@ -486,7 +521,11 @@ public MySqlConnectionConfiguration build() { connectionTimeZone, forceConnectionTimeZoneToSession, user, password, database, - createDatabaseIfNotExist, preferPrepareStatement, sessionVariables, loadLocalInfilePath, + createDatabaseIfNotExist, preferPrepareStatement, + sessionVariables, + lockWaitTimeout, + statementTimeout, + loadLocalInfilePath, localInfileBufferSize, queryCacheSize, prepareCacheSize, compressionAlgorithms, zstdCompressionLevel, loopResources, Extensions.from(extensions, autodetectExtensions), passwordPublisher); @@ -911,6 +950,30 @@ public Builder sessionVariables(String... sessionVariables) { return this; } + /** + * Configures the lock wait timeout. Default to use the server-side default value. + * + * @param lockWaitTimeout the lock wait timeout, or {@code null} to use the server-side default value. + * @return {@link Builder this} + * @since 1.1.3 + */ + public Builder lockWaitTimeout(@Nullable Duration lockWaitTimeout) { + this.lockWaitTimeout = lockWaitTimeout; + return this; + } + + /** + * Configures the statement timeout. Default to use the server-side default value. + * + * @param statementTimeout the statement timeout, or {@code null} to use the server-side default value. + * @return {@link Builder this} + * @since 1.1.3 + */ + public Builder statementTimeout(@Nullable Duration statementTimeout) { + this.statementTimeout = statementTimeout; + return this; + } + /** * Configures to allow the {@code LOAD DATA LOCAL INFILE} statement in the given {@code path} or * disallow the statement. Default to {@code null} which means not allow the statement. diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java index 495f4ecec..6d76a8bed 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java @@ -174,8 +174,20 @@ private static Mono getMySqlConnection( extensions.forEach(CodecRegistrar.class, registrar -> registrar.register(allocator, builder)); - return MySqlSimpleConnection.init(client, builder.build(), db, queryCache.get(), + Mono c = MySqlSimpleConnection.init(client, builder.build(), db, queryCache.get(), prepareCache, sessionVariables, prepare); + + if (configuration.getLockWaitTimeout() != null) { + c = c.flatMap(connection -> connection.setLockWaitTimeout(configuration.getLockWaitTimeout()) + .thenReturn(connection)); + } + + if (configuration.getStatementTimeout() != null) { + c = c.flatMap(connection -> connection.setStatementTimeout(configuration.getStatementTimeout()) + .thenReturn(connection)); + } + + return c; }); } diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java index 652bfd5fe..88771a372 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java @@ -42,9 +42,11 @@ import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE; import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER; import static io.r2dbc.spi.ConnectionFactoryOptions.HOST; +import static io.r2dbc.spi.ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT; import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD; import static io.r2dbc.spi.ConnectionFactoryOptions.PORT; import static io.r2dbc.spi.ConnectionFactoryOptions.SSL; +import static io.r2dbc.spi.ConnectionFactoryOptions.STATEMENT_TIMEOUT; import static io.r2dbc.spi.ConnectionFactoryOptions.USER; /** @@ -393,6 +395,10 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) { MySqlConnectionFactoryProvider::splitVariables, String[]::new ).to(builder::sessionVariables); + mapper.optional(LOCK_WAIT_TIMEOUT).as(Duration.class, Duration::parse) + .to(builder::lockWaitTimeout); + mapper.optional(STATEMENT_TIMEOUT).as(Duration.class, Duration::parse) + .to(builder::statementTimeout); return builder.build(); } diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java index 717ecaa0f..f050f4e4a 100644 --- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java +++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java @@ -250,6 +250,9 @@ private static MySqlConnectionConfiguration filledUp() { .sslHostnameVerifier((host, s) -> true) .queryCacheSize(128) .prepareCacheSize(0) + .sessionVariables("sql_mode=ANSI_QUOTES") + .lockWaitTimeout(Duration.ofSeconds(5)) + .statementTimeout(Duration.ofSeconds(10)) .autodetectExtensions(false) .build(); } diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java index 41b2ef45a..ab75161c1 100644 --- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java +++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java @@ -455,7 +455,7 @@ void validPasswordSupplier() { @Test void allConfigurationOptions() { - List exceptConfigs = Arrays.asList( + List exceptConfigs = Arrays.asList( "extendWith", "username", "zeroDateOption"); @@ -463,9 +463,7 @@ void allConfigurationOptions() { "driver", "ssl", "protocol", - "zeroDate", - "lockWaitTimeout", - "statementTimeout"); + "zeroDate"); Set allOptions = Stream.concat( Arrays.stream(ConnectionFactoryOptions.class.getFields()), Arrays.stream(MySqlConnectionFactoryProvider.class.getFields()) diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/SessionStateIntegrationTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/SessionStateIntegrationTest.java index a121374e9..d44608d55 100644 --- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/SessionStateIntegrationTest.java +++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/SessionStateIntegrationTest.java @@ -17,7 +17,9 @@ package io.asyncer.r2dbc.mysql; import io.asyncer.r2dbc.mysql.internal.util.StringUtils; +import io.r2dbc.spi.R2dbcTimeoutException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -25,6 +27,7 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.time.Duration; import java.time.ZoneId; import java.util.LinkedHashMap; import java.util.Map; @@ -122,6 +125,57 @@ void sessionVariables(Map variables) { .verifyComplete(); } + @ParameterizedTest + @ValueSource(strings = { "PT1S", "PT10S", "PT1M" }) + void initLockWaitTimeout(String timeout) { + Duration lockWaitTimeout = Duration.parse(timeout); + + connectionFactory(builder -> builder.lockWaitTimeout(lockWaitTimeout)) + .create() + .flatMapMany(connection -> connection.createStatement("SELECT @@innodb_lock_wait_timeout").execute() + .flatMap(result -> result.map(r -> r.get(0, Long.class))) + .onErrorResume(e -> connection.close().then(Mono.error(e))) + .concatWith(connection.close().then(Mono.empty())) + ) + .as(StepVerifier::create) + .expectNext(lockWaitTimeout.getSeconds()) + .verifyComplete(); + } + + @EnabledIf("isGreaterThanOrEqualToMariaDB10_1_1MySql5_7_4") + @ParameterizedTest + @ValueSource(strings = { "PT0.1S", "PT0.5S" }) + void initStatementTimeout(String timeout) { + Duration statementTimeout = Duration.parse(timeout); + + connectionFactory(builder -> builder.statementTimeout(statementTimeout)) + .create() + .flatMapMany(connection -> connection.createStatement("SELECT 1 WHERE SLEEP(1) > 1").execute() + .flatMap(result -> result.map(r -> r.get(0))) + .onErrorResume(e -> connection.close().then(Mono.error(e))) + .concatWith(connection.close().then(Mono.empty())) + ) + .as(StepVerifier::create) + .verifyError(R2dbcTimeoutException.class); + } + + static boolean isGreaterThanOrEqualToMariaDB10_1_1MySql5_7_4() { + String version = System.getProperty("test.mysql.version"); + + if (version == null || version.isEmpty()) { + return false; + } + + ServerVersion ver = ServerVersion.parse(version); + String type = System.getProperty("test.db.type"); + + if ("mariadb".equalsIgnoreCase(type)) { + return ver.isGreaterThanOrEqualTo(ServerVersion.create(10, 1, 1)); + } + + return ver.isGreaterThanOrEqualTo(ServerVersion.create(5, 7, 4)); + } + static Stream sessionVariables() { return Stream.of( Arguments.of(mapOf("sql_mode", "ANSI_QUOTES")),