NIFI-12228: This closes #7881. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should.

Code cleanup, fixing logback to avoid INFO-level stack trace from xodus

Signed-off-by: Joseph Witt <joewitt@apache.org>
markap14 authored and joewitt committed Oct 13, 2023
1 parent 96eb1d8 commit 0eabbcd
Showing 7 changed files with 49 additions and 20 deletions.
@@ -186,15 +186,9 @@ private void triggerInputPort(final ProcessContext context, final ProcessSession

final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency();
switch (flowFileConcurrency) {
-case UNBOUNDED:
-transferUnboundedConcurrency(context, session);
-break;
-case SINGLE_FLOWFILE_PER_NODE:
-transferSingleFlowFile(session);
-break;
-case SINGLE_BATCH_PER_NODE:
-transferInputBatch(session);
-break;
+case UNBOUNDED -> transferUnboundedConcurrency(context, session);
+case SINGLE_FLOWFILE_PER_NODE -> transferSingleFlowFile(session);
+case SINGLE_BATCH_PER_NODE -> transferInputBatch(session);
}
} finally {
flowFileGate.releaseClaim(this);
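The hunk above replaces a classic fall-through switch with Java's arrow-label form, in which each case runs exactly one statement and never falls through, so the break statements disappear. Below is a minimal, self-contained sketch of the same construct; the enum and method names are hypothetical stand-ins, not NiFi APIs.

public class ArrowSwitchSketch {
    // Hypothetical stand-in for FlowFileConcurrency, used only for illustration.
    enum Concurrency { UNBOUNDED, SINGLE_FLOWFILE_PER_NODE, SINGLE_BATCH_PER_NODE }

    static void transfer(final Concurrency concurrency) {
        // Arrow labels execute a single branch with no fall-through, so no break is needed.
        switch (concurrency) {
            case UNBOUNDED -> System.out.println("transfer with unbounded concurrency");
            case SINGLE_FLOWFILE_PER_NODE -> System.out.println("transfer a single FlowFile");
            case SINGLE_BATCH_PER_NODE -> System.out.println("transfer a single batch");
        }
    }

    public static void main(final String[] args) {
        transfer(Concurrency.SINGLE_BATCH_PER_NODE);
    }
}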
@@ -36,13 +36,26 @@ public boolean tryClaim(final Port port) {
return false;
}

+// We need to try to open flow into the Port's group. To do this, we need to get the data valve for the parent group,
+// as it is responsible for data flowing into and out of its children.
+final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+final DataValve dataValve = dataValveGroup.getDataValve();
+final boolean openFlowIntoGroup = dataValve.tryOpenFlowIntoGroup(port.getProcessGroup());
+if (!openFlowIntoGroup) {
+claimed.set(false);
+return false;
+}

// The claim is now held by this thread. Check if the ProcessGroup is empty.
final boolean empty = !port.getProcessGroup().isDataQueued();
if (empty) {
// Process Group is empty so return true indicating that the claim is now held.
return true;
}

+// We have already opened flow into group, so now we must close it, since we are not allowing flow in
+dataValve.closeFlowIntoGroup(port.getProcessGroup());

// Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false,
// indicating that the caller did not obtain the claim.
claimed.set(false);
@@ -52,5 +65,9 @@ public boolean tryClaim(final Port port) {
@Override
public void releaseClaim(final Port port) {
claimed.set(false);

+final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+final DataValve dataValve = dataValveGroup.getDataValve();
+dataValve.closeFlowIntoGroup(port.getProcessGroup());
}
}
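The behavioral change in this file is that the gate now opens the parent group's DataValve before it accepts a claim, closes the valve again immediately if the group turns out to already have data queued, and also closes it whenever the claim is released. Below is a minimal, self-contained sketch of that handshake, assuming simplified stand-ins (AtomicBoolean fields instead of the real DataValve and ProcessGroup); only the ordering of the open/check/close steps mirrors the change above.

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: the "valve" and the dataQueuedInGroup flag are hypothetical stand-ins
// for DataValve.tryOpenFlowIntoGroup/closeFlowIntoGroup and ProcessGroup.isDataQueued.
public class SingleConcurrencyGateSketch {
    private final AtomicBoolean claimed = new AtomicBoolean(false);
    private final AtomicBoolean valveOpen = new AtomicBoolean(false);

    public boolean tryClaim(final boolean dataQueuedInGroup) {
        if (!claimed.compareAndSet(false, true)) {
            return false; // another FlowFile already holds the claim
        }

        // Open flow into the group before deciding whether the claim can be kept.
        if (!valveOpen.compareAndSet(false, true)) {
            claimed.set(false);
            return false;
        }

        if (!dataQueuedInGroup) {
            return true; // group is empty: keep the claim, leave the valve open
        }

        // Group already holds data: close the valve we just opened and give up the claim.
        valveOpen.set(false);
        claimed.set(false);
        return false;
    }

    public void releaseClaim() {
        claimed.set(false);
        valveOpen.set(false); // the fix ensures the valve is closed here as well
    }

    public static void main(final String[] args) {
        final SingleConcurrencyGateSketch gate = new SingleConcurrencyGateSketch();
        System.out.println(gate.tryClaim(false)); // true: claim held, valve open
        gate.releaseClaim();
        System.out.println(gate.tryClaim(true));  // false: data already queued, valve closed again
    }
}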
@@ -84,10 +84,15 @@ private String getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGr
if (destinationGroup.isDataQueued()) {
// If the destination group already has data queued up, and the valve is not already open, do not allow data to
// flow into the group. If we did, we would end up mixing together two different batches of data.
logger.debug("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
return "Process Group already has data queued and valve is not already allowing data into group";
}

+if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) {
+logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup);
+return "Data Valve is already allowing data to flow out of group";
+}

for (final Port port : destinationGroup.getInputPorts()) {
for (final Connection connection : port.getIncomingConnections()) {
final Connectable sourceConnectable = connection.getSource();
@@ -102,7 +107,7 @@ private String getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGr

final boolean flowingOutOfSourceGroup = groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier());
if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) {
logger.debug("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
destinationGroup, port, sourceConnectable);
return "Source connected to Input Port is an Output Port with Batch Output and is currently allowing data to flow out";
}
@@ -119,13 +124,15 @@ public synchronized void closeFlowIntoGroup(final ProcessGroup destinationGroup)
return;
}

-for (final Port port : destinationGroup.getInputPorts()) {
-for (final Connection connection : port.getIncomingConnections()) {
-if (!connection.getFlowFileQueue().isEmpty()) {
-logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
-destinationGroup, connection);
+if (destinationGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
+for (final Port port : destinationGroup.getInputPorts()) {
+for (final Connection connection : port.getIncomingConnections()) {
+if (!connection.getFlowFileQueue().isEmpty()) {
+logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
+destinationGroup, connection);

-return;
+return;
+}
}
}
}
@@ -175,14 +182,14 @@ private String getReasonFlowOutOfGroupNotAllowed(final ProcessGroup sourceGroup)
}

if (!connection.getFlowFileQueue().isEmpty()) {
logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is "
logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is "
+ "configured with a FlowFileConcurrency of Batch Per Node.", sourceGroup, port, connection);
return "Output Connection already has data queued";
}

final boolean dataFlowingIntoDestination = groupsWithDataFlowingIn.contains(destinationProcessGroup.getIdentifier());
if (dataFlowingIntoDestination) {
logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is "
logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is "
+ "currently allowing data to flow in", sourceGroup, port, connection);
return "Destination Process Group is allowing data to flow in";
}
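Note that several of the valve's per-decision messages above move from DEBUG to TRACE, so operators who want to see why the valve refused to open or close will now need to raise the log level for that class. A minimal logback example follows, with the caveat that the logger name is an assumption (it presumes StandardDataValve lives in the org.apache.nifi.groups package; verify against the source before using).

<!-- Hypothetical troubleshooting addition only; the package name below is an assumption. -->
<logger name="org.apache.nifi.groups.StandardDataValve" level="TRACE"/>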
@@ -148,6 +148,9 @@
<!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
<logger name="org.apache.atlas" level="WARN"/>

<!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
<logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />

<!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
<logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
<appender-ref ref="APP_FILE"/>
@@ -148,6 +148,9 @@
<!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
<logger name="org.apache.atlas" level="WARN"/>

<!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
<logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />

<!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
<logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
<appender-ref ref="APP_FILE"/>
@@ -102,7 +102,6 @@
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />


<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" level="ERROR" />
@@ -149,6 +148,9 @@
<!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
<logger name="org.apache.atlas" level="WARN"/>

<!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
<logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />

<!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
<logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
<appender-ref ref="APP_FILE"/>
@@ -149,6 +149,9 @@
<!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
<logger name="org.apache.atlas" level="WARN"/>

<!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
<logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />

<!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
<logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
<appender-ref ref="APP_FILE"/>
