From f3a6b1ed0368a3179713d1110fb094f4fa5ca41d Mon Sep 17 00:00:00 2001 From: Xiaodan Date: Wed, 27 Mar 2024 12:06:20 -0700 Subject: [PATCH] hollow consumer start to read version tag in blob metadata --- .../test/consumer/TestBlobRetriever.java | 10 +-- ...BlobRetrieverWithNearestSnapshotMatch.java | 42 +++++++++++++ .../api/client/HollowClientUpdater.java | 16 ++--- .../hollow/api/client/HollowDataHolder.java | 40 ++++++------ .../api/error/VersionMismatchException.java | 21 +++++++ .../core/read/engine/HollowBlobReader.java | 27 +++++--- .../api/client/HollowClientUpdaterTest.java | 61 +++++++++++++++++++ 7 files changed, 181 insertions(+), 36 deletions(-) create mode 100644 hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetrieverWithNearestSnapshotMatch.java create mode 100644 hollow/src/main/java/com/netflix/hollow/api/error/VersionMismatchException.java diff --git a/hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetriever.java b/hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetriever.java index 90df3d0567..b39fa60e32 100644 --- a/hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetriever.java +++ b/hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetriever.java @@ -28,10 +28,10 @@ * memory. */ public class TestBlobRetriever implements BlobRetriever { - private final Map snapshots = new HashMap<>(); - private final Map deltas = new HashMap<>(); - private final Map reverseDeltas = new HashMap<>(); - private final Map headers = new HashMap<>(); + protected final Map snapshots = new HashMap<>(); + protected final Map deltas = new HashMap<>(); + protected final Map reverseDeltas = new HashMap<>(); + protected final Map headers = new HashMap<>(); @Override public HeaderBlob retrieveHeaderBlob(long desiredVersion) { @@ -60,7 +60,7 @@ public Blob retrieveReverseDeltaBlob(long currentVersion) { } // so blob can be reused - private void resetStream(Blob b) { + protected void resetStream(Blob b) { try { if (b!= null && b.getInputStream() != null) { b.getInputStream().reset(); diff --git a/hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetrieverWithNearestSnapshotMatch.java b/hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetrieverWithNearestSnapshotMatch.java new file mode 100644 index 0000000000..e581460f98 --- /dev/null +++ b/hollow-test/src/main/java/com/netflix/hollow/test/consumer/TestBlobRetrieverWithNearestSnapshotMatch.java @@ -0,0 +1,42 @@ +package com.netflix.hollow.test.consumer; + +import com.netflix.hollow.api.consumer.HollowConsumer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class TestBlobRetrieverWithNearestSnapshotMatch extends TestBlobRetriever { + + @Override + public HollowConsumer.Blob retrieveSnapshotBlob(long desiredVersion) { + long version = findNearestSnapshotVersion(desiredVersion); + HollowConsumer.Blob b = snapshots.get(version); + resetStream(b); + return b; + } + + + private long findNearestSnapshotVersion(long desiredVersion) { + List snapshotVersions = new ArrayList<>(); + snapshotVersions.addAll(snapshots.keySet()); + Collections.sort(snapshotVersions); + int start = 0; + int end = snapshotVersions.size() - 1; + int mid = 0; + while (start + 1< end) { + mid = (start + end) / 2; + if (mid < desiredVersion) { + start = mid; + } else if (mid > desiredVersion){ + end = mid; + } else { + return snapshotVersions.get(mid); + } + } + if (end <= desiredVersion) { + return snapshotVersions.get(end); + } + return snapshotVersions.get(start); + } +} diff --git a/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java b/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java index c70ce519d9..6fecd9bb9e 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java +++ b/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java @@ -16,6 +16,7 @@ */ package com.netflix.hollow.api.client; +import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_SCHEMA_HASH; import com.netflix.hollow.api.consumer.HollowConsumer; @@ -119,7 +120,7 @@ public synchronized boolean updateTo(long requestedVersion) throws Throwable { public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersionInfo) throws Throwable { long requestedVersion = requestedVersionInfo.getVersion(); if (requestedVersion == getCurrentVersionId()) { - if (requestedVersion == HollowConstants.VERSION_NONE && hollowDataHolderVolatile == null) { + if (requestedVersion == VERSION_NONE && hollowDataHolderVolatile == null) { LOG.warning("No versions to update to, initializing to empty state"); // attempting to refresh, but no available versions - initialize to empty state hollowDataHolderVolatile = newHollowDataHolder(); @@ -148,16 +149,17 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion ? planner.planInitializingUpdate(requestedVersion) : planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion, doubleSnapshotConfig.allowDoubleSnapshot()); + boolean isInitialUpdate = getCurrentVersionId() == VERSION_NONE; for (HollowConsumer.RefreshListener listener : localListeners) if (listener instanceof HollowConsumer.TransitionAwareRefreshListener) ((HollowConsumer.TransitionAwareRefreshListener)listener).transitionsPlanned(beforeVersion, requestedVersion, updatePlan.isSnapshotPlan(), updatePlan.getTransitionSequence()); - if (updatePlan.destinationVersion() == HollowConstants.VERSION_NONE + if (updatePlan.destinationVersion() == VERSION_NONE && requestedVersion != HollowConstants.VERSION_LATEST) { String msg = String.format("Could not create an update plan for version %s, because " + "that version or any qualifying previous versions could not be retrieved.", requestedVersion); - if (beforeVersion != HollowConstants.VERSION_NONE) { + if (beforeVersion != VERSION_NONE) { msg += String.format(" Consumer will remain at current version %s until next update attempt.", beforeVersion); } throw new IllegalArgumentException(msg); @@ -185,7 +187,7 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion * Also note that hollowDataHolderVolatile only changes for snapshot plans, * and it is only for snapshot plans that HollowDataHolder#initializeAPI is * called. */ - newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh); + newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh, isInitialUpdate); } catch (Throwable t) { // If the update plan failed then revert back to the old holder hollowDataHolderVolatile = oldDh; @@ -194,7 +196,7 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion forceDoubleSnapshot = false; } } else { // 0 snapshot and 1+ delta transitions - hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {}); + hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {}, isInitialUpdate); } for(HollowConsumer.RefreshListener refreshListener : localListeners) @@ -245,7 +247,7 @@ public synchronized void removeRefreshListener(HollowConsumer.RefreshListener re public long getCurrentVersionId() { HollowDataHolder hollowDataHolderLocal = hollowDataHolderVolatile; return hollowDataHolderLocal != null ? hollowDataHolderLocal.getCurrentVersion() - : HollowConstants.VERSION_NONE; + : VERSION_NONE; } public void forceDoubleSnapshotNextUpdate() { @@ -256,7 +258,7 @@ public void forceDoubleSnapshotNextUpdate() { * Whether or not a snapshot plan should be created. Visible for testing. */ boolean shouldCreateSnapshotPlan(HollowConsumer.VersionInfo incomingVersionInfo) { - if (getCurrentVersionId() == HollowConstants.VERSION_NONE + if (getCurrentVersionId() == VERSION_NONE || (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) { return true; } diff --git a/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java b/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java index 37ba151694..eda9be4a22 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java +++ b/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java @@ -19,7 +19,6 @@ import com.netflix.hollow.api.consumer.HollowConsumer; import com.netflix.hollow.api.consumer.HollowConsumer.TransitionAwareRefreshListener; import com.netflix.hollow.api.custom.HollowAPI; -import com.netflix.hollow.core.HollowConstants; import com.netflix.hollow.core.memory.MemoryMode; import com.netflix.hollow.core.read.HollowBlobInput; import com.netflix.hollow.core.read.OptionalBlobPartInput; @@ -35,6 +34,8 @@ import java.lang.ref.WeakReference; import java.util.logging.Logger; +import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; + /** * A class comprising much of the internal state of a {@link HollowConsumer}. Not intended for external consumption. */ @@ -56,7 +57,7 @@ class HollowDataHolder { private WeakReference priorHistoricalDataAccess; - private long currentVersion = HollowConstants.VERSION_NONE; + private long currentVersion = VERSION_NONE; HollowDataHolder(HollowReadStateEngine stateEngine, HollowAPIFactory apiFactory, @@ -106,7 +107,7 @@ HollowDataHolder setSkipTypeShardUpdateWithNoAdditions(boolean skipTypeShardUpda } void update(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners, - Runnable apiInitCallback) throws Throwable { + Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable { // Only fail if double snapshot is configured. // This is a short term solution until it is decided to either remove this feature // or refine it. @@ -123,19 +124,19 @@ void update(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refres } if (updatePlan.isSnapshotPlan()) { - applySnapshotPlan(updatePlan, refreshListeners, apiInitCallback); + applySnapshotPlan(updatePlan, refreshListeners, apiInitCallback, isInitialUpdate); } else { - applyDeltaOnlyPlan(updatePlan, refreshListeners); + applyDeltaOnlyPlan(updatePlan, refreshListeners, isInitialUpdate); } } private void applySnapshotPlan(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners, - Runnable apiInitCallback) throws Throwable { - applySnapshotTransition(updatePlan.getSnapshotTransition(), refreshListeners, apiInitCallback); + Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable { + applySnapshotTransition(updatePlan.getSnapshotTransition(), refreshListeners, apiInitCallback, isInitialUpdate); for(HollowConsumer.Blob blob : updatePlan.getDeltaTransitions()) { - applyDeltaTransition(blob, true, refreshListeners); + applyDeltaTransition(blob, true, refreshListeners, isInitialUpdate); } try { @@ -149,10 +150,10 @@ private void applySnapshotPlan(HollowUpdatePlan updatePlan, private void applySnapshotTransition(HollowConsumer.Blob snapshotBlob, HollowConsumer.RefreshListener[] refreshListeners, - Runnable apiInitCallback) throws Throwable { + Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable { try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, snapshotBlob); OptionalBlobPartInput optionalPartIn = snapshotBlob.getOptionalBlobPartInputs()) { - applyStateEngineTransition(in, optionalPartIn, snapshotBlob, refreshListeners); + applyStateEngineTransition(in, optionalPartIn, snapshotBlob, refreshListeners, isInitialUpdate); initializeAPI(apiInitCallback); for (HollowConsumer.RefreshListener refreshListener : refreshListeners) { @@ -165,7 +166,11 @@ private void applySnapshotTransition(HollowConsumer.Blob snapshotBlob, } } - private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInput optionalPartIn, HollowConsumer.Blob transition, HollowConsumer.RefreshListener[] refreshListeners) throws IOException { + private void applyStateEngineTransition(HollowBlobInput in, + OptionalBlobPartInput optionalPartIn, + HollowConsumer.Blob transition, + HollowConsumer.RefreshListener[] refreshListeners, + boolean isInitialUpdate) throws IOException { if(transition.isSnapshot()) { if(filter == null) { reader.readSnapshot(in, optionalPartIn); @@ -174,11 +179,11 @@ private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInpu reader.readSnapshot(in, optionalPartIn, filter); } } else { - reader.applyDelta(in, optionalPartIn); + long expectedToVersion = transition.getToVersion(); + reader.applyDelta(in, optionalPartIn, expectedToVersion, isInitialUpdate); } setVersion(transition.getToVersion()); - for(HollowConsumer.RefreshListener refreshListener : refreshListeners) refreshListener.blobLoaded(transition); } @@ -199,13 +204,14 @@ private void initializeAPI(Runnable r) { } } - private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable { + private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners, + boolean isInitialUpdate) throws Throwable { for(HollowConsumer.Blob blob : updatePlan) { - applyDeltaTransition(blob, false, refreshListeners); + applyDeltaTransition(blob, false, refreshListeners, isInitialUpdate); } } - private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable { + private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners, boolean isInitialUpdate) throws Throwable { if (!memoryMode.equals(MemoryMode.ON_HEAP)) { LOG.warning("Skipping delta transition in shared-memory mode"); return; @@ -213,7 +219,7 @@ private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPl try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, blob); OptionalBlobPartInput optionalPartIn = blob.getOptionalBlobPartInputs()) { - applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners); + applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners, isInitialUpdate); if(objLongevityConfig.enableLongLivedObjectSupport()) { HollowDataAccess previousDataAccess = currentAPI.getDataAccess(); diff --git a/hollow/src/main/java/com/netflix/hollow/api/error/VersionMismatchException.java b/hollow/src/main/java/com/netflix/hollow/api/error/VersionMismatchException.java new file mode 100644 index 0000000000..3f3288da02 --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/api/error/VersionMismatchException.java @@ -0,0 +1,21 @@ +package com.netflix.hollow.api.error; + +public class VersionMismatchException extends HollowException { + private final long expectedVersion; + + private final long actualVersion; + + public VersionMismatchException(long expectedVersion, long actualVersion) { + super("toVersion in blob did not match toVersion requested in transition; actualToVersion=" + actualVersion + ", expectedToVersion=" + expectedVersion); + this.expectedVersion = expectedVersion; + this.actualVersion = actualVersion; + } + + public long getExpectedVersion() { + return expectedVersion; + } + + public long getActualVersion() { + return actualVersion; + } +} diff --git a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java index 8c6bf60893..f4fecd80df 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java +++ b/hollow/src/main/java/com/netflix/hollow/core/read/engine/HollowBlobReader.java @@ -16,6 +16,7 @@ */ package com.netflix.hollow.core.read.engine; +import com.netflix.hollow.api.error.VersionMismatchException; import com.netflix.hollow.core.HollowBlobHeader; import com.netflix.hollow.core.HollowBlobOptionalPartHeader; import com.netflix.hollow.core.memory.MemoryMode; @@ -44,6 +45,8 @@ import java.util.TreeSet; import java.util.logging.Logger; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; + /** * A HollowBlobReader is used to populate and update data in a {@link HollowReadStateEngine}, via the consumption * of snapshot and delta blobs. Caller can choose between on-heap or shared-memory mode; defaults to (and for @@ -149,7 +152,8 @@ public void readSnapshot(HollowBlobInput in, OptionalBlobPartInput optionalParts if(optionalParts != null) optionalPartInputs = optionalParts.getInputsByPartName(in.getMemoryMode()); - HollowBlobHeader header = readHeader(in, false); + HollowBlobHeader header = headerReader.readHeader(in); + applyHeader(header, false); List partHeaders = readPartHeaders(header, optionalPartInputs, in.getMemoryMode()); List allSchemas = combineSchemas(header, partHeaders); @@ -218,12 +222,24 @@ public void applyDelta(HollowBlobInput in) throws IOException { } public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts) throws IOException { + applyDelta(in, optionalParts, 0, false); + } + public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts, long expectedVersion, boolean isInitialUpdate) throws IOException { validateMemoryMode(in.getMemoryMode()); Map optionalPartInputs = null; if(optionalParts != null) optionalPartInputs = optionalParts.getInputsByPartName(in.getMemoryMode()); - - HollowBlobHeader header = readHeader(in, true); + HollowBlobHeader header = headerReader.readHeader(in); + if (expectedVersion != 0 && !isInitialUpdate) { + String to_version_tag = header.getHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION); + if (to_version_tag != null) { + long to_version = Long.parseLong(to_version_tag); + if (expectedVersion != to_version) { + throw new VersionMismatchException(expectedVersion, to_version); + } + } + } + applyHeader(header, true); List partHeaders = readPartHeaders(header, optionalPartInputs, in.getMemoryMode()); notifyBeginUpdate(); @@ -258,16 +274,13 @@ public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts) notifyEndUpdate(); } - private HollowBlobHeader readHeader(HollowBlobInput in, boolean isDelta) throws IOException { - HollowBlobHeader header = headerReader.readHeader(in); - + private void applyHeader(HollowBlobHeader header, boolean isDelta) throws IOException { if(isDelta && header.getOriginRandomizedTag() != stateEngine.getCurrentRandomizedTag()) throw new IOException("Attempting to apply a delta to a state from which it was not originated!"); stateEngine.setCurrentRandomizedTag(header.getDestinationRandomizedTag()); stateEngine.setOriginRandomizedTag(header.getOriginRandomizedTag()); stateEngine.setHeaderTags(header.getHeaderTags()); - return header; } private List readPartHeaders(HollowBlobHeader header, Map inputsByPartName, MemoryMode mode) throws IOException { diff --git a/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java b/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java index e7b16c0437..2ffc9c78ba 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java @@ -18,6 +18,7 @@ import static com.netflix.hollow.core.HollowConstants.VERSION_LATEST; import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_SCHEMA_HASH; import static java.util.Collections.emptyList; import static org.junit.Assert.assertEquals; @@ -30,6 +31,7 @@ import com.netflix.hollow.api.consumer.HollowConsumer; import com.netflix.hollow.api.custom.HollowAPI; +import com.netflix.hollow.api.error.VersionMismatchException; import com.netflix.hollow.api.metrics.HollowConsumerMetrics; import com.netflix.hollow.core.HollowStateEngine; import com.netflix.hollow.core.memory.MemoryMode; @@ -43,9 +45,11 @@ import com.netflix.hollow.test.HollowWriteStateEngineBuilder; import com.netflix.hollow.test.consumer.TestBlob; import com.netflix.hollow.test.consumer.TestBlobRetriever; +import com.netflix.hollow.test.consumer.TestBlobRetrieverWithNearestSnapshotMatch; import com.netflix.hollow.test.consumer.TestHollowConsumer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -55,6 +59,8 @@ import java.util.logging.LogManager; import java.util.logging.LogRecord; import java.util.logging.Logger; + +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -261,6 +267,61 @@ private static void addActor(HollowWriteStateEngine stateEngine, int id) { stateEngine.add("Actor", rec); } + @Test + public void testReadBlobVersionInitialInconsistent() throws IOException { + testReadBlobVersion(2, 3, 1, true); + } + + @Test + public void testReadBlobVersionNotInitialInconsistent() throws IOException { + testReadBlobVersion(2, 3, 1, false); + } + + private void testReadBlobVersion(long metadataDesiredVersion1, + long blobDesiredVersion1, + long snapshotVersion, + boolean isInitialUpdate) throws IOException { + TestBlobRetrieverWithNearestSnapshotMatch testBlobRetriever = new TestBlobRetrieverWithNearestSnapshotMatch(); + TestHollowConsumer testHollowConsumer = (new TestHollowConsumer.Builder()) + .withBlobRetriever(testBlobRetriever) + .build(); + HollowWriteStateEngine stateEngine = new HollowWriteStateEngine(); + HollowBlobWriter writer = new HollowBlobWriter(stateEngine); + + HollowObjectSchema movieSchema = new HollowObjectSchema("Movie", 1, "id"); + movieSchema.addField("id", HollowObjectSchema.FieldType.INT); + stateEngine.addTypeState(new HollowObjectTypeWriteState(movieSchema)); + addMovie(stateEngine, 1); + stateEngine.prepareForWrite(); + ByteArrayOutputStream baos_v1 = new ByteArrayOutputStream(); + writer.writeSnapshot(baos_v1); + testBlobRetriever.addSnapshot(snapshotVersion, new TestBlob(1,new ByteArrayInputStream(baos_v1.toByteArray()))); + + if (isInitialUpdate) { + stateEngine.prepareForNextCycle(); + ByteArrayOutputStream baos_delta = new ByteArrayOutputStream(); + stateEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(blobDesiredVersion1)); + addMovie(stateEngine, 2); + stateEngine.prepareForWrite(); + writer.writeDelta(baos_delta); + testBlobRetriever.addDelta(snapshotVersion, new TestBlob(snapshotVersion, metadataDesiredVersion1, new ByteArrayInputStream(baos_delta.toByteArray()))); + testHollowConsumer.triggerRefreshTo(metadataDesiredVersion1); + assertEquals(metadataDesiredVersion1, testHollowConsumer.getCurrentVersionId()); + } else { + expectedException.expect(VersionMismatchException.class); + testHollowConsumer.triggerRefreshTo(snapshotVersion); + stateEngine.prepareForNextCycle(); + ByteArrayOutputStream baos_delta = new ByteArrayOutputStream(); + stateEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(blobDesiredVersion1)); + addMovie(stateEngine, 2); + stateEngine.prepareForWrite(); + writer.writeDelta(baos_delta); + testBlobRetriever.addDelta(snapshotVersion, new TestBlob(snapshotVersion, metadataDesiredVersion1, new ByteArrayInputStream(baos_delta.toByteArray()))); + testHollowConsumer.triggerRefreshTo(metadataDesiredVersion1); + assertEquals(snapshotVersion, testHollowConsumer.getCurrentVersionId()); + } + } + private TestHollowConsumer schemaChangeSubject(HollowWriteStateEngine stateEngine, boolean doubleSnapshotOnSchemaChange, boolean failIfDoubleSnapshot, boolean failIfDelta, boolean allowDoubleSnapshots) throws Exception { HollowConsumer.DoubleSnapshotConfig supportsSchemaChange = new HollowConsumer.DoubleSnapshotConfig() {