Skip to content

Commit

Permalink
[incubator-kie-issues#1578] Enhance JobSchedulerManager to handle ove…
Browse files Browse the repository at this point in the history
…rdue jobs during the period job loading (apache#2131)
  • Loading branch information
martinweiler authored and rgdoliveira committed Nov 7, 2024
1 parent 262af48 commit 2ee31a0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ private PublisherBuilder<JobDetails> handleRetry(CompletionStage<JobDetails> fut
.build())
.map(jobRepository::save)
.flatMapCompletionStage(p -> p))
.peek(job -> LOGGER.debug("Retry executed {}", job));
.peek(job -> LOGGER.debug("Retry executed {}", job))
.onError(errorHandler -> LOGGER.error("Failed to retrieve job due to {}", errorHandler.getMessage()));
}

private PointInTimeTrigger getRetryTrigger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ enum LoadJobErrorStrategy {
}

private void startJobsLoadingFromRepositoryTask() {
LOGGER.info(
"Starting with configuration: schedulerChunkInMinutes={}, loadJobIntervalInMinutes={}, loadJobFromCurrentTimeIntervalInMinutes={}, loadJobRetries={}, loadJobErrorStrategy={}",
schedulerChunkInMinutes, loadJobIntervalInMinutes, loadJobFromCurrentTimeIntervalInMinutes, loadJobRetries, loadJobErrorStrategy);
//guarantee it starts the task just in case it is not already active
initialLoading.set(true);
if (periodicTimerIdForLoadJobs.get() < 0) {
Expand All @@ -129,6 +132,13 @@ private void startJobsLoadingFromRepositoryTask() {
schedulerChunkInMinutes);
loadJobIntervalInMinutes = schedulerChunkInMinutes;
}
if (loadJobFromCurrentTimeIntervalInMinutes < loadJobIntervalInMinutes) {
LOGGER.warn("The loadJobFromCurrentTimeIntervalInMinutes value ({}) is smaller than loadJobIntervalInMinutes ({}). " +
"This can potentially lead to overdue timers not getting rescheduled during the periodic job loading.",
loadJobFromCurrentTimeIntervalInMinutes,
loadJobIntervalInMinutes);

}
//first execution
vertx.runOnContext(this::loadJobDetails);
//next executions to run periodically
Expand Down Expand Up @@ -198,6 +208,15 @@ private boolean isNotScheduled(JobDetails jobDetails) {
Date triggerFireTime = jobDetails.getTrigger().hasNextFireTime();
ZonedDateTime nextFireTime = triggerFireTime != null ? DateUtil.instantToZonedDateTime(triggerFireTime.toInstant()) : null;
boolean scheduled = scheduler.scheduled(jobDetails.getId()).isPresent();
// cancel an overdue timer to have it rescheduled
if (!initialLoading.get() && nextFireTime != null && nextFireTime.isBefore(DateUtil.now())) {
LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {}, status: {} is overdue and will be rescheduled", jobDetails.getId(),
nextFireTime,
jobDetails.getCreated(),
jobDetails.getStatus());
scheduler.cancel(jobDetails.getId());
return true;
}
LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {}, status: {}, already scheduled: {}", jobDetails.getId(),
nextFireTime,
jobDetails.getCreated(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public TimerDelegateJobScheduler(ReactiveJobRepository jobRepository,
@ConfigProperty(name = "kogito.jobs-service.forceExecuteExpiredJobsOnServiceStart", defaultValue = "true") boolean forceExecuteExpiredJobsOnServiceStart,
JobExecutorResolver jobExecutorResolver, VertxTimerServiceScheduler delegate) {
super(jobRepository, backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs, forceExecuteExpiredJobsOnServiceStart);
LOGGER.info(
"Creating JobScheduler with backoffRetryMillis={}, maxIntervalLimitToRetryMillis={}, schedulerChunkInMinutes={}, forceExecuteExpiredJobs={}, forceExecuteExpiredJobsOnServiceStart={}",
backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs, forceExecuteExpiredJobsOnServiceStart);
this.jobExecutorResolver = jobExecutorResolver;
this.delegate = delegate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient();
String timerId = recipient.getPayload().getData().timerId();
String processInstanceId = recipient.getPayload().getData().processInstanceId();
Optional<Process<? extends Model>> process = processes.processByProcessInstanceId(processInstanceId);
Optional<Process<? extends Model>> process;
try {
process = processes.processByProcessInstanceId(processInstanceId);
} catch (Exception ex) {
return Uni.createFrom().failure(
new JobExecutionException(jobDetails.getId(),
"Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + ex.getMessage(),
ex));
}
if (process.isEmpty()) {
return Uni.createFrom().item(
JobExecutionResponse.builder()
Expand Down

0 comments on commit 2ee31a0

Please sign in to comment.