Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add live index settings override from config file #610

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
package com.yelp.nrtsearch.server.config;

import com.google.inject.Inject;
import com.google.protobuf.util.JsonFormat;
import com.yelp.nrtsearch.server.grpc.IndexLiveSettings;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.luceneserver.warming.WarmerConfig;
import com.yelp.nrtsearch.server.utils.JsonUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -101,6 +107,7 @@ public class LuceneServerConfiguration {
private final int discoveryFileUpdateIntervalMs;
private final FSTLoadMode completionCodecLoadMode;
private final boolean filterIncompatibleSegmentReaders;
private final Map<String, IndexLiveSettings> indexLiveSettingsOverrides;

private final YamlConfigReader configReader;
private final long maxConnectionAgeForReplication;
Expand Down Expand Up @@ -181,6 +188,27 @@ public LuceneServerConfiguration(InputStream yamlStream) {
configReader.getBoolean("filterIncompatibleSegmentReaders", false);
savePluginBeforeUnzip = configReader.getBoolean("savePluginBeforeUnzip", false);
enableGlobalBucketAccess = configReader.getBoolean("enableGlobalBucketAccess", false);

List<String> indicesWithOverrides = configReader.getKeysOrEmpty("indexLiveSettingsOverrides");
Map<String, IndexLiveSettings> liveSettingsMap = new HashMap<>();
for (String index : indicesWithOverrides) {
IndexLiveSettings liveSettings =
configReader.get(
"indexLiveSettingsOverrides." + index,
obj -> {
try {
String jsonStr = JsonUtils.objectToJsonStr(obj);
IndexLiveSettings.Builder liveSettingsBuilder = IndexLiveSettings.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(jsonStr, liveSettingsBuilder);
return liveSettingsBuilder.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
},
IndexLiveSettings.newBuilder().build());
liveSettingsMap.put(index, liveSettings);
}
indexLiveSettingsOverrides = Collections.unmodifiableMap(liveSettingsMap);
}

public ThreadPoolConfiguration getThreadPoolConfiguration() {
Expand Down Expand Up @@ -356,6 +384,11 @@ public boolean getEnableGlobalBucketAccess() {
return enableGlobalBucketAccess;
}

public IndexLiveSettings getLiveSettingsOverride(String indexName) {
return indexLiveSettingsOverrides.getOrDefault(
indexName, IndexLiveSettings.newBuilder().build());
}

/**
* Substitute all sub strings of the form ${FOO} with the environment variable value env[FOO].
* Variable names may only contain letters, numbers, and underscores. If a variable is not present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class BackendStateManager implements IndexStateManager {
private final String indexName;
private final String indexUniqueName;
private final String id;
private final IndexLiveSettings liveSettingsOverrides;
private final StateBackend stateBackend;
private final GlobalState globalState;

Expand All @@ -59,14 +60,20 @@ public class BackendStateManager implements IndexStateManager {
*
* @param indexName index name
* @param id index instance id
* @param liveSettingsOverrides local overrides for index live settings
* @param stateBackend state backend
* @param globalState global state
*/
public BackendStateManager(
String indexName, String id, StateBackend stateBackend, GlobalState globalState) {
String indexName,
String id,
IndexLiveSettings liveSettingsOverrides,
StateBackend stateBackend,
GlobalState globalState) {
this.indexName = indexName;
this.id = id;
this.indexUniqueName = BackendGlobalState.getUniqueIndexName(indexName, id);
this.liveSettingsOverrides = liveSettingsOverrides;
this.stateBackend = stateBackend;
this.globalState = globalState;
}
Expand All @@ -84,7 +91,8 @@ public synchronized void load() throws IOException {
UpdatedFieldInfo updatedFieldInfo =
FieldUpdateHandler.updateFields(
new FieldAndFacetState(), Collections.emptyMap(), stateInfo.getFieldsMap().values());
currentState = createIndexState(stateInfo, updatedFieldInfo.fieldAndFacetState);
currentState =
createIndexState(stateInfo, updatedFieldInfo.fieldAndFacetState, liveSettingsOverrides);
}

/**
Expand Down Expand Up @@ -115,7 +123,8 @@ public synchronized void create() throws IOException {
throw new IllegalStateException("Creating index, but state already exists for: " + indexName);
}
stateInfo = getDefaultStateInfo();
ImmutableIndexState indexState = createIndexState(stateInfo, new FieldAndFacetState());
ImmutableIndexState indexState =
createIndexState(stateInfo, new FieldAndFacetState(), liveSettingsOverrides);
stateBackend.commitIndexState(indexUniqueName, stateInfo);
currentState = indexState;
}
Expand All @@ -140,19 +149,20 @@ public synchronized IndexSettings updateSettings(IndexSettings settings) throws
}
IndexStateInfo updatedStateInfo = mergeSettings(currentState.getCurrentStateInfo(), settings);
ImmutableIndexState updatedIndexState =
createIndexState(updatedStateInfo, currentState.getFieldAndFacetState());
createIndexState(
updatedStateInfo, currentState.getFieldAndFacetState(), liveSettingsOverrides);
stateBackend.commitIndexState(indexUniqueName, updatedStateInfo);
currentState = updatedIndexState;
return updatedIndexState.getMergedSettings();
}

@Override
public IndexLiveSettings getLiveSettings() {
public IndexLiveSettings getLiveSettings(boolean withLocal) {
ImmutableIndexState indexState = currentState;
if (indexState == null) {
throw new IllegalStateException("No state for index: " + indexName);
}
return indexState.getMergedLiveSettings();
return indexState.getMergedLiveSettings(withLocal);
}

@Override
Expand All @@ -165,13 +175,14 @@ public synchronized IndexLiveSettings updateLiveSettings(IndexLiveSettings liveS
IndexStateInfo updatedStateInfo =
mergeLiveSettings(currentState.getCurrentStateInfo(), liveSettings);
ImmutableIndexState updatedIndexState =
createIndexState(updatedStateInfo, currentState.getFieldAndFacetState());
createIndexState(
updatedStateInfo, currentState.getFieldAndFacetState(), liveSettingsOverrides);
stateBackend.commitIndexState(indexUniqueName, updatedStateInfo);
currentState = updatedIndexState;
for (Map.Entry<Integer, ShardState> entry : currentState.getShards().entrySet()) {
entry.getValue().updatedLiveSettings(liveSettings);
}
return updatedIndexState.getMergedLiveSettings();
return updatedIndexState.getMergedLiveSettings(false);
}

@Override
Expand All @@ -187,7 +198,8 @@ public synchronized String updateFields(List<Field> fields) throws IOException {
IndexStateInfo updatedStateInfo =
replaceFields(currentState.getCurrentStateInfo(), updatedFieldInfo.fields);
ImmutableIndexState updatedIndexState =
createIndexState(updatedStateInfo, updatedFieldInfo.fieldAndFacetState);
createIndexState(
updatedStateInfo, updatedFieldInfo.fieldAndFacetState, liveSettingsOverrides);
stateBackend.commitIndexState(indexUniqueName, updatedStateInfo);
currentState = updatedIndexState;
return updatedIndexState.getAllFieldsJSON();
Expand Down Expand Up @@ -215,7 +227,8 @@ public synchronized void start(
.setGen(currentState.getCurrentStateInfo().getGen() + 1)
.build();
ImmutableIndexState updatedIndexState =
createIndexState(updatedStateInfo, currentState.getFieldAndFacetState());
createIndexState(
updatedStateInfo, currentState.getFieldAndFacetState(), liveSettingsOverrides);
stateBackend.commitIndexState(indexUniqueName, updatedStateInfo);
currentState = updatedIndexState;
}
Expand All @@ -232,7 +245,10 @@ public IndexState getCurrent() {

// Declared protected for use during testing
protected ImmutableIndexState createIndexState(
IndexStateInfo indexStateInfo, FieldAndFacetState fieldAndFacetState) throws IOException {
IndexStateInfo indexStateInfo,
FieldAndFacetState fieldAndFacetState,
IndexLiveSettings liveSettingsOverrides)
throws IOException {
Map<Integer, ShardState> previousShardState =
currentState == null ? null : currentState.getShards();
return new ImmutableIndexState(
Expand All @@ -242,6 +258,7 @@ protected ImmutableIndexState createIndexState(
indexUniqueName,
indexStateInfo,
fieldAndFacetState,
liveSettingsOverrides,
previousShardState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public class ImmutableIndexState extends IndexState {
private final IndexStateInfo currentStateInfo;
private final IndexSettings mergedSettings;
private final IndexLiveSettings mergedLiveSettings;
private final IndexLiveSettings mergedLiveSettingsWithLocal;
private final FieldAndFacetState fieldAndFacetState;
private final Map<Integer, ShardState> shards;

Expand All @@ -201,6 +202,7 @@ public class ImmutableIndexState extends IndexState {
* @param uniqueName index name with instance identifier
* @param stateInfo current settings state
* @param fieldAndFacetState current field state
* @param liveSettingsOverrides local overrides for index live settings
* @param previousShardState shard state from previous index state, or null
* @throws IOException on file system error
*/
Expand All @@ -211,6 +213,7 @@ public ImmutableIndexState(
String uniqueName,
IndexStateInfo stateInfo,
FieldAndFacetState fieldAndFacetState,
IndexLiveSettings liveSettingsOverrides,
Map<Integer, ShardState> previousShardState)
throws IOException {
super(globalState, name, globalState.getIndexDirBase().resolve(uniqueName));
Expand Down Expand Up @@ -252,24 +255,27 @@ public ImmutableIndexState(
// live settings
mergedLiveSettings =
mergeLiveSettings(DEFAULT_INDEX_LIVE_SETTINGS, stateInfo.getLiveSettings());
validateLiveSettings(mergedLiveSettings);

maxRefreshSec = mergedLiveSettings.getMaxRefreshSec().getValue();
minRefreshSec = mergedLiveSettings.getMinRefreshSec().getValue();
maxSearcherAgeSec = mergedLiveSettings.getMaxSearcherAgeSec().getValue();
indexRamBufferSizeMB = mergedLiveSettings.getIndexRamBufferSizeMB().getValue();
addDocumentsMaxBufferLen = mergedLiveSettings.getAddDocumentsMaxBufferLen().getValue();
sliceMaxDocs = mergedLiveSettings.getSliceMaxDocs().getValue();
sliceMaxSegments = mergedLiveSettings.getSliceMaxSegments().getValue();
virtualShards = mergedLiveSettings.getVirtualShards().getValue();
maxMergedSegmentMB = mergedLiveSettings.getMaxMergedSegmentMB().getValue();
segmentsPerTier = mergedLiveSettings.getSegmentsPerTier().getValue();
defaultSearchTimeoutSec = mergedLiveSettings.getDefaultSearchTimeoutSec().getValue();
mergedLiveSettingsWithLocal = mergeLiveSettings(mergedLiveSettings, liveSettingsOverrides);

validateLiveSettings(mergedLiveSettingsWithLocal);

maxRefreshSec = mergedLiveSettingsWithLocal.getMaxRefreshSec().getValue();
minRefreshSec = mergedLiveSettingsWithLocal.getMinRefreshSec().getValue();
maxSearcherAgeSec = mergedLiveSettingsWithLocal.getMaxSearcherAgeSec().getValue();
indexRamBufferSizeMB = mergedLiveSettingsWithLocal.getIndexRamBufferSizeMB().getValue();
addDocumentsMaxBufferLen = mergedLiveSettingsWithLocal.getAddDocumentsMaxBufferLen().getValue();
sliceMaxDocs = mergedLiveSettingsWithLocal.getSliceMaxDocs().getValue();
sliceMaxSegments = mergedLiveSettingsWithLocal.getSliceMaxSegments().getValue();
virtualShards = mergedLiveSettingsWithLocal.getVirtualShards().getValue();
maxMergedSegmentMB = mergedLiveSettingsWithLocal.getMaxMergedSegmentMB().getValue();
segmentsPerTier = mergedLiveSettingsWithLocal.getSegmentsPerTier().getValue();
defaultSearchTimeoutSec = mergedLiveSettingsWithLocal.getDefaultSearchTimeoutSec().getValue();
defaultSearchTimeoutCheckEvery =
mergedLiveSettings.getDefaultSearchTimeoutCheckEvery().getValue();
defaultTerminateAfter = mergedLiveSettings.getDefaultTerminateAfter().getValue();
maxMergePreCopyDurationSec = mergedLiveSettings.getMaxMergePreCopyDurationSec().getValue();
verboseMetrics = mergedLiveSettings.getVerboseMetrics().getValue();
mergedLiveSettingsWithLocal.getDefaultSearchTimeoutCheckEvery().getValue();
defaultTerminateAfter = mergedLiveSettingsWithLocal.getDefaultTerminateAfter().getValue();
maxMergePreCopyDurationSec =
mergedLiveSettingsWithLocal.getMaxMergePreCopyDurationSec().getValue();
verboseMetrics = mergedLiveSettingsWithLocal.getVerboseMetrics().getValue();

// If there is previous shard state, use it. Otherwise, initialize the shard.
if (previousShardState != null) {
Expand Down Expand Up @@ -360,9 +366,13 @@ public IndexSettings getMergedSettings() {
return mergedSettings;
}

/** Get the fully merged (with defaults) index live settings. */
public IndexLiveSettings getMergedLiveSettings() {
return mergedLiveSettings;
/**
* Get the fully merged (with defaults) index live settings.
*
* @param withLocal If local overrides should be included in the live settings
*/
public IndexLiveSettings getMergedLiveSettings(boolean withLocal) {
return withLocal ? mergedLiveSettingsWithLocal : mergedLiveSettings;
}

/** Get field and facet state for index. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ public interface IndexStateManager extends Closeable {
*/
IndexSettings updateSettings(IndexSettings settings) throws IOException;

/** Get the current index live settings. */
IndexLiveSettings getLiveSettings();
/**
* Get the current index live settings.
*
* @param withLocal If local overrides should be included in the live settings
*/
IndexLiveSettings getLiveSettings(boolean withLocal);

/**
* Update the index live setting from the given input settings. Input settings will be merged into
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static LiveSettingsV2Response handle(
responseSettings =
indexStateManager.updateLiveSettings(liveSettingsRequest.getLiveSettings());
} else {
responseSettings = indexStateManager.getLiveSettings();
responseSettings = indexStateManager.getLiveSettings(false);
}
return LiveSettingsV2Response.newBuilder().setLiveSettings(responseSettings).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.yelp.nrtsearch.server.grpc.DummyResponse;
import com.yelp.nrtsearch.server.grpc.GlobalStateInfo;
import com.yelp.nrtsearch.server.grpc.IndexGlobalState;
import com.yelp.nrtsearch.server.grpc.IndexLiveSettings;
import com.yelp.nrtsearch.server.grpc.Mode;
import com.yelp.nrtsearch.server.grpc.RestoreIndex;
import com.yelp.nrtsearch.server.grpc.StartIndexRequest;
Expand Down Expand Up @@ -129,7 +130,11 @@ public BackendGlobalState(
Map<String, IndexStateManager> managerMap = new HashMap<>();
for (Map.Entry<String, IndexGlobalState> entry : globalStateInfo.getIndicesMap().entrySet()) {
IndexStateManager stateManager =
createIndexStateManager(entry.getKey(), entry.getValue().getId(), stateBackend);
createIndexStateManager(
entry.getKey(),
entry.getValue().getId(),
luceneServerConfiguration.getLiveSettingsOverride(entry.getKey()),
stateBackend);
stateManager.load();
managerMap.put(entry.getKey(), stateManager);
}
Expand Down Expand Up @@ -163,8 +168,11 @@ protected StateBackend createStateBackend() {
* @return index state manager
*/
protected IndexStateManager createIndexStateManager(
String indexName, String indexId, StateBackend stateBackend) {
return new BackendStateManager(indexName, indexId, stateBackend, this);
String indexName,
String indexId,
IndexLiveSettings liveSettingsOverrides,
StateBackend stateBackend) {
return new BackendStateManager(indexName, indexId, liveSettingsOverrides, stateBackend, this);
}

/**
Expand All @@ -188,7 +196,12 @@ public synchronized void reloadStateFromBackend() throws IOException {
String indexName = entry.getKey();
IndexStateManager stateManager = immutableState.indexStateManagerMap.get(indexName);
if (stateManager == null || !entry.getValue().getId().equals(stateManager.getIndexId())) {
stateManager = createIndexStateManager(indexName, entry.getValue().getId(), stateBackend);
stateManager =
createIndexStateManager(
indexName,
entry.getValue().getId(),
getConfiguration().getLiveSettingsOverride(indexName),
stateBackend);
}
stateManager.load();
newManagerMap.put(indexName, stateManager);
Expand Down Expand Up @@ -262,11 +275,21 @@ public synchronized IndexState createIndex(CreateIndexRequest createIndexRequest
IndexStateManager stateManager;
if (createIndexRequest.getExistsWithId().isEmpty()) {
indexId = getIndexId();
stateManager = createIndexStateManager(indexName, indexId, stateBackend);
stateManager =
createIndexStateManager(
indexName,
indexId,
getConfiguration().getLiveSettingsOverride(indexName),
stateBackend);
stateManager.create();
} else {
indexId = createIndexRequest.getExistsWithId();
stateManager = createIndexStateManager(indexName, indexId, stateBackend);
stateManager =
createIndexStateManager(
indexName,
indexId,
getConfiguration().getLiveSettingsOverride(indexName),
stateBackend);
stateManager.load();
}

Expand Down
Loading
Loading