Skip to content

Commit

Permalink
- Deprecate indexExists operation
Browse files Browse the repository at this point in the history
- Throw IndexAlreadyExistsException, IndexNotFoundException for index related error codes
- Add tests for AerospikeTemplate index create/delete operations
  • Loading branch information
Anastasiia Smirnova authored and Aloren committed Dec 6, 2019
1 parent cca766c commit 252df08
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.springframework.data.aerospike;

import org.springframework.dao.InvalidDataAccessResourceUsageException;

public class IndexAlreadyExistsException extends InvalidDataAccessResourceUsageException {

public IndexAlreadyExistsException(String msg, Throwable cause) {
super(msg, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.springframework.data.aerospike;

import org.springframework.dao.InvalidDataAccessResourceUsageException;

public class IndexNotFoundException extends InvalidDataAccessResourceUsageException {

public IndexNotFoundException(String msg, Throwable cause) {
super(msg, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.springframework.data.aerospike.IndexAlreadyExistsException;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.domain.Sort;
import org.springframework.data.mapping.context.MappingContext;
Expand Down Expand Up @@ -181,7 +182,11 @@ <T> void createIndex(Class<T> entityClass, String indexName, String binName,
* Checks whether index by specified name exists in Aerospike.
* @param indexName
* @return true if exists
* @deprecated This operation is deprecated due to complications that are required for guaranteed index existence response.
* <p>If you need to conditionally create index \u2014 replace {@link #indexExists} with {@link #createIndex} and catch {@link IndexAlreadyExistsException}.
* <p>More information can be found at: <a href="https://github.com/aerospike/aerospike-client-java/pull/149">https://github.com/aerospike/aerospike-client-java/pull/149</a>
*/
@Deprecated
boolean indexExists(String indexName);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,13 @@ public <T> void deleteIndex(Class<T> entityClass, String indexName) {
@Override
public boolean indexExists(String indexName) {
Assert.notNull(indexName, "Index name must not be null!");
log.warn("`indexExists` operation is deprecated. Please stop using it as it will be removed in next major release.");

//TODO: should be moved to aerospike-client (https://github.com/aerospike/aerospike-client-java/pull/149)
try {
Node[] nodes = client.getNodes();
if (nodes.length == 0) {
throw new AerospikeException(ResultCode.SERVER_NOT_AVAILABLE, "Command failed because cluster is empty.");
}
//TODO: get random node
Node node = nodes[0];
String response = Info.request(node, "sindex/" + namespace + '/' + indexName);
return !response.startsWith("FAIL:201");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataRetrievalFailureException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.RecoverableDataAccessException;
import org.springframework.dao.TransientDataAccessResourceException;
import org.springframework.data.aerospike.IndexAlreadyExistsException;
import org.springframework.data.aerospike.IndexNotFoundException;

/**
* @author Peter Milne
Expand Down Expand Up @@ -59,8 +60,9 @@ public DataAccessException translateExceptionIfPossible(RuntimeException cause)
case ResultCode.KEY_NOT_FOUND_ERROR:
return new DataRetrievalFailureException(msg, cause);
case ResultCode.INDEX_NOTFOUND:
return new IndexNotFoundException(msg, cause);
case ResultCode.INDEX_ALREADY_EXISTS:
return new InvalidDataAccessResourceUsageException(msg, cause);
return new IndexAlreadyExistsException(msg, cause);
case ResultCode.TIMEOUT:
case ResultCode.QUERY_TIMEOUT:
return new QueryTimeoutException(msg, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.data.aerospike.repository;

import com.aerospike.client.query.IndexType;
import org.springframework.data.aerospike.IndexAlreadyExistsException;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.Repository;
Expand All @@ -34,6 +35,15 @@ public interface AerospikeRepository<T, ID> extends PagingAndSortingRepository<T

<T> void deleteIndex(Class<T> domainType, String indexName);

/**
* Checks whether index by specified name exists in Aerospike.
* @param indexName
* @return true if exists
* @deprecated This operation is deprecated due to complications that are required for guaranteed index existence response.
* <p>If you need to conditionally create index \u2014 replace {@link #indexExists} with {@link #createIndex} and catch {@link IndexAlreadyExistsException}.
* <p>More information can be found at: <a href="https://github.com/aerospike/aerospike-client-java/pull/149">https://github.com/aerospike/aerospike-client-java/pull/149</a>
*/
@Deprecated
boolean indexExists(String indexName);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.springframework.data.aerospike;

import org.awaitility.core.ThrowingRunnable;

import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.TEN_SECONDS;

public class AwaitilityUtils {

public static void awaitTenSecondsUntil(ThrowingRunnable runnable) {
await().atMost(TEN_SECONDS)
.untilAsserted(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package org.springframework.data.aerospike;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Info;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import lombok.RequiredArgsConstructor;
import org.awaitility.Awaitility;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.aerospike.core.AerospikeTemplate;

import java.time.Duration;
Expand Down Expand Up @@ -46,12 +48,27 @@ public <T> boolean isEmptySet(IAerospikeClient client, String namespace, Class<T
|| Stream.of(answer.split(";")).allMatch(s -> s.contains("objects=0"));
}

public <T> void createIndexIfNotExists(Class<T> domainType, String indexName, String binName, IndexType indexType) {
try {
template.createIndex(domainType, indexName, binName, indexType);
} catch (InvalidDataAccessResourceUsageException e) {
// ignore: index already exists
public <T> void createIndexIfNotExists(Class<T> entityClass, String indexName, String binName, IndexType indexType) {
ignoreError(ResultCode.INDEX_ALREADY_EXISTS,
() -> wait(client.createIndex(null, template.getNamespace(), template.getSetName(entityClass), indexName, binName, indexType)));
}

public <T> void dropIndexIfExists(Class<T> entityClass, String indexName) {
ignoreError(ResultCode.INDEX_NOTFOUND,
() -> wait(client.dropIndex(null, template.getNamespace(), template.getSetName(entityClass), indexName)));
}

// Do not use this code in production!
// This will not guarantee the correct answer from Aerospike Server for all cases.
// Also it requests index status only from one Aerospike node, which is OK for tests, and NOT OK for Production cluster.
public boolean indexExists(String indexName) {
Node[] nodes = client.getNodes();
if (nodes.length == 0) {
throw new AerospikeException(ResultCode.SERVER_NOT_AVAILABLE, "Command failed because cluster is empty.");
}
Node node = nodes[0];
String response = Info.request(node, "sindex/" + template.getNamespace() + '/' + indexName);
return !response.startsWith("FAIL:201");
}

public void addNewFieldToSavedDataInAerospike(Key key) {
Expand All @@ -68,4 +85,21 @@ public void addNewFieldToSavedDataInAerospike(Key key) {
assertThat(updated.bins.get("notPresent")).isEqualTo("cats");
}

private static void wait(IndexTask task) {
if (task == null) {
throw new IllegalStateException("task can not be null");
}
task.waitTillComplete();
}

private void ignoreError(int errorCodeToSkip, Runnable runnable) {
try {
runnable.run();
} catch (AerospikeException e) {
if (e.getResultCode() != errorCodeToSkip) {
throw e;
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.springframework.data.aerospike.core;

import com.aerospike.client.query.IndexType;
import lombok.Value;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.aerospike.AsyncUtils;
import org.springframework.data.aerospike.BaseBlockingIntegrationTests;
import org.springframework.data.aerospike.IndexAlreadyExistsException;
import org.springframework.data.aerospike.IndexNotFoundException;
import org.springframework.data.aerospike.mapping.Document;

import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.springframework.data.aerospike.AwaitilityUtils.awaitTenSecondsUntil;

public class AerospikeTemplateIndexTests extends BaseBlockingIntegrationTests {

private static final String INDEX_TEST_1 = "index-test-77777";

@Override
@Before
public void setUp() {
blockingAerospikeTestOperations.dropIndexIfExists(IndexedDocument.class, INDEX_TEST_1);
}

@Test
public void createIndex_createsIndexIfExecutedConcurrently() throws Exception {
AtomicInteger errors = new AtomicInteger();
AsyncUtils.executeConcurrently(5, () -> {
try {
template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING);
} catch (IndexAlreadyExistsException e) {
errors.incrementAndGet();
}
});

awaitTenSecondsUntil(() ->
assertThat(blockingAerospikeTestOperations.indexExists(INDEX_TEST_1)).isTrue());
assertThat(errors.get()).isLessThanOrEqualTo(4);// depending on the timing all 5 requests can succeed on Aerospike Server
}

@Test
public void createIndex_allCreateIndexConcurrentAttemptsFailIfIndexAlreadyExists() throws Exception {
template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING);

awaitTenSecondsUntil(() ->
assertThat(blockingAerospikeTestOperations.indexExists(INDEX_TEST_1)).isTrue());

AtomicInteger errors = new AtomicInteger();
AsyncUtils.executeConcurrently(5, () -> {
try {
template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING);
} catch (IndexAlreadyExistsException e) {
errors.incrementAndGet();
}
});

assertThat(errors.get()).isEqualTo(5);
}

@Test
public void createIndex_createsIndex() {
template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING);

awaitTenSecondsUntil(() ->
assertThat(blockingAerospikeTestOperations.indexExists(INDEX_TEST_1)).isTrue());
}

@Test
public void createIndex_throwsExceptionIfIndexAlreadyExists() {
template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING);

awaitTenSecondsUntil(() -> assertThat(blockingAerospikeTestOperations.indexExists(INDEX_TEST_1)).isTrue());

assertThatThrownBy(() -> template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING))
.isInstanceOf(IndexAlreadyExistsException.class);
}

@Test
public void deleteIndex_throwsExceptionIfIndexDoesNotExist() {
assertThatThrownBy(() -> template.deleteIndex(IndexedDocument.class, "not-existing-index"))
.isInstanceOf(IndexNotFoundException.class);
}

@Test
public void deleteIndex_deletesExistingIndex() {
template.createIndex(IndexedDocument.class, INDEX_TEST_1, "stringField", IndexType.STRING);

awaitTenSecondsUntil(() -> assertThat(blockingAerospikeTestOperations.indexExists(INDEX_TEST_1)).isTrue());

template.deleteIndex(IndexedDocument.class, INDEX_TEST_1);

awaitTenSecondsUntil(() -> assertThat(blockingAerospikeTestOperations.indexExists(INDEX_TEST_1)).isFalse());
}

@Value
@Document
public static class IndexedDocument {

String stringField;
int intField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.RecoverableDataAccessException;
import org.springframework.dao.TransientDataAccessResourceException;
import org.springframework.data.aerospike.IndexAlreadyExistsException;
import org.springframework.data.aerospike.IndexNotFoundException;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -87,6 +89,20 @@ public void shouldTranslateQueryKeyBusyError() {
assertThat(actual).isExactlyInstanceOf(TransientDataAccessResourceException.class);
}

@Test
public void shouldTranslateIndexAlreadyExistsError() {
AerospikeException cause = new AerospikeException(ResultCode.INDEX_ALREADY_EXISTS);
DataAccessException actual = translator.translateExceptionIfPossible(cause);
assertThat(actual).isExactlyInstanceOf(IndexAlreadyExistsException.class);
}

@Test
public void shouldTranslateIndexNotFoundError() {
AerospikeException cause = new AerospikeException(ResultCode.INDEX_NOTFOUND);
DataAccessException actual = translator.translateExceptionIfPossible(cause);
assertThat(actual).isExactlyInstanceOf(IndexNotFoundException.class);
}

@Test
public void shouldTranslateConnectErrorToAerospike() {
AerospikeException.Connection cause = new AerospikeException.Connection("msg");
Expand Down

0 comments on commit 252df08

Please sign in to comment.