
Pandas on Spark refactor #275

Merged
merged 17 commits into develop from pandas-on-spark-refactor
Mar 25, 2024
Conversation

Member

@fdosani fdosani commented Mar 5, 2024

So, a major update here:

  • Deprecating SparkCompare into LegacySparkCompare.
  • SparkCompare will now implement the Pandas on Spark API to better align with Pandas and Polars (a usage sketch follows below).
  • Bump up to 0.12.0; support Pandas 2+ and Spark 3.5+ only.

Closes #274
Closes #13
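
For context, a minimal usage sketch of the refactored API (assumed from the description above; the exact constructor signature may differ from the merged code):

import pyspark.pandas as ps
from datacompy.spark import SparkCompare

# Two small Pandas-on-Spark dataframes compared on a join key.
df1 = ps.DataFrame({"id": [1, 2, 3], "value": [1.0, 2.0, 3.0]})
df2 = ps.DataFrame({"id": [1, 2, 4], "value": [1.0, 2.1, 4.0]})

compare = SparkCompare(df1, df2, join_columns="id")
print(compare.report())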

@fdosani fdosani added enhancement New feature or request help wanted Extra attention is needed spark labels Mar 5, 2024
@fdosani fdosani marked this pull request as ready for review March 6, 2024 00:01
@fdosani
Member Author

fdosani commented Mar 6, 2024

Just want to share some benchmarks from my personal computer (8 cores, 32GB).

5 million rows, 10 columns:

-------------------------------------------------------------------------------- benchmark: 4 tests -------------------------------------------------------------------------------
Name (time in s)         Min                Max               Mean            StdDev             Median               IQR            Outliers     OPS            Rounds  Iterations
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_polars           6.0296 (1.0)       6.1883 (1.0)       6.0927 (1.0)      0.0626 (1.0)       6.0791 (1.0)      0.0891 (1.0)           2;0  0.1641 (1.0)           5           1
test_pandas           6.8771 (1.14)      7.1202 (1.15)      6.9674 (1.14)     0.1132 (1.81)      6.9055 (1.14)     0.1948 (2.19)          1;0  0.1435 (0.87)          5           1
test_fugue           38.6459 (6.41)     40.8522 (6.60)     39.2254 (6.44)     0.9242 (14.77)    38.8499 (6.39)     0.8269 (9.28)          1;1  0.0255 (0.16)          5           1
test_koalas          65.8145 (10.92)    77.4109 (12.51)    68.7251 (11.28)    4.8806 (77.99)    66.9013 (11.01)    3.4088 (38.26)         1;1  0.0146 (0.09)          5           1
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

10 million rows, 10 columns:

---------------------------------------------------------------------------------- benchmark: 4 tests ---------------------------------------------------------------------------------
Name (time in s)          Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_polars           12.2832 (1.0)       12.3702 (1.0)       12.3223 (1.0)      0.0374 (1.0)       12.3134 (1.0)      0.0653 (1.0)           2;0  0.0812 (1.0)           5           1
test_pandas           14.0272 (1.14)      14.6079 (1.18)      14.2955 (1.16)     0.2492 (6.66)      14.2488 (1.16)     0.4386 (6.72)          2;0  0.0700 (0.86)          5           1
test_fugue            86.6508 (7.05)      94.9336 (7.67)      90.0143 (7.30)     3.5726 (95.56)     88.5212 (7.19)     5.9401 (91.01)         1;0  0.0111 (0.14)          5           1
test_koalas          117.1630 (9.54)     129.0155 (10.43)    120.2190 (9.76)     5.0000 (133.74)   117.8006 (9.57)     4.4314 (67.90)         1;1  0.0083 (0.10)          5           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Polars seems to win the horse race here and can handle fairly large dataframes on my modest machine.
I'm thinking we can start with the Pandas on Spark implementation and keep refining it for performance improvements.
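
For reproducibility, a minimal sketch of how numbers like these could be produced with pytest-benchmark (data shapes and the join key are illustrative; the actual benchmark suite may differ):

import numpy as np
import pandas as pd
import datacompy

def _make_frames(rows: int, cols: int = 10):
    # Build two identical frames with a join key; a real benchmark would
    # likely introduce some mismatched values as well.
    rng = np.random.default_rng(0)
    data = {f"col_{i}": rng.random(rows) for i in range(cols)}
    data["id"] = np.arange(rows)
    df = pd.DataFrame(data)
    return df, df.copy()

def test_pandas(benchmark):
    df1, df2 = _make_frames(5_000_000)
    benchmark(lambda: datacompy.Compare(df1, df2, join_columns="id").report())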

@fdosani fdosani marked this pull request as draft March 7, 2024 19:16
@fdosani
Member Author

fdosani commented Mar 7, 2024

Converting this back to a draft. The minimum Spark version support is a bit concerning. I think I might take another stab at the native PySpark version over the Pandas on Spark one.

UPDATE: I've spent more time than I'd like to admit trying to get some form of backwards compatibility with older Spark versions and Pandas 2.0+. I'm not sure it is worth spending more time on this.

@ak-gupta @jdawang @NikhilJArora if you don't have any objections to only being Spark 3.5+ compatible from 0.12.0 onwards, we can flag this PR as ready for review. The other option is to focus on the following branch: https://github.com/capitalone/datacompy/tree/spark-compare-rewrite

@fdosani
Member Author

fdosani commented Mar 8, 2024

I actually was able to track down the support ticket. So Pandas 2.0 isn't supported until Spark 4. See here: https://issues.apache.org/jira/browse/SPARK-44101
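
In code terms, the constraint amounts to something like the following guard (illustrative only; the names and placement are assumptions, not the PR's actual code):

from warnings import warn

import pandas as pd
import pyspark
from packaging.version import Version

# pyspark.pandas only gains pandas 2.x support in Spark 4 (SPARK-44101),
# so warn if the installed combination falls in the unsupported gap.
if Version(pd.__version__) >= Version("2.0.0") and Version(pyspark.__version__) < Version("4.0.0"):
    warn("pyspark.pandas does not support pandas 2.x before Spark 4 (see SPARK-44101)")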

@fdosani fdosani marked this pull request as ready for review March 8, 2024 14:54
@CLAassistant

CLAassistant commented Mar 12, 2024

CLA assistant check
All committers have signed the CLA.

@fdosani
Member Author

fdosani commented Mar 12, 2024

20 executors, 8 cores, 32 GB

------------------------------------------------------------------------------------- benchmark: 5 tests ------------------------------------------------------------------------------------
Name (time in s)                  Min                 Max               Mean            StdDev             Median               IQR            Outliers     OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_core_spark_100K[200]     10.3011 (1.0)       14.1748 (1.0)      11.1113 (1.0)      1.1416 (1.11)     10.7997 (1.0)      0.7238 (1.0)           1;1  0.0900 (1.0)          10           1
test_core_spark_1000[200]     11.0437 (1.07)      33.4789 (2.36)     15.2838 (1.38)     6.8270 (6.62)     12.2837 (1.14)     5.5838 (7.71)          1;1  0.0654 (0.73)         10           1
test_core_spark_10M[200]      19.3565 (1.88)      23.1678 (1.63)     20.6274 (1.86)     1.1300 (1.10)     20.3513 (1.88)     1.0729 (1.48)          2;1  0.0485 (0.54)         10           1
test_core_spark_50M[200]      56.4752 (5.48)      59.1063 (4.17)     57.5735 (5.18)     1.0310 (1.0)      57.4501 (5.32)     2.0479 (2.83)          5;0  0.0174 (0.19)         10           1
test_core_spark_100M[200]     92.2887 (8.96)     111.3570 (7.86)     96.3204 (8.67)     5.6601 (5.49)     94.6446 (8.76)     2.6470 (3.66)          1;1  0.0104 (0.12)         10           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
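
A sketch of how these parametrized Spark benchmarks could look (assumed from the test ids above; the [200] parameter is read as the partition count):

import numpy as np
import pandas as pd
import pyspark.pandas as ps
import pytest
from datacompy.spark import SparkCompare

@pytest.mark.parametrize("partitions", [200])
def test_core_spark_10M(benchmark, partitions):
    pdf = pd.DataFrame(
        {"id": np.arange(10_000_000), "value": np.random.default_rng(0).random(10_000_000)}
    )
    # Convert to Pandas-on-Spark frames and spread across the given partitions.
    df1 = ps.DataFrame(pdf).spark.repartition(partitions)
    df2 = ps.DataFrame(pdf).spark.repartition(partitions)
    benchmark(lambda: SparkCompare(df1, df2, join_columns="id").report())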

@fdosani fdosani mentioned this pull request Mar 25, 2024
Contributor

@gladysteh99 gladysteh99 left a comment

LGTM

@gladysteh99 gladysteh99 self-requested a review March 25, 2024 17:03
Contributor

@jdawang jdawang left a comment

I have a few nitpicky comments, if you'll indulge me, and one question. We're purposefully removing the LegacySparkCompare docs because we don't want people to start using it, right?

@@ -25,4 +25,4 @@
     unq_columns,
 )
 from datacompy.polars import PolarsCompare
-from datacompy.spark import NUMERIC_SPARK_TYPES, SparkCompare
+from datacompy.spark import SparkCompare
Contributor

Add newline

Member Author

Done

pass # Let non-Spark people at least enjoy the loveliness of the pandas datacompy functionality


warn(
Contributor

The warning message doesn't seem accurate, if I understand correctly. This is in the legacy module, so it seems like we should either delete this message from this module or change the string to warn that we're going to delete the legacy functionality in a vx.x.x release.

Member Author

Made a tweak.
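
For illustration, one hypothetical shape the tweaked warning could take in the legacy module (the exact wording and placement in the PR may differ):

from warnings import warn

# Emit a deprecation notice when the legacy module is used, pointing
# users at the new Pandas-on-Spark-based SparkCompare instead.
warn(
    "LegacySparkCompare is deprecated and may be removed in a future release; "
    "please use datacompy.spark.SparkCompare (Pandas on Spark API) instead.",
    DeprecationWarning,
    stacklevel=2,
)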

@@ -19,11 +19,10 @@ jobs:
   strategy:
     fail-fast: false
     matrix:
-      python-version: [3.8, 3.9, '3.10', '3.11']
-      spark-version: [3.1.3, 3.2.4, 3.3.4, 3.4.2, 3.5.0]
+      python-version: [3.9, '3.10', '3.11']
Contributor

If we're not testing on py38, I think this would warrant a formal declaration of dropping support and changing the supported versions in pyproject.toml. Learned this the hard way haha.

Member Author

Making changes in pyproject.toml
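
For reference, a sketch of the kind of pyproject.toml change implied here (the actual entries in the PR may differ):

# Hypothetical pyproject.toml excerpt reflecting the Python 3.8 drop.
requires-python = ">=3.9"
classifiers = [
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
]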

-class SparkCompare:
-    """Comparison class used to compare two Spark Dataframes.
+class SparkCompare(BaseCompare):
+    """Comparison class to be used to compare whether two Pandas on Spark dataframes as equal.
Contributor

Nitpicky grammar, but as equal -> are equal

Member Author

Done

.cumcount()
)
else:
return dataframe[join_columns].groupby(join_columns).cumcount()
Contributor

Add newline

Member Author

Done


DataComPy's Panads on Spark implementation ``SparkCompare`` (new in ``v0.12.0``)
Contributor

Panads -> Pandas

Member Author

Done


DataComPy's Panads on Spark implementation ``SparkCompare`` (new in ``v0.12.0``)
is very similar port of the Pandas version
Contributor

is very -> is a very

Member Author

Done



Until then we will not be supporting Pandas 2 for the Pandas on Spark API implementaion.
For Fugue and the Native Pandas implementation Pandas 2 is support.
Contributor

support -> supported?

Member Author

Done

@fdosani fdosani requested a review from jdawang March 25, 2024 17:59
@fdosani
Member Author

fdosani commented Mar 25, 2024

I have a few nitpicky comments, if you'll indulge me, and one question. We're purposefully removing the LegacySparkCompare docs because we don't want people to start using it, right?

Yeah exactly. I want to keep it in case someone needs or wants it, but I don't want to support it moving forward. The new implementation is better aligned to Pandas in terms of logic, so we should favour that.
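
For anyone who still needs the old behaviour, the legacy implementation stays importable (module path assumed from this PR's file layout):

# Opt-in import of the deprecated implementation; this is expected to emit
# a deprecation warning pointing at the new SparkCompare.
from datacompy.legacy import LegacySparkCompare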

Contributor

@jdawang jdawang left a comment

Just going to add a note here for the future: currently seeing a small difference between the pandas and spark report sample rows when there are rows only in one dataframe.

  1. There is an additional _merge_right column which is not in the original dataframes, which could cause a bit of confusion for users.
  2. We're displaying the column names as their aliases, which could also be a bit confusing. It would be best to translate them back to their original names.

Not a blocker for this, but we should open a follow-up issue to keep track of this.

import numpy as np
import pandas as pd
import pyspark.pandas as ps

pdf1 = pd.DataFrame.from_dict({"id": [1, 2, 3, 4, 5], "a": [2, 3, 2, 3, 2], "b": ["a", "b", "c", "d", ""]})
pdf2 = pd.DataFrame.from_dict({"id": [1, 2, 3, 4, 5, 6], "a": [2, 3, 2, 3, 2, np.nan], "b": ["a", "b", "c", "d", "", pd.NA]})
df1 = ps.DataFrame(pdf1)
df2 = ps.DataFrame(pdf2)

Spark

DataComPy Comparison
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        3     5
1       df2        3     6

Column Summary
--------------

Number of columns in common: 3
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0

Row Summary
-----------

Matched on: id
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 5
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 1

Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 5

Column Comparison
-----------------

Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 3
Total number of values which compare unequal: 0

Columns with Unequal Values or Types
------------------------------------

  Column df1 dtype df2 dtype  # Unequal  Max Diff  # Null Diff
0      a     int64   float64          0       0.0            0

Sample Rows with Unequal Values
-------------------------------

Sample Rows Only in df2 (First 10 Columns)
------------------------------------------

   id_df2  a_df2 b_df2  _merge_right
5       6    NaN  None          True

Pandas

DataComPy Comparison
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        3     5
1       df2        3     6

Column Summary
--------------

Number of columns in common: 3
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0

Row Summary
-----------

Matched on: id
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 5
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 1

Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 5

Column Comparison
-----------------

Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 3
Total number of values which compare unequal: 0

Columns with Unequal Values or Types
------------------------------------

  Column df1 dtype df2 dtype  # Unequal  Max Diff  # Null Diff
0      a     int64   float64          0       0.0            0

Sample Rows with Unequal Values
-------------------------------

Sample Rows Only in df2 (First 10 Columns)
------------------------------------------

   id   a     b
0   6 NaN  <NA>

@fdosani
Member Author

fdosani commented Mar 25, 2024

Just going to add a note here for the future: currently seeing a small difference between the pandas and spark report sample rows when there are rows only in one dataframe.


Ahhh, good catch!
Maybe I can take a quick stab at cleaning this up. Or would you rather merge, and I can do a follow-up PR? This one is already getting a bit big.
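
A hypothetical sketch of that cleanup for the "rows only in df2" sample (the frame contents and the "_df2" suffix below are illustrative, not the PR's actual code):

import pandas as pd

# A sample frame in the aliased form the Spark report currently prints.
sample = pd.DataFrame({"id_df2": [6], "a_df2": [float("nan")], "b_df2": [None], "_merge_right": [True]})

# Drop the merge indicator and strip the alias suffix so the sample rows
# show the original column names, matching the pandas report.
sample = sample.drop(columns=["_merge_right"])
sample.columns = [c.removesuffix("_df2") for c in sample.columns]
print(sample)  # -> columns: id, a, b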

@jdawang
Contributor

jdawang commented Mar 25, 2024

Yeah, up to you. I don't think this is big enough to be a blocker if you want to merge :)

@fdosani fdosani merged commit 6a1920d into develop Mar 25, 2024
29 checks passed
@fdosani fdosani deleted the pandas-on-spark-refactor branch March 25, 2024 18:56
@fdosani fdosani mentioned this pull request Apr 30, 2024
rhaffar pushed a commit to rhaffar/datacompy that referenced this pull request Sep 12, 2024
* refactor SparkCompare

* tweaking SparkCompare and adding back Legacy

* conditional import

* cleaning up tests and using pytest-spark for legacy

* adding docs

* caching and some typo fixes

* adding in doc and pandas 2 changes

* adding pandas to testing matrix

* drop 3.8

* drop 3.8

* refactoring ^

* rebase fix for capitalone#277

* fixing legacy uncode column names

* unicode fix for legacy

* unicode test for new spark logic

* typo fix

* changes from PR review