Remove config prop maxAggregateAge, move metrics to PreAggregator
at055612 committed Jan 24, 2025
1 parent f13475e commit 3a7a0df
Showing 16 changed files with 216 additions and 101 deletions.
1 change: 0 additions & 1 deletion stroom-proxy/stroom-proxy-app/proxy-dev.yml
@@ -138,7 +138,6 @@ proxyConfig:
enabled: true
maxItemsPerAggregate: 1000
maxUncompressedByteSize: "1G"
maxAggregateAge: 10s
aggregationFrequency: 10s

forwardHttpDestinations:
3 changes: 1 addition & 2 deletions stroom-proxy/stroom-proxy-app/proxy-prod.yml.jinja2
@@ -164,8 +164,7 @@ proxyConfig:
aggregator:
maxItemsPerAggregate: 1000
maxUncompressedByteSize: "1G"
maxAggregateAge: 10m
aggregationFrequency: 1m
aggregationFrequency: 10m

# If you want multiple forward destinations then you will need to edit this file directly
# instead of using env var substitution
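Net effect in the production template: a single duration, aggregationFrequency, now bounds both how often aggregates are swept and how long one may stay open, taking over the 10m value previously held by maxAggregateAge. A sketch of the resulting aggregator block (indentation and surrounding keys assumed unchanged):

```yaml
aggregator:
  maxItemsPerAggregate: 1000
  maxUncompressedByteSize: "1G"
  # aggregationFrequency now also acts as the maximum aggregate age;
  # the separate maxAggregateAge property has been removed.
  aggregationFrequency: 10m
```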
stroom/proxy/app/handler/Aggregator.java
@@ -1,13 +1,11 @@
package stroom.proxy.app.handler;

import stroom.proxy.app.DataDirProvider;
import stroom.util.Metrics;
import stroom.util.io.FileName;
import stroom.util.io.FileUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

import com.codahale.metrics.Histogram;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
@@ -32,7 +30,6 @@ public class Aggregator {
private final NumberedDirProvider tempAggregatesDirProvider;

private Consumer<Path> destination;
private Histogram aggregateCountHistogram;

@Inject
public Aggregator(final CleanupDirQueue deleteDirQueue,
@@ -49,10 +46,6 @@ public Aggregator(final CleanupDirQueue deleteDirQueue,
}

tempAggregatesDirProvider = new NumberedDirProvider(aggregatesDir);
aggregateCountHistogram = Metrics.registrationBuilder(getClass())
.addNamePart("aggregateCount")
.histogram()
.createAndRegister();
}

public void addDir(final Path dir) {
@@ -69,7 +62,6 @@ public void addDir(final Path dir) {
throw new RuntimeException("Unexpected dir count");

} else if (sourceDirCount == 1) {
aggregateCountHistogram.update(1);
// If we only have one source dir then no merging is required, just forward.
try (final Stream<Path> stream = Files.list(dir)) {
stream.forEach(fileGroupDir -> destination.accept(fileGroupDir));
@@ -129,8 +121,6 @@ public void addDir(final Path dir) {
}
}

aggregateCountHistogram.update(count.get());

// We have finished the merge so transfer the new item to be forwarded.
destination.accept(tempDir);
}
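With the hunks above, Aggregator records no metrics at all; PreAggregator (next file) registers three histograms in their place: item count, byte size, and age. These are standard Dropwizard com.codahale.metrics.Histogram instances, where each update(value) feeds a reservoir that yields percentile snapshots. A minimal sketch of that API, independent of Stroom's Metrics builder (the registry and metric name below are illustrative assumptions):

```java
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;

public class HistogramSketch {
    public static void main(final String[] args) {
        final MetricRegistry registry = new MetricRegistry();
        // Illustrative name; the real names are built by Metrics.registrationBuilder.
        final Histogram itemCounts = registry.histogram("aggregate.count");

        // One update per closed aggregate, e.g. with its item count.
        itemCounts.update(250);
        itemCounts.update(1_000);

        final Snapshot snapshot = itemCounts.getSnapshot();
        System.out.println(snapshot.getMedian());         // 50th percentile
        System.out.println(snapshot.get99thPercentile()); // tail behaviour
    }
}
```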
stroom/proxy/app/handler/PreAggregator.java
@@ -4,15 +4,17 @@
import stroom.meta.api.AttributeMapUtil;
import stroom.meta.api.StandardHeaderArguments;
import stroom.proxy.app.DataDirProvider;
import stroom.proxy.app.ProxyConfig;
import stroom.proxy.repo.AggregatorConfig;
import stroom.proxy.repo.FeedKey;
import stroom.proxy.repo.ProxyServices;
import stroom.util.Metrics;
import stroom.util.io.FileName;
import stroom.util.io.FileUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.logging.LogUtil;

import com.codahale.metrics.Histogram;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
@@ -42,26 +44,31 @@
public class PreAggregator {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(PreAggregator.class);
public static final String AGGREGATE_NAME_PART = "aggregate";

private final NumberedDirProvider tempSplittingDirProvider;
private final Path stagedSplittingDir;
private final CleanupDirQueue deleteDirQueue;
private final AggregatorConfig aggregatorConfig;
private final Provider<AggregatorConfig> aggregatorConfigProvider;
private final DataDirProvider dataDirProvider;

private final Path aggregatingDir;

private final Map<FeedKey, AggregateState> aggregateStateMap = new ConcurrentHashMap<>();

private final Histogram aggregateItemCountHistogram;
private final Histogram aggregateByteSizeHistogram;
private final Histogram aggregateAgeHistogram;

private Consumer<Path> destination;

@Inject
public PreAggregator(final CleanupDirQueue deleteDirQueue,
final Provider<ProxyConfig> proxyConfigProvider,
final DataDirProvider dataDirProvider,
final ProxyServices proxyServices) {
final ProxyServices proxyServices,
final Provider<AggregatorConfig> aggregatorConfigProvider) {
this.deleteDirQueue = deleteDirQueue;
this.aggregatorConfig = proxyConfigProvider.get().getAggregatorConfig();
this.aggregatorConfigProvider = aggregatorConfigProvider;
this.dataDirProvider = dataDirProvider;

// Get or create the aggregating dir.
@@ -101,6 +108,22 @@ public PreAggregator(final CleanupDirQueue deleteDirQueue,
throw new UncheckedIOException(e);
}

aggregateItemCountHistogram = Metrics.registrationBuilder(getClass())
.addNamePart(AGGREGATE_NAME_PART)
.addNamePart(Metrics.COUNT)
.histogram()
.createAndRegister();
aggregateByteSizeHistogram = Metrics.registrationBuilder(getClass())
.addNamePart(AGGREGATE_NAME_PART)
.addNamePart(Metrics.SIZE_IN_BYTES)
.histogram()
.createAndRegister();
aggregateAgeHistogram = Metrics.registrationBuilder(getClass())
.addNamePart(AGGREGATE_NAME_PART)
.addNamePart(Metrics.AGE_MS)
.histogram()
.createAndRegister();

// Periodically close old aggregates.
proxyServices
.addFrequencyExecutor(
@@ -114,7 +137,8 @@ private void initialiseAggregateStateMap() {
try (final Stream<Path> stream = Files.list(aggregatingDir)) {
// Look at each aggregate dir.
stream.forEach(aggregateDir -> {
final AggregateState aggregateState = new AggregateState(Instant.now(), aggregateDir);
final AggregateState aggregateState = new AggregateState(
aggregatorConfigProvider.get(), aggregateDir);
final AtomicReference<FeedKey> feedKeyRef = new AtomicReference<>();

// Now examine each file group to read state.
@@ -166,6 +190,7 @@ private void initialiseAggregateStateMap() {

public synchronized void addDir(final Path dir) {
try {
final AggregatorConfig aggregatorConfig = aggregatorConfigProvider.get();
final FileGroup fileGroup = new FileGroup(dir);
final AttributeMap attributeMap = new AttributeMap();
AttributeMapUtil.read(fileGroup.getMeta(), attributeMap);
@@ -179,7 +204,7 @@ public synchronized void addDir(final Path dir) {
// Calculate where we might want to split the incoming data.
final List<Part> parts;
if (aggregatorConfig.isSplitSources()) {
parts = calculateSplitParts(feedKey, fileGroup);
parts = calculateSplitParts(feedKey, fileGroup, aggregatorConfig);
} else {
parts = calculateOverflowingParts(fileGroup);
}
@@ -230,13 +255,9 @@ public synchronized void addDir(final Path dir) {
// If we have an aggregate we can close now then do so.
final AggregateState aggregateState = aggregateStateMap
.computeIfAbsent(feedKey, this::createAggregate);
final long maxItemsPerAggregate = aggregatorConfig.getMaxItemsPerAggregate();
final long maxUncompressedByteSize = aggregatorConfig.getMaxUncompressedByteSize();
if (aggregateState.itemCount >= maxItemsPerAggregate ||
aggregateState.totalBytes >= maxUncompressedByteSize) {
if (aggregateState.isTooBig()) {
closeAggregate(feedKey, aggregateState);
}

} catch (final IOException e) {
LOGGER.error(e::getMessage, e);
}
@@ -261,22 +282,23 @@ private AggregateState addPartToAggregate(final FeedKey feedKey,
dir,
aggregateState.aggregateDir.resolve(dir.getFileName()),
StandardCopyOption.ATOMIC_MOVE);
aggregateState.itemCount += part.items;
aggregateState.totalBytes += part.bytes;
aggregateState.add(part);
return aggregateState;
}

/**
* Calculate the number of logical parts the source zip will need to be split into in order to fit output aggregates
* without them exceeding the size and item count constraints.
*
* @param feedKey The feed
* @param fileGroup the file group to examine.
* @param feedKey The feed
* @param fileGroup the file group to examine.
* @param aggregatorConfig The aggregator configuration supplying the split limits.
* @return A list of parts to split the zip data by.
* @throws IOException Could be thrown when reading entries.
*/
private List<Part> calculateSplitParts(final FeedKey feedKey,
final FileGroup fileGroup) throws IOException {
final FileGroup fileGroup,
final AggregatorConfig aggregatorConfig) throws IOException {
// Determine if we need to split this data into parts.
final List<Part> parts = new ArrayList<>();
AggregateState aggregateState = aggregateStateMap.computeIfAbsent(feedKey, this::createAggregate);
@@ -298,8 +320,8 @@ private List<Part> calculateSplitParts(final FeedKey feedKey,

// If the current aggregate has items then we might want to close and start a new one.
if (currentAggregateItemCount > 0 &&
(currentAggregateItemCount + 1 > maxItemsPerAggregate ||
currentAggregateBytes + totalUncompressedSize > maxUncompressedByteSize)) {
(currentAggregateItemCount + 1 > maxItemsPerAggregate ||
currentAggregateBytes + totalUncompressedSize > maxUncompressedByteSize)) {
if (firstEntry) {
// If the first entry immediately causes the current aggregate to exceed the required bounds
// then close it and create a new one.
@@ -310,9 +332,7 @@ private List<Part> calculateSplitParts(final FeedKey feedKey,
// Create a new aggregate.
aggregateState = aggregateStateMap
.computeIfAbsent(feedKey, this::createAggregate);

} else {

// Split.
parts.add(new Part(partItems, partBytes));
}
@@ -369,9 +389,20 @@ private synchronized void closeAggregate(final FeedKey feedKey,
LOGGER.debug(() -> "Closing aggregate: " + FileUtil.getCanonicalPath(aggregateState.aggregateDir));
destination.accept(aggregateState.aggregateDir);
aggregateStateMap.remove(feedKey);
captureAggregateMetrics(aggregateState);
LOGGER.debug(() -> "Closed aggregate: " + FileUtil.getCanonicalPath(aggregateState.aggregateDir));
}

private void captureAggregateMetrics(final AggregateState aggregateState) {
try {
aggregateItemCountHistogram.update(aggregateState.itemCount);
aggregateByteSizeHistogram.update(aggregateState.totalBytes);
aggregateAgeHistogram.update(aggregateState.getAge().toMillis());
} catch (final Exception e) {
LOGGER.error("Error capturing aggregate stats: {}", LogUtil.exceptionMessage(e), e);
}
}

private PartDirs split(final Path dir, final List<Part> parts) throws IOException {
final String inputDirName = dir.getFileName().toString();
final FileGroup fileGroup = new FileGroup(dir);
@@ -450,7 +481,7 @@ private AggregateState createAggregate(final FeedKey feedKey) {
Files.createDirectories(aggregateDir);

LOGGER.debug(() -> "Created aggregate: " + FileUtil.getCanonicalPath(aggregateDir));
return new AggregateState(Instant.now(), aggregateDir);
return new AggregateState(aggregatorConfigProvider.get(), aggregateDir);

} catch (final IOException e) {
LOGGER.error(e::getMessage, e);
@@ -459,12 +490,10 @@
}

private synchronized void closeOldAggregates() {
aggregateStateMap.forEach((k, v) -> {
final Instant createTime = v.createTime;
final Instant aggregateAfter = createTime.plus(aggregatorConfig.getAggregationFrequency().getDuration());
if (aggregateAfter.isBefore(Instant.now())) {
aggregateStateMap.forEach((feedKey, aggregateState) -> {
if (aggregateState.isTooOld()) {
// Close the current aggregate.
closeAggregate(k, v);
closeAggregate(feedKey, aggregateState);
}
});
}
@@ -473,29 +502,79 @@ public void setDestination(final Consumer<Path> destination) {
this.destination = destination;
}


// --------------------------------------------------------------------------------


private static class AggregateState {

final Instant createTime;
final Instant aggregateAfter;
final long maxItemsPerAggregate;
final long maxUncompressedByteSize;
final Path aggregateDir;
long itemCount;
long totalBytes;

public AggregateState(final Instant createTime, final Path aggregateDir) {
this.createTime = createTime;
public AggregateState(final AggregatorConfig aggregatorConfig,
final Path aggregateDir) {
this.createTime = Instant.now();
this.aggregateAfter = createTime.plus(aggregatorConfig.getAggregationFrequency().getDuration());
this.maxItemsPerAggregate = aggregatorConfig.getMaxItemsPerAggregate();
this.maxUncompressedByteSize = aggregatorConfig.getMaxUncompressedByteSize();
this.aggregateDir = aggregateDir;
}

void add(final Part part) {
itemCount += part.items;
totalBytes += part.bytes;
}

/**
* @return True if the aggregate's age is greater than the configured aggregationFrequency
*/
boolean isTooOld() {
return Instant.now().isAfter(aggregateAfter);
}

boolean isTooBig() {
return itemCount >= maxItemsPerAggregate
|| totalBytes >= maxUncompressedByteSize;
}

/**
* @return Current age of the aggregate, i.e. time between its creation time and now
*/
Duration getAge() {
return Duration.between(createTime, Instant.now());
}
}


// --------------------------------------------------------------------------------


/**
* Record of a part's item count and total byte size.
*
* @param items The number of items.
* @param bytes The total byte size of the part.
*/
private record Part(long items, long bytes) {
record Part(long items, long bytes) {

static Part ZERO = new Part(0, 0);

Part addItem(final long bytes) {
return new Part(
this.items + 1,
this.bytes + bytes);
}
}


// --------------------------------------------------------------------------------


/**
* Associate a dir with a part.
*
Expand All @@ -506,6 +585,10 @@ private record PartDir(Part part, Path dir) {

}


// --------------------------------------------------------------------------------


/**
* Output of a split operation.
*
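The core of the refactor is that AggregateState now captures its limits from AggregatorConfig at construction time, so callers just ask isTooBig() or isTooOld() rather than re-reading config at each decision point. A self-contained sketch of that pattern (a simplified stand-in for the private inner class above; the limits in main are illustrative):

```java
import java.time.Duration;
import java.time.Instant;

public class AggregateStateSketch {

    static final class AggregateState {
        final Instant createTime = Instant.now();
        final Instant aggregateAfter; // close after this, regardless of size
        final long maxItemsPerAggregate;
        final long maxUncompressedByteSize;
        long itemCount;
        long totalBytes;

        AggregateState(final Duration aggregationFrequency,
                       final long maxItemsPerAggregate,
                       final long maxUncompressedByteSize) {
            // Limits are captured once, at aggregate creation time.
            this.aggregateAfter = createTime.plus(aggregationFrequency);
            this.maxItemsPerAggregate = maxItemsPerAggregate;
            this.maxUncompressedByteSize = maxUncompressedByteSize;
        }

        void add(final long items, final long bytes) {
            itemCount += items;
            totalBytes += bytes;
        }

        boolean isTooBig() {
            return itemCount >= maxItemsPerAggregate
                   || totalBytes >= maxUncompressedByteSize;
        }

        boolean isTooOld() {
            return Instant.now().isAfter(aggregateAfter);
        }
    }

    public static void main(final String[] args) {
        // Illustrative limits; the real values come from AggregatorConfig.
        final AggregateState state =
                new AggregateState(Duration.ofMinutes(10), 1000, 1L << 30);
        state.add(1000, 123);                  // hits the item-count limit
        System.out.println(state.isTooBig()); // true -> would be closed now
        System.out.println(state.isTooOld()); // false -> younger than 10m
    }
}
```

In the real class, addDir applies isTooBig() synchronously as parts are added, while the periodic executor calls closeOldAggregates(), which closes anything for which isTooOld() is true. The remaining hunks below are from the test class, which now bounds aggregate age by aggregationFrequency in place of the removed maxAggregateAge.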
@@ -227,7 +227,7 @@ void assertMaxItemsPerAggregate(final Config config) {
.isLessThanOrEqualTo(maxItemsPerAggregate);
}

final StroomDuration maxAggregateAge = aggregatorConfig.getMaxAggregateAge();
final StroomDuration aggregationFrequency = aggregatorConfig.getAggregationFrequency();
final List<Duration> aggAges = forwardFiles.stream()
.map(ForwardFileItem::zipItems)
.map(zipItems -> {
@@ -246,7 +246,7 @@ void assertMaxItemsPerAggregate(final Config config) {
// Each agg should have a receipt time range no wider than the configured aggregationFrequency
for (final Duration aggAge : aggAges) {
Assertions.assertThat(aggAge)
.isLessThanOrEqualTo(maxAggregateAge.getDuration());
.isLessThanOrEqualTo(aggregationFrequency.getDuration());
}
}

