Skip to content

Commit

Permalink
[proxima-transaction-manager] O2-Czech-Republic#255 stabilize Transac…
Browse files Browse the repository at this point in the history
…tionIT
  • Loading branch information
je-ik committed Jan 19, 2023
1 parent 33369e7 commit aed3f7a
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void withHandle(ObserveHandle handle) {
private final Map<DirectAttributeFamilyDescriptor, HandleWithAssignment> clientObservedFamilies =
new ConcurrentHashMap<>();
private final Map<DirectAttributeFamilyDescriptor, CachedView> stateViews =
new ConcurrentHashMap<>();
Collections.synchronizedMap(new HashMap<>());
private final Map<String, BiConsumer<String, Response>> transactionResponseConsumers =
new ConcurrentHashMap<>();
@Getter private final TransactionConfig cfg = new TransactionConfig();
Expand All @@ -326,10 +326,10 @@ public void withHandle(ObserveHandle handle) {
private long transactionTimeoutMs;

@Getter(AccessLevel.PRIVATE)
private long cleanupIntervalMs;
private final long cleanupIntervalMs;

@Getter(AccessLevel.PACKAGE)
private InitialSequenceIdPolicy initialSequenceIdPolicy;
private final InitialSequenceIdPolicy initialSequenceIdPolicy;

@VisibleForTesting
public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> cfg) {
Expand Down Expand Up @@ -457,13 +457,14 @@ public void runObservations(
consumerName);
CommitLogReader reader = Optionals.get(requestFamily.getCommitLogReader());

CachedView view = stateViews.get(stateFamily);
if (view == null) {
view = Optionals.get(stateFamily.getCachedView());
Duration ttl = Duration.ofMillis(cleanupIntervalMs);
stateViews.put(stateFamily, view);
view.assign(view.getPartitions(), updateConsumer, ttl);
}
stateViews.computeIfAbsent(
stateFamily,
k -> {
CachedView view = Optionals.get(stateFamily.getCachedView());
Duration ttl = Duration.ofMillis(cleanupIntervalMs);
view.assign(view.getPartitions(), updateConsumer, ttl);
return view;
});
initializedLatch.countDown();

serverObservedFamilies.put(
Expand Down Expand Up @@ -537,7 +538,7 @@ public boolean onNext(StreamElement ingest, OnNextContext context) {
public void onRepartition(OnRepartitionContext context) {
Preconditions.checkArgument(
context.partitions().isEmpty() || context.partitions().size() == numPartitions,
"At least all or none partitions need to be assigned to the consumer. Got %s partitions from %s",
"All or zero partitions must be assigned to the consumer. Got %s partitions from %s",
context.partitions().size(),
numPartitions);
if (!context.partitions().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import cz.o2.proxima.annotations.Internal;
import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogObservers.TerminationStrategy;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.transaction.TransactionalOnlineAttributeWriter.TransactionRejectedException;
Expand Down Expand Up @@ -133,9 +134,14 @@ public Contextual(
@Override
void doTransform(StreamElement ingest, OffsetCommitter context) {
log.debug("Transformation {}: processing input {}", name, ingest);
CommitCallback commitCallback =
(succ, exc) -> {
onReplicated(ingest);
context.commit(succ, exc);
};
for (int i = 0; ; i++) {
try {
transformation.transform(ingest, context::commit);
transformation.transform(ingest, commitCallback);
break;
} catch (TransactionRejectedRuntimeException ex) {
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import cz.o2.proxima.transaction.Response.Flags;
import cz.o2.proxima.transform.ElementWiseTransformation;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -48,6 +50,40 @@ public class TransformationObserverTest {
private final EntityDescriptor gateway = repo.getEntity("gateway");
private final Regular<byte[]> armed = Regular.of(gateway, gateway.getAttribute("armed"));

@Test
public void testContextualTransformCallsOnReplicated() {
AtomicInteger failedCnt = new AtomicInteger();
DirectElementWiseTransform transform =
new DirectElementWiseTransform() {
@Override
public void setup(
Repository repo, DirectDataOperator directDataOperator, Map<String, Object> cfg) {}

@Override
public void transform(StreamElement input, CommitCallback commit) {
commit.commit(true, null);
}

@Override
public void close() {}
};

List<StreamElement> replicated = new ArrayList<>();
Contextual observer =
new Contextual(direct, "name", transform, false, new PassthroughFilter()) {
@Override
protected void onReplicated(StreamElement element) {
replicated.add(element);
}
};
observer.doTransform(
armed.upsert("key", System.currentTimeMillis(), new byte[] {}),
(succ, exc) -> {
assertTrue(succ);
});
assertEquals(1, replicated.size());
}

@Test
public void testTransactionRejectedExceptionHandling() {
AtomicInteger failedCnt = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ private void submitConsumerWithObserver(
} catch (Throwable err) {
completedLatch.countDown();
log.error("Error processing consumer {}", name, err);
if (consumer.onError(err)) {
if (consumer.onError(err) && !shutdown.get()) {
try {
submitConsumerWithObserver(
name, offsets, position, stopAtCurrent, preWrite, consumer, executor, handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.transaction.Commit;
import cz.o2.proxima.transaction.KeyAttribute;
import cz.o2.proxima.transaction.KeyAttributes;
import cz.o2.proxima.transaction.Response;
Expand All @@ -63,6 +64,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
Expand All @@ -83,6 +85,7 @@ public class TransactionIT {
private CachedView view;
private ClientTransactionManager client;
private ObserveHandle transformationHandle;
private volatile @Nullable CountDownLatch replicatedLatch;

@Before
public void setUp() {
Expand All @@ -95,12 +98,19 @@ public void setUp() {
view = Optionals.get(direct.getCachedView(amount));
view.assign(view.getPartitions());
observer.run("transaction-observer");
EntityDescriptor transaction = repo.getEntity("_transaction");
Regular<Commit> commit = Regular.of(transaction, transaction.getAttribute("commit"));
transformationHandle =
TransformationRunner.runTransformation(
direct,
"_transaction-commit",
repo.getTransformations().get("_transaction-commit"),
ign -> {});
elem -> {
Commit value = Optionals.get(commit.valueOf(elem));
if (value.getSeqId() > 0) {
Optional.ofNullable(replicatedLatch).ifPresent(CountDownLatch::countDown);
}
});
}

@After
Expand All @@ -113,14 +123,13 @@ public void tearDown() {
@Test(timeout = 100_000)
public void testAtomicAmountTransfer() throws InterruptedException {
// we begin with all amounts equal to zero
// we randomly reshuffle random amounts between users and then we verify, that the sum is zero

// we randomly reshuffle random amounts between users, and then we verify that the sum is zero
int numThreads = 20;
int numSwaps = 500;
int numUsers = 20;
CountDownLatch latch = new CountDownLatch(numThreads);
ExecutorService service = direct.getContext().getExecutorService();
AtomicReference<Throwable> err = new AtomicReference<>();
replicatedLatch = new CountDownLatch(numSwaps);

for (int i = 0; i < numThreads; i++) {
service.submit(
Expand All @@ -135,10 +144,9 @@ public void testAtomicAmountTransfer() throws InterruptedException {
log.error("Failed to run the transaction", ex);
err.set(ex);
}
latch.countDown();
});
}
latch.await();
replicatedLatch.await();
if (err.get() != null) {
throw new RuntimeException(err.get());
}
Expand Down

0 comments on commit aed3f7a

Please sign in to comment.