Skip to content

Commit

Permalink
add in vm user task execution
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Nov 11, 2024
1 parent 7c34fed commit c27cea5
Showing 1 changed file with 48 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.kie.kogito.jobs.embedded;

import java.util.Optional;
import java.util.function.Supplier;

import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
Expand All @@ -33,19 +35,27 @@
import org.kie.kogito.process.Process;
import org.kie.kogito.process.Processes;
import org.kie.kogito.services.jobs.impl.TriggerJobCommand;
import org.kie.kogito.usertask.UserTaskInstance;
import org.kie.kogito.usertask.UserTasks;

import io.smallrye.mutiny.Uni;

import static org.kie.kogito.services.uow.UnitOfWorkExecutor.executeInUnitOfWork;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Alternative;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

@ApplicationScoped
@Alternative
public class EmbeddedJobExecutor implements JobExecutor {

@Inject
Processes processes;
Instance<Processes> processes;

@Inject
Instance<UserTasks> userTasks;

@Inject
Application application;
Expand All @@ -55,9 +65,12 @@ public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
RecipientInstance recipientModel = (RecipientInstance) jobDetails.getRecipient();
InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient();
JobDescription jobDescription = recipient.getPayload().getData();
if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) {
if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription && processes.isResolvable()) {
return processJobDescription(jobDetails, processInstanceJobDescription);
} else if (jobDescription instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription && userTasks.isResolvable()) {
return processJobDescription(jobDetails, userTaskInstanceJobDescription);
}

return Uni.createFrom().item(
JobExecutionResponse.builder()
.code("401")
Expand All @@ -67,10 +80,36 @@ public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
.build());
}

private Uni<JobExecutionResponse> processJobDescription(JobDetails jobDetails, UserTaskInstanceJobDescription userTaskInstanceJobDescription) {
Supplier<Void> execute = () -> executeInUnitOfWork(application.unitOfWorkManager(), () -> {
Optional<UserTaskInstance> userTaskInstance = userTasks.get().instances().findById(userTaskInstanceJobDescription.getUserTaskInstanceId());
if (userTaskInstance.isEmpty()) {
return null;
}
UserTaskInstance instance = userTaskInstance.get();
instance.trigger(userTaskInstanceJobDescription);
return null;
});

return Uni.createFrom().item(execute)
.onFailure()
.transform(
unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(),
unexpected))
.onItem()
.transform(res -> JobExecutionResponse.builder()
.message("Embedded job executed")
.code(String.valueOf(200))
.now()
.jobId(jobDetails.getId())
.build());

}

private Uni<JobExecutionResponse> processJobDescription(JobDetails jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) {
String timerId = processInstanceJobDescription.timerId();
String processInstanceId = processInstanceJobDescription.processInstanceId();
Optional<Process<? extends Model>> process = processes.processByProcessInstanceId(processInstanceId);
Optional<Process<? extends Model>> process = processes.get().processByProcessInstanceId(processInstanceId);
if (process.isEmpty()) {
return Uni.createFrom().item(
JobExecutionResponse.builder()
Expand All @@ -83,9 +122,13 @@ private Uni<JobExecutionResponse> processJobDescription(JobDetails jobDetails, P

Integer limit = jobDetails.getRetries();

TriggerJobCommand command = new TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, limit, process.get(), application.unitOfWorkManager());
Supplier<Boolean> execute = () -> executeInUnitOfWork(application.unitOfWorkManager(), () -> {
TriggerJobCommand command = new TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, limit, process.get(), application.unitOfWorkManager());
return command.execute();
});

return Uni.createFrom().item(command::execute)
return Uni.createFrom()
.item(execute)
.onFailure()
.transform(
unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(),
Expand Down

0 comments on commit c27cea5

Please sign in to comment.