Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit test improvement #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 94 additions & 90 deletions src/test/java/org/green/cproc/ConcurrentProcessTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,68 +35,66 @@
import static org.junit.Assert.assertTrue;

public class ConcurrentProcessTest {
private static final boolean MAX_MODE = Boolean.getBoolean("org.green.cproc.test.max_mode");
private static final boolean MAX_MODE = Boolean.getBoolean("deltix.deltix.wct.concurrent.process.test.max_mode");

private static final int TEST_MULTIPLIER = MAX_MODE ? 20 : 1;
private static final int TEST_TIMEOUT = 20 * TEST_MULTIPLIER;
private static final int TEST_TIMEOUT = 100 * TEST_MULTIPLIER;

private static final int CAB_SIZE = 10_000;
private static final int BACKING_OFF_MAX_SPINS = 1_000;
private static final int BACKING_OFF_MAX_YIELDS = 10_000;

@Rule
public Timeout globalTimeout = Timeout.seconds(TEST_TIMEOUT);

@Test
public void testExecuteSync() throws Exception {
final int sleep = 2_000;

final TestExecutor.Listener listener = new TestExecutor.Listener() {
@Override
public void onTestEntryAProcessed() {
}
public void testExecuteSync() {
assertTimeoutPreemptively(Duration.ofSeconds(TEST_TIMEOUT), () -> {
final int sleep = 2_000;

@Override
public void onTestEntryBProcessed() {
}
final TestExecutor.Listener listener = new TestExecutor.Listener() {
@Override
public void onTestEntryAProcessed() {
}

@Override
public void onStartExecuted() {
try {
Thread.sleep(sleep);
} catch (final InterruptedException ignore) {
@Override
public void onTestEntryBProcessed() {
}
}

@Override
public void onStopExecuted() {
}
@Override
public void onStartExecuted() {
try {
Thread.sleep(sleep);
} catch (final InterruptedException ignore) {
}
}

@Override
public void onTestCommandAExecuted() {
}
@Override
public void onStopExecuted() {
}

@Override
public void onTestCommandBExecuted() {
}
};
@Override
public void onTestCommandAExecuted() {
}

try (TestProcess process =
new TestProcess(
new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS), listener)) {
@Override
public void onTestCommandBExecuted() {
}
};

final long startTime = System.nanoTime();
try (TestProcess process =
new TestProcess(
new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS),
listener)) {

final Future feature = process.start();
final long startTime = System.nanoTime();

feature.sync();
process.start().sync();

final long spentTime = (System.nanoTime() - startTime) / 1_000_000;
final long spentTime = (System.nanoTime() - startTime) / 1_000_000;

final long jitterThreshold = sleep / 5; // useful with single CPU core test environment
final long jitterThreshold = sleep / 5; // useful with single CPU core test environment

assertTrue(spentTime >= (sleep - jitterThreshold));
}
assertTrue(spentTime >= (sleep - jitterThreshold));
}
});
}

@Test
Expand All @@ -110,18 +108,21 @@ public void threeWorkersScenarioTest() throws Exception {
}

private void nWorkersScenarioTest(final TestTarget target) throws Exception {
try (TestProcess process =
new TestProcess(
new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS), target)) {
assertTimeoutPreemptively(Duration.ofSeconds(TEST_TIMEOUT), () -> {
try (TestProcess process =
new TestProcess(
new CabBackingOff<>(CAB_SIZE, BACKING_OFF_MAX_SPINS, BACKING_OFF_MAX_YIELDS),
target)) {

final TestScenarioGroup workerGroup = new TestScenarioGroup(process, target);
final TestScenarioGroup workerGroup = new TestScenarioGroup(process, target);

workerGroup.start();
workerGroup.start();

workerGroup.join();
}
workerGroup.reach();
}

target.reach();
target.reach();
});
}

class TestTarget implements TestExecutor.Listener {
Expand Down Expand Up @@ -224,15 +225,20 @@ class TestScenarioGroup {
private final TestScenario[] testScenarios;

TestScenarioGroup(final TestProcess process,
final TestTarget target) {
final TestTarget target) {

testScenarios = new TestScenario[target.numberOfWorkers];

final CountDownLatch beforeCommandSequence = new CountDownLatch(testScenarios.length);
final CountDownLatch afterCommandSequence = new CountDownLatch(testScenarios.length);
final CountDownLatch onAddProcessListener = new CountDownLatch(testScenarios.length);
final CountDownLatch onProcessStartListener = new CountDownLatch(target.numberOfStartsTotal);
final CountDownLatch onCommandAListener = new CountDownLatch(target.numberOfTestCommandsATotal);
final CountDownLatch onCommandBListener = new CountDownLatch(target.numberOfTestCommandsBTotal);
final CountDownLatch onProcessStopListener = new CountDownLatch(target.numberOfStopsTotal);
final CountDownLatch onRemoveProcessListener = new CountDownLatch(testScenarios.length);

for (int i = 0; i < testScenarios.length; i++) {
testScenarios[i] = new TestScenario(i, process, target, beforeCommandSequence, afterCommandSequence);
testScenarios[i] = new TestScenario(i, process, target, onAddProcessListener, onProcessStartListener,
onCommandAListener, onCommandBListener, onProcessStopListener, onRemoveProcessListener);
}
}

Expand All @@ -242,12 +248,12 @@ void start() {
}
}

void join() throws InterruptedException {
void reach() throws InterruptedException {
for (final TestScenario testScenario : testScenarios) {
testScenario.join();
}
for (final TestScenario testScenario : testScenarios) {
testScenario.postCheck();
testScenario.checkErrors();
}
}
}
Expand All @@ -256,44 +262,47 @@ class TestScenario extends Thread implements TestProcessListener {
private final int id;
private final TestProcess process;
private final TestTarget target;
private final CountDownLatch beforeCommandSequence;
private final CountDownLatch afterCommandSequence;
private final CountDownLatch onAddProcessListener;
private final CountDownLatch onRemoveProcessListener;

private final CountDownLatch onProcessStartListener;
private final CountDownLatch onCommandAListener;
private final CountDownLatch onCommandBListener;
private final CountDownLatch onProcessStopListener;

private volatile Exception scenarioError;
private volatile Exception executionError;

private volatile int addMyListenerCount;
private volatile int removeMyListenerCount;
private volatile int testCommandACount;
private volatile int testCommandBCount;
private volatile int startCount;
private volatile int stopCount;

TestScenario(
final int id,
final TestProcess process,
final TestTarget target,
final CountDownLatch beforeCommandSequence,
final CountDownLatch afterCommandSequence) {
final CountDownLatch onAddProcessListener,
final CountDownLatch onProcessStartListener,
final CountDownLatch onCommandAListener,
final CountDownLatch onCommandBListener,
final CountDownLatch onProcessStopListener,
final CountDownLatch onRemoveProcessListener) {

super(TestScenario.class.getSimpleName() + "#" + id);

this.id = id;
this.process = process;
this.target = target;
this.beforeCommandSequence = beforeCommandSequence;
this.afterCommandSequence = afterCommandSequence;
this.onAddProcessListener = onAddProcessListener;
this.onProcessStartListener = onProcessStartListener;
this.onCommandAListener = onCommandAListener;
this.onCommandBListener = onCommandBListener;
this.onProcessStopListener = onProcessStopListener;
this.onRemoveProcessListener = onRemoveProcessListener;
}

@Override
public void run() {
try {
process.addListener(this);

// let's wait for all listeners
// to count all the following signals
beforeCommandSequence.countDown();
beforeCommandSequence.await();
onAddProcessListener.await();

int testEntriesA = 0;
int testEntriesB = 0;
Expand Down Expand Up @@ -345,12 +354,14 @@ public void run() {
i++;
}

onProcessStartListener.await();
onCommandAListener.await();
onCommandBListener.await();
onProcessStopListener.await();

process.removeListener(this);

// let's wait for all commands
// before a listener is removed
afterCommandSequence.countDown();
afterCommandSequence.await();
onRemoveProcessListener.await();

} catch (final Exception e) {
e.printStackTrace(System.err);
Expand All @@ -364,7 +375,7 @@ public void onAddProcessListener(final TestExecutor executor, final ListenerResu
executionError = result.error();
}
if (result.listener() == this) {
addMyListenerCount++; // OK, since happens in one single thread
onAddProcessListener.countDown();
}
}

Expand All @@ -374,7 +385,7 @@ public void onRemoveProcessListener(final TestExecutor executor, final ListenerR
executionError = result.error();
}
if (result.listener() == this) {
removeMyListenerCount++; // OK, since happens in one single thread
onRemoveProcessListener.countDown();
}
}

Expand All @@ -383,43 +394,36 @@ public void onTestCommandA(final TestExecutor executor, final TestResult result)
if (result.error() != null) {
executionError = result.error();
}
testCommandACount++; // OK, since happens in one single thread
onCommandAListener.countDown();
}

@Override
public void onTestCommandB(final TestExecutor executor, final TestResult result) {
if (result.error() != null) {
executionError = result.error();
}
testCommandBCount++; // OK, since happens in one single thread
onCommandBListener.countDown();
}

@Override
public void onStart(final TestExecutor executor, final VoidResult result) {
if (result.error() != null) {
executionError = result.error();
}
startCount++; // OK, since happens in one single thread
onProcessStartListener.countDown();
}

@Override
public void onStop(final TestExecutor executor, final VoidResult result) {
if (result.error() != null) {
executionError = result.error();
}
stopCount++; // OK, since happens in one single thread
onProcessStopListener.countDown();
}

void postCheck() {
void checkErrors() {
assertNull(scenarioError);
assertNull(executionError);

assertEquals(1, addMyListenerCount);
assertEquals(1, removeMyListenerCount);
assertEquals(target.numberOfTestCommandsATotal, testCommandACount);
assertEquals(target.numberOfTestCommandsBTotal, testCommandBCount);
assertEquals(target.numberOfStartsTotal, startCount);
assertEquals(target.numberOfStopsTotal, stopCount);
}
}

Expand Down