From 2ee31a04aa428575790fd632741f7c736660a6bc Mon Sep 17 00:00:00 2001 From: Martin Weiler Date: Fri, 1 Nov 2024 09:39:47 -0600 Subject: [PATCH] [incubator-kie-issues#1578] Enhance JobSchedulerManager to handle overdue jobs during the period job loading (#2131) --- .../scheduler/BaseTimerJobScheduler.java | 3 ++- .../scheduler/JobSchedulerManager.java | 19 +++++++++++++++++++ .../impl/TimerDelegateJobScheduler.java | 3 +++ .../jobs/embedded/EmbeddedJobExecutor.java | 10 +++++++++- 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java index ac080cdb82..49e777ba8c 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java @@ -343,7 +343,8 @@ private PublisherBuilder handleRetry(CompletionStage fut .build()) .map(jobRepository::save) .flatMapCompletionStage(p -> p)) - .peek(job -> LOGGER.debug("Retry executed {}", job)); + .peek(job -> LOGGER.debug("Retry executed {}", job)) + .onError(errorHandler -> LOGGER.error("Failed to retrieve job due to {}", errorHandler.getMessage())); } private PointInTimeTrigger getRetryTrigger() { diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java index b0bd05feac..2211c3aeef 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java @@ -118,6 +118,9 @@ enum LoadJobErrorStrategy { } private void startJobsLoadingFromRepositoryTask() { + LOGGER.info( + "Starting with configuration: schedulerChunkInMinutes={}, loadJobIntervalInMinutes={}, loadJobFromCurrentTimeIntervalInMinutes={}, loadJobRetries={}, loadJobErrorStrategy={}", + schedulerChunkInMinutes, loadJobIntervalInMinutes, loadJobFromCurrentTimeIntervalInMinutes, loadJobRetries, loadJobErrorStrategy); //guarantee it starts the task just in case it is not already active initialLoading.set(true); if (periodicTimerIdForLoadJobs.get() < 0) { @@ -129,6 +132,13 @@ private void startJobsLoadingFromRepositoryTask() { schedulerChunkInMinutes); loadJobIntervalInMinutes = schedulerChunkInMinutes; } + if (loadJobFromCurrentTimeIntervalInMinutes < loadJobIntervalInMinutes) { + LOGGER.warn("The loadJobFromCurrentTimeIntervalInMinutes value ({}) is smaller than loadJobIntervalInMinutes ({}). " + + "This can potentially lead to overdue timers not getting rescheduled during the periodic job loading.", + loadJobFromCurrentTimeIntervalInMinutes, + loadJobIntervalInMinutes); + + } //first execution vertx.runOnContext(this::loadJobDetails); //next executions to run periodically @@ -198,6 +208,15 @@ private boolean isNotScheduled(JobDetails jobDetails) { Date triggerFireTime = jobDetails.getTrigger().hasNextFireTime(); ZonedDateTime nextFireTime = triggerFireTime != null ? DateUtil.instantToZonedDateTime(triggerFireTime.toInstant()) : null; boolean scheduled = scheduler.scheduled(jobDetails.getId()).isPresent(); + // cancel an overdue timer to have it rescheduled + if (!initialLoading.get() && nextFireTime != null && nextFireTime.isBefore(DateUtil.now())) { + LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {}, status: {} is overdue and will be rescheduled", jobDetails.getId(), + nextFireTime, + jobDetails.getCreated(), + jobDetails.getStatus()); + scheduler.cancel(jobDetails.getId()); + return true; + } LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {}, status: {}, already scheduled: {}", jobDetails.getId(), nextFireTime, jobDetails.getCreated(), diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java index 6b30c30b0b..b64bd87b95 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java @@ -62,6 +62,9 @@ public TimerDelegateJobScheduler(ReactiveJobRepository jobRepository, @ConfigProperty(name = "kogito.jobs-service.forceExecuteExpiredJobsOnServiceStart", defaultValue = "true") boolean forceExecuteExpiredJobsOnServiceStart, JobExecutorResolver jobExecutorResolver, VertxTimerServiceScheduler delegate) { super(jobRepository, backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs, forceExecuteExpiredJobsOnServiceStart); + LOGGER.info( + "Creating JobScheduler with backoffRetryMillis={}, maxIntervalLimitToRetryMillis={}, schedulerChunkInMinutes={}, forceExecuteExpiredJobs={}, forceExecuteExpiredJobsOnServiceStart={}", + backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs, forceExecuteExpiredJobsOnServiceStart); this.jobExecutorResolver = jobExecutorResolver; this.delegate = delegate; } diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java index 347eedcb69..ead3bbf2a2 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java @@ -56,7 +56,15 @@ public Uni execute(JobDetails jobDetails) { InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient(); String timerId = recipient.getPayload().getData().timerId(); String processInstanceId = recipient.getPayload().getData().processInstanceId(); - Optional> process = processes.processByProcessInstanceId(processInstanceId); + Optional> process; + try { + process = processes.processByProcessInstanceId(processInstanceId); + } catch (Exception ex) { + return Uni.createFrom().failure( + new JobExecutionException(jobDetails.getId(), + "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + ex.getMessage(), + ex)); + } if (process.isEmpty()) { return Uni.createFrom().item( JobExecutionResponse.builder()