Skip to content

Commit

Permalink
FMWK-279 Refactor batch write API (#678)
Browse files Browse the repository at this point in the history
Co-authored-by: yrizhkov <yrizhkov@aerospike.com>
  • Loading branch information
agrgr and reugn authored Dec 19, 2023
1 parent 4902235 commit 236c665
Show file tree
Hide file tree
Showing 99 changed files with 1,311 additions and 676 deletions.
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/*Test.java</include>
<include>**/*Tests.java</include>
</includes>
<argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.math=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED
--add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED</argLine>
Expand Down
18 changes: 16 additions & 2 deletions src/main/asciidoc/reference/configure-data-settings.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ApplicationConfig extends AbstractAerospikeDataConfiguration {
@Override
protected void configureDataSettings(AerospikeDataSettings.AerospikeDataSettingsBuilder builder) {
builder.createIndexesOnStartup(true);
builder.indexCacheRefreshFrequencySeconds(3600);
builder.indexCacheRefreshSeconds(3600);
builder.queryMaxRecords(10000L);
}
}
Expand Down Expand Up @@ -58,19 +58,33 @@ Create secondary indexes specified using `@Indexed` annotation on startup.
*Default*: `true`.

[[configure-data-settings.index-cache-refresh-frequency-seconds]]
=== indexCacheRefreshFrequencySeconds
=== indexCacheRefreshSeconds

Automatically refresh indexes cache every <N> seconds.

*Default*: `3600`.

[[configure-data-settings.server-version-refresh-frequency-seconds]]
=== serverVersionRefreshSeconds

Automatically refresh cached server version every <N> seconds.

*Default*: `3600`.

[[configure-data-settings.query-max-records]]
=== queryMaxRecords

Limit amount of results returned by server. Non-positive value means no limit.

*Default*: `10 000`.

[[configure-data-settings.batch-write-size]]
=== batchWriteSize

Maximum batch size for batch write operations. Non-positive value means no limit.

*Default*: `100`.

[[configure-data-settings.keep-original-key-types]]
=== keepOriginalKeyTypes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.data.aerospike.query.cache.IndexRefresher;
import org.springframework.data.aerospike.query.cache.IndexesCacheUpdater;
import org.springframework.data.aerospike.query.cache.InternalIndexOperations;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;

@Slf4j
@Configuration
Expand All @@ -43,9 +44,10 @@ public AerospikeTemplate aerospikeTemplate(IAerospikeClient aerospikeClient,
MappingAerospikeConverter mappingAerospikeConverter,
AerospikeMappingContext aerospikeMappingContext,
AerospikeExceptionTranslator aerospikeExceptionTranslator,
QueryEngine queryEngine, IndexRefresher indexRefresher) {
QueryEngine queryEngine, IndexRefresher indexRefresher,
ServerVersionSupport serverVersionSupport) {
return new AerospikeTemplate(aerospikeClient, nameSpace(), mappingAerospikeConverter,
aerospikeMappingContext, aerospikeExceptionTranslator, queryEngine, indexRefresher);
aerospikeMappingContext, aerospikeExceptionTranslator, queryEngine, indexRefresher, serverVersionSupport);
}

@Bean(name = "aerospikeQueryEngine")
Expand Down Expand Up @@ -75,22 +77,23 @@ public AerospikePersistenceEntityIndexCreator aerospikePersistenceEntityIndexCre
}

@Bean(name = "aerospikeIndexRefresher")
public IndexRefresher indexRefresher(IAerospikeClient aerospikeClient, IndexesCacheUpdater indexesCacheUpdater) {
public IndexRefresher indexRefresher(IAerospikeClient aerospikeClient, IndexesCacheUpdater indexesCacheUpdater,
ServerVersionSupport serverVersionSupport) {
IndexRefresher refresher = new IndexRefresher(aerospikeClient, aerospikeClient.getInfoPolicyDefault(),
new InternalIndexOperations(new IndexInfoParser()), indexesCacheUpdater);
new InternalIndexOperations(new IndexInfoParser()), indexesCacheUpdater, serverVersionSupport);
refresher.refreshIndexes();
int refreshFrequency = aerospikeDataSettings().getIndexCacheRefreshFrequencySeconds();
int refreshFrequency = aerospikeDataSettings().getIndexCacheRefreshSeconds();
processCacheRefreshFrequency(refreshFrequency, refresher);
log.debug("AerospikeDataSettings.indexCacheRefreshFrequencySeconds: {}", refreshFrequency);
log.debug("AerospikeDataSettings.indexCacheRefreshSeconds: {}", refreshFrequency);
return refresher;
}

private void processCacheRefreshFrequency(int indexCacheRefreshFrequencySeconds, IndexRefresher indexRefresher) {
if (indexCacheRefreshFrequencySeconds <= 0) {
private void processCacheRefreshFrequency(int indexCacheRefreshSeconds, IndexRefresher indexRefresher) {
if (indexCacheRefreshSeconds <= 0) {
log.info("Periodic index cache refreshing is not scheduled, interval ({}) is <= 0",
indexCacheRefreshFrequencySeconds);
indexCacheRefreshSeconds);
} else {
indexRefresher.scheduleRefreshIndexes(indexCacheRefreshFrequencySeconds);
indexRefresher.scheduleRefreshIndexes(indexCacheRefreshSeconds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.data.aerospike.query.cache.IndexesCacheUpdater;
import org.springframework.data.aerospike.query.cache.InternalIndexOperations;
import org.springframework.data.aerospike.query.cache.ReactorIndexRefresher;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;

/**
* Configuration with beans needed for reactive stuff
Expand All @@ -53,10 +54,11 @@ public ReactiveAerospikeTemplate reactiveAerospikeTemplate(MappingAerospikeConve
AerospikeExceptionTranslator aerospikeExceptionTranslator,
IAerospikeReactorClient aerospikeReactorClient,
ReactorQueryEngine reactorQueryEngine,
ReactorIndexRefresher reactorIndexRefresher) {
ReactorIndexRefresher reactorIndexRefresher,
ServerVersionSupport serverVersionSupport) {
return new ReactiveAerospikeTemplate(aerospikeReactorClient, nameSpace(), mappingAerospikeConverter,
aerospikeMappingContext,
aerospikeExceptionTranslator, reactorQueryEngine, reactorIndexRefresher);
aerospikeMappingContext, aerospikeExceptionTranslator, reactorQueryEngine, reactorIndexRefresher,
serverVersionSupport);
}

@Bean(name = "reactiveAerospikeQueryEngine")
Expand All @@ -76,10 +78,11 @@ public ReactorQueryEngine reactorQueryEngine(IAerospikeReactorClient aerospikeRe

@Bean(name = "reactiveAerospikeIndexRefresher")
public ReactorIndexRefresher reactorIndexRefresher(IAerospikeReactorClient aerospikeReactorClient,
IndexesCacheUpdater indexesCacheUpdater) {
IndexesCacheUpdater indexesCacheUpdater,
ServerVersionSupport serverVersionSupport) {
ReactorIndexRefresher refresher = new ReactorIndexRefresher(aerospikeReactorClient,
aerospikeReactorClient.getInfoPolicyDefault(),
new InternalIndexOperations(new IndexInfoParser()), indexesCacheUpdater);
new InternalIndexOperations(new IndexInfoParser()), indexesCacheUpdater, serverVersionSupport);
refresher.refreshIndexes().block();
return refresher;
}
Expand All @@ -103,8 +106,7 @@ protected ClientPolicy getClientPolicy() {
public ReactiveAerospikePersistenceEntityIndexCreator aerospikePersistenceEntityIndexCreator(
ObjectProvider<AerospikeMappingContext> aerospikeMappingContext,
AerospikeIndexResolver aerospikeIndexResolver,
ObjectProvider<ReactiveAerospikeTemplate> template)
{
ObjectProvider<ReactiveAerospikeTemplate> template) {
boolean indexesOnStartup = aerospikeDataSettings().isCreateIndexesOnStartup();
log.debug("AerospikeDataSettings.indexesOnStartup: {}", indexesOnStartup);
return new ReactiveAerospikePersistenceEntityIndexCreator(aerospikeMappingContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.policy.ClientPolicy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.BeanDefinition;
Expand All @@ -37,6 +38,7 @@
import org.springframework.data.aerospike.query.StatementBuilder;
import org.springframework.data.aerospike.query.cache.IndexesCache;
import org.springframework.data.aerospike.query.cache.IndexesCacheHolder;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import org.springframework.data.annotation.Persistent;
import org.springframework.data.mapping.model.FieldNamingStrategy;
import org.springframework.data.mapping.model.PropertyNameFieldNamingStrategy;
Expand Down Expand Up @@ -119,6 +121,24 @@ public AerospikeIndexResolver aerospikeIndexResolver() {
return new AerospikeIndexResolver();
}

@Bean(name = "aerospikeServerVersionSupport")
public ServerVersionSupport serverVersionSupport(IAerospikeClient aerospikeClient) {
ServerVersionSupport serverVersionSupport = new ServerVersionSupport(aerospikeClient);
int serverVersionRefreshFrequency = aerospikeDataSettings().getServerVersionRefreshSeconds();
processServerVersionRefreshFrequency(serverVersionRefreshFrequency, serverVersionSupport);
return serverVersionSupport;
}

private void processServerVersionRefreshFrequency(int serverVersionRefreshSeconds,
ServerVersionSupport serverVersionSupport) {
if (serverVersionRefreshSeconds <= 0) {
log.info("Periodic server version refreshing is not scheduled, interval ({}) is <= 0",
serverVersionRefreshSeconds);
} else {
serverVersionSupport.scheduleServerVersionRefresh(serverVersionRefreshSeconds);
}
}

protected Set<Class<?>> getInitialEntitySet() throws ClassNotFoundException {
String basePackage = getMappingBasePackage();
Set<Class<?>> initialEntitySet = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ public class AerospikeDataSettings {
boolean createIndexesOnStartup = true;
@Builder.Default
// Automatically refresh indexes cache every <N> seconds
int indexCacheRefreshFrequencySeconds = 3600;
int indexCacheRefreshSeconds = 3600;
@Builder.Default
// Automatically refresh cached server version every <N> seconds
int serverVersionRefreshSeconds = 3600;
@Builder.Default
// Limit amount of results returned by server. Non-positive value means no limit
long queryMaxRecords = 10_000L;
@Builder.Default
// Maximum batch size for batch write operations
int batchWriteSize = 100;
// Define how @Id fields (primary keys) and Map keys are stored: false - always as String,
// true - preserve original type if supported
@Builder.Default
Expand Down
Loading

0 comments on commit 236c665

Please sign in to comment.