Skip to content

Commit

Permalink
[kie-issues-249] Data index improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Oct 4, 2023
1 parent 26833ff commit 8649d2a
Show file tree
Hide file tree
Showing 146 changed files with 5,393 additions and 3,237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
package org.kie.kogito.events.mongodb;

import java.util.Collection;
import java.util.function.BooleanSupplier;

import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.VariableInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.events.mongodb.codec.EventMongoDBCodecProvider;
import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;
import org.slf4j.Logger;
Expand All @@ -48,9 +46,8 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

static final String ID = "_id";

private MongoCollection<ProcessInstanceDataEvent> processInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceDataEvent> userTaskInstanceDataEventCollection;
private MongoCollection<VariableInstanceDataEvent> variableInstanceDataEventCollection;
private MongoCollection<ProcessInstanceStateDataEvent> processInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceStateDataEvent> userTaskInstanceDataEventCollection;

protected abstract MongoClient mongoClient();

Expand All @@ -60,54 +57,47 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

protected abstract boolean userTasksEvents();

protected abstract boolean variablesEvents();

protected abstract String eventsDatabaseName();

protected abstract String processInstancesEventsCollection();

protected abstract String userTasksEventsCollection();

protected abstract String variablesEventsCollection();

protected void configure() {
CodecRegistry registry = CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), fromProviders(new EventMongoDBCodecProvider(),
PojoCodecProvider.builder().automatic(true).build()));
MongoDatabase mongoDatabase = mongoClient().getDatabase(eventsDatabaseName()).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceDataEvent.class).withCodecRegistry(registry);
variableInstanceDataEventCollection = mongoDatabase.getCollection(variablesEventsCollection(), VariableInstanceDataEvent.class).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceStateDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceStateDataEvent.class).withCodecRegistry(registry);
}

@Override
public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessInstanceEvent":
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event, this::processInstancesEvents);
break;
case "UserTaskInstanceEvent":
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event, this::userTasksEvents);
break;
case "VariableInstanceEvent":
publishEvent(variableInstanceDataEventCollection, (VariableInstanceDataEvent) event, this::variablesEvents);
break;
default:
logger.warn("Unknown type of event '{}', ignoring", event.getType());
if (this.processInstancesEvents() && event instanceof ProcessInstanceStateDataEvent) {
publishEvent(processInstanceDataEventCollection, (ProcessInstanceStateDataEvent) event);
return;
}

if (this.userTasksEvents() && event instanceof UserTaskInstanceStateDataEvent) {
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceStateDataEvent) event);
return;
}

logger.debug("Unknown type of event '{}', ignoring", event.getType());

}

private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event, BooleanSupplier enabled) {
if (enabled.getAsBoolean()) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}
private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.VariableInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;

public class EventMongoDBCodecProvider implements CodecProvider {

Expand All @@ -34,13 +34,13 @@ public class EventMongoDBCodecProvider implements CodecProvider {
@SuppressWarnings("unchecked")
@Override
public <T> Codec<T> get(Class<T> aClass, CodecRegistry codecRegistry) {
if (aClass == ProcessInstanceDataEvent.class) {
if (aClass == ProcessInstanceStateDataEvent.class) {
return (Codec<T>) PROCESS_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == UserTaskInstanceDataEvent.class) {
if (aClass == UserTaskInstanceStateDataEvent.class) {
return (Codec<T>) USER_TASK_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == VariableInstanceDataEvent.class) {
if (aClass == ProcessInstanceVariableDataEvent.class) {
return (Codec<T>) VARIABLE_INSTANCE_DATA_EVENT_CODEC;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.kie.kogito.events.mongodb.codec;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.BsonWriter;
import org.bson.Document;
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class NodeInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceNodeDataEvent> {

@Override
public ProcessInstanceNodeDataEvent generateIdIfAbsentFromDocument(ProcessInstanceNodeDataEvent nodeInstanceDataEvent) {
return nodeInstanceDataEvent;
}

@Override
public boolean documentHasId(ProcessInstanceNodeDataEvent processInstanceDataEvent) {
return processInstanceDataEvent.getId() != null;
}

@Override
public BsonValue getDocumentId(ProcessInstanceNodeDataEvent processInstanceDataEvent) {
return new BsonString(processInstanceDataEvent.getId());
}

@Override
public ProcessInstanceNodeDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
// The events persist in an outbox collection
// The events are deleted immediately (in the same transaction)
// "decode" is not supposed to take place in any scenario
return null;
}

@Override
public void encode(BsonWriter bsonWriter, ProcessInstanceNodeDataEvent nodeInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, nodeInstanceDataEvent);
doc.put("kogitoProcessType", nodeInstanceDataEvent.getKogitoProcessType());
doc.put("kogitoProcessInstanceVersion", nodeInstanceDataEvent.getKogitoProcessInstanceVersion());
doc.put("kogitoParentProcessinstanceId", nodeInstanceDataEvent.getKogitoParentProcessInstanceId());
doc.put("kogitoProcessinstanceState", nodeInstanceDataEvent.getKogitoProcessInstanceState());
doc.put("kogitoReferenceId", nodeInstanceDataEvent.getKogitoReferenceId());
doc.put("kogitoStartFromNode", nodeInstanceDataEvent.getKogitoStartFromNode());
doc.put("data", encodeData(nodeInstanceDataEvent.getData()));
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(ProcessInstanceNodeEventBody data) {

Document doc = new Document();
doc.put("processInstanceId", data.getProcessInstanceId());
doc.put("connectionNodeInstanceId", data.getConnectionNodeInstanceId());
doc.put("id", data.getNodeInstanceId());
doc.put("nodeId", data.getNodeDefinitionId());
doc.put("nodeDefinitionId", data.getNodeDefinitionId());
doc.put("nodeName", data.getNodeName());
doc.put("nodeType", data.getNodeType());
doc.put("eventTime", data.getEventDate());
doc.put("eventType", data.getEventType());

if (!data.getData().isEmpty()) {
doc.put("data", new Document(data.getData()));
}

return doc;
}

@Override
public Class<ProcessInstanceNodeDataEvent> getEncoderClass() {
return ProcessInstanceNodeDataEvent.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.kie.kogito.events.mongodb.codec;

import java.util.stream.Collectors;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
Expand All @@ -28,39 +26,39 @@
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceEventBody;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceDataEvent> {
public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceStateDataEvent> {

@Override
public ProcessInstanceDataEvent generateIdIfAbsentFromDocument(ProcessInstanceDataEvent processInstanceDataEvent) {
public ProcessInstanceStateDataEvent generateIdIfAbsentFromDocument(ProcessInstanceStateDataEvent processInstanceDataEvent) {
return processInstanceDataEvent;
}

@Override
public boolean documentHasId(ProcessInstanceDataEvent processInstanceDataEvent) {
public boolean documentHasId(ProcessInstanceStateDataEvent processInstanceDataEvent) {
return processInstanceDataEvent.getId() != null;
}

@Override
public BsonValue getDocumentId(ProcessInstanceDataEvent processInstanceDataEvent) {
public BsonValue getDocumentId(ProcessInstanceStateDataEvent processInstanceDataEvent) {
return new BsonString(processInstanceDataEvent.getId());
}

@Override
public ProcessInstanceDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
public ProcessInstanceStateDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
// The events persist in an outbox collection
// The events are deleted immediately (in the same transaction)
// "decode" is not supposed to take place in any scenario
return null;
}

@Override
public void encode(BsonWriter bsonWriter, ProcessInstanceDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
public void encode(BsonWriter bsonWriter, ProcessInstanceStateDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, processInstanceDataEvent);
doc.put("kogitoProcessType", processInstanceDataEvent.getKogitoProcessType());
Expand All @@ -74,67 +72,27 @@ public void encode(BsonWriter bsonWriter, ProcessInstanceDataEvent processInstan
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(ProcessInstanceEventBody data) {
private Document encodeData(ProcessInstanceStateEventBody data) {
Document doc = new Document();
doc.put("id", data.getId());
doc.put("version", data.getVersion());
doc.put("id", data.getProcessInstanceId());
doc.put("version", data.getProcessVersion());
doc.put("parentInstanceId", data.getParentInstanceId());
doc.put("rootInstanceId", data.getRootInstanceId());
doc.put("rootInstanceId", data.getRootProcessInstanceId());
doc.put("processId", data.getProcessId());
doc.put("processType", data.getProcessType());
doc.put("rootProcessId", data.getRootProcessId());
doc.put("processName", data.getProcessName());
doc.put("startDate", data.getStartDate());
doc.put("endDate", data.getEndDate());
doc.put("eventDate", data.getEventDate());
doc.put("state", data.getState());
doc.put("businessKey", data.getBusinessKey());
doc.put("roles", data.getRoles());
doc.put("identity", data.getIdentity());

if (data.getVariables() != null) {
doc.put("variables", new Document(data.getVariables()));
}

if (data.getNodeInstances() != null) {
doc.put("nodeInstances",
data.getNodeInstances().stream().map(ni -> {
Document niDoc = new Document();
niDoc.put("id", ni.getId());
niDoc.put("nodeId", ni.getNodeId());
niDoc.put("nodeDefinitionId", ni.getNodeDefinitionId());
niDoc.put("nodeName", ni.getNodeName());
niDoc.put("nodeType", ni.getNodeType());
niDoc.put("triggerTime", ni.getTriggerTime());
if (ni.getLeaveTime() != null) {
niDoc.put("leaveTime", ni.getLeaveTime());
}
return niDoc;
}).collect(Collectors.toSet()));
}

if (data.getError() != null) {
Document eDoc = new Document();
eDoc.put("errorMessage", data.getError().getErrorMessage());
eDoc.put("nodeDefinitionId", data.getError().getNodeDefinitionId());
doc.put("error", eDoc);
}

if (data.getMilestones() != null) {
doc.put("milestones",
data.getMilestones().stream().map(m -> {
Document mDoc = new Document();
mDoc.put("id", m.getId());
mDoc.put("name", m.getName());
mDoc.put("status", m.getStatus());
return mDoc;
}).collect(Collectors.toSet()));
}
doc.put("identity", data.getEventUser());

return doc;
}

@Override
public Class<ProcessInstanceDataEvent> getEncoderClass() {
return ProcessInstanceDataEvent.class;
public Class<ProcessInstanceStateDataEvent> getEncoderClass() {
return ProcessInstanceStateDataEvent.class;
}
}
Loading

0 comments on commit 8649d2a

Please sign in to comment.