From fd7d26754480f05e8a5de371940bf7d1a9176acf Mon Sep 17 00:00:00 2001 From: Benoit DUMONT Date: Sun, 29 Dec 2024 18:15:18 +0100 Subject: [PATCH] Secure that only 1 job scheduled will trigger at the same time. Lock entry on myversion table has been introduced. --- .../cerberus/core/crud/dao/IMyVersionDAO.java | 22 +++ .../core/crud/dao/impl/MyVersionDAO.java | 44 ++++++ .../core/crud/service/IMyVersionService.java | 7 + .../crud/service/impl/MyVersionService.java | 5 + .../scheduledtasks/ScheduledTaskRunner.java | 140 +++++++++++++----- source/src/main/resources/database.sql | 7 +- 6 files changed, 185 insertions(+), 40 deletions(-) diff --git a/source/src/main/java/org/cerberus/core/crud/dao/IMyVersionDAO.java b/source/src/main/java/org/cerberus/core/crud/dao/IMyVersionDAO.java index 86399f36e..1b58f2fac 100644 --- a/source/src/main/java/org/cerberus/core/crud/dao/IMyVersionDAO.java +++ b/source/src/main/java/org/cerberus/core/crud/dao/IMyVersionDAO.java @@ -37,10 +37,32 @@ public interface IMyVersionDAO { */ MyVersion findMyVersionByKey(String key); + /** + * + * @param myVersion + * @return + */ boolean update(MyVersion myVersion); + /** + * + * @param myVersion + * @return + */ boolean updateMyVersionString(MyVersion myVersion); + /** + * + * @param myVersion + * @return + */ + boolean updateAndLockSchedulerVersion(long myVersion); + + /** + * + * @param key + * @return + */ boolean flagMyVersionString(String key); } diff --git a/source/src/main/java/org/cerberus/core/crud/dao/impl/MyVersionDAO.java b/source/src/main/java/org/cerberus/core/crud/dao/impl/MyVersionDAO.java index 8b278b9a7..5d01806ca 100644 --- a/source/src/main/java/org/cerberus/core/crud/dao/impl/MyVersionDAO.java +++ b/source/src/main/java/org/cerberus/core/crud/dao/impl/MyVersionDAO.java @@ -169,6 +169,50 @@ public boolean updateMyVersionString(MyVersion myVersion) { return result; } + @Override + public boolean updateAndLockSchedulerVersion(long myVersion) { + boolean result = false; + final String query = "UPDATE myversion SET value = ? WHERE `key` = 'scheduler_active_instance_version' and value < ?"; + + // Debug message on SQL. + if (LOG.isDebugEnabled()) { + LOG.debug("SQL : " + query); + LOG.debug("SQL.param.value : " + myVersion); + LOG.debug("SQL.param.value : " + (myVersion - 10000)); + } + + Connection connection = this.databaseSpring.connect(); + try { + PreparedStatement preStat = connection.prepareStatement(query); + try { + preStat.setLong(1, myVersion); + preStat.setLong(2, myVersion - 10000); + + if (preStat.executeUpdate() >= 1) { + result = true; + } else { + result = false; + } + + } catch (SQLException exception) { + LOG.warn("Unable to execute query : " + exception.toString()); + } finally { + preStat.close(); + } + } catch (SQLException exception) { + LOG.warn("Unable to execute query : " + exception.toString()); + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + LOG.warn(e.toString()); + } + } + return result; + } + @Override public boolean flagMyVersionString(String key) { boolean result = false; diff --git a/source/src/main/java/org/cerberus/core/crud/service/IMyVersionService.java b/source/src/main/java/org/cerberus/core/crud/service/IMyVersionService.java index 144c0bfdb..2aaa73189 100644 --- a/source/src/main/java/org/cerberus/core/crud/service/IMyVersionService.java +++ b/source/src/main/java/org/cerberus/core/crud/service/IMyVersionService.java @@ -52,6 +52,13 @@ public interface IMyVersionService { */ boolean updateMyVersionString(String key, String value); + /** + * + * @param value + * @return true if the update was done. False in case there were an issue. + */ + boolean updateAndLockSchedulerVersion(long value); + /** * Flag the key. Means that the method will return true if the previous * value was N and update manage to move it to Y. It returns false if the diff --git a/source/src/main/java/org/cerberus/core/crud/service/impl/MyVersionService.java b/source/src/main/java/org/cerberus/core/crud/service/impl/MyVersionService.java index 5a907b02e..376d473e5 100644 --- a/source/src/main/java/org/cerberus/core/crud/service/impl/MyVersionService.java +++ b/source/src/main/java/org/cerberus/core/crud/service/impl/MyVersionService.java @@ -65,6 +65,11 @@ public boolean updateMyVersionString(String key, String value) { return this.myVersionDAO.updateMyVersionString(DtbVersion); } + @Override + public boolean updateAndLockSchedulerVersion(long value) { + return this.myVersionDAO.updateAndLockSchedulerVersion(value); + } + @Override public boolean flagMyVersionString(String key) { return this.myVersionDAO.flagMyVersionString(key); diff --git a/source/src/main/java/org/cerberus/core/engine/scheduledtasks/ScheduledTaskRunner.java b/source/src/main/java/org/cerberus/core/engine/scheduledtasks/ScheduledTaskRunner.java index ddac304e5..02ca6c6a0 100644 --- a/source/src/main/java/org/cerberus/core/engine/scheduledtasks/ScheduledTaskRunner.java +++ b/source/src/main/java/org/cerberus/core/engine/scheduledtasks/ScheduledTaskRunner.java @@ -19,6 +19,13 @@ */ package org.cerberus.core.engine.scheduledtasks; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Date; +import java.util.TimeZone; +import org.cerberus.core.crud.service.IMyVersionService; import org.cerberus.core.crud.service.IParameterService; import org.cerberus.core.crud.service.ITestCaseExecutionQueueDepService; import org.cerberus.core.crud.service.ITestCaseExecutionQueueService; @@ -47,6 +54,8 @@ public class ScheduledTaskRunner { private SchedulerInit schedulerInit; @Autowired private ITestCaseExecutionQueueDepService testCaseExecutionQueueDepService; + @Autowired + private IMyVersionService myVersionService; private int b1TickNumberTarget = 60; private int b1TickNumber = 1; @@ -54,57 +63,110 @@ public class ScheduledTaskRunner { private int b2TickNumber = 1; private int b3TickNumberTarget = 1; private int b3TickNumber = 1; + private int b4TickNumberTarget = 1; + private int b4TickNumber = 1; + + private long loadingTimestamp = 0; + private boolean instanceActive = true; + + private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.S"; private static final org.apache.logging.log4j.Logger LOG = org.apache.logging.log4j.LogManager.getLogger(ScheduledTaskRunner.class); @Scheduled(fixedRate = 60000, initialDelay = 30000 /* Every minute */) public void nextStep() { - LOG.debug("Schedule Start. " + b1TickNumber + "/" + b1TickNumberTarget + " - " + b2TickNumber + "/" + b2TickNumberTarget + " - " + b3TickNumber + "/" + b3TickNumberTarget); - // We get the new period from parameter and trigger the Queue automatic cancellation job. - b1TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueuecancellationjob_period", "", 60); - b2TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueueprocessingjob_period", "", 30); - - if (b1TickNumber < b1TickNumberTarget) { - b1TickNumber++; - } else { - b1TickNumber = 1; - performBatch1_CancelOldQueueEntries(); + /** + * Secure only 1 Task trigger on a given cerberus instance. Multiple + * jobs could be triggered if several instance is running. The first + * scheduler trigger will define loadingTimestamp and instanceActive + * based on a lock value defined at database level. Only the 1st update + * (within 10 second delay) will be considered as active. All others + * will be disabled. + */ + if (loadingTimestamp == 0) { + Date d = new Date(); + TimeZone tz = TimeZone.getTimeZone("UTC"); + DateFormat df = new SimpleDateFormat(DATE_FORMAT); + df.setTimeZone(tz); + long newVersion = new java.util.Date().getTime(); + loadingTimestamp = newVersion; + LOG.debug("Setting local scheduler Version to : {}", newVersion); + instanceActive = myVersionService.updateAndLockSchedulerVersion(newVersion); } - if (b2TickNumber < b2TickNumberTarget) { - b2TickNumber++; - } else { - b2TickNumber = 1; - // We trigger the Queue Processing job. - performBatch2_ProcessQueue(); - } + if (instanceActive) { + + LOG.debug("Schedule ({}) Start. " + + b1TickNumber + "/" + b1TickNumberTarget + " - " + + b2TickNumber + "/" + b2TickNumberTarget + " - " + + b3TickNumber + "/" + b3TickNumberTarget + " - " + + b4TickNumber + "/" + b4TickNumberTarget, + loadingTimestamp); + + // We get the new period of each job from parameter. + b1TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueuecancellationjob_period", "", 60); + b2TickNumberTarget = parameterService.getParameterIntegerByKey("cerberus_automaticqueueprocessingjob_period", "", 30); + b3TickNumberTarget = 1; + b4TickNumberTarget = 1; + + if (b1TickNumber < b1TickNumberTarget) { + b1TickNumber++; + } else { + b1TickNumber = 1; + performBatch1_CancelOldQueueEntries(); + } + + if (b2TickNumber < b2TickNumberTarget) { + b2TickNumber++; + } else { + b2TickNumber = 1; + // We trigger the Queue Processing job. + performBatch2_ProcessQueue(); + } + + if (b3TickNumber < b3TickNumberTarget) { + b3TickNumber++; + } else { + b3TickNumber = 1; + // We trigger the Scheduler init job. + performBatch3_SchedulerInit(); + } + + if (b4TickNumber < b4TickNumberTarget) { + b4TickNumber++; + } else { + b4TickNumber = 1; + // We trigger the Queue dependencies release by timing. + performBatch4_ProcessTimingBasedQueueDependencies(); + } + + LOG.debug("Schedule ({}) Stop. " + + b1TickNumber + "/" + b1TickNumberTarget + " - " + + b2TickNumber + "/" + b2TickNumberTarget + " - " + + b3TickNumber + "/" + b3TickNumberTarget + " - " + + b4TickNumber + "/" + b4TickNumberTarget, + loadingTimestamp); - if (b3TickNumber < b3TickNumberTarget) { - b3TickNumber++; } else { - b3TickNumber = 1; - // We trigger the Scheduler init job. - performBatch3_SchedulerInit(); - } + LOG.debug("Schedule ({}) disabled. ", loadingTimestamp); - performBatch3_ProcessTimingBasedQueueDependencies(); + } - LOG.debug("Schedule Stop. " + b1TickNumber + "/" + b1TickNumberTarget + " - " + b2TickNumber + "/" + b2TickNumberTarget + " - " + b3TickNumber + "/" + b3TickNumberTarget); } private void performBatch1_CancelOldQueueEntries() { - LOG.info("automaticqueuecancellationjob Task triggered."); + LOG.info("Schedule ({}) : automaticqueuecancellationjob Task triggered.", loadingTimestamp); if (parameterService.getParameterBooleanByKey("cerberus_automaticqueuecancellationjob_active", "", true)) { testCaseExecutionQueueService.cancelRunningOldQueueEntries(); } else { - LOG.info("automaticqueuecancellationjob Task disabled by config (cerberus_automaticqueuecancellationjob_active)."); + LOG.info("Schedule ({}) : automaticqueuecancellationjob Task disabled by config (cerberus_automaticqueuecancellationjob_active).", loadingTimestamp); } - LOG.info("automaticqueuecancellationjob Task ended."); + LOG.info("Schedule ({}) : automaticqueuecancellationjob Task ended.", loadingTimestamp); } private void performBatch2_ProcessQueue() { - LOG.info("automaticqueueprocessingjob Task triggered."); + LOG.info("Schedule ({}) : automaticqueueprocessingjob Task triggered.", loadingTimestamp); if (parameterService.getParameterBooleanByKey("cerberus_automaticqueueprocessingjob_active", "", true)) { try { executionThreadPoolService.executeNextInQueue(false); @@ -112,33 +174,33 @@ private void performBatch2_ProcessQueue() { LOG.error(ex.toString(), ex); } } else { - LOG.info("automaticqueueprocessingjob Task disabled by config (cerberus_automaticqueueprocessingjob_active)."); + LOG.info("Schedule ({}) : automaticqueueprocessingjob Task disabled by config (cerberus_automaticqueueprocessingjob_active).", loadingTimestamp); } - LOG.info("automaticqueueprocessingjob Task ended."); + LOG.info("Schedule ({}) : automaticqueueprocessingjob Task ended.", loadingTimestamp); } private void performBatch3_SchedulerInit() { try { - LOG.debug("SchedulerInit Task triggered."); + LOG.debug("Schedule ({}) : SchedulerInit Task triggered.", loadingTimestamp); schedulerInit.init(); - LOG.debug("SchedulerInit Task ended."); + LOG.debug("Schedule ({}) : SchedulerInit Task ended.", loadingTimestamp); } catch (Exception e) { - LOG.error("ScheduleEntry init from scheduletaskrunner failed : " + e); + LOG.error("ScheduleEntry init from scheduletaskrunner failed : " + e, e); } } - private void performBatch3_ProcessTimingBasedQueueDependencies() { + private void performBatch4_ProcessTimingBasedQueueDependencies() { try { - LOG.debug("Queue dep timing Task triggered."); + LOG.debug("Schedule ({}) : Queue dep timing Task triggered.", loadingTimestamp); int nbReleased = testCaseExecutionQueueDepService.manageDependenciesCheckTimingWaiting(); if (nbReleased > 0) { - LOG.info(nbReleased + " Queue entry(ies) has(have) been released due to TIMING dependencies. We trigger now the processing of the queue entry."); + LOG.info("Schedule ({}) : " + nbReleased + " Queue entry(ies) has(have) been released due to TIMING dependencies. We trigger now the processing of the queue entry.", loadingTimestamp); executionThreadPoolService.executeNextInQueue(false); } - LOG.debug("Queue dep timing Task ended."); + LOG.debug("Schedule ({}) : Queue dep timing Task ended.", loadingTimestamp); } catch (Exception e) { - LOG.error("Queue dep timing Task from scheduletaskrunner failed : " + e); + LOG.error("Queue dep timing Task from scheduletaskrunner failed : " + e, e); } } diff --git a/source/src/main/resources/database.sql b/source/src/main/resources/database.sql index 9b5e5e925..2678f5e56 100644 --- a/source/src/main/resources/database.sql +++ b/source/src/main/resources/database.sql @@ -6588,4 +6588,9 @@ UPDATE testcase SET `UsrModif`=REPLACE(REPLACE(`UsrModif`, '@', '@'), '%40', UPDATE testcasecountryproperties SET `Type` = 'getFromHtml' WHERE `Type` = 'getFromHTML'; -- 1874 -UPDATE robot SET lbexemethod='BYRANKING' WHERE lbexemethod=''; \ No newline at end of file +UPDATE robot SET lbexemethod='BYRANKING' WHERE lbexemethod=''; + +-- 1875-1876 +ALTER TABLE myversion MODIFY COLUMN Value BIGINT NULL; +INSERT INTO myversion (`Key`,Value) VALUES ('scheduler_active_instance_version',0); +