From 2a5bc806741a47b3b39f19c30e600aaba977970d Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 25 Sep 2024 16:23:16 -0500 Subject: [PATCH] Update from_json to use null as line delimiter Signed-off-by: Robert (Bobby) Evans --- .../src/main/python/json_matrix_test.py | 32 +++++++++++++++++++ .../catalyst/json/rapids/GpuJsonScan.scala | 2 +- .../spark/sql/rapids/GpuJsonReadCommon.scala | 12 ++++++- .../spark/sql/rapids/GpuJsonToStructs.scala | 31 ++++++++++-------- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/integration_tests/src/main/python/json_matrix_test.py b/integration_tests/src/main/python/json_matrix_test.py index 8cb952c7ced..fe89384c84f 100644 --- a/integration_tests/src/main/python/json_matrix_test.py +++ b/integration_tests/src/main/python/json_matrix_test.py @@ -564,6 +564,38 @@ def test_json_tuple_dec_locale_non_aribic(std_input_path): lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_DEC_LOCALE_NON_ARIBIC_FILE, "json").selectExpr('''json_tuple(json, "data")'''), conf =_enable_json_tuple_conf) +def json_df_with_whitespace(spark): + return spark.createDataFrame([ + ["""{"a":\n100}"""], + ["""{"a":\r101}"""], + ["""{"a":\t102}"""], + ["""{"a": 102}"""], + ["""\n"""], + ["""\r"""], + ["""\t"""], + [""" """], + ["""\r\n\t """], + ["""{"a":"\r200"}"""], + ["""{"a":"\n201"}"""], + ["""{"a":"\t202"}"""], + ["""{"a":" 202"}"""]], + StructType([StructField("json", StringType())])) + +@pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/16915') +def test_from_json_with_whitespace(): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : json_df_with_whitespace(spark).selectExpr('*', 'from_json(json, "a STRING")'), + conf =_enable_json_to_structs_conf) + +def test_get_json_object_with_whitespace(): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : json_df_with_whitespace(spark).selectExpr('*', 'get_json_object(json, "$.a")')) + +def test_json_tuple_with_whitespace(): + 
assert_gpu_and_cpu_are_equal_collect( + lambda spark : json_df_with_whitespace(spark).selectExpr('*', 'json_tuple(json, "a")'), + conf =_enable_json_tuple_conf) + # These are common files used by most of the tests. A few files are for specific types, but these are very targeted tests COMMON_TEST_FILES=[ "int_formatted.json", diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 506b22a22ab..621822f0828 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -350,7 +350,7 @@ class JsonPartitionReader( maxBytesPerChunk, execMetrics, FilterEmptyHostLineBuffererFactory) { def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions = - GpuJsonReadCommon.cudfJsonOptions(parsedOptions) + GpuJsonReadCommon.cudfJsonOptions(parsedOptions, None) /** * Read the host buffer to GPU table diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala index 8ac3feeee53..72cb1582eff 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala @@ -318,7 +318,8 @@ object GpuJsonReadCommon { } } - def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = { + def cudfJsonOptions(options: JSONOptions, + delimOverride: Option[Char]): ai.rapids.cudf.JSONOptions = { // This is really ugly, but options.allowUnquotedControlChars is marked as private // and this is the only way I know to get it without even uglier tricks @scala.annotation.nowarn("msg=Java enum ALLOW_UNQUOTED_CONTROL_CHARS in " + @@ -326,6 +327,14 @@ object GpuJsonReadCommon { val allowUnquotedControlChars = 
options.buildJsonFactory() .isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS) + val lineDelimiter = delimOverride.getOrElse { + options.lineSeparatorInRead.map { sep => + if (sep.length > 1) { + throw new IllegalArgumentException("Only 1 byte separators are supported") + } + sep(0).toChar + } .getOrElse('\n') + } ai.rapids.cudf.JSONOptions.builder() .withRecoverWithNull(true) .withMixedTypesAsStrings(true) @@ -336,6 +345,7 @@ object GpuJsonReadCommon { .withLeadingZeros(options.allowNumericLeadingZeros) .withNonNumericNumbers(options.allowNonNumericNumbers) .withUnquotedControlChars(allowUnquotedControlChars) + .withLineDelimiter(lineDelimiter) .build() } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala index e60aefb8d59..68bcd9674ef 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf -import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, Scalar} +import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, RegexProgram, Scalar} import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression, HostAlloc} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.jni.JSONUtils @@ -81,6 +81,9 @@ case class GpuJsonToStructs( import GpuJsonReadCommon._ private lazy val emptyRowStr = constructEmptyRow(schema) + // Use the NUL char as the line delimiter. It should be very rare in real world data, + // so we should almost never have to worry about it showing up inside the JSON. 
+ private val lineDelimiter = '\u0000' private def constructEmptyRow(schema: DataType): String = { schema match { @@ -94,8 +97,10 @@ case class GpuJsonToStructs( val stripped = if (input.getData == null) { input.incRefCount } else { - withResource(cudf.Scalar.fromString(" ")) { space => - input.strip(space) + // TODO: is there a way to strip the leading/trailing whitespace without a regexp? + withResource(cudf.Scalar.fromString("")) { empty => + val prog = new RegexProgram("(?:^[ \t\r\n]+)|(?:[ \t\r\n]+$)") + input.replaceRegex(prog, empty) } } @@ -119,11 +124,11 @@ case class GpuJsonToStructs( } withResource(isLiteralNull) { _ => withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned => - checkForNewline(cleaned, "\n", "line separator") - checkForNewline(cleaned, "\r", "carriage return") + checkForLineDelimiter(cleaned) // add a newline to each JSON line - val withNewline = withResource(cudf.Scalar.fromString("\n")) { lineSep => + val withNewline = withResource( + cudf.Scalar.fromString("" + lineDelimiter)) { lineSep => withResource(ColumnVector.fromScalar(lineSep, cleaned.getRowCount.toInt)) { newLineCol => ColumnVector.stringConcatenate(Array[ColumnView](cleaned, newLineCol)) @@ -141,13 +146,13 @@ case class GpuJsonToStructs( } } - private def checkForNewline(cleaned: ColumnVector, newlineStr: String, name: String): Unit = { - withResource(cudf.Scalar.fromString(newlineStr)) { newline => - withResource(cleaned.stringContains(newline)) { hasNewline => - withResource(hasNewline.any()) { anyNewline => - if (anyNewline.isValid && anyNewline.getBoolean) { + private def checkForLineDelimiter(cleaned: ColumnVector): Unit = { + withResource(cudf.Scalar.fromString("" + lineDelimiter)) { delimiter => + withResource(cleaned.stringContains(delimiter)) { hasDelimiter => + withResource(hasDelimiter.any()) { anyDelimiter => + if (anyDelimiter.isValid && anyDelimiter.getBoolean) { throw new IllegalArgumentException( - s"We cannot currently support parsing JSON that 
contains a $name in it") + 
s"We cannot currently support parsing JSON that contains a NUL character in it") } } } @@ -160,7 +165,7 @@ case class GpuJsonToStructs( SQLConf.get.columnNameOfCorruptRecord) private lazy val jsonOptions = - GpuJsonReadCommon.cudfJsonOptions(parsedOptions) + GpuJsonReadCommon.cudfJsonOptions(parsedOptions, Some(lineDelimiter)) override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = { schema match {