Skip to content

Commit

Permalink
NIFI-12230 Add configurable Log Level for IP not found in GeoEnrichIP
Browse files Browse the repository at this point in the history
NIFI-12253 Route to not found relationship instead of rolling back in GeoEnrichIPRecord

This closes #7909

Signed-off-by: David Handermann <exceptionfactory@apache.org>
(cherry picked from commit 184757f)
  • Loading branch information
pvillard31 authored and exceptionfactory committed Oct 20, 2023
1 parent 54030bf commit 3d607a7
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.build();

public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
.name("Log Level")
.displayName("Log Level")
.required(true)
.description("The Log Level to use when an IP is not found in the database. Accepted values: INFO, DEBUG, WARN, ERROR.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue(MessageLogLevel.WARN.toString())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final Relationship REL_FOUND = new Relationship.Builder()
.name("found")
.description("Where to route flow files after successfully enriching attributes with data provided by database")
Expand All @@ -80,6 +90,10 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
.description("Where to route flow files after unsuccessfully enriching attributes because no data was found")
.build();

enum MessageLogLevel {
DEBUG, INFO, WARN, ERROR
}

private Set<Relationship> relationships;
private List<PropertyDescriptor> propertyDescriptors;
final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null);
Expand Down Expand Up @@ -134,6 +148,7 @@ protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(GEO_DATABASE_FILE);
props.add(IP_ADDRESS_ATTRIBUTE);
props.add(LOG_LEVEL);
this.propertyDescriptors = Collections.unmodifiableList(props);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.maxmind.db.InvalidDatabaseException;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.Subdivision;
Expand Down Expand Up @@ -95,6 +96,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

DatabaseReader dbReader = databaseReaderRef.get();
final MessageLogLevel logLevel = MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(flowFile).getValue().toUpperCase());
final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);

Expand Down Expand Up @@ -132,6 +134,26 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
getLogger().warn("Failure while trying to load enrichment data for {} due to {}, rolling back session "
+ "and will reload the database on the next run", flowFile, idbe.getMessage());
session.rollback();
return;
} catch (AddressNotFoundException anfe) {
session.transfer(flowFile, REL_NOT_FOUND);

switch (logLevel) {
case INFO:
getLogger().info("Address not found in the database", anfe);
break;
case WARN:
getLogger().warn("Address not found in the database", anfe);
break;
case ERROR:
getLogger().error("Address not found in the database", anfe);
break;
case DEBUG:
default:
getLogger().debug("Address not found in the database", anfe);
break;
}

return;
} catch (GeoIp2Exception | IOException ex) {
// Note IOException is captured again as dbReader also makes InetAddress.getByName() calls.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.maxmind.db.InvalidDatabaseException;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.CityResponse;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
Expand Down Expand Up @@ -160,7 +161,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {

private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, IP_RECORD_PATH, GEO_CITY, GEO_LATITUDE,
GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL
));

@Override
Expand Down Expand Up @@ -231,6 +232,8 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}

String rawIpPath = context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
final MessageLogLevel logLevel = MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(input).getValue().toUpperCase());

RecordPath ipPath = cache.getCompiled(rawIpPath);

RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
Expand All @@ -249,7 +252,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
int notFoundCount = 0;
while ((record = reader.nextRecord()) != null) {
CityResponse response;
response = geocode(ipPath, record, dbReader);
response = geocode(ipPath, record, dbReader, logLevel);
boolean wasEnriched = enrichRecord(response, record, paths);
if (wasEnriched) {
targetRelationship = REL_FOUND;
Expand Down Expand Up @@ -314,7 +317,7 @@ private Map<String, String> buildAttributes(int recordCount, String mimeType) {
return retVal;
}

private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader) throws Exception {
private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader, MessageLogLevel logLevel) throws Exception {
RecordPathResult result = ipPath.evaluate(record);
Optional<FieldValue> ipField = result.getSelectedFields().findFirst();
if (ipField.isPresent()) {
Expand All @@ -326,7 +329,28 @@ private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader re
String realValue = val.toString();
InetAddress address = InetAddress.getByName(realValue);

return reader.city(address);
try {
return reader.city(address);
} catch (AddressNotFoundException anfe) {

switch (logLevel) {
case INFO:
getLogger().info("Address not found in the database", anfe);
break;
case WARN:
getLogger().warn("Address not found in the database", anfe);
break;
case ERROR:
getLogger().error("Address not found in the database", anfe);
break;
case DEBUG:
default:
getLogger().debug("Address not found in the database", anfe);
break;
}

return null;
}
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@
import static org.mockito.Mockito.when;

public class TestGeoEnrichIPRecord {

private TestRunner runner;
private DatabaseReader reader;

@BeforeEach
public void setup() throws Exception {
reader = mock(DatabaseReader.class);
Expand Down Expand Up @@ -96,6 +98,7 @@ public void setup() throws Exception {
runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE, "/geo/country_postal");
runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat");
runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon");
runner.setProperty(AbstractEnrichIP.LOG_LEVEL, "WARN");
runner.assertValid();
}

Expand Down Expand Up @@ -129,7 +132,7 @@ public void testEnrichSendToFound() throws Exception {
byte[] raw = runner.getContentAsByteArray(ff);
String content = new String(raw);
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> result = (List<Map<String, Object>>)mapper.readValue(content, List.class);
List<Map<String, Object>> result = mapper.readValue(content, List.class);

assertNotNull(result);
assertEquals(1, result.size());
Expand All @@ -152,16 +155,19 @@ class TestableGeoEnrichIPRecord extends GeoEnrichIPRecord {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE,
GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL
));
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
databaseReaderRef.set(reader);
readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
}
@Override
protected void loadDatabaseFile() {
// Do nothing, the mock database reader is used
}
Expand Down

0 comments on commit 3d607a7

Please sign in to comment.