Arrow: Fix indexing in Parquet dictionary encoded values readers #11247

Open · wants to merge 5 commits into base: main

Conversation

wypoon (Contributor) commented Oct 2, 2024

This fixes #11221.

There is a bug in VectorizedDictionaryEncodedParquetValuesReader.BaseDictEncodedReader::nextBatch: nextVal of the BaseDictEncodedReader subclass is called with an incorrect index for certain subclasses (in particular, FixedSizeBinaryDictEncodedReader), so the value is set at the wrong index in the FieldVector that holds the values. E.g., for a decimal column that requires 16 bytes to store, the values are stored in 16-byte fixed length byte arrays and the typeWidth is 16; FixedSizeBinaryDictEncodedReader::nextVal is called with index 0, 16, 32, 48, etc. instead of 0, 1, 2, 3, etc.
The fix is to not pre-multiply the index by the typeWidth before calling nextVal, and instead to account for the typeWidth as appropriate in each nextVal method.
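
To make the index semantics concrete, here is a minimal, self-contained sketch (not Iceberg code; the class name is made up and only the Arrow Java vector API is assumed) showing that FixedSizeBinaryVector.set expects an element index, so passing a pre-multiplied byte offset writes values into the wrong slots:

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.FixedSizeBinaryVector;

    // Sketch only: illustrates element index vs. byte offset for a 16-byte decimal column.
    public class FixedSizeBinaryIndexSketch {
      public static void main(String[] args) {
        int typeWidth = 16; // e.g. a decimal stored as a 16-byte fixed length byte array
        try (BufferAllocator allocator = new RootAllocator();
            FixedSizeBinaryVector vector =
                new FixedSizeBinaryVector("dec_col", allocator, typeWidth)) {
          vector.allocateNew(4);
          byte[] bytes = new byte[typeWidth];
          for (int idx = 0; idx < 4; idx++) {
            vector.set(idx, bytes); // correct: element index 0, 1, 2, 3
            // vector.set(idx * typeWidth, bytes); // buggy pattern: 0, 16, 32, 48
          }
          vector.setValueCount(4);
        }
      }
    }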

A test is included that fails without the fix and passes with it.

github-actions bot added the arrow label Oct 2, 2024
github-actions bot added the spark label Oct 5, 2024
wypoon changed the title from "[DRAFT] Fix indexing in dictionary encoded Parquet readers" to "Spark: Fix indexing in dictionary encoded Parquet readers" Oct 5, 2024
wypoon changed the title from "Spark: Fix indexing in dictionary encoded Parquet readers" to "Arrow: Fix indexing in dictionary encoded Parquet readers" Oct 5, 2024
wypoon changed the title from "Arrow: Fix indexing in dictionary encoded Parquet readers" to "Arrow: Fix indexing in Parquet dictionary encoded values readers" Oct 5, 2024
wypoon (Contributor, Author) commented Oct 5, 2024

The bug occurs when reading a Parquet column chunk with multiple pages where some but not all of the pages are dictionary encoded. In particular, Impala tries to use dictionary encoding where possible when writing Parquet data, and when the number of values in the dictionary exceeds 40,000, it then switches to plain encoding. The data file uploaded in #11221 is such a data file written by Impala.
I wanted to write data programmatically from the test, and I tried to get the Java (parquet-mr) Parquet writer to write such data, but I was not able to. With the v1 writer, I was not able to get it to write dictionary encoded data at all for decimal values stored as fixed length byte arrays. With the v2 writer, I was able to get it to write some pages with RLE dictionary encoding and some not dictionary encoded, but those other pages use a delta encoding, which Iceberg does not support (and thus I cannot use it to reproduce the bug).
Eventually, I used pyarrow's C++ writer to write a small data file (less than 4 KB) with 2 pages, each with 200 rows, one RLE dictionary encoded and one plain encoded. I think this is acceptably small to be checked in.

wypoon (Contributor, Author) commented Oct 5, 2024

CI failed with

Execution failed for task ':iceberg-flink:iceberg-flink-1.20:compileJmhJava'.
> Java heap space

I have seen this Java heap space failure in the CI before; it is unrelated to my change.

wypoon (Contributor, Author) commented Oct 5, 2024

@nastra @rymurr the bug was introduced in 2842f0b (#2746).
Assuming the code was correct before that refactoring, an analysis of the refactoring will show that my change is correct.
Before that refactoring, there was a lot of duplicated code that began

    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);

and then, in many (but not all) cases, calls of the form vector.getDataBuffer().setXXX(idx * typeWidth, ...) are made.
For those, we want to use idx * typeWidth. However, there are cases where we want to use idx in setting the value in the vector. E.g., in readBatchOfDictionaryEncodedFixedSizeBinary, we do

           ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);

What the refactoring did was to use index = idx * typeWidth and call overridden nextVal methods with that index, so that in this case, we end up doing ((FixedSizeBinaryVector) vector).set(index, vectorBytes);.

My change is to not pre-multiply idx by typeWidth before making the nextVal calls, and instead to account for the typeWidth (which is also passed to nextVal anyway) within each nextVal.
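
To illustrate the resulting convention, here is a self-contained sketch (made-up class names, not the actual Iceberg classes): the base batch loop passes the plain element index, and each subclass decides whether it needs idx or idx * typeWidth.

    // Sketch only: simplified stand-ins for BaseDictEncodedReader and its subclasses.
    abstract class DictEncodedReaderSketch {
      void nextBatch(int startOffset, int numValuesToRead, int typeWidth) {
        for (int idx = startOffset; idx < startOffset + numValuesToRead; idx++) {
          // Before the fix the loop effectively called nextVal(idx * typeWidth, typeWidth);
          // after the fix it passes the element index and lets each subclass decide.
          nextVal(idx, typeWidth);
        }
      }

      abstract void nextVal(int idx, int typeWidth);
    }

    // Writes through a flat data buffer, so it applies the byte offset idx * typeWidth itself.
    class FixedWidthReaderSketch extends DictEncodedReaderSketch {
      private final byte[] dataBuffer = new byte[16 * 1024];

      @Override
      void nextVal(int idx, int typeWidth) {
        int byteOffset = idx * typeWidth;
        // ... copy typeWidth dictionary bytes into dataBuffer starting at byteOffset ...
        dataBuffer[byteOffset] = 0;
      }
    }

    // Sets whole elements (like FixedSizeBinaryVector.set), so it uses the plain element index.
    class FixedSizeBinaryReaderSketch extends DictEncodedReaderSketch {
      private final byte[][] elements = new byte[1024][];

      @Override
      void nextVal(int idx, int typeWidth) {
        elements[idx] = new byte[typeWidth];
      }
    }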

cc @rdblue

nastra self-requested a review October 7, 2024 06:59
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {

protected static SparkSession spark = null;
Contributor:

Can we test this without having to start up Spark? Also, it seems like IntBackedDecimalDictEncodedReader / LongBackedDecimalDictEncodedReader / VarWidthBinaryDictEncodedReader / FixedLengthDecimalDictEncodedReader have been affected, so maybe there should be tests for these too?

wypoon (Contributor, Author), Oct 7, 2024:

Using the Spark DataFrameReader to read the Parquet file is the easiest way for the test verification.

IIUC, if a decimal can be represented in 4 bytes, we use IntBackedDecimalDictEncodedReader, and if it cannot be represented in 4 bytes but can be in 8 bytes, we use LongBackedDecimalDictEncodedReader; but can you please tell me what types use FixedLengthDecimalDictEncodedReader?
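
For reference, a small sketch of the conventional Parquet physical types for decimals by precision (per the Parquet format spec), which is what the "4 bytes" / "8 bytes" distinction above refers to; which Iceberg reader class handles each case is exactly what is being asked here:

    // Sketch: conventional Parquet physical type for a decimal of the given precision,
    // per the Parquet format spec.
    class DecimalPhysicalTypeSketch {
      static String physicalType(int precision) {
        if (precision <= 9) {
          return "INT32"; // representable in 4 bytes
        } else if (precision <= 18) {
          return "INT64"; // representable in 8 bytes
        } else {
          return "FIXED_LEN_BYTE_ARRAY"; // e.g. 16 bytes for precision 38
        }
      }
    }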

wypoon (Contributor, Author):

Hmm, afaics, VectorizedParquetDefinitionLevelReader.FixedLengthDecimalReader is never used, and thus neither is VectorizedDictionaryEncodedParquetValuesReader.FixedLengthDecimalDictEncodedReader.

wypoon (Contributor, Author):

Hmm, even VectorizedParquetDefinitionLevelReader.IntBackedDecimalReader and VectorizedParquetDefinitionLevelReader.LongBackedDecimalReader are never used, afaict.
In VectorizedParquetDefinitionLevelReader, the methods fixedLengthDecimalReader(), intBackedDecimalReader() and longBackedDecimalReader() are never called.

I see now that VectorizedPageIterator.IntBackedDecimalPageReader, VectorizedPageIterator.LongBackedDecimalPageReader, VectorizedPageIterator::intBackedDecimalPageReader(), and VectorizedPageIterator::longBackedDecimalPageReader() were deprecated in #3249 and subsequently all removed in Iceberg 1.4.0. So we have some dead code now.

Contributor:

Can we test this without having to add a Parquet file?

wypoon (Contributor, Author):

I have explained in a comment why I used a Parquet file. If you can tell me how to generate the Parquet file with the required conditions using the Java Parquet writer, I can generate it from the test.

wypoon (Contributor, Author):

I do not know the parquet-java code, but looking around it a bit, I see, e.g., https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java#L79, which suggests that the v1 writer does not support dictionary encoding for fixed_len_byte_array. This accords with my empirical experience. I already explained the issue with the v2 writer.

Contributor:

@wypoon you need to use the one from iceberg-parquet, as Iceberg has its own Parquet reader/writer.

wypoon (Contributor, Author):

@nastra the code in iceberg-parquet still has to use the parquet-java code underneath to do the actual writing, and when using v1, dictionary encoding does not appear to be supported for fixed_len_byte_array.
When I use

    Schema schema =
        new Schema(Types.NestedField.required(1, "dec_38_0", Types.DecimalType.of(38, 0)));
    File parquetFile = File.createTempFile("junit", null, temp.toFile());
    assertThat(parquetFile.delete()).as("Delete should succeed").isTrue();

    Iterable<GenericData.Record> records = RandomData.generate(schema, 500, 0L, 0.0f);
    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .set(PARQUET_DICT_SIZE_BYTES, "2048")
            .set(PARQUET_PAGE_ROW_LIMIT, "100")
            .build()) {
      writer.addAll(records);
    }

that writes a Parquet file with

Column: dec_38_0
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-0    data  G _  100     16.00 B    1.563 kB                    
  0-1    data  G _  100     16.00 B    1.563 kB                    
  0-2    data  G _  100     16.00 B    1.563 kB                    
  0-3    data  G _  100     16.00 B    1.563 kB                    
  0-4    data  G _  100     16.00 B    1.563 kB                    

while if I use v2:

    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .set(PARQUET_DICT_SIZE_BYTES, "2048")
            .set(PARQUET_PAGE_ROW_LIMIT, "100")
            .writerVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
            .build()) {
      writer.addAll(records);
    }

that writes a Parquet file with

Column: dec_38_0
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  G _  96      16.00 B    1.500 kB  
  0-1    data  _ R  100     0.93 B     93 B       100      0       
  0-2    data  _ D  100     15.56 B    1.520 kB   100      0       
  0-3    data  _ D  100     14.76 B    1.441 kB   100      0       
  0-4    data  _ D  100     15.47 B    1.511 kB   100      0       
  0-5    data  _ D  100     15.06 B    1.471 kB   100      0       

As you can see, I am using the APIs in iceberg-parquet, generating the same data in both cases, using the same dictionary size and page row limit; in the v1 case, plain encoding is used for all the pages, while in the v2 case, one page is written with dictionary encoding (unfortunately the other pages are written with DELTA_BYTE_ARRAY encoding).

wypoon (Contributor, Author), Oct 10, 2024:

I also tried

    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .createWriterFunc(ParquetAvroWriter::buildWriter)
            .set(PARQUET_DICT_SIZE_BYTES, "2048")
            .set(PARQUET_PAGE_ROW_LIMIT, "100")
            .build()) {
      writer.addAll(records);
    }

and debugged what happens in writer.
The following chain is called to get the ValuesWriter for the decimal column:
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java#L84
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java#L167
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java#L52
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java#L55
https://github.com/apache/parquet-java/blob/apache-parquet-1.13.1/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java#L80
which is the code I cited previously:

  private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) {
    // dictionary encoding was not enabled in PARQUET 1.0
    return new FixedLenByteArrayPlainValuesWriter(
        path.getTypeLength(),
        parquetProperties.getInitialSlabSize(),
        parquetProperties.getPageSizeThreshold(),
        parquetProperties.getAllocator());
  }

I am using the iceberg-parquet APIs to create a writer, but eventually it calls parquet-java code.

nastra (Contributor) left a comment:

The changes look correct to me, but it would be great if we could generally improve test coverage for the affected readers.

wypoon (Contributor, Author) commented Oct 7, 2024

@nastra thank you for reviewing.

I have added a test that exercises VectorizedDictionaryEncodedParquetValuesReader.VarWidthBinaryDictEncodedReader::nextVal. This test actually passes both before and after the fix, since this is a case where typeWidth == -1.

The int backed, long backed, and fixed length decimal readers are all dead (never called) code. I'll put up a separate PR to remove the dead code.

wypoon (Contributor, Author) commented Oct 8, 2024

@nastra please see #11276 for the dead code removal.

Comment on lines -63 to -65
    if (typeWidth == -1) {
      index = idx;
    }
wypoon (Contributor, Author):

This special case applies to var-width binary, so the call to VarWidthBinaryDictEncodedReader::nextVal already worked correctly in that case.
Of course, it still works correctly with the change.
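
For context, a sketch (assumed shape, not the exact source) of the index computation before this fix, which contained the removed special case; it shows why the var-width binary path (typeWidth == -1) already received the plain element index:

    // Sketch only: the byte offset was computed up front and then reset to the
    // element index for var-width binary, where typeWidth is -1.
    class OldIndexComputationSketch {
      static int indexFor(int idx, int typeWidth) {
        int index = idx * typeWidth;
        if (typeWidth == -1) {
          index = idx;
        }
        return index;
      }
    }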

      }
      // After this, parquetFile contains one column chunk of binary data in five pages,
      // the first two RLE dictionary encoded, and the remaining three plain encoded.

Contributor:

nit: please add a newline before this

wypoon (Contributor, Author), Oct 11, 2024:

Added a blank line before and removed the blank line after.
The reason I did it the way I did is that the comment applies to the code before it rather than the code following it (which is the more usual convention).
I tweaked the text of the comment slightly so that "After this" is not read as applying to the code following the comment.

nastra (Contributor) left a comment:

LGTM, thanks @wypoon for finding and fixing this.

@amogh-jahagirdar or @RussellSpitzer could you also take a look please?

wypoon (Contributor, Author) commented Oct 16, 2024

@amogh-jahagirdar I see that you plan to review; can you please do so?

amogh-jahagirdar (Contributor):

@wypoon Sure, I will take a look tomorrow!

Comment on lines +157 to +162
    Path path =
        Paths.get(
            getClass()
                .getClassLoader()
                .getResource("decimal_dict_and_plain_encoding.parquet")
                .toURI());
Contributor:

Is there any way we can generate this parquet file in the test locally via existing writers instead of uploading this resource?

Contributor:

Just saw this thread #11247 (comment); let me go through it.

wypoon (Contributor, Author):

@amogh-jahagirdar I have already explained this at length and convinced @nastra.

Successfully merging this pull request may close these issues:

Spark vectorized read of Parquet produces incorrect result for a decimal column (#11221)