Skip to content

Commit

Permalink
NIFI-12240 Added Python Processors for Docs, ChatGPT, Chroma, and Pin…
Browse files Browse the repository at this point in the history
…econe

Created new python processors for text embeddings, inserting into Chroma, querying Chroma, querying ChatGPT, inserting into and querying Pinecone. Fixed some bugs in the Python framework. Added Python extensions to assembly. Also added ability to load dependencies from a requirements.txt as that was important for making the different vectorstore implementations play more nicely together.

Excluded nifi-python-extensions-bundle from GitHub build because it requires Maven to use unpack-resources goal, which will not work in GitHub because it uses mvn compile instead of mvn install

- ParseDocument
- ChunkDocument
- PromptChatGPT
- PutChroma
- PutPinecone
- QueryChroma
- QueryPinecone

NIFI-12195 Added support for requirements.txt to define Python dependencies

This closes #7894

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
markap14 authored and exceptionfactory committed Nov 1, 2023
1 parent 945d8b5 commit 5bcad9e
Show file tree
Hide file tree
Showing 37 changed files with 2,366 additions and 110 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ env:
-pl -:nifi-system-test-suite
-pl -:nifi-nar-provider-assembly
-pl -:nifi-py4j-integration-tests
-pl -:nifi-python-extensions-bundle
MAVEN_VERIFY_COMMAND: >-
verify
--show-version
Expand Down
7 changes: 7 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,13 @@ language governing permissions and limitations under the License. -->
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-extensions-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>zip</type>
</dependency>

<!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) -->
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
21 changes: 21 additions & 0 deletions nifi-assembly/src/main/assembly/dependencies.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@
<exclude>org.aspectj:aspectjweaver</exclude>
</excludes>
</dependencySet>

<!-- Unpack Python extensions -->
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>./python/extensions</outputDirectory>
<directoryMode>0770</directoryMode>
<fileMode>0664</fileMode>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>*:nifi-python-extensions-bundle</include>
</includes>
<unpack>true</unpack>
<unpackOptions>
<excludes>
<exclude>META-INF/</exclude>
<exclude>META-INF/**</exclude>
</excludes>
</unpackOptions>
</dependencySet>

</dependencySets>

</assembly>
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,34 @@ public ValidationResult validate(final String subject, final String input, final
};
}

public static Validator createNonNegativeFloatingPointValidator(final double maximum) {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}

String reason = null;
try {
final double doubleValue = Double.parseDouble(input);
if (doubleValue < 0) {
reason = "Value must be non-negative but was " + doubleValue;
}
final double maxPlusDelta = maximum + 0.00001D;
if (doubleValue < 0 || doubleValue > maxPlusDelta) {
reason = "Value must be between 0 and " + maximum + " but was " + doubleValue;
}
} catch (final NumberFormatException e) {
reason = "not a valid integer";
}

return new ValidationResult.Builder().subject(subject).input(input).explanation(reason).valid(reason == null).build();
}

};
}

//
//
// SPECIFIC VALIDATOR IMPLEMENTATIONS THAT CANNOT BE ANONYMOUS CLASSES
Expand Down
29 changes: 26 additions & 3 deletions nifi-docs/src/main/asciidoc/python-developer-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,33 @@ to pickup any changes seamlessly as soon as the Processor is started.

[[dependencies]]
== Adding Third-Party Dependencies
Python based Processors can be a single module, or they can be bundled together as a Python package. How you specify third-party dependencies depends on how
the Processor is packaged.

Third-party dependencies are defined for a Processor using the `dependencies` member of the `ProcessorDetails` inner class.
This is a list of Strings that indicate the PyPI modules that the Processor depends on. The format is the same format expected
by PyPI.
=== Package-level Dependencies

If one or more Processors are defined within a Python package, the package should define a `requirements.txt` file that declares all third-party dependencies
that are necessary for any Processor in the package. The file structure will then typically look like this:
----
my-python-package/
├── __init__.py
├── ProcessorA.py
├── ProcessorB.py
└── requirements.txt
----

In this way, all of the requirements will be loaded from the `requirements.txt` file once for the package. There will be no need to load the dependencies once for
ProcessorA and once for ProcessorB.


=== Processor-Level Dependencies
If your Processor is not a part of a Python package, its dependencies can be declared using the `dependencies` member of the `ProcessorDetails` inner class.
This is a list of Strings that indicate the PyPI modules that the Processor depends on. The format is the same format expected by PyPI.
This provides a convenience for declaring third-party dependencies without requiring that Processors be bundled into a package.

For example, to indicate that a Processor needs `pandas` installed, the implementation might
look like this:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ public boolean isSensitiveDynamicProperty(final String name) {
public PropertyDescriptor getPropertyDescriptor(final String name) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getComponent().getIdentifier())) {
final PropertyDescriptor propertyDescriptor = getComponent().getPropertyDescriptor(name);
if (propertyDescriptor.isDynamic() && sensitiveDynamicPropertyNames.get().contains(name)) {
if (propertyDescriptor.isDynamic() && isSensitiveDynamicProperty(name)) {
return new PropertyDescriptor.Builder().fromPropertyDescriptor(propertyDescriptor).sensitive(true).build();
} else {
return propertyDescriptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ public synchronized void terminateProcessor(final ProcessorNode procNode) {

try {
final Set<URL> additionalUrls = procNode.getAdditionalClasspathResources(procNode.getPropertyDescriptors());
flowController.getReloadComponent().reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), additionalUrls);
flowController.getReloadComponent().reload(procNode, procNode.getCanonicalClassName(), procNode.getBundleCoordinate(), additionalUrls);
} catch (final ProcessorInstantiationException e) {
// This shouldn't happen because we already have been able to instantiate the processor before
LOG.error("Failed to replace instance of Processor for {} when terminating Processor", procNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,18 @@ private Process launchPythonProcess(final int listeningPort, final String authTo
final List<String> commands = new ArrayList<>();
commands.add(pythonCommand);

String pythonPath = pythonApiDirectory.getAbsolutePath();


if (processConfig.isDebugController() && "Controller".equals(componentId)) {
commands.add("-m");
commands.add("debugpy");
commands.add("--listen");
commands.add(processConfig.getDebugHost() + ":" + processConfig.getDebugPort());
commands.add("--log-to");
commands.add(processConfig.getDebugLogsDirectory().getAbsolutePath());

pythonPath = pythonPath + File.pathSeparator + virtualEnvHome.getAbsolutePath();
}

commands.add(controllerPyFile.getAbsolutePath());
Expand All @@ -175,7 +180,7 @@ private Process launchPythonProcess(final int listeningPort, final String authTo
processBuilder.environment().put("JAVA_PORT", String.valueOf(listeningPort));
processBuilder.environment().put("LOGS_DIR", pythonLogsDirectory.getAbsolutePath());
processBuilder.environment().put("ENV_HOME", virtualEnvHome.getAbsolutePath());
processBuilder.environment().put("PYTHONPATH", pythonApiDirectory.getAbsolutePath());
processBuilder.environment().put("PYTHONPATH", pythonPath);
processBuilder.environment().put("PYTHON_CMD", pythonCommandFile.getAbsolutePath());
processBuilder.environment().put("AUTH_TOKEN", authToken);
processBuilder.inheritIO();
Expand Down Expand Up @@ -231,8 +236,8 @@ private void installDebugPy() throws IOException {
final String pythonCommand = processConfig.getPythonCommand();

final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--upgrade", "debugpy", "--target",
processConfig.getPythonWorkingDirectory().getAbsolutePath());
processBuilder.directory(virtualEnvHome.getParentFile());
virtualEnvHome.getAbsolutePath());
processBuilder.directory(virtualEnvHome);

final String command = String.join(" ", processBuilder.command());
logger.debug("Installing DebugPy to Virtual Env {} using command {}", virtualEnvHome, command);
Expand Down Expand Up @@ -298,4 +303,4 @@ public int getProcessorCount() {
public Map<String, Integer> getJavaObjectBindingCounts() {
return gateway.getObjectBindings().getCountsPerClass();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.nifi.python.processor;

import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
Expand All @@ -30,7 +30,7 @@
import java.util.Map;
import java.util.Optional;

@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FlowFileTransformProxy extends PythonProcessorProxy {

private final PythonProcessorBridge bridge;
Expand Down Expand Up @@ -60,7 +60,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
return;
}

FlowFile transformed = session.create(original);
FlowFile transformed = session.clone(original);

final FlowFileTransformResult result;
try (final StandardInputFlowFile inputFlowFile = new StandardInputFlowFile(session, original)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.nifi.python.processor;

import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AsyncLoadedProcessor;
Expand All @@ -36,6 +39,8 @@
import java.util.Optional;
import java.util.Set;

@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@SupportsSensitiveDynamicProperties
public abstract class PythonProcessorProxy extends AbstractProcessor implements AsyncLoadedProcessor {
private final PythonProcessorBridge bridge;
private volatile Set<Relationship> cachedRelationships = null;
Expand Down Expand Up @@ -94,8 +99,8 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
if (optionalAdapter.isEmpty()) {
final LoadState loadState = bridge.getLoadState();
if (loadState == LoadState.LOADING_PROCESSOR_CODE || loadState == LoadState.DOWNLOADING_DEPENDENCIES) {
return List.of(new ValidationResult.Builder()
.subject("Processor")
.explanation("Processor has not yet completed initialization")
Expand All @@ -105,6 +110,16 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va

try {
reload();

final Optional<PythonProcessorAdapter> optionalAdapter = bridge.getProcessorAdapter();
if (optionalAdapter.isEmpty()) {
return List.of(new ValidationResult.Builder()
.subject("Processor")
.explanation("Processor has not yet completed initialization")
.valid(false)
.build());
}

return optionalAdapter.get().customValidate(validationContext);
} catch (final Exception e) {
getLogger().warn("Failed to perform validation for Python Processor {}; assuming invalid", this, e);
Expand Down Expand Up @@ -166,11 +181,6 @@ public void cacheDynamicPropertyDescriptors(final ProcessContext context) {
this.cachedDynamicDescriptors = dynamicDescriptors;
}

@OnStopped
public void destroyCachedElements() {
this.cachedRelationships = null;
this.cachedDynamicDescriptors = null;
}

@Override
public Set<Relationship> getRelationships() {
Expand Down Expand Up @@ -224,12 +234,21 @@ private void reload() {
getLogger().info("Successfully reloaded Processor");
}

cachedPropertyDescriptors = null;
cachedRelationships = null;
supportsDynamicProperties = bridge.getProcessorAdapter()
.orElseThrow(() -> new IllegalStateException("Processor has not finished initializing"))
.isDynamicPropertySupported();
}

@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
cachedPropertyDescriptors = null;
cachedRelationships = null;
super.onPropertyModified(descriptor, oldValue, newValue);
}

protected Set<Relationship> getImplicitRelationships() {
return implicitRelationships;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.nifi.python.processor;

import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
Expand Down Expand Up @@ -57,7 +57,7 @@
import java.util.Objects;
import java.util.Optional;

@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@InputRequirement(Requirement.INPUT_REQUIRED)
public class RecordTransformProxy extends PythonProcessorProxy {
private final PythonProcessorBridge bridge;
private volatile RecordTransform transform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public String getAttribute(final String name) {
public Map<String, String> getAttributes() {
return flowFile.getAttributes();
}
}

@Override
public String toString() {
return "FlowFile[id=" + getAttribute("uuid") + ", filename=" + getAttribute("filename") + ", size=" + getSize() + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void testGetProcessorDetails() {
.orElseThrow(() -> new RuntimeException("Could not find ConvertCsvToExcel"));

assertEquals("0.0.1-SNAPSHOT", convertCsvToExcel.getProcessorVersion());
assertNull(convertCsvToExcel.getPyPiPackageName());
assertEquals(new File("target/python/extensions/ConvertCsvToExcel.py").getAbsolutePath(),
new File(convertCsvToExcel.getSourceLocation()).getAbsolutePath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,31 @@ def __init__(self, min_level, java_logger):
def trace(self, msg, *args):
if self.min_level < LogLevel.DEBUG:
return
self.java_logger.trace(msg, self.__to_java_array(args))
self.java_logger.trace(str(msg), self.__to_java_array(args))

def debug(self, msg, *args):
if self.min_level < LogLevel.DEBUG:
return
self.java_logger.debug(msg, self.__to_java_array(args))
self.java_logger.debug(str(msg), self.__to_java_array(args))

def info(self, msg, *args):
if self.min_level < LogLevel.DEBUG:
return
self.java_logger.info(msg, self.__to_java_array(args))
self.java_logger.info(str(msg), self.__to_java_array(args))

def warn(self, msg, *args):
if self.min_level < LogLevel.DEBUG:
return
self.java_logger.warn(msg, self.__to_java_array(args))
self.java_logger.warn(str(msg), self.__to_java_array(args))

def error(self, msg, *args):
if self.min_level < LogLevel.DEBUG:
return
self.java_logger.error(msg, self.__to_java_array(args))
self.java_logger.error(str(msg), self.__to_java_array(args))


def __to_java_array(self, *args):
arg_array = JvmHolder.gateway.new_array(JvmHolder.jvm.java.lang.Object, len(args))
for i, arg in enumerate(args):
arg_array[i] = arg
arg_array[i] = None if arg is None else str(arg)
return arg_array
Loading

0 comments on commit 5bcad9e

Please sign in to comment.