Skip to content

Commit

Permalink
feedback from review, switch to monotonic and simplify checks
Browse files Browse the repository at this point in the history
  • Loading branch information
fdosani committed Jun 13, 2024
1 parent 47e4f10 commit 7ee85ed
Showing 1 changed file with 8 additions and 26 deletions.
34 changes: 8 additions & 26 deletions datacompy/spark/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,8 @@ def _dataframe_merge(self, ignore_spaces: bool) -> None:
LOG.debug("Duplicate rows found, deduping by order of remaining fields")
# setting internal index
LOG.info("Adding internal index to dataframes")
df1 = df1.withColumn(
"__index",
row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)
df2 = df2.withColumn(
"__index",
row_number().over(Window.orderBy(monotonically_increasing_id())) - 1,
)
df1 = df1.withColumn("__index", monotonically_increasing_id())
df2 = df2.withColumn("__index", monotonically_increasing_id())

# Create order column for uniqueness of match
order_column = temp_column_name(df1, df2)
Expand Down Expand Up @@ -1113,26 +1107,14 @@ def _generate_id_within_group(
Original dataframe with the ID column that's unique in each group
"""
default_value = "DATACOMPY_NULL"
null_cols = [f"any(isnull({c}))" for c in join_columns]
default_cols = [f"any({c} == '{default_value}')" for c in join_columns]

if len(join_columns) > 1:
isnull_check = dataframe.select(
greatest(*[isnull(c) for c in join_columns]).alias("isnull")
).filter("isnull == True")
isdefault_check = dataframe.select(
greatest(*[col(c) == default_value for c in join_columns]).alias(
"isdefault"
)
).filter("isdefault == True")
else: # greatest doesn't work for single joincolumns
isnull_check = dataframe.select(isnull(*join_columns).alias("isnull")).filter(
"isnull == True"
)
isdefault_check = dataframe.select(
(col(*join_columns) == default_value).alias("isdefault")
).filter("isdefault == True")
null_check = any(list(dataframe.selectExpr(null_cols).first()))
default_check = any(list(dataframe.selectExpr(default_cols).first()))

if isnull_check.count() > 0:
if isdefault_check.count() > 0:
if null_check:
if default_check:
raise ValueError(f"{default_value} was found in your join columns")

return (
Expand Down

0 comments on commit 7ee85ed

Please sign in to comment.