Skip to content

Commit

Permalink
[incubator-kie-issues-1551] Deadlines for Human Task (apache#2128)
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian authored and rgdoliveira committed Nov 18, 2024
1 parent 09037bb commit 0c2614a
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,8 +46,8 @@ public ObjectMapperCustomizer customizer() {
return objectMapper -> {
LOGGER.debug("Jackson customization initialized.");
SimpleModule kogitoCustomModule = new SimpleModule();
kogitoCustomModule.addSerializer(ProcessInstanceJobDescription.class, new ProcessInstanceJobDescriptionSerializer());
kogitoCustomModule.addDeserializer(ProcessInstanceJobDescription.class, new ProcessInstanceJobDescriptionDeserializer());
kogitoCustomModule.addSerializer(JobDescription.class, new JobDescriptionSerializer());
kogitoCustomModule.addDeserializer(JobDescription.class, new JobDescriptionDeserializer());
kogitoCustomModule.addSerializer(DurationExpirationTime.class, new DurationExpirationTimeSerializer());
kogitoCustomModule.addDeserializer(DurationExpirationTime.class, new DurationExpirationTimeDeserializer());
kogitoCustomModule.addSerializer(ExactExpirationTime.class, new ExactExpirationTimeSerializer());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.service.json;

import java.io.IOException;

import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescriptionBuilder;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescriptionBuilder;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;

import static java.util.Optional.ofNullable;

public class JobDescriptionDeserializer extends StdDeserializer<JobDescription> {

private static final long serialVersionUID = -8307549297456060422L;

public JobDescriptionDeserializer() {
super(ProcessInstanceJobDescription.class);
}

@Override
public JobDescription deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JacksonException {
try {
JsonNode node = jp.getCodec().readTree(jp);
String jobDescriptionType = node.get("@type").asText();
switch (jobDescriptionType) {
case "ProcessInstanceJobDescription": {
ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder();
ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue()));
ofNullable(node.get("priority")).ifPresent(e -> builder.priority(e.asInt()));
String expirationTimeType = node.get("expirationTime").get("@type").asText();
builder.expirationTime((ExpirationTime) ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(expirationTimeType)));

ofNullable(node.get("timerId")).ifPresent(e -> builder.timerId(e.textValue()));
ofNullable(node.get("processInstanceId")).ifPresent(e -> builder.processInstanceId(e.textValue()));
ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> builder.rootProcessInstanceId(e.textValue()));
ofNullable(node.get("processId")).ifPresent(e -> builder.processId(e.textValue()));
ofNullable(node.get("rootProcessId")).ifPresent(e -> builder.rootProcessId(e.textValue()));
ofNullable(node.get("nodeInstanceId")).ifPresent(e -> builder.nodeInstanceId(e.textValue()));

return builder.build();
}
case "UserTaskInstanceJobDescription": {
UserTaskInstanceJobDescriptionBuilder builder = UserTaskInstanceJobDescription.newUserTaskInstanceJobDescriptionBuilder();
ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue()));
ofNullable(node.get("priority")).ifPresent(e -> builder.priority(e.asInt()));
String expirationTimeType = node.get("expirationTime").get("@type").asText();
builder.expirationTime((ExpirationTime) ctxt.readTreeAsValue(node.get("expirationTime"), Class.forName(expirationTimeType)));

ofNullable(node.get("userTaskInstanceId")).ifPresent(e -> builder.userTaskInstanceId(e.textValue()));
return builder.build();
}
}
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException("expiration time class not found", e1);
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,40 @@

import java.io.IOException;

import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

public class ProcessInstanceJobDescriptionSerializer extends StdSerializer<ProcessInstanceJobDescription> {
public class JobDescriptionSerializer extends StdSerializer<JobDescription> {

private static final long serialVersionUID = -8307549297456060422L;

public ProcessInstanceJobDescriptionSerializer() {
super(ProcessInstanceJobDescription.class);
public JobDescriptionSerializer() {
super(JobDescription.class);
}

@Override
public void serialize(ProcessInstanceJobDescription value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
public void serialize(JobDescription value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
jgen.writeStartObject();
jgen.writeStringField("@type", value.getClass().getSimpleName());
jgen.writeStringField("id", value.id());
jgen.writeStringField("timerId", value.timerId());
jgen.writeObjectField("expirationTime", value.expirationTime());
jgen.writeNumberField("priority", value.priority());
jgen.writeStringField("processInstanceId", value.processInstanceId());
jgen.writeStringField("rootProcessInstanceId", value.rootProcessInstanceId());
jgen.writeStringField("processId", value.processId());
jgen.writeStringField("rootProcessId", value.rootProcessId());
jgen.writeStringField("nodeInstanceId", value.nodeInstanceId());
jgen.writeObjectField("expirationTime", value.expirationTime());
if (value instanceof ProcessInstanceJobDescription jobDescription) {
jgen.writeStringField("timerId", jobDescription.timerId());
jgen.writeStringField("processInstanceId", jobDescription.processInstanceId());
jgen.writeStringField("rootProcessInstanceId", jobDescription.rootProcessInstanceId());
jgen.writeStringField("processId", jobDescription.processId());
jgen.writeStringField("rootProcessId", jobDescription.rootProcessId());
jgen.writeStringField("nodeInstanceId", jobDescription.nodeInstanceId());
jgen.writeEndObject();
} else if (value instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription) {
jgen.writeStringField("userTaskInstanceId", userTaskInstanceJobDescription.getUserTaskInstanceId());
}
jgen.writeEndObject();
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.kie.kogito.jobs.embedded;

import java.util.Optional;
import java.util.function.Supplier;

import org.kie.kogito.Application;
import org.kie.kogito.Model;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
Expand All @@ -31,40 +35,81 @@
import org.kie.kogito.process.Process;
import org.kie.kogito.process.Processes;
import org.kie.kogito.services.jobs.impl.TriggerJobCommand;
import org.kie.kogito.usertask.UserTaskInstance;
import org.kie.kogito.usertask.UserTasks;

import io.smallrye.mutiny.Uni;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Alternative;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import static org.kie.kogito.services.uow.UnitOfWorkExecutor.executeInUnitOfWork;

@ApplicationScoped
@Alternative
public class EmbeddedJobExecutor implements JobExecutor {

@Inject
Processes processes;
Instance<Processes> processes;

@Inject
Instance<UserTasks> userTasks;

@Inject
Application application;

@Override
public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {

String correlationId = jobDetails.getCorrelationId();
RecipientInstance recipientModel = (RecipientInstance) jobDetails.getRecipient();
InVMRecipient recipient = (InVMRecipient) recipientModel.getRecipient();
String timerId = recipient.getPayload().getData().timerId();
String processInstanceId = recipient.getPayload().getData().processInstanceId();
Optional<Process<? extends Model>> process;
try {
process = processes.processByProcessInstanceId(processInstanceId);
} catch (Exception ex) {
return Uni.createFrom().failure(
new JobExecutionException(jobDetails.getId(),
"Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + ex.getMessage(),
ex));
JobDescription jobDescription = recipient.getPayload().getData();
if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription && processes.isResolvable()) {
return processJobDescription(jobDetails, processInstanceJobDescription);
} else if (jobDescription instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription && userTasks.isResolvable()) {
return processJobDescription(jobDetails, userTaskInstanceJobDescription);
}

return Uni.createFrom().item(
JobExecutionResponse.builder()
.code("401")
.jobId(jobDetails.getId())
.now()
.message("job cannot be processed")
.build());
}

private Uni<JobExecutionResponse> processJobDescription(JobDetails jobDetails, UserTaskInstanceJobDescription userTaskInstanceJobDescription) {
Supplier<Void> execute = () -> executeInUnitOfWork(application.unitOfWorkManager(), () -> {
Optional<UserTaskInstance> userTaskInstance = userTasks.get().instances().findById(userTaskInstanceJobDescription.getUserTaskInstanceId());
if (userTaskInstance.isEmpty()) {
return null;
}
UserTaskInstance instance = userTaskInstance.get();
instance.trigger(userTaskInstanceJobDescription);
return null;
});

return Uni.createFrom().item(execute)
.onFailure()
.transform(
unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(),
unexpected))
.onItem()
.transform(res -> JobExecutionResponse.builder()
.message("Embedded job executed")
.code(String.valueOf(200))
.now()
.jobId(jobDetails.getId())
.build());

}

private Uni<JobExecutionResponse> processJobDescription(JobDetails jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) {
String timerId = processInstanceJobDescription.timerId();
String processInstanceId = processInstanceJobDescription.processInstanceId();
Optional<Process<? extends Model>> process = processes.get().processByProcessInstanceId(processInstanceId);
if (process.isEmpty()) {
return Uni.createFrom().item(
JobExecutionResponse.builder()
Expand All @@ -77,9 +122,13 @@ public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {

Integer limit = jobDetails.getRetries();

TriggerJobCommand command = new TriggerJobCommand(processInstanceId, correlationId, timerId, limit, process.get(), application.unitOfWorkManager());
Supplier<Boolean> execute = () -> executeInUnitOfWork(application.unitOfWorkManager(), () -> {
TriggerJobCommand command = new TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, limit, process.get(), application.unitOfWorkManager());
return command.execute();
});

return Uni.createFrom().item(command::execute)
return Uni.createFrom()
.item(execute)
.onFailure()
.transform(
unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@

import java.util.concurrent.ExecutionException;

import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.api.JobCallbackResourceDef;
import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter;
import org.kie.kogito.jobs.service.api.Job;
Expand Down Expand Up @@ -54,13 +53,7 @@ public EmbeddedJobsService() {
}

@Override
public String scheduleProcessJob(ProcessJobDescription description) {
LOGGER.debug("ScheduleProcessJob: {} not supported", description);
return null;
}

@Override
public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) {
public String scheduleJob(JobDescription description) {
try {
Job job = Job.builder()
.id(description.id())
Expand Down
Loading

0 comments on commit 0c2614a

Please sign in to comment.