Skip to content

Commit

Permalink
Secure that only 1 job scheduled will trigger at the same time.
Browse files Browse the repository at this point in the history
Lock entry on myversion table has been introduced.
  • Loading branch information
vertigo17 committed Dec 29, 2024
1 parent 35074b4 commit fd7d267
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 40 deletions.
22 changes: 22 additions & 0 deletions source/src/main/java/org/cerberus/core/crud/dao/IMyVersionDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,32 @@ public interface IMyVersionDAO {
*/
MyVersion findMyVersionByKey(String key);

/**
*
* @param myVersion
* @return
*/
boolean update(MyVersion myVersion);

/**
*
* @param myVersion
* @return
*/
boolean updateMyVersionString(MyVersion myVersion);

/**
*
* @param myVersion
* @return
*/
boolean updateAndLockSchedulerVersion(long myVersion);

/**
*
* @param key
* @return
*/
boolean flagMyVersionString(String key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,50 @@ public boolean updateMyVersionString(MyVersion myVersion) {
return result;
}

@Override
public boolean updateAndLockSchedulerVersion(long myVersion) {
boolean result = false;
final String query = "UPDATE myversion SET value = ? WHERE `key` = 'scheduler_active_instance_version' and value < ?";

// Debug message on SQL.
if (LOG.isDebugEnabled()) {
LOG.debug("SQL : " + query);
LOG.debug("SQL.param.value : " + myVersion);
LOG.debug("SQL.param.value : " + (myVersion - 10000));
}

Connection connection = this.databaseSpring.connect();
try {
PreparedStatement preStat = connection.prepareStatement(query);
try {
preStat.setLong(1, myVersion);
preStat.setLong(2, myVersion - 10000);

if (preStat.executeUpdate() >= 1) {
result = true;
} else {
result = false;
}

} catch (SQLException exception) {
LOG.warn("Unable to execute query : " + exception.toString());
} finally {
preStat.close();
}
} catch (SQLException exception) {
LOG.warn("Unable to execute query : " + exception.toString());
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
LOG.warn(e.toString());
}
}
return result;
}

@Override
public boolean flagMyVersionString(String key) {
boolean result = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public interface IMyVersionService {
*/
boolean updateMyVersionString(String key, String value);

/**
*
* @param value
* @return true if the update was done. False in case there were an issue.
*/
boolean updateAndLockSchedulerVersion(long value);

/**
* Flag the key. Means that the method will return true if the previous
* value was N and update manage to move it to Y. It returns false if the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public boolean updateMyVersionString(String key, String value) {
return this.myVersionDAO.updateMyVersionString(DtbVersion);
}

@Override
public boolean updateAndLockSchedulerVersion(long value) {
return this.myVersionDAO.updateAndLockSchedulerVersion(value);
}

@Override
public boolean flagMyVersionString(String key) {
return this.myVersionDAO.flagMyVersionString(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
*/
package org.cerberus.core.engine.scheduledtasks;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Date;
import java.util.TimeZone;
import org.cerberus.core.crud.service.IMyVersionService;
import org.cerberus.core.crud.service.IParameterService;
import org.cerberus.core.crud.service.ITestCaseExecutionQueueDepService;
import org.cerberus.core.crud.service.ITestCaseExecutionQueueService;
Expand Down Expand Up @@ -47,98 +54,153 @@ public class ScheduledTaskRunner {
private SchedulerInit schedulerInit;
@Autowired
private ITestCaseExecutionQueueDepService testCaseExecutionQueueDepService;
@Autowired
private IMyVersionService myVersionService;

private int b1TickNumberTarget = 60;
private int b1TickNumber = 1;
private int b2TickNumberTarget = 30;
private int b2TickNumber = 1;
private int b3TickNumberTarget = 1;
private int b3TickNumber = 1;
private int b4TickNumberTarget = 1;
private int b4TickNumber = 1;

private long loadingTimestamp = 0;
private boolean instanceActive = true;

private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.S";

private static final org.apache.logging.log4j.Logger LOG = org.apache.logging.log4j.LogManager.getLogger(ScheduledTaskRunner.class);

@Scheduled(fixedRate = 60000, initialDelay = 30000 /* Every minute */)
public void nextStep() {
LOG.debug("Schedule Start. " + b1TickNumber + "/" + b1TickNumberTarget + " - " + b2TickNumber + "/" + b2TickNumberTarget + " - " + b3TickNumber + "/" + b3TickNumberTarget);

// We get the new period from parameter and trigger the Queue automatic cancellation job.
b1TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueuecancellationjob_period", "", 60);
b2TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueueprocessingjob_period", "", 30);

if (b1TickNumber < b1TickNumberTarget) {
b1TickNumber++;
} else {
b1TickNumber = 1;
performBatch1_CancelOldQueueEntries();
/**
* Secure only 1 Task trigger on a given cerberus instance. Multiple
* jobs could be triggered if several instance is running. The first
* scheduler trigger will define loadingTimestamp and instanceActive
* based on a lock value defined at database level. Only the 1st update
* (within 10 second delay) will be considered as active. All others
* will be disabled.
*/
if (loadingTimestamp == 0) {
Date d = new Date();
TimeZone tz = TimeZone.getTimeZone("UTC");
DateFormat df = new SimpleDateFormat(DATE_FORMAT);
df.setTimeZone(tz);
long newVersion = new java.util.Date().getTime();
loadingTimestamp = newVersion;
LOG.debug("Setting local scheduler Version to : {}", newVersion);
instanceActive = myVersionService.updateAndLockSchedulerVersion(newVersion);
}

if (b2TickNumber < b2TickNumberTarget) {
b2TickNumber++;
} else {
b2TickNumber = 1;
// We trigger the Queue Processing job.
performBatch2_ProcessQueue();
}
if (instanceActive) {

LOG.debug("Schedule ({}) Start. "
+ b1TickNumber + "/" + b1TickNumberTarget + " - "
+ b2TickNumber + "/" + b2TickNumberTarget + " - "
+ b3TickNumber + "/" + b3TickNumberTarget + " - "
+ b4TickNumber + "/" + b4TickNumberTarget,
loadingTimestamp);

// We get the new period of each job from parameter.
b1TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueuecancellationjob_period", "", 60);
b2TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueueprocessingjob_period", "", 30);
b3TickNumberTarget = 1;
b4TickNumberTarget = 1;

if (b1TickNumber < b1TickNumberTarget) {
b1TickNumber++;
} else {
b1TickNumber = 1;
performBatch1_CancelOldQueueEntries();
}

if (b2TickNumber < b2TickNumberTarget) {
b2TickNumber++;
} else {
b2TickNumber = 1;
// We trigger the Queue Processing job.
performBatch2_ProcessQueue();
}

if (b3TickNumber < b3TickNumberTarget) {
b3TickNumber++;
} else {
b3TickNumber = 1;
// We trigger the Scheduler init job.
performBatch3_SchedulerInit();
}

if (b4TickNumber < b4TickNumberTarget) {
b4TickNumber++;
} else {
b4TickNumber = 1;
// We trigger the Queue dependencies release by timing.
performBatch4_ProcessTimingBasedQueueDependencies();
}

LOG.debug("Schedule ({}) Stop. "
+ b1TickNumber + "/" + b1TickNumberTarget + " - "
+ b2TickNumber + "/" + b2TickNumberTarget + " - "
+ b3TickNumber + "/" + b3TickNumberTarget + " - "
+ b4TickNumber + "/" + b4TickNumberTarget,
loadingTimestamp);

if (b3TickNumber < b3TickNumberTarget) {
b3TickNumber++;
} else {
b3TickNumber = 1;
// We trigger the Scheduler init job.
performBatch3_SchedulerInit();
}
LOG.debug("Schedule ({}) disabled. ", loadingTimestamp);

performBatch3_ProcessTimingBasedQueueDependencies();
}

LOG.debug("Schedule Stop. " + b1TickNumber + "/" + b1TickNumberTarget + " - " + b2TickNumber + "/" + b2TickNumberTarget + " - " + b3TickNumber + "/" + b3TickNumberTarget);
}

private void performBatch1_CancelOldQueueEntries() {
LOG.info("automaticqueuecancellationjob Task triggered.");
LOG.info("Schedule ({}) : automaticqueuecancellationjob Task triggered.", loadingTimestamp);
if (parameterService.getParameterBooleanByKey("cerberus_automaticqueuecancellationjob_active", "", true)) {
testCaseExecutionQueueService.cancelRunningOldQueueEntries();
} else {
LOG.info("automaticqueuecancellationjob Task disabled by config (cerberus_automaticqueuecancellationjob_active).");
LOG.info("Schedule ({}) : automaticqueuecancellationjob Task disabled by config (cerberus_automaticqueuecancellationjob_active).", loadingTimestamp);
}
LOG.info("automaticqueuecancellationjob Task ended.");
LOG.info("Schedule ({}) : automaticqueuecancellationjob Task ended.", loadingTimestamp);
}

private void performBatch2_ProcessQueue() {
LOG.info("automaticqueueprocessingjob Task triggered.");
LOG.info("Schedule ({}) : automaticqueueprocessingjob Task triggered.", loadingTimestamp);
if (parameterService.getParameterBooleanByKey("cerberus_automaticqueueprocessingjob_active", "", true)) {
try {
executionThreadPoolService.executeNextInQueue(false);
} catch (CerberusException ex) {
LOG.error(ex.toString(), ex);
}
} else {
LOG.info("automaticqueueprocessingjob Task disabled by config (cerberus_automaticqueueprocessingjob_active).");
LOG.info("Schedule ({}) : automaticqueueprocessingjob Task disabled by config (cerberus_automaticqueueprocessingjob_active).", loadingTimestamp);
}
LOG.info("automaticqueueprocessingjob Task ended.");
LOG.info("Schedule ({}) : automaticqueueprocessingjob Task ended.", loadingTimestamp);
}

private void performBatch3_SchedulerInit() {
try {
LOG.debug("SchedulerInit Task triggered.");
LOG.debug("Schedule ({}) : SchedulerInit Task triggered.", loadingTimestamp);
schedulerInit.init();
LOG.debug("SchedulerInit Task ended.");
LOG.debug("Schedule ({}) : SchedulerInit Task ended.", loadingTimestamp);
} catch (Exception e) {
LOG.error("ScheduleEntry init from scheduletaskrunner failed : " + e);
LOG.error("ScheduleEntry init from scheduletaskrunner failed : " + e, e);
}

}

private void performBatch3_ProcessTimingBasedQueueDependencies() {
private void performBatch4_ProcessTimingBasedQueueDependencies() {
try {
LOG.debug("Queue dep timing Task triggered.");
LOG.debug("Schedule ({}) : Queue dep timing Task triggered.", loadingTimestamp);
int nbReleased = testCaseExecutionQueueDepService.manageDependenciesCheckTimingWaiting();
if (nbReleased > 0) {
LOG.info(nbReleased + " Queue entry(ies) has(have) been released due to TIMING dependencies. We trigger now the processing of the queue entry.");
LOG.info("Schedule ({}) : " + nbReleased + " Queue entry(ies) has(have) been released due to TIMING dependencies. We trigger now the processing of the queue entry.", loadingTimestamp);
executionThreadPoolService.executeNextInQueue(false);
}
LOG.debug("Queue dep timing Task ended.");
LOG.debug("Schedule ({}) : Queue dep timing Task ended.", loadingTimestamp);
} catch (Exception e) {
LOG.error("Queue dep timing Task from scheduletaskrunner failed : " + e);
LOG.error("Queue dep timing Task from scheduletaskrunner failed : " + e, e);
}

}
Expand Down
7 changes: 6 additions & 1 deletion source/src/main/resources/database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6588,4 +6588,9 @@ UPDATE testcase SET `UsrModif`=REPLACE(REPLACE(`UsrModif`, '&#64;', '@'), '%40',
UPDATE testcasecountryproperties SET `Type` = 'getFromHtml' WHERE `Type` = 'getFromHTML';

-- 1874
UPDATE robot SET lbexemethod='BYRANKING' WHERE lbexemethod='';
UPDATE robot SET lbexemethod='BYRANKING' WHERE lbexemethod='';

-- 1875-1876
ALTER TABLE myversion MODIFY COLUMN Value BIGINT NULL;
INSERT INTO myversion (`Key`,Value) VALUES ('scheduler_active_instance_version',0);

0 comments on commit fd7d267

Please sign in to comment.