Skip to content

Commit

Permalink
NIFI-12224 Added Support for updateMany Method in PutMongo
Browse files Browse the repository at this point in the history
This closes #8610

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
umarhussain15 authored and exceptionfactory committed Aug 5, 2024
1 parent 141ca71 commit c5ed5c5
Show file tree
Hide file tree
Showing 5 changed files with 387 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
Expand All @@ -46,13 +47,13 @@
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;

public abstract class AbstractMongoProcessor extends AbstractProcessor {
public static final String ATTRIBUTE_MONGODB_UPDATE_MODE = "mongodb.update.mode";

protected static final String JSON_TYPE_EXTENDED = "Extended";
protected static final String JSON_TYPE_STANDARD = "Standard";
protected static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
Expand Down Expand Up @@ -156,14 +157,41 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

static final List<PropertyDescriptor> descriptors;
static final List<PropertyDescriptor> descriptors = List.of(
CLIENT_SERVICE,
DATABASE_NAME,
COLLECTION_NAME
);

public enum UpdateMethod implements DescribedValue {
UPDATE_ONE("one", "Update One", "Updates only the first document that matches the query."),
UPDATE_MANY("many", "Update Many", "Updates every document that matches the query."),
UPDATE_FF_ATTRIBUTE("flowfile-attribute", "Use '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "' FlowFile attribute.",
"Use the value of the '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "' attribute of the incoming FlowFile. Acceptable values are 'one' and 'many'.");
private final String value;
private final String displayName;
private final String description;

UpdateMethod(final String value, final String displayName, final String description) {
this.value = value;
this.displayName = displayName;
this.description = description;
}

static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.add(CLIENT_SERVICE);
_temp.add(DATABASE_NAME);
_temp.add(COLLECTION_NAME);
descriptors = Collections.unmodifiableList(_temp);
@Override
public String getValue() {
return value;
}

@Override
public String getDisplayName() {
return displayName;
}

@Override
public String getDescription() {
return description;
}
}

protected ObjectMapper objectMapper;
Expand Down Expand Up @@ -235,4 +263,20 @@ protected synchronized void configureMapper(String setting, String dateFormat) {
objectMapper.setDateFormat(df);
}
}

/**
* Checks if given update mode option matches for the incoming flow file
* @param updateMethodToMatch the value against which processor's mode is compared
* @param configuredUpdateMethod the value coming from running processor
* @param flowFile incoming flow file to extract processor mode
* @return true if the incoming files update mode matches with updateMethodToMatch
*/
protected boolean updateModeMatches(
UpdateMethod updateMethodToMatch, UpdateMethod configuredUpdateMethod, FlowFile flowFile) {

return updateMethodToMatch == configuredUpdateMethod
|| (UpdateMethod.UPDATE_FF_ATTRIBUTE == configuredUpdateMethod
&& updateMethodToMatch.getValue().equalsIgnoreCase(flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
Expand All @@ -42,14 +45,14 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.ObjectId;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -58,12 +61,21 @@
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@WritesAttributes({
@WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, description = "The match count from result if update/upsert is performed, otherwise not set."),
@WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, description = "The modify count from result if update/upsert is performed, otherwise not set."),
@WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPSERT_ID, description = "The '_id' hex value if upsert is performed, otherwise not set.")
})
public class PutMongo extends AbstractMongoProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();

static final String ATTRIBUTE_UPDATE_MATCH_COUNT = "mongo.put.update.match.count";
static final String ATTRIBUTE_UPDATE_MODIFY_COUNT = "mongo.put.update.modify.count";
static final String ATTRIBUTE_UPSERT_ID = "mongo.put.upsert.id";

static final String MODE_INSERT = "insert";
static final String MODE_UPDATE = "update";

Expand All @@ -82,39 +94,50 @@ public class PutMongo extends AbstractMongoProcessor {
.description("When true, inserts a document if no document matches the update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored")
.required(true)
.dependsOn(MODE, MODE_UPDATE)
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();
static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder()
.name("Update Query Key")
.description("Key name used to build the update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored. Example: _id")
.description("One or more comma-separated document key names used to build the update query criteria, such as _id")
.required(false)
.dependsOn(MODE, MODE_UPDATE)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder()
.name("putmongo-update-query")
.displayName("Update Query")
.description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert.")
.description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert. NOTE: this field is ignored if the '%s' value is not empty."
.formatted(UPDATE_QUERY_KEY.getDisplayName()))
.required(false)
.dependsOn(MODE, MODE_UPDATE)
.addValidator(JsonValidator.INSTANCE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
static final PropertyDescriptor UPDATE_OPERATION_MODE = new PropertyDescriptor.Builder()
.displayName("Update Mode")
.name("put-mongo-update-mode")
.required(true)
.dependsOn(MODE, MODE_UPDATE)
.allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
.defaultValue(UPDATE_WITH_DOC.getValue())
.defaultValue(UPDATE_WITH_DOC)
.description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " +
"or specify a document that contains update operators like $set, $unset, and $inc. " +
"When Operators mode is enabled, the flowfile content is expected to be the operator part " +
"for example: {$set:{\"key\": \"value\"},$inc:{\"count\":1234}} and the update query will come " +
"from the configured Update Query property.")
.build();
static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
.name("Update Method")
.dependsOn(UPDATE_OPERATION_MODE, UPDATE_WITH_OPERATORS)
.description("MongoDB method for running collection update operations, such as updateOne or updateMany")
.allowableValues(UpdateMethod.class)
.defaultValue(UpdateMethod.UPDATE_ONE)
.build();
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the data is encoded")
Expand All @@ -123,24 +146,20 @@ public class PutMongo extends AbstractMongoProcessor {
.defaultValue("UTF-8")
.build();

private final static Set<Relationship> relationships;
private final static Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);

private final static List<PropertyDescriptor> propertyDescriptors;

static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(descriptors);
_propertyDescriptors.add(MODE);
_propertyDescriptors.add(UPSERT);
_propertyDescriptors.add(UPDATE_QUERY_KEY);
_propertyDescriptors.add(UPDATE_QUERY);
_propertyDescriptors.add(UPDATE_MODE);
_propertyDescriptors.add(UPDATE_OPERATION_MODE);
_propertyDescriptors.add(UPDATE_METHOD);
_propertyDescriptors.add(CHARACTER_SET);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);

final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}

@Override
Expand Down Expand Up @@ -183,16 +202,16 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

final ComponentLog logger = getLogger();

final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final String mode = context.getProperty(MODE).getValue();
final String updateMode = context.getProperty(UPDATE_MODE).getValue();
final String processorMode = context.getProperty(MODE).getValue();
final String updateOperationMode = context.getProperty(UPDATE_OPERATION_MODE).getValue();
final WriteConcern writeConcern = clientService.getWriteConcern();

try {
Expand All @@ -202,32 +221,50 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));

// parse
final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
final Object doc = (processorMode.equals(MODE_INSERT) || (processorMode.equals(MODE_UPDATE) && updateOperationMode.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) : BasicDBObject.parse(new String(content, charset));

if (MODE_INSERT.equalsIgnoreCase(mode)) {
if (MODE_INSERT.equals(processorMode)) {
collection.insertOne((Document) doc);
logger.info("inserted {} into MongoDB", flowFile);
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
final Document query;
final Document updateQuery;

if (!StringUtils.isBlank(updateKey)) {
query = parseUpdateKey(updateKey, (Map) doc);
if (StringUtils.isNotBlank(updateKey)) {
updateQuery = parseUpdateKey(updateKey, (Map) doc);
removeUpdateKeys(updateKey, (Map) doc);
} else {
query = Document.parse(filterQuery);
updateQuery = Document.parse(filterQuery);
}

if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
collection.replaceOne(query, (Document) doc, new ReplaceOptions().upsert(upsert));
UpdateResult updateResult;
if (updateOperationMode.equals(UPDATE_WITH_DOC.getValue())) {
updateResult = collection.replaceOne(updateQuery, (Document) doc, new ReplaceOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject) doc;
update.remove(updateKey);
collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
UpdateOptions updateOptions = new UpdateOptions().upsert(upsert);
UpdateMethod updateQueryMode = context.getProperty(UPDATE_METHOD).asAllowableValue(UpdateMethod.class);

if (this.updateModeMatches(UpdateMethod.UPDATE_ONE, updateQueryMode, flowFile)) {
updateResult = collection.updateOne(updateQuery, update, updateOptions);
} else if (this.updateModeMatches(UpdateMethod.UPDATE_MANY, updateQueryMode, flowFile)) {
updateResult = collection.updateMany(updateQuery, update, updateOptions);
} else {
String flowfileUpdateMode = flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE);
throw new ProcessException("Unrecognized '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
}
}

flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
BsonValue upsertedId = updateResult.getUpsertedId();
if (upsertedId != null) {
String id = upsertedId.isString() ? upsertedId.asString().getValue() : upsertedId.asObjectId().getValue().toString();
flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPSERT_ID, id);
}
logger.info("updated {} into MongoDB", flowFile);
}
Expand Down
Loading

0 comments on commit c5ed5c5

Please sign in to comment.