Skip to content

Commit

Permalink
NIFI-13896 Removed legacy state migration from TailFile (#9425)
Browse files Browse the repository at this point in the history
Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
Lehel44 authored Oct 21, 2024
1 parent d693293 commit a44fb52
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,41 +415,20 @@ public void recoverState(final ProcessContext context) throws IOException {
final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope);

final String startPosition = context.getProperty(START_POSITION).getValue();

if (stateMap.getStateVersion().isEmpty() || stateMap.toMap().isEmpty()) {
//state has been cleared or never stored so recover as 'empty state'
initStates(filesToTail, Collections.emptyMap(), true, startPosition);
recoverState(context, filesToTail, Collections.emptyMap());
initStates(filesToTail, Collections.emptyMap(), true);
recoverState(filesToTail, Collections.emptyMap());
return;
}

Map<String, String> statesMap = stateMap.toMap();

if (statesMap.containsKey(TailFileState.StateKeys.FILENAME)
&& statesMap.keySet().stream().noneMatch(key -> key.startsWith(MAP_PREFIX))) {
// If statesMap contains "filename" key without "file.0." prefix,
// and there's no key with "file." prefix, then
// it indicates that the statesMap is created with earlier version of NiFi.
// In this case, we need to migrate the state by adding prefix indexed with 0.
final Map<String, String> migratedStatesMap = new HashMap<>(statesMap.size());
for (String key : statesMap.keySet()) {
migratedStatesMap.put(MAP_PREFIX + "0." + key, statesMap.get(key));
}

// LENGTH is added from NiFi 1.1.0. Set the value with using the last position so that we can use existing state
// to avoid sending duplicated log data after updating NiFi.
migratedStatesMap.put(MAP_PREFIX + "0." + TailFileState.StateKeys.LENGTH, statesMap.get(TailFileState.StateKeys.POSITION));
statesMap = Map.copyOf(migratedStatesMap);
final Map<String, String> statesMap = stateMap.toMap();

getLogger().info("statesMap has been migrated. {}", migratedStatesMap);
}

initStates(filesToTail, statesMap, false, startPosition);
recoverState(context, filesToTail, statesMap);
initStates(filesToTail, statesMap, false);
recoverState(filesToTail, statesMap);
}

private void initStates(final List<String> filesToTail, final Map<String, String> statesMap, final boolean isCleared, final String startPosition) {
private void initStates(final List<String> filesToTail, final Map<String, String> statesMap, final boolean isCleared) {
int fileIndex = 0;

if (isCleared) {
Expand All @@ -460,29 +439,33 @@ private void initStates(final List<String> filesToTail, final Map<String, String
// put back the files we already know about in 'states' object before
// doing the recovery
if (states.isEmpty() && !statesMap.isEmpty()) {
for (String key : statesMap.keySet()) {
if (key.endsWith(TailFileState.StateKeys.FILENAME) && filesToTail.contains(statesMap.get(key))) {
for (Entry<String, String> entry : statesMap.entrySet()) {
final String key = entry.getKey();
final String value = entry.getValue();
if (key.endsWith(TailFileState.StateKeys.FILENAME) && filesToTail.contains(value)) {
int index = Integer.parseInt(key.split("\\.")[1]);
states.put(statesMap.get(key), new TailFileObject(index, statesMap, preAllocatedBufferSize));
states.put(value, new TailFileObject(index, statesMap, preAllocatedBufferSize));
}
}
}

// first, we remove the files that are no longer present
final List<String> toBeRemoved = new ArrayList<>();
for (String file : states.keySet()) {
if (!filesToTail.contains(file)) {
toBeRemoved.add(file);
cleanReader(states.get(file));
for (Entry<String, TailFileObject> entry : states.entrySet()) {
final String filePath = entry.getKey();
final TailFileObject tailFileObject = entry.getValue();
if (!filesToTail.contains(filePath)) {
toBeRemoved.add(filePath);
cleanReader(tailFileObject);
}
}
states.keySet().removeAll(toBeRemoved);
toBeRemoved.forEach(states.keySet()::remove);

// then we need to get the highest ID used so far to be sure
// we don't mix different files in case we add new files to tail
for (String file : states.keySet()) {
if (fileIndex <= states.get(file).getFilenameIndex()) {
fileIndex = states.get(file).getFilenameIndex() + 1;
for (TailFileObject tfo : states.values()) {
if (fileIndex <= tfo.getFilenameIndex()) {
fileIndex = tfo.getFilenameIndex() + 1;
}
}

Expand All @@ -498,9 +481,9 @@ private void initStates(final List<String> filesToTail, final Map<String, String
}
}

private void recoverState(final ProcessContext context, final List<String> filesToTail, final Map<String, String> map) throws IOException {
private void recoverState(final List<String> filesToTail, final Map<String, String> map) throws IOException {
for (String file : filesToTail) {
recoverState(context, map, file);
recoverState(map, file);
}
}

Expand Down Expand Up @@ -549,31 +532,21 @@ private List<String> getFilesToTail(final String baseDir, String fileRegex, bool
* checksum, so that we are ready to proceed with the
* {@link #onTrigger(ProcessContext, ProcessSession)} call.
*
* @param context the ProcessContext
* @param stateValues the values that were recovered from state that was
* previously stored. This Map should be populated with the keys defined in
* {@link TailFileState.StateKeys}.
* @param filePath the path of the file for which state must be recovered
* @throws IOException if unable to seek to the appropriate location in the
* tailed file.
*/
private void recoverState(final ProcessContext context, final Map<String, String> stateValues, final String filePath) throws IOException {

final String prefix = MAP_PREFIX + states.get(filePath).getFilenameIndex() + '.';

if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME)) {
resetState(filePath);
return;
}
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION)) {
resetState(filePath);
return;
}
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP)) {
resetState(filePath);
return;
}
if (!stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) {
private void recoverState(final Map<String, String> stateValues, final String filePath) throws IOException {
final TailFileObject tailFileObject = states.get(filePath);
final String prefix = MAP_PREFIX + tailFileObject.getFilenameIndex() + '.';

if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME)
|| !stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION)
|| !stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP)
|| !stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) {
resetState(filePath);
return;
}
Expand All @@ -589,27 +562,27 @@ private void recoverState(final ProcessContext context, final Map<String, String
File tailFile = null;

if (checksumPresent && filePath.equals(storedStateFilename)) {
states.get(filePath).setExpectedRecoveryChecksum(Long.parseLong(checksumValue));
tailFileObject.setExpectedRecoveryChecksum(Long.parseLong(checksumValue));

// We have an expected checksum and the currently configured filename is the same as the state file.
// We need to check if the existing file is the same as the one referred to in the state file based on
// the checksum.
final Checksum checksum = new CRC32();
final File existingTailFile = new File(storedStateFilename);
if (existingTailFile.length() >= position) {
try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
try (final InputStream tailFileIs = Files.newInputStream(existingTailFile.toPath());
final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {

try {
StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition());
StreamUtils.copy(in, new NullOutputStream(), tailFileObject.getState().getPosition());
} catch (final EOFException eof) {
// If we hit EOFException, then the file is smaller than we expected. Assume rollover.
getLogger().debug("When recovering state, file being tailed has less data than was stored in the state. "
+ "Assuming rollover. Will begin tailing current file from beginning.");
+ "Assuming rollover. Will begin tailing current file from beginning.");
}

final long checksumResult = in.getChecksum().getValue();
if (checksumResult == states.get(filePath).getExpectedRecoveryChecksum()) {
if (checksumResult == tailFileObject.getExpectedRecoveryChecksum()) {
// Checksums match. This means that we want to resume reading from where we left off.
// So we will populate the reader object so that it will be used in onTrigger. If the
// checksums do not match, then we will leave the reader object null, so that the next
Expand All @@ -632,12 +605,12 @@ private void recoverState(final ProcessContext context, final Map<String, String
+ "this indicates that the file has rotated. Will begin tailing current file from beginning.", existingTailFile.length(), position);
}

states.get(filePath).setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(preAllocatedBufferSize)));
tailFileObject.setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(preAllocatedBufferSize)));
} else {
resetState(filePath);
}

getLogger().debug("Recovered state {}", states.get(filePath).getState());
getLogger().debug("Recovered state {}", tailFileObject.getState());
}

private void resetState(final String filePath) {
Expand Down Expand Up @@ -685,7 +658,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final List<String> filesToTail = lookup(context);
final Scope scope = getStateScope(context);
final StateMap stateMap = session.getState(scope);
initStates(filesToTail, stateMap.toMap(), false, context.getProperty(START_POSITION).getValue());
initStates(filesToTail, stateMap.toMap(), false);
} catch (IOException e) {
getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
context.yield();
Expand Down Expand Up @@ -734,7 +707,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
StateMap sessionStateMap = session.getState(scope);
Map<String, String> sessionStates = new HashMap<>(sessionStateMap.toMap());
List<String> keysToRemove = collectKeysToBeRemoved(sessionStates);
sessionStates.keySet().removeAll(keysToRemove);
keysToRemove.forEach(sessionStates.keySet()::remove);
getLogger().debug("Removed {} references to nonexistent files from session's state map",
keysToRemove.size());
session.setState(sessionStates, scope);
Expand All @@ -747,13 +720,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
private List<String> collectKeysToBeRemoved(Map<String, String> sessionStates) {
List<String> keysToRemove = new ArrayList<>();
List<String> filesToRemove = sessionStates.entrySet().stream()
.filter(entry -> entry.getKey().endsWith("filename")
.filter(entry -> entry.getKey().endsWith(StateKeys.FILENAME)
&& !states.containsKey(entry.getValue()))
.map(Entry::getKey)
.toList();

for (String key : filesToRemove) {
final String prefix = StringUtils.substringBefore(key, "filename");
final String prefix = StringUtils.substringBefore(key, StateKeys.FILENAME);
keysToRemove.add(prefix + StateKeys.FILENAME);
keysToRemove.add(prefix + StateKeys.LENGTH);
keysToRemove.add(prefix + StateKeys.POSITION);
Expand Down Expand Up @@ -1180,33 +1153,19 @@ private List<File> getRolledOffFiles(final ProcessContext context, final long mi
final File file = path.toFile();
final long lastMod = file.lastModified();

if (file.lastModified() < minTimestamp) {
if (lastMod >= minTimestamp && !file.equals(tailFile)) {
rolledOffFiles.add(file);
} else {
getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it",
file, lastMod, minTimestamp);

continue;
} else if (file.equals(tailFile)) {
continue;
}

rolledOffFiles.add(file);
}
}

// Sort files based on last modified timestamp. If the timestamps are the same, use the filename as a secondary
// sort key, since files that are rolled over are often given a naming scheme that sorts lexicographically in the
// same order as the timestamp, such as yyyy-MM-dd-HH-mm-ss
rolledOffFiles.sort(new Comparator<>() {
@Override
public int compare(final File o1, final File o2) {
final int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified());
if (lastModifiedComp != 0) {
return lastModifiedComp;
}

return o1.getName().compareTo(o2.getName());
}
});
rolledOffFiles.sort(Comparator.comparingLong(File::lastModified).thenComparing(File::getName));

return rolledOffFiles;
}
Expand All @@ -1228,21 +1187,8 @@ private void persistState(final Map<String, String> state, final ProcessSession
try {
final Scope scope = getStateScope(context);
final StateMap oldState = session == null ? context.getStateManager().getState(scope) : session.getState(scope);
Map<String, String> updatedState = new HashMap<>();

for (String key : oldState.toMap().keySet()) {
// These states are stored by older version of NiFi, and won't be used anymore.
// New states have 'file.<index>.' prefix.
if (TailFileState.StateKeys.CHECKSUM.equals(key)
|| TailFileState.StateKeys.FILENAME.equals(key)
|| TailFileState.StateKeys.POSITION.equals(key)
|| TailFileState.StateKeys.TIMESTAMP.equals(key)) {
getLogger().info("Removed state {}={} stored by older version of NiFi.", key, oldState.get(key));
continue;
}
updatedState.put(key, oldState.get(key));
}

Map<String, String> updatedState = new HashMap<>(oldState.toMap());
updatedState.putAll(state);

if (session == null) {
Expand All @@ -1251,7 +1197,7 @@ private void persistState(final Map<String, String> state, final ProcessSession
session.setState(updatedState, scope);
}
} catch (final IOException e) {
getLogger().warn("Some data may be duplicated on restart of NiFi since failed to store state", e);
getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", e);
}
}

Expand Down Expand Up @@ -1403,7 +1349,7 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
final long millisSinceModified = getCurrentTimeMs() - newestFile.lastModified();
if (millisSinceModified < postRolloverTailMillis) {
getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into " +
"account. Will do nothing will file for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified);
"account. Will do nothing for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified);
return true;
}

Expand Down Expand Up @@ -1718,7 +1664,7 @@ public String toString() {

public Map<String, String> toStateMap(int index) {
final String prefix = MAP_PREFIX + index + '.';
final Map<String, String> map = new HashMap<>(4);
final Map<String, String> map = HashMap.newHashMap(4);
map.put(prefix + StateKeys.FILENAME, filename);
map.put(prefix + StateKeys.POSITION, String.valueOf(position));
map.put(prefix + StateKeys.LENGTH, String.valueOf(length));
Expand All @@ -1741,7 +1687,7 @@ public long getRePos() {
}

@Override
public Throwable fillInStackTrace() {
public synchronized Throwable fillInStackTrace() {
return this;
}
}
Expand Down
Loading

0 comments on commit a44fb52

Please sign in to comment.