Skip to content

Commit

Permalink
propagate errors during async refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
viqtor authored and viktornystrom committed Aug 21, 2023
1 parent 1c4f393 commit 0ea3c95
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ public void triggerRefresh() {
* <p>
* This is an asynchronous call.
*/
public void triggerAsyncRefresh() {
triggerAsyncRefreshWithDelay(0);
public CompletableFuture<Void> triggerAsyncRefresh() {
return triggerAsyncRefreshWithDelay(0);
}

/**
Expand All @@ -256,10 +256,10 @@ public void triggerAsyncRefresh() {
*
* @param delayMillis the delay, in millseconds, before triggering the refresh
*/
public void triggerAsyncRefreshWithDelay(int delayMillis) {
public CompletableFuture<Void> triggerAsyncRefreshWithDelay(int delayMillis) {
final long targetBeginTime = System.currentTimeMillis() + delayMillis;

refreshExecutor.execute(() -> {
return CompletableFuture.runAsync(() -> {
try {
long delay = targetBeginTime - System.currentTimeMillis();
if (delay > 0)
Expand All @@ -278,7 +278,7 @@ public void triggerAsyncRefreshWithDelay(int delayMillis) {
LOG.log(Level.SEVERE, "Async refresh failed", e);
throw e;
}
});
}, refreshExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.hollow.tools.compact.HollowCompactor.CompactionConfig;
import java.time.Duration;
import java.util.BitSet;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -487,6 +488,28 @@ public void consumerFilteringSupport() {
Assert.fail(); // fail if UnsupportedOperationException was not thrown
}

@Test
public void consumerErrorsDuringRefreshArePropagated() {
HollowProducer producer = HollowProducer.withPublisher(blobStore)
.withAnnouncer(announcement)
.withBlobStager(new HollowInMemoryBlobStager())
.build();
long v1 = runCycle(producer, 1);

InMemoryBlobStore otherBlobStore = new InMemoryBlobStore();
HollowConsumer consumer = HollowConsumer.withBlobRetriever(otherBlobStore)
.withAnnouncementWatcher(announcement)
.build();

try {
consumer.triggerAsyncRefresh().toCompletableFuture().join();
Assert.fail("Expected exception to be thrown by async refresh.");
} catch (Exception e) {
Assert.assertTrue(e instanceof CompletionException);
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
}
}

private long runCycle(HollowProducer producer, final int cycleNumber) {
return producer.runCycle(state -> state.add(cycleNumber));
}
Expand Down

0 comments on commit 0ea3c95

Please sign in to comment.