Skip to content

Commit

Permalink
Update from_json to use null as line delimiter
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 committed Sep 25, 2024
1 parent 6a9731f commit 2a5bc80
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
32 changes: 32 additions & 0 deletions integration_tests/src/main/python/json_matrix_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,38 @@ def test_json_tuple_dec_locale_non_aribic(std_input_path):
lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_DEC_LOCALE_NON_ARIBIC_FILE, "json").selectExpr('''json_tuple(json, "data")'''),
conf =_enable_json_tuple_conf)

def json_df_with_whitespace(spark):
    # Build a single-column ("json" STRING) DataFrame of documents that exercise
    # whitespace handling: whitespace between tokens, whitespace-only rows, and
    # whitespace embedded inside quoted string values.
    docs = [
        '{"a":\n100}',
        '{"a":\r101}',
        '{"a":\t102}',
        '{"a": 102}',
        '\n',
        '\r',
        '\t',
        ' ',
        '\r\n\t ',
        '{"a":"\r200"}',
        '{"a":"\n201"}',
        '{"a":"\t202"}',
        '{"a":" 202"}']
    rows = [[doc] for doc in docs]
    return spark.createDataFrame(rows,
        StructType([StructField("json", StringType())]))

@pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/16915')
def test_from_json_with_whitespace():
    # from_json should produce the same results on CPU and GPU for inputs
    # containing whitespace in and around the JSON documents.
    def do_read(spark):
        df = json_df_with_whitespace(spark)
        return df.selectExpr('*', 'from_json(json, "a STRING")')
    assert_gpu_and_cpu_are_equal_collect(do_read,
        conf=_enable_json_to_structs_conf)

def test_get_json_object_with_whitespace():
    # get_json_object should produce the same results on CPU and GPU for inputs
    # containing whitespace in and around the JSON documents.
    def do_read(spark):
        df = json_df_with_whitespace(spark)
        return df.selectExpr('*', 'get_json_object(json, "$.a")')
    assert_gpu_and_cpu_are_equal_collect(do_read)

def test_json_tuple_with_whitespace():
    # json_tuple should produce the same results on CPU and GPU for inputs
    # containing whitespace in and around the JSON documents.
    def do_read(spark):
        df = json_df_with_whitespace(spark)
        return df.selectExpr('*', 'json_tuple(json, "a")')
    assert_gpu_and_cpu_are_equal_collect(do_read,
        conf=_enable_json_tuple_conf)

# These are common files used by most of the tests. A few files are for specific types, but these are very targeted tests
COMMON_TEST_FILES=[
"int_formatted.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class JsonPartitionReader(
maxBytesPerChunk, execMetrics, FilterEmptyHostLineBuffererFactory) {

def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
GpuJsonReadCommon.cudfJsonOptions(parsedOptions, None)

/**
* Read the host buffer to GPU table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,23 @@ object GpuJsonReadCommon {
}
}

def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = {
def cudfJsonOptions(options: JSONOptions,
delimOverride: Option[Char]): ai.rapids.cudf.JSONOptions = {
// This is really ugly, but options.allowUnquotedControlChars is marked as private
// and this is the only way I know to get it without even uglier tricks
@scala.annotation.nowarn("msg=Java enum ALLOW_UNQUOTED_CONTROL_CHARS in " +
"Java enum Feature is deprecated")
val allowUnquotedControlChars =
options.buildJsonFactory()
.isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
val lineDelimiter = delimOverride.getOrElse {
options.lineSeparatorInRead.map { sep =>
if (sep.length > 1) {
throw new IllegalArgumentException("Only 1 byte separators are supported")
}
sep(0).toChar
} .getOrElse('\n')
}
ai.rapids.cudf.JSONOptions.builder()
.withRecoverWithNull(true)
.withMixedTypesAsStrings(true)
Expand All @@ -336,6 +345,7 @@ object GpuJsonReadCommon {
.withLeadingZeros(options.allowNumericLeadingZeros)
.withNonNumericNumbers(options.allowNonNumericNumbers)
.withUnquotedControlChars(allowUnquotedControlChars)
.withLineDelimiter(lineDelimiter)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, Scalar}
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, RegexProgram, Scalar}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression, HostAlloc}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.jni.JSONUtils
Expand Down Expand Up @@ -81,6 +81,9 @@ case class GpuJsonToStructs(
import GpuJsonReadCommon._

private lazy val emptyRowStr = constructEmptyRow(schema)
// The NUL char should be very rare in real-world data, so using it as the
// delimiter means we should never really have to worry about it showing up.
private val lineDelimiter = '\u0000'

private def constructEmptyRow(schema: DataType): String = {
schema match {
Expand All @@ -94,8 +97,10 @@ case class GpuJsonToStructs(
val stripped = if (input.getData == null) {
input.incRefCount
} else {
withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
// TODO is there a way to do this without a regexp
withResource(cudf.Scalar.fromString("")) { empty =>
val prog = new RegexProgram("(?:^[ \t\r\n]+)|(?:[ \t\r\n]+$)")
input.replaceRegex(prog, empty)
}
}

Expand All @@ -119,11 +124,11 @@ case class GpuJsonToStructs(
}
withResource(isLiteralNull) { _ =>
withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned =>
checkForNewline(cleaned, "\n", "line separator")
checkForNewline(cleaned, "\r", "carriage return")
checkForLineDelimiter(cleaned)

// append the line delimiter (NUL) to each JSON line
val withNewline = withResource(cudf.Scalar.fromString("\n")) { lineSep =>
val withNewline = withResource(
cudf.Scalar.fromString("" + lineDelimiter)) { lineSep =>
withResource(ColumnVector.fromScalar(lineSep, cleaned.getRowCount.toInt)) {
newLineCol =>
ColumnVector.stringConcatenate(Array[ColumnView](cleaned, newLineCol))
Expand All @@ -141,13 +146,13 @@ case class GpuJsonToStructs(
}
}

private def checkForNewline(cleaned: ColumnVector, newlineStr: String, name: String): Unit = {
withResource(cudf.Scalar.fromString(newlineStr)) { newline =>
withResource(cleaned.stringContains(newline)) { hasNewline =>
withResource(hasNewline.any()) { anyNewline =>
if (anyNewline.isValid && anyNewline.getBoolean) {
private def checkForLineDelimiter(cleaned: ColumnVector): Unit = {
withResource(cudf.Scalar.fromString("" + lineDelimiter)) { delimiter =>
withResource(cleaned.stringContains(delimiter)) { hasDelimiter =>
withResource(hasDelimiter.any()) { anyDelimiter =>
if (anyDelimiter.isValid && anyDelimiter.getBoolean) {
throw new IllegalArgumentException(
s"We cannot currently support parsing JSON that contains a $name in it")
s"We cannot currently support parsing JSON that contains a NUL character in it")
}
}
}
Expand All @@ -160,7 +165,7 @@ case class GpuJsonToStructs(
SQLConf.get.columnNameOfCorruptRecord)

private lazy val jsonOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
GpuJsonReadCommon.cudfJsonOptions(parsedOptions, Some(lineDelimiter))

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
schema match {
Expand Down

0 comments on commit 2a5bc80

Please sign in to comment.