diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index f1163ca522a2..c9993313e9d8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -412,11 +412,9 @@ public void recoverState(final ProcessContext context) throws IOException { final Scope scope = getStateScope(context); final StateMap stateMap = context.getStateManager().getState(scope); - final String startPosition = context.getProperty(START_POSITION).getValue(); - if (stateMap.getVersion() == -1L || stateMap.toMap().isEmpty()) { //state has been cleared or never stored so recover as 'empty state' - initStates(filesToTail, Collections.emptyMap(), true, startPosition); + initStates(filesToTail, Collections.emptyMap(), true); recoverState(context, filesToTail, Collections.emptyMap()); return; } @@ -424,14 +422,14 @@ public void recoverState(final ProcessContext context) throws IOException { Map statesMap = stateMap.toMap(); if (statesMap.containsKey(TailFileState.StateKeys.FILENAME) - && !statesMap.keySet().stream().anyMatch(key -> key.startsWith(MAP_PREFIX))) { + && statesMap.keySet().stream().noneMatch(key -> key.startsWith(MAP_PREFIX))) { // If statesMap contains "filename" key without "file.0." prefix, // and there's no key with "file." prefix, then // it indicates that the statesMap is created with earlier version of NiFi. // In this case, we need to migrate the state by adding prefix indexed with 0. final Map migratedStatesMap = new HashMap<>(statesMap.size()); - for (String key : statesMap.keySet()) { - migratedStatesMap.put(MAP_PREFIX + "0." + key, statesMap.get(key)); + for (Entry entry : statesMap.entrySet()) { + migratedStatesMap.put(MAP_PREFIX + "0." + entry.getKey(), entry.getValue()); } // LENGTH is added from NiFi 1.1.0. Set the value with using the last position so that we can use existing state @@ -442,11 +440,44 @@ public void recoverState(final ProcessContext context) throws IOException { getLogger().info("statesMap has been migrated. {}", migratedStatesMap); } - initStates(filesToTail, statesMap, false, startPosition); + initStates(filesToTail, statesMap, false); recoverState(context, filesToTail, statesMap); + removeLegacyStateEntries(context, statesMap, scope); } - private void initStates(final List filesToTail, final Map statesMap, final boolean isCleared, final String startPosition) { + /** + * Removes legacy state entries from versions prior to NiFi 1.0, where state keys were not indexed + * with file-specific prefixes. In NiFi 1.0 and older versions, tailing multiple files wasn't supported, + * so there was no need to associate state information with individual files. + * Newer versions use "file.[index]." prefixes to handle multiple tailed files, + * which makes the legacy state keys ('checksum', 'filename', 'position', and 'timestamp') obsolete. + * This method filters out these legacy keys and persists the updated state map without them. + * + * @param context the ProcessContext for accessing the state manager + * @param statesMap the current state map containing both legacy and new entries + * @param scope the scope (cluster or local) for state persistence + */ + private void removeLegacyStateEntries(final ProcessContext context, final Map statesMap, Scope scope) { + Map updatedStatesMap = new HashMap<>(); + for (Entry entry : statesMap.entrySet()) { + final String key = entry.getKey(); + if (TailFileState.StateKeys.CHECKSUM.equals(key) + || TailFileState.StateKeys.FILENAME.equals(key) + || TailFileState.StateKeys.POSITION.equals(key) + || TailFileState.StateKeys.TIMESTAMP.equals(key)) { + getLogger().info("Removed state {}={} stored by older version of NiFi.", key, entry.getValue()); + continue; + } + updatedStatesMap.put(key, entry.getValue()); + } + try { + context.getStateManager().setState(updatedStatesMap, scope); + } catch (IOException e) { + getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", e); + } + } + + private void initStates(final List filesToTail, final Map statesMap, final boolean isCleared) { int fileIndex = 0; if (isCleared) { @@ -456,30 +487,34 @@ private void initStates(final List filesToTail, final Map entry : statesMap.entrySet()) { + final String key = entry.getKey(); + final String value = entry.getValue(); + if (key.endsWith(TailFileState.StateKeys.FILENAME) && filesToTail.contains(value)) { int index = Integer.parseInt(key.split("\\.")[1]); - states.put(statesMap.get(key), new TailFileObject(index, statesMap, preAllocatedBufferSize)); + states.put(value, new TailFileObject(index, statesMap, preAllocatedBufferSize)); } } } // first, we remove the files that are no longer present - final List toBeRemoved = new ArrayList(); - for (String file : states.keySet()) { - if(!filesToTail.contains(file)) { - toBeRemoved.add(file); - cleanReader(states.get(file)); + final List toBeRemoved = new ArrayList<>(); + for (Entry entry : states.entrySet()) { + final String filePath = entry.getKey(); + final TailFileObject tailFileObject = entry.getValue(); + if(!filesToTail.contains(filePath)) { + toBeRemoved.add(filePath); + cleanReader(tailFileObject); } } - states.keySet().removeAll(toBeRemoved); + toBeRemoved.forEach(states.keySet()::remove); // then we need to get the highest ID used so far to be sure // we don't mix different files in case we add new files to tail - for (String file : states.keySet()) { - if (fileIndex <= states.get(file).getFilenameIndex()) { - fileIndex = states.get(file).getFilenameIndex() + 1; + for (TailFileObject tfo : states.values()) { + if (fileIndex <= tfo.getFilenameIndex()) { + fileIndex = tfo.getFilenameIndex() + 1; } } @@ -497,7 +532,7 @@ private void initStates(final List filesToTail, final Map filesToTail, final Map map) throws IOException { for (String file : filesToTail) { - recoverState(context, map, file); + recoverState(map, file); } } @@ -546,7 +581,6 @@ private List getFilesToTail(final String baseDir, String fileRegex, bool * checksum, so that we are ready to proceed with the * {@link #onTrigger(ProcessContext, ProcessSession)} call. * - * @param context the ProcessContext * @param stateValues the values that were recovered from state that was * previously stored. This Map should be populated with the keys defined in * {@link TailFileState.StateKeys}. @@ -554,23 +588,15 @@ private List getFilesToTail(final String baseDir, String fileRegex, bool * @throws IOException if unable to seek to the appropriate location in the * tailed file. */ - private void recoverState(final ProcessContext context, final Map stateValues, final String filePath) throws IOException { - - final String prefix = MAP_PREFIX + states.get(filePath).getFilenameIndex() + '.'; - - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME)) { - resetState(filePath); - return; - } - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION)) { - resetState(filePath); - return; - } - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP)) { - resetState(filePath); - return; - } - if (!stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) { + private void recoverState(final Map stateValues, final String filePath) throws IOException { + final TailFileObject tailFileObject = states.get(filePath); + final String prefix = MAP_PREFIX + tailFileObject.getFilenameIndex() + '.'; + + // Combine all key checks into a single condition + if (!stateValues.containsKey(prefix + TailFileState.StateKeys.FILENAME) + || !stateValues.containsKey(prefix + TailFileState.StateKeys.POSITION) + || !stateValues.containsKey(prefix + TailFileState.StateKeys.TIMESTAMP) + || !stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) { resetState(filePath); return; } @@ -586,7 +612,7 @@ private void recoverState(final ProcessContext context, final Map= position) { - try (final InputStream tailFileIs = new FileInputStream(existingTailFile); - final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) { + try (final InputStream tailFileIs = Files.newInputStream(existingTailFile.toPath()); + final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) { try { - StreamUtils.copy(in, new NullOutputStream(), states.get(filePath).getState().getPosition()); + StreamUtils.copy(in, new NullOutputStream(), tailFileObject.getState().getPosition()); } catch (final EOFException eof) { // If we hit EOFException, then the file is smaller than we expected. Assume rollover. getLogger().debug("When recovering state, file being tailed has less data than was stored in the state. " @@ -606,7 +632,7 @@ private void recoverState(final ProcessContext context, final Map context.getProperty(LOOKUP_FREQUENCY).asTimePeriod(TimeUnit.MILLISECONDS)) { + long lookupFrequency = context.getProperty(LOOKUP_FREQUENCY).asTimePeriod(TimeUnit.MILLISECONDS); + if (timeSinceLastLookup > lookupFrequency) { try { final List filesToTail = lookup(context); final Scope scope = getStateScope(context); final StateMap stateMap = session.getState(scope); - initStates(filesToTail, stateMap.toMap(), false, context.getProperty(START_POSITION).getValue()); + initStates(filesToTail, stateMap.toMap(), false); } catch (IOException e) { getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e); context.yield(); @@ -712,7 +740,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session try { processTailFile(context, session, tailFile); } catch (NulCharacterEncounteredException e) { - getLogger().warn("NUL character encountered in " + tailFile + " and '" + REREAD_ON_NUL.getDisplayName() + "' is set to 'true', yielding."); + getLogger().warn("NUL character encountered in {} and '{}' is set to 'true', yielding.", tailFile, REREAD_ON_NUL.getDisplayName()); context.yield(); return; } @@ -731,27 +759,26 @@ public void onTrigger(final ProcessContext context, final ProcessSession session StateMap sessionStateMap = session.getState(scope); Map sessionStates = new HashMap<>(sessionStateMap.toMap()); List keysToRemove = collectKeysToBeRemoved(sessionStates); - sessionStates.keySet().removeAll(keysToRemove); + keysToRemove.forEach(sessionStates.keySet()::remove); getLogger().debug("Removed {} references to nonexistent files from session's state map", keysToRemove.size()); session.setState(sessionStates, scope); } catch (IOException e) { getLogger().error("Exception raised while attempting to cleanup session's state map", e); context.yield(); - return; } } private List collectKeysToBeRemoved(Map sessionStates) { List keysToRemove = new ArrayList<>(); List filesToRemove = sessionStates.entrySet().stream() - .filter(entry -> entry.getKey().endsWith("filename") - && !states.keySet().contains(entry.getValue())) + .filter(entry -> entry.getKey().endsWith(StateKeys.FILENAME) + && !states.containsKey(entry.getValue())) .map(Entry::getKey) .collect(toList()); for (String key : filesToRemove) { - final String prefix = StringUtils.substringBefore(key, "filename"); + final String prefix = StringUtils.substringBefore(key, StateKeys.FILENAME); keysToRemove.add(prefix + StateKeys.FILENAME); keysToRemove.add(prefix + StateKeys.LENGTH); keysToRemove.add(prefix + StateKeys.POSITION); @@ -790,7 +817,7 @@ private void processTailFile(final ProcessContext context, final ProcessSession final long position = file.length(); final long timestamp = file.lastModified() + 1; - try (final InputStream fis = new FileInputStream(file); + try (final InputStream fis = Files.newInputStream(file.toPath()); final CheckedInputStream in = new CheckedInputStream(fis, checksum)) { StreamUtils.copy(in, new NullOutputStream(), position); } @@ -799,7 +826,7 @@ private void processTailFile(final ProcessContext context, final ProcessSession cleanup(context); tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer())); } catch (final IOException ioe) { - getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", file, ioe.toString(), ioe); + getLogger().error("Attempted to position Reader at current position in file {} but failed to do so", file, ioe); context.yield(); return; } @@ -1179,33 +1206,19 @@ private List getRolledOffFiles(final ProcessContext context, final long mi final File file = path.toFile(); final long lastMod = file.lastModified(); - if (file.lastModified() < minTimestamp) { + if (lastMod >= minTimestamp && !file.equals(tailFile)) { + rolledOffFiles.add(file); + } else { getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it", file, lastMod, minTimestamp); - - continue; - } else if (file.equals(tailFile)) { - continue; } - - rolledOffFiles.add(file); } } // Sort files based on last modified timestamp. If same timestamp, use filename as a secondary sort, as often // files that are rolled over are given a naming scheme that is lexicographically sort in the same order as the // timestamp, such as yyyy-MM-dd-HH-mm-ss - rolledOffFiles.sort(new Comparator() { - @Override - public int compare(final File o1, final File o2) { - final int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified()); - if (lastModifiedComp != 0) { - return lastModifiedComp; - } - - return o1.getName().compareTo(o2.getName()); - } - }); + rolledOffFiles.sort(Comparator.comparingLong(File::lastModified).thenComparing(File::getName)); return rolledOffFiles; } @@ -1227,21 +1240,8 @@ private void persistState(final Map state, final ProcessSession try { final Scope scope = getStateScope(context); final StateMap oldState = session == null ? context.getStateManager().getState(scope) : session.getState(scope); - Map updatedState = new HashMap<>(); - - for(String key : oldState.toMap().keySet()) { - // These states are stored by older version of NiFi, and won't be used anymore. - // New states have 'file..' prefix. - if (TailFileState.StateKeys.CHECKSUM.equals(key) - || TailFileState.StateKeys.FILENAME.equals(key) - || TailFileState.StateKeys.POSITION.equals(key) - || TailFileState.StateKeys.TIMESTAMP.equals(key)) { - getLogger().info("Removed state {}={} stored by older version of NiFi.", key, oldState.get(key)); - continue; - } - updatedState.put(key, oldState.get(key)); - } + Map updatedState = new HashMap<>(oldState.toMap()); updatedState.putAll(state); if (session == null) { @@ -1250,7 +1250,7 @@ private void persistState(final Map state, final ProcessSession session.setState(updatedState, scope); } } catch (final IOException e) { - getLogger().warn("Failed to store state; some data may be duplicated on restart of NiFi", e); + getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", e); } } @@ -1274,7 +1274,7 @@ private FileChannel createReader(final File file, final long position) { try { reader.close(); getLogger().debug("Closed FileChannel {}", reader); - } catch (final IOException ioe2) { + } catch (final IOException ignored) { } return null; @@ -1402,7 +1402,7 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe final long millisSinceModified = getCurrentTimeMs() - newestFile.lastModified(); if (millisSinceModified < postRolloverTailMillis) { getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into " + - "account. Will do nothing will file for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified); + "account. Will do nothing for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified); return true; } @@ -1541,7 +1541,7 @@ private boolean tailRolledFile(final ProcessContext context, final ProcessSessio private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileObject tfo) throws IOException { FlowFile flowFile = session.create(); - try (final InputStream fis = new FileInputStream(file)) { + try (final InputStream fis = Files.newInputStream(file.toPath())) { flowFile = session.write(flowFile, out -> { flushLinesBuffer(out, new CRC32()); StreamUtils.copy(fis, out); @@ -1579,7 +1579,7 @@ static class TailFileObject { private TailFileState state; private Long expectedRecoveryChecksum; - private int filenameIndex; + private final int filenameIndex; private boolean tailFileChanged = true; public TailFileObject(final int index, final TailFileState fileState) { @@ -1739,8 +1739,8 @@ public long getRePos() { } @Override - public Throwable fillInStackTrace() { + public synchronized Throwable fillInStackTrace() { return this; } } -} +} \ No newline at end of file