Restore inter-segment search concurrency when synthetic source is enabled (elastic#103690)

We recently disabled inter-segment concurrency whenever synthetic source is enabled in the mappings (see elastic#102748). That is because
the current design of synthetic field loaders relies on sequential access to segments: the synthetic loader
is global, yet it holds information that is local to the leaf, and that state gets overwritten back and forth whenever
inter-segment concurrency is enabled. This causes non-deterministic behaviour when a script or runtime
field accesses synthetic source.
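To make the hazard concrete, here is a deliberately simplified, hypothetical sketch; the class and method names below are assumptions made for illustration and are not the real SyntheticFieldLoader API. A single loader shared across segments keeps leaf-local state in a mutable field, so with concurrent leaves a read can observe state written by a different leaf:

// Hypothetical, simplified stand-in for the real loader classes; names and
// structure are assumptions for illustration only.
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedLeafStateDemo {

    // A "global" loader that keeps leaf-local state in a mutable field.
    static class StatefulLoader {
        private int currentLeafOrd = -1;

        void setLeaf(int leafOrd) {
            this.currentLeafOrd = leafOrd; // every leaf overwrites the previous value
        }

        String load(int docId) {
            // The result silently depends on whichever thread called setLeaf last.
            return "leaf-" + currentLeafOrd + "/doc-" + docId;
        }
    }

    public static void main(String[] args) throws Exception {
        StatefulLoader shared = new StatefulLoader();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<String>> perLeafTasks = List.of(
            () -> { shared.setLeaf(0); return shared.load(10); },
            () -> { shared.setLeaf(1); return shared.load(10); },
            () -> { shared.setLeaf(2); return shared.load(10); },
            () -> { shared.setLeaf(3); return shared.load(10); }
        );
        // With inter-segment concurrency, setLeaf and load can interleave across
        // threads, so a task may read state written by a different leaf.
        for (Future<String> result : pool.invokeAll(perLeafTasks)) {
            System.out.println(result.get());
        }
        pool.shutdown();
    }
}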

This commit applies a quick fix: create a copy of the synthetic field loader for each segment,
so that each segment does its loading independently, without overwriting another segment's state. Long term, we'll want to
redesign these abstractions so that global concepts are not mixed with leaf-level ones.
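A minimal sketch of the shape of that fix, again using hypothetical stand-in types rather than the actual SourceLoader and SyntheticFieldLoader interfaces: hold a supplier instead of a single loader instance, and obtain a fresh copy for every leaf so per-segment state never leaks across segments.

// Minimal sketch of the per-segment copy approach; LeafLoader and the names
// below are illustrative assumptions, not the actual Elasticsearch interfaces.
import java.util.function.Supplier;

class PerLeafLoaderDemo {

    interface LeafLoader {
        String load(int docId);
    }

    static class SyntheticSourceLoader {
        // Hold a factory instead of a single shared loader instance.
        private final Supplier<LeafLoader> leafLoaderSupplier;

        SyntheticSourceLoader(Supplier<LeafLoader> leafLoaderSupplier) {
            this.leafLoaderSupplier = leafLoaderSupplier;
        }

        // Each segment gets its own copy, so leaves never share mutable state.
        LeafLoader leaf(int leafOrd) {
            LeafLoader freshCopy = leafLoaderSupplier.get();
            return docId -> freshCopy.load(docId) + " (from leaf " + leafOrd + ")";
        }
    }

    public static void main(String[] args) {
        SyntheticSourceLoader source = new SyntheticSourceLoader(() -> docId -> "doc-" + docId);
        // Safe to call concurrently: every leaf works on its own loader instance.
        System.out.println(source.leaf(0).load(10));
        System.out.println(source.leaf(1).load(10));
    }
}

In the actual change below, Synthetic keeps a Supplier<SyntheticFieldLoader> backed by mapping::syntheticFieldLoader, and its leaf(...) method calls the supplier once per segment, handing the fresh loader to SyntheticLeaf.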
javanna committed Jan 2, 2024
1 parent 0eade74 commit 87c4ee7
Showing 4 changed files with 31 additions and 33 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/103690.yaml
@@ -0,0 +1,5 @@
+pr: 103690
+summary: Restore inter-segment search concurrency with synthetic source is enabled
+area: Search
+type: bug
+issues: []
27 changes: 19 additions & 8 deletions SourceLoader.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -79,12 +80,15 @@ public Set<String> requiredStoredFields() {
      * Load {@code _source} from doc values.
      */
     class Synthetic implements SourceLoader {
-        private final SyntheticFieldLoader loader;
-        private final Map<String, SyntheticFieldLoader.StoredFieldLoader> storedFieldLoaders;
+        private final Supplier<SyntheticFieldLoader> syntheticFieldLoaderLeafSupplier;
+        private final Set<String> requiredStoredFields;
 
         public Synthetic(Mapping mapping) {
-            loader = mapping.syntheticFieldLoader();
-            storedFieldLoaders = Map.copyOf(loader.storedFieldLoaders().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+            this.syntheticFieldLoaderLeafSupplier = mapping::syntheticFieldLoader;
+            this.requiredStoredFields = syntheticFieldLoaderLeafSupplier.get()
+                .storedFieldLoaders()
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
         }
 
         @Override
@@ -94,19 +98,26 @@ public boolean reordersFieldValues() {
 
         @Override
         public Set<String> requiredStoredFields() {
-            return storedFieldLoaders.keySet();
+            return requiredStoredFields;
        }
 
         @Override
         public Leaf leaf(LeafReader reader, int[] docIdsInLeaf) throws IOException {
-            return new SyntheticLeaf(loader.docValuesLoader(reader, docIdsInLeaf));
+            SyntheticFieldLoader loader = syntheticFieldLoaderLeafSupplier.get();
+            return new SyntheticLeaf(loader, loader.docValuesLoader(reader, docIdsInLeaf));
         }
 
-        private class SyntheticLeaf implements Leaf {
+        private static class SyntheticLeaf implements Leaf {
+            private final SyntheticFieldLoader loader;
             private final SyntheticFieldLoader.DocValuesLoader docValuesLoader;
+            private final Map<String, SyntheticFieldLoader.StoredFieldLoader> storedFieldLoaders;
 
-            private SyntheticLeaf(SyntheticFieldLoader.DocValuesLoader docValuesLoader) {
+            private SyntheticLeaf(SyntheticFieldLoader loader, SyntheticFieldLoader.DocValuesLoader docValuesLoader) {
+                this.loader = loader;
                 this.docValuesLoader = docValuesLoader;
+                this.storedFieldLoaders = Map.copyOf(
+                    loader.storedFieldLoaders().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+                );
             }
 
             @Override
30 changes: 7 additions & 23 deletions DefaultSearchContext.java
@@ -32,7 +32,6 @@
 import org.elasticsearch.index.fielddata.FieldDataContext;
 import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.IdLoader;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NestedLookup;
@@ -167,20 +166,13 @@ final class DefaultSearchContext extends SearchContext {
         this.indexShard = readerContext.indexShard();
 
         Engine.Searcher engineSearcher = readerContext.acquireSearcher("search");
-        int maximumNumberOfSlices;
-        if (hasSyntheticSource(indexService)) {
-            // accessing synthetic source is not thread safe
-            maximumNumberOfSlices = 1;
-        } else {
-            maximumNumberOfSlices = determineMaximumNumberOfSlices(
-                executor,
-                request,
-                resultsType,
-                enableQueryPhaseParallelCollection,
-                field -> getFieldCardinality(field, readerContext.indexService(), engineSearcher.getDirectoryReader())
-            );
-
-        }
+        int maximumNumberOfSlices = determineMaximumNumberOfSlices(
+            executor,
+            request,
+            resultsType,
+            enableQueryPhaseParallelCollection,
+            field -> getFieldCardinality(field, readerContext.indexService(), engineSearcher.getDirectoryReader())
+        );
         if (executor == null) {
             this.searcher = new ContextIndexSearcher(
                 engineSearcher.getIndexReader(),
@@ -222,14 +214,6 @@ }
         }
     }
 
-    private static boolean hasSyntheticSource(IndexService indexService) {
-        DocumentMapper documentMapper = indexService.mapperService().documentMapper();
-        if (documentMapper != null) {
-            return documentMapper.sourceMapper().isSynthetic();
-        }
-        return false;
-    }
-
     static long getFieldCardinality(String field, IndexService indexService, DirectoryReader directoryReader) {
         MappedFieldType mappedFieldType = indexService.mapperService().fieldType(field);
         if (mappedFieldType == null) {
2 changes: 0 additions & 2 deletions DefaultSearchContextTests.java
@@ -395,8 +395,6 @@ public void testClearQueryCancellationsOnClose() throws IOException {
         when(indexShard.getThreadPool()).thenReturn(threadPool);
 
         IndexService indexService = mock(IndexService.class);
-        MapperService mapperService = mock(MapperService.class);
-        when(indexService.mapperService()).thenReturn(mapperService);
 
         try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
 