Skip to content

Commit

Permalink
[incubator-kie-issues-1551] Deadlines for Human Task
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Nov 11, 2024
1 parent 538f33c commit 7c34fed
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.io.IOException;

import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessInstanceJobDescriptionBuilder;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescriptionBuilder;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -42,7 +42,7 @@ public ProcessInstanceJobDescriptionDeserializer() {

@Override
public ProcessInstanceJobDescription deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JacksonException {
ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.builder();
ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder();

JsonNode node = jp.getCodec().readTree(jp);
ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.io.IOException;

import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
Expand Down Expand Up @@ -50,21 +52,25 @@ public class EmbeddedJobExecutor implements JobExecutor {

@Override
public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {

String correlationId = jobDetails.getCorrelationId();
RecipientInstance recipientModel = (RecipientInstance) jobDetails.getRecipient();
InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient();
String timerId = recipient.getPayload().getData().timerId();
String processInstanceId = recipient.getPayload().getData().processInstanceId();
Optional<Process<? extends Model>> process;
try {
process = processes.processByProcessInstanceId(processInstanceId);
} catch (Exception ex) {
return Uni.createFrom().failure(
new JobExecutionException(jobDetails.getId(),
"Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + ex.getMessage(),
ex));
JobDescription jobDescription = recipient.getPayload().getData();
if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) {
return processJobDescription(jobDetails, processInstanceJobDescription);
}
return Uni.createFrom().item(
JobExecutionResponse.builder()
.code("401")
.jobId(jobDetails.getId())
.now()
.message("job cannot be processed")
.build());
}

private Uni<JobExecutionResponse> processJobDescription(JobDetails jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) {
String timerId = processInstanceJobDescription.timerId();
String processInstanceId = processInstanceJobDescription.processInstanceId();
Optional<Process<? extends Model>> process = processes.processByProcessInstanceId(processInstanceId);
if (process.isEmpty()) {
return Uni.createFrom().item(
JobExecutionResponse.builder()
Expand All @@ -77,7 +83,7 @@ public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {

Integer limit = jobDetails.getRetries();

TriggerJobCommand command = new TriggerJobCommand(processInstanceId, correlationId, timerId, limit, process.get(), application.unitOfWorkManager());
TriggerJobCommand command = new TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, limit, process.get(), application.unitOfWorkManager());

return Uni.createFrom().item(command::execute)
.onFailure()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@

import java.util.concurrent.ExecutionException;

import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.api.JobCallbackResourceDef;
import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter;
import org.kie.kogito.jobs.service.api.Job;
Expand Down Expand Up @@ -54,13 +53,7 @@ public EmbeddedJobsService() {
}

@Override
public String scheduleProcessJob(ProcessJobDescription description) {
LOGGER.debug("ScheduleProcessJob: {} not supported", description);
return null;
}

@Override
public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) {
public String scheduleJob(JobDescription description) {
try {
Job job = Job.builder()
.id(description.id())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
*/
package org.kie.kogito.jobs.embedded;

import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.api.PayloadData;

public class InVMPayloadData extends PayloadData<ProcessInstanceJobDescription> {
public class InVMPayloadData extends PayloadData<JobDescription> {

private ProcessInstanceJobDescription jobDescription;
private JobDescription jobDescription;

public InVMPayloadData() {
// do nothing
Expand All @@ -33,16 +34,16 @@ public void setJobDescription(ProcessInstanceJobDescription jobDescription) {
this.jobDescription = jobDescription;
}

public ProcessInstanceJobDescription getJobDescription() {
public JobDescription getJobDescription() {
return jobDescription;
}

@Override
public ProcessInstanceJobDescription getData() {
public JobDescription getData() {
return jobDescription;
}

public InVMPayloadData(ProcessInstanceJobDescription data) {
public InVMPayloadData(JobDescription data) {
this.jobDescription = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.model.JobDetails;
Expand Down Expand Up @@ -93,13 +94,15 @@ public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) {
try {
ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
Recipient<InVMPayloadData> recipient = jobDetails.getRecipient().getRecipient();
ProcessInstanceJobDescription jobDescription = recipient.getPayload().getJobDescription();

scheduledJob.setProcessInstanceId(jobDescription.processInstanceId());
scheduledJob.setProcessId(jobDescription.processId());
scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId());
scheduledJob.setRootProcessId(jobDescription.rootProcessId());
scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId());
JobDescription jobDescription = recipient.getPayload().getJobDescription();

if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) {
scheduledJob.setProcessInstanceId(processInstanceJobDescription.processInstanceId());
scheduledJob.setProcessId(processInstanceJobDescription.processId());
scheduledJob.setRootProcessInstanceId(processInstanceJobDescription.rootProcessInstanceId());
scheduledJob.setRootProcessId(processInstanceJobDescription.rootProcessId());
scheduledJob.setNodeInstanceId(processInstanceJobDescription.nodeInstanceId());
}

byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;

import io.quarkus.test.junit.QuarkusTest;

Expand Down Expand Up @@ -58,7 +58,7 @@ public void testJobService() throws Exception {
CountDownLatch latch = new CountDownLatch(8);
publisher.setLatch(latch);

ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder()
ProcessInstanceJobDescription description = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.generateId()
.timerId("-1")
.expirationTime(DurationExpirationTime.now())
Expand All @@ -68,9 +68,9 @@ public void testJobService() throws Exception {
.rootProcessId(null)
.nodeInstanceId(NODE_INSTANCE_ID)
.build();
jobService.scheduleProcessInstanceJob(description);
jobService.scheduleJob(description);

ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.builder()
ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.generateId()
.timerId("-1")
.expirationTime(DurationExpirationTime.now())
Expand All @@ -80,7 +80,7 @@ public void testJobService() throws Exception {
.rootProcessId(ROOT_PROCESS_ID)
.nodeInstanceId(NODE_INSTANCE_ID)
.build();
jobService.scheduleProcessInstanceJob(descriptionWRootProcess);
jobService.scheduleJob(descriptionWRootProcess);

latch.await();

Expand Down

0 comments on commit 7c34fed

Please sign in to comment.