From 3e2082cccd78b170f2a45a155744714702ce50f6 Mon Sep 17 00:00:00 2001 From: Chris Dennis Date: Mon, 29 Aug 2022 11:07:21 -0400 Subject: [PATCH 1/3] Upgrade to TC Core 5.10.8 & Platform 5.10.7 : Adopt new async client implementation --- .../SimpleClusterTierManagerClientEntity.java | 25 +++--- .../lock/VoltronReadWriteLockClient.java | 34 ++++---- .../internal/service/ConnectionState.java | 67 ++++++++++++++-- .../store/CommonServerStoreProxy.java | 2 +- .../store/ReconnectingServerStoreProxy.java | 13 +++- .../store/SimpleClusterTierClientEntity.java | 78 +++++++------------ ...rTierManagerClientEntityExceptionTest.java | 36 +-------- .../store/StrongServerStoreProxyTest.java | 2 +- clustered/ehcache-clustered/build.gradle | 28 +++++-- .../IterationFailureBehaviorTest.java | 4 +- .../clustered/TerminatedServerTest.java | 10 +-- ...ltronReadWriteLockServerEntityService.java | 6 ++ .../org/ehcache/core/util/ExceptionUtil.java | 32 ++++++++ gradle.properties | 8 +- 14 files changed, 199 insertions(+), 146 deletions(-) create mode 100644 ehcache-core/src/main/java/org/ehcache/core/util/ExceptionUtil.java diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/SimpleClusterTierManagerClientEntity.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/SimpleClusterTierManagerClientEntity.java index ae3f42e1b7..ada56edbb4 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/SimpleClusterTierManagerClientEntity.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/SimpleClusterTierManagerClientEntity.java @@ -30,11 +30,11 @@ import org.terracotta.entity.EndpointDelegate; import org.terracotta.entity.EntityClientEndpoint; import org.terracotta.entity.EntityResponse; -import org.terracotta.entity.InvokeFuture; import org.terracotta.entity.MessageCodecException; -import org.terracotta.exception.EntityException; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * The client-side {@link Entity} through which clustered cache operations are performed. @@ -76,13 +76,13 @@ public void close() { @Override public void validate(ServerSideConfiguration config) throws ClusterException { - invokeInternal(messageFactory.validateStoreManager(config), false); + invokeInternal(messageFactory.validateStoreManager(config)); } @Override public Set prepareForDestroy() { try { - PrepareForDestroy response = (PrepareForDestroy) invokeInternal(messageFactory.prepareForDestroy(), true); + PrepareForDestroy response = (PrepareForDestroy) invokeInternal(messageFactory.prepareForDestroy()); return response.getStores(); } catch (ClusterException e) { // TODO handle this @@ -90,24 +90,24 @@ public Set prepareForDestroy() { return null; } - private EhcacheEntityResponse invokeInternal(EhcacheEntityMessage message, boolean replicate) + private EhcacheEntityResponse invokeInternal(EhcacheEntityMessage message) throws ClusterException { try { - EhcacheEntityResponse response = waitFor(invokeAsync(message, replicate)); + EhcacheEntityResponse response = waitFor(invokeAsync(message)); if (EhcacheResponseType.FAILURE.equals(response.getResponseType())) { throw ((Failure)response).getCause(); } else { return response; } - } catch (EntityException | MessageCodecException e) { - throw new RuntimeException(message + " error: " + e.toString(), e); + } catch (ExecutionException | MessageCodecException e) { + throw new RuntimeException(message + " error: " + e, e); } } - private InvokeFuture invokeAsync(EhcacheEntityMessage message, boolean replicate) + private Future invokeAsync(EhcacheEntityMessage message) throws MessageCodecException { - return endpoint.beginInvoke().message(message).replicate(replicate).invoke(); + return endpoint.message(message).invoke(); } /** @@ -117,10 +117,9 @@ private InvokeFuture invokeAsync(EhcacheEntityMessage mes * @param future Future we want to get * @param type of the response * @return the result of the get - * @throws EntityException exception that might be thrown by the future in case of error */ - private static T waitFor(InvokeFuture future) - throws EntityException { + private static T waitFor(Future future) + throws ExecutionException { boolean interrupted = Thread.interrupted(); try { while (true) { diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockClient.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockClient.java index 1c9c6ab212..5d8e9d971e 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockClient.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockClient.java @@ -16,6 +16,8 @@ package org.ehcache.clustered.client.internal.lock; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import org.ehcache.clustered.common.internal.lock.LockMessaging; import org.ehcache.clustered.common.internal.lock.LockMessaging.LockOperation; @@ -24,9 +26,7 @@ import org.terracotta.connection.entity.Entity; import org.terracotta.entity.EndpointDelegate; import org.terracotta.entity.EntityClientEndpoint; -import org.terracotta.entity.InvokeFuture; import org.terracotta.entity.MessageCodecException; -import org.terracotta.exception.EntityException; public class VoltronReadWriteLockClient implements Entity { @@ -106,26 +106,22 @@ private LockOperation getCurrentState() { } private LockTransition invoke(LockOperation operation) { + Future result = endpoint.message(operation).invoke(); + boolean interrupted = Thread.interrupted(); try { - InvokeFuture result = endpoint.beginInvoke().message(operation).replicate(false).invoke(); - boolean interrupted = false; - try { - while (true) { - try { - return result.get(); - } catch (InterruptedException ex) { - interrupted = true; - } catch (EntityException ex) { - throw new IllegalStateException(ex); - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); + while (true) { + try { + return result.get(); + } catch (InterruptedException ex) { + interrupted = true; + } catch (ExecutionException ex) { + throw new IllegalStateException(ex.getCause()); } } - } catch (MessageCodecException ex) { - throw new AssertionError(ex); + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } } diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/service/ConnectionState.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/service/ConnectionState.java index d6d92a04f4..7876f935f8 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/service/ConnectionState.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/service/ConnectionState.java @@ -41,7 +41,6 @@ import java.io.IOException; import java.util.Properties; -import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -50,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Objects.requireNonNull; +import static org.ehcache.core.util.ExceptionUtil.containsCause; class ConnectionState { @@ -114,9 +114,13 @@ public ClusterTierClientEntity createClusterTierClientEntity(String cacheId, break; } catch (EntityNotFoundException e) { throw new PerpetualCachePersistenceException("Cluster tier proxy '" + cacheId + "' for entity '" + entityIdentifier + "' does not exist.", e); - } catch (ConnectionClosedException | ConnectionShutdownException e) { - LOGGER.info("Disconnected from the server", e); - handleConnectionClosedException(true); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + LOGGER.info("Disconnected from the server", t); + handleConnectionClosedException(true); + } else { + throw t; + } } } @@ -151,6 +155,12 @@ private void reconnect() { break; } catch (ConnectionClosedException | ConnectionException e) { LOGGER.error("Re-connection to server failed, trying again", e); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + LOGGER.error("Re-connection to server failed, trying again", t); + } else { + throw t; + } } } } @@ -180,6 +190,14 @@ private boolean silentDestroyUtil() { LOGGER.info("Disconnected from the server", e); reconnect(); return false; + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + LOGGER.info("Disconnected from the server", t); + reconnect(); + return false; + } else { + throw t; + } } } @@ -262,6 +280,12 @@ public void destroyAll() throws CachePersistenceException { throw new CachePersistenceException("Cannot delete cluster tiers on " + connectionSource, e); } catch (ConnectionClosedException | ConnectionShutdownException e) { handleConnectionClosedException(false); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + handleConnectionClosedException(false); + } else { + throw t; + } } } } @@ -285,6 +309,12 @@ public void destroy(String name) throws CachePersistenceException { } } catch (ConnectionClosedException | ConnectionShutdownException e) { reconnect(); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + reconnect(); + } else { + throw t; + } } } @@ -299,6 +329,12 @@ public void destroy(String name) throws CachePersistenceException { break; } catch (ConnectionClosedException | ConnectionShutdownException e) { handleConnectionClosedException(false); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + handleConnectionClosedException(false); + } else { + throw t; + } } } } @@ -315,6 +351,14 @@ private void autoCreateEntity() throws ClusterTierManagerValidationException, Il LOGGER.info("Disconnected from the server", e); reconnect(); continue; + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + LOGGER.info("Disconnected from the server", t); + reconnect(); + continue; + } else { + throw t; + } } try { @@ -330,6 +374,13 @@ private void autoCreateEntity() throws ClusterTierManagerValidationException, Il } catch (ConnectionClosedException | ConnectionShutdownException e) { LOGGER.info("Disconnected from the server", e); reconnect(); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + LOGGER.info("Disconnected from the server", t); + reconnect(); + } else { + throw t; + } } } @@ -349,8 +400,12 @@ private void handleConnectionClosedException(boolean retrieve) throws ClusterTie } connectionRecoveryListener.run(); break; - } catch (ConnectionClosedException | ConnectionShutdownException e) { - LOGGER.info("Disconnected from the server", e); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + LOGGER.info("Disconnected from the server", t); + } else { + throw t; + } } } } diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java index 1d8d2949a6..8f44034bc6 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java @@ -143,7 +143,7 @@ public ChainEntry get(long key) throws TimeoutException { @Override public void append(long key, ByteBuffer payLoad) { try { - entity.invokeAndWaitForReceive(new AppendMessage(key, payLoad), true); + entity.invokeAndWaitForComplete(new AppendMessage(key, payLoad), true); } catch (Exception e) { throw new ServerStoreProxyException(e); } diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ReconnectingServerStoreProxy.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ReconnectingServerStoreProxy.java index c28a8d55f5..95d789aa00 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ReconnectingServerStoreProxy.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ReconnectingServerStoreProxy.java @@ -16,7 +16,6 @@ package org.ehcache.clustered.client.internal.store; import org.ehcache.clustered.client.internal.store.lock.LockingServerStoreProxy; -import org.ehcache.clustered.client.internal.store.lock.LockingServerStoreProxyImpl; import org.ehcache.clustered.common.internal.store.Chain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +28,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import static org.ehcache.core.util.ExceptionUtil.containsCause; + public class ReconnectingServerStoreProxy implements LockingServerStoreProxy { private static final Logger LOGGER = LoggerFactory.getLogger(ReconnectingServerStoreProxy.class); @@ -54,8 +55,12 @@ public String getCacheId() { public void close() { try { proxy().close(); - } catch (ConnectionClosedException | ConnectionShutdownException e) { - LOGGER.debug("Store was already closed, since connection was closed"); + } catch (Throwable t) { + if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) { + LOGGER.debug("Store was already closed, since connection was closed"); + } else { + throw t; + } } } @@ -119,7 +124,7 @@ private T onStoreProxy(TimeoutExceptionFunction try { return function.apply(storeProxy); } catch (ServerStoreProxyException sspe) { - if (sspe.getCause() instanceof ConnectionClosedException) { + if (containsCause(sspe, ConnectionClosedException.class)) { if (delegateRef.compareAndSet(storeProxy, new ReconnectInProgressProxy(storeProxy.getCacheId()))) { onReconnect.run(); } diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/SimpleClusterTierClientEntity.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/SimpleClusterTierClientEntity.java index a2aa60dbe4..5db0bfef71 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/SimpleClusterTierClientEntity.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/SimpleClusterTierClientEntity.java @@ -36,10 +36,7 @@ import org.terracotta.entity.EndpointDelegate; import org.terracotta.entity.EntityClientEndpoint; import org.terracotta.entity.EntityResponse; -import org.terracotta.entity.InvocationBuilder; -import org.terracotta.entity.InvokeFuture; -import org.terracotta.entity.MessageCodecException; -import org.terracotta.exception.EntityException; +import org.terracotta.entity.InvocationCallback; import java.time.Duration; import java.util.EnumSet; @@ -48,14 +45,18 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; -import java.util.function.LongSupplier; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.ehcache.clustered.client.config.Timeouts.nanosStartingFromNow; +import static org.terracotta.entity.InvocationCallback.Types.COMPLETE; +import static org.terracotta.entity.InvocationCallback.Types.RECEIVED; +import static org.terracotta.entity.InvocationCallback.Types.RETIRED; +import static org.terracotta.entity.InvocationCallback.Types.SENT; /** * ClusterTierClientEntity @@ -200,7 +201,7 @@ public void addResponseListener(Class respo @Override public void validate(ServerStoreConfiguration clientStoreConfiguration) throws ClusterTierValidationException, TimeoutException { try { - invokeInternalAndWait(endpoint.beginInvoke(), timeouts.getConnectionTimeout(), messageFactory.validateServerStore(storeIdentifier , clientStoreConfiguration), false); + invokeInternalAndWaitFor(COMPLETE, timeouts.getConnectionTimeout(), messageFactory.validateServerStore(storeIdentifier , clientStoreConfiguration)); } catch (ClusterException e) { throw new ClusterTierValidationException("Error validating cluster tier '" + storeIdentifier + "'", e); } @@ -212,46 +213,45 @@ public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMes } @Override - public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws TimeoutException { - invokeInternal(endpoint.beginInvoke().ackSent(), getTimeoutDuration(message), message, track); + public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws TimeoutException, ClusterException { + invokeInternalAndWaitFor(SENT, getTimeoutDuration(message), message); } @Override public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException { - invokeInternalAndWait(endpoint.beginInvoke().ackReceived(), message, track); + invokeInternalAndWaitFor(RECEIVED, message); } @Override public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException { - return invokeInternalAndWait(endpoint.beginInvoke().blockGetOnRetire(false), message, track); + return invokeInternalAndWaitFor(COMPLETE, message); } @Override public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException { - return invokeInternalAndWait(endpoint.beginInvoke().blockGetOnRetire(true), message, track); + return invokeInternalAndWaitFor(RETIRED, message); } - private EhcacheEntityResponse invokeInternalAndWait(InvocationBuilder invocationBuilder, EhcacheOperationMessage message, boolean track) + private EhcacheEntityResponse invokeInternalAndWaitFor(InvocationCallback.Types type, EhcacheOperationMessage message) throws ClusterException, TimeoutException { - return invokeInternalAndWait(invocationBuilder, getTimeoutDuration(message), message, track); + return invokeInternalAndWaitFor(type, getTimeoutDuration(message), message); } - private EhcacheEntityResponse invokeInternalAndWait(InvocationBuilder invocationBuilder, Duration timeLimit, EhcacheEntityMessage message, boolean track) + private EhcacheEntityResponse invokeInternalAndWaitFor(InvocationCallback.Types type, Duration timeLimit, EhcacheEntityMessage message) throws ClusterException, TimeoutException { try { - LongSupplier nanosRemaining = nanosStartingFromNow(timeLimit); - InvokeFuture future = invokeInternal(invocationBuilder, Duration.ofNanos(nanosRemaining.getAsLong()), message, track); - EhcacheEntityResponse response = waitFor(nanosRemaining.getAsLong(), future); - if (EhcacheResponseType.FAILURE.equals(response.getResponseType())) { + Future future = endpoint.message(message).invokeAnd(type); + EhcacheEntityResponse response = waitFor(timeLimit.toNanos(), future); + if (response != null && EhcacheResponseType.FAILURE.equals(response.getResponseType())) { throw ((Failure)response).getCause(); } else { return response; } - } catch (EntityException e) { - throw new RuntimeException(message + " error: " + e.toString(), e); + } catch (ExecutionException e) { + throw new RuntimeException(message + " error: " + e.getCause(), e.getCause()); } catch (TimeoutException e) { String msg = "Timeout exceeded for " + message + " message; " + timeLimit; TimeoutException timeoutException = new TimeoutException(msg); @@ -261,31 +261,6 @@ private EhcacheEntityResponse invokeInternalAndWait(InvocationBuilder invokeInternal(InvocationBuilder invocationBuilder, Duration timeout, EhcacheEntityMessage message, boolean track) throws TimeoutException { - boolean interrupted = Thread.interrupted(); - try { - LongSupplier nanosRemaining = nanosStartingFromNow(timeout); - while (true) { - try { - long nanos = nanosRemaining.getAsLong(); - if (nanos > 0) { - return invocationBuilder.message(message).invokeWithTimeout(nanos, NANOSECONDS); - } else { - throw new TimeoutException("Timed out waiting for server response to message: " + message); - } - } catch (InterruptedException e) { - interrupted = true; - } - } - } catch (MessageCodecException e) { - throw new RuntimeException(message + " error: " + e.getMessage(), e); - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - private Duration getTimeoutDuration(EhcacheOperationMessage message) { if (GET_STORE_OPS.contains(message.getMessageType())) { return timeouts.getReadOperationTimeout(); @@ -294,15 +269,18 @@ private Duration getTimeoutDuration(EhcacheOperationMessage message) { } } - private static T waitFor(long nanos, InvokeFuture future) - throws EntityException, TimeoutException { - boolean interrupted = false; + private static T waitFor(long nanos, Future future) + throws ExecutionException, TimeoutException { + boolean interrupted = Thread.interrupted(); long deadlineTimeout = System.nanoTime() + nanos; try { while (true) { try { long timeRemaining = deadlineTimeout - System.nanoTime(); - return future.getWithTimeout(timeRemaining, NANOSECONDS); + return future.get(timeRemaining, NANOSECONDS); + } catch (TimeoutException e) { + future.cancel(true); + throw e; } catch (InterruptedException e) { interrupted = true; } diff --git a/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/service/ClusterTierManagerClientEntityExceptionTest.java b/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/service/ClusterTierManagerClientEntityExceptionTest.java index 82ecc988d2..36bcd40f18 100644 --- a/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/service/ClusterTierManagerClientEntityExceptionTest.java +++ b/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/service/ClusterTierManagerClientEntityExceptionTest.java @@ -102,39 +102,11 @@ public void testServerExceptionPassThrough() throws Exception { } catch (RuntimeException e) { assertThat(e.getCause(), is(instanceOf(ClusterTierManagerValidationException.class))); - /* - * Find the last ClusterTierManagerClientEntity involved exception in the causal chain. This - * is where the server-side exception should have entered the client. - */ - Throwable clientSideException = null; - for (Throwable t = e; t.getCause() != null && t.getCause() != t; t = t.getCause()) { - for (StackTraceElement element : t.getStackTrace()) { - if (element.getClassName().endsWith("ClusterTierManagerClientEntity")) { - clientSideException = t; - } - } - } - assert clientSideException != null; - - /* - * In this specific failure case, the exception is expected to be an InvalidStoreException from - * the server and re-thrown in the client. - */ - Throwable clientSideCause = clientSideException.getCause(); - assertThat(clientSideCause, is(instanceOf(InvalidServerSideConfigurationException.class))); - - serverCheckLoop: - { - for (StackTraceElement element : clientSideCause.getStackTrace()) { - if (element.getClassName().endsWith("ClusterTierManagerActiveEntity")) { - break serverCheckLoop; - } - } - fail(clientSideException + " lacks server-based cause"); - } - - assertThat(clientSideException, is(instanceOf(InvalidServerSideConfigurationException.class))); + Throwable clientSide = e.getCause().getCause(); + assertThat(clientSide, is(instanceOf(InvalidServerSideConfigurationException.class))); + Throwable serverSide = clientSide.getCause(); + assertThat(serverSide, is(instanceOf(InvalidServerSideConfigurationException.class))); } finally { accessService.stop(); } diff --git a/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxyTest.java b/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxyTest.java index 4c7fc31bb0..1981654f31 100644 --- a/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxyTest.java +++ b/clustered/ehcache-client/src/test/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxyTest.java @@ -477,7 +477,7 @@ public void compact(ServerStoreProxy.ChainEntry chain) { public void testAppendThrowsConnectionClosedExceptionDuringHashInvalidation() throws Exception { SimpleClusterTierClientEntity clientEntity1 = mock(SimpleClusterTierClientEntity.class); StrongServerStoreProxy serverStoreProxy1 = new StrongServerStoreProxy("testAppendThrowsConnectionClosedExceptionDuringHashInvalidation", clientEntity1, mock(ServerCallback.class)); - doThrow(new ConnectionClosedException("Test")).when(clientEntity1).invokeAndWaitForReceive(any(), anyBoolean()); + doThrow(new ConnectionClosedException("Test")).when(clientEntity1).invokeAndWaitForComplete(any(), anyBoolean()); when(clientEntity1.getTimeouts()).thenReturn(Timeouts.DEFAULT); when(clientEntity1.isConnected()).thenReturn(true); try { diff --git a/clustered/ehcache-clustered/build.gradle b/clustered/ehcache-clustered/build.gradle index 3bbd35dbec..cf77e3e956 100644 --- a/clustered/ehcache-clustered/build.gradle +++ b/clustered/ehcache-clustered/build.gradle @@ -89,6 +89,7 @@ dependencies { serverLibs project(':clustered:server:ehcache-entity') serverLibs project(':clustered:server:ehcache-service') + kit "org.terracotta.internal:terracotta-kit:$terracottaCoreVersion@tar.gz" kit "org.terracotta:platform-kit:$terracottaPlatformVersion@tar.gz" } @@ -120,12 +121,11 @@ distributions { } //tc kit into ('') { - from configurations.kit.elements.map { + from(configurations.kit.elements.map { files -> files.collect { tarTree(it) } - } - eachFile { f -> - // remove top level directory from the kit - f.path = f.path.replace("platform-kit-$terracottaPlatformVersion/", "") + }) { + // remove top level directories from the kits + eachFile(dropTopLevelDirectories(1)) } exclude { f -> // Exclude tc's README.txt - Issue 1273 @@ -191,3 +191,21 @@ publishing.publications.withType(MavenPublication) { } [distTar, distZip, distDir, installDist]*.dependsOn copyDocs, javadocJar, project(':ehcache').jar, project(':ehcache').javadocJar + +static Action dropTopLevelDirectories(int count) { + { fcd -> + RelativePath original = fcd.getRelativeSourcePath() + def originalSegments = original.segments + if (count >= originalSegments.length) { + if (original.isFile()) { + throw new IllegalStateException("Cannot drop ${count} ${(count > 1 ? 'directories' : 'directory')} from ${original}") + } else { + fcd.exclude() + } + } else { + def head = fcd.relativePath.segments.dropRight(originalSegments.length) + def tail = originalSegments.drop(count) + fcd.setRelativePath(new RelativePath(original.isFile(), (head + tail) as String[])) + } + } +} diff --git a/clustered/integration-test/src/test/java/org/ehcache/clustered/IterationFailureBehaviorTest.java b/clustered/integration-test/src/test/java/org/ehcache/clustered/IterationFailureBehaviorTest.java index 4a60afe49e..c9227b3215 100644 --- a/clustered/integration-test/src/test/java/org/ehcache/clustered/IterationFailureBehaviorTest.java +++ b/clustered/integration-test/src/test/java/org/ehcache/clustered/IterationFailureBehaviorTest.java @@ -119,7 +119,7 @@ public void testIteratorFailover() throws Exception { } catch (CacheIterationException e) { assertThat(e.getCause(), instanceOf(StoreAccessException.class)); assertThat(e.getCause().getCause(), instanceOf(ServerStoreProxyException.class)); - assertThat(e.getCause().getCause().getCause(), + assertThat(e.getCause().getCause().getCause().getCause(), either(instanceOf(ConnectionClosedException.class)) //lost in the space between active and passive .or(instanceOf(InvalidOperationException.class))); //picked up by the passive - it doesn't have our iterator } @@ -177,7 +177,7 @@ public void testIteratorReconnect() throws Exception { } catch (CacheIterationException e) { assertThat(e.getCause(), instanceOf(StoreAccessException.class)); assertThat(e.getCause().getCause(), instanceOf(ServerStoreProxyException.class)); - assertThat(e.getCause().getCause().getCause(), + assertThat(e.getCause().getCause().getCause().getCause(), either(instanceOf(ConnectionClosedException.class)) //lost in the space between the two cluster executions .or(instanceOf(InvalidOperationException.class))); //picked up by the new cluster - it doesn't have our iterator } diff --git a/clustered/integration-test/src/test/java/org/ehcache/clustered/TerminatedServerTest.java b/clustered/integration-test/src/test/java/org/ehcache/clustered/TerminatedServerTest.java index 793d03afcf..75aab179ca 100644 --- a/clustered/integration-test/src/test/java/org/ehcache/clustered/TerminatedServerTest.java +++ b/clustered/integration-test/src/test/java/org/ehcache/clustered/TerminatedServerTest.java @@ -87,8 +87,6 @@ @RunWith(Parallel.class) public class TerminatedServerTest { - private static final int CLIENT_MAX_PENDING_REQUESTS = 5; - private static Map OLD_PROPERTIES; @BeforeClass @@ -107,9 +105,6 @@ public static void setProperties() { overrideProperty(oldProperties, TCPropertiesConsts.L1_SHUTDOWN_THREADGROUP_GRACETIME, "1000"); overrideProperty(oldProperties, TCPropertiesConsts.TC_TRANSPORT_HANDSHAKE_TIMEOUT, "1000"); - // Used only by testTerminationFreezesTheClient to be able to fill the inflight queue - overrideProperty(oldProperties, TCPropertiesConsts.CLIENT_MAX_PENDING_REQUESTS, Integer.toString(CLIENT_MAX_PENDING_REQUESTS)); - OLD_PROPERTIES = oldProperties; } @@ -539,10 +534,7 @@ public void testTerminationFreezesTheClient() throws Exception { CLUSTER.get().getClusterControl().terminateAllServers(); - // Fill the inflight queue and check that we wait no longer than the read timeout - for (int i = 0; i < CLIENT_MAX_PENDING_REQUESTS; i++) { - cache.get(1L); - } + cache.get(1L); // The resilience strategy will pick it up and not exception is thrown new TimeLimitedTask(readOperationTimeout.multipliedBy(2)) { // I multiply by 2 to let some room after the expected timeout diff --git a/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/lock/server/VoltronReadWriteLockServerEntityService.java b/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/lock/server/VoltronReadWriteLockServerEntityService.java index b73b4930b8..4e7ccaec97 100644 --- a/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/lock/server/VoltronReadWriteLockServerEntityService.java +++ b/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/lock/server/VoltronReadWriteLockServerEntityService.java @@ -27,6 +27,7 @@ import org.terracotta.entity.ConcurrencyStrategy; import org.terracotta.entity.ConfigurationException; import org.terracotta.entity.EntityServerService; +import org.terracotta.entity.ExecutionStrategy; import org.terracotta.entity.MessageCodec; import org.terracotta.entity.PassiveServerEntity; import org.terracotta.entity.ServiceConfiguration; @@ -90,6 +91,11 @@ public SyncMessageCodec getSyncMessageCodec() { return LockSyncMessaging.syncCodec(); } + @Override + public ExecutionStrategy getExecutionStrategy(byte[] configuration) { + return message -> ExecutionStrategy.Location.ACTIVE; + } + private static ServiceConfiguration config(Class klazz) { return () -> klazz; } diff --git a/ehcache-core/src/main/java/org/ehcache/core/util/ExceptionUtil.java b/ehcache-core/src/main/java/org/ehcache/core/util/ExceptionUtil.java new file mode 100644 index 0000000000..a60709a964 --- /dev/null +++ b/ehcache-core/src/main/java/org/ehcache/core/util/ExceptionUtil.java @@ -0,0 +1,32 @@ +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.ehcache.core.util; + +public class ExceptionUtil { + + public static boolean containsCause(Throwable failure, Class cause) { + Throwable intermediate = failure; + do { + if (cause.isInstance(intermediate)) { + return true; + } + } while ((intermediate = intermediate.getCause()) != null); + + return false; + } + + +} diff --git a/gradle.properties b/gradle.properties index ee219eb0f0..96799038e9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -9,10 +9,10 @@ slf4jVersion = 1.7.36 sizeofVersion = 0.4.3 # Terracotta clustered -terracottaPlatformVersion = 5.9.18 -terracottaApisVersion = 1.8.2 -terracottaCoreVersion = 5.9.5 -terracottaPassthroughTestingVersion = 1.8.4 +terracottaPlatformVersion = 5.10.7 +terracottaApisVersion = 1.9.0 +terracottaCoreVersion = 5.10.8 +terracottaPassthroughTestingVersion = 1.9.0 terracottaUtilitiesVersion = 0.0.15 # Test lib versions From 67e8a9b16ec2ce9809d45a9db080c42467c9be41 Mon Sep 17 00:00:00 2001 From: Chris Dennis Date: Thu, 2 Feb 2023 08:07:41 -0500 Subject: [PATCH 2/3] [DO NOT MERGE] - target the specific flaky test --- azure-pipelines.yml | 12 ++++++------ .../internal/store/CommonServerStoreProxy.java | 2 +- ...lusteredCacheOpsReplicationMultiThreadedTest.java | 4 +--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index e3fb0018ea..5793d4c30c 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -28,28 +28,28 @@ jobs: parameters: jdkVersion: '1.8' jobName: 'LinuxJava8' - gradleTasks: 'check -x dependencyCheckAggregate' + gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: jdkVersion: '1.8' options: '-PtestVM=java11Home' jobName: 'LinuxJava11' - gradleTasks: 'check -x dependencyCheckAggregate' + gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: jdkVersion: '1.8' options: '-PtestVM=java17Home' jobName: 'LinuxJava17' - gradleTasks: 'check -x dependencyCheckAggregate' + gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: vmImage: 'windows-latest' jdkVersion: '1.8' jobName: 'WindowsJava8' - gradleTasks: 'check -x dependencyCheckAggregate' + gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: @@ -57,7 +57,7 @@ jobs: jdkVersion: '1.8' options: '-PtestVM=java11Home' jobName: 'WindowsJava11' - gradleTasks: 'check -x dependencyCheckAggregate' + gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: @@ -65,4 +65,4 @@ jobs: jdkVersion: '1.8' options: '-PtestVM=java17Home' jobName: 'WindowsJava17' - gradleTasks: 'check -x dependencyCheckAggregate' + gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java index 8f44034bc6..1d8d2949a6 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java @@ -143,7 +143,7 @@ public ChainEntry get(long key) throws TimeoutException { @Override public void append(long key, ByteBuffer payLoad) { try { - entity.invokeAndWaitForComplete(new AppendMessage(key, payLoad), true); + entity.invokeAndWaitForReceive(new AppendMessage(key, payLoad), true); } catch (Exception e) { throw new ServerStoreProxyException(e); } diff --git a/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java b/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java index ec10420c75..1896f48782 100644 --- a/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java +++ b/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java @@ -43,7 +43,6 @@ import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.terracotta.utilities.test.WaitForAssert; import java.io.Serializable; import java.time.Duration; @@ -81,7 +80,7 @@ @RunWith(ParallelParameterized.class) public class BasicClusteredCacheOpsReplicationMultiThreadedTest { - private static final int NUM_OF_THREADS = 10; + private static final int NUM_OF_THREADS = 4; private static final int JOB_SIZE = 100; private PersistentCacheManager cacheManager1; @@ -99,7 +98,6 @@ public static Consistency[] data() { @ClassRule @Rule public static final ParallelTestCluster CLUSTER = new ParallelTestCluster(newCluster(2).in(clusterPath()) - .withServerHeap(512) .withServiceFragment(offheapResource("primary-server-resource", 24)).build()); @Rule From 88ba97b287e48af4adbf10f3c8db2aaf1968fb93 Mon Sep 17 00:00:00 2001 From: Myron Scott Date: Thu, 2 Feb 2023 10:54:52 -0800 Subject: [PATCH 3/3] probe test servers --- azure-pipelines.yml | 12 ++--- ...dCacheOpsReplicationMultiThreadedTest.java | 53 ++++++++++++++++++- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 5793d4c30c..204a220adf 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -28,28 +28,28 @@ jobs: parameters: jdkVersion: '1.8' jobName: 'LinuxJava8' - gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' + gradleTasks: ':clustered:integration-test:test --info --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: jdkVersion: '1.8' options: '-PtestVM=java11Home' jobName: 'LinuxJava11' - gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' + gradleTasks: ':clustered:integration-test:test --info --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: jdkVersion: '1.8' options: '-PtestVM=java17Home' jobName: 'LinuxJava17' - gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' + gradleTasks: ':clustered:integration-test:test --info --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: vmImage: 'windows-latest' jdkVersion: '1.8' jobName: 'WindowsJava8' - gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' + gradleTasks: ':clustered:integration-test:test --info --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: @@ -57,7 +57,7 @@ jobs: jdkVersion: '1.8' options: '-PtestVM=java11Home' jobName: 'WindowsJava11' - gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' + gradleTasks: ':clustered:integration-test:test --info --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' - template: build-templates/gradle-common.yml@templates parameters: @@ -65,4 +65,4 @@ jobs: jdkVersion: '1.8' options: '-PtestVM=java17Home' jobName: 'WindowsJava17' - gradleTasks: ':clustered:integration-test:test --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' + gradleTasks: ':clustered:integration-test:test --info --tests=org.ehcache.clustered.replication.BasicClusteredCacheOpsReplicationMultiThreadedTest' diff --git a/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java b/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java index 1896f48782..024599539a 100644 --- a/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java +++ b/clustered/integration-test/src/test/java/org/ehcache/clustered/replication/BasicClusteredCacheOpsReplicationMultiThreadedTest.java @@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.net.InetSocketAddress; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -52,6 +53,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -62,6 +64,7 @@ import java.util.concurrent.TimeoutException; import static org.ehcache.testing.StandardCluster.clusterPath; +import static org.ehcache.testing.StandardCluster.leaseLength; import static org.ehcache.testing.StandardCluster.newCluster; import static org.ehcache.testing.StandardCluster.offheapResource; import static org.hamcrest.MatcherAssert.assertThat; @@ -69,6 +72,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.fail; +import org.terracotta.connection.ConnectionException; +import org.terracotta.connection.Diagnostics; +import org.terracotta.connection.DiagnosticsFactory; /** @@ -98,7 +104,8 @@ public static Consistency[] data() { @ClassRule @Rule public static final ParallelTestCluster CLUSTER = new ParallelTestCluster(newCluster(2).in(clusterPath()) - .withServiceFragment(offheapResource("primary-server-resource", 24)).build()); + .withServiceFragment(offheapResource("primary-server-resource", 24)).withServiceFragment(leaseLength(Duration.ofDays(1))) + .build()); @Rule public final TestName testName = new TestName(); @@ -110,6 +117,8 @@ public static Consistency[] data() { private final ThreadLocalRandom random = ThreadLocalRandom.current(); private final ExecutorService executorService = Executors.newWorkStealingPool(NUM_OF_THREADS); + + private final Probe probe = new Probe(); @Before public void startServers() throws Exception { @@ -134,6 +143,7 @@ public void startServers() throws Exception { cache2 = cacheManager2.createCache(testName.getMethodName(), config); caches = Arrays.asList(cache1, cache2); + probe.loop(); } @After @@ -148,6 +158,7 @@ public void tearDown() throws Exception { if(cacheManager2 != null && cacheManager2.getStatus() != Status.UNINITIALIZED) { cacheManager2.close(); } + } @Test(timeout=180000) @@ -287,4 +298,44 @@ private static class BlobValue implements Serializable { private final byte[] data = new byte[10 * 1024]; } + public class Probe { + + /** + * @param args the command line arguments + */ + public void loop() { + String[] servers = CLUSTER.getClusterHostPorts(); + for (String hostPort : servers) { + String[] hp = hostPort.split("[:]"); + InetSocketAddress inet = InetSocketAddress.createUnresolved(hp[0], Integer.parseInt(hp[1])); + log.info("starting probe for " + hostPort); + new Thread(()->{ + while (!executorService.isShutdown()) { + + try (Diagnostics d = DiagnosticsFactory.connect(inet, new Properties())) { + while (!executorService.isShutdown()) { + try { + probe(d); + log.info("sleeping for 10 sec."); + Thread.sleep(10_000L); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + } + } catch (ConnectionException e) { + e.printStackTrace(); + } + } + + }).start(); + } + } + + private void probe(Diagnostics d) { + log.info("===== PROBE ====="); + log.info(d.getClusterState()); + log.info(d.getThreadDump()); + } + } + }