Skip to content

Commit

Permalink
Add InitFlow and move session states to context
Browse files Browse the repository at this point in the history
  • Loading branch information
mirromutth committed Mar 27, 2024
1 parent d533e1e commit 19fdae9
Show file tree
Hide file tree
Showing 15 changed files with 1,372 additions and 1,079 deletions.
151 changes: 135 additions & 16 deletions r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.cache.PrepareCache;
import io.asyncer.r2dbc.mysql.codec.CodecContext;
import io.asyncer.r2dbc.mysql.collation.CharCollation;
import io.asyncer.r2dbc.mysql.constant.ServerStatuses;
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
import io.r2dbc.spi.IsolationLevel;
import org.jetbrains.annotations.Nullable;

import java.nio.file.Path;
import java.time.Duration;
import java.time.ZoneId;

import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
Expand All @@ -37,6 +40,10 @@ public final class ConnectionContext implements CodecContext {

private static final ServerVersion NONE_VERSION = ServerVersion.create(0, 0, 0);

private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4);

private static final ServerVersion MARIA_10_1_1 = ServerVersion.create(10, 1, 1, true);

private final ZeroDateOption zeroDateOption;

@Nullable
Expand All @@ -52,16 +59,47 @@ public final class ConnectionContext implements CodecContext {

private Capability capability = Capability.DEFAULT;

private PrepareCache prepareCache;

@Nullable
private ZoneId timeZone;

private String product = "Unknown";

/**
* Current isolation level inferred by past statements.
* <p>
* Inference rules:
* <ol><li>In the beginning, it is also {@link #sessionIsolationLevel}.</li>
* <li>A transaction has began with a {@link IsolationLevel}, it will be changed to the value</li>
* <li>The transaction end (commit or rollback), it will recover to {@link #sessionIsolationLevel}.</li></ol>
*/
private volatile IsolationLevel currentIsolationLevel;

/**
* Session isolation level.
*
* <ol><li>It is applied to all subsequent transactions performed within the current session.</li>
* <li>Calls {@link io.r2dbc.spi.Connection#setTransactionIsolationLevel}, it will change to the value.</li>
* <li>It can be changed within transactions, but does not affect the current ongoing transaction.</li></ol>
*/
private volatile IsolationLevel sessionIsolationLevel;

private boolean lockWaitTimeoutSupported = false;

/**
* Current lock wait timeout in seconds.
*/
private volatile Duration currentLockWaitTimeout;

/**
* Session lock wait timeout in seconds.
*/
private volatile Duration sessionLockWaitTimeout;

/**
* Assume that the auto commit is always turned on, it will be set after handshake V10 request message, or OK
* message which means handshake V9 completed.
* <p>
* It would be updated multiple times, so {@code volatile} is required.
*/
private volatile short serverStatuses = ServerStatuses.AUTO_COMMIT;

Expand All @@ -80,18 +118,50 @@ public final class ConnectionContext implements CodecContext {
}

/**
* Initializes this context.
* Initializes handshake information after connection is established.
*
* @param connectionId the connection identifier that is specified by server.
* @param version the server version.
* @param capability the connection capabilities.
*/
void init(int connectionId, ServerVersion version, Capability capability) {
void initHandshake(int connectionId, ServerVersion version, Capability capability) {
this.connectionId = connectionId;
this.serverVersion = version;
this.capability = capability;
}

/**
* Initializes session information after logged-in.
*
* @param prepareCache the prepare cache.
* @param isolationLevel the session isolation level.
* @param lockWaitTimeoutSupported if the server supports lock wait timeout.
* @param lockWaitTimeout the lock wait timeout.
* @param product the server product name.
* @param timeZone the server timezone.
*/
void initSession(
PrepareCache prepareCache,
IsolationLevel isolationLevel,
boolean lockWaitTimeoutSupported,
Duration lockWaitTimeout,
@Nullable String product,
@Nullable ZoneId timeZone
) {
this.prepareCache = prepareCache;
this.currentIsolationLevel = this.sessionIsolationLevel = isolationLevel;
this.lockWaitTimeoutSupported = lockWaitTimeoutSupported;
this.currentLockWaitTimeout = this.sessionLockWaitTimeout = lockWaitTimeout;
this.product = product == null ? "Unknown" : product;

if (timeZone != null) {
if (isTimeZoneInitialized()) {
throw new IllegalStateException("Connection timezone have been initialized");
}
this.timeZone = timeZone;
}
}

/**
* Get the connection identifier that is specified by server.
*
Expand Down Expand Up @@ -128,6 +198,14 @@ public ZoneId getTimeZone() {
return timeZone;
}

String getProduct() {
return product;
}

PrepareCache getPrepareCache() {
return prepareCache;
}

boolean isTimeZoneInitialized() {
return timeZone != null;
}
Expand All @@ -138,13 +216,6 @@ public boolean isMariaDb() {
return (capability != null && capability.isMariaDb()) || serverVersion.isMariaDb();
}

void initTimeZone(ZoneId timeZone) {
if (isTimeZoneInitialized()) {
throw new IllegalStateException("Connection timezone have been initialized");
}
this.timeZone = timeZone;
}

@Override
public ZeroDateOption getZeroDateOption() {
return zeroDateOption;
Expand All @@ -170,19 +241,23 @@ public int getLocalInfileBufferSize() {
}

/**
* Checks if the server supports lock wait timeout.
* Checks if the server supports InnoDB lock wait timeout.
*
* @return if the server supports lock wait timeout.
* @return if the server supports InnoDB lock wait timeout.
*/
public boolean isLockWaitTimeoutSupported() {
return lockWaitTimeoutSupported;
}

/**
* Enables lock wait timeout supported when loading session variables.
* Checks if the server supports statement timeout.
*
* @return if the server supports statement timeout.
*/
void enableLockWaitTimeoutSupported() {
this.lockWaitTimeoutSupported = true;
public boolean isStatementTimeoutSupported() {
boolean isMariaDb = isMariaDb();
return (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_1_1)) ||
(!isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4));
}

/**
Expand All @@ -202,4 +277,48 @@ public short getServerStatuses() {
public void setServerStatuses(short serverStatuses) {
this.serverStatuses = serverStatuses;
}

IsolationLevel getCurrentIsolationLevel() {
return currentIsolationLevel;
}

void setCurrentIsolationLevel(IsolationLevel isolationLevel) {
this.currentIsolationLevel = isolationLevel;
}

void resetCurrentIsolationLevel() {
this.currentIsolationLevel = this.sessionIsolationLevel;
}

IsolationLevel getSessionIsolationLevel() {
return sessionIsolationLevel;
}

void setSessionIsolationLevel(IsolationLevel isolationLevel) {
this.sessionIsolationLevel = isolationLevel;
}

void setCurrentLockWaitTimeout(Duration timeoutSeconds) {
this.currentLockWaitTimeout = timeoutSeconds;
}

void resetCurrentLockWaitTimeout() {
this.currentLockWaitTimeout = this.sessionLockWaitTimeout;
}

boolean isLockWaitTimeoutChanged() {
return currentLockWaitTimeout != sessionLockWaitTimeout;
}

Duration getSessionLockWaitTimeout() {
return sessionLockWaitTimeout;
}

void setAllLockWaitTimeout(Duration timeoutSeconds) {
this.currentLockWaitTimeout = this.sessionLockWaitTimeout = timeoutSeconds;
}

boolean isInTransaction() {
return (serverStatuses & ServerStatuses.IN_TRANSACTION) != 0;
}
}

This file was deleted.

Loading

0 comments on commit 19fdae9

Please sign in to comment.