Skip to content

Commit

Permalink
NIFI-12198 Add API and CLI commands to import reporting task snapshots (
Browse files Browse the repository at this point in the history
#7875)

* NIFI-12198 Add API and CLI commands to import reporting task snapshots
  • Loading branch information
bbende authored Oct 20, 2023
1 parent 184757f commit fd2de5a
Show file tree
Hide file tree
Showing 22 changed files with 928 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;

import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.flow.VersionedReportingTaskSnapshot;

import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "versionedReportingTaskImportRequestEntity")
public class VersionedReportingTaskImportRequestEntity extends Entity {

private VersionedReportingTaskSnapshot reportingTaskSnapshot;
private Boolean disconnectedNodeAcknowledged;

@ApiModelProperty("The snapshot to import")
public VersionedReportingTaskSnapshot getReportingTaskSnapshot() {
return reportingTaskSnapshot;
}

public void setReportingTaskSnapshot(VersionedReportingTaskSnapshot reportingTaskSnapshot) {
this.reportingTaskSnapshot = reportingTaskSnapshot;
}

@ApiModelProperty("The disconnected node acknowledged flag")
public Boolean getDisconnectedNodeAcknowledged() {
return disconnectedNodeAcknowledged;
}

public void setDisconnectedNodeAcknowledged(Boolean disconnectedNodeAcknowledged) {
this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;

import io.swagger.annotations.ApiModelProperty;

import javax.xml.bind.annotation.XmlRootElement;
import java.util.Set;

@XmlRootElement(name = "versionedReportingTaskImportResponseEntity")
public class VersionedReportingTaskImportResponseEntity extends Entity {

private Set<ReportingTaskEntity> reportingTasks;
private Set<ControllerServiceEntity> controllerServices;

@ApiModelProperty("The reporting tasks created by the import")
public Set<ReportingTaskEntity> getReportingTasks() {
return reportingTasks;
}

public void setReportingTasks(Set<ReportingTaskEntity> reportingTasks) {
this.reportingTasks = reportingTasks;
}

@ApiModelProperty("The controller services created by the import")
public Set<ControllerServiceEntity> getControllerServices() {
return controllerServices;
}

public void setControllerServices(Set<ControllerServiceEntity> controllerServices) {
this.controllerServices = controllerServices;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.PythonBundle;
Expand Down Expand Up @@ -216,30 +218,35 @@ public static Optional<BundleCoordinate> getOptionalCompatibleBundle(final Exten
*/
public static void discoverCompatibleBundles(final ExtensionManager extensionManager, final VersionedProcessGroup versionedGroup) {
if (versionedGroup.getProcessors() != null) {
versionedGroup.getProcessors().forEach(processor -> {
final BundleDTO dto = createBundleDto(processor.getBundle());
final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, processor.getType(), dto).orElse(
new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion()));
processor.setBundle(createBundle(coordinate));
});
versionedGroup.getProcessors().forEach(processor -> discoverCompatibleBundle(extensionManager, processor));
}

if (versionedGroup.getControllerServices() != null) {
versionedGroup.getControllerServices().forEach(controllerService -> {
final BundleDTO dto = createBundleDto(controllerService.getBundle());

final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, controllerService.getType(), createBundleDto(controllerService.getBundle())).orElse(
new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion()));

controllerService.setBundle(createBundle(coordinate));
});
versionedGroup.getControllerServices().forEach(controllerService -> discoverCompatibleBundle(extensionManager, controllerService));
}

if (versionedGroup.getProcessGroups() != null) {
versionedGroup.getProcessGroups().forEach(processGroup -> discoverCompatibleBundles(extensionManager, processGroup));
}
}

public static void discoverCompatibleBundles(final ExtensionManager extensionManager, final VersionedReportingTaskSnapshot reportingTaskSnapshot) {
if (reportingTaskSnapshot.getReportingTasks() != null) {
reportingTaskSnapshot.getReportingTasks().forEach(reportingTask -> discoverCompatibleBundle(extensionManager, reportingTask));
}

if (reportingTaskSnapshot.getControllerServices() != null) {
reportingTaskSnapshot.getControllerServices().forEach(controllerService -> discoverCompatibleBundle(extensionManager, controllerService));
}
}

public static void discoverCompatibleBundle(final ExtensionManager extensionManager, final VersionedConfigurableExtension extension) {
final BundleDTO dto = createBundleDto(extension.getBundle());
final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, extension.getType(), dto).orElse(
new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion()));
extension.setBundle(createBundle(coordinate));
}

public static BundleCoordinate discoverCompatibleBundle(final ExtensionManager extensionManager, final String type, final org.apache.nifi.flow.Bundle bundle) {
return getCompatibleBundle(extensionManager, type, createBundleDto(bundle));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package org.apache.nifi.controller.reporting;

import java.util.Set;

import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.nar.ExtensionManager;

import java.util.Set;

/**
* A ReportingTaskProvider is responsible for providing management of, and
* access to, Reporting Tasks
Expand All @@ -41,11 +41,8 @@ public interface ReportingTaskProvider {
* being restored after a restart of the software
*
* @return the ReportingTaskNode that is used to manage the reporting task
*
* @throws ReportingTaskInstantiationException if unable to create the
* Reporting Task
*/
ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded) throws ReportingTaskInstantiationException;
ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded);

/**
* @param identifier of node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.flowanalysis.StandardFlowAnalyzer;
import org.apache.nifi.flowanalysis.TriggerFlowAnalysisTask;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Authorizer;
Expand Down Expand Up @@ -81,7 +79,6 @@
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
Expand Down Expand Up @@ -148,6 +145,8 @@
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flowanalysis.StandardFlowAnalyzer;
import org.apache.nifi.flowanalysis.TriggerFlowAnalysisTask;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.BundleUpdateStrategy;
Expand Down Expand Up @@ -180,9 +179,9 @@
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RemoteResourceManager;
Expand Down Expand Up @@ -2296,7 +2295,7 @@ public ReportingTaskNode getReportingTaskNode(final String identifier) {
}

@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
return flowManager.createReportingTask(type, id, bundleCoordinate, firstTimeAdded);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.serialization;

import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.encrypt.EncryptionException;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class FlowSynchronizationUtils {

private static final Logger logger = LoggerFactory.getLogger(FlowSynchronizationUtils.class);

private FlowSynchronizationUtils() {
}

static BundleCoordinate createBundleCoordinate(final ExtensionManager extensionManager, final Bundle bundle, final String componentType) {
BundleCoordinate coordinate;
try {
final BundleDTO bundleDto = new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
coordinate = BundleUtils.getCompatibleBundle(extensionManager, componentType, bundleDto);
} catch (final IllegalStateException e) {
coordinate = new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
}

return coordinate;
}

static Set<String> getSensitiveDynamicPropertyNames(final ComponentNode componentNode, final VersionedConfigurableExtension extension) {
final Set<String> versionedSensitivePropertyNames = new LinkedHashSet<>();

// Get Sensitive Property Names based on encrypted values including both supported and dynamic properties
extension.getProperties()
.entrySet()
.stream()
.filter(entry -> isValueSensitive(entry.getValue()))
.map(Map.Entry::getKey)
.forEach(versionedSensitivePropertyNames::add);

// Get Sensitive Property Names based on supported and dynamic property descriptors
extension.getPropertyDescriptors()
.values()
.stream()
.filter(VersionedPropertyDescriptor::isSensitive)
.map(VersionedPropertyDescriptor::getName)
.forEach(versionedSensitivePropertyNames::add);

// Filter combined Sensitive Property Names based on Component Property Descriptor status
return versionedSensitivePropertyNames.stream()
.map(componentNode::getPropertyDescriptor)
.filter(PropertyDescriptor::isDynamic)
.map(PropertyDescriptor::getName)
.collect(Collectors.toSet());
}

static boolean isValueSensitive(final String value) {
return value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX);
}

static Map<String, String> decryptProperties(final Map<String, String> encrypted, final PropertyEncryptor encryptor) {
final Map<String, String> decrypted = new HashMap<>(encrypted.size());
encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value, encryptor)));
return decrypted;
}

static String decrypt(final String value, final PropertyEncryptor encryptor) {
if (isValueSensitive(value)) {
try {
return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length()));
} catch (EncryptionException e) {
final String moreDescriptiveMessage = "There was a problem decrypting a sensitive flow configuration value. " +
"Check that the nifi.sensitive.props.key value in nifi.properties matches the value used to encrypt the flow.json.gz file";
logger.error(moreDescriptiveMessage, e);
throw new EncryptionException(moreDescriptiveMessage, e);
}
} else {
return value;
}
}


}
Loading

0 comments on commit fd2de5a

Please sign in to comment.