From f3c9db24f888722e936676f05cb843a8167c1872 Mon Sep 17 00:00:00 2001 From: Enrique Gonzalez Martinez Date: Thu, 24 Oct 2024 15:10:52 +0200 Subject: [PATCH] [incubator-kie-issues-1551] Deadlines for Human Task --- .../service/json/JacksonConfiguration.java | 2 +- ...essInstanceJobDescriptionDeserializer.java | 6 ++--- ...ocessInstanceJobDescriptionSerializer.java | 2 +- .../jobs/embedded/EmbeddedJobExecutor.java | 24 +++++++++++++++---- .../jobs/embedded/EmbeddedJobsService.java | 11 ++------- .../kogito/jobs/embedded/InVMPayloadData.java | 13 +++++----- .../jobs/embedded/JobInVMEventPublisher.java | 19 ++++++++------- .../embedded/EmbeddedJobsServiceTest.java | 10 ++++---- 8 files changed, 49 insertions(+), 38 deletions(-) diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java index cfbbd024a5..2cc0d1275e 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java @@ -20,7 +20,7 @@ import org.kie.kogito.jobs.DurationExpirationTime; import org.kie.kogito.jobs.ExactExpirationTime; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription; import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java index 6ccfb99f47..723928705b 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java @@ -21,8 +21,8 @@ import java.io.IOException; import org.kie.kogito.jobs.ExpirationTime; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; -import org.kie.kogito.jobs.ProcessInstanceJobDescriptionBuilder; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescriptionBuilder; import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.JsonParser; @@ -42,7 +42,7 @@ public ProcessInstanceJobDescriptionDeserializer() { @Override public ProcessInstanceJobDescription deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JacksonException { - ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.builder(); + ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder(); JsonNode node = jp.getCodec().readTree(jp); ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue())); diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java index 379da2a0e5..7dec635fa8 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java @@ -20,7 +20,7 @@ import java.io.IOException; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java index 347eedcb69..6f170d775a 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java @@ -22,6 +22,8 @@ import org.kie.kogito.Application; import org.kie.kogito.Model; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription; import org.kie.kogito.jobs.service.api.Recipient; import org.kie.kogito.jobs.service.exception.JobExecutionException; import org.kie.kogito.jobs.service.executor.JobExecutor; @@ -50,12 +52,24 @@ public class EmbeddedJobExecutor implements JobExecutor { @Override public Uni execute(JobDetails jobDetails) { - - String correlationId = jobDetails.getCorrelationId(); RecipientInstance recipientModel = (RecipientInstance) jobDetails.getRecipient(); InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient(); - String timerId = recipient.getPayload().getData().timerId(); - String processInstanceId = recipient.getPayload().getData().processInstanceId(); + JobDescription jobDescription = recipient.getPayload().getData(); + if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) { + return processJobDescription(jobDetails, processInstanceJobDescription); + } + return Uni.createFrom().item( + JobExecutionResponse.builder() + .code("401") + .jobId(jobDetails.getId()) + .now() + .message("job cannot be processed") + .build()); + } + + private Uni processJobDescription(JobDetails jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) { + String timerId = processInstanceJobDescription.timerId(); + String processInstanceId = processInstanceJobDescription.processInstanceId(); Optional> process = processes.processByProcessInstanceId(processInstanceId); if (process.isEmpty()) { return Uni.createFrom().item( @@ -69,7 +83,7 @@ public Uni execute(JobDetails jobDetails) { Integer limit = jobDetails.getRetries(); - TriggerJobCommand command = new TriggerJobCommand(processInstanceId, correlationId, timerId, limit, process.get(), application.unitOfWorkManager()); + TriggerJobCommand command = new TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, limit, process.get(), application.unitOfWorkManager()); return Uni.createFrom().item(command::execute) .onFailure() diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java index 95b9b0d92e..84754c6dc2 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java @@ -20,9 +20,8 @@ import java.util.concurrent.ExecutionException; +import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.JobsService; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; -import org.kie.kogito.jobs.ProcessJobDescription; import org.kie.kogito.jobs.api.JobCallbackResourceDef; import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter; import org.kie.kogito.jobs.service.api.Job; @@ -54,13 +53,7 @@ public EmbeddedJobsService() { } @Override - public String scheduleProcessJob(ProcessJobDescription description) { - LOGGER.debug("ScheduleProcessJob: {} not supported", description); - return null; - } - - @Override - public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) { + public String scheduleJob(JobDescription description) { try { Job job = Job.builder() .id(description.id()) diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java index cb49a135b6..bf3ded5577 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java @@ -18,12 +18,13 @@ */ package org.kie.kogito.jobs.embedded; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription; import org.kie.kogito.jobs.service.api.PayloadData; -public class InVMPayloadData extends PayloadData { +public class InVMPayloadData extends PayloadData { - private ProcessInstanceJobDescription jobDescription; + private JobDescription jobDescription; public InVMPayloadData() { // do nothing @@ -33,16 +34,16 @@ public void setJobDescription(ProcessInstanceJobDescription jobDescription) { this.jobDescription = jobDescription; } - public ProcessInstanceJobDescription getJobDescription() { + public JobDescription getJobDescription() { return jobDescription; } @Override - public ProcessInstanceJobDescription getData() { + public JobDescription getData() { return jobDescription; } - public InVMPayloadData(ProcessInstanceJobDescription data) { + public InVMPayloadData(JobDescription data) { this.jobDescription = data; } diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java index dae4ac4ecf..61a83dbdd6 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java @@ -25,7 +25,8 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.kie.kogito.event.EventPublisher; import org.kie.kogito.event.job.JobInstanceDataEvent; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription; import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter; import org.kie.kogito.jobs.service.api.Recipient; import org.kie.kogito.jobs.service.model.JobDetails; @@ -93,13 +94,15 @@ public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) { try { ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails); Recipient recipient = jobDetails.getRecipient().getRecipient(); - ProcessInstanceJobDescription jobDescription = recipient.getPayload().getJobDescription(); - - scheduledJob.setProcessInstanceId(jobDescription.processInstanceId()); - scheduledJob.setProcessId(jobDescription.processId()); - scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId()); - scheduledJob.setRootProcessId(jobDescription.rootProcessId()); - scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId()); + JobDescription jobDescription = recipient.getPayload().getJobDescription(); + + if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) { + scheduledJob.setProcessInstanceId(processInstanceJobDescription.processInstanceId()); + scheduledJob.setProcessId(processInstanceJobDescription.processId()); + scheduledJob.setRootProcessInstanceId(processInstanceJobDescription.rootProcessInstanceId()); + scheduledJob.setRootProcessId(processInstanceJobDescription.rootProcessId()); + scheduledJob.setNodeInstanceId(processInstanceJobDescription.nodeInstanceId()); + } byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob); JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE, diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java index c854f86f7f..acc4e5e7fb 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java @@ -28,7 +28,7 @@ import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.jobs.DurationExpirationTime; import org.kie.kogito.jobs.JobsService; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription; import io.quarkus.test.junit.QuarkusTest; @@ -58,7 +58,7 @@ public void testJobService() throws Exception { CountDownLatch latch = new CountDownLatch(8); publisher.setLatch(latch); - ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder() + ProcessInstanceJobDescription description = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder() .generateId() .timerId("-1") .expirationTime(DurationExpirationTime.now()) @@ -68,9 +68,9 @@ public void testJobService() throws Exception { .rootProcessId(null) .nodeInstanceId(NODE_INSTANCE_ID) .build(); - jobService.scheduleProcessInstanceJob(description); + jobService.scheduleJob(description); - ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.builder() + ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder() .generateId() .timerId("-1") .expirationTime(DurationExpirationTime.now()) @@ -80,7 +80,7 @@ public void testJobService() throws Exception { .rootProcessId(ROOT_PROCESS_ID) .nodeInstanceId(NODE_INSTANCE_ID) .build(); - jobService.scheduleProcessInstanceJob(descriptionWRootProcess); + jobService.scheduleJob(descriptionWRootProcess); latch.await();