Skip to content

Commit

Permalink
Storing a userkey twice fix (#275)
Browse files Browse the repository at this point in the history
* Fix: storing a user key twice.
#154
* Do not store "@user_key" bin, instead use client's sendKey(true).
* sendKey should be configurable variable in the DataSettings.
  • Loading branch information
roimenashe authored Aug 25, 2021
1 parent aff41fe commit 77db6cf
Show file tree
Hide file tree
Showing 16 changed files with 77 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.data.aerospike.convert.AerospikeConverter;
import org.springframework.data.aerospike.convert.AerospikeReadData;
import org.springframework.data.aerospike.convert.AerospikeWriteData;
import org.springframework.data.aerospike.core.WritePolicyBuilder;

import java.util.Objects;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -54,11 +55,13 @@ public AerospikeCache(String name,
this.client = client;
this.aerospikeConverter = aerospikeConverter;
this.cacheConfiguration = cacheConfiguration;
this.createOnly = new WritePolicy(client.getWritePolicyDefault());
this.createOnly.recordExistsAction = RecordExistsAction.CREATE_ONLY;
this.createOnly.expiration = cacheConfiguration.getExpirationInSeconds();
this.writePolicyForPut = new WritePolicy(client.getWritePolicyDefault());
this.writePolicyForPut.expiration = cacheConfiguration.getExpirationInSeconds();
this.createOnly = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.recordExistsAction(RecordExistsAction.CREATE_ONLY)
.expiration(cacheConfiguration.getExpirationInSeconds())
.build();
this.writePolicyForPut = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(cacheConfiguration.getExpirationInSeconds())
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,14 @@ protected AerospikeDataSettings aerospikeDataSettings() {

protected void configureDataSettings(AerospikeDataSettings.AerospikeDataSettingsBuilder builder) {
builder.scansEnabled(false);
builder.sendKey(true);
}

protected ClientPolicy getClientPolicy() {
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.failIfNotConnected = true;
clientPolicy.timeout = 10_000;
clientPolicy.writePolicyDefault.sendKey = aerospikeDataSettings().isSendKey();
return clientPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class AerospikeDataSettings {

@Builder.Default
boolean scansEnabled = false;
@Builder.Default
boolean sendKey = true;

/*
* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,5 @@
*/
public interface AerospikeMetaData {

//sometimes aerospike does not retrieve userKey
//so we need to save it as a Bin
//see https://github.com/aerospike/aerospike-client-java/issues/77
String USER_KEY = "@user_key";

String PRIMARY_KEY = "PK";
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import java.util.Collections;
import java.util.Map;

import static org.springframework.data.aerospike.convert.AerospikeMetaData.USER_KEY;
import static org.springframework.data.aerospike.convert.AerospikeMetaData.PRIMARY_KEY;
import static org.springframework.data.aerospike.utility.TimeUtils.offsetInSecondsToUnixTime;

public class MappingAerospikeReadConverter implements EntityReader<Object, AerospikeReadData> {
Expand Down Expand Up @@ -97,7 +97,7 @@ public <R> R read(Class<R> targetClass, final AerospikeReadData data) {
@SuppressWarnings("unchecked")
private <T> T getIdValue(Key key, Map<String, Object> data, AerospikePersistentProperty property) {
Value userKey = key.userKey;
Object value = userKey == null ? data.get(USER_KEY) : userKey.getObject();
Object value = userKey == null ? data.get(PRIMARY_KEY) : userKey.getObject();
Assert.notNull(value, "Id must not be null!");
return (T) convertIfNeeded(value, property.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.springframework.data.aerospike.convert.AerospikeMetaData.USER_KEY;
import static org.springframework.data.aerospike.utility.TimeUtils.unixTimeToOffsetInSeconds;

public class MappingAerospikeWriteConverter implements EntityWriter<Object, AerospikeWriteData> {
Expand Down Expand Up @@ -80,7 +79,6 @@ public void write(Object source, final AerospikeWriteData data) {
Assert.notNull(id, "Id must not be null!");

data.setKey(new Key(data.getKey().namespace, entity.getSetName(), id));
data.addBin(USER_KEY, id);
}

AerospikePersistentProperty versionProperty = entity.getVersionProperty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ public AerospikeKeyValueAdapter(IAerospikeClient client, AerospikeConverter conv
this.client = client;
this.converter = converter;
this.namespace = namespace;
this.insertPolicy = new WritePolicy(this.client.getWritePolicyDefault());
this.updatePolicy = new WritePolicy(this.client.getWritePolicyDefault());
this.insertPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
this.updatePolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
this.insertPolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.recordExistsAction(RecordExistsAction.CREATE_ONLY)
.build();
this.updatePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.recordExistsAction(RecordExistsAction.UPDATE_ONLY)
.build();
}

/*
* (non-Javadoc)
* @see org.springframework.data.keyvalue.core.KeyValueAdapter#put(java.io.Serializable, java.lang.Object, java.io.Serializable)
Expand Down Expand Up @@ -108,9 +110,10 @@ public Object delete(Object id, String keyspace) {
Key key = new Key(namespace, keyspace, id.toString());
Object object = get(id, keyspace);
if (object != null) {
WritePolicy wp = new WritePolicy();
wp.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
client.delete(wp, key);
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.recordExistsAction(RecordExistsAction.UPDATE_ONLY)
.build();
client.delete(writePolicy, key);
}
return object;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,9 @@ record = this.client.get(null, key);
}

private Record getAndTouch(Key key, int expiration) {
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy.expiration = expiration;
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(expiration)
.build();

if (this.client.exists(null, key)) {
return this.client.operate(writePolicy, key, Operation.touch(), Operation.get());
Expand Down Expand Up @@ -542,8 +543,9 @@ public <T> T add(T objectToAddTo, Map<String, Long> values) {
AerospikeWriteData data = writeData(objectToAddTo);
Operation[] ops = operations(values, Operation.Type.ADD, Operation.get());

WritePolicy writePolicy = new WritePolicy(this.client.getWritePolicyDefault());
writePolicy.expiration = data.getExpiration();
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(data.getExpiration())
.build();

Record record = this.client.operate(writePolicy, data.getKey(), ops);

Expand All @@ -561,8 +563,9 @@ public <T> T add(T objectToAddTo, String binName, long value) {
try {
AerospikeWriteData data = writeData(objectToAddTo);

WritePolicy writePolicy = new WritePolicy(this.client.getWritePolicyDefault());
writePolicy.expiration = data.getExpiration();
WritePolicy writePolicy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(data.getExpiration())
.build();

Record record = this.client.operate(writePolicy, data.getKey(),
Operation.add(new Bin(binName, value)), Operation.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsActi
return WritePolicyBuilder.builder(this.writePolicyDefault)
.generationPolicy(GenerationPolicy.EXPECT_GEN_EQUAL)
.generation(data.getVersion().orElse(0))
.sendKey(true)
.expiration(data.getExpiration())
.recordExistsAction(recordExistsAction)
.build();
Expand All @@ -223,7 +222,6 @@ WritePolicy expectGenerationSavePolicy(AerospikeWriteData data, RecordExistsActi
WritePolicy ignoreGenerationSavePolicy(AerospikeWriteData data, RecordExistsAction recordExistsAction) {
return WritePolicyBuilder.builder(this.writePolicyDefault)
.generationPolicy(GenerationPolicy.NONE)
.sendKey(true)
.expiration(data.getExpiration())
.recordExistsAction(recordExistsAction)
.build();
Expand Down Expand Up @@ -274,5 +272,4 @@ private <T> List<Key> toKeysList(Class<T> entityClass, Collection<?> ids) {
private <S> S convertIfNecessary(Object source, Class<S> type) {
return type.isAssignableFrom(source.getClass()) ? (S) source : converter.getConversionService().convert(source, type);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ public <T> Mono<T> add(T objectToAddTo, Map<String, Long> values) {
}
operations[x] = Operation.get();

WritePolicy writePolicy = new WritePolicy(this.writePolicyDefault);
writePolicy.expiration = data.getExpiration();
WritePolicy writePolicy = WritePolicyBuilder.builder(this.writePolicyDefault)
.expiration(data.getExpiration())
.build();

return executeOperationsOnValue(objectToAddTo, data, operations, writePolicy);
}
Expand All @@ -169,8 +170,9 @@ public <T> Mono<T> add(T objectToAddTo, String binName, long value) {

AerospikeWriteData data = writeData(objectToAddTo);

WritePolicy writePolicy = new WritePolicy(this.writePolicyDefault);
writePolicy.expiration = data.getExpiration();
WritePolicy writePolicy = WritePolicyBuilder.builder(this.writePolicyDefault)
.expiration(data.getExpiration())
.build();

Operation[] operations = {Operation.add(new Bin(binName, value)), Operation.get(binName)};
return executeOperationsOnValue(objectToAddTo, data, operations, writePolicy);
Expand Down Expand Up @@ -466,9 +468,10 @@ private <T> Mono<Record> putAndGetHeader(AerospikeWriteData data, WritePolicy po
}

private Mono<KeyRecord> getAndTouch(Key key, int expiration) {
WritePolicy policy = new WritePolicy(writePolicyDefault);
policy.expiration = expiration;
return reactorClient.operate(policy, key, Operation.touch(), Operation.get());
WritePolicy writePolicy = WritePolicyBuilder.builder(this.writePolicyDefault)
.expiration(expiration)
.build();
return reactorClient.operate(writePolicy, key, Operation.touch(), Operation.get());
}

private Throwable translateError(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import lombok.Value;
import lombok.*;
import org.awaitility.Awaitility;
import org.springframework.data.aerospike.core.WritePolicyBuilder;
import org.springframework.data.aerospike.query.cache.IndexInfoParser;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.utility.ResponseUtils;
Expand Down Expand Up @@ -129,8 +130,10 @@ public void addNewFieldToSavedDataInAerospike(Key key) {
Bin[] bins = Stream.concat(
initial.bins.entrySet().stream().map(e -> new Bin(e.getKey(), e.getValue())),
Stream.of(new Bin("notPresent", "cats"))).toArray(Bin[]::new);
WritePolicy policy = new WritePolicy();
policy.recordExistsAction = RecordExistsAction.REPLACE;

WritePolicy policy = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.recordExistsAction(RecordExistsAction.REPLACE)
.build();

client.put(policy, key, bins);

Expand All @@ -155,5 +158,4 @@ public static class ScanJob {
@NonNull
String status;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.springframework.data.aerospike;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class PoliciesVerificationTests extends BaseBlockingIntegrationTests {
@Test
public void sendKeyShouldBeTrueByDefault() {
assertThat(client.getWritePolicyDefault().sendKey).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,6 @@ public void shouldWriteAndReadIfTypeKeyIsNull() {
converter.write(user, forWrite);

assertThat(forWrite.getKey()).consistsOf(NAMESPACE, SIMPLESET3, user.getId());
assertThat(forWrite.getBins()).containsOnly(
new Bin("@user_key", "678")
);

Map<String, Object> bins = of("@user_key", "678");
User read = converter.read(User.class, AerospikeReadData.forRead(forWrite.getKey(), record(bins)));

assertThat(read).isEqualTo(user);
}

@Test
Expand Down Expand Up @@ -287,7 +279,6 @@ public void shouldNotWriteVersionToBins() {
converter.write(new VersionedClass("id", "data", 42L), forWrite);

assertThat(forWrite.getBins()).containsOnly(
new Bin("@user_key", "id"),
new Bin("@_class", VersionedClass.class.getName()),
new Bin("field", "data")
);
Expand Down
Loading

0 comments on commit 77db6cf

Please sign in to comment.