Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-12674 Modified ValidateCSV to make the schema optional if a head… #8362

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*/
package org.apache.nifi.processors.standard;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -51,8 +52,6 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseBigDecimal;
Expand Down Expand Up @@ -103,6 +102,7 @@ public class ValidateCsv extends AbstractProcessor {

private static final String routeWholeFlowFile = "FlowFile validation";
private static final String routeLinesIndividually = "Line by line validation";
private static final String routeByAttribute = "Attribute validation";

public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
"As soon as an error is found in the CSV file, the validation will stop and the whole flow file will be routed to the 'invalid'"
Expand All @@ -120,11 +120,20 @@ public class ValidateCsv extends AbstractProcessor {
.description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
+ "processors to apply. The following cell processors are allowed in the schema definition: "
+ allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
.required(true)
.required(false)
Copy link
Contributor

@mattyb149 mattyb149 Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have a dependsOn(HEADER, "false"), unless you can provide the explicit schema yet there be a header line that should be ignored. If that's the case maybe update the documentation for Header to reflect the current behavior based on the different combinations of settings.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the "unless" part is true here. But now there are merge conflicts, sorry I lost track of this

.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();

public static final PropertyDescriptor CSV_SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("CSV Source Attribute")
.displayName("CSV Source Attribute")
.description("The name of the attribute containing CSV data to be validated. If this property is blank, the FlowFile content will be validated.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
.build();

public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
.name("validate-csv-header")
.displayName("Header")
Expand Down Expand Up @@ -204,6 +213,7 @@ public class ValidateCsv extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SCHEMA);
properties.add(CSV_SOURCE_ATTRIBUTE);
properties.add(HEADER);
properties.add(DELIMITER_CHARACTER);
properties.add(QUOTE_CHARACTER);
Expand Down Expand Up @@ -232,6 +242,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
protected Collection<ValidationResult> customValidate(ValidationContext context) {

PropertyValue schemaProp = context.getProperty(SCHEMA);
PropertyValue headerProp = context.getProperty(HEADER);
String schema = schemaProp.getValue();
String subject = SCHEMA.getName();

Expand All @@ -240,7 +251,11 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
}
// If no Expression Language is present, try parsing the schema
try {
this.parseSchema(schema);
if (schema != null) {
this.parseSchema(schema);
} else if (!headerProp.asBoolean()) {
throw(new Exception("Schema cannot be empty if header is false."));
Freedom9339 marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (Exception e) {
final List<ValidationResult> problems = new ArrayList<>(1);
problems.add(new ValidationResult.Builder().subject(subject)
Expand Down Expand Up @@ -458,155 +473,147 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final CsvPreference csvPref = getPreference(context, flowFile);
final boolean header = context.getProperty(HEADER).asBoolean();
final ComponentLog logger = getLogger();
final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
final CellProcessor[] cellProcs = this.parseSchema(schema);
final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
CellProcessor[] cellProcs = null;
if (schema != null) {
cellProcs = this.parseSchema(schema);
}
final String validationStrategy = context.getProperty(VALIDATION_STRATEGY).getValue();
final boolean isWholeFFValidation = !validationStrategy.equals(VALIDATE_LINES_INDIVIDUALLY.getValue());
final boolean includeAllViolations = context.getProperty(INCLUDE_ALL_VIOLATIONS).asBoolean();

final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
boolean valid = true;
int okCount = 0;
int totalCount = 0;
FlowFile invalidFF = null;
FlowFile validFF = null;
String validationError = null;
final AtomicReference<Boolean> isFirstLineValid = new AtomicReference<Boolean>(true);
final AtomicReference<Boolean> isFirstLineInvalid = new AtomicReference<Boolean>(true);
final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
final AtomicReference<String> validationError = new AtomicReference<String>(null);

if (!isWholeFFValidation) {
invalidFF.set(session.create(flowFile));
validFF.set(session.create(flowFile));
invalidFF = session.create(flowFile);
validFF = session.create(flowFile);
}

InputStream stream;
if (context.getProperty(CSV_SOURCE_ATTRIBUTE).isSet()) {
stream = new ByteArrayInputStream(flowFile.getAttribute(context.getProperty(CSV_SOURCE_ATTRIBUTE).getValue()).getBytes(StandardCharsets.UTF_8));
Copy link
Contributor

@jrsteinebrey jrsteinebrey Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Freedom9339 Thanks for making that change. It turned out well except you need to call .evaluateAttributeExpressions() without passing a flowfile after the .getProperty(CSV_SOURCE_ATTRIBUTE) call. After that, the change looks complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Freedom9339 Thanks for your contribution.

} else {
stream = session.read(flowFile);
}

session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref)) {

// handling of header
if (header) {

// read header
listReader.read();

if (!isWholeFFValidation) {
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
try (final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(stream), csvPref)) {

// handling of header
if (header) {

// read header
List<String> headers = listReader.read();

if (schema == null) {
String newSchema = "Optional(StrNotNullOrEmpty()),".repeat(headers.size());
schema = newSchema.substring(0, newSchema.length() - 1);
cellProcs = this.parseSchema(schema);
}

if (!isWholeFFValidation) {
invalidFF = session.append(invalidFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
validFF = session.append(validFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
isFirstLineValid.set(false);
isFirstLineInvalid.set(false);
}
}

boolean stop = false;

while (!stop) {
try {

// read next row and check if no more row
stop = listReader.read(includeAllViolations && valid, cellProcs) == null;

if (!isWholeFFValidation && !stop) {
validFF = session.append(validFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineValid.get())));
okCount++;

if (isFirstLineValid.get()) {
isFirstLineValid.set(false);
isFirstLineInvalid.set(false);
}
}
} catch (final SuperCsvException e) {
valid = false;
if (isWholeFFValidation) {
validationError = e.getLocalizedMessage();
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", flowFile, e);
break;
} else {
// we append the invalid line to the flow file that will be routed to invalid relationship
invalidFF = session.append(invalidFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get())));

boolean stop = false;

while (!stop) {
try {

// read next row and check if no more row
stop = listReader.read(includeAllViolations && valid.get(), cellProcs) == null;

if (!isWholeFFValidation && !stop) {
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineValid.get()));
}
}));
okCount.set(okCount.get() + 1);

if (isFirstLineValid.get()) {
isFirstLineValid.set(false);
}
}

} catch (final SuperCsvException e) {
valid.set(false);
if (isWholeFFValidation) {
validationError.set(e.getLocalizedMessage());
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", flowFile, e);
break;
} else {
// we append the invalid line to the flow file that will be routed to invalid relationship
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get()));
}
}));

if (isFirstLineInvalid.get()) {
isFirstLineInvalid.set(false);
}

if (validationError.get() == null) {
validationError.set(e.getLocalizedMessage());
}
}
} finally {
if (!isWholeFFValidation) {
totalCount.set(totalCount.get() + 1);
}
if (isFirstLineInvalid.get()) {
isFirstLineInvalid.set(false);
}
}

} catch (final IOException e) {
valid.set(false);
logger.error("Failed to validate {} against schema due to {}", flowFile, e);
if (validationError == null) {
validationError = e.getLocalizedMessage();
}
}
} finally {
if (!isWholeFFValidation) {
totalCount++;
}
}
}
});

} catch (final IOException e) {
valid = false;
logger.error("Failed to validate {} against schema due to {}", flowFile, e);
}

if (isWholeFFValidation) {
if (valid.get()) {
if (valid) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", flowFile);
session.getProvenanceReporter().route(flowFile, REL_VALID);
session.transfer(flowFile, REL_VALID);
} else {
session.getProvenanceReporter().route(flowFile, REL_INVALID);
session.putAttribute(flowFile, "validation.error.message", validationError.get());
session.putAttribute(flowFile, "validation.error.message", validationError);
session.transfer(flowFile, REL_INVALID);
}
} else {
if (valid.get()) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", validFF.get());
session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(totalCount.get()));
session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.transfer(validFF.get(), REL_VALID);
session.remove(invalidFF.get());
if (valid) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", validFF);
session.getProvenanceReporter().route(validFF, REL_VALID, "All " + totalCount + " line(s) are valid");
session.putAttribute(validFF, "count.valid.lines", Integer.toString(totalCount));
session.putAttribute(validFF, "count.total.lines", Integer.toString(totalCount));
session.transfer(validFF, REL_VALID);
session.remove(invalidFF);
session.remove(flowFile);
} else if (okCount.get() != 0) {
} else if (okCount != 0) {
// because of the finally within the 'while' loop
totalCount.set(totalCount.get() - 1);

logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'", okCount.get(), totalCount.get(), flowFile);
session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)");
session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(okCount.get()));
session.transfer(validFF.get(), REL_VALID);
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get())));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
totalCount--;

logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
okCount, totalCount, flowFile);
session.getProvenanceReporter().route(validFF, REL_VALID, okCount + " valid line(s)");
session.putAttribute(validFF, "count.total.lines", Integer.toString(totalCount));
session.putAttribute(validFF, "count.valid.lines", Integer.toString(okCount));
session.transfer(validFF, REL_VALID);
session.getProvenanceReporter().route(invalidFF, REL_INVALID, (totalCount - okCount) + " invalid line(s)");
session.putAttribute(invalidFF, "count.invalid.lines", Integer.toString((totalCount - okCount)));
session.putAttribute(invalidFF, "count.total.lines", Integer.toString(totalCount));
session.putAttribute(invalidFF, "validation.error.message", validationError);
session.transfer(invalidFF, REL_INVALID);
session.remove(flowFile);
} else {
logger.debug("All lines in {} are invalid; routing to 'invalid'", invalidFF.get());
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(validFF.get());
logger.debug("All lines in {} are invalid; routing to 'invalid'", invalidFF);
session.getProvenanceReporter().route(invalidFF, REL_INVALID, "All " + totalCount + " line(s) are invalid");
session.putAttribute(invalidFF, "count.invalid.lines", Integer.toString(totalCount));
session.putAttribute(invalidFF, "count.total.lines", Integer.toString(totalCount));
session.putAttribute(invalidFF, "validation.error.message", validationError);
session.transfer(invalidFF, REL_INVALID);
session.remove(validFF);
session.remove(flowFile);
}
}
Expand Down
Loading