NIFI-12242: Added ability to route data that exceeds the configured t… #7895
Closed
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import org.apache.nifi.annotation.behavior.TriggerSerially; | ||
import org.apache.nifi.annotation.documentation.CapabilityDescription; | ||
import org.apache.nifi.annotation.documentation.Tags; | ||
import org.apache.nifi.annotation.documentation.UseCase; | ||
import org.apache.nifi.annotation.lifecycle.OnScheduled; | ||
import org.apache.nifi.components.AllowableValue; | ||
import org.apache.nifi.components.PropertyDescriptor; | ||
|
@@ -33,9 +34,9 @@ | |
import org.apache.nifi.processor.AbstractProcessor; | ||
import org.apache.nifi.processor.DataUnit; | ||
import org.apache.nifi.processor.FlowFileFilter; | ||
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; | ||
import org.apache.nifi.processor.ProcessContext; | ||
import org.apache.nifi.processor.ProcessSession; | ||
import org.apache.nifi.processor.ProcessorInitializationContext; | ||
import org.apache.nifi.processor.Relationship; | ||
import org.apache.nifi.processor.exception.ProcessException; | ||
import org.apache.nifi.processor.util.StandardValidators; | ||
|
@@ -45,7 +46,6 @@ | |
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
|
@@ -66,6 +66,52 @@ | |
@CapabilityDescription("Controls the rate at which data is transferred to follow-on processors." | ||
+ " If you configure a very small Time Duration, then the accuracy of the throttle gets worse." | ||
+ " You can improve this accuracy by decreasing the Yield Duration, at the expense of more Tasks given to the processor.") | ||
@UseCase(description = "Limit the rate at which data is sent to a downstream system with little to no bursts", | ||
keywords = {"throttle", "limit", "slow down", "data rate"}, | ||
configuration = """ | ||
Set the "Rate Control Criteria" to `data rate`. | ||
Set the "Time Duration" property to `1 sec`. | ||
Configure the "Maximum Rate" property to specify how much data should be allowed through each second. | ||
|
||
For example, to allow through 8 MB per second, "Maximum Rate" to `8 MB`. | ||
""" | ||
) | ||
@UseCase(description = "Limit the rate at which requests are sent to a downstream system with little to no bursts", | ||
For this UseCase, would you change the word "requests" to "flowfiles"? (There are 3 places.) That would make this UseCase generic for users who don't equate a request with a FlowFile. The last 2 UseCases use the word "requests", which is fine for those cases. |
||
keywords = {"throttle", "limit", "slow down", "request rate"}, | ||
configuration = """ | ||
Set the "Rate Control Criteria" to `flowfile count`. | ||
Set the "Time Duration" property to `1 sec`. | ||
Configure the "Maximum Rate" property to specify how many requests should be allowed through each second. | ||
|
||
For example, to allow through 100 requests per second, set "Maximum Rate" to `100`. | ||
""" | ||
) | ||
@UseCase(description = "Reject requests that exceed a specific rate with little to no bursts", | ||
keywords = {"throttle", "limit", "slow down", "request rate"}, | ||
configuration = """ | ||
Set the "Rate Control Criteria" to `flowfile count`. | ||
Set the "Time Duration" property to `1 sec`. | ||
Set the "Rate Exceeded Strategy" property to `Route to 'rate exceeded'`. | ||
Configure the "Maximum Rate" property to specify how many requests should be allowed through each second. | ||
|
||
For example, to allow through 100 requests per second, "Maximum Rate" to `100`. | ||
If more than 100 requests come in during any one second, the additional requests will be routed to `rate exceeded` instead of `success`. | ||
""" | ||
) | ||
@UseCase(description = "Reject requests that exceed a specific rate, allowing for bursts", | ||
keywords = {"throttle", "limit", "slow down", "request rate"}, | ||
configuration = """ | ||
Set the "Rate Control Criteria" to `flowfile count`. | ||
Set the "Time Duration" property to `1 min`. | ||
Set the "Rate Exceeded Strategy" property to `Route to 'rate exceeded'`. | ||
Configure the "Maximum Rate" property to specify how many requests should be allowed through each minute. | ||
|
||
For example, to allow through 100 requests per second, "Maximum Rate" to `6000`. | ||
This will allow through 6,000 FlowFiles per minute, which averages to 100 FlowFiles per second. However, those 6,000 FlowFiles may come all within the first couple of | ||
seconds, or they may come in over a period of 60 seconds. As a result, this gives us an average rate of 100 FlowFiles per second but allows for bursts of data. | ||
If more than 6,000 requests come in during any one minute, the additional requests will be routed to `rate exceeded` instead of `success`. | ||
""" | ||
) | ||
public class ControlRate extends AbstractProcessor { | ||
|
||
public static final String DATA_RATE = "data rate"; | ||
|
@@ -82,6 +128,11 @@ public class ControlRate extends AbstractProcessor { | |
public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE, | ||
"Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced"); | ||
|
||
static final AllowableValue HOLD_FLOWFILE = new AllowableValue("Hold FlowFile", "Hold FlowFile", | ||
"The FlowFile will be held in its input queue until the rate of data has fallen below the configured maximum and will then be allowed through."); | ||
static final AllowableValue ROUTE_TO_RATE_EXCEEDED = new AllowableValue("Route to 'rate exceeded'", "Route to 'rate exceeded'", | ||
"The FlowFile will be routed to the 'rate exceeded' Relationship."); | ||
|
||
// based on testing to balance commits and 10,000 FF swap limit | ||
public static final int MAX_FLOW_FILES_PER_BATCH = 1000; | ||
private static final long DEFAULT_ACCRUAL_COUNT = -1L; | ||
|
@@ -123,6 +174,14 @@ public class ControlRate extends AbstractProcessor { | |
.dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE) | ||
.build(); | ||
|
||
public static final PropertyDescriptor RATE_EXCEEDED_STRATEGY = new PropertyDescriptor.Builder() | ||
.name("Rate Exceeded Strategy") | ||
.description("Specifies how to handle an incoming FlowFile when the maximum data rate has been exceeded.") | ||
.required(true) | ||
.allowableValues(HOLD_FLOWFILE, ROUTE_TO_RATE_EXCEEDED) | ||
.defaultValue(HOLD_FLOWFILE.getValue()) | ||
.build(); | ||
|
||
public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() | ||
.name("Rate Controlled Attribute") | ||
.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. " | ||
|
@@ -149,20 +208,37 @@ public class ControlRate extends AbstractProcessor { | |
.expressionLanguageSupported(ExpressionLanguageScope.NONE) | ||
.build(); | ||
|
||
public static final Relationship REL_SUCCESS = new Relationship.Builder() | ||
static final Relationship REL_SUCCESS = new Relationship.Builder() | ||
.name("success") | ||
.description("FlowFiles are transferred to this relationship under normal conditions") | ||
.build(); | ||
public static final Relationship REL_FAILURE = new Relationship.Builder() | ||
static final Relationship REL_FAILURE = new Relationship.Builder() | ||
.name("failure") | ||
.description("FlowFiles will be routed to this relationship if they are missing a necessary Rate Controlled Attribute or the attribute is not in the expected format") | ||
.build(); | ||
static final Relationship REL_RATE_EXCEEDED = new Relationship.Builder() | ||
.name("rate exceeded") | ||
.description("A FlowFile will be routed to this Relationship if it results in exceeding the maximum threshold allowed based on the Processor's configuration and if the Rate Exceeded " + | ||
"Strategy is configured to use this Relationship.") | ||
.build(); | ||
|
||
private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); | ||
private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; | ||
|
||
private List<PropertyDescriptor> properties; | ||
private Set<Relationship> relationships; | ||
private static final List<PropertyDescriptor> properties = List.of( | ||
RATE_CONTROL_CRITERIA, | ||
TIME_PERIOD, | ||
MAX_RATE, | ||
MAX_DATA_RATE, | ||
MAX_COUNT_RATE, | ||
RATE_EXCEEDED_STRATEGY, | ||
RATE_CONTROL_ATTRIBUTE_NAME, | ||
GROUPING_ATTRIBUTE_NAME | ||
); | ||
|
||
private static final Set<Relationship> defaultRelationships = Set.of(REL_SUCCESS, REL_FAILURE); | ||
private static final Set<Relationship> rateExceededRelationships = Set.of(REL_SUCCESS, REL_FAILURE, REL_RATE_EXCEEDED); | ||
private volatile Set<Relationship> relationships = defaultRelationships; | ||
|
||
private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<>(); | ||
private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<>(); | ||
|
@@ -174,23 +250,6 @@ public class ControlRate extends AbstractProcessor { | |
private volatile String groupingAttributeName = null; | ||
private volatile int timePeriodSeconds = 1; | ||
|
||
@Override | ||
protected void init(final ProcessorInitializationContext context) { | ||
final List<PropertyDescriptor> properties = new ArrayList<>(); | ||
properties.add(RATE_CONTROL_CRITERIA); | ||
properties.add(TIME_PERIOD); | ||
properties.add(MAX_RATE); | ||
properties.add(MAX_DATA_RATE); | ||
properties.add(MAX_COUNT_RATE); | ||
properties.add(RATE_CONTROL_ATTRIBUTE_NAME); | ||
properties.add(GROUPING_ATTRIBUTE_NAME); | ||
this.properties = Collections.unmodifiableList(properties); | ||
|
||
final Set<Relationship> relationships = new HashSet<>(); | ||
relationships.add(REL_SUCCESS); | ||
relationships.add(REL_FAILURE); | ||
this.relationships = Collections.unmodifiableSet(relationships); | ||
} | ||
|
||
@Override | ||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { | ||
|
@@ -239,6 +298,14 @@ protected Collection<ValidationResult> customValidate(final ValidationContext co | |
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { | ||
super.onPropertyModified(descriptor, oldValue, newValue); | ||
|
||
if (descriptor.equals(RATE_EXCEEDED_STRATEGY)) { | ||
if (ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(newValue)) { | ||
this.relationships = rateExceededRelationships; | ||
} else { | ||
this.relationships = defaultRelationships; | ||
} | ||
} | ||
|
||
if (descriptor.equals(RATE_CONTROL_CRITERIA) | ||
|| descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) | ||
|| descriptor.equals(GROUPING_ATTRIBUTE_NAME) | ||
|
@@ -300,12 +367,63 @@ public void onScheduled(final ProcessContext context) { | |
|
||
@Override | ||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { | ||
List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis)); | ||
final String strategy = context.getProperty(RATE_EXCEEDED_STRATEGY).getValue(); | ||
if (ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(strategy)) { | ||
routeFlowFilesExceedingRate(context, session); | ||
} else { | ||
holdFlowFilesExceedingRate(context, session); | ||
} | ||
} | ||
|
||
private void routeFlowFilesExceedingRate(final ProcessContext context, final ProcessSession session) { | ||
clearExpiredThrottles(context); | ||
|
||
final List<FlowFile> flowFiles = session.get(MAX_FLOW_FILES_PER_BATCH); | ||
if (flowFiles.isEmpty()) { | ||
context.yield(); | ||
return; | ||
} | ||
|
||
final ThrottleFilter filter = new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis); | ||
for (final FlowFile flowFile : flowFiles) { | ||
final Relationship relationship; | ||
if (!isRateAttributeValid(flowFile)) { | ||
relationship = REL_FAILURE; | ||
} else { | ||
final FlowFileFilterResult result = filter.filter(flowFile); | ||
relationship = result.isAccept() ? REL_SUCCESS : REL_RATE_EXCEEDED; | ||
} | ||
|
||
session.transfer(flowFile, relationship); | ||
getLogger().info("Routing {} to {}", flowFile, relationship.getName()); | ||
session.getProvenanceReporter().route(flowFile, relationship); | ||
} | ||
} | ||
|
||
|
||
private void holdFlowFilesExceedingRate(final ProcessContext context, final ProcessSession session) { | ||
clearExpiredThrottles(context); | ||
|
||
final List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis)); | ||
if (flowFiles.isEmpty()) { | ||
context.yield(); | ||
return; | ||
} | ||
|
||
final ComponentLog logger = getLogger(); | ||
for (FlowFile flowFile : flowFiles) { | ||
// call this to capture potential error | ||
if (isRateAttributeValid(flowFile)) { | ||
logger.info("transferring {} to 'success'", flowFile); | ||
session.transfer(flowFile, REL_SUCCESS); | ||
} else { | ||
logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile); | ||
session.transfer(flowFile, REL_FAILURE); | ||
} | ||
} | ||
} | ||
|
||
private void clearExpiredThrottles(final ProcessContext context) { | ||
// Periodically clear any Throttle that has not been used in more than 2 throttling periods | ||
final long lastClearTime = lastThrottleClearTime.get(); | ||
final long throttleExpirationMillis = getCurrentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); | ||
|
@@ -334,18 +452,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session | |
} | ||
} | ||
} | ||
|
||
final ComponentLog logger = getLogger(); | ||
for (FlowFile flowFile : flowFiles) { | ||
// call this to capture potential error | ||
if (isAccrualPossible(flowFile)) { | ||
logger.info("transferring {} to 'success'", flowFile); | ||
session.transfer(flowFile, REL_SUCCESS); | ||
} else { | ||
logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile); | ||
session.transfer(flowFile, REL_FAILURE); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -361,7 +467,7 @@ protected long getCurrentTimeMillis() { | |
* Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on | ||
* flowfile attribute, the specified attribute must be present and must be a long integer. | ||
*/ | ||
private boolean isAccrualPossible(FlowFile flowFile) { | ||
private boolean isRateAttributeValid(FlowFile flowFile) { | ||
if (rateControlCriteria.equals(ATTRIBUTE_RATE)) { | ||
final String attributeValue = flowFile.getAttribute(rateControlAttribute); | ||
return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches(); | ||
|
@@ -500,7 +606,7 @@ private class ThrottleFilter implements FlowFileFilter { | |
|
||
@Override | ||
public FlowFileFilterResult filter(FlowFile flowFile) { | ||
if (!isAccrualPossible(flowFile)) { | ||
if (!isRateAttributeValid(flowFile)) { | ||
// this FlowFile is invalid for this configuration so let the processor deal with it | ||
return FlowFileFilterResult.ACCEPT_AND_TERMINATE; | ||
} | ||
|
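The difference between the third and fourth use cases (a 1 sec window of 100 versus a 1 min window of 6000) may be easier to see with a small sketch. The class below is hypothetical and is not the processor's `Throttle` implementation; it assumes a simple fixed-window counter and only illustrates how the window length decides whether a burst is let through or routed to `rate exceeded`.

```java
// Hypothetical sketch, not NiFi code: a fixed-window counter that mimics the
// routing decision for the `flowfile count` criteria when the strategy is
// "Route to 'rate exceeded'".
final class FixedWindowSketch {
    private final long windowMillis;   // stands in for "Time Duration"
    private final long maxPerWindow;   // stands in for "Maximum Rate"
    private boolean started;
    private long windowStart;
    private long count;

    FixedWindowSketch(long windowMillis, long maxPerWindow) {
        this.windowMillis = windowMillis;
        this.maxPerWindow = maxPerWindow;
    }

    // Returns the relationship name that a single arrival at nowMillis would get.
    String route(long nowMillis) {
        // Start a fresh window on first use or once the current window has elapsed.
        if (!started || nowMillis - windowStart >= windowMillis) {
            started = true;
            windowStart = nowMillis;
            count = 0;
        }
        if (count < maxPerWindow) {
            count++;
            return "success";
        }
        return "rate exceeded";
    }

    // Sends `arrivals` items at the same instant and counts how many pass.
    static int countAccepted(FixedWindowSketch limiter, int arrivals, long atMillis) {
        int accepted = 0;
        for (int i = 0; i < arrivals; i++) {
            if (limiter.route(atMillis).equals("success")) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        FixedWindowSketch perSecond = new FixedWindowSketch(1_000, 100);
        FixedWindowSketch perMinute = new FixedWindowSketch(60_000, 6_000);

        // A burst of 500 arrivals at t = 0 ms.
        System.out.println(countAccepted(perSecond, 500, 0)); // prints 100
        System.out.println(countAccepted(perMinute, 500, 0)); // prints 500
    }
}
```

Both configurations average 100 per second, but under this assumed model only the one-minute window absorbs the burst; the one-second window rejects the 400 arrivals above its limit.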
Missing word "set" e.g. "set Maximum Rate to '8 MB'". The word "set" is also missing in UseCase 3 and 4.
Good catch @mosermw thanks