Arrow: Fix indexing in Parquet dictionary encoded values readers #11247
base: main
@@ -18,13 +18,21 @@
 */
package org.apache.iceberg.spark.data.parquet.vectorized;

import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Function;
@@ -33,11 +41,35 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {

  protected static SparkSession spark = null;
Review thread on the `spark` field (several inline code references were lost in extraction and are marked […]):

- can we test this without having to start up spark? Also it seems like […]
- Using the Spark […] IIUC, if a decimal can be represented by 4 bytes, we use […]
- Hmm, afaics, […]
- Hmm, even I see now that […]
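The second comment above touches on how decimals are physically stored in Parquet. Per the Parquet format spec, a decimal whose precision fits in 9 digits can be stored in 4 bytes as INT32, up to 18 digits as INT64, and anything wider (such as the decimal(38, 0) used in the test below) is typically written as FIXED_LEN_BYTE_ARRAY. Since the truncated comment does not show which reader/writer path was being discussed, the sketch below is only an illustration of that mapping; the method name is hypothetical.

```java
// Illustration only (not part of this PR): typical choice of Parquet physical type
// for a decimal, per the Parquet format spec. The method name is hypothetical.
static String physicalTypeForDecimal(int precision) {
  if (precision <= 9) {
    return "INT32";                // value fits in 4 bytes
  } else if (precision <= 18) {
    return "INT64";                // value fits in 8 bytes
  } else {
    return "FIXED_LEN_BYTE_ARRAY"; // e.g. 16 bytes for decimal(38, 0)
  }
}
```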
  @BeforeAll
  public static void startSpark() {
    spark = SparkSession.builder().master("local[2]").getOrCreate();
  }

  @AfterAll
  public static void stopSpark() {
    if (spark != null) {
      spark.stop();
      spark = null;
    }
  }

  @Override
  Iterable<GenericData.Record> generateData(
      Schema schema,
@@ -93,4 +125,64 @@ public void testMixedDictionaryNonDictionaryReads() throws IOException {
        true,
        BATCH_SIZE);
  }

  @Test
  public void testBinaryNotAllPagesDictionaryEncoded() throws IOException {
    Schema schema = new Schema(Types.NestedField.required(1, "bytes", Types.BinaryType.get()));
    File parquetFile = File.createTempFile("junit", null, temp.toFile());
    assertThat(parquetFile.delete()).as("Delete should succeed").isTrue();

    Iterable<GenericData.Record> records = RandomData.generateFallbackData(schema, 500, 0L, 100);
    try (FileAppender<GenericData.Record> writer =
        Parquet.write(Files.localOutput(parquetFile))
            .schema(schema)
            .set(PARQUET_DICT_SIZE_BYTES, "4096")
            .set(PARQUET_PAGE_ROW_LIMIT, "100")
            .build()) {
      writer.addAll(records);
    }
    // After the above, parquetFile contains one column chunk of binary data in five pages,
    // the first two RLE dictionary encoded, and the remaining three plain encoded.
    assertRecordsMatch(schema, 500, records, parquetFile, true, BATCH_SIZE);
  }
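For context on that comment, one way to sanity-check that the written column chunk really mixes encodings is to read the file footer with parquet-mr. This is a rough sketch, not part of the PR; it assumes parquet-mr's Hadoop-based reader is on the test classpath, and it only reports chunk-level encodings, not the per-page breakdown.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Sketch: print the encodings recorded for each column chunk of the test file.
// A chunk containing both dictionary and plain pages reports e.g.
// [RLE_DICTIONARY, PLAIN] (or PLAIN_DICTIONARY with older writers).
static void printChunkEncodings(java.io.File parquetFile) throws java.io.IOException {
  org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(parquetFile.toURI());
  try (ParquetFileReader reader =
      ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
    for (BlockMetaData block : reader.getFooter().getBlocks()) {
      for (ColumnChunkMetaData chunk : block.getColumns()) {
        System.out.println(chunk.getPath() + " -> " + chunk.getEncodings());
      }
    }
  }
}
```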
  /**
   * decimal_dict_and_plain_encoding.parquet contains one column chunk of decimal(38, 0) data in
   * two pages, one RLE dictionary encoded and one plain encoded, each with 200 rows.
   */
  @Test
  public void testDecimalNotAllPagesDictionaryEncoded() throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.DecimalType.of(38, 0)));
    Path path =
        Paths.get(
            getClass()
                .getClassLoader()
                .getResource("decimal_dict_and_plain_encoding.parquet")
                .toURI());
Review thread on lines +157 to +162:

- Is there any way we can generate this parquet file in the test locally via existing writers instead of uploading this resource?
- Just saw this thread #11247 (comment), let me go through it.
- @amogh-jahagirdar I have already explained this at length and convinced @nastra.
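If the file could be generated locally, the approach would presumably mirror testBinaryNotAllPagesDictionaryEncoded above: write random decimal data with a small dictionary size and a page row limit so the writer falls back from dictionary to plain pages mid-chunk. The snippet below is only a sketch of the alternative the reviewers discuss, not part of the PR (which keeps the checked-in resource); whether RandomData.generateFallbackData produces the required encoding fallback for decimal(38, 0) is an assumption.

```java
// Hypothetical local generation of a mixed-encoding decimal file, modeled on
// testBinaryNotAllPagesDictionaryEncoded above. Not part of the PR.
Schema decimalSchema =
    new Schema(Types.NestedField.required(1, "id", Types.DecimalType.of(38, 0)));
File decimalFile = File.createTempFile("junit", null, temp.toFile());
assertThat(decimalFile.delete()).as("Delete should succeed").isTrue();

Iterable<GenericData.Record> decimalRecords =
    RandomData.generateFallbackData(decimalSchema, 400, 0L, 200);
try (FileAppender<GenericData.Record> writer =
    Parquet.write(Files.localOutput(decimalFile))
        .schema(decimalSchema)
        .set(PARQUET_DICT_SIZE_BYTES, "4096")
        .set(PARQUET_PAGE_ROW_LIMIT, "200")
        .build()) {
  writer.addAll(decimalRecords);
}
```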
    Dataset<Row> df = spark.read().parquet(path.toString());
    List<Row> expected = df.collectAsList();
    long expectedSize = df.count();

    Parquet.ReadBuilder readBuilder =
        Parquet.read(Files.localInput(path.toFile()))
            .project(schema)
            .createBatchedReaderFunc(
                type ->
                    VectorizedSparkParquetReaders.buildReader(
                        schema, type, ImmutableMap.of(), null));

    try (CloseableIterable<ColumnarBatch> batchReader = readBuilder.build()) {
      Iterator<Row> expectedIter = expected.iterator();
      Iterator<ColumnarBatch> batches = batchReader.iterator();
      int numRowsRead = 0;
      while (batches.hasNext()) {
        ColumnarBatch batch = batches.next();
        numRowsRead += batch.numRows();
        TestHelpers.assertEqualsBatchWithRows(schema.asStruct(), expectedIter, batch);
      }
      assertThat(numRowsRead).isEqualTo(expectedSize);
    }
  }
}
Review comment: This happens in the case of var width binary, so the call to VarWidthBinaryDictEncodedReader::nextVal works correctly in that case. Of course, it still works correctly with the change.
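For readers following the thread: the PR title concerns how the vectorized dictionary-encoded value readers index into the output vector when a column chunk mixes dictionary-encoded and plain pages, which is exactly what the two new tests exercise. The sketch below is not the Iceberg code; names and structure are invented, and it only illustrates the general failure mode such tests would catch: writing decoded values at a per-page position instead of at the running offset of rows already read.

```java
// Hypothetical, simplified model of a batched dictionary-decoding step; names are
// invented for illustration and do not match the Iceberg classes. The point: when a
// batch spans pages, decoded values must be written at startOffset + i (the running
// position in the output batch), not at i alone.
final class ToyDictEncodedReader {
  private final int[] dictionaryIds; // ids produced by the RLE/bit-packed decoder
  private final long[] dictionary;   // dictionary page contents

  ToyDictEncodedReader(int[] dictionaryIds, long[] dictionary) {
    this.dictionaryIds = dictionaryIds;
    this.dictionary = dictionary;
  }

  // Decodes numValues entries into output, starting at startOffset.
  void nextBatch(long[] output, int startOffset, int numValues) {
    for (int i = 0; i < numValues; i++) {
      // Correct: honor the caller's running offset into the output vector.
      output[startOffset + i] = dictionary[dictionaryIds[i]];
      // A buggy variant would write to output[i], clobbering rows decoded from
      // earlier pages once the chunk switches between encodings mid-batch.
    }
  }
}
```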