Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for lockWaitTimeout and statementTimeout options #261

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public final class MySqlConnectionConfiguration {

private final List<String> sessionVariables;

@Nullable
private final Duration lockWaitTimeout;

@Nullable
private final Duration statementTimeout;

@Nullable
private final Path loadLocalInfilePath;

Expand Down Expand Up @@ -130,7 +136,7 @@ private MySqlConnectionConfiguration(
boolean forceConnectionTimeZoneToSession,
String user, @Nullable CharSequence password, @Nullable String database,
boolean createDatabaseIfNotExist, @Nullable Predicate<String> preferPrepareStatement,
List<String> sessionVariables,
List<String> sessionVariables, @Nullable Duration lockWaitTimeout, @Nullable Duration statementTimeout,
@Nullable Path loadLocalInfilePath, int localInfileBufferSize,
int queryCacheSize, int prepareCacheSize,
Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel,
Expand All @@ -154,6 +160,8 @@ private MySqlConnectionConfiguration(
this.createDatabaseIfNotExist = createDatabaseIfNotExist;
this.preferPrepareStatement = preferPrepareStatement;
this.sessionVariables = sessionVariables;
this.lockWaitTimeout = lockWaitTimeout;
this.statementTimeout = statementTimeout;
this.loadLocalInfilePath = loadLocalInfilePath;
this.localInfileBufferSize = localInfileBufferSize;
this.queryCacheSize = queryCacheSize;
Expand Down Expand Up @@ -245,6 +253,16 @@ List<String> getSessionVariables() {
return sessionVariables;
}

@Nullable
Duration getLockWaitTimeout() {
return lockWaitTimeout;
}

@Nullable
Duration getStatementTimeout() {
return statementTimeout;
}

@Nullable
Path getLoadLocalInfilePath() {
return loadLocalInfilePath;
Expand Down Expand Up @@ -309,6 +327,8 @@ public boolean equals(Object o) {
createDatabaseIfNotExist == that.createDatabaseIfNotExist &&
Objects.equals(preferPrepareStatement, that.preferPrepareStatement) &&
sessionVariables.equals(that.sessionVariables) &&
Objects.equals(lockWaitTimeout, that.lockWaitTimeout) &&
Objects.equals(statementTimeout, that.statementTimeout) &&
Objects.equals(loadLocalInfilePath, that.loadLocalInfilePath) &&
localInfileBufferSize == that.localInfileBufferSize &&
queryCacheSize == that.queryCacheSize &&
Expand All @@ -325,9 +345,14 @@ public int hashCode() {
return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout,
preserveInstants, connectionTimeZone, forceConnectionTimeZoneToSession,
zeroDateOption, user, password, database, createDatabaseIfNotExist,
preferPrepareStatement, sessionVariables, loadLocalInfilePath,
localInfileBufferSize, queryCacheSize, prepareCacheSize, compressionAlgorithms,
zstdCompressionLevel, loopResources, extensions, passwordPublisher);
preferPrepareStatement,
sessionVariables,
lockWaitTimeout,
statementTimeout,
loadLocalInfilePath, localInfileBufferSize,
queryCacheSize, prepareCacheSize,
compressionAlgorithms, zstdCompressionLevel,
loopResources, extensions, passwordPublisher);
}

@Override
Expand All @@ -343,6 +368,8 @@ public String toString() {
", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
", preferPrepareStatement=" + preferPrepareStatement +
", sessionVariables=" + sessionVariables +
", lockWaitTimeout=" + lockWaitTimeout +
", statementTimeout=" + statementTimeout +
", loadLocalInfilePath=" + loadLocalInfilePath +
", localInfileBufferSize=" + localInfileBufferSize +
", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize +
Expand All @@ -361,6 +388,8 @@ public String toString() {
", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
", preferPrepareStatement=" + preferPrepareStatement +
", sessionVariables=" + sessionVariables +
", lockWaitTimeout=" + lockWaitTimeout +
", statementTimeout=" + statementTimeout +
", loadLocalInfilePath=" + loadLocalInfilePath +
", localInfileBufferSize=" + localInfileBufferSize +
", queryCacheSize=" + queryCacheSize +
Expand Down Expand Up @@ -433,6 +462,12 @@ public static final class Builder {
@Nullable
private Predicate<String> preferPrepareStatement;

@Nullable
private Duration lockWaitTimeout;

@Nullable
private Duration statementTimeout;

private List<String> sessionVariables = Collections.emptyList();

@Nullable
Expand Down Expand Up @@ -486,7 +521,11 @@ public MySqlConnectionConfiguration build() {
connectionTimeZone,
forceConnectionTimeZoneToSession,
user, password, database,
createDatabaseIfNotExist, preferPrepareStatement, sessionVariables, loadLocalInfilePath,
createDatabaseIfNotExist, preferPrepareStatement,
sessionVariables,
lockWaitTimeout,
statementTimeout,
loadLocalInfilePath,
localInfileBufferSize, queryCacheSize, prepareCacheSize,
compressionAlgorithms, zstdCompressionLevel, loopResources,
Extensions.from(extensions, autodetectExtensions), passwordPublisher);
Expand Down Expand Up @@ -911,6 +950,30 @@ public Builder sessionVariables(String... sessionVariables) {
return this;
}

/**
* Configures the lock wait timeout. Default to use the server-side default value.
*
* @param lockWaitTimeout the lock wait timeout, or {@code null} to use the server-side default value.
* @return {@link Builder this}
* @since 1.1.3
*/
public Builder lockWaitTimeout(@Nullable Duration lockWaitTimeout) {
this.lockWaitTimeout = lockWaitTimeout;
return this;
}

/**
* Configures the statement timeout. Default to use the server-side default value.
*
* @param statementTimeout the statement timeout, or {@code null} to use the server-side default value.
* @return {@link Builder this}
* @since 1.1.3
*/
public Builder statementTimeout(@Nullable Duration statementTimeout) {
this.statementTimeout = statementTimeout;
return this;
}

/**
* Configures to allow the {@code LOAD DATA LOCAL INFILE} statement in the given {@code path} or
* disallow the statement. Default to {@code null} which means not allow the statement.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,20 @@ private static Mono<MySqlConnection> getMySqlConnection(
extensions.forEach(CodecRegistrar.class, registrar ->
registrar.register(allocator, builder));

return MySqlSimpleConnection.init(client, builder.build(), db, queryCache.get(),
Mono<MySqlConnection> c = MySqlSimpleConnection.init(client, builder.build(), db, queryCache.get(),
prepareCache, sessionVariables, prepare);

if (configuration.getLockWaitTimeout() != null) {
c = c.flatMap(connection -> connection.setLockWaitTimeout(configuration.getLockWaitTimeout())
.thenReturn(connection));
}

if (configuration.getStatementTimeout() != null) {
c = c.flatMap(connection -> connection.setStatementTimeout(configuration.getStatementTimeout())
.thenReturn(connection));
}

return c;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
import static io.r2dbc.spi.ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT;
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
import static io.r2dbc.spi.ConnectionFactoryOptions.SSL;
import static io.r2dbc.spi.ConnectionFactoryOptions.STATEMENT_TIMEOUT;
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;

/**
Expand Down Expand Up @@ -393,6 +395,10 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
MySqlConnectionFactoryProvider::splitVariables,
String[]::new
).to(builder::sessionVariables);
mapper.optional(LOCK_WAIT_TIMEOUT).as(Duration.class, Duration::parse)
.to(builder::lockWaitTimeout);
mapper.optional(STATEMENT_TIMEOUT).as(Duration.class, Duration::parse)
.to(builder::statementTimeout);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ private static MySqlConnectionConfiguration filledUp() {
.sslHostnameVerifier((host, s) -> true)
.queryCacheSize(128)
.prepareCacheSize(0)
.sessionVariables("sql_mode=ANSI_QUOTES")
.lockWaitTimeout(Duration.ofSeconds(5))
.statementTimeout(Duration.ofSeconds(10))
.autodetectExtensions(false)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,17 +455,15 @@ void validPasswordSupplier() {

@Test
void allConfigurationOptions() {
List<String> exceptConfigs = Arrays.asList(
List<String> exceptConfigs = Arrays.asList(
"extendWith",
"username",
"zeroDateOption");
List<String> exceptOptions = Arrays.asList(
"driver",
"ssl",
"protocol",
"zeroDate",
"lockWaitTimeout",
"statementTimeout");
"zeroDate");
Set<String> allOptions = Stream.concat(
Arrays.stream(ConnectionFactoryOptions.class.getFields()),
Arrays.stream(MySqlConnectionFactoryProvider.class.getFields())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
import io.r2dbc.spi.R2dbcTimeoutException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -122,6 +125,57 @@ void sessionVariables(Map<String, String> variables) {
.verifyComplete();
}

@ParameterizedTest
@ValueSource(strings = { "PT1S", "PT10S", "PT1M" })
void initLockWaitTimeout(String timeout) {
Duration lockWaitTimeout = Duration.parse(timeout);

connectionFactory(builder -> builder.lockWaitTimeout(lockWaitTimeout))
.create()
.flatMapMany(connection -> connection.createStatement("SELECT @@innodb_lock_wait_timeout").execute()
.flatMap(result -> result.map(r -> r.get(0, Long.class)))
.onErrorResume(e -> connection.close().then(Mono.error(e)))
.concatWith(connection.close().then(Mono.empty()))
)
.as(StepVerifier::create)
.expectNext(lockWaitTimeout.getSeconds())
.verifyComplete();
}

@EnabledIf("isGreaterThanOrEqualToMariaDB10_1_1MySql5_7_4")
@ParameterizedTest
@ValueSource(strings = { "PT0.1S", "PT0.5S" })
void initStatementTimeout(String timeout) {
Duration statementTimeout = Duration.parse(timeout);

connectionFactory(builder -> builder.statementTimeout(statementTimeout))
.create()
.flatMapMany(connection -> connection.createStatement("SELECT 1 WHERE SLEEP(1) > 1").execute()
.flatMap(result -> result.map(r -> r.get(0)))
.onErrorResume(e -> connection.close().then(Mono.error(e)))
.concatWith(connection.close().then(Mono.empty()))
)
.as(StepVerifier::create)
.verifyError(R2dbcTimeoutException.class);
}

static boolean isGreaterThanOrEqualToMariaDB10_1_1MySql5_7_4() {
String version = System.getProperty("test.mysql.version");

if (version == null || version.isEmpty()) {
return false;
}

ServerVersion ver = ServerVersion.parse(version);
String type = System.getProperty("test.db.type");

if ("mariadb".equalsIgnoreCase(type)) {
return ver.isGreaterThanOrEqualTo(ServerVersion.create(10, 1, 1));
}

return ver.isGreaterThanOrEqualTo(ServerVersion.create(5, 7, 4));
}

static Stream<Arguments> sessionVariables() {
return Stream.of(
Arguments.of(mapOf("sql_mode", "ANSI_QUOTES")),
Expand Down
Loading