From cd975a77b410e1561c0fd45edc1546f0b691f983 Mon Sep 17 00:00:00 2001 From: Enrique Gonzalez Martinez Date: Thu, 24 Oct 2024 15:10:52 +0200 Subject: [PATCH] [incubator-kie-issues-1551] Deadlines for Human Task --- .../service/json/JacksonConfiguration.java | 6 +- .../json/JobDescriptionDeserializer.java | 85 +++++++++++++++++++ ...zer.java => JobDescriptionSerializer.java} | 32 ++++--- ...essInstanceJobDescriptionDeserializer.java | 67 --------------- .../jobs/embedded/EmbeddedJobExecutor.java | 79 +++++++++++++---- .../jobs/embedded/EmbeddedJobsService.java | 11 +-- .../kogito/jobs/embedded/InVMPayloadData.java | 14 +-- .../jobs/embedded/JobInVMEventPublisher.java | 19 +++-- .../embedded/EmbeddedJobsServiceTest.java | 10 +-- 9 files changed, 197 insertions(+), 126 deletions(-) create mode 100644 jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java rename jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/{ProcessInstanceJobDescriptionSerializer.java => JobDescriptionSerializer.java} (50%) delete mode 100644 jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java index cfbbd024a5..f1603c1ef4 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JacksonConfiguration.java @@ -20,7 +20,7 @@ import org.kie.kogito.jobs.DurationExpirationTime; import org.kie.kogito.jobs.ExactExpirationTime; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +46,8 @@ public ObjectMapperCustomizer customizer() { return objectMapper -> { LOGGER.debug("Jackson customization initialized."); SimpleModule kogitoCustomModule = new SimpleModule(); - kogitoCustomModule.addSerializer(ProcessInstanceJobDescription.class, new ProcessInstanceJobDescriptionSerializer()); - kogitoCustomModule.addDeserializer(ProcessInstanceJobDescription.class, new ProcessInstanceJobDescriptionDeserializer()); + kogitoCustomModule.addSerializer(JobDescription.class, new JobDescriptionSerializer()); + kogitoCustomModule.addDeserializer(JobDescription.class, new JobDescriptionDeserializer()); kogitoCustomModule.addSerializer(DurationExpirationTime.class, new DurationExpirationTimeSerializer()); kogitoCustomModule.addDeserializer(DurationExpirationTime.class, new DurationExpirationTimeDeserializer()); kogitoCustomModule.addSerializer(ExactExpirationTime.class, new ExactExpirationTimeSerializer()); diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java new file mode 100644 index 0000000000..f0a72db2b4 --- /dev/null +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.json; + +import java.io.IOException; + +import org.kie.kogito.jobs.ExpirationTime; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescriptionBuilder; +import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription; +import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescriptionBuilder; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import static java.util.Optional.ofNullable; + +public class JobDescriptionDeserializer extends StdDeserializer { + + private static final long serialVersionUID = -8307549297456060422L; + + public JobDescriptionDeserializer() { + super(ProcessInstanceJobDescription.class); + } + + @Override + public JobDescription deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JacksonException { + try { + JsonNode node = jp.getCodec().readTree(jp); + String jobDescriptionType = node.get("@type").asText(); + switch (jobDescriptionType) { + case "ProcessInstanceJobDescription": { + ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder(); + ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue())); + ofNullable(node.get("priority")).ifPresent(e -> builder.priority(e.asInt())); + String expirationTimeType = node.get("expirationTime").get("@type").asText(); + builder.expirationTime((ExpirationTime) ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(expirationTimeType))); + + ofNullable(node.get("timerId")).ifPresent(e -> builder.timerId(e.textValue())); + ofNullable(node.get("processInstanceId")).ifPresent(e -> builder.processInstanceId(e.textValue())); + ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> builder.rootProcessInstanceId(e.textValue())); + ofNullable(node.get("processId")).ifPresent(e -> builder.processId(e.textValue())); + ofNullable(node.get("rootProcessId")).ifPresent(e -> builder.rootProcessId(e.textValue())); + ofNullable(node.get("nodeInstanceId")).ifPresent(e -> builder.nodeInstanceId(e.textValue())); + + return builder.build(); + } + case "UserTaskInstanceJobDescription": { + UserTaskInstanceJobDescriptionBuilder builder = UserTaskInstanceJobDescription.newUserTaskInstanceJobDescriptionBuilder(); + ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue())); + ofNullable(node.get("priority")).ifPresent(e -> builder.priority(e.asInt())); + String expirationTimeType = node.get("expirationTime").get("@type").asText(); + builder.expirationTime((ExpirationTime) ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(expirationTimeType))); + + ofNullable(node.get("userTaskInstanceId")).ifPresent(e -> builder.userTaskInstanceId(e.textValue())); + return builder.build(); + } + } + } catch (ClassNotFoundException e1) { + throw new IllegalArgumentException("expiration time class not found", e1); + } + return null; + } + +} diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java similarity index 50% rename from jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java rename to jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java index 379da2a0e5..22935e0e37 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionSerializer.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java @@ -20,32 +20,40 @@ import java.io.IOException; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; -public class ProcessInstanceJobDescriptionSerializer extends StdSerializer { +public class JobDescriptionSerializer extends StdSerializer { private static final long serialVersionUID = -8307549297456060422L; - public ProcessInstanceJobDescriptionSerializer() { - super(ProcessInstanceJobDescription.class); + public JobDescriptionSerializer() { + super(JobDescription.class); } @Override - public void serialize(ProcessInstanceJobDescription value, JsonGenerator jgen, SerializerProvider provider) throws IOException { + public void serialize(JobDescription value, JsonGenerator jgen, SerializerProvider provider) throws IOException { jgen.writeStartObject(); + jgen.writeStringField("@type", value.getClass().getSimpleName()); jgen.writeStringField("id", value.id()); - jgen.writeStringField("timerId", value.timerId()); - jgen.writeObjectField("expirationTime", value.expirationTime()); jgen.writeNumberField("priority", value.priority()); - jgen.writeStringField("processInstanceId", value.processInstanceId()); - jgen.writeStringField("rootProcessInstanceId", value.rootProcessInstanceId()); - jgen.writeStringField("processId", value.processId()); - jgen.writeStringField("rootProcessId", value.rootProcessId()); - jgen.writeStringField("nodeInstanceId", value.nodeInstanceId()); + jgen.writeObjectField("expirationTime", value.expirationTime()); + if (value instanceof ProcessInstanceJobDescription jobDescription) { + jgen.writeStringField("timerId", jobDescription.timerId()); + jgen.writeStringField("processInstanceId", jobDescription.processInstanceId()); + jgen.writeStringField("rootProcessInstanceId", jobDescription.rootProcessInstanceId()); + jgen.writeStringField("processId", jobDescription.processId()); + jgen.writeStringField("rootProcessId", jobDescription.rootProcessId()); + jgen.writeStringField("nodeInstanceId", jobDescription.nodeInstanceId()); + jgen.writeEndObject(); + } else if (value instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription) { + jgen.writeStringField("userTaskInstanceId", userTaskInstanceJobDescription.getUserTaskInstanceId()); + } jgen.writeEndObject(); } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java deleted file mode 100644 index 6ccfb99f47..0000000000 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.kie.kogito.jobs.service.json; - -import java.io.IOException; - -import org.kie.kogito.jobs.ExpirationTime; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; -import org.kie.kogito.jobs.ProcessInstanceJobDescriptionBuilder; - -import com.fasterxml.jackson.core.JacksonException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; - -import static java.util.Optional.ofNullable; - -public class ProcessInstanceJobDescriptionDeserializer extends StdDeserializer { - - private static final long serialVersionUID = -8307549297456060422L; - - public ProcessInstanceJobDescriptionDeserializer() { - super(ProcessInstanceJobDescription.class); - } - - @Override - public ProcessInstanceJobDescription deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JacksonException { - ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.builder(); - - JsonNode node = jp.getCodec().readTree(jp); - ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue())); - ofNullable(node.get("timerId")).ifPresent(e -> builder.timerId(e.textValue())); - ofNullable(node.get("priority")).ifPresent(e -> builder.priority(e.asInt())); - ofNullable(node.get("processInstanceId")).ifPresent(e -> builder.processInstanceId(e.textValue())); - ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> builder.rootProcessInstanceId(e.textValue())); - ofNullable(node.get("processId")).ifPresent(e -> builder.processId(e.textValue())); - ofNullable(node.get("rootProcessId")).ifPresent(e -> builder.rootProcessId(e.textValue())); - ofNullable(node.get("nodeInstanceId")).ifPresent(e -> builder.nodeInstanceId(e.textValue())); - - String type = node.get("expirationTime").get("@type").asText(); - try { - builder.expirationTime((ExpirationTime) ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(type))); - } catch (ClassNotFoundException | IOException e1) { - e1.printStackTrace(); - } - - return builder.build(); - } - -} diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java index ead3bbf2a2..961e0485aa 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java @@ -19,9 +19,13 @@ package org.kie.kogito.jobs.embedded; import java.util.Optional; +import java.util.function.Supplier; import org.kie.kogito.Application; import org.kie.kogito.Model; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription; import org.kie.kogito.jobs.service.api.Recipient; import org.kie.kogito.jobs.service.exception.JobExecutionException; import org.kie.kogito.jobs.service.executor.JobExecutor; @@ -31,40 +35,81 @@ import org.kie.kogito.process.Process; import org.kie.kogito.process.Processes; import org.kie.kogito.services.jobs.impl.TriggerJobCommand; +import org.kie.kogito.usertask.UserTaskInstance; +import org.kie.kogito.usertask.UserTasks; import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Alternative; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; +import static org.kie.kogito.services.uow.UnitOfWorkExecutor.executeInUnitOfWork; + @ApplicationScoped @Alternative public class EmbeddedJobExecutor implements JobExecutor { @Inject - Processes processes; + Instance processes; + + @Inject + Instance userTasks; @Inject Application application; @Override public Uni execute(JobDetails jobDetails) { - - String correlationId = jobDetails.getCorrelationId(); RecipientInstance recipientModel = (RecipientInstance) jobDetails.getRecipient(); InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient(); - String timerId = recipient.getPayload().getData().timerId(); - String processInstanceId = recipient.getPayload().getData().processInstanceId(); - Optional> process; - try { - process = processes.processByProcessInstanceId(processInstanceId); - } catch (Exception ex) { - return Uni.createFrom().failure( - new JobExecutionException(jobDetails.getId(), - "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + ex.getMessage(), - ex)); + JobDescription jobDescription = recipient.getPayload().getData(); + if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription && processes.isResolvable()) { + return processJobDescription(jobDetails, processInstanceJobDescription); + } else if (jobDescription instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription && userTasks.isResolvable()) { + return processJobDescription(jobDetails, userTaskInstanceJobDescription); } + + return Uni.createFrom().item( + JobExecutionResponse.builder() + .code("401") + .jobId(jobDetails.getId()) + .now() + .message("job cannot be processed") + .build()); + } + + private Uni processJobDescription(JobDetails jobDetails, UserTaskInstanceJobDescription userTaskInstanceJobDescription) { + Supplier execute = () -> executeInUnitOfWork(application.unitOfWorkManager(), () -> { + Optional userTaskInstance = userTasks.get().instances().findById(userTaskInstanceJobDescription.getUserTaskInstanceId()); + if (userTaskInstance.isEmpty()) { + return null; + } + UserTaskInstance instance = userTaskInstance.get(); + instance.trigger(userTaskInstanceJobDescription); + return null; + }); + + return Uni.createFrom().item(execute) + .onFailure() + .transform( + unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(), + unexpected)) + .onItem() + .transform(res -> JobExecutionResponse.builder() + .message("Embedded job executed") + .code(String.valueOf(200)) + .now() + .jobId(jobDetails.getId()) + .build()); + + } + + private Uni processJobDescription(JobDetails jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) { + String timerId = processInstanceJobDescription.timerId(); + String processInstanceId = processInstanceJobDescription.processInstanceId(); + Optional> process = processes.get().processByProcessInstanceId(processInstanceId); if (process.isEmpty()) { return Uni.createFrom().item( JobExecutionResponse.builder() @@ -77,9 +122,13 @@ public Uni execute(JobDetails jobDetails) { Integer limit = jobDetails.getRetries(); - TriggerJobCommand command = new TriggerJobCommand(processInstanceId, correlationId, timerId, limit, process.get(), application.unitOfWorkManager()); + Supplier execute = () -> executeInUnitOfWork(application.unitOfWorkManager(), () -> { + TriggerJobCommand command = new TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, limit, process.get(), application.unitOfWorkManager()); + return command.execute(); + }); - return Uni.createFrom().item(command::execute) + return Uni.createFrom() + .item(execute) .onFailure() .transform( unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(), diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java index 95b9b0d92e..84754c6dc2 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobsService.java @@ -20,9 +20,8 @@ import java.util.concurrent.ExecutionException; +import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.JobsService; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; -import org.kie.kogito.jobs.ProcessJobDescription; import org.kie.kogito.jobs.api.JobCallbackResourceDef; import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter; import org.kie.kogito.jobs.service.api.Job; @@ -54,13 +53,7 @@ public EmbeddedJobsService() { } @Override - public String scheduleProcessJob(ProcessJobDescription description) { - LOGGER.debug("ScheduleProcessJob: {} not supported", description); - return null; - } - - @Override - public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) { + public String scheduleJob(JobDescription description) { try { Job job = Job.builder() .id(description.id()) diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java index cb49a135b6..3a9cdf9286 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/InVMPayloadData.java @@ -18,31 +18,31 @@ */ package org.kie.kogito.jobs.embedded; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.JobDescription; import org.kie.kogito.jobs.service.api.PayloadData; -public class InVMPayloadData extends PayloadData { +public class InVMPayloadData extends PayloadData { - private ProcessInstanceJobDescription jobDescription; + private JobDescription jobDescription; public InVMPayloadData() { // do nothing } - public void setJobDescription(ProcessInstanceJobDescription jobDescription) { + public void setJobDescription(JobDescription jobDescription) { this.jobDescription = jobDescription; } - public ProcessInstanceJobDescription getJobDescription() { + public JobDescription getJobDescription() { return jobDescription; } @Override - public ProcessInstanceJobDescription getData() { + public JobDescription getData() { return jobDescription; } - public InVMPayloadData(ProcessInstanceJobDescription data) { + public InVMPayloadData(JobDescription data) { this.jobDescription = data; } diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java index dae4ac4ecf..78434dc841 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java @@ -25,7 +25,8 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.kie.kogito.event.EventPublisher; import org.kie.kogito.event.job.JobInstanceDataEvent; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.JobDescription; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter; import org.kie.kogito.jobs.service.api.Recipient; import org.kie.kogito.jobs.service.model.JobDetails; @@ -93,13 +94,15 @@ public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) { try { ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails); Recipient recipient = jobDetails.getRecipient().getRecipient(); - ProcessInstanceJobDescription jobDescription = recipient.getPayload().getJobDescription(); - - scheduledJob.setProcessInstanceId(jobDescription.processInstanceId()); - scheduledJob.setProcessId(jobDescription.processId()); - scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId()); - scheduledJob.setRootProcessId(jobDescription.rootProcessId()); - scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId()); + JobDescription jobDescription = recipient.getPayload().getJobDescription(); + + if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) { + scheduledJob.setProcessInstanceId(processInstanceJobDescription.processInstanceId()); + scheduledJob.setProcessId(processInstanceJobDescription.processId()); + scheduledJob.setRootProcessInstanceId(processInstanceJobDescription.rootProcessInstanceId()); + scheduledJob.setRootProcessId(processInstanceJobDescription.rootProcessId()); + scheduledJob.setNodeInstanceId(processInstanceJobDescription.nodeInstanceId()); + } byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob); JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE, diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java index c854f86f7f..33e0c8be4b 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java @@ -28,7 +28,7 @@ import org.kie.kogito.event.job.JobInstanceDataEvent; import org.kie.kogito.jobs.DurationExpirationTime; import org.kie.kogito.jobs.JobsService; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription; import io.quarkus.test.junit.QuarkusTest; @@ -58,7 +58,7 @@ public void testJobService() throws Exception { CountDownLatch latch = new CountDownLatch(8); publisher.setLatch(latch); - ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder() + ProcessInstanceJobDescription description = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder() .generateId() .timerId("-1") .expirationTime(DurationExpirationTime.now()) @@ -68,9 +68,9 @@ public void testJobService() throws Exception { .rootProcessId(null) .nodeInstanceId(NODE_INSTANCE_ID) .build(); - jobService.scheduleProcessInstanceJob(description); + jobService.scheduleJob(description); - ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.builder() + ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder() .generateId() .timerId("-1") .expirationTime(DurationExpirationTime.now()) @@ -80,7 +80,7 @@ public void testJobService() throws Exception { .rootProcessId(ROOT_PROCESS_ID) .nodeInstanceId(NODE_INSTANCE_ID) .build(); - jobService.scheduleProcessInstanceJob(descriptionWRootProcess); + jobService.scheduleJob(descriptionWRootProcess); latch.await();