Skip to content

Commit

Permalink
JCES-1896 Adding isWrite information to API - added isWrite informati…
Browse files Browse the repository at this point in the history
…on on compile lvl to Reason and make it main source of truth for isWriteOperation. Introduced SqlQuery with OO goodness. Only (RW_API_CALL, WRITE_OPERATION) are Write reasons, rest is Read reasons, but might be run on MAIN due to connection state and other condtions
  • Loading branch information
afaruga-atlassian committed Mar 9, 2021
1 parent 5b4f970 commit f807cfc
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 20 deletions.
65 changes: 49 additions & 16 deletions src/main/java/com/atlassian/db/replica/api/reason/Reason.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,30 @@
public final class Reason {
private final String name;
private final boolean isRunOnMain;
private final boolean isWrite;

private Reason(final String name, boolean isRunOnMain) {
private Reason(final String name, boolean isRunOnMain, boolean isWrite) {
this.name = name;
this.isRunOnMain = isRunOnMain;
this.isWrite = isWrite;
}

public static final Reason RW_API_CALL = new Reason("RW_API_CALL", true);
public static final Reason REPLICA_INCONSISTENT = new Reason("REPLICA_INCONSISTENT", true);
public static final Reason READ_OPERATION = new Reason("READ_OPERATION", false);
public static final Reason WRITE_OPERATION = new Reason("WRITE_OPERATION", true);
public static final Reason LOCK = new Reason("LOCK", true);
public static final Reason MAIN_CONNECTION_REUSE = new Reason(
"MAIN_CONNECTION_REUSE",
true
);
public static final Reason HIGH_TRANSACTION_ISOLATION_LEVEL = new Reason(
"HIGH_TRANSACTION_ISOLATION_LEVEL",
true
);
public static final Reason RO_API_CALL = new Reason("RO_API_CALL", false);
public static final Reason RW_API_CALL =
new ReasonBuilder("RW_API_CALL").isRunOnMain(true).isWrite(true).build();
public static final Reason REPLICA_INCONSISTENT =
new ReasonBuilder("REPLICA_INCONSISTENT").isRunOnMain(true).isWrite(false).build();
public static final Reason READ_OPERATION =
new ReasonBuilder("READ_OPERATION").isRunOnMain(false).isWrite(false).build();
public static final Reason WRITE_OPERATION =
new ReasonBuilder("WRITE_OPERATION").isRunOnMain(true).isWrite(true).build();
public static final Reason LOCK =
new ReasonBuilder("LOCK").isRunOnMain(true).isWrite(false).build();
public static final Reason MAIN_CONNECTION_REUSE =
new ReasonBuilder("MAIN_CONNECTION_REUSE").isRunOnMain(true).isWrite(false).build();
public static final Reason HIGH_TRANSACTION_ISOLATION_LEVEL =
new ReasonBuilder("HIGH_TRANSACTION_ISOLATION_LEVEL").isRunOnMain(true).isWrite(false).build();
public static final Reason RO_API_CALL =
new ReasonBuilder("RO_API_CALL").isRunOnMain(false).isWrite(false).build();

public String getName() {
return name;
Expand All @@ -37,12 +41,16 @@ public boolean isRunOnMain() {
return isRunOnMain;
}

public boolean isWrite() {
return isWrite;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Reason reason = (Reason) o;
return isRunOnMain == reason.isRunOnMain && Objects.equals(name, reason.name);
return isRunOnMain == reason.isRunOnMain && isWrite == reason.isWrite && Objects.equals(name, reason.name);
}

@Override
Expand All @@ -55,6 +63,31 @@ public String toString() {
return "Reason{" +
"name='" + name + '\'' +
", isRunOnMain=" + isRunOnMain +
", isWrite=" + isWrite +
'}';
}

private static class ReasonBuilder {
private final String name;
private boolean isRunOnMain = false;
private boolean isWrite = false;

ReasonBuilder(final String name) {
this.name = name;
}

ReasonBuilder isRunOnMain(boolean isRunOnMain) {
this.isRunOnMain = isRunOnMain;
return this;
}

ReasonBuilder isWrite(boolean isWrite) {
this.isWrite = isWrite;
return this;
}

Reason build() {
return new Reason(name, isRunOnMain, isWrite);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ public Optional<RouteDecision> getCause() {
return Optional.ofNullable(cause);
}

/**
* @return True if the database call would always fail on a read replica.
*/
public boolean isWrite() {
return reason.isWrite();
}

/**
* @return Decision whether current route is Run on Main or Replica
*/
public boolean isRunOnMain() {
return reason.isRunOnMain();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
178 changes: 174 additions & 4 deletions src/test/java/com/atlassian/db/replica/api/TestDualConnection.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package com.atlassian.db.replica.api;

import com.atlassian.db.replica.api.mocks.CircularConsistency;
import com.atlassian.db.replica.api.mocks.ConnectionMock;
import com.atlassian.db.replica.api.mocks.ConnectionProviderMock;
import com.atlassian.db.replica.api.mocks.NoOpConnection;
import com.atlassian.db.replica.api.mocks.NoOpConnectionProvider;
import com.atlassian.db.replica.api.mocks.ReadOnlyAwareConnection;
import com.atlassian.db.replica.api.mocks.SingleConnectionProvider;
import com.atlassian.db.replica.api.reason.Reason;
import com.atlassian.db.replica.api.state.State;
import com.atlassian.db.replica.internal.RouteDecisionBuilder;
import com.atlassian.db.replica.spi.DatabaseCall;
import com.atlassian.db.replica.spi.ReplicaConsistency;
import com.atlassian.db.replica.spi.state.StateListener;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.assertj.core.api.Assertions;
import org.junit.Test;
Expand Down Expand Up @@ -40,7 +44,9 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@SuppressWarnings({"SqlDialectInspection", "SqlNoDataSourceInspection"})
Expand Down Expand Up @@ -103,10 +109,12 @@ public void shouldUseMainConnectionForWrites() throws SQLException {
@Test
public void shouldUseMainConnectionForExecute() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final DatabaseCall databaseCall = mock(DatabaseCall.class);
when(databaseCall.call(any(), any())).thenReturn(true);
final Connection connection = DualConnection.builder(
connectionProvider,
permanentConsistency().build()
).build();
).databaseCall(databaseCall).build();

connection.prepareStatement(SIMPLE_QUERY).execute();
connection.prepareStatement(SIMPLE_QUERY).execute(SIMPLE_QUERY);
Expand All @@ -119,6 +127,8 @@ public void shouldUseMainConnectionForExecute() throws SQLException {
.hasSize(1);
assertThat(connectionProvider.getProvidedConnectionTypes())
.containsOnly(MAIN);

verify(databaseCall, times(6)).call(any(), eq(new RouteDecisionBuilder(Reason.RW_API_CALL).sql(SIMPLE_QUERY).build()));
}

@Test
Expand Down Expand Up @@ -286,7 +296,7 @@ public void shouldSupprtCustomReadOnlyFunctions() throws SQLException {
}

@Test
public void shouldDetectWriteOperation() throws SQLException {
public void shouldDetectWriteOperationForSqlFunction() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final DatabaseCall databaseCall = mock(DatabaseCall.class);
final Connection connection = DualConnection.builder(
Expand Down Expand Up @@ -759,6 +769,98 @@ public void shouldReuseMainConnection() throws SQLException {
);
}

@Test
public void shouldShowThatReadRunAfterWriteIsRunOnMainNotReplica() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final DatabaseCall databaseCall = mock(DatabaseCall.class);
final StateListener stateListener = mock(StateListener.class);
when(databaseCall.call(any(), any())).thenReturn(mock(java.sql.ResultSet.class));
final Connection dualConnection = DualConnection.builder(
connectionProvider,
permanentConsistency().build()
).databaseCall(databaseCall).stateListener(stateListener).build();


dualConnection.prepareStatement(SIMPLE_QUERY).executeQuery();
dualConnection.prepareStatement(SIMPLE_QUERY).executeQuery();
when(databaseCall.call(any(), any())).thenReturn(true);
dualConnection.prepareStatement(SIMPLE_QUERY).execute();
Mockito.reset(databaseCall);
dualConnection.prepareStatement(SIMPLE_QUERY).executeQuery();


verify(stateListener).transition(State.NOT_INITIALISED, State.REPLICA);
verify(stateListener).transition(State.REPLICA, State.MAIN);
verifyNoMoreInteractions(stateListener);
verify(databaseCall).call(
any(),
eq(
new RouteDecisionBuilder(Reason.MAIN_CONNECTION_REUSE)
.sql(SIMPLE_QUERY)
.cause(
new RouteDecisionBuilder(Reason.RW_API_CALL).sql(SIMPLE_QUERY).build()
)
.build()
)
);
}

@Test
public void shouldReuseMainConnectionForNoneWriteAfterInconsistencyWrite() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final DatabaseCall databaseCall = mock(DatabaseCall.class);
final StateListener stateListener = mock(StateListener.class);
when(databaseCall.call(any(), any())).thenReturn(mock(java.sql.ResultSet.class));
final Connection dualConnection = DualConnection.builder(
connectionProvider,
new CircularConsistency.Builder(ImmutableList.of(true, false)).build()
).databaseCall(databaseCall).stateListener(stateListener).build();


dualConnection.prepareStatement(SIMPLE_QUERY).executeQuery();
dualConnection.prepareStatement(SIMPLE_QUERY).executeQuery();
when(databaseCall.call(any(), any())).thenReturn(true);
dualConnection.prepareStatement(SIMPLE_QUERY).execute();
Mockito.reset(databaseCall);
when(databaseCall.call(any(), any())).thenReturn(mock(ResultSet.class));
dualConnection.prepareStatement(SIMPLE_QUERY).executeQuery();


verify(stateListener).transition(State.NOT_INITIALISED, State.REPLICA);
verify(stateListener).transition(State.REPLICA, State.COMMITED_MAIN);
verify(stateListener).transition(State.COMMITED_MAIN, State.MAIN);
verifyNoMoreInteractions(stateListener);
verify(databaseCall).call(
any(),
eq(
new RouteDecisionBuilder(Reason.MAIN_CONNECTION_REUSE)
.sql(SIMPLE_QUERY)
.cause(
new RouteDecisionBuilder(Reason.REPLICA_INCONSISTENT).sql(SIMPLE_QUERY).build()
)
.build()
)
);
}

@Test
public void shouldForgiveReplicaIfItCatchesUpOnReads() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final StateListener stateListener = mock(StateListener.class);

final Connection connection = DualConnection.builder(
connectionProvider,
new CircularConsistency.Builder(ImmutableList.of(false, true)).build()
).stateListener(stateListener)
.build();
connection.prepareStatement(SIMPLE_QUERY).executeQuery();
Mockito.reset(stateListener);

connection.prepareStatement(SIMPLE_QUERY).executeQuery();

verify(stateListener).transition(State.COMMITED_MAIN, State.REPLICA);
}

@Test
public void shouldUsePrepareNewStatement() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
Expand Down Expand Up @@ -895,7 +997,7 @@ public void shouldGetNullCatalog() throws SQLException {
}

@Test
public void shouldSetTransactionIsolationLevel() throws SQLException {
public void shouldSetTransactionIsolationLevelForRead() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final DatabaseCall databaseCall = mock(DatabaseCall.class);
final Connection connection = DualConnection.builder(
Expand All @@ -914,6 +1016,26 @@ public void shouldSetTransactionIsolationLevel() throws SQLException {
);
}

@Test
public void shouldSetTransactionIsolationLevelForWrite() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final DatabaseCall databaseCall = mock(DatabaseCall.class);
final Connection connection = DualConnection.builder(
connectionProvider,
permanentInconsistency().build()
).databaseCall(databaseCall).build();

connection.setTransactionIsolation(TRANSACTION_SERIALIZABLE);
when(databaseCall.call(any(), any())).thenReturn(true);
connection.prepareStatement(SIMPLE_QUERY).execute();

verify(connectionProvider.singleProvidedConnection()).setTransactionIsolation(TRANSACTION_SERIALIZABLE);
verify(databaseCall).call(
any(),
eq(new RouteDecisionBuilder(Reason.RW_API_CALL).sql(SIMPLE_QUERY).build())
);
}

@Test
public void shouldGetTransactionIsolationLevelCallMainDatabase() throws SQLException {
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
Expand Down Expand Up @@ -964,7 +1086,7 @@ public void shouldSetTypeMap() throws SQLException {

connection.setTypeMap(typeMap);

assertThat(connection.getTypeMap().keySet()).containsOnly("MyType");
assertThat(connection.getTypeMap()).containsOnlyKeys("MyType");
}

@Test
Expand Down Expand Up @@ -1127,6 +1249,54 @@ public void shouldGetSchema() throws SQLException {
verify(connectionProvider.singleProvidedConnection()).getSchema();
}

@Test
public void shouldGetSchemaAndRunUpdate() throws SQLException {
final DatabaseCall databaseCall = mock(DatabaseCall.class);
when(databaseCall.call(any(), any())).thenReturn(1);
final StateListener stateListener = mock(StateListener.class);
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final Connection connection = DualConnection.builder(
connectionProvider,
permanentConsistency().build()
).stateListener(stateListener).databaseCall(databaseCall).build();

connection.prepareStatement(SIMPLE_QUERY).executeUpdate();
connection.getSchema();
connection.prepareStatement(SIMPLE_QUERY).executeUpdate();

verify(connectionProvider.singleProvidedConnection()).getSchema();
verify(stateListener).transition(State.NOT_INITIALISED, State.MAIN);

verify(databaseCall, times(2)).call(
any(),
eq(new RouteDecisionBuilder(Reason.RW_API_CALL).sql(SIMPLE_QUERY).build())
);
}

@Test
public void shouldGetSchemaAndRunQuery() throws SQLException {
final DatabaseCall databaseCall = mock(DatabaseCall.class);
when(databaseCall.call(any(), any())).thenReturn(mock(ResultSet.class));
final StateListener stateListener = mock(StateListener.class);
final ConnectionProviderMock connectionProvider = new ConnectionProviderMock();
final Connection connection = DualConnection.builder(
connectionProvider,
permanentConsistency().build()
).stateListener(stateListener).databaseCall(databaseCall).build();

connection.prepareStatement(SIMPLE_QUERY).executeQuery();
verify(stateListener).transition(State.NOT_INITIALISED, State.REPLICA);
connection.getSchema();
connection.prepareStatement(SIMPLE_QUERY).executeQuery();
verifyNoMoreInteractions(stateListener);
verify(connectionProvider.singleProvidedConnection()).getSchema();

verify(databaseCall, times(2)).call(
any(),
eq(new RouteDecisionBuilder(Reason.READ_OPERATION).sql(SIMPLE_QUERY).build())
);
}

@Test
public void shouldUnwrapConnection() throws SQLException {
final Connection dualConnection = DualConnection
Expand Down

0 comments on commit f807cfc

Please sign in to comment.