From 0f1ce8cc22336f4fbc70efba6af45d4bcafc4e7f Mon Sep 17 00:00:00 2001 From: MrBender2996 Date: Tue, 9 Feb 2021 11:42:18 +0300 Subject: [PATCH 1/2] ConcurrentProcessTest improved --- .../green/cproc/ConcurrentProcessTest.java | 184 +++++++++--------- 1 file changed, 94 insertions(+), 90 deletions(-) diff --git a/src/test/java/org/green/cproc/ConcurrentProcessTest.java b/src/test/java/org/green/cproc/ConcurrentProcessTest.java index bbb56f9..13776d0 100644 --- a/src/test/java/org/green/cproc/ConcurrentProcessTest.java +++ b/src/test/java/org/green/cproc/ConcurrentProcessTest.java @@ -35,68 +35,66 @@ import static org.junit.Assert.assertTrue; public class ConcurrentProcessTest { - private static final boolean MAX_MODE = Boolean.getBoolean("org.green.cproc.test.max_mode"); + private static final boolean MAX_MODE = Boolean.getBoolean("deltix.deltix.wct.concurrent.process.test.max_mode"); private static final int TEST_MULTIPLIER = MAX_MODE ? 20 : 1; - private static final int TEST_TIMEOUT = 20 * TEST_MULTIPLIER; + private static final int TEST_TIMEOUT = 100 * TEST_MULTIPLIER; private static final int CAB_SIZE = 10_000; private static final int BACKING_OFF_MAX_SPINS = 1_000; private static final int BACKING_OFF_MAX_YIELDS = 10_000; - @Rule - public Timeout globalTimeout = Timeout.seconds(TEST_TIMEOUT); - @Test - public void testExecuteSync() throws Exception { - final int sleep = 2_000; - - final TestExecutor.Listener listener = new TestExecutor.Listener() { - @Override - public void onTestEntryAProcessed() { - } + public void testExecuteSync() { + assertTimeoutPreemptively(Duration.ofSeconds(TEST_TIMEOUT), () -> { + final int sleep = 2_000; - @Override - public void onTestEntryBProcessed() { - } + final TestExecutor.Listener listener = new TestExecutor.Listener() { + @Override + public void onTestEntryAProcessed() { + } - @Override - public void onStartExecuted() { - try { - Thread.sleep(sleep); - } catch (final InterruptedException ignore) { + @Override + public void onTestEntryBProcessed() { } - } - @Override - public void onStopExecuted() { - } + @Override + public void onStartExecuted() { + try { + Thread.sleep(sleep); + } catch (final InterruptedException ignore) { + } + } - @Override - public void onTestCommandAExecuted() { - } + @Override + public void onStopExecuted() { + } - @Override - public void onTestCommandBExecuted() { - } - }; + @Override + public void onTestCommandAExecuted() { + } - try (TestProcess process = - new TestProcess( - new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS), listener)) { + @Override + public void onTestCommandBExecuted() { + } + }; - final long startTime = System.nanoTime(); + try (TestProcess process = + new TestProcess( + new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS), + listener)) { - final Future feature = process.start(); + final long startTime = System.nanoTime(); - feature.sync(); + process.start().sync(); - final long spentTime = (System.nanoTime() - startTime) / 1_000_000; + final long spentTime = (System.nanoTime() - startTime) / 1_000_000; - final long jitterThreshold = sleep / 5; // useful with single CPU core test environment + final long jitterThreshold = sleep / 5; // useful with single CPU core test environment - assertTrue(spentTime >= (sleep - jitterThreshold)); - } + assertTrue(spentTime >= (sleep - jitterThreshold)); + } + }); } @Test @@ -110,18 +108,21 @@ public void threeWorkersScenarioTest() throws Exception { } private void nWorkersScenarioTest(final TestTarget target) throws Exception { - try (TestProcess process = - new TestProcess( - new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS), target)) { + assertTimeoutPreemptively(Duration.ofSeconds(TEST_TIMEOUT), () -> { + try (TestProcess process = + new TestProcess( + new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS), + target)) { - final TestScenarioGroup workerGroup = new TestScenarioGroup(process, target); + final TestScenarioGroup workerGroup = new TestScenarioGroup(process, target); - workerGroup.start(); + workerGroup.start(); - workerGroup.join(); - } + workerGroup.reach(); + } - target.reach(); + target.reach(); + }); } class TestTarget implements TestExecutor.Listener { @@ -228,11 +229,16 @@ class TestScenarioGroup { testScenarios = new TestScenario[target.numberOfWorkers]; - final CountDownLatch beforeCommandSequence = new CountDownLatch(testScenarios.length); - final CountDownLatch afterCommandSequence = new CountDownLatch(testScenarios.length); + final CountDownLatch onAddProcessListener = new CountDownLatch(testScenarios.length); + final CountDownLatch onProcessStartListener = new CountDownLatch(target.numberOfStartsTotal); + final CountDownLatch onCommandAListener = new CountDownLatch(target.numberOfTestCommandsATotal); + final CountDownLatch onCommandBListener = new CountDownLatch(target.numberOfTestCommandsBTotal); + final CountDownLatch onProcessStopListener = new CountDownLatch(target.numberOfStopsTotal); + final CountDownLatch onRemoveProcessListener = new CountDownLatch(testScenarios.length); - for (int i = 0; i < testScenarios.length; i++) { - testScenarios[i] = new TestScenario(i, process, target, beforeCommandSequence, afterCommandSequence); + for (int i = 0; i < testScenarios.length; i++) { + testScenarios[i] = new TestScenario(i, process, target, onAddProcessListener, onProcessStartListener, + onCommandAListener, onCommandBListener, onProcessStopListener, onRemoveProcessListener); } } @@ -242,12 +248,12 @@ void start() { } } - void join() throws InterruptedException { + void reach() throws InterruptedException { for (final TestScenario testScenario : testScenarios) { testScenario.join(); } for (final TestScenario testScenario : testScenarios) { - testScenario.postCheck(); + testScenario.checkErrors(); } } } @@ -256,33 +262,39 @@ class TestScenario extends Thread implements TestProcessListener { private final int id; private final TestProcess process; private final TestTarget target; - private final CountDownLatch beforeCommandSequence; - private final CountDownLatch afterCommandSequence; + private final CountDownLatch onAddProcessListener; + private final CountDownLatch onRemoveProcessListener; + + private final CountDownLatch onProcessStartListener; + private final CountDownLatch onCommandAListener; + private final CountDownLatch onCommandBListener; + private final CountDownLatch onProcessStopListener; private volatile Exception scenarioError; private volatile Exception executionError; - private volatile int addMyListenerCount; - private volatile int removeMyListenerCount; - private volatile int testCommandACount; - private volatile int testCommandBCount; - private volatile int startCount; - private volatile int stopCount; - TestScenario( final int id, final TestProcess process, final TestTarget target, - final CountDownLatch beforeCommandSequence, - final CountDownLatch afterCommandSequence) { + final CountDownLatch onAddProcessListener, + final CountDownLatch onProcessStartListener, + final CountDownLatch onCommandAListener, + final CountDownLatch onCommandBListener, + final CountDownLatch onProcessStopListener, + final CountDownLatch onRemoveProcessListener) { super(TestScenario.class.getSimpleName() + "#" + id); this.id = id; this.process = process; this.target = target; - this.beforeCommandSequence = beforeCommandSequence; - this.afterCommandSequence = afterCommandSequence; + this.onAddProcessListener = onAddProcessListener; + this.onProcessStartListener = onProcessStartListener; + this.onCommandAListener = onCommandAListener; + this.onCommandBListener = onCommandBListener; + this.onProcessStopListener = onProcessStopListener; + this.onRemoveProcessListener = onRemoveProcessListener; } @Override @@ -290,10 +302,7 @@ public void run() { try { process.addListener(this); - // let's wait for all listeners - // to count all the following signals - beforeCommandSequence.countDown(); - beforeCommandSequence.await(); + onAddProcessListener.await(); int testEntriesA = 0; int testEntriesB = 0; @@ -345,12 +354,14 @@ public void run() { i++; } + onProcessStartListener.await(); + onCommandAListener.await(); + onCommandBListener.await(); + onProcessStopListener.await(); + process.removeListener(this); - // let's wait for all commands - // before a listener is removed - afterCommandSequence.countDown(); - afterCommandSequence.await(); + onRemoveProcessListener.await(); } catch (final Exception e) { e.printStackTrace(System.err); @@ -364,7 +375,7 @@ public void onAddProcessListener(final TestExecutor executor, final ListenerResu executionError = result.error(); } if (result.listener() == this) { - addMyListenerCount++; // OK, since happens in one single thread + onAddProcessListener.countDown(); } } @@ -374,7 +385,7 @@ public void onRemoveProcessListener(final TestExecutor executor, final ListenerR executionError = result.error(); } if (result.listener() == this) { - removeMyListenerCount++; // OK, since happens in one single thread + onRemoveProcessListener.countDown(); } } @@ -383,7 +394,7 @@ public void onTestCommandA(final TestExecutor executor, final TestResult result) if (result.error() != null) { executionError = result.error(); } - testCommandACount++; // OK, since happens in one single thread + onCommandAListener.countDown(); } @Override @@ -391,7 +402,7 @@ public void onTestCommandB(final TestExecutor executor, final TestResult result) if (result.error() != null) { executionError = result.error(); } - testCommandBCount++; // OK, since happens in one single thread + onCommandBListener.countDown(); } @Override @@ -399,7 +410,7 @@ public void onStart(final TestExecutor executor, final VoidResult result) { if (result.error() != null) { executionError = result.error(); } - startCount++; // OK, since happens in one single thread + onProcessStartListener.countDown(); } @Override @@ -407,19 +418,12 @@ public void onStop(final TestExecutor executor, final VoidResult result) { if (result.error() != null) { executionError = result.error(); } - stopCount++; // OK, since happens in one single thread + onProcessStopListener.countDown(); } - void postCheck() { + void checkErrors() { assertNull(scenarioError); assertNull(executionError); - - assertEquals(1, addMyListenerCount); - assertEquals(1, removeMyListenerCount); - assertEquals(target.numberOfTestCommandsATotal, testCommandACount); - assertEquals(target.numberOfTestCommandsBTotal, testCommandBCount); - assertEquals(target.numberOfStartsTotal, startCount); - assertEquals(target.numberOfStopsTotal, stopCount); } } From 1205d03e651cc5dcddb12cb23cc0aecab91ae602 Mon Sep 17 00:00:00 2001 From: MrBender2996 Date: Tue, 9 Feb 2021 17:23:45 +0300 Subject: [PATCH 2/2] ConcurrentProcessTest fix checkstyle --- src/test/java/org/green/cproc/ConcurrentProcessTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/green/cproc/ConcurrentProcessTest.java b/src/test/java/org/green/cproc/ConcurrentProcessTest.java index 13776d0..a36b938 100644 --- a/src/test/java/org/green/cproc/ConcurrentProcessTest.java +++ b/src/test/java/org/green/cproc/ConcurrentProcessTest.java @@ -225,7 +225,7 @@ class TestScenarioGroup { private final TestScenario[] testScenarios; TestScenarioGroup(final TestProcess process, - final TestTarget target) { + final TestTarget target) { testScenarios = new TestScenario[target.numberOfWorkers]; @@ -236,7 +236,7 @@ class TestScenarioGroup { final CountDownLatch onProcessStopListener = new CountDownLatch(target.numberOfStopsTotal); final CountDownLatch onRemoveProcessListener = new CountDownLatch(testScenarios.length); - for (int i = 0; i < testScenarios.length; i++) { + for (int i = 0; i < testScenarios.length; i++) { testScenarios[i] = new TestScenario(i, process, target, onAddProcessListener, onProcessStartListener, onCommandAListener, onCommandBListener, onProcessStopListener, onRemoveProcessListener); }